RabbitMQ重复消费如何解决

发布于:2025-03-13 ⋅ 阅读:(13) ⋅ 点赞:(0)

消息重复消费的原因

  1. 生产者重试:网络波动导致生产者未收到 Broker 确认,重复发送消息。
  2. 消费者失败:消费者处理消息后未发送 ACK,消息重新入队。
  3. 集群故障转移:主节点宕机,未确认消息被重新投递。

解决方案

1. 消费者幂等性设计

原理:确保同一消息多次处理的结果与一次处理相同。

实现方式
  • 数据库唯一约束
    利用业务字段(如订单号)的唯一性约束,避免重复插入数据。

    CREATE TABLE orders (
        id VARCHAR(64) PRIMARY KEY, -- 唯一订单号
        amount DECIMAL(10,2)
    );
    
    // Java 示例(使用 MyBatis)
    public void processOrder(Order order) {
        try {
            orderMapper.insert(order); // 唯一约束冲突时会抛出异常
            // 业务逻辑...
        } catch (DuplicateKeyException e) {
            // 已处理过该订单,直接跳过
            log.warn("订单已存在: {}", order.getId());
        }
    }
    
  • Redis 原子操作
    使用 Redis 记录已处理消息的 ID,通过 SETNX 命令实现原子性检查。

    // Java 示例(使用 Spring Data Redis)
    public boolean isMessageProcessed(String messageId) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent("msg:" + messageId, "1", Duration.ofMinutes(30));
        return Boolean.TRUE.equals(result);
    }
    
    public void consumeMessage(Message message) {
        String messageId = message.getMessageId();
        if (!isMessageProcessed(messageId)) {
            // 已处理过,直接返回
            return;
        }
        // 业务逻辑...
    }
    

2. 消息全局唯一 ID

原理:为每条消息分配唯一 ID,消费者记录已处理 ID。

实现步骤
  1. 生产者端:发送消息时附加唯一 ID。

    // Java 示例(使用 RabbitTemplate)
    public void sendOrder(Order order) {
        String messageId = UUID.randomUUID().toString();
        Message message = MessageBuilder
            .withBody(order.toJson().getBytes())
            .setHeader("messageId", messageId)
            .build();
        rabbitTemplate.send("order.exchange", "order.key", message);
    }
    
  2. 消费者端:处理前检查 ID 是否已存在。

    // Java 示例(使用 @RabbitListener)
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Message message, Channel channel) throws IOException {
        String messageId = message.getMessageProperties().getHeader("messageId");
        if (redisTemplate.hasKey("processed:" + messageId)) {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        // 业务逻辑...
        redisTemplate.opsForValue().set("processed:" + messageId, "1", Duration.ofDays(1));
        channel.basicAck(deliveryTag, false);
    }
    

3. 手动确认模式(Manual ACK)

原理:消费者处理完消息后手动发送 ACK,避免消息因异常重新入队。

配置与代码
  1. 配置手动 ACK(Spring Boot):

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual
    
  2. 消费者逻辑

    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 业务逻辑...
            channel.basicAck(deliveryTag, false); // 确认消息
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, true); // 重入队列
        }
    }
    

4. 消息去重表

原理:在数据库中维护一张去重表,记录已处理的消息 ID。

表结构
CREATE TABLE message_dedup (
    message_id VARCHAR(128) PRIMARY KEY,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
消费者逻辑
public void consumeMessage(Message message) {
    String messageId = extractMessageId(message);
    try {
        jdbcTemplate.update("INSERT INTO message_dedup (message_id) VALUES (?)", messageId);
        // 业务逻辑...
    } catch (DuplicateKeyException e) {
        // 消息已处理,直接ACK
        channel.basicAck(deliveryTag, false);
    }
}

5. 消息过期与死信队列

原理:设置消息 TTL,超时未处理则转入死信队列,避免无限重试。

配置队列 TTL 和死信交换
// Java 配置示例
@Bean
public Queue orderQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 消息60秒过期
    args.put("x-dead-letter-exchange", "dlx.exchange");
    args.put("x-dead-letter-routing-key", "dlx.key");
    return new Queue("order.queue", true, false, false, args);
}

@Bean
public DirectExchange dlxExchange() {
    return new DirectExchange("dlx.exchange");
}

@Bean
public Queue dlxQueue() {
    return new Queue("dlx.queue");
}

@Bean
public Binding dlxBinding() {
    return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}

方案对比与选型

方案 优点 缺点 适用场景
数据库唯一约束 无需额外组件 高并发下数据库压力大 低频业务(如订单创建)
Redis 原子操作 高性能 需维护 Redis 高可用 高频业务(如支付回调)
手动ACK 避免消息丢失 需处理ACK异常 所有需要可靠消费的场景
消息去重表 数据持久化 增加数据库写入压力 数据一致性要求高的场景
死信队列 避免消息堆积 需额外处理死信消息 需要异常消息兜底的场景

总结

  • 幂等性设计是核心:无论消息重复多少次,业务结果保持一致。
  • 组合使用多种方案:例如“手动ACK + Redis去重”兼顾可靠性与性能。
  • 监控与告警:通过 RabbitMQ 管理界面监控消息积压情况,设置阈值告警。