文章目录
在消息队列的使用过程中, 消息丢失和消息重复是两个常见且令开发人员困扰的问题。
因为从生产者发送消息,到 Broker 保存消息,再到消费者消费消息,每个环节都暗藏着消息丢失的风险;而消息重复的产生,往往源于生产者的重复发送或消费者的重复接收。
那么接下来,我们深入剖析一下这两个问题及其对应的策略。
一、消息丢失问题的解决方案
(一)发送端丢失
生产者发送消息时,处理不当极易造成消息丢失。目前,主流消息队列普遍支持同步发送和异步发送两种模式。
同步发送时,生产者发送消息后会同步等待 Broker 返回的 ACK 确认消息,只有收到 ACK 才认定消息发送成功;若长时间未收到,则判定发送失败并进行重试。这种方式虽能确保消息不丢失,但会带来性能瓶颈,因此在实际应用中,异步发送更为常用。
以 Kafka 为例,主流消息队列(如 Kafka 和 RocketMQ)通常采用回调函数来保障异步发送时消息不丢失,具体代码如下:
// 配置Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 确保所有副本都收到消息才确认
props.put("retries", 3); // 重试次数
Producer<String, String> producer = new KafkaProducer<>(props);
// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "message");
// 异步发送消息并添加回调处理
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 处理发送失败的情况
logger.error("消息发送失败,topic: {}, partition: {}, 异常信息: {}",
metadata.topic(), metadata.partition(), exception.getMessage());
// 可在此处添加重试逻辑或告警机制
} else {
// 处理发送成功的情况
logger.info("消息发送成功,topic: {}, partition: {}, offset: {}",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
// 关闭生产者
producer.close();
(二)存储端丢失
即便生产者成功发送消息,也无法保证消息绝对不丢失。因为若消息发送到 Broker 后,在消费者拉取之前,Broker 突然宕机且消息尚未落盘,同样会导致消息丢失。为避免存储阶段的消息丢失,可从以下方面着手:
1. 同步刷盘
异步刷盘存在消息未落盘 Broker 就宕机的风险,而同步刷盘则是在消息成功落盘后,才向 Sender 返回发送成功的确认,从而从消息发送环节保障消息不丢失。在 RocketMQ 中,只需将flushDiskType
参数配置为SYNC_FLUSH
,即可开启同步刷盘功能。
以下是两种刷盘机制的对比示意图:
2. Broker 集群
若 Broker 集群仅有一个节点,即便消息成功落盘,一旦 Broker 发生故障,在恢复前消费者将无法拉取消息;若出现磁盘故障且无法恢复,消息更是会永久丢失。
采用Broker 集群可有效解决该问题。在 Broker 集群环境下,可设置等待 2 个以上节点同步完消息后,再向 Producer 返回成功确认。如此一来,即便某个 Broker 节点挂掉,也能迅速找到替代节点,确保消息的可用性。
以下是 Broker 集群架构图:
(三)消费端丢失
消费者要确保消息不丢失,需在消费完成后再向 Broker 返回 ACK 确认。主流消息队列中,若 Broker 未收到 ACK,会重新向消费者发送消息。
有时为了解决消息积压问题,消费者会在拉取消息后直接返回 ACK,再异步执行消息处理逻辑。此时,为保证消息不丢失,需在返回 ACK 前将消息持久化到本地,例如保存至数据库,后续可从数据库读取消息进行处理。
以下是消费者消息处理流程图:
二、消息重复问题的解决方案
消息重复产生的原因主要有两点:
- 一是生产者发送消息后未收到 ACK,进而进行重复发送;
- 二是消费者消费完成后,Broker 未收到 ACK,导致消息被重复推送给消费者。
消息重复会对业务产生严重影响,比如电商场景中的重复支付、账务场景中的重复记账等。
以下是消息重复产生原因的分析图:
从当前主流消息队列来看,尚无一款能够直接解决消息重复的消费问题,所以通常需要在消费端进行幂等处理。
以下是几种常见的幂等处理思路:
(一)唯一键约束
若消息会存储到本地数据库,可将消息 ID 设为唯一键;若消息不存入数据库,也可选取消息 ID 或消息中其他具有唯一性的属性,作为唯一键存储到业务数据表中,以此避免重复消费。
(二)保存消费记录
借助 Redis 保存消息 ID 也是一种有效方式。在消费消息前,先判断 Redis 中是否已存在该消息 ID,示例代码如下:
@Service
public class MessageConsumerService {
private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private BusinessService businessService; // 业务处理服务
// 消费消息的方法
public void consumeMessage(String messageId, String messageBody) {
try {
// 1. 检查消息是否已消费(利用Redis的原子性操作)
Boolean isConsumed = redisTemplate.opsForValue().setIfAbsent(
"message:consumed:" + messageId, // Redis键名,格式为 message:consumed:{消息ID}
"1", // 值设为1表示已消费
30, TimeUnit.DAYS); // 设置过期时间,防止内存泄漏
if (isConsumed != null && isConsumed) {
// 2. 消息未被消费,执行具体业务逻辑
try {
businessService.processMessage(messageBody);
logger.info("消息处理成功,messageId: {}", messageId);
} catch (Exception e) {
// 业务处理失败,删除Redis标记以便重新消费
redisTemplate.delete("message:consumed:" + messageId);
logger.error("消息处理失败,已删除消费标记,messageId: {}", messageId, e);
throw e; // 向上抛出异常,触发重试机制
}
} else {
// 3. 消息已被消费,直接跳过
logger.info("消息已被消费,跳过处理,messageId: {}", messageId);
}
} catch (Exception e) {
// 处理异常情况,可根据业务需求添加告警或补偿逻辑
logger.error("消息消费过程中发生异常,messageId: {}", messageId, e);
// 可添加额外的重试逻辑或告警通知
}
}
}
需要注意的是,若消费失败,需及时删除 Redis 中保存的消息 ID,防止后续消息无法正常消费。
三、总结
最后我们用一张图总结一下这篇文章:
消息不丢失、不重复是消息队列的核心需求,但在实际应用中,满足这一要求并非易事。
对于消息丢失问题,主流消息队列可通过消息重试和消息持久化等手段有效解决;然而,消息重试机制又不可避免地带来了消息重复的风险。
目前,主流消息队列在处理消息重复问题上缺乏现成解决方案,对于不允许重复消费的业务场景,开发人员需在 消费端实现幂等处理逻辑,以保障业务的准确性和稳定性。