【kafka04】消息队列与微服务之Kafka 图形工具

发布于:2024-11-28 ⋅ 阅读:(12) ⋅ 点赞:(0)

Kafka 在 ZooKeeper 里面的存储结构

topic 结构  

/brokers/topics/[topic]

partition结构

/brokers/topics/[topic]/partitions/[partitionId]/state

broker信息

/brokers/ids/[o...N] 

控制器

/controller
存储center controller中央控制器所在kafka broker的信息 

消费者

/consumers/[groupId]/ids /[consumerIdstring]
每个consumer都有一个唯一的ID,此id用来标记消费者信息
消费者管理者:
/consumers/[groupId]/owners/[topic]/[partitionid] 

图形工具 Offset Explorer (Kafka Tool)

Offset Explorer ,旧称Kafka Tool,工具是一个 GUI 应用程序,用于管理和使用 Apache Kafka 群集。它提供了一个直观的 UI,允许人们快速查看 Kafka 群集中的对象以及存储在群集主题中的消息。它包含面向开发人员和管理员的功能。一些关键功能包括

  • 快速查看您的所有 Kafka 集群,包括其经纪人、主题和消费者

  • 查看分区中邮件的内容并添加新邮件

  • 查看消费者的偏移量,包括阿帕奇风暴卡夫卡喷口消费者

  • 以漂亮的打印格式显示 JSON和 XML 消息

  • 添加和删除主题以及其他管理功能

  • 将单个邮件从分区保存到本地硬盘驱动器

  • 编写自己的插件,允许您查看自定义数据格式

  • Kafka 工具在Windows、Linux 和 Mac 操作系统上运行

官网:

Offset Explorer

下载链接:

Offset Explorer

 

         

>^C[root@node1 ~]#/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.187:9092 --topic kun
>hello1

查看数据

基于Web的Kafka集群监控系统 kafka-eagle

介绍

Kafka eagle(kafka鹰) 是一款由国内公司开源的Kafka集群监控系统,可以用来监视kafka集群的broker状态、Topic信息、IO、内存、consumer线程、偏移量等信息,并进行可视化图表展示。独特的KQL还可以通过SQL在线查询kafka中的数据。

官方地址

http://www.kafka-eagle.org/
https://github.com/smartloli/kafka-eagle-bin
https://www.cnblogs.com/smartloli/ 

安装

安装说明

https://docs.kafka-eagle.org/2.installation
https://www.cnblogs.com/smartloli/p/16728995.html 

安装 JAVA

 apt update && apt -y install openjdk-8-jdk 

下载安装

wget https://github.com/smartloli/kafka-eagle-bin/archive/refs/tags/v3.0.2.tar.gz 

解压安装包

tar zxf kafka-eagle-bin-3.0.2.tar.gz 
cd kafka-eagle-bin-3.0.2/ 
tar -zxvf kafka-eagle-web-3.0.2-bin.tar.gz -C /usr/local/
ln -s /usr/local/kafka-eagle-web-3.0.2 /usr/local/kafka-eagle-web 

设置全局变量

设置相关全局变量KE_HOME

vi /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export KE_HOME=/usr/local/kafka-eagle-web
export PATH=$PATH:$KE_HOME/bin
. /etc/profile 

修改配置文件

vim /usr/local/kafka-eagle-web/conf/system-config.properties
###################################### 
# 填写 zookeeper集群列表
#kafkazookeeper节点配置属性多个可以添加一个cluster1,如果有多套kafka集群加多个名称
efak.zk.cluster.alias=cluster1,cluster2 
######################################
#zookeeper地址
######################################修改此处
cluster1.zk.list=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
#cluster2.zk.list=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181
######################################
# broker 最大规模数量
###################################### 
cluster1.efak.broker.size=20
######################################
# zk 客户端线程数#####################
################ 
kafka.zk.limit.size=32
######################################
# EFAK webui 端口
######################################
fak.webui.port=8048
####################################
# kafka offset storage 
###################################### 
cluster1.efak.offset.storage=kafka 
cluster2.efak.offset.storage=zk
######################################
# kafka jmx uri 
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jn di/rmi://%s/jmxrmi
######################################
# kafka metrics 指标,默认存储15天
###################################### 
efak.metrics.charts=true
efak.metrics.retain=15
######################################
# kafka sql topic records max 
###################################### 
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
######################################
# delete kafka topic token 
###################################### 
efak.topic.token=keadmin
######################################
# kafka sqlite 数据库地址(需要修改存储路径) 
######################################修改此处,取消下面四行注释
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/usr/local/kafka-eagle-web/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org
######################################
# kafka mysql 数据库地址(需要提前创建ke库) 
######################################修改此处添加注释下面四行 
#efak.driver=com.mysql.cj.jdbc.Driver 
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull 
#efak.username=ke
#efak.password=123456
配置文件参考
[root@node1 ~]#cat /usr/local/kafka-eagle-web/conf/system-config.properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=10.0.0.187:2181,10.0.0.188:2181,10.0.0.189:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16

######################################
# EFAK webui port
######################################
efak.webui.port=8048

######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/usr/local/kafka-eagle-web/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
#efak.driver=com.mysql.cj.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=123456

启动
/usr/local/kafka-eagle-web/bin/ke.sh start
登录
http://localhost:8048
默认账号:admin 
默认密码:123456
修改和监控 Kafka
#所有kafka节点修改配置
[root@node1 ~]#vim /usr/local/kafka/bin/kafka-server-start.sh
......
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
	export KAFKA_HEAP_OPTS=" -Xmx1G-Xms1G"  
	export JMX_PORT="9999" #添加此行
fi
......
[root@node1 ~]#systemctl restart kafka


网站公告

今日签到

点亮在社区的每一天
去签到