分布式微服务系统架构第170集:Kafka消费者并发-多节点消费-可扩展性

发布于:2025-09-09 ⋅ 阅读:(22) ⋅ 点赞:(0)

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

https://github.com/webVueBlog/fastapi_plus

https://webvueblog.github.io/JavaPlusDoc/

点击勘误issues,哪吒感谢大家的阅读

发送流程(Producer → Broker)

  1. 应用构造消息:设置 topic、key(可选,决定有序性)、value、headers、timestamp

  2. 序列化key.serializer / value.serializer 把对象转为字节。

  3. 分区选择

  • 有 key:按 key 的 hash 选定分区(同 key 固定到同一分区,分区内有序)。

  • 无 key:采用“粘性分区”(Sticky),尽量把一段时间的消息打到同一分区以增大批量。

  • 批量聚合:进入 RecordAccumulator(按分区分别聚合);达到 batch.size 或等待到 linger.ms 即形成一个批次。可选 压缩(gzip/lz4/zstd/snappy)。

  • Sender 发送:后台 Sender 线程读取批次,按 元数据 找到该分区的 Leader Broker,发出 ProduceRequest

  • 幂等与重试:网络/瞬时错误时按 retries 与退避重试;开启 enable.idempotence 可避免重复写入导致的乱序/重复。

  • Broker 追加(Leader):顺序写入日志(先入页缓存→追加到 segment,更新索引),并向 ISR 中的 Follower 复制。

  • 确认策略(acks) :

    • acks=0:不等确认直接返回(最快,可靠性最低)。

    • acks=1:Leader 追加成功即回 ACK。

    • acks=all:ISR 都追到 高水位 HW 再回 ACK(最安全)。

  • 回调完成:Producer 收到 RecordMetadata(topic, partition, offset, timestamp) 或异常,触发回调。

  • (可选)事务 EOSinitTransactions → beginTransaction → send(...) → sendOffsetsToTransaction(...) → commitTransaction,实现“生产 + 提交位点”同事务;消费者需 isolation.level=read_committed

  • Broker 内部(极简文字版)

    • 日志:按 segment 切分与保留策略(大小/时间);

    • 复制:Controller 维护 ISR;Follower 落后会被踢出 ISR;Leader 变更时短暂漂移;

    • 高水位 HW:表示所有 ISR 都已复制到的最大位点,用于 acks=all 与 read_committed 的可见性。

    消费流程(Consumer Group)

    1. 入组:消费者用 group.id 找到 CoordinatorJoinGroup 汇报订阅,分配器(建议 CooperativeStickyAssignor)做分区分配,SyncGroup 下发结果。可配 静态成员(group.instance.id) 减少抖动。

    2. poll 循环取数:消费者对各自分配到的分区发 FetchRequest;Broker 返回成批的消息数据(可能为压缩块)。

    3. 解压/反序列化:客户端按批解压并用 deserializer 还原对象。

    4. 业务处理:把记录交给应用逻辑/线程池处理;若需要分区内严格有序,应“同分区单通道”处理。

    5. 回压控制:业务慢时提高 max.poll.interval.ms、降低 max.poll.records,或临时对分区 pause(),处理完再 resume()

    6. 提交位点(offset) :

    • 自动:enable.auto.commit=true 周期性提交(简单但控制弱)。

    • 手动:处理成功后 commitSync/commitAsync;位点写入 __consumer_offsets(压缩主题)。

    • 事务链路:使用 sendOffsetsToTransaction(...) 与生产同事务提交,消费端设置 read_committed

  • 再均衡:成员增减/订阅变化触发再均衡;容器先 onPartitionsRevoked 做最后提交/落库,再 onPartitionsAssigned 接手新分区继续处理。

  • 失败/重试/DLT(死信)文字路径

    1. 业务异常 → 错误处理器判断是否可重试

    2. 进程内退避重试 N 次(固定或指数退避)。

    3. 仍失败 → 投递到 DLT(<原Topic>.DLT) ,同时把 original-topic/partition/offset/exception 等信息放在 headers

    4. 监控与排查:DLT 上统计异常类型与堆积量。

    5. 回放:修复后从 DLT 读出 →(可修复字段)→ 用原 key 重投回主 Topic 或“回放专用 Topic”,下游消费者幂等处理。

    一图看懂(端到端路径)

    App(Producer)
        |
        | 1. send(record) → 序列化(Serializer) → 分区器(Partitioner)
        v
    RecordAccumulator(批次聚合/压缩) --linger.ms/batch.size-->
        |
        | 2. Sender线程取批次 → 查元数据(metadata) → 选择Leader
        v
    Broker Leader 分区
        |
        | 3. 追加顺序写入(页缓存→日志段segment/索引) → 复制给Follower
        |    • acks=0:不等
        |    • acks=1:Leader写入后回
        |    • acks=all:ISR同步到HW后回
        v
    Producer 回调(onCompletion) ←——————— (返回ack或异常)
                                       ↑
    Consumer Group
        |                                |
        | 4. 加入组(Find/Join/SyncGroup) | 5. Fetch循环:拉批次、解压/反序列化、业务处理
        |    分配分区(Assignor)          |    → 提交位点(Commit offset)
        v                                |
    __consumer_offsets(压缩主题)  <——— 提交offset(auto/手动/事务内)

    发送路径(Producer)—关键步骤

    1. 应用→客户端
      producer.send(record, callback) 把 POJO/JSON 交给 Serializer(如 StringSerializerJsonSerializer)转字节。

    2. 分区选择

    • 有 key:StickyPartitioner/默认分区器按 key hash → 固定分区(保证同 key 有序)。

    • 无 key:粘性分区策略减少批次碎片(提高吞吐)。

    1. 累加与打批(RecordAccumulator)

    • 按分区维护批次:满足 batch.size 或到达 linger.ms 即可发送。

    • 可开启压缩:compression.type=gzip|lz4|zstd|snappy(强烈建议开)。

    1. Sender 线程发送

    • 取批次 → 查元数据(主题/分区→Leader broker) → 以 ProduceRequest 发给 Leader。

    • 重试:网络/瞬时错误自动重试(retries/retry.backoff.ms),要配合

      • enable.idempotence=true(默认开启)

      • max.in.flight.requests.per.connection<=5(或=1以严格顺序)
        以避免重复/乱序

    1. Broker Leader 处理

    • 先写 OS Page Cache → 顺序追加到 日志段(segment) ,更新索引;

    • 复制给 Follower:达到 HW(高水位) 后,按 acks 规则回 ACK:

      • acks=0:不等复制;

      • acks=1:Leader 落盘即回;

      • acks=all:ISR 全部跟上 HW 再回(最安全)。

    1. 回调完成

    • 成功:返回 RecordMetadata(topic, partition, offset, timestamp)

    • 失败:返回异常(可区分可重试/不可重试)。

    事务/Exactly-Once:设置 transactional.idinitTransactions() → beginTransaction() → send(...) → (可选)sendOffsetsToTransaction(...) → commitTransaction();消费方用 isolation.level=read_committed

    发送端关键参数(高频实战)

    • 可靠性:acks=allretries 大,enable.idempotence=truedelivery.timeout.ms 合理放大

    • 吞吐:linger.ms(如 550ms)、batch.size(如 64KB256KB)、compression.type=zstd/snappy

    • 顺序:max.in.flight.requests.per.connection=1~5(幂等+小并发)


    Broker 内部简述

    • 日志log.segment.bytes/segment.ms 控制切段;log.retention.* 控制保留;cleanup.policy=delete|compact

    • 复制:控制器监控 ISR;Follower 落后出 ISR;Leader 变更时可能触发短暂不可用

    • 时间戳CreateTime(生产时)与 LogAppendTime(追加时),受 log.message.timestamp.type 影响。


    消费路径(Consumer)—关键步骤

    1. 加入消费组

    • findCoordinator 找到组协调者 → JoinGroup(汇报订阅的主题/分区) → 分配器(如 CooperativeStickyAssignor)分配分区 → SyncGroup 下发结果。

    • 稳定性增强

      • 协作式再均衡(减少抖动);

      • 静态成员group.instance.id 绑定实例身份,缩短扩缩容抖动;

      • 心跳:heartbeat.interval.ms,会话:session.timeout.ms

    1. 拉取数据(poll 循环)

    • fetch.min.bytes + fetch.max.wait.ms 控制“凑批”与等待;

    • max.partition.fetch.bytes 控制单分区单次最大字节;

    • 收到数据后解压、反序列化,交给业务线程。

    1. 业务处理与回压

    • max.poll.records 控制每次最多拉多少;

    • 处理慢要提高 max.poll.interval.ms,或用 pause()/resume() 对分区回压;

    • 严格有序时,可“每分区一个工作队列”。

    1. 提交位点(offset commit)

    • 自动enable.auto.commit=true(简单,但易丢/重复);

    • 手动:处理成功后 commitSync/commitAsync

    • 位点存储在 __consumer_offsets压缩主题);

    • EOS 链路:用事务 sendOffsetsToTransaction(...) 与下游写入同事务提交,实现精确一次对外可见。

    消费端关键参数(高频实战)

    • 并发:同组实例数 ≤ 分区数;Spring:@KafkaListener(concurrency="N")

    • 提交:enable.auto.commit=false + 手动 ack

    • 回压:max.poll.recordsmax.poll.interval.mspause/resume

    • 再均衡:partition.assignment.strategy=CooperativeStickyAssignorgroup.instance.id


    失败链与 DLT 所在位置(串上前文)

    • 业务抛异常 → 错误处理器决定退避重试转发 DLT<topic>.DLT);

    • DLT 携带 original-topic/partition/offset/exception 等 headers,后续人工或自动回放

    • 高吞吐/长时间重试:用重试主题分离重试,避免阻塞主消费。


    最小可用模板

    Producer(Spring Kafka)

    spring:
      kafka:
        bootstrap-servers: broker1:9092,broker2:9092
        producer:
          acks: all
          retries: 10
          properties:
            enable.idempotence: true
            max.in.flight.requests.per.connection: 5
          linger-ms: 20
          batch-size: 131072      # 128KB
          compression-type: zstd
    @Autowired KafkaTemplate<String, String> kt;
    public void sendOrder(String key, String json) {
      kt.send("order.events", key, json)
        .whenComplete((md, ex) -> {
          if (ex != null) log.error("send fail", ex);
          else log.info("ok p={} off={}", md.partition(), md.offset());
        });
    }

    Consumer(Spring Kafka,手动 ack + DLT)

    spring:
      kafka:
        consumer:
          group-id: order-consumer
          enable-auto-commit: false
          auto-offset-reset: latest
          properties:
            partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
            # 静态成员(K8s可用Pod名等)
            group.instance.id: ${HOSTNAME:}
        listener:
          ack-mode: MANUAL
    @Configuration
    @EnableKafka
    class Cfg {
      @Bean
      ConcurrentKafkaListenerContainerFactory<String,String> factory(
          ConsumerFactory<String,String> cf, KafkaTemplate<String,String> kt) {
        var f = new ConcurrentKafkaListenerContainerFactory<String,String>();
        f.setConsumerFactory(cf);
        f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    
        var recoverer = new DeadLetterPublishingRecoverer(
            kt, (r,e) -> new TopicPartition(r.topic()+".DLT", r.partition()));
        var backoff = new FixedBackOff(1000, 3); // 1s×3次
        f.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backoff));
        return f;
      }
    }
    
    @Component
    class OrderListener {
      @KafkaListener(topics="order.events", concurrency="6")
      public void on(ConsumerRecord<String,String> r, Acknowledgment ack) {
        process(r.key(), r.value()); // 幂等
        ack.acknowledge();
      }
    }

    原生消费者(要点)

    • 一个线程一个 Consumer

    • poll() → 业务线程池 → 按分区维护已完成位点 → commitAsync/Sync

    • 需要严格分区内有序就“每分区专属线程/队列”。

    1)DLT 是什么,为什么要有?

    • 定义:当一条消息在“正常消费流程”中多次处理失败(比如 JSON 解析错误、字段缺失、业务幂等校验不过、下游服务永久不可用等),为了不阻塞同分区后续消息、方便隔离排查后续人工/自动回放,把这条“问题消息”单独发到一个备用主题,这个主题就叫 DLT(常见命名:<原主题>.DLT)。

    • 作用

    1. 防止所谓 poison pill(毒丸消息) 卡死分区;

    2. 和“重试主题/延迟重试”搭配,形成完整的失败处置链

    3. 提供可观测(统计失败原因)、可回放(修复后重新处理)的安全网。


    2)一条消息从“正常消费”走到 DLT 的全过程

    用一个按键下单的例子串起来(order.events → 消费者 → DLT):

    [Producer]  -->  [order.events 主题]
                          |
                          v
                  [Consumer(同组,多实例)]
                          |
                      业务处理
           成功 --------------------> ack 提交位点
           失败
            |
            v
        错误处理器(ErrorHandler)
         |      \
      可重试?   不可重试(如格式错/幂等冲突)
       |                 \
       |(退避重试N次)    直接投 DLT
       v                   \
     重试仍失败 --------------> [order.events.DLT]
                                    |
                               人工/自动回放

    落地细节(以 Spring Kafka 为例):

    • 每次失败时错误处理器会决定:再试一次(可能做固定/指数退避),还是转发到 DLT

    • 转发时会把原始元数据(原 topic、partition、offset、timestamp、异常类型与消息)放到 headers,用于排查与回放定位。

    • DLT 的分区选择通常沿用原分区或沿用消息 key 的 hash(可配置),这样回放时仍能大体保持键内顺序


    3)三种常见的失败处置策略

    A. “失败即入 DLT”

    • 适合明显不可重试的错误:JSON 结构错误、必填字段缺失等。

    • 优点:实现最简单,不拖累主消费。

    • 缺点:对瞬时故障没有容错。

    B. “进程内重试 N 次 + DLT”(最常见)

    • 消费者线程内捕获异常,退避重试 3~5 次,再仍失败就投 DLT。

    • 优点:不需要额外主题;对偶发故障友好。

    • 缺点:重试期间占用分区,并发受限;长重试会延迟后续消息。

    C. “分离重试主题(延迟队列)+ DLT”(推荐在高吞吐/长重试场景)

    • 失败后发到重试主题(如 order.events.RETRY.5sRETRY.30s),由独立消费组在指定延时后再处理;多级失败后再进 DLT

    • 优点:不阻塞主消费;灵活控制多级退避。

    • 缺点:需要多建主题与路由配置。


    4)Spring Kafka 实战

    4.1 “进程内重试 + DLT”(DefaultErrorHandler + DeadLetterPublishingRecoverer

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
          ConsumerFactory<String, String> cf, KafkaTemplate<String, String> kt) {
    
        var f = new ConcurrentKafkaListenerContainerFactory<String, String>();
        f.setConsumerFactory(cf);
        f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    
        // 失败后转发到 <topic>.DLT,同分区
        var recoverer = new DeadLetterPublishingRecoverer(
            kt, (rec, ex) -> new TopicPartition(rec.topic() + ".DLT", rec.partition()));
    
        // 固定退避:每次间隔1s,共重试3次(失败再进 DLT)
        var backoff = new FixedBackOff(1000L, 3L);
    
        f.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backoff));
        return f;
      }
    }

    运行时,进入 DLT 的记录会带上 headers:原 topic/partition/offset/timestamp、异常类名、异常消息等,方便排查。

    监听器示例:

    @KafkaListener(topics = "order.events", groupId = "order-consumer", concurrency = "6")
    public void onMessage(ConsumerRecord<String, String> r, Acknowledgment ack) {
      try {
        process(r.key(), r.value()); // 业务幂等
        ack.acknowledge();
      } catch (TransientException e) {
        throw e; // 让默认错误处理器重试
      } catch (Exception fatal) {
        // 明确不可重试的异常也可直接抛出,快速进入 DLT
        throw fatal;
      }
    }

    4.2 “分离重试主题 + DLT”(@RetryableTopic 配置式)

    @Configuration
    @EnableKafka
    public class RetryTopicConfig {
    
      @Bean
      public RetryTopicConfiguration retryableTopics(KafkaTemplate<String, String> kt) {
        return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2.0, 30000) // 1s -> 2s -> 4s ... 最大30s
            .maxAttempts(5)                        // 共5次(含初次),之后进 DLT
            .doNotAutoCreateRetryTopics()          // 也可让它自动创建
            .create(kafkaTemplate());
      }
    
      @Bean
      public KafkaTemplate<String, String> kafkaTemplate() { return kt; }
    }

    然后:

    @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1000, multiplier = 2.0, maxDelay = 30000))
    @KafkaListener(topics = "order.events", groupId = "order-consumer")
    public void onMessage(ConsumerRecord<String, String> r) {
      process(r.key(), r.value());
    }
    
    @DltHandler // 监听 DLT(可做告警/落库/标记)
    public void dlt(ConsumerRecord<String, String> r) {
      // 这里能拿到 headers:原始位点与异常信息
    }

    5)主题与参数规划建议

    • 命名规范

      • 主题:order.events

      • DLT:order.events.DLT

      • 重试:order.events.RETRY.5s / 30s / 5m(或框架自动生成)

    • 分区数:DLT 一般与主主题同分区数,方便回放保持 key/分区相对一致。

    • 副本因子:与生产主题一致(例如 3)。

    • 保留策略:DLT 通常较长保留(7~30 天),不开启 compaction(需要保留每条失败记录)。

    • 消息键:仍使用业务 key(如 orderId)→ DLT 内同 key 聚集,便于排查与重放。

    • 监控:对 DLT 的 消息堆积、增长速率、异常类型分布报警;DLT 消费失败量也要监控。

    创建示例(命令行):

    kafka-topics.sh --create --topic order.events.DLT \
      --partitions 12 --replication-factor 3 \
      --config retention.ms=1209600000   # 14 天

    6)Headers 里一般会带什么(用于排查/回放)

    不同框架键名略有差异,核心信息都在:

    • 原始定位:original-topic / original-partition / original-offset / original-timestamp

    • 异常信息:exception-class / exception-message / stacktrace

    • 可能还会带:原消息的 key/headers 透传

    用途:在 DLT 消费或回放程序里精确定位原消息位置与失败原因,生成报表/告警,或选择性回放


    7)如何回放 DLT(replay)

    三种常见做法:

    1. 人工一次性回放:写个小工具(或开关型消费者),从 DLT 拉取 → 修复/补齐字段 → 再投回原主题(或“专用回放主题”)。

    2. 规则引擎自动回放:比如遇到“下游 5xx”类异常,半小时后自动重投一次。

    3. 离线导出 + 批修复:把 DLT 导出到仓库(ClickHouse/ES/S3),数据修复后按批重投。

    最简单的回放代码(示意):

    @KafkaListener(topics = "order.events.DLT", groupId = "dlt-replayer")
    public void onDeadLetter(ConsumerRecord<String, String> r) {
      var repaired = tryFix(r.value());          // 修复/补齐/做兼容
      if (repaired != null) {
        kafkaTemplate.send("order.events", r.key(), repaired); // 重投
      } else {
        // 仍无法修复:落库、告警、人工介入
      }
    }

    注意

    • 回放时尽量沿用原 key,保持与主消费的一致分区路由;

    • 要有幂等,避免二次入库产生重复。


    8)常见坑 & 最佳实践

    • 无限重试:坚决避免。要么限定次数,要么走重试主题

    • 阻塞分区:进程内重试时间太长会阻塞后续消息;高吞吐/长重试请用 重试主题

    • 丢元数据:投 DLT 时一定带上原位点与异常信息(headers),否则后续很难排查。

    • 无监控:DLT 不监控就等于黑洞;至少对“增长速率、积压量、异常类型 TOP N”报警。

    • 无回放通道:上线前就准备好回放工具或 DLT 消费器,别等 DLT 积了一堆才补。

    • 顺序问题:跨重试/回放无法保证全局顺序,只能保证分区内顺序;需要强顺序时,务必按业务 key 分区限制并发

    一、核心原则(先记住这几条)

    • 并发的硬上限 = 分区数(partitions) :同一消费组内,一个分区同一时刻只会被一个消费者实例的一个线程消费。要进一步扩并发,优先加分区或加实例(同组)。

    • 多实例=多节点扩容:多个进程/容器用同一个 group.id 就会分摊分区;缩容/扩容都会触发再均衡(rebalance)。

    • 顺序只在“分区内”保证:如果你需要按业务键有序,务必设置消息 key,让同 key 进同一分区。

    • 至少一次交付(At-Least-Once)+ 幂等处理是最常见组合;严格 EOS(Exactly-Once)只在“消费→处理→再写回 Kafka/DB”链路必须做到时再考虑。

    • 再均衡优化:启用 CooperativeStickyAssignor + Static Membership,能大幅降低扩缩容时的抖动。


    二、并发模型对比与选型

    1. 多进程/多节点并发(推荐)

    • 多副本服务实例(K8s 部署 N 副本),同一 group.id

    • 优点:简单、弹性、隔离好;扩容=加副本。

    • 注意:实例数不要超过分区数(超过会有实例空闲)。

    1. 单进程内多线程

    • Spring Kafka:@KafkaListener(concurrency="N");原生客户端:一个线程一个 Consumer(Consumer 不是线程安全的)。

    • 优点:资源利用高;缺点:和进程级扩容比,故障域更大。

    1. 单 Consumer + 工作线程池(work-pool)

    • 一个 Consumer 线程 poll(),把消息投给业务线程池处理;用 pause()/resume() 做回压;处理完成后再按分区顺序提交位点。

    • 优点:能控制业务处理并行度且保序;缺点:实现复杂。


    三、容量规划与分区数估算

    • 粗略估算:
      所需分区数 ≈ 峰值QPS × 单条处理耗时(秒) ÷ 单分区极限吞吐
      举例:峰值 5,000 msg/s,每条处理 20 ms(0.02s),单分区能稳态 500 msg/s,
      → 分区数 ≈ 5000×0.02 ÷ 500 = 0.2 ≈ 至少 1;考虑抖动与冗余,建议 6~12 分区起步,方便水平扩展。

    • 在线增加分区是可行的(会改变 key→partition 映射,跨分区无序不受影响;分区内仍保序)。


    四、偏移量与投递语义

    • 自动提交(enable.auto.commit=true) :吞吐高但丢失控制弱,不推荐生产。

    • 手动同步提交:处理完再 commitSync,典型 At-Least-Once

    • 手动异步提交commitAsync 吞吐更好,但失败需回调补偿。

    • Exactly-Once(严格) :

      • Kafka→Kafka:启用事务transactional.id,生产端 enable.idempotence=true),消费端用 read_committed,Spring Kafka 用 KafkaTransactionManager

      • Kafka→DB:要么用幂等写(唯一键/去重表),要么引入本地事务外加 outbox


    五、回压与再均衡稳定性

    • 回压:当业务处理跟不上 poll(),提升 max.poll.interval.ms;或 pause(partitions) 等处理完再 resume;或降低 max.poll.records

    • 再均衡抖动优化

      • partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor(协作式、增量再均衡)。

      • 静态成员:为每个实例设置稳定的 group.instance.id,缩短扩缩容抖动。

      • 合理设置 session.timeout.ms / heartbeat.interval.ms,减少误判掉线。


    六、Spring Kafka 实践模板

    1) application.yml(关键参数)

    spring:
      kafka:
        bootstrap-servers: PLAINTEXT://kafka-1:9092,kafka-2:9092,kafka-3:9092
        consumer:
          group-id: order-consumer
          enable-auto-commit: false
          auto-offset-reset: latest
          # 拉取与处理节奏
          max-poll-records: 500
          max-poll-interval: 300000  # 5min,业务慢时适当调大
          # 再均衡与稳定性
          properties:
            partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
            group.instance.id: ${HOSTNAME:}${random.uuid}   # K8s: 用 Pod 名或节点唯一ID,确保静态成员
            # 拉取批量与等待
            fetch.min.bytes: 1048576       # 1MB
            fetch.max.wait.ms: 500
        listener:
          ack-mode: MANUAL                 # 手动确认
          type: single                     # single/ batch

    2) Listener 与并发

    @Component
    public class OrderConsumer {
    
        // 并发度=容器线程数(不要超过分区总数)
        @KafkaListener(topics = "order.events", concurrency = "6", containerFactory = "kafkaListenerContainerFactory")
        public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
            try {
                // 业务处理(注意幂等)
                process(record.key(), record.value());
                ack.acknowledge(); // 成功再提交偏移量
            } catch (TransientException e) {
                throw e; // 交给错误处理器重试/投 DLT
            } catch (Exception fatal) {
                // 可记录并直接投 DLT(避免阻塞)
                throw fatal;
            }
        }
    
        private void process(String key, String json) { /* ... */ }
    }

    3) 容器工厂 + 错误处理 + DLT

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                ConsumerFactory<String, String> cf,
                KafkaTemplate<String, String> kafkaTemplate) {
    
            var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
            factory.setConsumerFactory(cf);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    
            // 重试 & 死信:3 次间隔退避,失败投到 *.DLT
            var recoverer = new DeadLetterPublishingRecoverer(
                    kafkaTemplate,
                    (r, e) -> new TopicPartition(r.topic() + ".DLT", r.partition())
            );
            var backoff = new FixedBackOff(1000L, 3L);
            factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backoff));
    
            // 可选:批量监听时启用
            // factory.setBatchListener(true);
    
            return factory;
        }
    }

    4) 事务(Kafka→Kafka 严格 EOS 可选)

    spring:
      kafka:
        producer:
          properties:
            enable.idempotence: true
          transaction-id-prefix: tx-order-
    @Bean
    public KafkaTransactionManager<String, String> kafkaTxManager(ProducerFactory<String, String> pf) {
        return new KafkaTransactionManager<>(pf);
    }

    然后在业务里使用 @Transactional("kafkaTxManager") 包住“消费→处理→发送到下游主题”的逻辑。


    七、原生 Java 消费者:单 Consumer + 线程池回压示例(保序提交)

    public class WorkerPoolConsumer implements Runnable {
        private final KafkaConsumer<String, String> consumer;
        private final ExecutorService pool = new ThreadPoolExecutor(
                8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200)); // 有界队列做回压
        private final Map<TopicPartition, OffsetAndMetadata> commitMap = new ConcurrentHashMap<>();
    
        public WorkerPoolConsumer(Properties props, String topic) {
            this.consumer = new KafkaConsumer<>(props);
            consumer.subscribe(List.of(topic));
        }
    
        @Override public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    if (poolIsBusy()) {
                        consumer.pause(consumer.assignment()); // 回压:暂停拉取
                        continue;
                    } else {
                        consumer.resume(consumer.assignment());
                    }
    
                    for (ConsumerRecord<String, String> r : records) {
                        pool.submit(() -> {
                            try {
                                process(r.key(), r.value()); // 业务幂等
                                // 记录待提交 offset(分区内最大已完成位点)
                                commitMap.compute(new TopicPartition(r.topic(), r.partition()),
                                        (tp, old) -> newer(old, new OffsetAndMetadata(r.offset() + 1)));
                            } catch (Exception ex) {
                                // 记录并投递 DLT(略)
                            }
                        });
                    }
    
                    // 提交已完成的位点(异步)
                    if (!commitMap.isEmpty()) {
                        consumer.commitAsync(new HashMap<>(commitMap), (m, e) -> { /* 回调日志 */ });
                    }
                }
            } finally {
                try { consumer.commitSync(commitMap); } catch (Exception ignore) {}
                consumer.close();
                pool.shutdown();
            }
        }
    
        private boolean poolIsBusy() {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
            return tpe.getQueue().remainingCapacity() < 50; // 阈值可调
        }
        private OffsetAndMetadata newer(OffsetAndMetadata a, OffsetAndMetadata b) {
            return (a == null || b.offset() > a.offset()) ? b : a;
        }
        private void process(String key, String value) {/* ... */}
    }

    注意:以上模式在分区内可能乱序提交必须谨慎。若严格分区内保序,可为每个分区维护单独的有序工作队列或使用“每分区一个消费线程”。


    八、扩缩容与运维要点

    • 扩容步骤

    1. 看 分区数 是否已成为瓶颈(消费者副本数 ≤ 分区数);

    2. 增加副本(同组)→ 观察再均衡耗时与 lag;

    3. 仍不够则增分区(注意热点 key),或做批处理/并行度优化

    4. 栈调参数:max.poll.recordsfetch.min.bytesfetch.max.wait.msmax.partition.fetch.bytes

  • 监控(Prometheus + kafka_exporter):

    • 消费 lag、rebalance 次数、处理耗时分布、失败率、DLT 量、max.poll.interval.ms 触发次数。

  • K8s 自动扩缩:可依据 lag 或处理时延设置 HPA(如将 lag 转成自定义指标)。


网站公告

今日签到

点亮在社区的每一天
去签到