RabbitMQ高级特性--发送方确认

发布于:2025-03-31 ⋅ 阅读:(31) ⋅ 点赞:(0)
在使用RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃而导致的消息丢失, 但是还有⼀个问题, 当消息的生产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(比如RabbitMQ重启, 那么RabbitMQ重启期间生产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决方案:
   
      1. 通过事务机制实现
      2. 通过发送方确认机制实现
事务机制比较消耗性能,咱们主要介绍confirm机制来实现发送方的确认。
RabbitMQ为我们提供了两个方式来控制消息的可靠性投递:
      1. confirm确认模式
      2. return退回模式

1. confirm确认模式

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达
Exchange, 这个监听都会被执行, 如果Exchange成功收到, ACK( Acknowledge character , 确认
字符)为true, 如果没有收到,则ACK为false。
步骤如下:

1.配置RabbitMQ

spring:
  rabbitmq:
    addresses: amqp://study:study@你的服务器IP:15673/你的虚拟机名
    listener:
      simple:
        acknowledge-mode: manual #消息接收确认
    publisher-confirm-type: correlated #消息发送确认

2.设置确认回调逻辑并发送消息

无论消息确认成功还是失败, 都会调用ConfirmCallback的confirm方法. 如果消息成功发送到Broker,
ack为true.
如果消息发送失败, ack为false, 并且cause提供失败的原因
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory 
connectionFactory){
 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, 
String cause) {
 System.out.printf("");
 if (ack){
 System.out.printf("消息接收成功, id:%s \n", 
correlationData.getId());
 }else {
 System.out.printf("消息接收失败, id:%s, cause: %s", 
correlationData.getId(), cause);
 }
 }
 });
 return rabbitTemplate;
}


@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() throws InterruptedException 
 CorrelationData correlationData1 = new CorrelationData("1");
 confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, 
"confirm", "confirm test...", correlationData1);
 return "确认成功";
}

方法说明:

public interface ConfirmCallback {
 /**
 * 确认回调
 * @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消
息
 * @param ack: 交换机是否收到消息, 收到为true, 未收到为false
 * @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调
试和错误处理.
 * 成功时, cause为null 
 */
 void confirm(@Nullable CorrelationData correlationData, boolean ack, 
@Nullable String cause);
}

RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别
在RabbitMQ中, ConfirmListener和ConfirmCallback都是⽤来处理消息确认的机制, 但它们属于不同
的客户端库, 并且使用的场景和方式有所不同.
1 . ConfirmListener 是 RabbitMQ Java Client 库中的接口. 这个库是 RabbitMQ 官⽅提供的⼀个直
接与RabbitMQ服务器交互的客户端库. ConfirmListener 接⼝提供了两个方法: handleAck 和
handleNack, ⽤于处理消息确认和否定确认的事件.
2 . ConfirmCallback 是 Spring AMQP 框架中的⼀个接口. 专门为Spring环境设计. 用于简化与
RabbitMQ交互的过程. 它只包含⼀个 confirm 方法,⽤于处理消息确认的回调.
在 Spring Boot 应⽤中, 通常会使用 ConfirmCallback, 因为它与 Spring 框架的其他部分更加整合, 可以利用Spring 的配置和依赖注入功能. 而在使用 RabbitMQ Java Client 库时, 则可能会直接实现
ConfirmListener 接口, 更直接的与RabbitMQ的Channel交互

2.Return退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置一个返回回调方法,对消息进行处理

步骤如下:

1.配置RabbitMQ

spring:
 rabbitmq:
   addresses: amqp://study:study@你的服务器IP:15673/你的虚拟机名
   listener:
     simple:
       acknowledge-mode: manual #消息接收确认
   publisher-confirm-type: correlated #消息发送确认

2.设置返回回调逻辑并发送消息

消息无法被路由到任何队列,它将返回给发送者,这时setReturnCallback设置的回调将被触发

@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory 
connectionFactory){
 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
 rabbitTemplate.setMandatory(true);
 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
 @Override
 public void returnedMessage(ReturnedMessage returned) {
 System.out.printf("消息被退回: %s", returned);
 }
});
 return rabbitTemplate;
}

@RequestMapping("/msgReturn")
public String msgReturn(){
 CorrelationData correlationData = new CorrelationData("2");
 confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, 
"confirm11", "message return test...", correlationData);
 return "消息发送成功";
}
使用RabbitTemplate的setMandatory方法设置消息的mandatory属性为true(默认为false). 这个属性 的作用是告诉RabbitMQ, 如果⼀条消息无法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此时 ReturnCallback 就会被触发

回调函数中有一个参数,ReturnMessage,包含以下属性:

public class ReturnedMessage {
 //返回的消息对象,包含了消息体和消息属性
 private final Message message;
 //由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同
的含义. 
 private final int replyCode;
 //⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
 private final String replyText;
 //消息被发送到的交换机名称
 private final String exchange;
 //消息的路由键,即发送消息指定的键
 private final String routingKey;
}