publicclassConsumerConfigextendsAbstractConfig{.../** <code>max.poll.records</code> */publicstaticfinalString MAX_POLL_RECORDS_CONFIG ="max.poll.records";privatestaticfinalString MAX_POLL_RECORDS_DOC ="The maximum number of records returned in a single call to poll()."+" Note, that <code>"+ MAX_POLL_RECORDS_CONFIG +"</code> does not impact the underlying fetching behavior."+" The consumer will cache the records from each fetch request and returns them incrementally from each poll.";/** <code>max.poll.interval.ms</code> */publicstaticfinalString MAX_POLL_INTERVAL_MS_CONFIG =CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;privatestaticfinalString MAX_POLL_INTERVAL_MS_DOC =CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;publicstaticfinalString MAX_POLL_INTERVAL_MS_CONFIG ="max.poll.interval.ms";publicstaticfinalString MAX_POLL_INTERVAL_MS_DOC ="The maximum delay between invocations of poll() when using "+"consumer group management. This places an upper bound on the amount of time that the consumer can be idle "+"before fetching more records. If poll() is not called before expiration of this timeout, then the consumer "+"is considered failed and the group will rebalance in order to reassign the partitions to another member. "+"For consumers using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be immediately reassigned. "+"Instead, the consumer will stop sending heartbeats and partitions will be reassigned "+"after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a static consumer which has shutdown.";/**
* <code>session.timeout.ms</code>
*/publicstaticfinalString SESSION_TIMEOUT_MS_CONFIG =CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;privatestaticfinalString SESSION_TIMEOUT_MS_DOC =CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;publicstaticfinalString SESSION_TIMEOUT_MS_CONFIG ="session.timeout.ms";publicstaticfinalString SESSION_TIMEOUT_MS_DOC ="The timeout used to detect client failures when using "+"Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness "+"to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "+"then the broker will remove this client from the group and initiate a rebalance. Note that the value "+"must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "+"and <code>group.max.session.timeout.ms</code>.";/**
* <code>heartbeat.interval.ms</code>
*/publicstaticfinalString HEARTBEAT_INTERVAL_MS_CONFIG =CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;privatestaticfinalString HEARTBEAT_INTERVAL_MS_DOC =CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;publicstaticfinalString HEARTBEAT_INTERVAL_MS_CONFIG ="heartbeat.interval.ms";publicstaticfinalString HEARTBEAT_INTERVAL_MS_DOC ="The expected time between heartbeats to the consumer "+"coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the "+"consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. "+"The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "+"than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";}
Heartbeat
/**
* A helper class for managing the heartbeat to the coordinator
*/publicfinalclassHeartbeat{privatefinalint maxPollIntervalMs;privatefinalGroupRebalanceConfig rebalanceConfig;privatefinalTime time;privatefinalTimer heartbeatTimer;privatefinalTimer sessionTimer;privatefinalTimer pollTimer;privatefinalLogger log;privatefinalExponentialBackoff retryBackoff;privatevolatilelong lastHeartbeatSend =0L;privatevolatileboolean heartbeatInFlight =false;privatevolatilelong heartbeatAttempts =0L;publicHeartbeat(GroupRebalanceConfig config,Time time){if(config.heartbeatIntervalMs >= config.sessionTimeoutMs)thrownewIllegalArgumentException("Heartbeat must be set lower than the session timeout");this.rebalanceConfig = config;this.time = time;this.heartbeatTimer = time.timer(config.heartbeatIntervalMs);this.sessionTimer = time.timer(config.sessionTimeoutMs);this.maxPollIntervalMs = config.rebalanceTimeoutMs;this.pollTimer = time.timer(maxPollIntervalMs);this.retryBackoff =newExponentialBackoff(rebalanceConfig.retryBackoffMs,CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
rebalanceConfig.retryBackoffMaxMs,CommonClientConfigs.RETRY_BACKOFF_JITTER);finalLogContext logContext =newLogContext("[Heartbeat groupID="+ config.groupId +"] ");this.log = logContext.logger(getClass());}privatevoidupdate(long now){
heartbeatTimer.update(now);
sessionTimer.update(now);
pollTimer.update(now);}publicvoidpoll(long now){update(now);
pollTimer.reset(maxPollIntervalMs);}booleanhasInflight(){return heartbeatInFlight;}voidsentHeartbeat(long now){
lastHeartbeatSend = now;
heartbeatInFlight =true;update(now);
heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);if(log.isTraceEnabled()){
log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());}}voidfailHeartbeat(){update(time.milliseconds());
heartbeatInFlight =false;
heartbeatTimer.reset(retryBackoff.backoff(heartbeatAttempts++));
log.trace("Heartbeat failed, reset the timer to {}ms remaining", heartbeatTimer.remainingMs());}voidreceiveHeartbeat(){update(time.milliseconds());
heartbeatInFlight =false;
heartbeatAttempts =0L;
sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);}booleanshouldHeartbeat(long now){update(now);return heartbeatTimer.isExpired();}longlastHeartbeatSend(){returnthis.lastHeartbeatSend;}longtimeToNextHeartbeat(long now){update(now);return heartbeatTimer.remainingMs();}booleansessionTimeoutExpired(long now){update(now);return sessionTimer.isExpired();}voidresetTimeouts(){update(time.milliseconds());
sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
pollTimer.reset(maxPollIntervalMs);
heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);}voidresetSessionTimeout(){update(time.milliseconds());
sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);}booleanpollTimeoutExpired(long now){update(now);return pollTimer.isExpired();}longlastPollTime(){return pollTimer.currentTimeMs();}}
:1. session.timeout.ms
作用: 定义消费者在 Group Coordinator 中会话的超时时间。如果消费者在此时间内未发送心跳,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。