【RabbitMQ】消息队列需要解决的几个问题

发布于:2022-08-08 ⋅ 阅读:(351) ⋅ 点赞:(0)

目录

一:消息可靠性问题

1.1 生产者消息确认(解决发送时消息丢失)

1.2 消息持久化(解决MQ宕机)

1.3 消费者消息确认(解决消费者接收到消息后未消费就宕机)

失败重试机制

消费者失败消息处理策略

二:死信交换机


一:消息可靠性问题

几种消息丢失的情况

1. 发送时丢失

  • 生产者发送的消息未送达exchange
  • 消息到达exchange后未到达queue

2. MQ宕机,MQ基于内存,宕机后queue消息丢失

3. consumer接收到消息后未消费就宕机

1.1 生产者消息确认(解决发送时消息丢失)

生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

结果有两种请求:

  • publisher-confirm,

                发送者确认消息成功投递到交换机,返回ack

                消息未投递到交换机,返回nack

  • publisher-return,

                发送者回执消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

                调用 ReturnCallback ,做好后序方案,重发消息或者记录消息


 

 注意:确认机制发送消息时,需要给每个消息设置一个全局唯一的ID,避免ACK冲突


【配置】在publisher这个微服务的application.yml中添加配置:

spring:
        rabbitmq:
                publisher-confirm-type: correlated
                correlatedpublisher-returns: true
                template:
                        mandatory : true

publish-confirm-type:开启publisher-confirm
    simple:同步等待confirm结果,直到超时
    correlated:异步回调,定义Confirmcallback,MQ返回结果时会回调这个Confirmcallback
publish-returns:开启publish-return功能,

        同样是基于callback机制,不过是定义Returncallback
template.mandatory:定义消息路由失败时的策略。

        true,则调用ReturnCallback; false:则直接丢弃消息


1.2 消息持久化(解决MQ宕机)

MQ默认是内存存储消息,开启持久化功能可以保证缓存在MQ中的消息不丢失,在SpringAMQP中,交换机,队列,消息都是默认持久化的。


1.3 消费者消息确认(解决消费者接收到消息后未消费就宕机)

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测l代码是否出现异常,没有异常则返回ack;抛出异常则返回nack(推荐使用)
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(不推荐使用)

【配置】在消费者这个微服务的application.yml中添加配置:

spring:
        rabbitmq:
                listener:
                        simple:
                                prefetch: 1
                                acknowledge-mode: none

# none,关闭ack ; manual,手动ack ; auto:自动ack

失败重试机制

【缺点】消费者失败重试的弊端
当消费者出现异常后,为了防止消息丢失,默认消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,MQ负载加大。
 

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

【配置】在消费者这个微服务的application.yml中添加配置:

spring:
        rabbitmq:
                listener:
                        simple:
                                prefetch: 1
                                retry:
                                        enabled: true #开启消费者失败重试
                                        initial-interval:1000#初识的失败等待时长为1秒
                                        multiplier: 1 #下次失败的等待时长倍数 
                                        max-attempts: 3 #最大本地重试次数
                                        stateless: true # true无状态; false有状态。如果包含事务,改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  1. RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式.
  2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(死信队列)

RepublishMessageRecoverer实现图示


二:死信交换机

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到