RabbitMQ 高级特性之消息确认

发布于:2025-07-04 ⋅ 阅读:(13) ⋅ 点赞:(0)

1. 简介

RabbitMQ 的消息发送流程:

  • producer 将消息发送给 broker,consumer 从 broker 中获取消息并消费

那么在这里就涉及到了两种消息发送,即 producer 与 broker 之间和 consumer 与 broker 之间。

“消息确认” 讨论的是 consumer 与 broker 之间的消息发送。

2. 为什么会有这个特性

当 broker 给 consumer 发送消息时,可能会出现下面两种情况:

  • 消息未成功到达 consumer;
  • 消息成功到达 consumer,但是 consumer 没有成功消费这条消息,如:在处理消息时发生异常等情况。

这时,就需要有一种解决方案,保证 broker 与 consumer 之间消息传输的可靠性,于是就有了消息确认这一特性。

3. 使用 RabbitMQ Java 时如何进行消息确认(不是重点)

public class ConsumerDemo1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //创建信道
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        //声明队列
        //如果队列不存在,就创建
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);

        //消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

    }
}

这是一段路由模式的代码,在这段代码中,有下面一条语句:

channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

在这个方法中,有三个参数:

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  • queue:consumer 通过哪个队列获取 broker 发送的消息;
  • autoAck:是否自动确认;
  • callback:consumer 消费消息的逻辑。

其中,autoAck 就是消息确认的体现:

  • autoAck 为 true:RabbitMQ 会将发送给 consumer 的消息视为已被成功接收和消费(consumer 可能并没有成功接收到或成功消费,但是 RabbitMQ 不管了),就会被将这条消息删除;
  • autoAck 为 false:当 RabbitMQ 发送消息后,并不会马上就将消息删除,而是会等 consumer 调用 Basic.Ack,收到 ack 后,才会将消息删除。

将 autoAck 设置为 false 后,若 broker 长时间没有收到 consumer 发送的 ack 且 consumer 已经断开连接,就会将这条消息重新入队列,继续发送给 consumer 进行消费,此时,队列中的消息就分为了两种:

  • 还未被发送的消息;
  • 已经发送了的消息,但是没有收到 ack 而重新入队列等待被消费。

4. 在 spring 中使用 RabbitMQ 时如何进行消息确认

4.1 basicAck

在 spring 下的 Channel 类中提供了下面几种方法:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

在这个方法中,有三个参数:

  • deliveryTag:是 broker 给 consumer 发送消息的唯一标识,在一个 channel 中 deliveryTag 是唯一的;
  • mulitple: 是否批量确认

使用这个方法后,就会告知 broker 这条消息已经成功被消费,可以将其删除。

4.2 basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

 在这个方法中,多了一个参数:

  • requeue:是否重新入队列。

使用这个方法,就相当于给 broker 发送 nack,即这条消息没有被正确消费。

若 requeue 为 true,就会将这条消息重新入队列,继续给 consumer 消费;

若 requeue 为 false,broker 就会这条消息删除。

4.3 basicReject

void basicReject(long deliveryTag, boolean requeue) throws IOException;

这个方法与 basicNack 大致相同,此处省略。

4.4 配置

在 spring 中,提供了三种配置用于消息确认:

  • none:当消息发送给 consumer,不管 consumer 是否成功消费了消息,broker 都会当作这条消息被成功消费了,然后删除这条消息;
  • auto:在 consumer 处理消息时没有抛出异常时,就会确认消息,反之就不会确认,并且将消息重新放入队列中,进行下一次的消费;
  • manual:手动确认,我们需要在代码中指定这条消息是消费成功还是消费失败,分别使用 basicAck 和 basicNack。
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none

5. 代码测试

@RequestMapping("/producer")
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/ack")
    public String ack() {
        String messageInfo = "consumer ack mode test...";

        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);

        return "消息发送成功";
    }
}

这段代码代表的是一个 producer,下面接收到的消息都是通过这段代码发送的。

5.1 none

① 无异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void listener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties()
                .getDeliveryTag();

        log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);

    }

代码运行结果如下:

我们可以通过访问 RabbitMQ 客户端来观察这条消息是否成功被消费:

 

可以看到,Messages 这一列中,Ready 和 Unacked 都为 0,表示消息被成功消费。 

 ② 有异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void listener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties()
                .getDeliveryTag();

        int num = 1 / 0;

        log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);

    }

代码运行结果如下:

由于我们使用了除零操作,于是抛出了异常,我们可以通过访问 RabbitMQ 来观察这条消息是否被删除:

和上面一样,在 broker 中这条消息已经被删除,这与 none 配置性质一致。

5.2 auto 

① 无异常时的消费者代码:

@Component
@Slf4j
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void listener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties()
                .getDeliveryTag();

        log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);

    }

}

代码运行结果如下:

在 RabbitMQ 客户端中显示,这条消息已经被成功消费:

 

② 有异常时的消费者代码:

@Component
@Slf4j
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void listener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties()
                .getDeliveryTag();

        int num = 1 / 0;

        log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);

    }

}

 代码运行结果如下:

在运行结果中,一直会有报错产生,并且都是两个两个为一组,并且在报错信息中可以看到,producer 发送的消息一直在被消费,这是因为存在异常,就会导致这条消息一直在队列中,通过观察 RabbitMQ 客户端可以看出,这条消息依然保存在队列中:

5.3  manual

① 无异常的消费者代码如下:

@Component
@Slf4j
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void listener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties()
                .getDeliveryTag();

        try {

            log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);

            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {

            channel.basicNack(deliveryTag, false, true);
        }

    }
}

在这段代码中,我们使用了 basicAck 和 basicNack 来进行消息确认,当消息处理成功后,就会执行 basicAck,告诉 broker 这条消息已经被成功消费,可以将其删除;当消息执行发生异常后,就会执行 basicNack,并且根据 requeue 参数决定如何处理这条消息。

代码运行结果如下:

RabbitMQ 客户端显示这条消息被成功消费:

 

 ② 有异常的消费者代码如下:

当 requeue 为 true:

@Component
@Slf4j
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void listener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties()
                .getDeliveryTag();

        try {

            log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);

            int n = 1 / 0;

            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {

            channel.basicNack(deliveryTag, false, true);
        }

    }
}

在此处的 basicNack,将 requeue 设置为了 true,当消息处理失败后,就会将消息重新入队列,重新被消费:

我们可以看到,这条消息一直在被消费,并且 delivertTag 在递增。

并且从 RabbitMQ 客户端中可以看到,这条消息依然存在,等待被成功消费:

 当 requeue 为 false:

当处理消息发生异常后,就会将消息从队列中删除。

代码运行结果如下:

虽然异常依然存在,但是消息却没有重复发送,并且 RabbitMQ 中也将这条消息删除: