Kafka实现事务的机制

发布于:2025-03-04 ⋅ 阅读:(9) ⋅ 点赞:(0)

1. Kafka中事务的几个基本概念

Kafka 事务主要由 生产者(Producer) 来实现,核心概念包括:

  • TransactionalId:事务 ID,Kafka 用它来唯一标识一个事务。
  • Transaction Coordinator:事务协调器,负责管理事务的状态和提交过程。
  • Producer Id (PID) & Epoch:每个事务生产者会被分配一个唯一的 PIDEpoch,用于事务恢复和幂等性支持。
  • Write-Ahead Log (WAL):Kafka 事务使用内部的 事务日志(WAL) 记录事务状态,以支持事务恢复。

2. Kafka 事务的执行流程

  1. 初始化事务:生产者需要设置 transactional.id 并初始化事务。
  2. 开启事务:调用 beginTransaction() 开始事务。
  3. 发送数据:生产者在事务中发送数据到不同的 topic 分区。
  4. 提交事务:调用 commitTransaction() 提交事务,Kafka 保证所有消息都可见。关键性的一部。只有在执行了commitTransaction之后,协调器才会把所有消息更新为提交状态。
  5. 事务回滚(可选):如果事务失败,调用 abortTransaction(),Kafka 保证所有已发送的消息都被丢弃。
  6. 消费者可见性:消费者只能看到已提交事务的消息,而未提交或回滚的消息不会对消费者可见。

Kafka保证事务的一致性,主要是分别保证生产者,消费者两端都要保证一致性。即生成端保证多个消息要不全部发送成功,要不就全部发送失败。

生产端代码:

producer.initTransactions(); // 初始化事务
try {
    producer.beginTransaction(); // 开始事务

    ProducerRecord<String, String> record1 = new ProducerRecord<>("topicA", "key1", "value1");
    ProducerRecord<String, String> record2 = new ProducerRecord<>("topicB", "key2", "value2");

    producer.send(record1);
    producer.send(record2);

    producer.commitTransaction(); // 提交事务
} catch (Exception e) {
    producer.abortTransaction(); // 事务回滚
}

如上代码所示,只要在commitTransaction()执行成功之后,record1和record2 才被认为发送成功,即事务协调器把两条消息全部改成已提交状态,此时的消息才会被消费者看见。如果abortTransaction()执行,则两条消息都任务发送失败,消费端不回看到任何消息。

消费端代码:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

// 仅读取已提交事务的数据
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

// 关闭自动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topicA"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // 处理消息(如写入数据库)
            process(record);

            // 手动提交偏移量(只有成功处理后才提交)
            consumer.commitSync();
        } catch (Exception e) {
            // 处理失败,偏移量不会提交,Kafka 会重新投递该消息
            System.err.println("处理失败,稍后重试:" + e.getMessage());
        }
    }
}
  • 只有消息处理成功后才提交偏移量,确保不会丢失消息。
  • 失败时不提交偏移量,Kafka 会重新投递消息。以此保证消息一定被消费掉。

总结

事务提交后 (commitTransaction()),所有消息才会变成 "已提交",消费者才能读取这些消息
如果事务回滚 (abortTransaction()),Kafka 会丢弃这些消息,消费者不会看到它们
消费者必须使用 read_committed 隔离级别,否则可能读取到未提交的事务数据。


网站公告

今日签到

点亮在社区的每一天
去签到