一、核心机制:PID与序列号
1. Producer ID (PID)
- 唯一标识:每个生产者实例启动时,由Kafka Broker分配一个全局唯一的PID,用于标识消息来源。
- 持久化存储:PID由Broker持久化保存,确保生产者重启后仍能追踪历史状态(但跨会话时PID会变更)。
2. 序列号 (Sequence Number)
- 分区级递增:生产者为每个分区维护一个单调递增的序列号,从0开始。
- 消息附加:每条消息发送时,附带当前分区的序列号。
- Broker验证:Broker为每个
<PID, Partition>
对记录最后接收的序列号,新消息的序列号必须满足:
- 等于预期值:
SN_new = SN_old + 1
→ 接受并更新序列号。
- 小于预期值:
SN_new < SN_old + 1
→ 视为重复消息,丢弃。
- 大于预期值:
SN_new > SN_old + 1
→ 视为乱序或丢失,触发异常。
二、分区级别幂等性实现
1. 单分区内的唯一性保证
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默认值,确保消息可靠存储
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
- 机制:通过PID和序列号,确保同一生产者实例向同一分区发送的消息不重复。
- 限制:
- 跨分区无效:同一生产者向不同分区发送的消息可能重复。
- 跨会话无效:生产者重启后PID变更,跨会话消息无法保证幂等性。
2. Broker端去重缓存
- 缓存结构:Broker维护最近接收的
<PID, SequenceNumber>
映射,缓存最近5个批次的消息(固定大小,不可配置)。
- 验证流程:
- 接收消息后,检查PID和序列号是否存在于缓存。
- 若存在且序列号连续,接受消息并更新缓存。
- 若序列号不连续或重复,丢弃消息。
三、配置与启用
1. 生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默认值,确保消息可靠存储
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
2. 默认行为
- 启用幂等性后,
acks
自动设为all
,确保所有副本确认后再返回成功。
- 重试机制默认启用,避免因网络问题导致消息丢失。
四、限制与扩展
1. 单会话限制
- PID变更:生产者重启后,PID变更,跨会话消息无法保证幂等性。
- 解决方案:结合事务机制(
transactional.id
)实现跨会话的精确一次语义。
2. 事务扩展
3. 消费者端处理
- 去重需求:消费者需自行处理重复消息,例如:
- 数据库唯一约束:在消息处理时添加业务唯一键(如订单ID)。
- 业务逻辑去重:通过状态检查避免重复操作。
五、性能与调优
1. 性能影响
- Broker端开销:维护PID和序列号缓存增加内存消耗,但通过固定缓存大小(5个批次)平衡性能与空间。
- 客户端优化:
- 增大
batch.size
和linger.ms
,减少网络请求次数。
- 调整
max.in.flight.requests.per.connection
(默认5)以控制并发请求。
2. 高并发优化
- Broker配置:
- 增加
num.io.threads
和queued.max.requests
,提升处理能力。
- 架构优化:动态均衡分区热点,避免单分区过载。
六、总结
- 核心原理:通过PID和序列号在分区级别实现消息唯一性,确保同一生产者会话内消息不重复。
- 适用场景:单分区消息去重,结合事务可扩展至跨分区和跨会话。
- 消费者责任:需额外处理重复消息,依赖业务逻辑或外部机制(如数据库唯一约束)。