概念
延迟队列顾名思义就是消息不立即发送给消费者消费,而是延迟一段时间再交给消费者。
RabbitMQ本身没有直接支持延迟队列的的功能,但是可以通过前面所介绍的TTL+死信队列的方式组合
模拟出延迟队列的功能.
RabbitMQ 有些版本还支持延迟队列的插件安装,我们也可以通过安装这个插件实现延迟队列的功能。
TTL + 死信队列
实现思路:
假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过normal_exchange这个交换器将发送的消息存储在normal_queue这个队列中.消费者订阅的并非是normal_queue这个队列,而是dlx_queue这个队列.当消息从normal_queue这个队列中过期之后被存入dlx_queue这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。
代码演示:
常量设置:
//死信队列
public static final String DL_QUEUE = "DL_QUEUE";
public static final String DL_EXCHANGE = "DL_EXCHANGE";
public static final String DL_KEY = "DL_KEY";
//普通队列
public static final String NORMAL_QUEUE = "NORMAL_QUEUE";
public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
public static final String NORMAL_KEY = "NORMAL_KEY";
声明队列、交换机、绑定关系:
//普通队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(MQConstants.NORMAL_QUEUE)
.deadLetterExchange(MQConstants.DL_EXCHANGE)
.deadLetterRoutingKey(MQConstants.DL_KEY)
.build();
}
@Bean("normalExchange")
public Exchange normalExchange() {
return ExchangeBuilder.directExchange(MQConstants.NORMAL_EXCHANGE).durable(true).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(MQConstants.NORMAL_KEY).noargs();
}
//死信队列
@Bean("dlQueue")
public Queue dlQueue() {
return QueueBuilder.durable(MQConstants.DL_QUEUE).build();
}
@Bean("dlExchange")
public Exchange dlExchange() {
return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DL_KEY).noargs();
}
生产者:将消息过期时间设置为 10 s
@RequestMapping("/dl")
public String dl() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "dl" + i,
message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
}
return "消息发送成功";
}
消费者需要消费的队列是死信队列:
@Component
@RabbitListener(queues = MQConstants.DL_QUEUE)
public class DLListener {
@RabbitHandler
public void handle(String messageContent, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag, false);
System.out.println("消息成功消费:" + messageContent);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
}
存在的问题
当我们先发送一条延迟时间长的消息,然后再发送一条延迟时间短的消息,我们会发现,短的消息并没有被即使消费,而是等到长的消息时间一到,才被消费了
@RequestMapping("/dl")
public String dl() {
rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "30s ",
message -> {
message.getMessageProperties().setExpiration("30000");
return message;
});
rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "10s ",
message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
return "消息发送成功";
}
原因如下:
消息过期之后,不一定会被马上丢弃,因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列,此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个
消息并不会优先得到执行。
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
延迟队列的插件
安装
官方文档:Scheduling Messages with RabbitMQ
下载链接:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载的插件需要存放到哪个目录:https://www.rabbitmq.com/docs/installing-plugins
根据你不同的环境去选择不同的目录:
Linux命令:
#查看插件列表
rabbitmq-plugins list
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重启服务
service rabbitmq-server restart
我们去到 rabbitmq 管理界面查看 exchange 有没有延迟类型 “x-delayed-messge” ,如果存在这一类型说明我们的插件安装成功了
代码演示
常量类:
//延迟队列
public static final String DELAY_QUEUE = "DELAY_QUEUE";
public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
public static final String DELAY_KEY = "DELAY_KEY";
声明:
//延迟队列
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(MQConstants.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public Exchange delayExchange() {
return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DELAY_KEY).noargs();
}
生产者:这里我们发送三条不同过期时间的消息来进行演示:
通过setDelayLong() 方法设置延迟时间
@RequestMapping("/delay")
public String delay() {
rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "30s ",
message -> {
message.getMessageProperties().setDelayLong(30000L);
return message;
});
rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "10s ",
message -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
});
rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "40s ",
message -> {
message.getMessageProperties().setDelayLong(40000L);
return message;
});
return "消息发送成功";
}
这里我们将确认模式设置为自动模式,不进行手动确认,便于我们书写代码:
@Component
@RabbitListener(queues = MQConstants.DELAY_QUEUE)
public class DelayListener {
@RabbitHandler
public void handle(String message) {
System.out.printf("%tc 接收到的消息为:%s\n", new Date(), message);
}
}
最终效果:
总结
1.基于死信实现的延迟队列
a优点:1)灵活不需要额外的插件支持
b.缺点: 1) 存在消息顺序问题 2)需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性
2.基于插件实现的延迟队列
a.优点:1)通过插件可以直接创建延迟队列,简化延迟消息的实现. 2)避免了DLX的时序问题
b.缺点:1)需要依赖特定的插件,有运维工作2)只适用特定版本