RabbitMQ:业务幂等、死信交换机

发布于:2025-03-17 ⋅ 阅读:(13) ⋅ 点赞:(0)

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的: f(x) = f(f(x)) 。在程序开发中,幂等性指的是无论操作执行多少次,结果都是一致的在消息队列的上下文中,幂等性意味着即使消费者多次接收到同一条消息,也不会对业务逻辑产生负面影响。

为什么需要幂等性?
在 RabbitMQ 中,消息可能会被重复消费,原因包括:

  • 网络问题:消费者处理完消息后,发送确认(ACK)时网络中断,RabbitMQ 未收到确认,会重新投递消息。
  • 消费者故障:消费者处理消息时崩溃,未发送确认,RabbitMQ 会重新投递消息。
  • 手动重试:业务逻辑中手动触发了消息的重新投递。

如果消费者没有实现幂等性,重复消费可能会导致数据不一致或业务逻辑错误。例如:

  1. 订单支付消息被重复消费,导致用户被多次扣款。
  2. 库存扣减消息被重复消费,导致库存数量错误。

延迟消息接收

延迟消息在以下场景中非常有用:
1、定时任务:如订单超时未支付自动取消(12306买票)。
2、重试机制:如消息处理失败后延迟重试。
3、通知提醒:如预约提醒、会议通知等。

实现延迟消息的常见方法

死信交换机机制

死信: 死信(Dead Letter)是指那些无法被正常消费的消息。
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)
在这里插入图片描述
私信交换机具体实现思路
消费者监听dlx队列

@RabbitListener(bindings = @QueueBinding(  
        value = @Queue(name = "dlx.queue", durable = "true"),  // 定义死信队列,名称为 dlx.queue,并设置为持久化
        exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),  // 定义死信交换机,名称为 dlx.direct,类型为 DIRECT
        key = {"hi"}  // 绑定路由键为 "hi"
))  
public void listenDlxQueue(String message) {  
    log.info("消费者监听到 dlx.queue 的消息: [{}]", message);  // 打印从死信队列中接收到的消息
}  

定义普通交换机

@Configuration  // 声明这是一个 Spring 配置类
public class NormalConfiguration {  

    // 定义普通交换机
    @Bean  
    public DirectExchange normalExchange() {  
        return new DirectExchange(name = "normal.direct");  // 创建一个名为 normal.direct 的 DIRECT 类型交换机
    }  

    // 定义普通队列
    @Bean  
    public Queue normalQueue() {  
        return QueueBuilder  
                .durable(name = "normal.queue")  // 创建一个名为 normal.queue 的持久化队列
                .deadLetterExchange(dlx = "dlx.direct")  // 设置死信交换机为 dlx.direct
                .build();  
    }  

    // 绑定普通队列到普通交换机
    @Bean  
    public Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {  
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(routingKey = "hi");  // 将 normal.queue 绑定到 normal.direct 交换机,并使用路由键 "hi"
    }  
}  

发送消息,同时设置延迟时间

@Test
void testSendDelayMessage() {  
    String messageId = UUID.randomUUID().toString();  // 生成唯一ID
    rabbitTemplate.convertAndSend(  
        "normal.direct",  
        "hi",  
        "hello",  
        message -> {  
            message.getMessageProperties().setExpiration("10000");  
            message.getMessageProperties().setCorrelationId(messageId);  // 设置消息的唯一ID
            return message;  
        }  
    );  
    // 将 messageId 存储到数据库或缓存中,以便后续使用
}

使用死信交换机

public void onPaymentSuccess(String messageId) {
    // 支付成功后的逻辑
    cancelDelayMessage(messageId);  // 取消延迟消息
}

删除队列里等待的消息

@Autowired
private RabbitTemplate rabbitTemplate;

public void cancelDelayMessage(String messageId) {
    // 从队列中删除消息
    rabbitTemplate.execute(channel -> {
        GetResponse response = channel.basicGet("normal.queue", false);  // 从队列中获取消息
        while (response != null) {
            AMQP.BasicProperties properties = response.getProps();
            if (messageId.equals(properties.getCorrelationId())) {
                channel.basicAck(response.getEnvelope().getDeliveryTag(), false);  // 确认消息
                return null;  // 找到并删除消息
            }
            response = channel.basicGet("normal.queue", false);
        }
        return null;
    });
}