kafka 参数篇

发布于:2025-03-27 ⋅ 阅读:(36) ⋅ 点赞:(0)

kafka Rebalancing 相关的参数有哪些

kafka 部分源码 (可忽略)

ConsumerConfig
public class ConsumerConfig extends AbstractConfig {
...
    /** <code>max.poll.records</code> */
    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
    private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."
        + " Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not impact the underlying fetching behavior."
        + " The consumer will cache the records from each fetch request and returns them incrementally from each poll.";

    /** <code>max.poll.interval.ms</code> */
    public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
    private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
    
    public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
    public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using "
                                                          + "consumer group management. This places an upper bound on the amount of time that the consumer can be idle "
                                                          + "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer "
                                                          + "is considered failed and the group will rebalance in order to reassign the partitions to another member. "
                                                          + "For consumers using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be immediately reassigned. "
                                                          + "Instead, the consumer will stop sending heartbeats and partitions will be reassigned "
                                                          + "after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a static consumer which has shutdown.";
    
    /**
     * <code>session.timeout.ms</code>
     */
    public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
    private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using "
                                                        + "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness "
                                                        + "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "
                                                        + "then the broker will remove this client from the group and initiate a rebalance. Note that the value "
                                                        + "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "
                                                        + "and <code>group.max.session.timeout.ms</code>.";
    /**
     * <code>heartbeat.interval.ms</code>
     */
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
    private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;

    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
    public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer "
                                                           + "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the "
                                                           + "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. "
                                                           + "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "
                                                           + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";

}
Heartbeat
/**
 * A helper class for managing the heartbeat to the coordinator
 */
public final class Heartbeat {
    private final int maxPollIntervalMs;
    private final GroupRebalanceConfig rebalanceConfig;
    private final Time time;
    private final Timer heartbeatTimer;
    private final Timer sessionTimer;
    private final Timer pollTimer;
    private final Logger log;
    private final ExponentialBackoff retryBackoff;

    private volatile long lastHeartbeatSend = 0L;
    private volatile boolean heartbeatInFlight = false;
    private volatile long heartbeatAttempts = 0L;

    public Heartbeat(GroupRebalanceConfig config,
                     Time time) {
        if (config.heartbeatIntervalMs >= config.sessionTimeoutMs)
            throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
        this.rebalanceConfig = config;
        this.time = time;
        this.heartbeatTimer = time.timer(config.heartbeatIntervalMs);
        this.sessionTimer = time.timer(config.sessionTimeoutMs);
        this.maxPollIntervalMs = config.rebalanceTimeoutMs;
        this.pollTimer = time.timer(maxPollIntervalMs);
        this.retryBackoff = new ExponentialBackoff(rebalanceConfig.retryBackoffMs,
                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
                rebalanceConfig.retryBackoffMaxMs,
                CommonClientConfigs.RETRY_BACKOFF_JITTER);

        final LogContext logContext = new LogContext("[Heartbeat groupID=" + config.groupId + "] ");
        this.log = logContext.logger(getClass());
    }

    private void update(long now) {
        heartbeatTimer.update(now);
        sessionTimer.update(now);
        pollTimer.update(now);
    }

    public void poll(long now) {
        update(now);
        pollTimer.reset(maxPollIntervalMs);
    }

    boolean hasInflight() {
        return heartbeatInFlight;
    }

    void sentHeartbeat(long now) {
        lastHeartbeatSend = now;
        heartbeatInFlight = true;
        update(now);
        heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);

