Kafka——常见工具脚本大汇总

发布于:2025-08-04 ⋅ 阅读:(15) ⋅ 点赞:(0)

引言

在分布式消息系统领域,Kafka 凭借其高吞吐量、可扩展性和消息持久化特性成为首选。其丰富的命令行工具(Shell 脚本)是实现高效运维和灵活开发的基石。与传统消息中间件(如 RabbitMQ)相比,Kafka 脚本不仅覆盖了基础的消息生产消费,更深入到集群管理、性能测试、数据修复等核心场景,形成了完整的工具链体系。

Kafka 脚本的演进与设计哲学

Kafka 脚本的发展经历了从早期功能分散到模块化整合的过程。在 0.8 版本之前,脚本主要依赖 ZooKeeper 进行元数据管理;自 0.9 版本引入 __consumer_offsets 主题后,脚本功能逐渐集中到 Broker 端,形成了独立的运维体系。其设计哲学体现在以下三点:

  1. 命令与功能的高内聚:每个脚本专注解决一类问题,如 kafka-topics.sh 负责主题全生命周期管理,kafka-consumer-groups.sh 聚焦消费者组位移控制。

  2. 参数的标准化与兼容性:通过 --bootstrap-server 统一连接配置,替代旧版 --zookeeper,避免绕过安全认证。

  3. 操作的幂等性与可追溯性:大部分脚本支持 --dry-run 预检查功能,降低误操作风险。

脚本分类与应用场景矩阵

根据功能定位,Kafka 脚本可分为六大类,覆盖从开发测试到生产运维的全流程:

分类 核心脚本 典型应用场景
基础工具 kafka-console-producer.sh kafka-console-consumer.sh 快速验证消息收发、调试消费逻辑
集群管理 kafka-topics.sh kafka-reassign-partitions.sh 主题创建/删除、分区副本迁移、负载均衡调整
性能测试 kafka-producer-perf-test.sh kafka-consumer-perf-test.sh 评估生产者/消费者吞吐量、延迟指标,优化集群配置
数据运维 kafka-dump-log.sh kafka-delete-records.sh 查看消息文件内容、删除指定偏移量消息、修复数据损坏
安全管理 kafka-acls.sh kafka-delegation-tokens.sh 配置访问控制列表、管理轻量级认证令牌
高级工具 kafka-mirror-maker.sh kafka-streams-application-reset.sh 集群间数据同步、Kafka Streams 应用位移重置

核心脚本详解:从基础到进阶的操作指南

基础工具:消息生产消费的入口

kafka-console-producer.sh:快速验证消息生产

# 基础用法:向指定主题发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
​
# 高级用法:设置acks=-1保证持久化,启用LZ4压缩
bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic \
  --producer-property acks=-1 --producer-property compression.type=lz4
  • 参数解析

    • --producer-property:支持动态设置生产者参数,如 linger.ms=100 优化批量发送。

    • --property:设置消息格式(如 key.separator=: 支持键值对消息)。

  • 注意事项

    • 若启用 SASL 认证,需通过 --producer.config 指定认证配置文件。

    • 生产大消息时需调整 max.request.size 参数(默认 1MB)。

kafka-console-consumer.sh:灵活控制消费行为

# 从头开始消费,禁用自动提交
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \
  --group my-consumer-group --from-beginning --no-auto-commit
​
# 按时间窗口回溯消费(如最近30分钟)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \
  --group my-consumer-group --offset-commit-time "2023-01-01T12:00:00+08:00"
  • 参数解析

    • --from-beginning:等价于设置 auto.offset.reset=earliest,覆盖默认的 latest 策略。

    • --max-messages:限制消费的最大消息数,适用于测试场景。

  • 最佳实践

    • 消费历史数据时,建议先通过 kafka-consumer-groups.sh 重置位移,再启动消费者。

    • 结合 --formatter 参数(如 kafka.tools.DefaultMessageFormatter)可自定义消息输出格式。

集群管理:主题与分区的精细控制

kafka-topics.sh:主题全生命周期管理

# 创建主题:3个分区,2个副本,消息保留7天
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic my-topic \
  --partitions 3 --replication-factor 2 --config retention.ms=604800000
​
# 查询主题详情
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-topic
​
# 增加分区数(只能扩容不能缩容)
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic my-topic \
  --partitions 5
  • 参数解析

    • --config:动态设置主题级参数,如 cleanup.policy=compact 启用日志压缩。

    • --delete:标记主题为删除状态,需配合 delete.topic.enable=true 配置。

  • 常见问题

    • 分区数增加后,原有消息不会自动迁移到新分区,新消息按分区策略分配。

    • 副本数调整需通过 kafka-reassign-partitions.sh 实现。

