Java研学-RabbitMQ(七)

发布于:2025-08-15 ⋅ 阅读:(17) ⋅ 点赞:(0)

一 MQ消息可靠性

  RabbitMQ 默认将消息存于内存以追求低延迟,但存在宕机丢消息和内存积压阻塞的风险,可通过数据持久化(将交换机、队列及消息标记为 durable,使数据落盘)和惰性队列(Lazy Queue)(通过 x-queue-mode=lazy 参数让消息直接写入磁盘而非内存,牺牲部分性能换取高吞吐与抗积压能力)的组合方案,从根本上解决消息可靠性与内存溢出问题,尤其适用于大消息量或高可用性要求的场景。

二 数据持久化

  于Spring中构建的交换机与队列,默认为持久化的。Spring发送消息默认为持久化的。(控制台需要选择)

  持久化交换机/队列(配置 durable=true)和持久化消息(设置 delivery_mode=2)会将数据写入磁盘,确保重启后不丢失,但会因磁盘 I/O 降低性能,适合关键业务;而临时交换机/队列(默认)和临时消息(delivery_mode=1)仅存内存,重启后丢失,但吞吐量更高,适用于日志、实时监控等非核心场景,配置时需根据可靠性需求权衡选择。

组件 作用 典型场景 关键注意事项
持久化交换机 重启后自动恢复交换机元数据(类型、绑定规则),确保消息能路由到队列。 生产环境核心交换机(如订单路由、通知分发)。
避免重启后消息因路由丢失。
仅交换机持久化 ≠ 消息不丢失,需配合队列和消息持久化。
默认建议启用(durable=true)。
持久化队列 重启后恢复队列结构(名称、属性)及未消费消息的元数据(需消息持久化配合)。 长期存储消息的队列(如待处理订单、延迟任务)。
防止重启后队列消失导致积压丢失。
队列持久化 ≠ 消息持久化,两者需同时配置。
惰性队列(Lazy Queue)可优化大积压磁盘存储。
持久化消息 消息内容写入磁盘,与持久化队列配合实现重启后消息完全恢复。 关键业务消息(支付成功通知、用户注册验证码)。
需重试或审计的场景。
同步磁盘写入导致吞吐量下降 30%-50%。
建议仅对高价值消息启用(delivery_mode=2)。

1 交换机持久化

在这里插入图片描述

2 队列持久化

在这里插入图片描述

3 消息持久化

在这里插入图片描述
  于队列中发送2条消息并重启MQ
在这里插入图片描述
  重启MQ后,只有持久化的消息存在于队列中
在这里插入图片描述

4 百万条数据测试

  ① 关闭生产者确认机制

spring:
  rabbitmq:
    host: 192.168.44.128
    port: 5672
    virtual-host: /midhuang
    username: dahuang
    password: "dahuang66"
    connection-timeout: 1s    # 连接超时时间

    # 生产者确认机制配置
    publisher-confirm-type: none        # 关闭异步回调确认模式
    publisher-returns: false            # 关闭Return机制(捕获路由失败)

    template:
      mandatory: true                   # 强制触发ReturnCallback
      # 生产者重试配置(针对网络波动等临时性失败)
      retry:
        enabled: true                   # 开启重试
        initial-interval: 1000ms        # 首次重试间隔(1秒)
        multiplier: 1.0                 # 后续重试间隔倍数(1.0表示固定间隔)
        max-attempts: 3                 # 最大重试次数(实际重试次数=max-attempts-1)
logging:
  level:
    cn.tj: DEBUG # 设置为 DEBUG 以查看详细日志

  ② 测试代码 – 发送临时消息

@Slf4j
@SpringBootTest
class PublisherApplicationTests {
    // 注入 RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    void testPageout() {
        // 创建非持久化消息
        Message message = MessageBuilder
                .withBody("hello".getBytes(StandardCharsets.UTF_8))//使用标准UTF-8编码
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)//设置为非持久化模式
                .build();

        // 循环发送100万条消息
        for (int i = 0; i < 1_000_000; i++) {// 下划线提高数字可读性
            rabbitTemplate.convertAndSend("direct.queue1", message);
        }
    }
}

  ③ Paged Out
   是当内存使用接近阈值(默认总内存40%)时,系统自动将非持久化消息从内存置换到磁盘以释放内存的内存保护机制,触发条件由 vm_memory_high_watermark_paging_ratio(默认0.5,即内存达20%时触发)控制。仅针对非持久化消息(即未设置 delivery_mode=2 的消息)。持久化消息会直接写入磁盘,不受此机制影响。

  与持久化消息(内存+磁盘同步存储)不同,重启时系统仅恢复标记为 delivery_mode=2 的持久化消息,而 "Paged Out" 的临时文件会被自动清理且队列索引无法重建,导致消息丢失,且频繁置换会导致磁盘I/O性能下降(阻塞)。可通过升级为 Lazy Queue(直接写入磁盘)、调整内存阈值或增加消费者实例来优化,避免因内存压力引发的消息处理延迟。

在这里插入图片描述
  ④ 测试代码 – 发送持久化消息

@Slf4j
@SpringBootTest
class PublisherApplicationTests {
    // 注入 RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    void testPageout() {
        // 创建非持久化消息
        Message message = MessageBuilder
                .withBody("hello".getBytes(StandardCharsets.UTF_8))// 使用标准UTF-8编码
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)// 设置为持久化模式
                .build();

        // 循环发送100万条消息
        for (int i = 0; i < 1_000_000; i++) {// 下划线提高数字可读性
            rabbitTemplate.convertAndSend("direct.queue1", message);
        }
    }
}

  每一次的操作都会写入到磁盘中,不会出现阻塞的情况
在这里插入图片描述

三 Lazy Queue

  RabbitMQ 惰性队列(Lazy Queue)自 3.6.0 引入后,通过直接存盘、按需加载机制显著降低内存占用(默认仅缓存 2048 条消息),支持数百万消息堆积且默认持久化,适合低频消费或资源受限场景;3.12.0 起默认全局启用且不可关闭,虽牺牲部分吞吐量(依赖磁盘 I/O)但提升稳定性,需根据业务对延迟/容量的需求选择使用。

1 创建Lazy Queue队列 – 控制台

在这里插入图片描述

2 创建Lazy Queue队列 – 代码

  ① 构造Bean,3.12+ 默认所有队列为惰性,此配置可省略

@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.queue") // 持久化队列
            .lazy()                          // 显式开启Lazy模式(3.12+版本可省略)
            .build();
}

  ② 注释,3.12+ 版本无需此参数,因默认已是惰性队列

@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy") // 3.12+版本无需此参数
))
public void listenLazyQueue(String msg) {
    log.info("接收到lazy.queue的消息:{}", msg);
}

3 发送测试消息

@Slf4j
@SpringBootTest
class PublisherApplicationTests {
    // 注入 RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
	@Test
    void testPageout() {
        // 创建非持久化消息
        Message message = MessageBuilder
                .withBody("hello".getBytes(StandardCharsets.UTF_8)) 
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)// 设为非持久化模式
                .build();

        // 循环发送100万条消息
        for (int i = 0; i < 1_000_000; i++) {  // 使用下划线提高数字可读性
            rabbitTemplate.convertAndSend("lazy.queue", message);
        }
    }
}

  速度提升 不会出现阻塞情况 直接存入磁盘 不再是经过内存再到PageOut
在这里插入图片描述
  开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执


网站公告

今日签到

点亮在社区的每一天
去签到