上篇文章:
目录
3.3.1 RabbitMQ Delayed Message Plugin插件安装
1 TTL
1.1 介绍
TTL表示过期时间,RabbitMQ提供对消息设置TTL和对队列设置TTL两种方式:
如果对队列设置TTL,就表示队列中的消息都是该TTL过期。
如果同时设置消息和队列的TTL,那么取两者较小值为TTL的时间。
TTL的应用场景还是比较广泛的,比如订单系统和支付系统通过RabbitMQ通信,订单具有超时时间,超时未支付就会取消订单,此时订单就可以设置TTL。
1.2 消息TTL
队列、交换机名称和声明:
public class RabbitMQConnection {
public static final String TTL_QUEUE = "ttl.queue";
public static final String TTL_EXCHANGE = "ttl.exchange";
}
@Configuration
public class RabbitMQConfig {
@Bean("ttlQueue")
public Queue ttlQueue(){
return QueueBuilder.durable(RabbitMQConnection.TTL_QUEUE).build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange(){
return ExchangeBuilder.directExchange(RabbitMQConnection.TTL_EXCHANGE).durable(true).build();
}
@Bean("ttlQueueBinding")
public Binding ttlQueueBinding(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("ttl");
}
}
生产者:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("ttl")
public String ttl(){
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMQConnection.TTL_EXCHANGE, "ttl","Hello SpringBoot RabbitMQ",messagePostProcessor);
return "发送成功";
}
}
设置消息TTL主要是在MessagePostProcessor类中为消息setExpiration()设置过期时间,单位为ms,接受类型为String。设置10s过期时间,观察结果:
1.3 队列TTL
队列、交换机名称和声明:
public class RabbitMQConnection {
public static final String TTL_QUEUE = "ttl.queue";
public static final String TTL_QUEUE2 = "ttl.queue2";
public static final String TTL_EXCHANGE = "ttl.exchange";
}
@Configuration
public class RabbitMQConfig {
@Bean("ttlQueue")
public Queue ttlQueue(){
return QueueBuilder.durable(RabbitMQConnection.TTL_QUEUE).build();
}
@Bean("ttlQueue2")
public Queue ttlQueue2(){
return QueueBuilder.durable(RabbitMQConnection.TTL_QUEUE2).ttl(20000).build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange(){
return ExchangeBuilder.directExchange(RabbitMQConnection.TTL_EXCHANGE).durable(true).build();
}
@Bean("ttlQueueBinding")
public Binding ttlQueueBinding(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("ttl");
}
@Bean("ttlQueueBinding2")
public Binding ttlQueueBinding2(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("ttl");
}
}
生产者:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("ttl")
public String ttl(){
rabbitTemplate.convertAndSend(RabbitMQConnection.TTL_EXCHANGE, "ttl","Hello SpringBoot RabbitMQ");
return "发送成功";
}
}
设置队列过期时间是在声明队列的时候,使用ttl()方法进行设置,单位ms。设置20s队列过期时间,观察结果:
注意:队列TTL和消息TTL的区别。队列TTL:一旦消息过期,会立即删除消息。消息TTL:消息过期也不会立即从队列中删除,而是等待消息即将被投递到消费者之间再判断是否过期(是否删除)。
为什么有这样的区别?队列TTL:因为队列的先进先出特性,先进队列的一定先到期,因此只需要定期从队头判断是否过期,过期即连续删除。消息TTL:由于每个消息的TTL都不一样,因此如果要实时判断消息是否过期就需要不断扫描全队列(开销大),因此采用惰性删除即消息即将被投递到消费者之间再判断是否过期。
上述区别会产生一个现象,在队列TTL中,连续发送的消息(认为是在同一时刻)在超过过期时间后所有消息都会消失。而在消息TTL中,连续发送过期时间长和过期时间短的消息,即使过期时间短的消息已经过期,只要过期时间长的消息还未过期,过期时间短的消息仍然会在队列中。
2 死信队列
2.1 介绍
当消息因为某些原因,无法再被消费者消费,就变成死信(Dead Letter)。死信就会进入死信交换机(DLX,Dead Letter Exchange),由死信交换机路由到死信队列(DLQ,Dead Letter Queue)。死信队列的消息也可以被消费者消费:
消息变成死信有如下三种原因:
1.消息确认机制采用手动确认,并调用basicReject()或basicNack(),requeue==false。由于拒绝重新入队,因此消息变为死信。
2.消息超过过期时间TTL,超过过期时间的消息无法再被消费,也就变成死信。
3.队列满了,队列有长度限制,超出队列长度的消息就会变为死信。
2.2 案例演示
2.2.1 消息超过TTL
队列、交换机名称和声明:
public class RabbitMQConnection {
public static final String NORMAL_QUEUE = "normal.queue";
public static final String NORMAL_EXCHANGE = "normal.exchange";
public static final String DL_QUEUE = "dl.queue";
public static final String DL_EXCHANGE = "dl.exchange";
}
@Configuration
public class RabbitMQConfig {
//正常队列正常交换机和死信队列死信交换机的声明和绑定
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(RabbitMQConnection.NORMAL_QUEUE)
.deadLetterExchange(RabbitMQConnection.DL_EXCHANGE)//绑定死信交换机
.deadLetterRoutingKey("dl")//死信携带的routingKey
.ttl(10000)//制造死信条件(消息过期)
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(RabbitMQConnection.NORMAL_EXCHANGE).durable(true).build();
}
@Bean("normalQueueBinding")
public Binding normalQueueBinding(@Qualifier("normalExchange") DirectExchange directExchange, @Qualifier("normalQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("normal");
}
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(RabbitMQConnection.DL_QUEUE).build();
}
@Bean("dlExchange")
public DirectExchange dlExchange(){
return ExchangeBuilder.directExchange(RabbitMQConnection.DL_EXCHANGE).durable(true).build();
}
@Bean("dlQueueBinding")
public Binding dlQueueBinding(@Qualifier("dlExchange") DirectExchange directExchange, @Qualifier("dlQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("dl");
}
}
生产者:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("dl")
public String dl(){
rabbitTemplate.convertAndSend(RabbitMQConnection.NORMAL_EXCHANGE, "normal","Hello SpringBoot RabbitMQ");
return "发送成功";
}
}
运行代码可以发现,创建了两个队列normal.queue和dl.queue,normal.queue的TTL表示设置队列过期时间,DLX表示该队列绑定了死信交换机,DLK则是该队列消息过期时变成死信发送给死信交换机携带了routingKey。
刚开始生产者给normal.queue发送消息,10s后队列消息过期变为死信,就被发送到死信交换机,死信交换机将死信路由到dl.queue死信队列。
2.2.2 手动确认拒绝重新入队
添加消费者代码(监听正常队列和死信队列),构造拒绝重新入队的死信条件(需要提前开启manual级别):
@Component
public class DlListener {
@RabbitListener(queues = RabbitMQConnection.NORMAL_QUEUE)
public void queueListener1(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("listener [" + RabbitMQConnection.NORMAL_QUEUE + "]收到消息:%s, deliveryTag:%d \n",
new String(message.getBody(), "UTF-8"),
deliveryTag);
int num = 1 / 0;
System.out.println("消息处理完成");
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
System.out.println("消息处理发生异常,拒绝重新入队");
channel.basicNack(deliveryTag, false, false);
}
}
@RabbitListener(queues = RabbitMQConnection.DL_QUEUE)
public void queueListener2(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("listener [" + RabbitMQConnection.DL_QUEUE + "]收到消息:%s, deliveryTag:%d \n",
new String(message.getBody(), "UTF-8"),
deliveryTag);
System.out.println("listener [" + RabbitMQConnection.DL_QUEUE + "]消息处理完成");
channel.basicAck(deliveryTag, false);
}
}
运行代码可以发现,当正常队列的消息被消费者1处理时发生异常,由于选择拒绝重新入队,因此消息变为死信进入死信队列,然后就被消费者2监听到并处理了消息。
2.2.3 队列满了
修改队列声明,添加最大长度属性(需要先停止服务器并提前在管理界面删除已经存在的队列):
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(RabbitMQConnection.NORMAL_QUEUE)
.deadLetterExchange(RabbitMQConnection.DL_EXCHANGE)//绑定死信交换机
.deadLetterRoutingKey("dl")//死信携带的routingKey
.ttl(10000)//制造死信条件(消息过期)
.maxLength(10)
.build();
}
生产者连续发送20条消息:
@RequestMapping("dl")
public String dl() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(RabbitMQConnection.NORMAL_EXCHANGE, "normal", "Hello SpringBoot RabbitMQ");
}
return "发送成功";
}
移除消费者,观察管理界面的队列消息变化情况:
可以发现normal.queue添加了Lim属性表示队列最大长度。由于队长为10,因此发送20条消息后10条直接变为死信进入死信队列。而10秒TTL后所有消息都过期,变为死信又进入死信队列。
2.3 总结
死信和死信队列:死信是指无法被消费者处理或消费的消息,死信队列就是存储死信的队列。
死信产生原因:1.消息处理发生异常,手动确认返回Nack时选择拒绝重新入队的策略。2.消息TTL过期。3.队列满了。
死信队列应用场景:比如订单系统和支付系统,订单消息存储在消息队列中,如果消息不能正确被支付系统处理,为了保证消息可靠性选择手动确认并重新入队,就会导致订单系统反复处理失败这条消息,其它消费无法得到处理(消息积压)。此时就可以利用死信队列,把处理失败的消息不让重新入队,而是存入死信队列,让其它专门处理死信的消费者进行消费(支付失败的订单发送工单专门交给人工处理)。
还有就是订单超时,那订单系统的订单的状态就需要修改(待支付变为订单超时),就可以把超时的订单存到死信队列,让支付系统的其它消费者消费,支付系统通过给订单系统发送消息来告知订单状态的改变。
3 延迟队列
3.1 介绍
消息发送后,让消费者等待一段时间再允许从队列中获取消息,这样的队列就是延迟队列。
延迟队列的作用类似于定时器,只是定时器往往工作在同一个服务场景,而延迟队列可以工作在分布式场景。常见应用场景如下:
1.预约服务:在景区预约平台或线上会议室预约后,往往都会把消息延迟到景区开门或会议开始前再通知提醒。
2.智能家居:用户在下班前就通过手机或远程遥控来控制家里的用电系统、空调、热水器等在用户快到家前就开始工作,这时控制指令就会延迟推送,待到达指定时间时再发送到家居的控制系统。
3.2 实现
RabbitMQ并没有直接提供延迟队列,延迟队列的实现可以使用TTL+死信队列来实现。
具体方案如下:
Queue是设置了TTL,需要延迟通知的消费者监听死信队列,当发送消息到Queue,假设延迟10s通知,那TTL就是10s。10s后消息变为死信,转发到死信队列,此时消费者就可以获取到延迟10s的消息进行消费了。
具体代码在死信队列已经实现,不再演示。
3.3 延迟队列插件
上述实现方式存在一个问题,如果设置的不是队列的TTL,而是消息的TTL,各种消息TTL不一致,就会出现无法按特定时间延迟的结果。
比如消息1TTL为10s,消息2TTL为30s,消息2先发送,消息1后发送。由于消息2在队头,因此RabbitMQ就会反复扫描队头判断是否过期,只有队头判断过期后才会再判断消息1,因此消息1本来要延迟10s,结果却被迫延迟30s后再被消费。
要解决这个问题,RabbitMQ提供了一个插件专门用来提供延迟队列功能。
3.3.1 RabbitMQ Delayed Message Plugin插件安装
首先前往插件的github地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/
点击右侧的Releases:
找到和RabbitMQ匹配的版本下载.ez文件:
根据不同的操作系统寻找不同的安装路径,把下载的插件复制到该目录里(如果没有该目录,手动创建):
注意:如果是docker,则使用如下命令:docker cp 宿主机文件 容器名称或ID:容器目录。
3.3.2 插件使用
查看插件:rabbitmq-plugins list
启动插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启服务:service rabbitmq-server restart
3.3.3 案例演示
队列交换机名称和声明:
public class RabbitMQConnection {
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_EXCHANGE = "delay.exchange";
}
@Configuration
public class RabbitMQConfig {
@Bean("delayQueue")
public Queue delayQueue(){
return QueueBuilder.durable(RabbitMQConnection.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public DirectExchange delayExchange(){
return ExchangeBuilder.directExchange(RabbitMQConnection.DELAY_EXCHANGE).durable(true).delayed().build();
}
@Bean("delayQueueBinding")
public Binding delayQueueBinding(@Qualifier("delayExchange") DirectExchange directExchange, @Qualifier("delayQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("delay");
}
}
声明交换机时,注意使用delayed()来声明这是延迟交换机。这也就意味着延迟队列插件的延迟原理不是放到队列中延迟一定时间在消费,而是消息在交换机上延迟一定时间再把消息路由到队列。
生产者代码:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("delay")
public String delay() {
MessagePostProcessor messagePostProcessor1 = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(30000);//延迟30s
return message;
}
};
MessagePostProcessor messagePostProcessor2 = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);//延迟10s
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMQConnection.DELAY_EXCHANGE, "delay", "Hello SpringBoot RabbitMQ",messagePostProcessor1);
rabbitTemplate.convertAndSend(RabbitMQConnection.DELAY_EXCHANGE, "delay", "Hello SpringBoot RabbitMQ",messagePostProcessor2);
System.out.println("消息发送成功,当前时间:"+new Date());
return "发送成功";
}
}
消费者代码:
@Component
public class DelayListener {
@RabbitListener(queues = RabbitMQConnection.DELAY_QUEUE)
public void queueListener2(Message message, Channel channel) throws IOException {
System.out.printf("listener [" + RabbitMQConnection.DELAY_QUEUE + "]收到消息:%s, 时间:%s \n",
new String(message.getBody(), "UTF-8"), new Date());
}
}
可以发现使用延迟队列插件,即使先发送延时时间长的消息,消息处理也不会乱序。运行结果如下:
3.4 总结
延迟队列概念:消息需要经过一定时间的延迟后在发送给消费者进行消费,存储这样的消息的队列就是延迟队列。
应用场景:比如订单超时支付自动取消,订单系统下单时设置延迟时间,并将订单消息投递到RabbitMQ中,消息超时则把订单消息发送给消费者(订单系统的订单状态处理模块),订单系统根据是否收到支付系统支付成功的消息或超时订单来修改订单状态(成功支付或超时未支付)。
还有就是用户发起退款服务,退款信息消息设置延迟24小时并发送给RabbitMQ,当24小时内商家未退款,则退款信息消息自动发送给退款系统的消费者服务,由它们判断是否需要退款(避免重复退款)。
实现方法以及优缺点对比:
1.死信队列+TTL,优点:灵活实现不需要插件;缺点:存在消息顺序性问题,需要增加代码逻辑来实现(需要额外的死信交换机和死信队列),增加系统复杂性。
2.延迟插件,优点:可以直接使用延迟队列,简化延迟消息的实现逻辑,同时没有消息顺序性问题;缺点:需要特定的插件,并且插件还得注意版本问题。
下篇文章: