1. Kafka中事务的几个基本概念
Kafka 事务主要由 生产者(Producer) 来实现,核心概念包括:
- TransactionalId:事务 ID,Kafka 用它来唯一标识一个事务。
- Transaction Coordinator:事务协调器,负责管理事务的状态和提交过程。
- Producer Id (PID) & Epoch:每个事务生产者会被分配一个唯一的
PID
和Epoch
,用于事务恢复和幂等性支持。 - Write-Ahead Log (WAL):Kafka 事务使用内部的 事务日志(WAL) 记录事务状态,以支持事务恢复。
2. Kafka 事务的执行流程
- 初始化事务:生产者需要设置
transactional.id
并初始化事务。 - 开启事务:调用
beginTransaction()
开始事务。 - 发送数据:生产者在事务中发送数据到不同的 topic 分区。
- 提交事务:调用
commitTransaction()
提交事务,Kafka 保证所有消息都可见。关键性的一部。只有在执行了commitTransaction之后,
协调器才会把所有消息更新为提交状态。 - 事务回滚(可选):如果事务失败,调用
abortTransaction()
,Kafka 保证所有已发送的消息都被丢弃。 - 消费者可见性:消费者只能看到已提交事务的消息,而未提交或回滚的消息不会对消费者可见。
Kafka保证事务的一致性,主要是分别保证生产者,消费者两端都要保证一致性。即生成端保证多个消息要不全部发送成功,要不就全部发送失败。
生产端代码:
producer.initTransactions(); // 初始化事务
try {
producer.beginTransaction(); // 开始事务
ProducerRecord<String, String> record1 = new ProducerRecord<>("topicA", "key1", "value1");
ProducerRecord<String, String> record2 = new ProducerRecord<>("topicB", "key2", "value2");
producer.send(record1);
producer.send(record2);
producer.commitTransaction(); // 提交事务
} catch (Exception e) {
producer.abortTransaction(); // 事务回滚
}
如上代码所示,只要在commitTransaction()执行成功之后,record1和record2 才被认为发送成功,即事务协调器把两条消息全部改成已提交状态,此时的消息才会被消费者看见。如果abortTransaction()执行,则两条消息都任务发送失败,消费端不回看到任何消息。
消费端代码:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 仅读取已提交事务的数据
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// 关闭自动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topicA"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息(如写入数据库)
process(record);
// 手动提交偏移量(只有成功处理后才提交)
consumer.commitSync();
} catch (Exception e) {
// 处理失败,偏移量不会提交,Kafka 会重新投递该消息
System.err.println("处理失败,稍后重试:" + e.getMessage());
}
}
}
- 只有消息处理成功后才提交偏移量,确保不会丢失消息。
- 失败时不提交偏移量,Kafka 会重新投递消息。以此保证消息一定被消费掉。
总结
✅ 事务提交后 (commitTransaction()
),所有消息才会变成 "已提交",消费者才能读取这些消息。
✅ 如果事务回滚 (abortTransaction()
),Kafka 会丢弃这些消息,消费者不会看到它们。
✅ 消费者必须使用 read_committed
隔离级别,否则可能读取到未提交的事务数据。