目录
1.3 消费者消息确认(解决消费者接收到消息后未消费就宕机)
一:消息可靠性问题
几种消息丢失的情况
1. 发送时丢失
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
2. MQ宕机,MQ基于内存,宕机后queue消息丢失
3. consumer接收到消息后未消费就宕机
1.1 生产者消息确认(解决发送时消息丢失)
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
结果有两种请求:
- publisher-confirm,
发送者确认消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
- publisher-return,
发送者回执消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
调用 ReturnCallback ,做好后序方案,重发消息或者记录消息
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一的ID,避免ACK冲突
【配置】在publisher这个微服务的application.yml中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated
correlatedpublisher-returns: true
template:
mandatory : true
publish-confirm-type:开启publisher-confirm
simple:同步等待confirm结果,直到超时
correlated:异步回调,定义Confirmcallback,MQ返回结果时会回调这个Confirmcallback
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义Returncallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback; false:则直接丢弃消息
1.2 消息持久化(解决MQ宕机)
MQ默认是内存存储消息,开启持久化功能可以保证缓存在MQ中的消息不丢失,在SpringAMQP中,交换机,队列,消息都是默认持久化的。
1.3 消费者消息确认(解决消费者接收到消息后未消费就宕机)
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测l代码是否出现异常,没有异常则返回ack;抛出异常则返回nack(推荐使用)
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(不推荐使用)
【配置】在消费者这个微服务的application.yml中添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none
# none,关闭ack ; manual,手动ack ; auto:自动ack
失败重试机制
【缺点】消费者失败重试的弊端
当消费者出现异常后,为了防止消息丢失,默认消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,MQ负载加大。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
【配置】在消费者这个微服务的application.yml中添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true #开启消费者失败重试
initial-interval:1000#初识的失败等待时长为1秒
multiplier: 1 #下次失败的等待时长倍数
max-attempts: 3 #最大本地重试次数
stateless: true # true无状态; false有状态。如果包含事务,改为false
消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式.
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(死信队列)
RepublishMessageRecoverer实现图示