微服务消息队列之RabbitMQ,深入了解

发布于:2025-08-02 ⋅ 阅读:(8) ⋅ 点赞:(0)


在之前的基础篇中,我们探讨了如何在项目中使用消息队列(MQ)进行消息传递。那么,在传递过程中,消息是否会丢失,从而导致后续服务无法正常执行?答案是肯定的。由于消息传递本质上依赖网络通信,网络的不确定性天然存在消息丢失的风险。因此,保证消息的可靠性至关重要。

要解决这个问题,首先需要分析哪些环节可能导致消息丢失?

1.MQ丢失的情况分析

我们知道消息队列在项目中执行的过程是这样的:
在这里插入图片描述

消息从生产者到消费者的每一步都可能导致消息丢失,可以从三种情况分析:

  1. 生产者发送阶段
    • 生产者连接MQ失败
    • 消息到达MQ后未找到Exchange
    • 消息到达Exchange后未匹配Queue
    • MQ进程异常
  2. MQ存储阶段(Queue持久化)
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  3. 消费者处理消息阶段
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

从上面情况分析,我们可以发现主要是三个阶段会发生消息丢失,现在我们具有问题具体分析:

2.如何保证发送者消息的可靠?

2.1 生产者连接MQ失败——生产者重试机制

问题本质:当生产者因网络抖动、MQ集群短暂不可用等原因连接失败时

解决方法:生产者重试机制

2.1.1 生产者重试机制
1.修改生产者模块的配置文件application.yaml,添加重试机制
spring:
  rabbitmq:
    host: 127.0.0.1 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123456 # 密码
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

2.关闭虚拟机,构造生成者连接不到MQ情况
docker stop mq
3.启动测试类,观察是否重试

在这里插入图片描述

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

2.2 其余三种情况——生产者确认机制

在网络繁忙场景下,重试机制能解决大部分生产者到MQ的连接级消息丢失,但仍存在消息成功抵达MQ服务端后丢失的少数情况,例如:

  • 消息到达MQ后未找到Exchange
  • 消息到达Exchange后未匹配Queue
  • MQ进程处理消息时异常崩溃

针对这些场景,RabbitMQ提供了生产者消息确认机制,通过两种独立但互补的机制保障可靠性:

  1. Publisher Confirm:确认消息是否被MQ成功接收(持久化到磁盘/写入内存队列)
  2. Publisher Return:捕获因路由失败(无Exchange/无匹配Queue)被MQ丢弃的消息

当生产者启用这两种机制后,MQ会通过异步回调向生产者返回明确的操作结果(成功确认或失败原因)。

具体如图所示:

在这里插入图片描述

流程总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack,代表投递成功。为什么失败还返回投递成功?这是因为这是人为导致的,不是网络链路导致的
  • 临时消息投递到了MQ,并且入队成功,返回ack,告知投递成功!
  • 持久消息投递到了MQ,并且入队完成持久化,返回ack,告知投递成功
  • 其他情况都会返回nack,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

2.2.1 实现生产者确认
1.开启生产者确认

在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执
2.定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

在这里插入图片描述

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @program: mq-demo
 * @description:
 * @author: WangXin
 * @create: 2025-08-01 21:21
 **/
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
3.定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

package com.itheima.publisher.amqp;


import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

/**
 * @program: mq-demo
 * @description:
 * @author: WangXin
 * @create: 2025-08-01 21:24
 **/
@SpringBootTest
@Slf4j
public class test {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void testPublisherConfirm() {
        // 1.创建CorrelationData
       CorrelationData correlationData = new CorrelationData();
        // 2.给Future添加ConfirmCallback
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if(result.isAck()){
                    log.debug("发送消息成功,收到 ack!");
                }else{
                    log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
                }
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("send message fail", ex);
            }
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("hmall.direct", "blue", "hello", correlationData);
    }

}

5.测试结果

先把日志信息改成debug

在这里插入图片描述

先启动消费者,创建消息通道**(这里是采用注解新建Direct类型的交换机,基础篇定义过)**,再启动测试类,正常结果:

在这里插入图片描述

当我填错router key的结果

 rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", correlationData);

在这里插入图片描述

3. 如何保证MQ的可靠性?

3.1 数据持久化:防止消息丢失的核心保障

问题背景

消息到达MQ后,若未及时保存,仍可能因服务重启或崩溃导致丢失。RabbitMQ默认将数据存储在内存中(临时数据),需手动配置持久化以实现可靠性。

持久化三要素
  1. 交换机持久化
    • 控制台操作:Exchanges页面创建交换机时,设置DurabilityDurable
    • 效果:MQ重启后交换机配置保留
  2. 队列持久化
    • 控制台操作:Queues页面创建队列时,设置DurabilityDurable
    • 效果:MQ重启后队列配置及元数据保留
  3. 消息持久化
    • 控制台操作:发送消息时添加properties参数,设置delivery_mode=2
    • 效果:消息内容写入磁盘,MQ重启后仍存在
生产者确认与持久化的协同
  • 开启生产者确认(Publisher Confirm)时,MQ在消息完成持久化后发送ACK回执
  • 性能优化:MQ批量持久化消息(约100ms间隔),减少IO操作
  • 建议:生产者确认采用异步处理,避免阻塞业务线程

3.2 LazyQueue模式

