一、重试机制
在消息传输过程中,可能遇到各种问题,如网络故障,服务器不可用等,这些问题可能导致消息处理失败,因此RabbitMQ提供了重试机制,允许消息处理失败后重新发送,但是,如果是因为程序逻辑发生的错误,那么重试多次也是无用的,因此重试机制可以设置重试次数。
1.1 重试配置
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: auto #消息接收确认
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时⻓为5秒
max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)
2.2 配置交换机、队列
(1)配置交换机、队列、及绑定关系:
/*
* 重试机制*/
@Bean("retryQueue")
public Queue retryQueue(){
return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
}
@Bean("retryExchange")
public DirectExchange retryExchange(){
return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();
}
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryQueue") Queue queue,@Qualifier("retryExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("retry");
}
(2)生产者
/*
* 重试机制*/
@RequestMapping("/retry")
public String retry(){
rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");
return "消息发送成功";
}
(3)消费者
@Component
public class RetryListener {
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +
" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);
int num = 10/0;
System.out.println("业务处理完成");
}
}
2.3 测试
可以看到,重试后还是未能正常消费消息,抛出异常,需要注意的是,如果手动处理异常,是不会触发重试的,如:
@Component
public class RetryListener {
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +
" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);
int num = 10/0;
System.out.println("业务处理完成");
}catch (Exception e){
System.out.println("业务处理失败");
}
}
再次测试代码:
没有触发重试
2.4 重试注意事项
1. 自动确认模式 : 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失 了
2. 手动确认模式:程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是 unacked的状态, 导致消息积压
二、TTL 机制
TTL 即 Time To Live(过期时间), RabbitMQ可以对消息或队列设置过期时间,当消息过期后,就会被自动清除,无论是对消息设置TTL还是对队列设置TTL,本质上都是设置消息的TTL
2.1 设置消息的TTL
一、准备工作(声明队列、交换机)
//未设置TTL的queue @Bean("ttlQueue") public Queue ttlQueue(){ return QueueBuilder.durable(Constants.TTL_QUEUE).build(); } @Bean("ttlExchange") public DirectExchange ttlExchange(){ return ExchangeBuilder.directExchange(Constants.TTL_exchange).build(); } @Bean("ttlBinding") public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("ttl"); }
二、如何设置消息的TTL
设置消息的TTL是在发送消息是设置的,通过下面这个方法来发送:
public void convertAndSend(String exchange, String routingKey, final Object message, final MessagePostProcessor messagePostProcessor)
这个方法比前面使用的方法多了一个参数,就是通过这个messagePostProcessor来设置消息的TTL,只需要在发送消息前,构造一个MessagePostProcesser对象并传入即可:
@RequestMapping("/ttl") public String ttl(){ MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setExpiration("10000"); return message; }; rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor); return "消息发送成功"; }
三、测试代码
10s后:
消息已近被清除
四、设置消息的TTL存在的问题
大家都知道,队列满足先进先出的特性,那么如果先发送一条TTL为30s的消息,再发送一条TTL为10s的消息,那么当10s后,后进队列的那条消息是否会被移除?
不妨测试一下:
@RequestMapping("/ttl") public String ttl(){ MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setExpiration("10000"); return message; }; MessagePostProcessor messagePostProcessor2 = message -> { message.getMessageProperties().setExpiration("30000"); return message; }; rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 30s...",messagePostProcessor2); rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor); return "消息发送成功"; }
运行程序:
为了解决这个问题,我们可以对队列设置TTL
2.2 设置队列的TTL
一、声明队列,绑定交换机(绑定在前面声明的交换机上即可)
//设置TTL的queue @Bean("ttlQueue2") public Queue ttlQueue2(){ return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20*1000).build();//设置队列TTL为20s } @Bean("ttlBinding2") public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("ttl2"); }
二、如何对队列设置TTL?
其实在上面声明队列时已经设置了,只需要在声明队列时通过 ttl方法 设置即可。
三、在TTL队列中存放为设置TTL的消息,消息是否移除?
生产者代码:
@RequestMapping("/ttl") public String ttl(){ rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test..."); return "消息发送成功"; }
测试:
可以看到,消息同样会过期
四、队列TTL为20s,消息TTL为10s,消息什么时候过期?
生产者代码:
@RequestMapping("/ttl") public String ttl(){ MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setExpiration("10000"); return message; }; rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor); rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test..."); return "消息发送成功"; }
测试:
三、死信队列
3.1 什么是死信
由于各种原因,导致的无法被消费的消息,就是死信,死信队列就是用来存储死心的队列,当一个消息在队列中变成死信后,可以被重新发送到另一个交换机DLX(Dead Letter Exchange)中,这个交换机绑定的队列就是死信队列DLQ(Dead Letter Queue)。
消息变成死信有以下几种原因:
1> 消息过期
2> 消息被拒绝 ,且requeue参数置为false
3> 队列达到最大长度
3.2 死信代码示例
一、声明队列、交换机及绑定关系
@Bean("normalQueue") public Queue normalQueue(){ return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build(); } @Bean("normalExchange") public DirectExchange normalExchange(){ return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build(); } @Bean("normalBinding") public Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("normal"); } @Bean("dlQueue") public Queue dlQueue(){ return QueueBuilder.durable(Constants.DL_QUEUE).build(); } @Bean("dlExchange") public DirectExchange dlExchange(){ return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build(); } @Bean("dlBinding") public Binding dlBinding(@Qualifier("dlQueue") Queue queue,@Qualifier("dlExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dl"); }
上面在声明了普通交换机、队列以及死信交换机、队列,还要声明普通队列与死信交换机的关系(确保消息变成死信后会通过死信交换机路由到死信队列),只需要在声明普通队列时通过 deadLetterExchange 和 deadLetterRoutingKey 绑定即可。
二、测试由于消息过期而导致的死信
前面在声明normalQueue时已经通过ttl方法设置了过期时间,所以只需编写生产者代码即可:
@RequestMapping("/dl") public String dl(){ rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test..."); return "消息发送成功"; }
运行程序,测试:
三、测试由于消息被拒绝导致的死信
编写消费者代码:
@Component public class DlListener { @RabbitListener(queues = Constants.NORMAL_QUEUE) public void handlerMessage1(Message message, Channel channel) throws IOException { Long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.printf("接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); //业务逻辑处理 System.out.println("业务逻辑处理"); int num = 10 / 0; System.out.println("业务处理完成"); channel.basicAck(deliveryTag, false); } catch (Exception e) { //!!!注意requeue一定要置为false才能变成死信 System.out.println("业务处理失败"); channel.basicNack(deliveryTag, false, false); } } @RabbitListener(queues = Constants.DL_QUEUE) public void handlerMessage2(Message message, Channel channel) throws IOException { Long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("死信队列接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); channel.basicAck(deliveryTag,false); } }
测试:
四、测试由于队列达到最大长度导致的死信
修改normal队列的声明(添加一个maxLength方法指定队列最大长度):
@Bean("normalQueue") public Queue normalQueue(){ return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).maxLength(1).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build(); }
重新编写生产者代码,连续发送10条消息:
@RequestMapping("/dl") public String dl(){ for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test..." + i); } return "消息发送成功"; }
由于队列的最大长度为1,因此应该有9条消息进入死信队列,测试: