1. 简介
当消息从 producer 发送给 broker是,消息丢失了,那么 broker 就不会收到消息,那么后续的持久化机制、消息确认模式也就没有作用。于是,我们需要尽可能地保证 producer 发送地消息成功到达 broker。broker 中包含交换机与队列,于是,我们不仅需要保证消息能够成功到达交换机,也需要成功到达我们所指定的队列。
2. 事务与 publisher cnofirm(发送方确认)机制
对于上面这个问题,RabbitMQ 提出了两种解决策略,即:
- 使用事务
- publisher confirm 机制
由于事务会比较消耗性能,下面就主要讲解 publisher confirm 机制。
3. publisher confirm 机制
在 publisher confirm 机制下,分为两个模式:
- confirm 确认模式
- return 退回模式
这两个模式都需要在配置文件中添加下面的配置项:
spring:
rabbitmq:
publisher-confirm-type: correlated #消息发送确认
3.1 confirm 确认模式
下面是生产者的代码:
@RequestMapping("/confirm")
public String confirm() {
String message = "confirm test";
/**
* 若在此处设置回调函数,那么会影响到所有 rabbitTemplate
* 并且只能发送一次消息,因为发送那个多次消息就相当于设置了多个回调函数,规定只能设置一次
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData correlation data for the callback.
* @param ack true: 消息到达交换机; false: 消息没有到达交换机
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("接收到消息, 消息 id: " + (null == correlationData ? null : correlationData.getId()));
} else {
System.out.println("未接收到消息, 消息 id: " + (null == correlationData ? null : correlationData.getId()) + ", cause: " + cause);
}
}
});
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, Constants.CONFIRM_ROUTINGKEY, message, correlationData);
return "消息发送成功";
}
上面的代码与在 spring 环境中发送消息的代码类似,但是多出了设置回调函数这一步。
ConfirmCallback 即为回调函数,当消息不论消息是否发哦是那个到了 broker,都会给 producer 发送一个 ack,若 ack 为 true,就表示消息到达了交换机;若 ack 为 false,就表示消息没有到达交换机。我们可以根据 ack 来进行不同的业务操作。
代码运行结果如下:
当连续发送两条消息时的结果如下:
报错信息中显示:一个 RabbitMQTemplate 实例是能设置一次 ConfirmCallback。
但是当我们连续两次调用该接口时,就导致 ConfirmCallback 被创建了两次,也就会报错。对于这种情况,我们可以将 RabbitMQTemplate 提取出来,自定义一个 RabbitMQTemplate,在这个类中通过设置回调函数达到之创建一次就能使用多次的效果,代码如下:
@Configuration
public class RabbitTemplateConfig {
@Bean("normalRabbitTemplate")
public RabbitTemplate normalRabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate1(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData correlation data for the callback.
* @param ack true: 消息到达交换机; false: 消息没有到达交换机
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("接收到消息, 消息 id: " + messageId);
} else {
System.out.println("未接收到消息, 消息 id: " + messageId + ", cause: " + cause);
}
}
});
return rabbitTemplate;
}
}
在上面的配置类中,我们创建了两个方法用来返回 RabbitMQTemplate,这是因为如果我们只创建 confirmRabbitMQTemplate,那么别的接口使用到的 RabbitMQTemplate 就是我们修改后的结果,这时就需要再定义一个 normalRabbitMQTemplate 用来返回没有设置回调函数的 RabbitMQTemplate。这样就不会影响别的接口使用。调用者根据自己的需要注入不同的实例。
修改后的接口如下:
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "normalRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
/**
* 确认模式
*/
@RequestMapping("/confirm")
public String confirm() {
String message = "confirm test";
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, Constants.CONFIRM_ROUTINGKEY, message, correlationData);
return "消息发送成功";
}
}
接下来再连续调用这个接口,观察是否会报错:
这一次代码就没有报错。
当我们使用了未声明的交换机,代码的运行结果如下:
未接收到消息, 消息 id: dcfab6f0-0d9a-470f-9f8e-5b225df71021,
cause: channel error;
protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchangeaaa' in vhost 'extension', class-id=60, method-id=40)
报错信息中表示没有该交换机。
当我们使用了错误的 BindingKey,代码的运行结果如下:
这里居然显示消息发送成功,但是上面的代码使用了错误的 BindingKey,消息只是到了交换机,交换机并没有根据 BindingKey 找到与之绑定的队列,那么消息也就没有到达队列中,即消息发送失败。
但是为什么会出现上面那种情况呢?这就需要使用到退回模式。
3.2 return 退回模式
在确认模式末尾,我们发现,即使消息没有成功到达队列,依然会提示消息发送成功,这是不符合逻辑的。
在 RabbitMQ 中,confirm 确认模式之是保证了 producer 与 交换机之间的消息可靠传输,并没有保证交换机与队列之间的可靠传输。于是,return 退回模式就针对这一缺陷进行补充。
当消息没有成功到达队列后,就会将这条消息退回给 producer,并且携带退回的原因。
退回模式与确认模式并不冲突,二者可以分开,也可以同时存在,下面的代码将二者合并在一起进行编写,代码如下:
@Configuration
public class RabbitTemplateConfig {
/**
*
* @param connectionFactory 会根据配置文件中的 rabbitmq 配置自动填充参数
* @return
*/
@Bean("normalRabbitTemplate")
public RabbitTemplate normalRabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
/**
* 确认模式使用
* 使用 AtomicBoolean, 在消息被退回时打印不同的日志
* @param connectionFactory
* @return
*/
@Bean("confirmRabbitTemplate1")
public RabbitTemplate confirmRabbitTemplate1(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData correlation data for the callback.
* @param ack true: 消息到达交换机; false: 消息没有到达交换机
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String messageId = null == correlationData ? null : correlationData.getId();
if (ack) {
System.out.println("接收到消息, 消息 id: " + messageId);
} else {
System.out.println("未接收到消息, 消息 id: " + messageId + ", cause: " + cause);
}
}
});
/**
* 若消息没有到达队列,就退回
*/
rabbitTemplate.setMandatory(true); //启用强制路由检查
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息退退回: " + returned);
}
});
return rabbitTemplate;
}
}
在上面的代码中,使用 setMandatory 开启强制路由检查,当 setMandatory 设置为 true,那么就会进行消息是否成功到达队列的判断,若没有声明为 true,即使编写了 return 模式的代码,也就不会生效。
我们使用 ReturnCallBack 回调函数实现消息没有到达队列后的逻辑。
当我们使用了错误的 BindingKey,代码的运行结果如下:
消息退退回: ReturnedMessage [message=(Body:'confirm test' MessageProperties [headers={spring_returned_message_correlation=f4db7a24-f6aa-499c-8409-65c42605576a}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=keyaaa]
接收到消息, 消息 id: f4db7a24-f6aa-499c-8409-65c42605576a
这次就识别出了消息没有成功到达队列的情况,但是依然有一个不足:
既然消息没有发送到指定队列,那为什么还会弹出接收到消息的日志呢?
于是我们需要针对上面的代码进行改变,此处不过多进行讲解。