在许多业务场景中,我们都需要处理延迟任务。例如:
- 用户下单后,30分钟内未支付则自动取消订单。
- 用户注册成功后,5分钟后发送一份引导邮件。
- 创建一个定时任务,在未来某个指定时间点执行。
这些场景的核心需求都是:在未来的某个时间点,触发一个特定的动作。RabbitMQ 本身没有直接提供延迟队列的功能,但我们可以巧妙地利用其两个核心特性——**消息存活时间(TTL)和死信交换机(Dead-Letter-Exchange)**来组合实现一个强大而可靠的延迟消息系统。
本文将通过一个完整的 Spring Boot + RabbitMQ 示例,深入剖析如何使用 DLX + TTL 实现延迟消息,分析其工作原理、优缺点,并介绍通过插件来解决的更成熟的解决方案。
核心概念
在深入代码之前,我们必须先理解两个关键概念。
1. 消息存活时间 (Time-To-Live, TTL)
TTL 用于设置消息在队列中的最大存活时间,单位为毫秒。当一条消息在一个队列中的存留时间超过了其 TTL 值,它就会“过期”。
TTL 有两种设置方式:
- 对整个队列设置 TTL:通过在
queue.declare
时添加x-message-ttl
参数。该队列中的所有消息都将拥有相同的存活时间。 - 对单条消息设置 TTL:在发送消息时,通过设置消息属性(
expiration
)来指定。这样可以为每条消息赋予不同的存活时间。
当一条消息过期后,它会变成“死信”(Dead Letter)。
2. 死信交换机 (Dead-Letter-Exchange, DLX)
死信交换机本质上也是一个普通的交换机。当一个队列中的消息满足以下任一条件时,它就会变成“死信”,并被 RabbitMQ 自动重新发布到该队列预先配置好的一个“死信交换机”上。
- 消息 TTL 过期(我们本次实现的核心)。
- 消息被消费者拒绝(
basic.reject
或basic.nack
),并且requeue
参数被设置为false
。 - 队列达到最大长度(
x-max-length
)或最大容量(x-max-length-bytes
),导致最早的消息被丢弃。
实现原理:DLX + TTL 组合拳
我们的延迟队列实现思路正是利用了上述两个特性:
- 创建一个普通的业务队列(我们称之为
normal.queue
),不设置任何消费者。 - 为这个
normal.queue
配置一个死信交换机(dead.letter.exchange
)。 - 当生产者发送一条消息时,我们为其设置一个 TTL(例如10秒),并将其发送到与
normal.queue
绑定的业务交换机(normal.exchange
)。 - 由于
normal.queue
没有消费者,消息会在队列中静静地等待。 - 10秒后,消息的 TTL 到期,它变成了“死信”。
- RabbitMQ 自动将这条死信消息从
normal.queue
中移除,并将其路由到预设的dead.letter.exchange
。 dead.letter.exchange
再根据其路由规则,将消息投递到最终的“死信队列”(dead.letter.queue
)。- 我们的消费者只监听这个死信队列。一旦收到消息,就意味着延迟时间已到,可以开始处理业务。
配置Exhange/Queue
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayedConfig {
// 1. 声明普通的业务交换机和队列
public static final String NORMAL_EXCHANGE = "normal.exchange";
public static final String NORMAL_QUEUE = "normal.queue";
public static final String NORMAL_ROUTING_KEY = "normal.key";
// 2. 声明死信交换机和死信队列
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
public static final String DEAD_LETTER_ROUTING_KEY = "dead.key";
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
// 这就是出错的队列声明
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)
.build();
}
@Bean
public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);
}
// 3. 确保死信交换机和死信队列也作为 Bean 被声明
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
// 死信队列绑定到死信交换机,使用普通队列指定的 dead-letter-routing-key
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);
}
}
配置解读:
normalQueue()
是配置的核心。我们通过.withArgument()
方法为normal.queue
设置了两个重要参数:x-dead-letter-exchange
:指定了当队列里的消息变成死信后,应该被发往哪个交换机。x-dead-letter-routing-key
:指定了死信消息被发送到死信交换机时,使用哪个路由键。这允许我们更灵活地控制死信消息的流向。
生产者
发送带TTL的消息
生产者将消息发送到业务交换机normal.exchange
,并为每条消息动态设置expiration
属性。
@RestController
@RequestMapping
public class DelayController {
private final RabbitTemplate rabbitTemplate;
public DelayController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RequestMapping("/delay")
public String delay(){
//发送带ttl的消息
System.out.println("发送延迟消息, 当前时间: " + new Date());
rabbitTemplate.convertAndSend("normal.exchange", "normal.key",
"delay test with ttl 10s..."+new Date(),message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
rabbitTemplate.convertAndSend("normal.exchange", "normal.key",
"delay test with ttl 20s..."+new Date(), message -> {
message.getMessageProperties().setExpiration("20000");
return message;
});
return "success";
}
}
消费者
监听死信队列
消费者只关心最终的业务处理,所以它监听的是dead.letter.queue
。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DelayConsumer {
@RabbitListener(queues = "dead.letter.queue")
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", new Date(), new String(message.getBody(),"UTF-8"),
deliveryTag);
}
}
结果分析与问题洞察
当我们访问 http://127.0.0.1:8080/delay
后,观察控制台输出如下:
发送延迟消息, 当前时间: 周日 8月 10 17:28:13 CST 2025
周日 8月 10 17:28:23 CST 2025 死信队列接收到消息: delay test with ttl 10s...Sun Aug 10 17:28:11 CST 2025, deliveryTag: 1
周日 8月 10 17:28:34 CST 2025 死信队列接收到消息: delay test with ttl 20s...Sun Aug 10 17:28:13 CST 2025, deliveryTag: 2
但是如果将投递顺序调换
@RequestMapping("/delay")
public String delay(){
//发送带ttl的消息
System.out.println("发送延迟消息, 当前时间: " + new Date());
rabbitTemplate.convertAndSend("normal.exchange", "normal.key",
"delay test with ttl 20s..."+new Date(),message -> {
message.getMessageProperties().setExpiration("20000");
return message;
});
rabbitTemplate.convertAndSend("normal.exchange", "normal.key",
"delay test with ttl 10s..."+new Date(), message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
return "success";
}
再次进行请求,你会看到延迟20s的消息和延迟10s的消息是同时被处理的
发送延迟消息, 当前时间: Sun Aug 10 22:27:15 CST 2025
周日 8月 10 22:27:36 CST 2025 死信队列接收到消息: delay test with ttl 20s...Sun Aug 10 22:27:15 CST 2025, deliveryTag: 1
周日 8月 10 22:27:36 CST 2025 死信队列接收到消息: delay test with ttl 10s...Sun Aug 10 22:27:15 CST 2025, deliveryTag: 2
这是为什么呢?
这是 DLX+TTL 方案最核心的一个“陷阱”:RabbitMQ 只会检查队列头部的消息是否过期。如果队头的消息没有过期,那么后面的消息就算已经过期了,也无法被投递到死信交换机。
在我们的例子中:
20s TTL
的消息先入队,位于队头。10s TTL
的消息后入队,位于其后。- RabbitMQ 盯着队头的
20s TTL
消息。20秒后,该消息过期,被投递到死信队列。 - 此时,
10s TTL
的消息才成为新的队头。RabbitMQ 开始检查它,发现它的 TTL 已经结束了,直接进行投递
这就导致了延迟的“串行”执行,延迟时间有可能会被延后。
改进方案:使用延迟消息插件
为了解决上述的队列头部阻塞问题,并实现更精确、更灵活的延迟控制,RabbitMQ 官方提供了一个非常强大的插件:rabbitmq-delayed-message-exchange
。
插件原理
该插件提供了一种新的交换机类型:x-delayed-message
。这种交换机在接收到消息后,并不会立即投递到队列,而是会根据消息头中的 x-delay
属性(单位毫秒)来等待相应的时间,然后再进行投递。这个过程是在交换机内部完成的,不依赖于队列,因此不会产生队头阻塞问题。
插件代码实践
假设您已经在 RabbitMQ 服务器上启用了 rabbitmq-delayed-message-exchange
插件。
1. 新增插件配置
配置变得异常简单,不再需要死信队列和业务队列的区分。
// 新增一个配置类用于演示插件用法
@Configuration
public class DelayedPluginConfig {
public static final String DELAYED_EXCHANGE = "delayed.plugin.exchange";
public static final String DELAYED_QUEUE = "delayed.plugin.queue";
public static final String DELAYED_ROUTING_KEY = "delayed.plugin.key";
@Bean
public CustomExchange delayedExchange() {
// 声明一个 x-delayed-message 类型的交换机
// durable: 持久化
// autoDelete: 不自动删除
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false,
Map.of("x-delayed-type", "direct")); // 指定基础交换机类型
}
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE);
}
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
2. 新增插件生产者
发送消息时,我们不再设置 expiration
,而是添加一个 x-delay
的 header。
// 在 DelayController 中新增一个方法
@RestController
public class DelayController {
// ... 原有代码 ...
@RequestMapping("/delay-plugin")
public String delayPlugin() {
System.out.println("发送延迟消息 (Plugin), 当前时间: " + new Date());
// 发送延迟20秒的消息
rabbitTemplate.convertAndSend(DelayedPluginConfig.DELAYED_EXCHANGE, DelayedPluginConfig.DELAYED_ROUTING_KEY,
"delay test with plugin 20s..." + new Date(), message -> {
message.getMessageProperties().setHeader("x-delay", 20000); // 20秒
return message;
});
// 发送延迟10秒的消息
rabbitTemplate.convertAndSend(DelayedPluginConfig.DELAYED_EXCHANGE, DelayedPluginConfig.DELAYED_ROUTING_KEY,
"delay test with plugin 10s..." + new Date(), message -> {
message.getMessageProperties().setHeader("x-delay", 10000); // 10秒
return message;
});
return "success (plugin)";
}
}
3. 新增插件消费者
消费者直接监听最终的业务队列即可。
// 新增一个消费者类
@Component
public class DelayedPluginConsumer {
@RabbitListener(queues = DelayedPluginConfig.DELAYED_QUEUE)
public void listenDelayedQueue(Message message) throws Exception {
System.out.printf("插件延迟队列 %tc 接收到消息: %s%n", new Date(), new String(message.getBody(),"UTF-8"));
}
}
现在访问 http://127.0.0.1:8080/delay-plugin
,你会发现,即使是后发送的10秒延迟消息,也会比先发送的20秒延迟消息先被消费,完美解决了队头阻塞问题。
结论与选型建议
本文我们详细探讨了两种实现 RabbitMQ 延迟消息的方法:
特性 | DLX + TTL 方案 | 延迟消息插件方案 |
---|---|---|
实现方式 | 依赖队列TTL和死信交换机 | 依赖特定类型的交换机 (x-delayed-message ) |
配置复杂度 | 较高,需要配置两套Exchange和Queue | 较低,一套Exchange和Queue即可 |
延迟精确性 | 受队头消息影响,不精确 | 精确,消息间延迟互不影响 |
依赖 | RabbitMQ原生功能,无需额外插件 | 需要在服务端安装并启用 rabbitmq-delayed-message-exchange 插件 |
适用场景 | 业务场景简单,队列中消息TTL固定,或能容忍延迟误差 | 对延迟时间精确性要求高的场景 |
总结建议:
- 首选延迟消息插件:对于绝大部分需要延迟消息的场景,延迟插件提供了更简单、更精确、更符合直觉的解决方案。它是目前实现延迟消息的最佳实践。
- 了解 DLX+TTL:虽然插件方案更优,但理解 DLX+TTL 的工作原理非常有价值。它不仅能让你实现延迟队列,更能加深你对 RabbitMQ 核心机制的理解,这对于问题排查和设计更复杂的系统大有裨益。