传统队列的问题
  1. 内存瓶颈
    • 消费者故障/网络问题 → 消息积压
    • 消息量激增 → 内存占用飙升
  2. PageOut阻塞
    • 达到内存预警 → 消息刷盘(PageOut)
    • PageOut期间队列完全阻塞
LazyQueue解决方案(3.6.0+)
特性 传统队列 LazyQueue
存储位置 内存 直接存入磁盘
消息加载 即时加载 消费时加载(懒加载)
内存占用 极低
抗堆积能力 差(易触发PageOut) 强(支持百万消息)

版本提示:3.12+版本默认所有队列为Lazy模式

配置方式
  1. 控制台配置
    • 创建队列时添加参数:x-queue-mode=lazy
  2. 代码配置(Spring AMQP)
// 方式1:QueueBuilder
@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.queue")
                      .lazy()  // 关键配置
                      .build();
}

// 方式2:注解声明
@RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
    // 消息处理逻辑
}
  1. 更新现有队列为Lazy模式
# 命令行配置Policy
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
  • Lazy:策略名称(自定义)
  • "^lazy-queue$":正则匹配队列名
  • '{"queue-mode":"lazy"}':设置队列模式
  • --apply-to queues:策略作用对象
控制台Policy配置
  1. 进入Admin → Policies页面
  2. 添加新Policy:
    • Pattern^lazy-queue$(队列名正则)
    • Definitionqueue-mode=lazy
    • Apply to:Queues

4.消费者如何保证可靠性?

1. 消费者确认机制(ACK机制)

核心目的:让RabbitMQ知晓消费者处理状态,决定消息是否重新投递

三种确认模式

模式 特点 适用场景
none 自动ACK,消息立即删除(高风险) 测试环境,非关键业务
manual 需手动调用API发送ack/nack(灵活但代码侵入性强) 需精细控制确认时机的复杂业务
auto Spring自动处理(推荐)
业务异常→nack
消息异常→reject
大多数业务场景

关键异常类型(触发reject):

  • MessageConversionException(消息转换失败)
  • MethodArgumentNotValidException(参数校验失败)
  • ClassCastException(类型转换错误)
  • 其他不可恢复异常

最佳实践:生产环境使用auto模式,配合异常分类处理

2. 失败重试机制

配置示例

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true           # 开启重试
          initial-interval: 1000  # 初始间隔(ms)
          multiplier: 2           # 间隔倍数
          max-attempts: 3         # 最大重试次数
          stateless: true         # 无状态重试

重试行为

  • 本地重试(非无限requeue)
  • 达到最大重试次数后抛出AmqpRejectAndDontRequeueException
  • 最终返回reject,消息被丢弃
3. 失败处理策略

三级处理策略

重试耗尽
默认丢弃
重新入队
转发死信队列

推荐方案:RepublishMessageRecoverer

@Bean
public MessageRecoverer republishRecoverer(RabbitTemplate rabbitTemplate) {
    // 将失败消息路由到error.direct交换机的error队列
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

死信队列配置

@Bean
public DirectExchange errorExchange() {
    return new DirectExchange("error.direct");
}

@Bean
public Queue errorQueue() {
    return new Queue("error.queue", true);
}

@Bean
public Binding errorBinding() {
    return BindingBuilder.bind(errorQueue())
                         .to(errorExchange())
                         .with("error");
}
4. 业务幂等性保障

双重防护策略

① 唯一消息ID方案

@Bean
public MessageConverter messageConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setCreateMessageIds(true); // 启用消息ID生成
    return converter;
}
  • 消费者记录已处理消息ID
  • 重复消息直接跳过

② 业务状态判断(推荐)

public void markOrderPaySuccess(Long orderId) {
    // 原子操作:仅当状态为未支付时才更新
    lambdaUpdate()
        .set(Order::getStatus, 2)
        .set(Order::getPayTime, now())
        .eq(Order::getId, orderId)
        .eq(Order::getStatus, 1) // 关键幂等条件
        .update();
}

SQL等效UPDATE order SET status=2 WHERE id=? AND status=1

5. 兜底方案(最终一致性保障)

主动查询补偿机制

交易服务 支付服务 订单数据库 定时任务(每20秒) 返回支付状态 更新为已支付 alt [支付成功但状态未更新] 交易服务 支付服务 订单数据库

关键设计

  • 定时任务扫描超时未支付订单
  • 主动查询第三方支付状态
  • 基于乐观锁更新订单状态
  • 报警机制监控补偿执行情况

全链路可靠性保障体系

确认机制
持久化+LazyQueue
ACK机制
本地重试
死信队列
幂等校验
定时任务
生产者
RabbitMQ
磁盘存储
消费者
人工处理
业务数据库
支付状态补偿

核心要点

  1. 三阶段防护

    • 生产者:重试机制 + 确认机制
    • MQ:持久化 + LazyQueue
    • 消费者:ACK + 重试 + 死信队列
  2. 幂等性设计

    • 更新操作必须带状态校验
    • 关键业务使用原子操作
  3. 最终一致性

    • 死信队列人工干预
    • 定时任务主动补偿
    • 全链路监控报警

通过以上措施,可确保消息系统达到99.99%的可靠性,即使在极端故障情况下也能保证业务数据的最终一致性。


网站公告

今日签到

点亮在社区的每一天
去签到