查看kafka服务版本
[root@localhost eicar]# kafka-server-start.sh --version
[2025-06-23 11:10:54,106] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
3.3.2 (Commit:b66af662e61082cb)
[root@localhost eicar]#
查看消费者组信息
[root@localhost eicar]# kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
python-consumer-group
[root@localhost eicar]#
查看指定消费者组详细信息
一个组可以有多个消费者。
主题中的消息只能被同一个组中的一个消费者消费。
一个主题可以被多个消费者组消费。
[root@localhost opt]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group python-consumer-group --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
python-consumer-group test-topic 0 39 39 0 rdkafka-76d00c65-de7c-4b57-9213-7af52aa7cdd8 /172.16.1.108 rdkafka
偏移量就是消费者消费到哪个消息了,kafka集群有记录的该信息。
CURRENT-OFFSET: 消费者当前消费到的偏移量
LOG-END-OFFSET: 分区最新消息的偏移量(该第39了)
LAG: 消费滞后量(未消费的消息数)
创建topic
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
列出所有topic
[root@localhost eicar]# kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic
[root@localhost eicar]#
启动kafka生产者
kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
启动kafka消费者
kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
python kafka消费者实现过程:
- 初始化消费者服务
- 订阅主题
- 检测消息
- 解析消息