RabbitMQ 高级特性之发送方确认

发布于:2025-07-04 ⋅ 阅读:(17) ⋅ 点赞:(0)

1. 简介

当消息从 producer 发送给 broker是,消息丢失了,那么 broker 就不会收到消息,那么后续的持久化机制、消息确认模式也就没有作用。于是,我们需要尽可能地保证 producer 发送地消息成功到达 broker。broker 中包含交换机与队列,于是,我们不仅需要保证消息能够成功到达交换机,也需要成功到达我们所指定的队列。

2. 事务与 publisher cnofirm(发送方确认)机制

对于上面这个问题,RabbitMQ 提出了两种解决策略,即:

  • 使用事务
  • publisher confirm 机制

由于事务会比较消耗性能,下面就主要讲解 publisher confirm 机制。

3. publisher confirm 机制

在 publisher confirm 机制下,分为两个模式:

  • confirm 确认模式
  • return 退回模式

这两个模式都需要在配置文件中添加下面的配置项:

spring:
  rabbitmq:
    publisher-confirm-type: correlated #消息发送确认

3.1 confirm 确认模式

下面是生产者的代码:

    @RequestMapping("/confirm")
    public String confirm() {
        String message = "confirm test";

        /**
         * 若在此处设置回调函数,那么会影响到所有 rabbitTemplate
         * 并且只能发送一次消息,因为发送那个多次消息就相当于设置了多个回调函数,规定只能设置一次
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *
             * @param correlationData correlation data for the callback.
             * @param ack true: 消息到达交换机; false: 消息没有到达交换机
             * @param cause An optional cause, for nack, when available, otherwise null.
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("接收到消息, 消息 id: " + (null == correlationData ? null : correlationData.getId()));
                } else {
                    System.out.println("未接收到消息, 消息 id: " + (null == correlationData ? null : correlationData.getId()) + ", cause: " + cause);
                }
            }
        });

        String id = UUID.randomUUID().toString();

        CorrelationData correlationData = new CorrelationData(id);

        confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, Constants.CONFIRM_ROUTINGKEY, message, correlationData);

        return "消息发送成功";
    }

上面的代码与在 spring 环境中发送消息的代码类似,但是多出了设置回调函数这一步。

ConfirmCallback 即为回调函数,当消息不论消息是否发哦是那个到了 broker,都会给 producer 发送一个 ack,若 ack 为 true,就表示消息到达了交换机;若 ack 为 false,就表示消息没有到达交换机。我们可以根据 ack 来进行不同的业务操作。

代码运行结果如下:

 当连续发送两条消息时的结果如下:

报错信息中显示:一个 RabbitMQTemplate 实例是能设置一次 ConfirmCallback。

但是当我们连续两次调用该接口时,就导致 ConfirmCallback 被创建了两次,也就会报错。对于这种情况,我们可以将 RabbitMQTemplate 提取出来,自定义一个 RabbitMQTemplate,在这个类中通过设置回调函数达到之创建一次就能使用多次的效果,代码如下:

@Configuration
public class RabbitTemplateConfig {

    @Bean("normalRabbitTemplate")
    public RabbitTemplate normalRabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean("confirmRabbitTemplate")
    public RabbitTemplate confirmRabbitTemplate1(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *
             * @param correlationData correlation data for the callback.
             * @param ack true: 消息到达交换机; false: 消息没有到达交换机
             * @param cause An optional cause, for nack, when available, otherwise null.
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("接收到消息, 消息 id: " + messageId);
                } else {
                    System.out.println("未接收到消息, 消息 id: " + messageId + ", cause: " + cause);
                }
            }
        });

        return rabbitTemplate;
    }
}

 在上面的配置类中,我们创建了两个方法用来返回 RabbitMQTemplate,这是因为如果我们只创建 confirmRabbitMQTemplate,那么别的接口使用到的 RabbitMQTemplate 就是我们修改后的结果,这时就需要再定义一个 normalRabbitMQTemplate 用来返回没有设置回调函数的 RabbitMQTemplate。这样就不会影响别的接口使用。调用者根据自己的需要注入不同的实例。

修改后的接口如下:

@RequestMapping("/producer")
@RestController
public class ProducerController {

    @Resource(name = "normalRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @Resource(name = "confirmRabbitTemplate")
    private RabbitTemplate confirmRabbitTemplate;

    /**
     * 确认模式
     */
    @RequestMapping("/confirm")
    public String confirm() {
        String message = "confirm test";

        String id = UUID.randomUUID().toString();

        CorrelationData correlationData = new CorrelationData(id);

        confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, Constants.CONFIRM_ROUTINGKEY, message, correlationData);

        return "消息发送成功";
    }

}

接下来再连续调用这个接口,观察是否会报错:

这一次代码就没有报错。 

当我们使用了未声明的交换机,代码的运行结果如下:

未接收到消息, 消息 id: dcfab6f0-0d9a-470f-9f8e-5b225df71021, 
cause: channel error; 
protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchangeaaa' in vhost 'extension', class-id=60, method-id=40)

报错信息中表示没有该交换机。

当我们使用了错误的 BindingKey,代码的运行结果如下:

这里居然显示消息发送成功,但是上面的代码使用了错误的 BindingKey,消息只是到了交换机,交换机并没有根据 BindingKey 找到与之绑定的队列,那么消息也就没有到达队列中,即消息发送失败。

但是为什么会出现上面那种情况呢?这就需要使用到退回模式。

3.2 return 退回模式

在确认模式末尾,我们发现,即使消息没有成功到达队列,依然会提示消息发送成功,这是不符合逻辑的。

在 RabbitMQ 中,confirm 确认模式之是保证了 producer 与 交换机之间的消息可靠传输,并没有保证交换机与队列之间的可靠传输。于是,return 退回模式就针对这一缺陷进行补充。

当消息没有成功到达队列后,就会将这条消息退回给 producer,并且携带退回的原因。

退回模式与确认模式并不冲突,二者可以分开,也可以同时存在,下面的代码将二者合并在一起进行编写,代码如下:

@Configuration
public class RabbitTemplateConfig {

    /**
     *
     * @param connectionFactory 会根据配置文件中的 rabbitmq 配置自动填充参数
     * @return
     */
    @Bean("normalRabbitTemplate")
    public RabbitTemplate normalRabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * 确认模式使用
     * 使用 AtomicBoolean, 在消息被退回时打印不同的日志
     * @param connectionFactory
     * @return
     */
    @Bean("confirmRabbitTemplate1")
    public RabbitTemplate confirmRabbitTemplate1(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *
             * @param correlationData correlation data for the callback.
             * @param ack true: 消息到达交换机; false: 消息没有到达交换机
             * @param cause An optional cause, for nack, when available, otherwise null.
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                String messageId = null == correlationData ? null : correlationData.getId();
                if (ack) {
                    System.out.println("接收到消息, 消息 id: " + messageId);
                } else {
                    System.out.println("未接收到消息, 消息 id: " + messageId + ", cause: " + cause);
                }
            }
        });

        /**
         * 若消息没有到达队列,就退回
         */
        rabbitTemplate.setMandatory(true); //启用强制路由检查
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退退回: " + returned);
            }
        });

        return rabbitTemplate;
    }
}

在上面的代码中,使用 setMandatory 开启强制路由检查,当 setMandatory 设置为 true,那么就会进行消息是否成功到达队列的判断,若没有声明为 true,即使编写了 return 模式的代码,也就不会生效。

我们使用 ReturnCallBack 回调函数实现消息没有到达队列后的逻辑。

当我们使用了错误的 BindingKey,代码的运行结果如下:

消息退退回: ReturnedMessage [message=(Body:'confirm test' MessageProperties [headers={spring_returned_message_correlation=f4db7a24-f6aa-499c-8409-65c42605576a}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=keyaaa]

接收到消息, 消息 id: f4db7a24-f6aa-499c-8409-65c42605576a

这次就识别出了消息没有成功到达队列的情况,但是依然有一个不足:

既然消息没有发送到指定队列,那为什么还会弹出接收到消息的日志呢?

于是我们需要针对上面的代码进行改变,此处不过多进行讲解。


网站公告

今日签到

点亮在社区的每一天
去签到