#作者:张桐瑞
文章目录
前言
在 Kafka 消息队列系统中,对于 Kafka 消费者而言,监控其消费进度,即消费者滞后程度,是一项至关重要的工作。这一滞后程度通常以 “消费者 Lag” 或 “Consumer Lag” 来表示。
所谓消费者 Lag,指的是消费者当前落后于生产者的程度。具体而言,若 Kafka 生产者向某主题成功生产了一定数量的消息,而消费者当前消费的消息数量少于该数量,两者的差值即为消费者 Lag。例如,当生产者向某主题成功生产 100 万条消息,而消费者仅消费了 80 万条消息时,该消费者的 Lag 值即为 20 万条消息。
通常情况下,Lag 的计量单位为消息数量。虽然在实际业务讨论中,我们常从主题级别探讨 Lag,但 Kafka 在底层监控 Lag 时,是基于分区进行的。若要获取主题级别的 Lag 值,则需要手动汇总该主题下所有分区的 Lag 值,将其累加后得到最终的主题级 Lag 值。
消费者 Lag 是衡量消费者运行状态的关键指标。在正常运行状态下,消费者的 Lag 值应维持在较低水平,理想状态下甚至接近于 0。这意味着消费者能够及时处理生产者发送的消息,消费滞后程度极小,系统数据流转顺畅。反之,若消费者的 Lag 值较大,则表明其处理消息的速度无法跟上生产者的生产速度。随着时间推移,Lag 值可能会持续增大,进而对下游消息的处理速度产生负面影响,导致整个业务流程效率降低。
更为严重的是,当消费者处理速度过慢时,其所消费的数据可能已不在操作系统的页缓存中。此时,消费者不得不从磁盘读取数据,这将显著增加数据读取延迟,进一步拉大与生产者之间的差距,形成 “马太效应”,即 Lag 值原本较大的消费者处理速度会越来越慢,Lag 值也会愈发增大,最终可能导致消息积压严重,影响业务的正常运行。
鉴于上述情况,在实际业务场景中,运维人员和开发人员必须密切关注消费者的消费进度。一旦发现 Lag 值呈现逐步增加的趋势,应立即展开排查,准确定位问题根源,并及时采取有效措施进行处理,以避免对业务造成损失。
接下来,将详细介绍三种监控 Kafka 消费者消费进度(即消费者 Lag)的方法。
一、使用 Kafka 自带命令行工具 kafka-consumer-groups 脚本
kafka-consumer-groups 脚本是 Kafka 提供的用于操作和管理消费者相关功能的命令行工具,位于 Kafka 安装目录的 bin 子目录下。该工具不仅可以用于管理消费者组,还能够监控独立消费者(Standalone Consumer)的 Lag 值。独立消费者是指未使用消费者组机制的消费者程序,其与消费者组的区别在于,独立消费者通过调用 KafkaConsumer.assign () 方法直接消费指定分区,而消费者组则通过调用 KafkaConsumer.subscribe () 方法进行主题订阅,但两者均需配置 group.id 参数。
使用 kafka-consumer-groups 脚本监控 Lag 值的操作较为简便,通过以下命令即可查看指定消费者的 Lag 值:
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
其中,<Kafka broker连接信息>为 Kafka 集群中 broker 的主机名与端口号组合,如localhost:9092;<group名称>则为消费者程序中配置的 group.id 值。
例如,当执行上述命令并指定 Kafka 集群连接信息为localhost:9092,消费者组名为testgroup时,可能得到如下输出信息:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 714285 714285 0 consumer-1 localhost test-client-1
test 1 714286 714286 0 consumer-2 localhost test-client-2
... ... ... ... ... ... ... ...
在上述输出中,各字段含义如下:
TOPIC:表示消费者订阅的主题名称。
PARTITION:代表主题的分区编号。
CURRENT-OFFSET:是指该消费者组当前在对应分区上最新消费消息的位移值。
LOG-END-OFFSET:表示对应分区当前最新生产的消息的位移值。
LAG:即消费者 Lag 值,通过LOG-END-OFFSET减去CURRENT-OFFSET计算得出,反映了消费者在该分区上落后于生产者的消息数量。
CONSUMER-ID:为消费者实例的 ID。
HOST:显示消费者连接的 Kafka broker 主机名。
CLIENT-ID:表示消费者客户端的 ID。
在实际监控中,我们重点关注LAG列的值。理想状态下,该列所有值应趋近于 0,这表明消费者能够及时消费生产者发送的消息,不存在消费滞后情况。若LAG值较大,则意味着消费者存在消费滞后问题,需要进一步排查原因。
在使用该脚本时,可能会出现以下两种特殊情况:
- 当运行脚本后,输出信息中CONSUMER-ID、HOST和CLIENT-ID列没有值,同时提示 “Consumer group ‘testgroup’ has no active members.”。这种情况是由于在运行 kafka-consumer-groups 脚本时,对应的消费者程序尚未启动,即当前消费者组没有任何活跃成员。不过,此时LAG列的值依然是有效的,可以准确反映消费者组的 Lag 情况。
- 若运行脚本后,命令未返回任何结果,这可能是因为所使用的 Kafka 版本较旧,其 kafka-consumer-groups 脚本尚不支持查询非活跃消费者组的信息。针对这一问题,可选择升级 Kafka 版本,或者采用其他监控方法来获取消费者 Lag 信息。
二、使用 Kafka Java Consumer API 编程
在许多实际应用场景中,通过命令行工具查询 Lag 的方式可能无法满足自动化监控的需求。为此,Kafka 社区提供的 Java Consumer API 提供了更为灵活的程序化监控方案。
Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组最新消费消息位移的方法,利用这些方法,我们可以计算出对应的 Lag 值。以下代码展示了如何通过 Java Consumer API 监控给定消费者组的 Lag 值:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.OffsetAndMetadata;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class KafkaConsumerLagMonitor {
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理ExecutionException
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
}
在上述代码中,关键步骤如下:
- 通过AdminClient.listConsumerGroupOffsets方法获取给定消费者组的最新消费消息的位移。
- 使用KafkaConsumer.endOffsets方法获取订阅分区的最新消息位移。
- 执行相应的减法操作,计算出 Lag 值,并将结果封装进一个Map对象中,其中Map的键为TopicPartition,值为对应的 Lag 值。
需要注意的是,此代码仅适用于 Kafka 2.0.0 及以上版本,因为在 Kafka 2.0.0 之前的版本中,不存在AdminClient.listConsumerGroupOffsets方法。在实际应用中,可将lagOf方法集成到生产环境的监控程序中,实现对消费者 Lag 的自动化监控。
三、使用 Kafka 自带的 JMX 监控指标
在实际的大规模监控场景中,通常会借助成熟的监控框架,如 Zabbix 或 prometheus来进行系统监控。上述两种监控消费者 Lag 的方法在集成到现有监控框架时存在一定困难,而 Kafka 默认提供的 JMX 监控指标则为解决这一问题提供了有效途径。
Kafka 消费者提供了一个名为kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"的 JMX 指标,其中包含多个与消费者消费状态相关的属性。与监控消费者 Lag 值密切相关的有两组属性:records-lag-max和records-lead-min。其中,records-lag-max表示在测试窗口时间内,该消费者曾经达到的最大 Lag 值;records-lead-min表示消费者最新消费消息的位移与分区当前第一条消息位移的差值中的最小值,即最小 Lead 值。
Lag 值反映了消费者落后于生产者的消息数量,而 Lead 值则从另一个角度展示了消费者与分区起始消息的距离。Lag 值与 Lead 值相互关联,Lag 值越大,Lead 值越小,反之亦然。
在监控过程中,引入 Lead 值具有重要意义。仅关注 Lag 值可能无法及时察觉潜在的严重问题。例如,当 Lag 值逐渐增大时,我们通常会意识到消费者处理速度变慢,追不上生产者的节奏,但在某些情况下,这可能被认为是可接受的。然而,当监测到 Lead 值越来越小,甚至接近 0 时,就需要高度警惕,这可能预示着消费者端即将出现消息丢失的情况。
由于 Kafka 中的消息默认会设置留存时间(通常为 1 周),若消费者程序处理速度过慢,导致其要消费的数据即将被 Kafka 删除,此时若不及时处理,将会出现消息被删除的情况。这可能引发两种不良后果:一是消费者重新从起始位置消费数据,造成数据重复处理;二是消费者从最新的消息位移处开始消费,导致之前未及时消费的消息被跳过,从而出现消息丢失的假象。这两种情况都会对业务数据的完整性和准确性产生严重影响。
因此,在实际生产环境中,同时监控 Lag 值和 Lead 值至关重要。通过 JConsole 工具可以方便地查看这些 JMX 指标。例如,当使用 JConsole 监控client-id为consumer-1的消费者时,在给定的测量周期内,若最大 Lag 值为 714202,最小 Lead 值为 83,则表明该消费者存在较大的消费滞后性,需要及时排查原因并进行优化。
此外,Kafka 消费者还在分区级别提供了额外的 JMX 指标,其名称为
kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。以client-id为consumer-1,主题为test,分区为0为例,通过该 JMX 指标可以获取到分区级别的 Lag 和 Lead 值,并且还包含records-lag-avg和records-lead-avg两个属性,用于计算平均 Lag 值和平均 Lead 值。在实际监控场景中,这些分区级别的平均指标能够更全面地反映消费者在各个分区上的消费情况,为精准定位问题提供有力支持。
综上所述,通过上述三种方法,即使用 Kafka 自带命令行工具、Java Consumer API 编程以及 JMX 监控指标,能够实现对 Kafka 消费者组消费进度(消费者 Lag)的有效监控。在实际应用中,可根据具体的业务需求和技术架构选择合适的监控方法,以确保 Kafka 消息队列系统的稳定运行,保障业务的正常开展。