kafka-reassign-partitions.sh:分区副本的智能迁移

# 生成副本重分配计划
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topic my-topic \
  --broker-list "0,1,2" --generate
​
# 执行重分配计划(需替换实际生成的JSON内容)
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute \
  --reassignment-json-file reassignment-plan.json
​
# 验证重分配状态
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify \
  --reassignment-json-file reassignment-plan.json
  • 适用场景

    • 平衡集群负载:将热点分区的副本迁移到空闲 Broker。

    • 数据恢复:在 Broker 故障后重建副本。

  • 注意事项

    • 重分配过程会消耗额外的网络和 IO 资源,建议在业务低峰期执行。

    • 若出现 IN_PROGRESS 状态,可通过 --abort 参数终止并回滚。

性能测试:吞吐量与延迟的精准评估

kafka-producer-perf-test.sh:生产者性能压测

# 发送100万条消息,每条1KB,测试同步发送性能
bin/kafka-producer-perf-test.sh --topic test-topic --num-records 1000000 \
  --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1
​
# 输出解析
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency
  • 参数解析

    • --throughput:设置目标吞吐量(-1 表示不限制)。

    • --producer-props:支持设置生产者参数,如 compression.type=lz4 降低网络带宽。

  • 性能优化

    • 调整 batch.sizelinger.ms 参数可平衡吞吐量和延迟。

    • 启用 acks=1 可在保证数据不丢失的前提下提升性能。

kafka-consumer-perf-test.sh:消费者性能验证

# 消费100万条消息,测试拉取速度
bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 1000000 \
  --topic test-topic --group consumer-test
​
# 输出解析
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2023-01-01 12:00:00, 2023-01-01 12:00:10, 9765.62, 1723.24, 10000000, 176460
  • 参数解析

    • --fetch-size:设置单次拉取的最大数据量(默认 1MB)。

    • --threads:指定消费线程数,提升并发处理能力。

  • 注意事项

    • 若消费速度滞后,可增加消费者实例数或调整分区数。

    • 该脚本未计算分位数延迟,建议结合监控工具(如 Prometheus)进行深度分析。

高级运维:数据修复与集群同步的关键工具

数据运维:消息级别的精细操作

kafka-dump-log.sh:消息文件的深度解析

# 查看消息批次元数据
bin/kafka-dump-log.sh --files /path/to/kafka-logs/test-topic-0/00000000000000000000.log
​
# 显示每条消息的详细内容
bin/kafka-dump-log.sh --files /path/to/kafka-logs/test-topic-0/00000000000000000000.log \
  --deep-iteration --print-data-log
  • 输出解析

    • baseOffset:批次起始位移。

    • CreateTime:消息创建时间(毫秒级时间戳)。

    • valuesize:消息体大小(字节)。

  • 典型场景

    • 定位损坏消息:通过 CRC32 校验值判断消息完整性。

    • 分析压缩效果:比较压缩前后的消息大小,优化压缩算法选择。

kafka-delete-records.sh:精准删除分区消息

# 删除分区0中偏移量1000-2000的消息
bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --topic test-topic \
  --partitions 0 --offset-json-file delete-plan.json
​
# delete-plan.json 内容示例
{
  "partitions": [
    {
      "topic": "test-topic",
      "partition": 0,
      "offsets": [1000, 2000]
    }
  ]
}
  • 注意事项

    • 删除操作是异步的,需通过 --verify 参数检查状态。

    • 该脚本不会立即物理删除数据,而是标记为可清理,依赖日志压缩或保留策略最终删除。

集群同步:跨集群数据复制的利器

kafka-mirror-maker.sh:集群间数据镜像

# 基础配置:从源集群同步到目标集群
bin/kafka-mirror-maker.sh --consumer.config source.properties \
  --producer.config target.properties --whitelist ".*"

# source.properties 示例
bootstrap.servers=source-cluster:9092
group.id=mirror-maker-group
auto.offset.reset=earliest

# target.properties 示例
bootstrap.servers=target-cluster:9092
acks=all
compression.type=lz4
  • 参数解析

    • --whitelist:正则表达式匹配需要同步的主题。

    • --num.streams:指定复制线程数,提升同步速度。

  • 高级配置

    • 启用 --sync.topic.acls 同步主题 ACL 信息。

    • 通过 --transforms 参数对消息进行过滤或转换(如脱敏处理)。

安全与权限管理:保障集群访问控制

kafka-acls.sh:细粒度的访问控制

# 允许用户alice读取主题test-topic
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:alice \
  --operation Read --topic test-topic

# 查看所有ACL规则
bin/kafka-acls.sh --bootstrap-server localhost:9092 --list

