前言
在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。
一、消息确认流程图
由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认
消息丢失的情景有三种情况:
- 发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
- 接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
- 消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)
二、生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。
1、publisher-confirm(发送者确认)
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
2、publisher-return(发送者回执)
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
三、代码实现
1、配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
publish-confirm-type有三个值,
- none:禁用发布确认模式,是默认值
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns:开启消息失败回调,回调函数ReturnCallback
2、配置ConfirmCallback函数和ReturnCallback函数
/**
* 生产者消息回调配置类
*/
@Configuration
@Slf4j
public class ProviderCallBackConfig {
@Resource
private CachingConnectionFactory cachingConnectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
// 当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
// 那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。
rabbitTemplate.setMandatory(true);
/**
* TODO RabbitMQ发送者消息确认回调,解决消息可靠性问题
* 消息确认回调,确认消息是否到达broker
* data:消息唯一标识
* ack:确认结果
* cause:失败原因
*/
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
log.info("消息唯一标识: {}", data);
log.info("确认状态: {}", ack);
log.info("造成原因: {}", cause);
if (ack) {
//消息发送成功后,更新数据库消息状态等逻辑
log.info("消息发送成功");
} else {
//信息发送失败,打印日志后,可以根据业务选择是否重发消息
log.error("消息发送失败");
}
});
/**
* TODO RabbitMQ发送者消息失败回调,解决消息可靠性问题
* 消息失败回调,比如router不到queue时回调
*/
rabbitTemplate.setReturnsCallback((res) -> {
//若发送失败,打印错误信息,然后可以根据业务选择重发消息
log.error("消息发送queue时失败");
log.error("ReturnCallback: "+"消息:"+res.getMessage());
log.error("ReturnCallback: "+"回应码:"+res.getReplyCode());
log.error("ReturnCallback: "+"回应信息:"+res.getReplyText());
log.error("ReturnCallback: "+"交换机:"+res.getExchange());
log.error("ReturnCallback: "+"路由键:"+res.getRoutingKey());
});
return rabbitTemplate;
}
}
到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?
先从总体的情况分析,推送消息存在3种情况:
①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送成功
①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):
@GetMapping("/testProviderMessageBack")
@ApiOperation(value = "测试生产者消息回调")
@ApiOperationSupport(order = 5)
public String testProviderMessageBack() {
CorrelationData data = new CorrelationData();
data.setId("111");
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "测试生产者消息回调",data);
return "ok";
}
调用接口,查看项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):
结论: ①这种情况触发的是 ConfirmCallback 回调函数
消息唯一标识: CorrelationData [id=111]
确认状态: false
造成原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
消息发送失败
②消息推送到server,找到交换机了,但是没找到队列
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):
@GetMapping("/testProviderMessageBack2")
@ApiOperation(value = "测试生产者消息回调2")
@ApiOperationSupport(order = 6)
public String testProviderMessageBack2() {
CorrelationData data = new CorrelationData();
data.setId("222");
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "测试生产者消息回调2",data);
return "ok";
}
消息唯一标识: CorrelationData [id=222]
确认状态: true
造成原因: null
消息发送成功
消息发送queue时失败
ReturnCallback: 消息:(Body:'测试生产者消息回调2' MessageProperties [headers={spring_returned_message_correlation=222}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 回应码:312
ReturnCallback: 回应信息:NO_ROUTE
ReturnCallback: 交换机:lonelyDirectExchange
ReturnCallback: 路由键:TestLonelyDirectRouting
这种情况下,两个函数都被调用了,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
③消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendDirectMessage接口,可以看到控制台输出:
结论:这种情况触发的是 ConfirmCallback 回调函数。