RabbitMQ 重试机制 和 TTL

发布于:2025-09-08 ⋅ 阅读:(13) ⋅ 点赞:(0)

目录

1. 重试机制

1.1 简介

1.2 配置文件 

1.3 消费者确认机制为 auto 时

1.4 消费者确认机制为 manual 时

2. TTL

2.1 设置消息的过期时间

2.2 设置队列的过期时间

2.3 给过期队列中消息设置过期时间


1. 重试机制

1.1 简介

在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足... 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送. 但如果是程序逻辑引起的错误, 那么多次重试也是没有用的, 可以设置重试次数.

1.2 配置文件 

spring:
  application:
    name: rabbitmq-extensions-demo
  rabbitmq:
    addresses: amqp://study:study@192.168.100.10:5672/extension
    listener:
      simple:
#        acknowledge-mode: none
        acknowledge-mode: auto
#        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 初始失败等待时长为5秒
          max-attempts: 5 # 最大重试次数
    //重试机制
    @Bean("retryQueue")
    public Queue retryQueue(){
        return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
    }

    @Bean("retryExchange")
    public DirectExchange retryExchange(){
        return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();
    }

    @Bean("retryBinding")
    public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();
    }

1.3 消费者确认机制为 auto 时

如果程序逻辑错误, 那么就会不断重试, 造成消息积压. 因此我们就需要设置重试次数, 当多次重试还是失败, 消息就会被自动确认, 自然消息就会丢失 (这里可以弄个死信队列, 来存储这些丢失的消息)

生产者 : 

    @RequestMapping("/retry")
    public String retry() {
        System.out.println("retry...");
        rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");
        return "消息发送成功";
    }

消费者 : 

@Component
public class RetryListener {
    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void handlerMessage(Message message) throws UnsupportedEncodingException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n",
                new String(message.getBody(), StandardCharsets.UTF_8), deliveryTag);
        int num = 3/0;
        System.out.println("业务处理完成");
    }
}

由于消费者代码逻辑错误, 消息重发机制触发了 4 次, 一共发了 5 次, 日志中, 每次的 deliverTag 都一样, 表示发送的是同一个消息.  

查看队列中消息也已经被删除.

1.4 消费者确认机制为 manual 时

更改消费者 : 

    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void handlerMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n",
                new String(message.getBody(), StandardCharsets.UTF_8), deliveryTag);
        try {
            int num = 3/0;
            System.out.println("业务处理完成");
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){

            // 不成功消息重新入队列
            channel.basicNack(deliveryTag, false, true);
        }

    }

这里我们发现, 虽然我们设置了重试机制, 但是 deliveryTag 还是在不断更新... 队列中的消息也不会消失. 原因是手动模式下, 消费者需要显示的对消息进行确认, 如果消费者在消息处理过程中遇到异常, 可以选择确认, 不确认消息,也可以选择重新入队. 所以重试的控制权不在应用程序本身, 而在于代码逻辑本身.

  • 消费者确认机制为 AUTO 时, 如果程序逻辑异常, 多次重试还是失败. 那么消息就会自动确认, 进而消息就会丢失.
  • 消费者确认机制为 MANAUL 时, 如果程序逻辑异常, 多次重试依然处理失败, 无法被确认, 消息就会积压.
  • 消费者确认机制为 NONE 时, 不管发生什么情况, 当消息从 Broker 内部发出时, 就会自动确认, 因此它不存在任何内容

2. TTL

TTL -> time to live, 指的是过期时间.

  • 给消息设置过期时间 : 当消息到达过期时间, 还没有被消费, 就会被自动清除.
  • 给队列设置过期时间 :  就相当于给队列的所有消息设置了一个过期时间, 这消息的过期时间与队列的过期时间是相同的. 从消息入队列开始算起, 经过这个时间, 就会被删除

2.1 设置消息的过期时间

    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(Constants.TTL_QUEUE).build();
    }
    @Bean("ttlExchange")
    public DirectExchange ttlExchange(){
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
    }
    @Bean("ttlBinding")
    public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
    }
    @RequestMapping("/ttl")
    public String ttl() {
        System.out.println("ttl...");
        
        MessagePostProcessor messageInfo = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30s
                return message;
            }
        };
        rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s", messageInfo);
        
        rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {
            message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为10s
            return message;
        });
        return "消息发送成功";
    }

不需要消费者, 不然看不到消息在队列中的自动删除.

2.2 设置队列的过期时间

    //设置ttl
    @Bean("ttlQueue2")
    public Queue ttlQueue2(){
        return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //设置队列的ttl为20s
    }

    @Bean("ttlBinding2")
    public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
    }
    @RequestMapping("/ttl2")
    public String ttl2() {
        System.out.println("ttl2...");
        //发送普通消息
        rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");
        return "消息发送成功";
    }

我们发现了消息经过 20 秒后被删除了, 但是队列没有被删除, 说明过期队列是给消息上过期时间的. 这个队列也有 TTL 的标志了.

2.3 给过期队列中消息设置过期时间

如果两种方法同时使用, 那么就以过期时间较小的值为准.

设置队列的过期时间, 一旦消息过期, 就会从队列中删除.

设置消息的过期时间, 即使消息过期, 如果消息不在队首, 还得等到消息到达队首之后才会进行判定是否过期. 如果过期, 那就删除, 反之就投递到相应的消费者中.

为什么这两种方法处理的方式不一样?

因为设置队列的过期时间, 那么队列中过期的消息一定在队首, RabbitMQ 只需要定期从队首扫描消息是否有过期的消息即可, 而设置消息的过期时间, 每条消息的过期时间都不一致, 如果要删除队列的所有过期消息那么就要扫描整个队列, 所以不如等到消息要进行投递时再判断消息是否过期, 这样可以减少一定的资源消耗.


网站公告

今日签到

点亮在社区的每一天
去签到