目录
1. 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)
一、TTL
TTL(Time to Live,过期时间)即过期时间,RabbitMQ可以对消息和队列设置TTL过期时间
当消息到达存活的时间之后,还没有被消费就自动清除
类似于 购物订单超时了没有付款,订单被自动取消
1.1设置消息的TTL
目前有两种方法可以设置消息的TTL
- 设置队列的TTL,该队列中的所有消息均为相同的过期时间
- 针对消息本身设置的TTL,每条消息TTL可以不同
但是如果两种方法一起使用,则会根据两种方法的较小的数据为准
针对消息本身设置的TTL方法,是在发送消息的方针中加入expiration的属性参数(单位为毫秒)
完整代码:
//TTL
public static final String TTL_QUEUE = "ttl.queue";
public static final String TTL_QUEUE2 = "ttl2.queue";
public static final String TTL_EXCHANGE = "ttl.exchange";
@Configuration
public class TTLRabbitMQ {
@Bean("ttlQueue")
public Queue ttlQueue(){
return QueueBuilder.durable(Constants.TTL_QUEUE).build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange(){
return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
}
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ttl")
public String ttl() {
System.out.println("ttl...");
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {
message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30s
return message;
});
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {
message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为10s
return message;
});
return "消息发送成功";
}
}
运行程序,观察结果
1.可以发现当发送四条消息,两个队列的Ready消息均为4,
2.10秒钟之后, 刷新⻚⾯, 发现ttl2队列的消息已被删除(再过20秒ttl队列消息也会被删除)
- 如果不设置TTL,则表⽰此消息不会过期;如果将TTL设置为0,则表⽰除⾮此时可以直接将消息投递到 消费者,否则该消息会被⽴即丢弃
1.2设置队列的TTL
设置队列TTL的方法是在创建队列时,加入x-messahe-ttl参数实现(单位是毫米)
完整代码:
//TTL
public static final String TTL_QUEUE = "ttl.queue";
public static final String TTL_QUEUE2 = "ttl2.queue";
public static final String TTL_EXCHANGE = "ttl.exchange";
//设置ttl
@Bean("ttlQueue2")
public Queue ttlQueue2(){
return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build(); //设置队列的ttl为20s
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ttl2")
public String ttl2() {
System.out.println("ttl2...");
//发送普通消息
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");
return "消息发送成功";
}
}
启动程序,验证结果:
运⾏之后发现,新增了⼀个队列, 队列有⼀个 TTL 标识
1.发送消息后, 可以看到, Ready消息为1