        if (log.isTraceEnabled()) {
            log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
        }
    }

    void failHeartbeat() {
        update(time.milliseconds());
        heartbeatInFlight = false;
        heartbeatTimer.reset(retryBackoff.backoff(heartbeatAttempts++));

        log.trace("Heartbeat failed, reset the timer to {}ms remaining", heartbeatTimer.remainingMs());
    }

    void receiveHeartbeat() {
        update(time.milliseconds());
        heartbeatInFlight = false;
        heartbeatAttempts = 0L;
        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
    }

    boolean shouldHeartbeat(long now) {
        update(now);
        return heartbeatTimer.isExpired();
    }
    
    long lastHeartbeatSend() {
        return this.lastHeartbeatSend;
    }

    long timeToNextHeartbeat(long now) {
        update(now);
        return heartbeatTimer.remainingMs();
    }

    boolean sessionTimeoutExpired(long now) {
        update(now);
        return sessionTimer.isExpired();
    }

    void resetTimeouts() {
        update(time.milliseconds());
        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
        pollTimer.reset(maxPollIntervalMs);
        heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
    }

    void resetSessionTimeout() {
        update(time.milliseconds());
        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
    }

    boolean pollTimeoutExpired(long now) {
        update(now);
        return pollTimer.isExpired();
    }

    long lastPollTime() {
        return pollTimer.currentTimeMs();
    }
}

:1. session.timeout.ms

  • 作用: 定义消费者在 Group Coordinator 中会话的超时时间。如果消费者在此时间内未发送心跳,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。

源码注释

在使用Kafka消费者组管理机制时,用于检测客户端故障的会话超时时间。客户端会周期性地向Broker发送心跳信号以表明其存活状态。如果Broker在此会话超时到期前未收到心跳,则会将该客户端从消费者组中移除并触发再平衡。需要注意的是,该值必须处于Broker端配置参数group.min.session.timeout.ms(最小会话超时)和group.max.session.timeout.ms(最大会话超时)规定的允许范围内。

  • 默认值: 通常为 10 秒(10000ms:)。
  • 可能导致 Rebalancing 的原因:
    • :消费者处理任务过重,导致无法按时发送心跳。
    • :网络延迟或故障,导致心跳无法及时到达 Group Coordinator。
  • 优化建议: 根据消费者的处理能力和网络状况适当增大该值,但注意过大的值会延迟消费者失效的检测。

:2. heartbeat.interval.ms:

  • 作用: 定义消费者发送心跳的时间间隔。

源码注释:

在使用Kafka消费者组管理机制时,与消费者协调器之间预期的心跳间隔时间。心跳机制用于维持消费者会话的活跃状态,并在新消费者加入或离开组时促进再平衡过程。该参数值必须小于session.timeout.ms(会话超时时间),通常建议设置为该值的1/3以下。可通过进一步调低此值来控制常规再平衡的预期响应时间。

  • 默认值: 通常为 3 秒(3000ms:)。
  • 可能导致 Rebalancing 的原因:
    • :心跳间隔设置过长,导致消费者在: session.timeout.ms: :内未能发送足够的心跳。
    • :消费者处理任务过重或网络问题,导致心跳未能按时发送。
  • 优化建议: 确保 heartbeat.interval.ms: 的值小于 session.timeout.ms: 的三分之一,并适当调整以平衡心跳频率和网络负载。

