RabbitMQ延迟消息

发布于:2025-03-17 ⋅ 阅读:(24) ⋅ 点赞:(0)


延迟消息

生产者在发送消息的时候指定一个时间,消费者不会立即收到该消息,而是在指定时间之后才收到消息,这就是延迟消息

比如说这么一个场景,用户下单后将商品库存进行扣减了,但是用户未对订单进行支付,我们想对订单设置一个超时机制,比如说30分钟内没有支付就直接将订单取消并释放所占用的库存。

这就可以利用MQ的延迟消息来实现

死信交换机

什么死信?当一个队列中的消息满足下列情况之一时,就可以称之为死信

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因为失败而被拒绝的消息
  2. 收集那些因为队列满了而被拒绝绝的消息
  3. 收集因为有过期时间到期的消息

前面两种使用的场景是把死信交换机作为兜底场景来使用,而第三种基于死信交换机的接收过期消息就可以实现延迟消息。

通过私信交换机实现延迟消息的步骤如下:

  1. 新建一组交换机deadletter.exchange,队列deadletter.queue,这两相互绑定
  2. 再新建一组交换机ttl.exchange,队列ttl.queue,这两相互绑定,需要注意的是创建ttl.queue队列的时候,使用dead-letter-exchange属性指定deadletter.exchange交换机
  3. 那么此时deadletter.exchange交换机就为死信交换机

比如说生产者向ttl.exchange交换机发送一条30分钟的延迟消息,到达ttl.queue队列,因为该队列没有绑定消费者,等30分钟后该消息就会变成死信,因为该队列绑定了死信交换机,那么该消息就会到达deadletter.exchange交换机,再到达死信交换机绑定的队列,最后被消费者消费。从而实现了延迟消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

需要注意的是:RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

延迟消息

官方提供私信交换机是用来做兜底的,而不是用来做延迟消息,所以官方提供了延迟消息插件来实现延迟消息。

插件文档

插件下载

该插件实现延时消息的原理就是设计了一个带有延时功能的交换机,能将消息在交换机中暂存一段时间,等消息到期时再把消息发送个绑定的队列。

基于注解方式声明延时交换机

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
            .directExchange("delay.direct") // 指定交换机类型和名称
            .delayed() // 设置delay的属性为true
            .durable(true) // 持久化
            .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }

    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

因为该延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息

延迟消息应用场景

延迟消息可以运用于订单的下单后超时未支付,比如说延迟30分钟订单未支付就自动取消,但如上所说延迟消息越多对MQ的压力也就越大。

一般下单后支付这个操作大部分用户都会在一分钟内完成,我们可以将这个30分钟的延迟消息进行拆分。也就是说将一个30分钟的延迟消息拆分成在用户后下单的第:10秒、20秒、30秒、45秒、60秒、1分30秒,分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可,从而减轻MQ的压力。


,我们可以将这个30分钟的延迟消息进行拆分。也就是说将一个30分钟的延迟消息拆分成在用户后下单的第:10秒、20秒、30秒、45秒、60秒、1分30秒,分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可,从而减轻MQ的压力。