- 采⽤发布订阅模式, 所有与该交换机绑定的队列(ttl_queue和ttl_queue2)都会收到消息
2.20秒钟之后, 刷新⻚⾯, 发现消息已被删除
- 由于ttl_queue队列, 未设置过期时间, 所以ttl_queue的消息未删除
1.3两者之间的区别
- 设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
- 设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定的.
- 因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否 有过期的消息即可.
- ⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可
二、死信队列
2.1死信的概念
- 死信简单理解就是因为种种原因,无法被消费的消息,就是死信
- 有死信,自然就有死信队列.当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX(Dead Letter Exchange),绑定DLX的队列,就称为死信队列(Dead LetterQueue,简称DLQ).
- RabbitMQ 的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于存储无法被正常消费的消息(即 "死信")。当消息满足特定条件时,会被从原队列转发到死信队列,这有助于消息的可靠性处理和问题排查。
2.2死信产生的条件:
消息变成死信一般是由一下几种情况:
- 消息被拒绝(Basic.Reject / Basic.Nack),并且设置了requeue 参数为false
- 消息过期
- 队列达到最大长度
2.3死信队列的实现
死信队列的工作原理
- 为普通队列设置死信交换机(Dead Letter Exchange,DLX)
- 当消息成为死信时,RabbitMQ 会自动将其发送到设置的死信交换机
- 死信交换机通过绑定关系将消息路由到死信队列
- 可以专门创建消费者处理死信队列中的消息
注意:死信队列和死信交换机 与 普通队列和普通交换机没有区别。
实现代码:
//死信
public static final String NORMAL_QUEUE = "normal.queue";
public static final String NORMAL_EXCHANGE = "normal.exchange";
public static final String DL_QUEUE = "dl.queue";
public static final String DL_EXCHANGE= "dl.exchange";
//死信相关配置
@Configuration
public class DLConfig {
//演示TTL+死信队列模拟的延迟队列存在问题
//制造死信产生的条件
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constants.NORMAL_QUEUE) //绑定普通队列
.deadLetterExchange(Constants.DL_EXCHANGE) //绑定死信交换机
.deadLetterRoutingKey("dlx") //绑定死信路由键
//制造死信条件如下两种
.ttl(10000) //设置TTL 10秒
.maxLength(10L) //设置队列最大长度
.build();
}
/**
* 创建正常队列
* 设置死信交换机和死信路由键
* @return Queue
*/
// @Bean("normalQueue")
// public Queue normalQueue(){
// return QueueBuilder.durable(Constants.NORMAL_QUEUE)
// .deadLetterExchange(Constants.DL_EXCHANGE)
// .deadLetterRoutingKey("dlx")
// .build();
// }
/**
* 创建正常交换机
* @return DirectExchange
*/
@Bean("normalExchange")
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
/**
* 绑定正常队列到正常交换机
* @param queue 正常队列
* @param exchange 正常交换机
* @return Binding
*/
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
//死信交换机和队列
/**
* 创建死信队列
* @return Queue
*/
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
/**
* 创建死信交换机
* @return DirectExchange
*/
@Bean("dlExchange")
public DirectExchange dlExchange(){
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
/**
* 绑定死信队列到死信交换机
* @param queue 死信队列
* @param exchange 死信交换机
* @return Binding
*/
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
}
发布消息:
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
/**
* 测试死信
* @return
*/
@RequestMapping("/dl")
public String dl() {
System.out.println("dl...");
//发送普通消息
// rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");
System.out.printf("%tc 消息发送成功 \n", new Date());
// //测试队列长度
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i);
}
return "消息发送成功";
}
}
测试结果:
- 已知普通队列长度为10和TTL为10秒,当发送20条消息后,观察队列的情况
- 当启动生产者时,可以看到两个队列均有十条消息,但因为超出普通队列长度,导致后十条消息会直接到达死信队列,等待10后再次观察,发现原有的消息也会给死信队列
2.4常⻅⾯试题
- 死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息,在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息
- 消息过期: 消息在队列中存活的时间超过了设定的TTL
- 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
- 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.
- 消息重试机制:对于处理失败的消息,可以在死信队列中进行重试
- 延迟任务:利用消息过期时间,实现延迟任务(如订单超时取消)
- 异常监控:通过监控死信队列,及时发现系统问题
- 数据恢复:死信队列可作为消息的备份,便于数据恢复
三、延迟队列
3.1概念
在 RabbitMQ 中,延迟队列(Delay Queue) 是一种特殊的消息队列,用于存储需要在指定时间后才被消费的消息。它的核心作用是实现消息的 “延时投递”,即消息发送后不会立即被消费者处理,而是等待预设的延迟时间后才进入消费流程。
3.2应用场景
- 订单超时取消:用户下单后,若 30 分钟内未支付,自动取消订单并释放库存。
- 定时任务触发:例如每天凌晨 2 点执行数据备份、定时发送提醒消息等。
- 失败重试机制:当某个操作失败时,延迟一段时间后重试(如接口调用失败后,5 分钟后再次尝试)。
- 消息通知延迟:如用户注册成功后,1 小时后发送新手引导邮件。
3.3RabbitMQ 实现延迟队列的核心原理
RabbitMQ 本身没有直接提供 “延迟队列” 的功能,但可以通过以下两种方式间接实现:
1. 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)
这是最常用的实现方式,利用了 RabbitMQ 的两个特性:
- TTL(消息存活时间):设置消息的过期时间,当消息超过该时间未被消费时,会变成 “死信”(Dead Letter)。
- 死信队列(DLQ):为队列配置 “死信交换机(Dead Letter Exchange)”,当消息成为死信后,会被自动路由到死信交换机绑定的队列(即死信队列),消费者从死信队列中获取消息,实现延迟效果。
@Configuration
public class DelayConfig {
//延迟队列
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_EXCHANGE = "delay.exchange";
@Bean("delayQueue")
public Queue delayQueue(){
return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public Exchange delayExchange(){
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
* 延迟队列测试 - 使用 TTL 实现延迟效果
* 通过设置消息的过期时间来模拟延迟队列的效果
* @return 发送结果提示信息
*/
@RequestMapping("/delay")
public String delay() {
System.out.println("delay...");
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {
message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为10s
return message;
});
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {
message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30s
return message;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
/**
* 延迟队列测试 - 使用 RabbitMQ 延迟插件实现延迟效果
* 利用 RabbitMQ 的延迟插件来精确控制消息的延迟时间
* @return 发送结果提示信息
*/
@RequestMapping("/delay2")
public String delay2() {
System.out.println("delay2...");
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 30s...", message -> {
message.getMessageProperties().setDelayLong(30000L); //单位: 毫秒, 延迟时间为30s
return message;
});
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 10s...", message -> {
message.getMessageProperties().setDelayLong(10000L); //单位: 毫秒, 延迟时间为10s
return message;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
}
2.使用RabbitMQ延迟插件
RabbitMQ 官方提供了一个插件 rabbitmq_delayed_message_exchange
,专门用于实现延迟队列,功能更强大且灵活。
特点:
- 支持为每个消息单独设置延迟时间(无需依赖死信队列)。
- 延迟精度更高,避免了 TTL + 死信队列方式中 “消息堆积导致的延迟偏差” 问题。
实现步骤:
- 安装插件:在 RabbitMQ 服务器上安装
rabbitmq_delayed_message_exchange
插件(需重启 RabbitMQ 生效)。- 声明一个类型为
x-delayed-message
的交换机,并指定延迟类型(如x-delayed-type: direct
)。- 发送消息时,通过
x-delay
头部字段设置延迟时间(单位:毫秒)。- 交换机在延迟时间到达后,会自动将消息路由到绑定的队列,消费者直接从队列中获取消息。
两种实现方式的对比
实现方式 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
TTL + 死信队列 | 无需额外插件,依赖 RabbitMQ 原生特性 | 1. 队列级 TTL 不支持单消息单独设置延迟; 2. 消息堆积可能导致延迟时间不准 |
延迟时间固定、精度要求不高的场景 |
延迟插件(推荐) | 支持单消息单独设置延迟,精度高 | 需要额外安装插件 | 延迟时间灵活、精度要求高的场景 |
代码示例(基于 Spring Boot + 延迟插件)
以下是使用 rabbitmq_delayed_message_exchange
插件实现延迟队列的简单示例:
1. 安装插件
# 下载插件(版本需与RabbitMQ匹配)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
# 复制到RabbitMQ插件目录
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.0/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启RabbitMQ
systemctl restart rabbitmq-server
2. 配置队列和交换机(Spring Boot)
@Configuration
public class DelayQueueConfig {
// 延迟交换机名称
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
// 延迟队列名称
public static final String DELAY_QUEUE_NAME = "delay.queue";
// 路由键
public static final String DELAY_ROUTING_KEY = "delay.routing.key";
// 声明延迟交换机(类型为x-delayed-message)
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 指定延迟交换机的底层类型(如direct、topic等)
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 声明延迟队列
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
}
// 绑定交换机和队列
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with(DELAY_ROUTING_KEY)
.noargs();
}
}
3. 发送延迟消息
@Service
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(String message, long delayTime) {
// 发送消息时,通过x-delay头设置延迟时间(毫秒)
rabbitTemplate.convertAndSend(
DelayQueueConfig.DELAY_EXCHANGE_NAME,
DelayQueueConfig.DELAY_ROUTING_KEY,
message,
correlationData -> {
correlationData.getMessageProperties().setHeader("x-delay", delayTime);
return correlationData;
}
);
System.out.println("发送延迟消息:" + message + ",延迟时间:" + delayTime + "ms");
}
}
4. 消费延迟消息
@Service
public class DelayMessageReceiver {
@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE_NAME)
public void receiveDelayMessage(String message) {
System.out.println("收到延迟消息:" + message + ",时间:" + LocalDateTime.now());
}
}
5. 测试
@SpringBootTest
public class DelayQueueTest {
@Autowired
private DelayMessageSender sender;
@Test
public void testDelayMessage() {
// 发送一条延迟5秒的消息
sender.sendDelayMessage("订单超时取消提醒", 5000);
}
}
输出结果:
发送延迟消息:订单超时取消提醒,延迟时间:5000ms
(5秒后)
收到延迟消息:订单超时取消提醒,时间:2025-08-18T15:30:05
注意事项
- 延迟精度:延迟插件的精度较高,但受 RabbitMQ 服务器负载影响,可能存在毫秒级偏差。
- 消息持久化:若需保证消息不丢失,需将队列、交换机和消息都设置为持久化(durable)。
- 插件版本兼容:延迟插件版本需与 RabbitMQ 版本匹配,否则可能无法正常工作。
- 避免消息堆积:延迟队列中的消息在延迟时间到达前会暂存在内存或磁盘中,需合理设置队列容量,避免堆积过多消息影响性能。
四、事务
RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. Spring AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败
rabbitTemplate.setChannelTransacted(true);
是 Spring AMQP 中用于开启 RabbitMQ 事务模式的配置,它会为 RabbitTemplate
操作的信道(Channel)启用事务支持。
相关配置
@Configuration
public class TransactionConfig {
@Bean
public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable("trans_queue").build();
}
}
生产者代码
@RequestMapping("/trans")
@RestController
public class TransactionProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
@RequestMapping("/send")
public String send(){
rabbitTemplate.convertAndSend("","transQueue", "trans teat 1......");
int a = 5/0;
rabbitTemplate.convertAndSend("","transQueue", "trans teat 2......");
return "发送成功";
}
}
- 不加 @Transactional , 会发现消息1发送成功
- 添加 @Transactional , 消息1和消息2全部发送失败
五、消息分发
消息分发机制主要有两种应用场景:
- 限流
- 负载均衡
5.1 介绍
当队列拥有多个消费者时,RabbitMQ默认会通过轮询的方式将消息平均的分发给每个消费者,但是没有可能其中一部分消费者消费消息的速度很快,另一部分消费者消费很慢呢?其实是有可能的,那么这就有可能导致这个系统的吞吐量下降,那如何分发消息才是合理的?在前面学习RabbitMQ JDK Client 时,我们可以通过 channel.basicQos(int prefetchCount) 来设置当前信道的消费者所能拥有的最大未确认消息数量,在Spring AMQP中我们可以通过配置 prefetch 来达到同样的效果,使用消息分发机制时消息确认机制必须为手动确认。
5.2 限流
在秒杀场景下,假设订单系统每秒能处理的订单数是10000,但是秒杀场景下可能某一瞬间会有50000订单数,这就会导致订单系统处理不过来而压垮。可以利用basicQos()来进行限流:
- SpringBoot配置文件用prefetch控制限流数,对应channel.basicQos(int prefetchCount)的prefetchCount。
- 开启消息确认机制的手动确认模式manual。未手动确认的消息都视为未消费完的消费,prefetchCount并不会-1。
配置信息:
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://admin:admin@8.140.60.17:5672/xibbei
listener:
simple:
acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)
prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量
retry:
enabled: true # 启用重试机制
initial-interval: 5000ms # 初始重试间隔5秒
max-attempts: 5 # 最多重试5次
声明队列和交换机:
public class RabbitMQConnection {
public static final String QOS_QUEUE = "qos.queue";
public static final String QOS_EXCHANGE = "qos.exchange";
}
@Configuration
public class QosConfig {
@Bean("qosQueue")
public Queue qosQueue(){
return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public Exchange qosExchange(){
return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
}
}
生产者代码:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("qos")
public String qos() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(RabbitMQConnection.QOS_EXCHANGE, "qos", "Hello SpringBoot RabbitMQ");
}
return "发送成功";
}
}
消费者代码:
@Component
public class QosListener {
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
Thread.sleep(2000);
// System.out.println("业务处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag, false, true);
}
}
}
5.3负载均衡
负载均衡主要是根据不同消费者消费消息的速度来协调它们的压力,比如一个消费者处理消息快,另一个消费者处理消息满,那么就可以配置 prefetch(如配置prefetch为1),就可以使这些消费者还未处理完当前消息,不允许处理下一条,这样就可以使处理消息满的消费者可以慢慢处理一条消息,而处理消息快的消费者,可以在处理完一条消息后,继续处理下一条
代码示例:
一、修改 prefetch 配置
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://admin:admin@8.140.60.17:5672/xibbei
listener:
simple:
acknowledge-mode: manual # 手动确认模式
prefetch: 1 # 每次只预取1条消息
二、修改消费者代码(取消手动确认的注释并新增一个消费者)
@Component
public class QosListener {
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
Thread.sleep(2000);
// System.out.println("业务处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag, false, true);
}
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handMessage2(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("222接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
Thread.sleep(1000);
// System.out.println("业务处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag, false, true);
}
}
}
三、测试
接收到消息: qos test...1, deliveryTag: 1
消费者2接收到消息: qos test...0, deliveryTag: 1
接收到消息: qos test...2, deliveryTag: 2
接收到消息: qos test...3, deliveryTag: 3
接收到消息: qos test...4, deliveryTag: 4
接收到消息: qos test...5, deliveryTag: 5
消费者2接收到消息: qos test...6, deliveryTag: 2
接收到消息: qos test...7, deliveryTag: 6
接收到消息: qos test...8, deliveryTag: 7
接收到消息: qos test...9, deliveryTag: 8
接收到消息: qos test...10, deliveryTag: 9
消费者2接收到消息: qos test...11, deliveryTag: 3
接收到消息: qos test...12, deliveryTag: 10
接收到消息: qos test...13, deliveryTag: 11
接收到消息: qos test...14, deliveryTag: 12
接收到消息: qos test...15, deliveryTag: 13
消费者2接收到消息: qos test...16, deliveryTag: 4
接收到消息: qos test...17, deliveryTag: 14
接收到消息: qos test...18, deliveryTag: 15
接收到消息: qos test...19, deliveryTag: 16