:3. max.poll.interval.ms:

  • 作用: 定义消费者在两次调用 poll(): 方法之间的最大时间间隔。如果消费者在此时间内未调用 poll():,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。 对于使用指定分组id (:group.instance.id😃 的消费者,若达到此超时阈值,分区不会立即重新分配。此时消费者将停止发送心跳信号,分区将在:session.timeout.ms:超时期满后重新分配。该行为模式与已关闭的静态消费者完全一致。
  • 默认值: 通常为 5 分钟(300000ms:)。
  • 可能导致 Rebalancing 的原因:
    • :消费者处理单条消息的时间过长,导致无法按时调用: poll():。
    • :消费者线程被阻塞或死锁。
  • 优化建议: 根据消息处理时间调整该值,或优化消费者的处理逻辑以减少单条消息的处理时间。

:4. max.poll.records

  • 作用: 定义每次调用: poll(): :方法时返回的最大消息数量。
  • 默认值: 通常为 500 条。
  • 可能导致 Rebalancing 的原因:
    • :如果: max.poll.records: :设置过大,消费者可能一次性处理过多消息,导致: poll(): :调用延迟,进而触发: max.poll.interval.ms: :超时。
  • 优化建议: 根据消费者的处理能力调整该值,避免一次性处理过多消息。

:5. group.initial.rebalance.delay.ms

  • 作用: 定义消费者组在初始加入时等待 Rebalancing 的延迟时间。在这段时间内,Group Coordinator 会等待更多消费者加入,以减少 Rebalancing 的频率。
  • 默认值: 通常为 3 秒(3000ms:)。
  • 可能导致 Rebalancing 的原因:
    • :如果该值设置过小,消费者组可能会频繁触发 Rebalancing,尤其是在消费者动态加入或退出的场景中。
  • 优化建议: 根据消费者组的动态性适当增大该值,以减少不必要的 Rebalancing。

6. partition.assignment.strategy

  • 作用: 定义消费者组中分区分配的策略(例如 Range、RoundRobin、Sticky 等)。
  • 默认值: 通常为: RangeAssignor:。
  • 可能导致 Rebalancing 的原因:
    • :如果分区分配策略不均衡,可能导致某些消费者负载过高,进而触发 Rebalancing。
  • 优化建议: 根据业务需求选择合适的分区分配策略,例如使用 StickyAssignor: 来减少 Rebalancing 的频率。

:7. 消费者动态加入或退出**

  • 原因: 消费者组的成员发生变化(例如新增或移除消费者)会触发 Rebalancing。
  • 优化建议:
    • :尽量减少消费者的动态变化。
    • :使用 group.initial.rebalance.delay.ms: 延迟 Rebalancing,等待更多消费者加入。

:8. 网络或 Broker 故障**

  • 原因: 如果 Group Coordinator 或 Broker 发生故障,可能导致消费者无法正常发送心跳或提交偏移量,进而触发 Rebalancing。
  • 优化建议:
    • :确保 Kafka 集群的高可用性。
    • :监控网络状况,避免网络分区或延迟。

:9. 消费者处理逻辑问题**

  • 原因: 如果消费者的处理逻辑中存在阻塞、死锁或长时间的任务,可能导致消费者无法按时调用: poll(): :或发送心跳,进而触发 Rebalancing。
  • 优化建议:
    • :优化消费者的处理逻辑,减少单条消息的处理时间。
    • :使用异步处理或线程池来提高处理效率。

:10. 消费者配置不一致**

  • 原因: 如果消费者组中的消费者配置不一致(例如不同的: session.timeout.ms: :或: max.poll.interval.ms:),可能导致部分消费者失效,触发 Rebalancing。
  • 优化建议: 确保消费者组中的所有消费者使用相同的配置。
:总结

:Kafka Rebalancing 可能由多种参数和场景触发,包括心跳、会话、轮询超时、消费者动态变化、网络故障等。为了减少 Rebalancing 的频率,可以优化以下方面:

  • :调整: session.timeout.ms:、heartbeat.interval.ms: :和: max.poll.interval.ms: :等参数。
  • :优化消费者的处理逻辑,减少单条消息的处理时间。
  • :确保 Kafka 集群的高可用性和网络的稳定性。
  • :使用合适的分区分配策略和消费者组配置。

:通过合理的配置和优化,可以有效减少 Kafka Rebalancing 的频率,提高系统的稳定性和性能。

kafka 生产者常用的参数有哪些

根据您的要求,我将上述关于Kafka生产者参数的信息整理成表格形式如下:

一、可靠性相关参数

参数名 默认值 作用说明 推荐值/场景
acks 1 控制生产者等待Broker确认的机制: • 0:不等待确认(高吞吐,可能丢失) • 1:等待Leader确认(均衡) • all(或-1):等待所有ISR副本确认(最高可靠性) 高可靠性场景:all + min.insync.replicas=2
retries 2147483647 消息发送失败时的重试次数(新版Kafka默认无限重试)。 建议设为3-5,避免长时间阻塞。
enable.idempotence false 启用幂等性,避免因重试导致消息重复(需acks=all和retries>0)。 严格不重复场景:true
max.in.flight.requests.per.connection 5 单个连接上未确认的请求最大数。设为1可保证消息顺序,但降低吞吐量。 幂等性场景可保持默认;严格顺序场景:1
transactional.id null 启用事务性生产者,保证跨分区的原子性提交。 分布式事务场景必填,如金融交易。

二、性能优化参数

batch.size 16384 (16KB) 批量发送的消息大小阈值。增大可提升吞吐量,但增加延迟。 高吞吐场景:32768(32KB)或更高,需测试网络和内存。
linger.ms 0 消息在缓冲区等待批量发送的时间(毫秒)。增大可提高吞吐量,但增加延迟。 高吞吐场景:5-100 ms;实时场景:0(立即发送)。
buffer.memory 33554432 (32MB) 生产者缓冲区的总内存大小。不足时send()阻塞或抛出异常。 根据消息速率调整,如高吞吐场景:64MB或更高。
compression.type none 消息压缩算法(none、gzip、snappy、lz4、zstd)。 带宽敏感场景:snappy(平衡速度/压缩率);极致压缩:zstd。
max.block.ms 60000 (1分钟) send()或partitionsFor()调用阻塞的最大时间,超时抛出异常。 高负载场景适当增大,如120000(2分钟)。

三、序列化与消息格式参数

key.serializer - 键的序列化类(如StringSerializer、ByteArraySerializer)。必填参数。
value.serializer - 值的序列化类(同上)。必填参数。
partitioner.class 默认轮询策略 自定义分区策略(如RoundRobinPartitioner、自定义类)。

四、 网络与连接参数

bootstrap.servers - Kafka集群的Broker地址列表(逗号分隔)。必填参数。 至少配置2-3个Broker,如broker1:9092,broker2:9092。
request.timeout.ms 30000 (30秒) 生产者等待Broker响应的超时时间。 网络不稳定时增大,如60000(60秒)。
connections.max.idle.ms 540000 (9分钟) 空闲连接关闭时间。减少可能影响性能,增大可复用连接。 默认值即可,频繁重连场景可增大。
reconnect.backoff.ms 50 连接失败后重试的初始等待时间(指数退避)。 默认值适用于大多数场景。
reconnect.backoff.max.ms 1000 连接重试的最大等待时间。 网络波动大时可适当增大。

五、高级参数

metadata.max.age.ms 300000 (5分钟) 刷新元数据(如分区信息)的时间间隔。
delivery.timeout.ms 120000 (2分钟) 消息从发送到完成(包括重试)的总超时时间。
client.id - 客户端标识,用于监控和日志。建议唯一标识生产者。

注意事项**

  1. 顺序性与吞吐量的权衡:
    • :若需严格消息顺序,设置: max.in.flight.requests.per.connection=1:,但会降低吞吐量。
  2. 幂等性与事务的兼容性:
    • :启用事务(transactional.id:)时,自动启用幂等性,无需单独设置: enable.idempotence:。
  3. 监控与调优:
    • :监控生产者指标:record-send-rate:、request-latency:、batch-size-avg:。
    • :根据业务需求动态调整参数(如: batch.size: :和: linger.ms:)。

:通过合理配置这些参数,可显著优化 Kafka 生产者的性能、可靠性和资源利用率。

消费者相关参数

为了更清晰地展示Kafka消费者配置参数的信息,我将按照类别对这些参数进行格式化整理。以下是根据您的描述整理后的表格形式:

一、消费者组管理

参数名 默认值 作用说明 推荐值/场景
group.id - 消费者组 ID,相同组内的消费者共享分区分配。必填参数 根据业务命名(如 order-service-group)
session.timeout.ms 10000 (10秒) 消费者与协调器的会话超时时间。超时后触发 Rebalance。 10-30秒,需满足 session.timeout.ms ≥ 3 * heartbeat.interval.ms
heartbeat.interval.ms 3000 (3秒) 消费者发送心跳的频率。心跳用于保持会话活性。 建议设为 session.timeout.ms / 3(如 3000ms)
max.poll.interval.ms 300000 (5分钟) 两次 poll() 调用的最大间隔时间。超时触发 Rebalance。 根据消息处理耗时调整,如 600000ms(10分钟)
partition.assignment.strategy RangeAssignor 分区分配策略:Range(默认)、RoundRobin、Sticky(减少 Rebalance 扰动)。 高动态场景:Sticky。

二、消息拉取与处理

参数名 默认值 作用说明 推荐值/场景
max.poll.records 500 单次 poll() 调用返回的最大消息数。 高吞吐场景:1000-5000;低延迟场景:100-500
fetch.min.bytes 1 消费者拉取请求的最小数据量(字节)。Broker 会等待足够数据后返回。 高吞吐场景:1024(1KB)或更高
fetch.max.wait.ms 500 等待 fetch.min.bytes 数据的最长时间。超时后返回现有数据。 平衡吞吐与延迟,如 500ms
fetch.max.bytes 52428800 (50MB) 单次拉取请求的最大数据量。 根据消息大小调整,避免内存溢出
max.partition.fetch.bytes 1048576 (1MB) 每个分区的最大拉取数据量。 大消息场景:5-10MB

三、偏移量提交

参数名 默认值 作用说明 推荐值/场景
enable.auto.commit true 是否自动提交偏移量。高可靠性场景建议关闭。 手动提交场景:false
auto.commit.interval.ms 5000 (5秒) 自动提交偏移量的间隔时间(仅当 enable.auto.commit=true 时生效)。 自动提交场景:1000-5000ms
auto.offset.reset latest 无有效偏移量时的行为:earliest(从最早消息开始消费)、latest(从最新消息开始消费)。 首次消费或偏移量丢失时:earliest

四、网络与连接

参数名 默认值 作用说明 推荐值/场景
bootstrap.servers - Kafka 集群的 Broker 地址列表(逗号分隔)。必填参数 至少配置 2-3 个 Broker,如 broker1:9092,broker2:9092
request.timeout.ms 30000 (30秒) 消费者等待 Broker 响应的超时时间。 网络不稳定时增大,如 60000ms(60秒)
connections.max.idle.ms 540000 (9分钟) 空闲连接关闭时间。 默认值即可,频繁重连场景可增大
reconnect.backoff.ms 50 连接失败后重试的初始等待时间(指数退避)。 默认值适用于大多数场景
reconnect.backoff.max.ms 1000 连接重试的最大等待时间。 网络波动大时可适当增大

五、反序列化与格式

参数名 默认值 作用说明
key.deserializer - 键的反序列化类(如 StringDeserializer, ByteArrayDeserializer)。必填
value.deserializer - 值的反序列化类(同上)。必填

六、安全认证

参数名 默认值 作用说明
security.protocol PLAINTEXT 安全协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。
ssl.keystore.location - SSL 客户端证书路径(启用 SSL 时必填)。
sasl.mechanism - SASL 认证机制(如 PLAIN, SCRAM-SHA-256)。

注意事项**

  1. 避免 Rebalance 抖动:
    • :确保: session.timeout.ms: :和: max.poll.interval.ms: :合理,避免消费者因处理超时被踢出组。
  2. 手动提交 Offset 的原子性:
    • :处理完消息后提交 Offset,避免消息丢失或重复消费。
java


try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
        }
        consumer.commitSync(); // 同步提交
    }
} finally {
    consumer.close();
}
  1. 监控 Lag(未消费消息数):
bash


kafka-consumer-groups --bootstrap-server broker:9092 --describe --group my-group
  1. 处理重复消费:
    • :通过业务逻辑实现幂等性(如数据库唯一键、Redis 去重)。

:通过合理配置消费者参数,可优化消费性能、保障数据一致性,并减少 Rebalance 对系统的影响。


网站公告

今日签到

点亮在社区的每一天
去签到