一、RabbitMQ 延时队列实现方式
- 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)
这是最常用的实现方式,核心思路是:
(1)消息设置过期时间(TTL)
(2)消息过期后进入绑定的死信队列
(3)消费者监听死信队列,实现延时消费
// 1. 配置交换机和队列
@Configuration
public class DelayQueueConfig {
// 普通交换机(用于接收原始消息)
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列(消息过期后会进入死信队列)
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列(实际消费的队列)
public static final String DEAD_QUEUE = "dead_queue";
// 声明普通交换机
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
// 声明死信交换机
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
// 声明普通队列(设置死信相关参数)
@Bean
public Queue normalQueue() {
Map<String, Object> args = new HashMap<>();
// 设置死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信路由键
args.put("x-dead-letter-routing-key", "dead_routing_key");
// 队列消息统一过期时间(可选,也可在发送消息时单独设置)
// args.put("x-message-ttl", 10000);
return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();
}
// 声明死信队列
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
// 绑定普通队列和普通交换机
@Bean
public Binding normalBinding() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_key");
}
// 绑定死信队列和死信交换机
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_routing_key");
}
}
// 2. 发送延时消息(设置消息级别的TTL)
@Service
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(String message, long delayMillis) {
// 设置消息过期时间
rabbitTemplate.convertAndSend(
DelayQueueConfig.NORMAL_EXCHANGE,
"normal_routing_key",
message,
correlationData -> {
correlationData.getMessageProperties().setExpiration(String.valueOf(delayMillis));
return correlationData;
}
);
}
}
// 3. 消费死信队列消息(延时后的消息)
@Service
public class DelayMessageReceiver {
@RabbitListener(queues = DelayQueueConfig.DEAD_QUEUE)
public void receiveDelayMessage(String message) {
System.out.println("收到延时消息:" + message + ",时间:" + new Date());
}
}
- 基于 RabbitMQ 插件(rabbitmq_delayed_message_exchange)
更推荐的方式,需先安装插件:
(1)下载对应版本的 rabbitmq_delayed_message_exchange 插件
(2)启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
@Configuration
public class DelayedExchangeConfig {
// 延时交换机(类型必须是 x-delayed-message)
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 指定转发类型
return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable("delayed_queue").build();
}
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key").noargs();
}
}
// 发送延时消息
@Service
public class DelayedMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayedMessage(String message, long delayMillis) {
rabbitTemplate.convertAndSend(
"delayed_exchange",
"delayed_routing_key",
message,
correlationData -> {
// 设置延时时间(毫秒)
correlationData.getMessageProperties().setHeader("x-delay", delayMillis);
return correlationData;
}
);
}
}
二、保证消息幂等性的方案
消息幂等性指:同一条消息被多次消费时,结果是一致的,不会重复处理。常见实现方式:
- 基于唯一 ID + Redis / 数据库去重
(1)发送消息时生成唯一 ID(如 UUID)
(2)消费前检查该 ID 是否已处理
(3)处理完成后标记该 ID 为已处理
@Service
public class IdempotentMessageReceiver {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "delayed_queue")
public void receiveMessage(Message message) {
// 1. 获取消息唯一ID(假设放在消息头)
String messageId = message.getMessageProperties().getMessageId();
if (StringUtils.isEmpty(messageId)) {
// 非法消息,直接拒绝
throw new AmqpRejectAndDontRequeueException("消息ID为空");
}
// 2. 检查是否已处理(Redis分布式锁保证原子性)
String key = "message:processed:" + messageId;
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isFirst)) {
// 已处理过,直接返回
System.out.println("消息已处理,ID:" + messageId);
return;
}
// 3. 处理消息业务逻辑
String content = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("处理消息:" + content);
}
}
- 基于业务唯一标识去重
如果消息没有全局 ID,可使用业务字段组合作为唯一标识(如订单号):
// 例如处理订单支付消息,用订单号作为唯一标识
String orderNo = extractOrderNo(content); // 从消息中提取订单号
String key = "order:processed:" + orderNo;
// 后续逻辑同上(检查Redis -> 处理业务)
- 数据库唯一约束
通过数据库唯一索引实现幂等:
@Transactional
public void processOrder(String orderNo) {
// 插入记录前检查,或直接插入(利用唯一索引报错)
try {
orderMapper.insert(new Order(orderNo)); // 假设orderNo有唯一索引
// 处理订单逻辑
} catch (DuplicateKeyException e) {
// 已处理过,忽略
log.info("订单已处理:{}", orderNo);
}
}
总结
1.延时队列实现:
(1)简单场景用 TTL + 死信队列
(2)生产环境推荐用 rabbitmq_delayed_message_exchange 插件(更可靠)
2.幂等性保证核心:
(1)为消息生成唯一标识(全局 ID 或业务唯一键)
(2)消费前检查标识是否已处理(Redis / 数据库)
(3)确保检查和标记操作的原子性(分布式锁 / 事务)
这两种机制结合,可实现可靠的延时任务处理。