目录
1.TTL
1.1设置消息的TTL
目前有两种方法可以设置消息的TTL
一是设置队列的TTL,队列中所有消息都有相同的过期时间,二是对消息本身进行单独设置,每条消息的TTL可以不同,如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值决定。
先看针对每条消息设置TTL
针对每条消息设置TTL的方法是在发送消息的方法中加入expiration的属性参数,单位为ms
1.1.1配置交换机&队列
//TTL
public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE_NAME = "ttl_exchange";
//ttl
//1. 交换机
@Bean("ttlExchange")
public Exchange ttlExchange() {
return
ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build(
);
}
//2. 队列
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange,
@Qualifier("ttlQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
1.1.2发送消息
@RequestMapping("/ttl")
public String ttl(){
String ttlTime = "10000";//10s
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl
test...", messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
return messagePostProcessor;
});
return "发送成功!";
}
1.1.3运行程序观察结果
发送消息后可以看到Ready数为1
十秒后,刷新页面发现消息已消失
1.2设置队列的TTL
设置队列的TTL的方法是在创建队列时,加入x-message-ttl参数实现的,单位是ms
1.2.1配置队列和交换机的绑定关系
public static final String TTL_QUEUE2 = "ttl_queue2";
//设置ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {
//设置20秒过期
return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20*1000).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange,
@Qualifier("ttlQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
设置过期时间,也可以采用以下方式:
@Bean("ttlQueue2")
public Queue ttlQueue2() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",20000);//20秒过期
return
QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(arguments).build();
}
1.2.2发送消息
@RequestMapping("/ttl")
public String ttl() {
// String ttlTime = "30000";//10s
// //发送带ttl的消息
// rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", //messagePostProcessor -> {
// messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
// return messagePostProcessor;
// });
//发送不带ttl的消息
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl
test...");
return "发送成功!";
}
1.2.3运行程序观察结果
运行后发现,新增了一个队列,队列Features有一个TTL标识
发送消息后,可以看到Ready消息为1
采⽤发布订阅模式, 所有与该交换机绑定的队列都会收到消息
20s后,刷新页面,发现消息已经删除
由于ttl_queue队列, 未设置过期时间, 所以ttl_queue的消息未删除。
1.3两者区别
2.死信队列
死信(dead message)简单理解就是因为种种原因无法被消费的信息,就是死信
有死信,自然就有死信队列。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX(Dead Letter Exchange),绑定DLX的队列,就称为死信队列(Dead Letter Queue)简称DLQ
消息变成死信主要有以下几个原因:
1.消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false
2.消息过期
3.队列达到最大长度
2.1 声名队列和交换机
//死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE = “narmal_queue”;
/**
* 死信队列相关配置
*/
@Configuration
public class DLXConfig {
//死信交换机
@Bean("dlxExchange")
public Exchange dlxExchange(){
return
ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build()
;
}
//2. 死信队列
@Bean("dlxQueue")
public Queue dlxQueue() {
return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
//3. 死信队列和交换机绑定 Binding
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,
@Qualifier("dlxQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
//正常交换机
@Bean("normalExchange")
public Exchange normalExchange(){
return
ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).buil
d();
}
//正常队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
//正常队列和交换机绑定 Binding
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") Exchange
exchange, @Qualifier("normalQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
}
2.2正常队列绑定死信交换机
当这个队列中存在死信时,RabbitMQ会自动的把这个消息重新发布到设置的DLX上,进而被路由到另一个队列,即死信队列,可以监听这个死信队列中的消息以进行相应的处理
@Bean("normalQueue")
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE_NAME);//绑定死信队列
arguments.put("x-dead-letter-routing-key","dlx");//设置发送给死信队列的
RoutingKey
return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
简写为
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
.deadLetterRoutingKey("dlx").build();
2.3制造死信产生的条件
@Bean("normalQueue")
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE_NAME);//绑定死
信队列
arguments.put("x-dead-letter-routing-key","dlx");//设置发送给死信队列的
RoutingKey
//制造死信产⽣的条件
arguments.put("x-message-ttl",10000);//10秒过期
arguments.put("x-max-length",10);//队列⻓度
return
QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
简写为:
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
.deadLetterRoutingKey("dlx")
.ttl(10*1000)
.maxLength(10L)
.build();
2.4 发送消息
@RequestMapping("/dlx")
public void dlx() {
//测试过期时间, 当时间达到TTL, 消息⾃动进⼊到死信队列
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"dlx test...");
//测试队列⻓度
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
// }
// //测试消息拒收
// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
2.5测试死信
程序启动后观察队列:

10s后:
2.6常见面试题
死信队列作为RabbitMQ的高级特性,也是面试的一大重点