#RabbitMQ# 消息队列进阶

发布于:2025-05-29 ⋅ 阅读:(24) ⋅ 点赞:(0)

目录

消息可靠性

一 生产者的可靠性

1 生产者的重连

2 生产者的确认

(1 Confirm*

(2 Return

二 MQ的可靠性

1 数据持久化

2 Lazy Queue*

三 消费者的可靠性

1 消费者确认机制

2 消费失败处理

3 业务幂等性

四 延迟消息


消息可靠性

在消息队列中,可靠性(Reliability) 是指确保消息在 生产者、Broker(消息中间件)、消费者 三者之间传输时,不会因系统故障、网络问题或程序错误而丢失或重复处理。其核心目标是保证消息从发送到最终消费的 完整性和一致性

首先整体概括一下大体的内容,首先我们研究的对象为三个,生产者消费者Broker

生产者的可靠性的保障需要有retry重试机制以及确认机制。确认机制有两个confirm和return(confirm是监听消息到达Broker,return是监听消息从交换机路由到队列)retry是生产者连接或 Confirm 失败后的重试。

Broker可靠性需要数据的持久化以及懒队列存储方式(默认)的实现

消费者的可靠性的保障需要有确认机制以及失败重试机制,确认机制主要是分为两种,一种是自动一种是手动。失败重试机制先可以指定重试规则如果失败可借助死信队列的机制实现最终的处理

一 生产者的可靠性

1 生产者的重试

可以人为配置,但是要注意配置的时长,防止影响业务性能。并且SpringAMQP的重试机制默认是阻塞式重试,会出现线程资源占用。

2 生产者的确认

首先生产者的确认有两种机制,一种是Publisher Confirm 一种是Publisher Return.

(1 Confirm*

其作用是确保将信息发送到Broke.

1 配置:开启confirm机制

correlated:使用异步回调,自己可以定义一个回调实现类用于接收回调消息,不需要阻塞等待,可以提高性能。

# 配置
spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 开启Confirm模式

2 然后编写回调实现类

这个onfirm方法的作用是当生产者通过RabbitTemplate发送消息时,若启动Confirm机制,消息会异步发送到Broker,在Broke处理完消息后,会向生产者发送ack,或者nack,这个重写的confirm方法是在底层被SpringAMQP监听的,当SpringAMQP监听到Broke返回的确认结果时,会自动调用已注册的ConfirmCallBack实现类的confirm方法。

@Component
@Slf4j
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("Confirm 成功: 消息ID={}", correlationData.getId());
        } else {
            log.error("Confirm 失败: 消息ID={}, 原因={}", correlationData.getId(), cause);
        }
    }
}

3 发送消息

@Service
public class OrderProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrderMessage(String message) {
        // 1. 创建关联数据(用于 Confirm 回调)
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());

        // 2. 发送消息到交换机
        rabbitTemplate.convertAndSend(
            "order.direct",  // 交换机名称
            "order.key",    // 路由键
            message,         // 消息内容
            cd              // 关联数据
        );

        log.info("消息已发送: ID={}", cd.getId());
    }
}

(2 Return

其作用是确保信息到达Broke后,并能正确路由到队列。(这种情况可以避免,人为确保可以正确路由)

1 配置开启Return机制

publisher-returns: true              # 启用Return机制

2 回调实现类Return

@Component
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("Return 回调: 消息无法路由到队列 [Exchange={}, RoutingKey={}, ReplyCode={}]", 
            returned.getExchange(), 
            returned.getRoutingKey(), 
            returned.getReplyCode()
        );
    }
}

二 MQ的可靠性

在默认情况下,RabbitMQ会将连接收到的消息保存在内存中以降低消息收发的延迟。

  • 一旦MQ宕机,内存当中的消息会丢失
  • 内存空间有限,当消费者故障或者处理过慢时,会导致消息积压,会触发PageOut机制存储,将消息写入磁盘,但是会引发MQ阻塞。

1 数据持久化

RabbitMQ实现数据持久化包括3个方面

  • 交换机持久化
  • 队列持久化
  • 消息持久化
  1. 三要素需同时持久化

    • 队列(durable=true)、交换机(durable=true)、消息(deliveryMode=2三者缺一不可。
    • 任意一环未持久化,消息可能丢失。
    • 队列持久化与交换机持久化都可以通过注解当中的配置属性指定。
  2. 消费者手动确认

    • 消费端需关闭自动确认(autoAck=false),并手动发送ACK(channel.basicAck())后,消息才会从队列删除。

代码:

举例说明

@Configuration
public class RabbitMQConfig {

    // 持久化交换机
    @Bean
    public DirectExchange durableExchange() {
        return new DirectExchange("durable.direct", true, false); // durable=true
    }

    // 持久化队列
    @Bean
    public Queue durableQueue() {
        return new Queue("durable.queue", true); // durable=true
    }

    // 绑定关系
    @Bean
    public Binding durableBinding() {
        return BindingBuilder.bind(durableQueue())
            .to(durableExchange())
            .with("routing.key");
    }
}
@Service
@RequiredArgsConstructor
public class MessageProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendPersistentMessage() {
        // 消息内容
        String message = "hello everyone!";

        // 创建消息属性并设置持久化
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // deliveryMode=2

        // 构建消息
        Message amqpMessage = new Message(message.getBytes(), properties);

        // 发送消息(交换机、路由键、消息、关联数据)
        rabbitTemplate.convertAndSend(
            "durable.direct",   // 交换机名称
            "routing.key",      // 路由键
            amqpMessage,        // 消息对象
            new CorrelationData(UUID.randomUUID().toString())
        );
    }
}

持久化的本质是通过磁盘写入实现数据的持久存储

2 Lazy Queue*

从 RabbitMQ 3.6.0 版本开始,引入了惰性队列 Lazy Queue 的概念,也就是惰性队列。

惰性队列的特性:

  • 接收到消息后直接存入磁盘而非内存。(内存中会存储少量消息)
  • 消费者要获取消息时才会从磁盘中读取并加载到内存当中。
  • 支持数百万条的消息存储。
  • 在3.12版本之后所有的队列都是LazyQueue模式,无法更改。
  • 开启持久化和生产者确认时,只有在消息持久化完成后才会给生产者返回ACK

如果是老版本的话需要手动再指定,新版本不需指定,默认

@RabbitListener(queuesToDeclare = @Queue(
    name = "explicit.lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode", value = "lazy") // 显式设置惰性队列
))
public void listen(String msg) {
    // 处理消息
}
@Bean
public Queue explicitLazyQueue() {
    return QueueBuilder.durable("explicit.lazy.queue")
        .lazy() // 显式声明惰性队列(兼容低版本 RabbitMQ)
        .build();
}

三 消费者的可靠性

1 消费者确认机制

概念:为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制。当消费者消息处理结束时向Broker发送一个回执,告知消息处理的状态。

  • ack:成功处理消息,RabbitMQ从队列当中删除消息。
  • nack:消息处理失败,RabbitMQ需要再次投递消息。
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。

配置

spring:
  rabbitmq:
    host: 192.168.100.100
    port: 5672
    virtual-host: /hmall
    username: hmall
    password: 123456
    connection-timeout: 1s
    listener:
        simple:
          prefetch: 1
          acknowledge-mode: auto #开启消费者确认机制
    publisher-confirm-type: correlated # 开启confirm模式

2 消费失败处理

自动确认

当消费者出现异常后,消息会不断requeue重新入队,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。

    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # 默认自动确认
        retry:
          enabled: true # 开启消费者重试机制
          max-attempts: 3 # 最大重试次数
          multiplier: 1 # 重试时间间隔
          initial-interval: 1000ms # 初始重试时间间隔

本地重试(Local Retry)
Spring 的 retry 机制通过 本地重试 替代 RabbitMQ 的 requeue,避免消息反复入队。从而提升性能。

将失败策略修改为RepublicMessageRecoverer,首先先定义接收失败的交换机队列机器绑定关系,然后定义RepublicMessageRecoverer

