一 kafka事务介绍
1.1 Kafka事务的作用
Exactly-Once Semantics (EOS):在“消费 → 处理 → 生产”的流式链路里避免重复写与重复读带来的副作用,确保“处理一次且仅一次”的可见效果。
跨分区 / 跨 Topic 原子性:将一次处理内写入的多分区多主题消息,以及本次消费位点 offset 的提交,绑定在同一个事务里,要么都生效,要么都回滚。
1.2 相关术语
PID / Producer ID、Epoch、Sequence Number:幂等生产者元数据,避免重复写。
事务协调器(Transaction Coordinator):位于 broker 侧的协调者,管理事务状态机与两阶段提交。
控制批次(Control Batch / Control Records):日志里的特殊记录,用于标记事务,主要是 COMMIT / ABORT(注意:数据分区不写“BEGIN”标记)。
LSO(Last Stable Offset) 与 HW(High Watermark):对
read_committed
消费者只暴露到 LSO,屏蔽未决事务。__transaction_state
:kafka内部主题,用于持久化事务状态机。__consumer_offsets
:kafka内部主题,存消费组位点;位点也可以被纳入事务。僵尸实例:一个旧的 Producer 实例(带着同样的
transactional.id
)在崩溃或网络分区后挂掉了,但它可能在恢复后继续尝试往 Kafka 写数据,但是与此同时,已经有一个新的 Producer 实例已经起来并接管了同样的transactional.id,我们把这个宕机后又恢复的producer叫做僵尸实例
1.3 消费者隔离级别
消费者的隔离级别有下面两种
read_uncommitted
(默认):可读到未提交和已提交数据。read_committed
:只读取已提交事务的数据(EOS 流水线应使用)。
假设想要配置消费者隔离级别为read_committed,可通过下面配置完成
props.put("isolation.level", "read_committed");
二、使用 Kafka 事务
2.1 生产者端配置
Properties props = new Properties();
// broker地址
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
// transactional.id 必须唯一且稳定(可复用)
props.put("transactional.id", "order-service-txn-1");
// 配了 transactional.id 会自动开启,但是最好还是显式配置
props.put("enable.idempotence", "true");
/**
然后通常由客户端自动/隐式设置为适配幂等语义:
要求acks=all、retries>0 max.in.flight.requests.per.connection<=5 等,
不配置就会取默认的值,比如retries = Integer.MAX_VALUE
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 找到协调器、申请 PID/epoch、登记事务状态
producer.initTransactions();
2.2 事务性生产
// 开启事务
producer.beginTransaction();
// 发送消息
producer.send(new ProducerRecord<>("demo-topic", "key1", "message-1"));
producer.send(new ProducerRecord<>("demo-topic", "key2", "message-2"));
producer.send(new ProducerRecord<>("demo-topic2", "key3", "message-3"));
// 提交事务
producer.commitTransaction();
这样,对于配置了read_committed的消费者而言,要么这三个消息同时可见,要么同时不可见。
2.3 实践建议
使用稳定且可复用的
transactional.id,这样
服务重启后就可恢复事务上下文,还能对“僵尸实例”做围栏。事务应尽可能短小且频繁提交,避免长时间占用导致 LSO 卡住,增加读延迟。
失败重试要以事务回滚为界,确保回滚后可安全重放。
EOS 只覆盖 Kafka 内部的原子性;涉及外部系统,则需要额外使用 Outbox/Saga 等模式。
三 kafka事务的实现
3.1 关键组件
事务生产者:发数据、报告参与分区、发起事务结束(提交/回滚)。
消费组协调器(Group Coordinator):当offset被纳入事务时,消费组协调器需要把最新offset发送到专门存储offset的内部主题
__consumer_offsets中,
所以消费者协调器和__consumer_offsets里的对应分区
也是事务参与者。事务协调器(Transaction Coordinator):负责给生产者分配 Producer ID/epoch(每个
transactional.id
对应一个PID),维护事务状态机,持久化事务日志,并且当事务结束时(commit 或 abort),事务协调器 会把这个事务的结果(commit/abort 标记)广播到所有该事务涉及的分区)。数据分区所在的 Broker Leader:接受数据与控制批次写入,维护 High Watermark/Last Stable Offset与中止事务索引。
消费者:根据隔离级别获取数据,包括
read_committed
和read_uncommitted
,依赖隔离级别和abortedTransactions
过滤。
3.2 事务实现流程
下图是kafka事务消息的总体流程图
3.2.1 幂等生产者
协调器为每个
transactional.id
分配 PID 与 epoch。生产者对每个分区维护单调递增的序列号;Broker 端以
(PID, epoch, seq)
去重,避免“重复写”。若同一
transactional.id
的新实例启动并initTransactions()
,协调器会提升 epoch 并围栏旧实例;旧实例写入不会成功并且得到INVALID_PRODUCER_EPOCH
/ProducerFencedException
。
3.2.2 事务状态机与内部日志
事务协调器将每个事务的状态持久化到
__transaction_state
:
EMPTY/ONGOING → PREPARE_COMMIT | PREPARE_ABORT → COMPLETE_COMMIT | COMPLETE_ABORT
事务涉及到的分区集合(数据分区与
__consumer_offsets
的目标分区)由生产者在首次写入/首次提交位点时通过
AddPartitionsToTxn
/AddOffsetsToTxn
报告给协调器并持久化。
3.2.3 两阶段提交(2PC)
与传统数据库不同的是,数据分区里只写“结束标记”——COMMIT 或 ABORT 的控制批次;不写 BEGIN。BEGIN 只体现在协调器的内部状态机与日志。信息会包含自己所属的事务producer。
阶段 A:事务进行中(ONGOING)
beginTransaction()
后,生产者向多个分区写入消息(每条携带 PID/epoch/seq)。如首次写入某分区,生产者会先向协调器请求
AddPartitionsToTxn
,协调器会记录“本事务涉及到这个分区”。
阶段 B:准备提交(PREPARE_COMMIT)/ 准备回滚(PREPARE_ABORT)
生产者调用
commitTransaction()
(或abortTransaction()
),就会发送EndTxn请求
给协调器。协调器把事务状态改为
PREPARE_COMMIT
(或PREPARE_ABORT
)并写入kafka内内部主题__transaction_state
。扇出:协调器向所有涉及分区的 leader 发起WriteTxnMarkers请求。
阶段 C:各分区落盘控制记录 + 反馈
在收到事务协调器的WriteTxnMarkers
请求后,
各分区在自己的日志里追加一个“控制批次(Control Batch)”,类型为 COMMIT 或 ABORT。注意kafka没有“BEGIN”控制批次,BEGIN 信息由协调器掌分区 leader 追加成功后应答协调器。
当所有目标分区都落成控制批次,协调器将事务状态置为
COMPLETE_COMMIT
(或COMPLETE_ABORT
),并更新__transaction_state
。
3.3 可见性控制
HW(High Watermark):副本多数派确认的最高位移。
read_uncommitted
可读到 HW。LSO(Last Stable Offset):保证其之前没有“未决事务”的最末位移。
对read_committed
,Broker 只返回 ≤ LSO 的数据,从源头屏蔽未提交事务。为何消费者还能拿到“已中止事务”的数据片段?
为性能考虑,Broker 可能仍返回包含已中止事务记录的批次,但会携带一个
abortedTransactions 列表(含producerId
与firstOffset
)。客户端在解码时跳过这些记录。事务索引(.txnindex):每个日志段都有一个中止事务索引,Broker 用它在 Fetch 时快速收集
abortedTransactions
列表。
小结:在
read_committed
下,消费者不用“暂存不确定状态数据”去等控制标记;Broker 通过 LSO 保证不给你发“未决事务”的记录。客户端只需在已决事务里过滤 ABORT 记录(根据abortedTransactions
)。
3.4 消费-处理-生产 模式中消费offset与输出的原子绑定
sendOffsetsToTransaction(offsets, groupMetadata)
背后做了两件事,
1 AddOffsetsToTxn
告诉事务协调器:这次事务会提交哪个消费组的位点
2 TxnOffsetCommit
把位点写入 __consumer_offsets
对应分区
在最终 COMMIT(或 ABORT)时,__consumer_offsets
分区也会收到相应的 COMMIT/ABORT 控制批次,从而与输出数据一并原子生效(或放弃)。
3.5 常见故障的处理
3.5.1 失败与恢复
如果某些分区暂不可用,协调器会持续重试
WriteTxnMarkers
(最终一致的 2PC)。事务超时(由客户端
transaction.timeout.ms
申请,受 broker 上限约束)协调器主动 ABORT 并下发 ABORT 标记。协调器宕机可通过
__transaction_state
重放恢复事务状态并继续扇出事务标记。在事物未提交之前,配置了
read_committed的消费者
不会看到未决事务。
3.5.2 应对僵尸实例
Kafka 引入了 Producer Epoch,通过围栏机制来隔离僵尸实例。每个 Producer 在第一次用某个
transactional.id
初始化事务时,Kafka 的 Transaction Coordinator 会给它分配一个 producerId 和 producerEpoch。当相同transactional.id
的新实例启动时,Coordinator 会给它分配 更高的 epoch,并更新元数据。就这样,新实例可以用高 epoch 写数据,而旧实例(僵尸)带着低 epoch 再写数据时,Broker 会直接拒绝。
四 运维和调优要点
事务大小与超时
客户端的
transaction.timeout.ms
受 Broker 端上限约束(如transaction.max.timeout.ms
)。事务过大或时间过长,会拖慢 LSO 前进,导致
read_committed
消费延迟升高
围栏与异常
ProducerFencedException
/INVALID_PRODUCER_EPOCH
:同一transactional.id
新实例已接管;旧实例必须停止。TransactionAbortedException
:本事务已被中止;需要清理/重启事务。
副本与可靠性
幂等/EOS 通常要求
acks=all
与合适的min.insync.replicas
。避免不干净选主导致重写。
重要监控指标
生产端:
transactional.commit.latency.avg
、transactional.abort.rate
、record-errors
/retries
。Broker:
transaction-coordinator-metrics
(扇出延迟、超时/中止率)、replica-fetcher-metrics
。消费端:
records-lag-max
(在read_committed
下对 LSO 滞后敏感)。
主题压缩与控制记录
控制批次(COMMIT/ABORT)是特殊记录,日志清理/压缩会保留其必要语义,确保历史可正确回放。
边界与限制
事务只在同一 Kafka 集群内跨 Topic/分区原子;不跨外部系统。
超大事务(大量分区/消息)会放大标记扇出成本与恢复时间。
五 Kafka Streams 中的事务
processing.guarantee=exactly_once_v2
/exactly_once
:Streams 在内部为每个任务(Task)维护事务性生产者,把处理结果与位点绑定到同一事务中;重平衡时靠 epoch 围栏防止旧实例写入。