# 删除ACL规则
bin/kafka-acls.sh --bootstrap-server localhost:9092 --delete --allow-principal User:alice \
  --operation Read --topic test-topic
  • 权限模型

    • Principal:支持用户(User)、角色(Role)或服务账户(如 Kerberos 主体)。

    • Operation:包括 Read、Write、Describe、Create 等操作。

  • 最佳实践

    • 结合 --deny-principal 参数实现黑名单机制。

    • 定期审计 ACL 规则,清理冗余权限。

kafka-delegation-tokens.sh:轻量级认证令牌管理

# 创建Delegation Token
bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create \
  --principal User:alice --expiry-time 86400000

# 列出所有有效令牌
bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --list

# 撤销令牌
bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --revoke \
  --token 1234567890abcdef
  • 适用场景

    • 临时授权:允许第三方应用在有限时间内访问集群。

    • 减少 SASL 认证开销:适用于频繁认证的场景。

  • 注意事项

    • 令牌过期后需重新生成,建议结合自动化脚本管理。

    • 启用 SSL 传输可增强令牌安全性。

版本兼容性与故障排查:脚本的进阶应用

kafka-broker-api-versions.sh:验证客户端与服务器兼容性

# 测试客户端与Broker的API版本兼容性
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 输出解析(部分内容)
Produce(0): 0 to 7 [usable:7]
Fetch(1): 0 to 10 [usable:10]
Listoffsets(2): 0 to 5 [usable:5]
  • 关键指标

    • usable:表示客户端支持的最高 API 版本。

    • usable 值低于服务器版本,可能导致功能降级。

  • 版本兼容性

    • 0.10.2.0 之前版本为单向兼容(高版本 Broker 支持低版本 Client)。

    • 0.10.2.0 及之后版本支持双向兼容。

故障排查:脚本的综合应用

消费者组位移异常

  • 问题现象:消费者组位移滞后(Lag 持续增大)。

  • 排查步骤

    1. 使用 kafka-consumer-groups.sh --describe 查看各分区位移详情。

    2. 检查消费者日志,确认是否存在反压或异常错误。

    3. 若位移未提交,通过 --reset-offsets 重置到安全点。

副本同步异常

  • 问题现象:分区 ISR 列表缩小,副本滞后。

  • 排查步骤

    1. 使用 kafka-topics.sh --describe 查看 ISR 状态。

    2. 检查 Broker 日志,确认是否存在磁盘 I/O 瓶颈或网络延迟。

    3. 执行 kafka-preferred-replica-election.sh 触发 Leader 切换。

总结

常见工具脚本

生产消息:kafka-console-producer 脚本

核心功能:快速验证消息生产逻辑,支持键值对消息格式和高级参数配置。

# 基础用法:向指定主题发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
​
# 高级用法:设置acks=-1保证持久化,启用LZ4压缩
bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic \
  --producer-property acks=-1 --producer-property compression.type=lz4
  • 参数解析

    • --producer-property:动态设置生产者参数,如 linger.ms=100 优化批量发送。

    • --property:设置消息格式(如 key.separator=: 支持键值对消息)。

  • 注意事项

    • 若启用 SASL 认证,需通过 --producer.config 指定认证配置文件。

    • 生产大消息时需调整 max.request.size 参数(默认 1MB)。

  • 典型场景

    • 测试消息格式兼容性(如 JSON、PROTOBUF)。

    • 模拟高并发生产场景(配合 --batch-size 参数)。

消费消息:kafka-console-consumer 脚本

核心功能:灵活控制消费行为,支持回溯历史数据和自定义消息输出格式。

# 从头开始消费,禁用自动提交
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \
  --group my-consumer-group --from-beginning --no-auto-commit
​
# 按时间窗口回溯消费(如最近30分钟)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \
  --group my-consumer-group --offset-commit-time "2023-01-01T12:00:00+08:00"
  • 参数解析

    • --from-beginning:等价于设置 auto.offset.reset=earliest,覆盖默认的 latest 策略。

    • --max-messages:限制消费的最大消息数,适用于测试场景。

  • 高级技巧

    • 结合 --formatter 参数(如 kafka.tools.DefaultMessageFormatter)可自定义消息输出格式。

    • 使用 --partition 参数指定消费特定分区。

  • 注意事项

    • 消费历史数据时,建议先通过 kafka-consumer-groups.sh 重置位移,再启动消费者。

生产者性能测试:kafka-producer-perf-test 脚本

核心功能:精准评估生产者吞吐量、延迟指标,支持压缩和事务测试。