@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled",havingValue = "true")
public class RabbitListenerConfig {

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        // 关键点:失败时将消息重新发布到死信交换机
        return new RepublishMessageRecoverer(
                rabbitTemplate,
                "dlx.direct",   // 死信交换机名称
                "dlx.routing.key" // 死信路由键
        );
    }
}

然后再定义:死信队列+死信交换机+正常队列

首先定义死信交换机死信队列并绑定,然后定义队列规则,将死信队列交换机与队列绑定

可在yml配置中编写重试规则

    // 1. 定义死信交换机(DLX)
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.direct", true, false);
    }

    // 2. 定义死信队列(DLQ)
    @Bean
    public Queue dlq() {
        return new Queue("dlq.queue", true);
    }

    // 3. 绑定死信队列到死信交换机
    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlxExchange()).with("dlx.routing.key");
    }

    // 4. 定义原始队列,并绑定到死信交换机
    @Bean
    public Queue originQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dlx.direct");    // 指定死信交换机
        args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键
        return new Queue("origin.queue", true, false, false, args);
    }

如果这个队列出现重试依旧出错将会进入死信交换机后再路由进入死信队列

生产者发送消息 → 原始队列(origin.queue)
                ↓
消费者尝试处理消息 → 成功 → ACK
                ↓
消费者处理失败 → 重试 → 仍失败 → 拒绝消息(requeue=false)
                ↓
消息变为死信 → 转发到死信交换机(dlx.direct)
                ↓
死信交换机根据路由键(dlx.routing.key) → 路由到死信队列(dlx.queue)
                ↓
死信队列中的消息被专门的消费者处理(如人工干预、日志记录、重试等)

手动确认

1 xml文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual # 手动确认模式
        retry:
          enabled: true         # 开启消费者重试
          max-attempts: 3       # 最大重试次数(含首次消费)
          initial-interval: 1000 # 初始重试间隔(ms)
          multiplier: 2          # 重试间隔乘数(1s → 2s → 4s)
          max-interval: 10000    # 最大重试间隔(ms)

2 声明队列,交换机以及绑定关系

@Configuration
public class RabbitMQConfig {

    // 正常业务交换机
    public static final String ORDER_EXCHANGE = "order_exchange";
    // 正常业务队列
    public static final String ORDER_QUEUE = "order_queue";
    // 死信交换机
    public static final String DLX_EXCHANGE = "dlx_exchange";
    // 死信队列
    public static final String DLX_QUEUE = "dlx_queue";

    /**
     * 声明正常业务交换机(直连交换机)
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE);
    }

    /**
     * 声明正常业务队列,并绑定死信交换机
     */
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
                .deadLetterExchange(DLX_EXCHANGE) // 绑定死信交换机
                .deadLetterRoutingKey("dlx.routing.key") // 死信路由键
                .build();
    }

    /**
     * 绑定队列与交换机
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with("order.routing.key");
    }

    /**
     * 声明死信交换机(直连交换机)
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    /**
     * 声明死信队列
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE, true);
    }

    /**
     * 绑定死信队列与交换机
     */
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with("dlx.routing.key");
    }
}

3 配置重试策略与失败消息转发

