【博客】
Kafka生产者事务机制原理
一、为什么要引入事务?
在使用 Kafka 的早期版本时,开发者经常会遇到两种场景:
跨会话重复消息
Producer 重启后,之前的重试逻辑会导致同一条消息被再次发送,消费者需要做幂等处理。跨分区原子性缺失
一批消息要同时写入多个 Topic / 多个 Partition,如果某一条失败,前面成功的消息无法回滚,业务数据出现“中间状态”。
Kafka 在 0.11.0.0 引入的事务(Transactions)正是为了解决“恰好一次(Exactly-Once)语义”的痛点,同时兼顾跨会话幂等性、跨分区原子性和 consume-process-produce 模式的一致性。
二、事务的四大核心目标
目标 | 说明 |
---|---|
原子性 | 一组消息要么全部成功,要么全部失败。 |
跨会话幂等 | Producer 重启后,仍能识别并去重“上一次未完成的事务”。 |
一致性 | consume-process-produce 模式下,消费位点与下游发送结果保持一致。 |
隔离性 | 事务未提交的消息对消费者不可见,防止脏读。 |
三、事务 API 速查表
Kafka Producer 端只提供了 5 个与事务相关的方法,掌握它们就能完成 90% 的编程需求:
方法 | 作用 |
---|---|
initTransactions() |
向 Coordinator 注册全局唯一 transactional.id ,做初始化。 |
beginTransaction() |
显式开启一个事务。 |
sendOffsetsToTransaction() |
把消费者 offset 作为事务的一部分提交,用于 consume-process-produce 模式。 |
commitTransaction() |
全部成功,两阶段提交中的“真正提交”。 |
abortTransaction() |
出现异常,回滚当前事务。 |
Spring Boot 用户可以用 @Transactional
或 kafkaTemplate.executeInTransaction()
进行声明式/编程式事务,原理一致。
四、事务运行流程(两阶段提交 2PC)
Kafka 没有照搬传统 XA 的复杂协议,而是基于内部 Topic 实现了一个轻量级 2PC。
1. 组件角色
角色 | 说明 |
---|---|
Producer | 业务进程,负责发送消息。 |
Transaction Coordinator | 一个 Broker 内的模块,充当 2PC 的协调者。 |
__transaction_state |
内部 Topic,持久化事务状态(Ongoing → Prepare → Commit/Abort)。 |
目标 Topic-Partition | 最终存放业务数据。 |
2. Kafka事务机制原理
源码位置:org.apache.kafka.clients.producer.internals.TransactionManager
,画出Kafka 事务 2PC 全景
┌────────────────────────────────────────────────────────────┐
│ Kafka 事务 2PC 全景 │
├───────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Producer │ │Transaction │ │ Brokers │
│(transaction.id│──▶│Coordinator(TC) │◀──┤(Data partitions)│
└───────────────┘ └──────────────────┘ └──────────────────┘
│ │ │
│ 1. initTransactions │ │
│--------------------->│ 2. 写 __transaction_state
│ │ topic 记录 BEGIN
│ 3. send() │ │
│-----------------------------▶ 4. 写消息到目标分区
│ │ │
│ 5. commitTransaction │ │
│--------------------->│ 6. 写 PREPARE_COMMIT
│ │ │
│ │ 7. 给各分区写 COMMIT 标记
│ │◀────────────────────┘
│ │ 8. 写 __transaction_state
│ │ 记录 COMMITTED
初始化阶段
initTransactions()
→ 找到 Coordinator → 注册transactional.id
,幂等 Producer 自动开启(enable.idempotence=true
)。开始事务
beginTransaction()
仅在客户端打一个标记,不会立即与 Broker 交互。发送消息
调用producer.send()
,消息并未直接写入目标分区,而是暂存客户端的RecordAccumulator
,并标记为事务消息。预提交(Prepare)
客户端 flush 或commitTransaction()
时,Coordinator 收到EndTxn(Prepare)
,把事务状态写入__transaction_state
,并向所有涉及的 Topic-Partition 写入 事务控制消息(Control Batch)。正式提交(Commit)
Coordinator 收到所有 Partition 的 ACK 后,写入__transaction_state
的 Commit 标记,并向各 Partition Leader 发送 COMMIT Marker。
消费者只有在看到 COMMIT Marker 后,才能看到这批消息。至此事务对外可见。异常回滚(Abort)
任何一步失败,Producer 捕获异常后调用abortTransaction()
,流程同上,只是把标记改成 ABORT,消息对消费者永久不可见。
__transaction_state
状态有ongoing
、prepare
、committed
,和对应操作的具体图示:
Producer Transaction Coordinator 日志 & 分区
| | |
|--- init(t.id) ------>|--- 记录事务ID ---------------->|
| | |
|--- begin() --------->|--- 状态=ongoing -------------->|
| | |
|--- send 消息 -------->|--- 写入未提交数据 ------------>|
| | |
|--- commit() -------->|--- 状态=prepare -------------->|
| |--- 写 commit marker ---------->|
| |--- 状态=committed ------------>|
生产者、Transactions Coordinator的相互作用图示:
A:生产者通过initTransactions API向Coordinator 注册事务ID
B:Transactions Coordinator 记录事务日志
C:生产者把消息写入分区
D:分区和Coordinator的交互。(当事务完成以后,消息的状态应该是已提交,消费者才可以消费)
五、代码实现
原生 API
// 1. 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
props.put("transactional.id", "order-tx-" + UUID.randomUUID());
props.put("enable.idempotence", "true"); // 自动开启幂等
props.put("isolation.level", "read_committed"); // 消费者只读已提交
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// 2. 初始化
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> r : records) {
String newVal = transform(r.value()); // 业务逻辑
producer.send(new ProducerRecord<>("target-topic", r.key(), newVal));
}
// 把消费位点也放进事务
producer.sendOffsetsToTransaction(
offsets(records),
new ConsumerGroupMetadata("myGroup")
);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
Spring Boot(声明式)
@Component
public class OrderListener {
@Autowired
private KafkaTemplate<String, String> template;
@KafkaListener(topics = "order-in")
public void listen(ConsumerRecord<String, String> record,
Acknowledgment ack) {
template.executeInTransaction(t -> {
try {
// 1. 业务
String newVal = processOrder(record.value());
// 2. 写下游
t.send("order-out", record.key(), newVal);
// 3. 提交 offset
t.sendOffsetsToTransaction(
Map.of(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)),
new ConsumerGroupMetadata("order-group"));
return null;
} catch (Exception e) {
throw new KafkaException("事务失败", e); // 触发回滚
}
});
}
private String processOrder(String json) {
// 业务逻辑
return json.toUpperCase();
}
}
六、小结
Kafka 事务 = 幂等 Producer + 两阶段提交 + 内部 Topic 日志。
掌握 init → begin → commit/abort 三步曲,即可获得消息层面的 ACID。
🚀 下一步:把本地数据库事务与 Kafka 事务组合,实现真正的 端到端 Exactly-Once。用思维导图总结本博客内容: