【面试题】如何保证MQ的消息不丢失、不重复

发布于:2025-06-10 ⋅ 阅读:(22) ⋅ 点赞:(0)


在消息队列的使用过程中, 消息丢失和消息重复是两个常见且令开发人员困扰的问题。

因为从生产者发送消息,到 Broker 保存消息,再到消费者消费消息,每个环节都暗藏着消息丢失的风险;而消息重复的产生,往往源于生产者的重复发送或消费者的重复接收。

那么接下来,我们深入剖析一下这两个问题及其对应的策略。

一、消息丢失问题的解决方案

(一)发送端丢失

生产者发送消息时,处理不当极易造成消息丢失。目前,主流消息队列普遍支持同步发送和异步发送两种模式。

同步发送时,生产者发送消息后会同步等待 Broker 返回的 ACK 确认消息,只有收到 ACK 才认定消息发送成功;若长时间未收到,则判定发送失败并进行重试。这种方式虽能确保消息不丢失,但会带来性能瓶颈,因此在实际应用中,异步发送更为常用。

以 Kafka 为例,主流消息队列(如 Kafka 和 RocketMQ)通常采用回调函数来保障异步发送时消息不丢失,具体代码如下:

// 配置Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 确保所有副本都收到消息才确认
props.put("retries", 3); // 重试次数

Producer<String, String> producer = new KafkaProducer<>(props);

// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "message");

// 异步发送消息并添加回调处理
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // 处理发送失败的情况
            logger.error("消息发送失败,topic: {}, partition: {}, 异常信息: {}", 
                metadata.topic(), metadata.partition(), exception.getMessage());
            
            // 可在此处添加重试逻辑或告警机制
        } else {
            // 处理发送成功的情况
            logger.info("消息发送成功,topic: {}, partition: {}, offset: {}",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});

// 关闭生产者
producer.close();

(二)存储端丢失

即便生产者成功发送消息,也无法保证消息绝对不丢失。因为若消息发送到 Broker 后,在消费者拉取之前,Broker 突然宕机且消息尚未落盘,同样会导致消息丢失。为避免存储阶段的消息丢失,可从以下方面着手:

1. 同步刷盘

异步刷盘存在消息未落盘 Broker 就宕机的风险,而同步刷盘则是在消息成功落盘后,才向 Sender 返回发送成功的确认,从而从消息发送环节保障消息不丢失。在 RocketMQ 中,只需将flushDiskType参数配置为SYNC_FLUSH,即可开启同步刷盘功能。

以下是两种刷盘机制的对比示意图:
在这里插入图片描述
在这里插入图片描述

2. Broker 集群

若 Broker 集群仅有一个节点,即便消息成功落盘,一旦 Broker 发生故障,在恢复前消费者将无法拉取消息;若出现磁盘故障且无法恢复,消息更是会永久丢失。

采用Broker 集群可有效解决该问题。在 Broker 集群环境下,可设置等待 2 个以上节点同步完消息后,再向 Producer 返回成功确认。如此一来,即便某个 Broker 节点挂掉,也能迅速找到替代节点,确保消息的可用性。

以下是 Broker 集群架构图:
在这里插入图片描述

(三)消费端丢失

消费者要确保消息不丢失,需在消费完成后再向 Broker 返回 ACK 确认。主流消息队列中,若 Broker 未收到 ACK,会重新向消费者发送消息。

有时为了解决消息积压问题,消费者会在拉取消息后直接返回 ACK,再异步执行消息处理逻辑。此时,为保证消息不丢失,需在返回 ACK 前将消息持久化到本地,例如保存至数据库,后续可从数据库读取消息进行处理。

以下是消费者消息处理流程图:
在这里插入图片描述

二、消息重复问题的解决方案

消息重复产生的原因主要有两点:

  • 一是生产者发送消息后未收到 ACK,进而进行重复发送;
  • 二是消费者消费完成后,Broker 未收到 ACK,导致消息被重复推送给消费者。

消息重复会对业务产生严重影响,比如电商场景中的重复支付、账务场景中的重复记账等。

以下是消息重复产生原因的分析图:
在这里插入图片描述

在这里插入图片描述

从当前主流消息队列来看,尚无一款能够直接解决消息重复的消费问题,所以通常需要在消费端进行幂等处理

以下是几种常见的幂等处理思路:

(一)唯一键约束

若消息会存储到本地数据库,可将消息 ID 设为唯一键;若消息不存入数据库,也可选取消息 ID 或消息中其他具有唯一性的属性,作为唯一键存储到业务数据表中,以此避免重复消费。

(二)保存消费记录

借助 Redis 保存消息 ID 也是一种有效方式。在消费消息前,先判断 Redis 中是否已存在该消息 ID,示例代码如下:

@Service
public class MessageConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private BusinessService businessService; // 业务处理服务
    
    // 消费消息的方法
    public void consumeMessage(String messageId, String messageBody) {
        try {
            // 1. 检查消息是否已消费(利用Redis的原子性操作)
            Boolean isConsumed = redisTemplate.opsForValue().setIfAbsent(
                "message:consumed:" + messageId, // Redis键名,格式为 message:consumed:{消息ID}
                "1",                             // 值设为1表示已消费
                30, TimeUnit.DAYS);             // 设置过期时间,防止内存泄漏
            
            if (isConsumed != null && isConsumed) {
                // 2. 消息未被消费,执行具体业务逻辑
                try {
                    businessService.processMessage(messageBody);
                    logger.info("消息处理成功,messageId: {}", messageId);
                } catch (Exception e) {
                    // 业务处理失败,删除Redis标记以便重新消费
                    redisTemplate.delete("message:consumed:" + messageId);
                    logger.error("消息处理失败,已删除消费标记,messageId: {}", messageId, e);
                    throw e; // 向上抛出异常,触发重试机制
                }
            } else {
                // 3. 消息已被消费,直接跳过
                logger.info("消息已被消费,跳过处理,messageId: {}", messageId);
            }
        } catch (Exception e) {
            // 处理异常情况,可根据业务需求添加告警或补偿逻辑
            logger.error("消息消费过程中发生异常,messageId: {}", messageId, e);
            // 可添加额外的重试逻辑或告警通知
        }
    }
}

需要注意的是,若消费失败,需及时删除 Redis 中保存的消息 ID,防止后续消息无法正常消费。

在这里插入图片描述

三、总结

最后我们用一张图总结一下这篇文章:
在这里插入图片描述

消息不丢失、不重复是消息队列的核心需求,但在实际应用中,满足这一要求并非易事。

对于消息丢失问题,主流消息队列可通过消息重试和消息持久化等手段有效解决;然而,消息重试机制又不可避免地带来了消息重复的风险。

目前,主流消息队列在处理消息重复问题上缺乏现成解决方案,对于不允许重复消费的业务场景,开发人员需在 消费端实现幂等处理逻辑,以保障业务的准确性和稳定性。