在之前的基础篇中,我们探讨了如何在项目中使用消息队列(MQ)进行消息传递。那么,在传递过程中,消息是否会丢失,从而导致后续服务无法正常执行?答案是肯定的。由于消息传递本质上依赖网络通信,网络的不确定性天然存在消息丢失的风险。因此,保证消息的可靠性至关重要。
要解决这个问题,首先需要分析哪些环节可能导致消息丢失?
1.MQ丢失的情况分析
我们知道消息队列在项目中执行的过程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失,可以从三种情况分析:
- 生产者发送阶段
- 生产者连接MQ失败
- 消息到达MQ后未找到Exchange
- 消息到达Exchange后未匹配Queue
- MQ进程异常
- MQ存储阶段(Queue持久化)
- 消息到达MQ,保存到队列后,尚未消费就突然宕机
- 消费者处理消息阶段
- 消息接收后尚未处理突然宕机
- 消息接收后处理过程中抛出异常
从上面情况分析,我们可以发现主要是三个阶段会发生消息丢失,现在我们具有问题具体分析:
2.如何保证发送者消息的可靠?
2.1 生产者连接MQ失败——生产者重试机制
问题本质:当生产者因网络抖动、MQ集群短暂不可用等原因连接失败时
解决方法:生产者重试机制
2.1.1 生产者重试机制
1.修改生产者模块的配置文件application.yaml
,添加重试机制
spring:
rabbitmq:
host: 127.0.0.1 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123456 # 密码
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
2.关闭虚拟机,构造生成者连接不到MQ情况
docker stop mq
3.启动测试类,观察是否重试
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
2.2 其余三种情况——生产者确认机制
在网络繁忙场景下,重试机制能解决大部分生产者到MQ的连接级消息丢失,但仍存在消息成功抵达MQ服务端后丢失的少数情况,例如:
- 消息到达MQ后未找到Exchange
- 消息到达Exchange后未匹配Queue
- MQ进程处理消息时异常崩溃
针对这些场景,RabbitMQ提供了生产者消息确认机制,通过两种独立但互补的机制保障可靠性:
Publisher Confirm
:确认消息是否被MQ成功接收(持久化到磁盘/写入内存队列)Publisher Return
:捕获因路由失败(无Exchange/无匹配Queue)被MQ丢弃的消息
当生产者启用这两种机制后,MQ会通过异步回调向生产者返回明确的操作结果(成功确认或失败原因)。
具体如图所示:
流程总结如下:
- 当消息投递到MQ,但是路由失败时,通过
Publisher Return
返回异常信息,同时返回ack,代表投递成功。为什么失败还返回投递成功?这是因为这是人为导致的,不是网络链路导致的 - 临时消息投递到了MQ,并且入队成功,返回ack,告知投递成功!
- 持久消息投递到了MQ,并且入队完成持久化,返回ack,告知投递成功
- 其他情况都会返回nack,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
2.2.1 实现生产者确认
1.开启生产者确认
在publisher模块的application.yaml
中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type
有三种模式可选:
none
:关闭confirm机制simple
:同步阻塞等待MQ的回执correlated
:MQ异步回调返回回执
2.定义ReturnCallback
每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @program: mq-demo
* @description:
* @author: WangXin
* @create: 2025-08-01 21:21
**/
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
3.定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
package com.itheima.publisher.amqp;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
/**
* @program: mq-demo
* @description:
* @author: WangXin
* @create: 2025-08-01 21:24
**/
@SpringBootTest
@Slf4j
public class test {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData correlationData = new CorrelationData();
// 2.给Future添加ConfirmCallback
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onSuccess(CorrelationData.Confirm result) {
if(result.isAck()){
log.debug("发送消息成功,收到 ack!");
}else{
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
@Override
public void onFailure(Throwable ex) {
log.error("send message fail", ex);
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "blue", "hello", correlationData);
}
}
5.测试结果
先把日志信息改成debug
先启动消费者,创建消息通道**(这里是采用注解新建Direct类型的交换机,基础篇定义过)**,再启动测试类,正常结果:
当我填错router key的结果
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", correlationData);
3. 如何保证MQ的可靠性?
3.1 数据持久化:防止消息丢失的核心保障
问题背景
消息到达MQ后,若未及时保存,仍可能因服务重启或崩溃导致丢失。RabbitMQ默认将数据存储在内存中(临时数据),需手动配置持久化以实现可靠性。
持久化三要素
- 交换机持久化
- 控制台操作:Exchanges页面创建交换机时,设置
Durability
为Durable
- 效果:MQ重启后交换机配置保留
- 控制台操作:Exchanges页面创建交换机时,设置
- 队列持久化
- 控制台操作:Queues页面创建队列时,设置
Durability
为Durable
- 效果:MQ重启后队列配置及元数据保留
- 控制台操作:Queues页面创建队列时,设置
- 消息持久化
- 控制台操作:发送消息时添加properties参数,设置
delivery_mode=2
- 效果:消息内容写入磁盘,MQ重启后仍存在
- 控制台操作:发送消息时添加properties参数,设置
生产者确认与持久化的协同
- 开启生产者确认(Publisher Confirm)时,MQ在消息完成持久化后发送ACK回执
- 性能优化:MQ批量持久化消息(约100ms间隔),减少IO操作
- 建议:生产者确认采用异步处理,避免阻塞业务线程
3.2 LazyQueue模式
传统队列的问题
- 内存瓶颈:
- 消费者故障/网络问题 → 消息积压
- 消息量激增 → 内存占用飙升
- PageOut阻塞:
- 达到内存预警 → 消息刷盘(PageOut)
- PageOut期间队列完全阻塞
LazyQueue解决方案(3.6.0+)
特性 | 传统队列 | LazyQueue |
---|---|---|
存储位置 | 内存 | 直接存入磁盘 |
消息加载 | 即时加载 | 消费时加载(懒加载) |
内存占用 | 高 | 极低 |
抗堆积能力 | 差(易触发PageOut) | 强(支持百万消息) |
版本提示:3.12+版本默认所有队列为Lazy模式
配置方式
- 控制台配置
- 创建队列时添加参数:
x-queue-mode=lazy
- 创建队列时添加参数:
- 代码配置(Spring AMQP)
// 方式1:QueueBuilder
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy() // 关键配置
.build();
}
// 方式2:注解声明
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
// 消息处理逻辑
}
- 更新现有队列为Lazy模式
# 命令行配置Policy
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
Lazy
:策略名称(自定义)"^lazy-queue$"
:正则匹配队列名'{"queue-mode":"lazy"}'
:设置队列模式--apply-to queues
:策略作用对象
控制台Policy配置
- 进入Admin → Policies页面
- 添加新Policy:
- Pattern:
^lazy-queue$
(队列名正则) - Definition:
queue-mode=lazy
- Apply to:Queues
- Pattern:
4.消费者如何保证可靠性?
1. 消费者确认机制(ACK机制)
核心目的:让RabbitMQ知晓消费者处理状态,决定消息是否重新投递
三种确认模式:
模式 | 特点 | 适用场景 |
---|---|---|
none | 自动ACK,消息立即删除(高风险) | 测试环境,非关键业务 |
manual | 需手动调用API发送ack/nack(灵活但代码侵入性强) | 需精细控制确认时机的复杂业务 |
auto | Spring自动处理(推荐) 业务异常→nack 消息异常→reject |
大多数业务场景 |
关键异常类型(触发reject):
- MessageConversionException(消息转换失败)
- MethodArgumentNotValidException(参数校验失败)
- ClassCastException(类型转换错误)
- 其他不可恢复异常
最佳实践:生产环境使用auto模式,配合异常分类处理
2. 失败重试机制
配置示例:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启重试
initial-interval: 1000 # 初始间隔(ms)
multiplier: 2 # 间隔倍数
max-attempts: 3 # 最大重试次数
stateless: true # 无状态重试
重试行为:
- 本地重试(非无限requeue)
- 达到最大重试次数后抛出AmqpRejectAndDontRequeueException
- 最终返回reject,消息被丢弃
3. 失败处理策略
三级处理策略:
推荐方案:RepublishMessageRecoverer
@Bean
public MessageRecoverer republishRecoverer(RabbitTemplate rabbitTemplate) {
// 将失败消息路由到error.direct交换机的error队列
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
死信队列配置:
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding() {
return BindingBuilder.bind(errorQueue())
.to(errorExchange())
.with("error");
}
4. 业务幂等性保障
双重防护策略:
① 唯一消息ID方案:
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true); // 启用消息ID生成
return converter;
}
- 消费者记录已处理消息ID
- 重复消息直接跳过
② 业务状态判断(推荐):
public void markOrderPaySuccess(Long orderId) {
// 原子操作:仅当状态为未支付时才更新
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1) // 关键幂等条件
.update();
}
SQL等效:
UPDATE order SET status=2 WHERE id=? AND status=1
5. 兜底方案(最终一致性保障)
主动查询补偿机制:
关键设计:
- 定时任务扫描超时未支付订单
- 主动查询第三方支付状态
- 基于乐观锁更新订单状态
- 报警机制监控补偿执行情况
全链路可靠性保障体系
核心要点:
三阶段防护:
- 生产者:重试机制 + 确认机制
- MQ:持久化 + LazyQueue
- 消费者:ACK + 重试 + 死信队列
幂等性设计:
- 更新操作必须带状态校验
- 关键业务使用原子操作
最终一致性:
- 死信队列人工干预
- 定时任务主动补偿
- 全链路监控报警
通过以上措施,可确保消息系统达到99.99%的可靠性,即使在极端故障情况下也能保证业务数据的最终一致性。