一、何为死信队列
RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列机制,用于处理那些无法被正常消费的消息。这些消息可能由于各种原因无法被消费者正确处理,如果不加以处理,可能会导致队列堵塞,影响系统的正常运行。
1、死信队列的作用
- 隔离问题消息:将无法处理的消息转移到专门的死信队列中,避免影响其他正常消息的消费。
- 故障排查:通过分析死信队列中的消息,可以快速定位和解决消息消费失败的原因。
- 提高系统稳定性:防止因个别消息处理失败而导致整个队列阻塞,从而提升系统的健壮性和可靠性。
2、死信产生的原因
在 RabbitMQ 中,消息变成死信的情况主要有以下几种:
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
如果消费者明确拒绝了某条消息,并且不希望重新入队,则该消息会被发送到死信队列(前提是配置了死信队列)。 - 消息过期(TTL 过期)
如果消息设置了生存时间(Time To Live, TTL),并且在队列中等待的时间超过了这个限制,消息也会成为死信。 - 队列达到最大长度限制
当队列已经满了(即达到了预设的最大长度),新进入的消息会根据策略被丢弃或放入死信队列。
3、死信队列的核心组件
- 死信交换机(Dead Letter Exchange, DLX) 每个普通队列可以通过配置指定一个死信交换机。当消息变成死信时,RabbitMQ 会自动将该消息发布到对应的 DLX。
- 绑定键(Routing Key) 可以为 DLX 指定一个绑定键,死信消息将会使用这个绑定键来路由到相应的死信队列。
- 死信队列(DLQ) 实际上是一个普通的队列,只是它接收的是来自 DLX 的死信消息
4、私信队列消息示意图
+---------------------+
| |
| 生产者 Producer |
| |
+----------+----------+
|
| 发送消息到业务交换机
v
+-------------------------+
| |
| 业务交换机 BusinessExchange |
| |
+------------+------------+
|
| 根据路由键 routingKey
v
+----------------------------+
| |
| 业务队列 BusinessQueue |
| (配置了 DLX 和 DLK) |
| |
+-------------+--------------+
|
+---------+------------------+
| | |
消息被正常消费 消息达到最大重试次数
(channel.basicAck) 或被拒绝 requeue=false
| | |
v v v
+------------------+ +--------------------+
| | | |
| 正常消费者 | | 死信交换机 DLX |
| | | |
+------------------+ +---------+----------+
|
+-------v--------+
| |
| 死信队列 DLQ |
| |
+--------+-------+
|
+------v-------+
| |
| 死信消费者 |
| |
+--------------+
二、准备基本环境
1、pom.xml引入的java包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${springboot-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.57</version>
</dependency>
</dependencies>
2、yaml配置文件
# 8004是zookeeper服务器的支付服务提供者端口号
server:
port: 8004
spring:
application:
name: cloud-mq
rabbitmq:
addresses: 192.168.96.133
port: 5672
username: guest
password: guest
virtual-host: /
#消费者配置
listener:
#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效
simple:
#开启ack 手动确认消息是否被消费成功
acknowledge-mode: manual
retry:
enabled: true
# 消费失败后,继续消费,然后最多消费5次就不再消费。
max-attempts: 5
# 消费失败后 ,重试初始间隔时间 2秒
initial-interval: 2000
# 重试最大间隔时间5秒
max-interval: 5000
# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
multiplier: 2
direct:
#开启ack 手动确认消息是否被消费成功
acknowledge-mode: manual
#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效
retry:
enabled: true
# 消费失败后,继续消费,然后最多消费3次就不再消费。
max-attempts: 3
# 消费失败后 ,重试初始间隔时间 3秒
initial-interval: 3000
# 重试最大间隔时间
max-interval: 7000
# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
multiplier: 2
# 生产者配置
template:
retry:
# 开启消息发送失败重试机制
enabled: true
# 生产者 true-开启消息抵达队列的确认
publisher-returns: false
#simple 配置用于设置 RabbitMQ 消息生产者的消息确认类型为“简单确认”。这意味着当消息被发送到 RabbitMQ 之后,只有在消息成功投递到队列中后,RabbitMQ 才会向生产者发送一个确认(ack)通知。如果消息未能成功投递,则不会收到确认。
#该配置通常与 publisher-returns: true 一起使用以启用消息返回机制,但在此配置中 publisher-returns 被设置为 false,表示不启用消息返回功能
publisher-confirm-type: simple
3、主启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author 10564
*/
@SpringBootApplication
public class ApplicationRabbitmq {
public static void main(String[] args) {
SpringApplication.run(ApplicationRabbitmq.class, args);
}
}
三、手动确认消息示例
1、定义消息队列Queue名称
package org.xwb.springcloud.constant;
/**
* @author Administrator
*/
public class MqDeadLetterConstant {
/**
* 手动确认 正常消息队列名称
*/
public static final String BUSINESS_QUEUE = "businessQueue";
/**
* 手动确认 正常交换机名称
*/
public static final String BUSINESS_EXCHANGE = "businessExchange";
/**
* 手动确认 路由key名称
*/
public static final String BUSINESS_ROUTING_KEY = "businessRoutingKey";
/**
* 死信队列名称DeadLetter
*/
public static final String DEAD_LETTER_QUEUE = "deadLetterQueue";
/**
* 死信交换机名称
*/
public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
/**
* 死信路由key名称
*/
public static final String DEAD_LETTER_ROUTING_KEY = "deadLetterRoutingKey";
}
2、配置类Configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqDeadLetterConstant;
/**
* @author Administrator
*/
@Configuration
public class RabbitmqDeadLetterConfig {
/**
* 死信队列
*/
@Bean
public Queue deadLetterQueue() {
return new Queue(MqDeadLetterConstant.DEAD_LETTER_QUEUE, true);
}
/**
* 死信交换机
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(MqDeadLetterConstant.DEAD_LETTER_EXCHANGE);
}
// 将死信队列绑定到死信交换机,并设置路由键与死信队列的路由键一致
@Bean
public Binding bindingDeadLetter() {
// 绑定死信队列到死信交换机和路由键一致
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(MqDeadLetterConstant.DEAD_LETTER_ROUTING_KEY);
}
//todo =======================业务队列、交换机配置========================
/**
* 定义业务队列
* todo 配置业务队列,并设置死信交换机、死信队列、TTL、路由键等信息
*/
@Bean
public Queue businessQueue() {
//业务的队列
return QueueBuilder.durable(MqDeadLetterConstant.BUSINESS_QUEUE)
//设置死信交换机(DLX),当消息无法被业务消费时(例如过期、拒绝等),消息会被转发到指定的死信交换机
.deadLetterExchange(MqDeadLetterConstant.DEAD_LETTER_EXCHANGE)
//设置死信队列的路由键,用于将消息正确地路由到死信队列中
.deadLetterRoutingKey(MqDeadLetterConstant.DEAD_LETTER_ROUTING_KEY)
// 可选:设置消息过期时间
.ttl(10000)
.build();
}
/**
* 业务队列 交换机
*/
@Bean
public DirectExchange ackBusinessExchange() {
return new DirectExchange(MqDeadLetterConstant.BUSINESS_EXCHANGE);
}
/**
* 将业务队列绑定到业务交换机上,指定交换机的路由
*/
@Bean
public Binding bindingBusiness() {
return BindingBuilder.bind(businessQueue()).to(ackBusinessExchange()).with(MqDeadLetterConstant.BUSINESS_ROUTING_KEY);
}
}
3、生产者Producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqDeadLetterConstant;
import javax.annotation.Resource;
/**
* @author Administrator
*/
@Component
public class DeadLetterProducer {
private static final Logger log = LoggerFactory.getLogger(DeadLetterProducer.class);
@Resource
private RabbitTemplate rabbitTemplate;
public void senderDeadLetterMessage(String message) {
log.info("\n生产者DeadLetter发送消息:【{}】\n", message);
//参数1:交换机名称
//参数2:路由key
//参数3:消息
//topic_exchange交换机 需要指定路由key 绑定到该交换机且符合路由key的队列都会收到消息
rabbitTemplate.convertAndSend(MqDeadLetterConstant.BUSINESS_EXCHANGE, MqDeadLetterConstant.BUSINESS_ROUTING_KEY, message);
}
}
4、消费者Consumer
1、直接进入死信队列 测试结果
import com.alibaba.fastjson2.util.DateUtils;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqDeadLetterConstant;
import java.io.IOException;
import java.util.Date;
/**
* @author 10564
*/
@Component
public class DeadLetterBusinessAndConsumer {
private static final Logger log = LoggerFactory.getLogger(DeadLetterBusinessAndConsumer.class);
/**
* @param msg 消息内容
* @param channel 通道对象
* @param tag 消息的tag
*/
@RabbitListener(queues = MqDeadLetterConstant.BUSINESS_QUEUE)
public void receiveAckBusinessQueueMessage(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
log.info("\n消费者eadLetter接收消息:【{}】\n", msg);
if ("normal".equals(msg)) {
log.info("\n已被正常消费 normal 【{}】,【{}】\n", msg, DateUtils.format(new Date()));
//todo 消息正常处理,从队列中删除掉当前消息
channel.basicAck(tag, false);
} else if ("deadLetter".equals(msg)) {
log.info("\n模拟消费异常,需要进入死信队列中 deadLetter 【{}】,【{}】\n", msg, DateUtils.format(new Date()));
//channel.basicReject(tag, false);也可以使用
channel.basicNack(tag, false,false);
} else {
throw new Exception(msg);
}
} catch (Exception e) {
log.info("\n消费者处理消息异常:【{}】\n", e.getMessage());
//todo true 允许加入队列, false 拒绝加入队列 ,进入私信队列中
channel.basicReject(tag, false);
}
}
/**
* 死信队列消费者
*
* @param msg 消息
* @param channel 通道对象
* @param tag 消息的tag
*/
@RabbitListener(queues = MqDeadLetterConstant.DEAD_LETTER_QUEUE)
public void receiveAckDlqQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理死信队列消息
log.error("\n死信消费者接收消息:【{}】\n", msg);
//todo true 允许加入队列, false 拒绝加入队列
channel.basicAck(tag, false);
} catch (Exception e) {
log.info("\n死信消费者消息异常:【{}】\n", e.getMessage());
try {
channel.basicNack(tag, false, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
用到的basicNack,basicAck,basicReject
的具体参数意义参考手动确认消息basicAck、basicNack、basicReject的使用
2、触发重试机制最终进入私信队列 测试结果
package org.xwb.springcloud.messagetype.deadletter;
import com.alibaba.fastjson2.util.DateUtils;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqDeadLetterConstant;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
/**
* @author 10564
*/
@Component
public class DeadLetterBusinessAndRetryCountConsumer {
private static final Logger log = LoggerFactory.getLogger(DeadLetterBusinessAndRetryCountConsumer.class);
Map<String, Integer> retryCountMap = new java.util.HashMap<>();
@Value("${spring.rabbitmq.listener.simple.retry.max-attempts}")
private Integer retryCount;
/**
* @param msg 消息内容
* @param channel 通道对象
* @param tag 消息的tag
*/
@RabbitListener(queues = MqDeadLetterConstant.BUSINESS_QUEUE)
public void receiveAckBusinessQueueMessage(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
log.info("\n消费者eadLetter接收消息:【{}】\n", msg);
if ("normal".equals(msg)) {
log.info("\n已被正常消费 normal 【{}】,【{}】\n", msg, DateUtils.format(new Date()));
//todo 消息正常处理,从队列中删除掉当前消息
channel.basicAck(tag, false);
} else {
//todo 此处模拟消息消费异常,计数器累加,超过5次则拒绝加入队列 正常情况下使用redis 实现计数器,此处为了演示直接使用map代替
Integer timesObj = retryCountMap.get("msg:" + tag);
int times = timesObj == null ? 0 : timesObj;
if (times >= retryCount - 1) {
channel.basicNack(tag, false, false);
} else {
times = times == 0 ? 1 : times + 1;
retryCountMap.put("msg:" + tag, times);
log.error("\n已被消费异常,开始重试【{}】,第【{}】次\n", msg, times);
throw new Exception(msg);
}
}
} catch (Exception e) {
//抛出异常,触发重试机制
throw e;
}
}
/**
* 死信队列消费者
*
* @param msg 消息
* @param channel 通道对象
* @param tag 消息的tag
*/
@RabbitListener(queues = MqDeadLetterConstant.DEAD_LETTER_QUEUE)
public void receiveAckDlqQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理死信队列消息
log.error("\n死信消费者接收消息:【{}】\n", msg);
//todo true 允许加入队列, false 拒绝加入队列
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("\n死信消费者消息异常:【{}】\n", e.getMessage());
try {
channel.basicNack(tag, false, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
5、测试Test
package org.xwb.springcloud.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.deadletter.DeadLetterProducer;
import javax.annotation.Resource;
/**
* @author 10564
*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {
@Resource
private DeadLetterProducer deadLetterProducer;
@GetMapping("/deadLetter")
public void deadLetter(String message) {
deadLetterProducer.senderDeadLetterMessage(message);
}
}
6、测试结果
1、直接进入私信队列
### deadLetter
GET http://localhost:8004/mq/deadLetter?message=normal
###结果
2025-06-22 09:04:47.926 INFO 11116 --- [nio-8004-exec-5] o.x.s.m.deadletter.DeadLetterProducer :
生产者DeadLetter发送消息:【normal】
2025-06-22 09:04:47.929 INFO 11116 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
d消费者eadLetter接收消息:【normal】
2025-06-22 09:04:47.929 INFO 11116 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
已被正常消费 normal 【normal】,【2025-06-22 09:04:47】
### deadLetter
GET http://localhost:8004/mq/deadLetter?message=deadLetter
###结果
2025-06-22 09:05:11.311 INFO 11116 --- [nio-8004-exec-6] o.x.s.m.deadletter.DeadLetterProducer :
生产者DeadLetter发送消息:【deadLetter】
2025-06-22 09:05:11.315 INFO 11116 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
d消费者eadLetter接收消息:【deadLetter】
2025-06-22 09:05:11.315 INFO 11116 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
模拟消费异常,需要进入死信队列中 deadLetter 【deadLetter】,【2025-06-22 09:05:11】
2025-06-22 09:05:11.319 ERROR 11116 --- [ntContainer#1-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
死信消费者接收消息:【deadLetter】
### deadLetter
GET http://localhost:8004/mq/deadLetter?message=deadLetter1
###结果
2025-06-22 09:05:29.494 INFO 11116 --- [nio-8004-exec-7] o.x.s.m.deadletter.DeadLetterProducer :
生产者DeadLetter发送消息:【deadLetter1】
2025-06-22 09:05:29.497 INFO 11116 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
d消费者eadLetter接收消息:【deadLetter1】
2025-06-22 09:05:29.497 INFO 11116 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
消费者处理消息异常:【deadLetter1】
2025-06-22 09:05:29.499 ERROR 11116 --- [ntContainer#1-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
死信消费者接收消息:【deadLetter1】
2、触发重试机制最终进入私信队列-测试结果
### deadLetter
GET http://localhost:8004/mq/deadLetter?message=deadLetter1
###结果
2025-06-22 11:10:06.909 INFO 2280 --- [nio-8004-exec-1] o.x.s.m.deadletter.DeadLetterProducer :
生产者DeadLetter发送消息:【deadLetter1】
2025-06-22 11:10:06.918 INFO 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
消费者eadLetter接收消息:【deadLetter1】
2025-06-22 11:10:06.918 ERROR 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
已被消费异常,开始重试【deadLetter1】,第【1】次
2025-06-22 11:10:08.921 INFO 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
消费者eadLetter接收消息:【deadLetter1】
2025-06-22 11:10:08.921 ERROR 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
已被消费异常,开始重试【deadLetter1】,第【2】次
2025-06-22 11:10:12.928 INFO 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
消费者eadLetter接收消息:【deadLetter1】
2025-06-22 11:10:12.929 ERROR 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
已被消费异常,开始重试【deadLetter1】,第【3】次
2025-06-22 11:10:17.935 INFO 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
消费者eadLetter接收消息:【deadLetter1】
2025-06-22 11:10:17.935 ERROR 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
已被消费异常,开始重试【deadLetter1】,第【4】次
2025-06-22 11:10:22.948 INFO 2280 --- [ntContainer#2-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
消费者eadLetter接收消息:【deadLetter1】
2025-06-22 11:10:22.960 ERROR 2280 --- [ntContainer#1-1] o.x.s.m.d.DeadLetterBusinessAndConsumer :
死信消费者接收消息:【deadLetter1】