死信队列
死信(dead message) 简单理解就是因为种种原因, 无法被消费的信息, 就是死信。
当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器中,这个交换器就是DLX( Dead Letter Exchange ),绑定DLX的队列, 就称为死信队列(DeadLetter Queue,简称DLQ)。
消息变成死信⼀般是由于以下⼏种情况:
1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
2. 消息过期。
3. 队列达到最大长度。
代码示例
死信交换机和死信队列和普通的交换机, 队列没有区别。
死信队列和交换机的绑定:
//正常交换机队列
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.ttl(10000)
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
//死信交换机和队列
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
@Bean("dlExchange")
public DirectExchange dlExchange(){
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlQueue") Queue queue,@Qualifier("dlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
程序启动之后, 观察队列:
D: durable的缩写, 设置持久化
TTL: Time to Live, 队列设置了TTL
Lim: 队列设置了⻓度(x-max-length)
DLX: 队列设置了死信交换机(x-dead-letter-exchange)
DLK: 队列设置了死信RoutingKey(x-dead-letter-routing-key)
死信队列常见面试题:
1. 死信队列的概念
死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息。
2. 死信的来源
1) 消息过期: 消息在队列中存活的时间超过了设定的TTL
2) 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
3) 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.
3. 死信队列的应⽤场景
对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统。⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认)。
常见的应⽤场景还有:
• 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理。
• 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源。
• ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位。
延迟队列
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费。
RabbitMQ本⾝没有直接⽀持延迟队列的的功能,但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能。
假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中。消费者订阅的并非是 normal_queue 这个队列, ⽽是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息。
TTL和死信队列实现延迟队列
⽣产者: 发送两条消息,⼀条消息10s后过期, 第⼆条20s后过期。
@RequestMapping("/delay")
public String delay() {
//发送带ttl的消息
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 10s..."+new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("10000");
//10s过期
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 20s..."+new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("20000");
//20s过期
return messagePostProcessor;
});
return "发送成功!";
}
消费者:
//指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws
Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", new
Date(), new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
}
延迟队列面试题:
介绍下RabbitMQ的延迟队列
延迟队列是⼀个特殊的队列, 消息发送之后, 并不⽴即给消费者, ⽽是等待特定的时间, 才发送给消费者。
延迟队列的应⽤场景有很多, 比如:
1. 订单在⼗分钟内未⽀付⾃动取消
2. ⽤⼾注册成功后, 3天后发调查问卷
3. ⽤⼾发起退款, 24⼩时后商家未处理, 则默认同意, ⾃动退款
但RabbitMQ本⾝并没直接实现延迟队列, 通常有两种⽅法:
1. TTL+死信队列组合的⽅式
2. 使⽤官⽅提供的延迟插件实现延迟功能
⼆者对⽐:
1. 基于死信实现的延迟队列
a. 优点: 1) 灵活不需要额外的插件⽀持
b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
2. 基于插件实现的延迟队列
a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
b. 缺点: 1) 需要依赖特定的插件, 有运维⼯作 2) 只适⽤特定版本
事务
RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. SpringAMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败。
配置事务管理器:
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true); //开启事务
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
消息分发
RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表⾥的⼀个消费者。
默认情况下, RabbitMQ是以轮询的⽅法进⾏分发的, ⽽不管消费者是否已经消费并已经确认了消息. 这种⽅式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, ⽽某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进⽽应⽤整体的吞吐量下降。
如何处理呢? 我们可以使⽤前⾯章节讲到的channel.basicQos(int prefetchCount) ⽅法, 来限制当前信道上的消费者所能保持的最⼤未确认消息的数量。
⽐如: 消费端调⽤了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送⼀条消息计数+1, 消费⼀条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息.类似TCP/IP中的"滑动窗⼝"。
应用场景
限流
订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满⾜需求。
但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, ⽆疑会把订单系统压垮。
RabbitMQ提供了限流机制, 可以控制消费端⼀次只拉取N个请求
通过设置prefetchCount参数, 同时也必须要设置消息应答⽅式为⼿动应答
prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量, 以此来实现流控制和负载均衡。
负载均衡
我们也可以⽤此配置,来实现"负载均衡"
如下图所⽰, 在有两个消费者的情况下,⼀个消费者处理任务⾮常快, 另⼀个⾮常慢,就会造成⼀个消费者会⼀直很忙, ⽽另⼀个消费者很闲. 这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量。
以上,关于RabbitMQ,希望对你有所帮助。