RabbitMQ高级特性——消息确认、持久性、发送方确认、重试

发布于:2025-08-15 ⋅ 阅读:(11) ⋅ 点赞:(0)

目录

一、消息确认

1.1消息确认机制

1.2手动确认方法

1. basicAck - 确认单条消息处理成功

2. basicNack - 否定确认(消息处理失败)

3.basicReject - 否定确认(拒绝单条消息)

不同确认方式的对比

1.3代码示例

1.3.1主要流程:

二、持久性

2.1交换机持久化

2.2队列持久化

2.3消息持久化

三、发送方确认

3.1Confirm确认模式

3.2return退回模式

基本概念

常见面试题

四、重试机制

4.1步骤如下:

4.2重试注意事项


一、消息确认

1.1消息确认机制

生产者发送消息之后,到达消费端之后,可能出现以下情况:

  • 消息处理成功
  • 消息处理异常

RabbitMQ向消费者发送消息之后,就会把该消息删除掉,那么第二种情况(消息处理异常),就会造成消息丢失

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制

消息确认机制分为以下两种:

  • 自动确认:当autoAcktrue时,RabbitMQ会自动认为已成功发送消息,并从内存/磁盘中删除该消息,而不管消费者是否真正的消费到了这些消息,自动确认模式适合对于消息可靠性要求不高的场景
  • 手动确认:当autoAckfalse时,RabbitMQ会等待消费者显式调用basic.Ack命令,收到确认消息后才从内存/磁盘中移出消息,这种模式适合对消息可靠性要求比较高的场景

当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:

  1. 是等待投递给消费者的消息
  2. 是已经投递给消费者,但是还没有收到消费者确认信号的消息

如果RabbitMQ一直没有收到消费者的确认应答信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,也有可能还是原来的消费者

1.2手动确认方法

当在 RabbitMQ 中将 autoAck 参数设置为 false 时,消费者需要手动发送确(acknowledgement)来告知 RabbitMQ 消息已成功处理。以下是主要的手动确认方法(三种):

1. basicAck - 确认单条消息处理成功

channel.basicAck(long deliveryTag, boolean multiple);
  • 参数

    • deliveryTag: 消息的唯一标识ID(64位长整型)

    • multiple: 是否批量确认

      • false: 只确认当前消息

      • true: 确认所有比当前deliveryTag小的未确认消息

2. basicNack - 否定确认(消息处理失败)

Channel.basicReject(long deliveryTag,boolean multiple, boolean requeue)
  • 参数

    • deliveryTag: 消息的唯一标识ID

    • multiple: 是否批量否定确认

    • requeue: 是否重新入队

      • true: 消息重新放回队列(可能被其他消费者或自己再次获取)

      • false: 消息会被丢弃或进入死信队列(如果配置了DLX)

3.basicReject - 否定确认(拒绝单条消息)

Channel.basicReject(long deliveryTag, boolean requeue)
requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条 消息存⼊队列,以便可以发送给下⼀个订阅的消费者. 如果requeue参数设置为false, 则RabbitMQ会把 消息从队列中移除, ⽽不会把它发送给新的消费者

不同确认方式的对比

方法 批量处理 重新入队选项 备注
basicAck 支持 - 确认成功处理
basicNack 支持 支持 更灵活的拒绝方式
basicReject 不支持 支持 只能拒绝单条消息

1.3代码示例

基于SpringBoot来演⽰消息的确认机制, 使⽤⽅式和使⽤RabbitMQ Java Client 库有⼀定差异

Spring—AMQP对消息确认机制提供了三种策略:

public enum AcknowledgeMode{
     NONE,
     MANUAL,
     AUTO;
}
1.AcknowledgeMode.NONE
  • 这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认 消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.
2.AcknowledgeMode.AUTO(默认)
  • 这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息.
3.AcknowledgeMode.MANUAL
  • ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这 种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被 重新处理.

1.3.1主要流程:

  1.  配置确认机制(⾃动确认/⼿动机制)
  2. ⽣产者发送消息
  3. 消费端逻辑
  4.  测试

1.配置确认机制

配置确认机制
spring:
 rabbitmq:
     addresses: amqp://study:study@110.41.51.65:15673/bite
     listener:
         simple:
         acknowledge-mode: none

2.发送消息

3.编写消费端逻辑

4.运行程序(测试)

二、持久性

  •  持久化是RabbitMQ可靠性保证机制之一,前面了解一下消费端处理消息时,消息如何不丢失,但是该如何保证生产者发送的消息不丢失呢
  • RabbitMQ的持久化分为个部分:交换机的持久化队列的持久化消息的持久化

2.1交换机持久化

  • 交换机的持久化是通过在声明交换机时将durable参数设置为true交换机默认就是持久化的),相当于将交换机的属性在服务器内部保存,后续服务器发生意外或者关闭后,重启RabbitMQ时不在需要重新创建交换机啦,交换机会自动建立,相当于一直存在
ExchangeBuilder. topicExchange(ExCHANGE_NAME) .durable(true) .build()

2.2队列持久化

  • 队列的持久化是通过在声明队列时将durable参数设置为true(队列默认就是持久化的
  • 队列的持久化能保证队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不回丢失,要确保消息不会丢失,需要将消息设置为持久化
QueueBuilder.durable(Constant.ACK_QUEUE).build();

也可以将队列设置为非持久化

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();

2.3消息持久化

消息要实现持久化,需要把消息投递模式(MessageProperties)中的(deliveryMode)设置为2,

public enum MessageDeliveryMode {
    NON_PERSISTENT,//非持久化
    PERSISTENT;//持久化
}

注意⚠️

消息是存储在队列中的,所以消息的持久化,需要队列持久化+消息持久化

  • 如果只设置了队列持久化,MQ重启后,消息会丢失
  • 如果只设置消息持久化,MQ重启后,队列会丢失,消息也随之消失
  • 如果将所有消息都设置为持久化,会严重影响RabbitM Q的性能,导致写入磁盘的速度比写入内存的速度慢得不只一点点,需要根据实际结果来选择是否将消息持久化

三、发送方确认

发送方确认(Publisher Confirms)是 RabbitMQ 提供的一种可靠消息投递机制,用于确保消息已成功到达服务器。这是比事务更轻量级的解决方案。

在使用RabbitMQ时候,可以通过消息持久化 来解决因为服务器的异常崩溃而导致的消息丢失,但是还有一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果在消息到达服务器以前已经丢失(比如MQ重启),持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认(publisher confirm)机制实现

主要了解confirm机制来实现发送方的确认

Rabbitm MQ提供了两个方式来控制消息的可靠性投递

  1. confirm确认模式
  2. return退回模式

3.1Confirm确认模式

生产者(producer)在发送消息的时候,对发送端设置一个ConfirmCallback的监听,无论消息是否有到达Exchange,这个监听都会被执行,如果Exchange成功收到,ACK为true,如果没有收到消息,ACK就为false

步骤如下

  1. 配置RabbitMQ
  2. 设置确认回调逻辑并发送消息

1.配置RabbitMQ

spring:
 rabbitmq:
     addresses: amqp: //admin:admin@8.140.60.17:15672/
     listener:
         simple:
             acknowledge-mode: manual #消息接收确认
     publisher-confirm-type: correlated #消息发送确

2.设置确认回调逻辑并发送消息


# 生产者代码
@RequestMapping("/producer")
@RestController
public class ProducerController {
   
    @Resource(name = "confirmRabbitTemplate")
    private RabbitTemplate confirmRabbitTemplate;


@RequestMapping("/confirm")
    public String confirm() {
        CorrelationData correlationData = new CorrelationData("1");
        confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);
        return "消息发送成功";
    }
}



@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
    @Bean
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行了confirm方法");
                if (ack){
                    System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());
                }else {
                    System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);
                    //相应的业务处理
                }
            }
        });
        //消息被退回时, 回调方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退回:"+returned);
            }
        });
        return rabbitTemplate;
    }

3.2return退回模式

Return 退回模式是 RabbitMQ 提供的另一种可靠性机制,用于处理消息从 Exchange 路由到 Queue 失败的情况。当消息无法被正确路由时,RabbitMQ 会将消息退回给生产者。

基本概念

  1. 与 Confirm 模式的区别

    • Confirm 模式:确认消息是否到达 Exchange

    • Return 模式:处理消息从 Exchange 路由到 Queue 失败的情况

  2. 触发条件

    • Exchange 不存在

    • Exchange 与 Queue 之间没有绑定匹配的路由键

    • mandatory 参数设置为 true

步骤如下

  1. 配置RabbitMQ
  2. 设置返回回调逻辑并发送消息

1.配置RabbitMQ

spring:
 rabbitmq:
     addresses: amqp: //admin:admin@8.140.60.17:15672/
     listener:
         simple:
             acknowledge-mode: manual #消息接收确认
     publisher-confirm-type: correlated #消息发送确

2.设置返回回调逻辑并发送消息(结合confirm)


# 生产者代码
@RequestMapping("/producer")
@RestController
public class ProducerController {
   @Resource(name = "confirmRabbitTemplate")
    private RabbitTemplate confirmRabbitTemplate;
   
    @RequestMapping("/returns")
    public String returns() {
        CorrelationData correlationData = new CorrelationData("5");
        confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,         "confirm111", "returns test...", correlationData);
        return "消息发送成功";
    }
}



@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
    @Bean
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行了confirm方法");
                if (ack){
                    System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());
                }else {
                    System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);
                    //相应的业务处理
                }
            }
        });
        //消息被退回时, 回调方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退回:"+returned);
            }
        });
        return rabbitTemplate;
    }

常见面试题

如何保证RabbitMQ消息的可靠传输?


这是⼀张RabbitMQ消息传递图
问题环节 问题描述 可能原因 解决方案
生产者到Broker 消息未能到达RabbitMQ服务器 网络中断、Broker宕机、生产者崩溃 1. 启用Confirm确认模式
2. 实现发送重试机制
3. 持久化未确认消息
Exchange到Queue 消息到达Exchange但无法路由到任何Queue 路由键错误、绑定关系缺失、目标队列不存在 1. 启用Return退回模式(mandatory=true)
2. 配置死信队列
3. 加强绑定关系验证
Broker存储 Broker宕机导致消息丢失 未持久化消息、磁盘故障、集群节点失效 1. 全面持久化(交换机/队列/消息)
2. 配置镜像队列
3. 定期备份元数据
消费者处理 消息被获取但未成功处理 消费者崩溃、业务逻辑异常、自动确认模式下提前确认 1. 使用手动ACK模式
2. 实现消费重试机制
3. 保证消费逻辑幂等性

四、重试机制

在消息传递过程中,可能会遇到各种问题,如网络故障,服务不可用,资源不足等,这些问题可能导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数


4.1步骤如下

  1. 配置RabbitMQ
  2. 配置交换机与队列
  3. 编写生产者
  4. 编写消费者
  5. 测试结果

1.配置RabbitMQ

spring:
  rabbitmq:
    addresses: amqp://admin:admin@8.140.60.17:15672/
    listener:
      simple:
        acknowledge-mode: auto #消息接收确认 
        retry:
          enabled: true # 开启消费者失败重试 
          initial-interval: 5000ms # 初始失败等待时⻓为5秒 
          max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次) 

2.配置交换机与队列

    /*
    * 重试机制*/
    @Bean("retryQueue")
    public Queue retryQueue(){
        return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
    }
    @Bean("retryExchange")
    public DirectExchange retryExchange(){
        return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();
    }
    @Bean("retryBinding")
    public Binding retryBinding(@Qualifier("retryQueue") Queue queue,@Qualifier("retryExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("retry");
    }

3.编写生产者

    /*
    * 重试机制*/
    @RequestMapping("/retry")
    public String retry(){
        rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");
        return "消息发送成功";
    }

4.编写消费者

@Component
public class RetryListener {
    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void handlerMessage(Message message) throws UnsupportedEncodingException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +
                " deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);
        int num = 10/0;
        System.out.println("业务处理完成");
    } 
}

5.测试结果:

在重试设置的次数之后,还未成功发送消息就会抛出异常,可以手动处理异常

@Component
public class RetryListener {

    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void handlerMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
        try {
            int num = 3/0;
            System.out.println("业务处理完成");
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){
            channel.basicNack(deliveryTag, false, true);
        }

    }
}

测试结果:

发现手动处理完异常就不会在重试

4.2重试注意事项

1. 自动确认模式 : 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失 

2. 手动确认模式:程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是 unacked的状态, 导致消息积压


网站公告

今日签到

点亮在社区的每一天
去签到