# 发送100万条消息,每条1KB,测试同步发送性能
bin/kafka-producer-perf-test.sh --topic test-topic --num-records 1000000 \
  --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1
​
# 输出解析
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency
  • 参数解析

    • --throughput:设置目标吞吐量(-1 表示不限制)。

    • --producer-props:支持设置生产者参数,如 compression.type=lz4 降低网络带宽。

  • 性能优化

    • 调整 batch.sizelinger.ms 参数可平衡吞吐量和延迟。

    • 启用 acks=1 可在保证数据不丢失的前提下提升性能。

  • 注意事项

    • 测试结果受客户端机器性能影响,建议在专用测试环境执行。

消费者性能测试:kafka-consumer-perf-test 脚本

核心功能:验证消费者拉取速度,支持多线程并发测试。

# 消费100万条消息,测试拉取速度
bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 1000000 \
  --topic test-topic --group consumer-test
​
# 输出解析
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2023-01-01 12:00:00, 2023-01-01 12:00:10, 9765.62, 1723.24, 10000000, 176460
  • 参数解析

    • --fetch-size:设置单次拉取的最大数据量(默认 1MB)。

    • --threads:指定消费线程数,提升并发处理能力。

  • 注意事项

    • 若消费速度滞后,可增加消费者实例数或调整分区数。

    • 该脚本未计算分位数延迟,建议结合监控工具(如 Prometheus)进行深度分析。

查看主题消息总数:GetOffsetShell 工具类

核心功能:通过计算最早位移(logStartOffset)和最新位移(logEndOffset)的差值,获取主题总消息数。

# 计算单个分区的消息数
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
  --topic test-topic --time -1 | awk -F':' '{print $3 - $2}'
​
# 计算所有分区总消息数
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic \
  | grep Partition | awk '{print $3 - $2}' | awk '{s+=$1} END {print s}'
  • 参数解析

    • --time -1:获取最新位移,--time -2 获取最早位移。

    • --partitions:指定分区编号(默认查询所有分区)。

  • 注意事项

    • 该工具需 Kafka 0.10.2.0 及以上版本支持。

    • 消息总数可能因日志压缩(Log Compaction)而不准确。

查看消息文件数据:kafka-dump-log 脚本

核心功能:解析消息文件内容,支持元数据和消息体的深度分析。

# 查看消息批次元数据
bin/kafka-dump-log.sh --files /path/to/kafka-logs/test-topic-0/00000000000000000000.log
​
# 显示每条消息的详细内容
bin/kafka-dump-log.sh --files /path/to/kafka-logs/test-topic-0/00000000000000000000.log \
  --deep-iteration --print-data-log
  • 输出解析

    • baseOffset:批次起始位移。

    • CreateTime:消息创建时间(毫秒级时间戳)。

    • valuesize:消息体大小(字节)。

  • 典型场景

    • 定位损坏消息:通过 CRC32 校验值判断消息完整性。

    • 分析压缩效果:比较压缩前后的消息大小,优化压缩算法选择。

  • 注意事项

    • --deep-iteration 参数会显著增加解析时间,仅建议在小文件上使用。

查询消费者组位移:kafka-consumer-groups 脚本

核心功能:监控消费进度、重置位移策略,支持分区级和主题级 Lag 分析。

# 查看消费者组位移详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
​
# 重置位移到最早位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group \
  --reset-offsets --to-earliest --execute
  • 参数解析

    • --describe:显示分区级位移、Lag 和消费者实例信息。

    • --reset-offsets:支持 --to-earliest--to-latest--to-offset 等策略。

  • 特殊场景处理

    • 消费者组未启动时,CONSUMER-ID 列显示为 -,但 Lag 值依然有效。

    • 旧版本(0.10.2.0 之前)不支持查询非活跃消费者组,需升级版本。

最佳实践总结

  1. 参数标准化:统一使用 --bootstrap-server 替代 --zookeeper,避免安全漏洞。

  2. 操作原子性:重要操作前执行 --dry-run 预检查,如 kafka-reassign-partitions.sh --generate

  3. 监控与日志:结合 kafka-log-dirs.sh 监控磁盘占用,通过 kafka-dump-log.sh 分析消息内容。

  4. 自动化运维:将常用脚本封装为 Ansible 或 Kubernetes Job,实现集群操作的自动化。

延伸思考

Kafka 脚本的灵活性与强大功能使其成为运维和开发的核心工具。在实际应用中,需结合业务场景选择合适的脚本组合,同时通过监控和日志分析持续优化集群性能。未来,随着 Kafka 生态的不断扩展,脚本的功能将更加丰富,成为构建高效、可靠的分布式系统的关键支撑。


网站公告

今日签到

点亮在社区的每一天
去签到