@Configuration
public class RetryConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 配置重试策略
     */
    @Bean
    public RetryPolicy retryPolicy() {
        return new SimpleRetryPolicy();
    }

    /**
     * 配置重试模板
     */
    @Bean
    public RetryTemplate retryTemplate(RetryPolicy retryPolicy) {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);

        // 指数退避策略
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);    // 初始间隔 1s
        backOffPolicy.setMultiplier(2);            // 间隔乘数
        backOffPolicy.setMaxInterval(10000);       // 最大间隔 10s
        retryTemplate.setBackOffPolicy(backOffPolicy);

        return retryTemplate;
    }

    /**
     * 配置消息恢复器(失败后投递到死信交换机)
     */
    @Bean
    public MessageRecoverer messageRecoverer() {
        return new RepublishMessageRecoverer(
                rabbitTemplate,
                RabbitMQConfig.DLX_EXCHANGE,  // 死信交换机
                "dlx.routing.key"             // 死信路由键
        );
    }

    /**
     * 配置监听器容器工厂(核心)
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            RetryTemplate retryTemplate,
            MessageRecoverer messageRecoverer) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认

        // 配置重试
        factory.setAdviceChain(
                RetryInterceptorBuilder.stateless()
                        .retryOperations(retryTemplate)
                        .recoverer(messageRecoverer) // 失败后转发到死信队列
                        .build()
        );

        return factory;
    }
}

4 消费者实现

@Component
public class OrderConsumer {

    private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    @RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)
    public void handleOrderMessage(@Payload Order order, 
                                  Channel channel, 
                                  @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 业务处理逻辑(模拟可能抛出异常)
            processOrder(order);

            // 处理成功,手动确认
            channel.basicAck(deliveryTag, false);
            log.info("订单处理成功: {}", order.getId());
        } catch (Exception e) {
            // 记录异常日志
            log.error("订单处理失败: {}", order.getId(), e);
            
            // 手动拒绝消息(不重新入队,由重试机制处理)
            channel.basicNack(deliveryTag, false, false);
        }
    }

    private void processOrder(Order order) throws Exception {
        // 模拟业务处理(如数据库操作、远程调用等)
        if (order.getAmount() < 0) {
            throw new IllegalArgumentException("订单金额非法");
        }
        // ... 其他业务逻辑
    }
}

死信队列消费者

@Component
public class DlxConsumer {

    @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
    public void handleDlxMessage(Order failedOrder) {
        // 记录日志或人工处理
        System.err.println("收到死信消息: " + failedOrder);
        // 可在此处发送告警邮件或记录到数据库
    }
}

3 业务幂等性

解决方案:

1 唯一id

代码:

    @Bean
    public MessageConverter jacksonMessageConvertor() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setCreateMessageIds(true);
        return converter;
    }

为每一条消息设置一个全局唯一id以达到幂等性

2 业务判断

四 延迟消息

1 死信交换机

2 延迟消息插件

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列_rabbitmq延迟队列插件-CSDN博客

spring:
  rabbitmq:
    host: 127.0.0.1  # RabbitMQ服务器地址
    port: 5672         # RabbitMQ默认端口
    username: guest    # RabbitMQ用户名
    password: guest    # RabbitMQ密码
    virtual-host: /    # 虚拟主机路径(默认为/)

    # 连接工厂配置
    connection-timeout: 5000ms  # 连接超时时间
    requested-heartbeat: 60s    # 心跳间隔
    requested-channel-max: 200  # 最大通道数
    publisher-confirm-type: correlated  # 启用生产者确认
    publisher-returns: true             # 启用未路由消息返回
    template:
      retry:
        enabled: true                   # 启用重试机制
        max-attempts: 3                 # 最大重试次数
        initial-interval: 1000ms        # 初始重试间隔
    listener:
      simple:
        acknowledge-mode: manual      # 消费者手动确认模式
        auto-startup: true            # 自动启动监听器
        concurrency: 1-5              # 消费者线程池范围(最小1~最大5)
        max-concurrency: 10           # 最大并发数
        prefetch-count: 10            # 每次从MQ拉取的消息数量(防止内存溢出)
        default-requeue-rejected: false  # 拒绝的消息不重新入队
        retry:
          enabled: true               # 启用消费者重试
          max-attempts: 3             # 最大重试次数
          initial-interval: 2000ms    # 初始重试间隔
    # 消息转换器配置(JSON序列化)
    message-converter: jackson2JsonMessageConverter  # 使用Jackson2JsonMessageConverter
    # 高级配置(惰性队列)
    lazy-queues: true  # 启用惰性队列(RabbitMQ 3.12+ 默认开启)
    queue:
      mode: lazy       # 显式声明队列为惰性队列(兼容旧版本)
      version: 2       # 使用Classic Queue v2(CQv2)优化性能

# RabbitMQ连接超时相关配置(可选)
rabbitmq:
  connection:
    timeout: 5000ms
    blocking-io-timeout: 5000ms  # 阻塞连接检查超时时间


网站公告

今日签到

点亮在社区的每一天
去签到