一、集群管理
前台启动Broker
bin/kafka-server-start.sh <path>/server.properties
- 关闭方式:
Ctrl + C
- 关闭方式:
后台启动Broker
bin/kafka-server-start.sh -daemon <path>/server.properties
关闭Broker
bin/kafka-server-stop.sh
二、Topic管理
操作 | 命令 |
---|---|
创建Topic | bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic <topicname> |
删除Topic | bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic <topicname> |
查询Topic列表 | bin/kafka-topics.sh --bootstrap-server localhost:9092 --list |
查询Topic详情 | bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topicname> |
增加分区数 | bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 6 --topic <topicname> |
注意:新版本推荐使用
--bootstrap-server
替代--zookeeper
三、Consumer Groups管理
操作 | 命令 |
---|---|
查询消费组列表 | bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list |
查询消费组详情 | bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <groupname> |
删除消费组 | bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group <groupname> |
- 重设消费位移:
# 重置到最早位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <groupname> --reset-offsets --to-earliest --execute # 重置到最新位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <groupname> --reset-offsets --to-latest --execute # 重置到指定位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <groupname> --reset-offsets --to-offset 2000 --execute
四、常用运维工具
生产者控制台
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topicname>
- 高级参数:
--compression-codec lz4
(压缩)
--request-required-acks all
(ACK机制)
--message-send-max-retries 10
(重试次数)
- 高级参数:
消费者控制台
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topicname> --group <groupname> --from-beginning
性能测试工具:
# 生产者性能测试 bin/kafka-producer-perf-test.sh --topic <topic> --num-records 1000000 --record-size 1000 # 消费者性能测试 bin/kafka-consumer-perf-test.sh --topic <topic> --messages 1000000
获取Topic消息数
# 最新位移(总消息数) bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic> --time -1 # 最早位移 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic> --time -2
五、分区管理
Preferred Leader选举
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --topic <topic> --partition 0
分区重分配:
# 生成迁移计划 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --generate --topics-to-move-json-file plan.json # 执行迁移 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file reassign.json # 验证进度 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file reassign.json
六、高级运维
查看日志段元数据
bin/kafka-dump-log.sh --files 000000000000.log --print-data-log
跨集群复制(MirrorMaker)
bin/kafka-mirror-maker.sh --consumer.config consumer.conf --producer.config producer.conf --whitelist ".*"
查看__consumer_offsets
bin/kafka-console-consumer.sh --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
关键注意事项:
版本兼容性:
- ≥ Kafka 2.2.x 推荐使用
--bootstrap-server
替代--zookeeper
- 删除Topic需确保
delete.topic.enable=true
- ≥ Kafka 2.2.x 推荐使用
生产环境建议:
- 分区重分配避免高峰时段操作
- 重设位移前先停止消费者组
- MirrorMaker需监控复制延迟
常用诊断技巧:
# 检查消息积压(Lag) bin/kafka-consumer-groups.sh --describe --group <group> | awk '{print $6}' # 检查Controller状态 bin/kafka-metadata-quorum.sh --status
完整文档参考:Apache Kafka Operations