#!/bin/bash
#IT_BEGIN
#IT_TYPE=1
#IT MIDWARE_KAFKA_IP|ipAddress
#IT MIDWARE_KAFKA_PORT|port
#IT MIDWARE_KAFKA_CREATETOPICENABLE|topic
#IT MIDWARE_KAFKA_BROKERID|broker_id
#IT MIDWARE_KAFKA_INSTALDIR|install_dir
#IT MIDWARE_KAFKA_GROUPID|group_id
#IT MIDWARE_KAFKA_LOGINTERVALMS|log.retention.check.interval.ms
#IT MIDWARE_KAFKA_LOGSEGMENTBYTE|log.segment.bytes
#IT MIDWARE_KAFKA_LISTENERS|listeners
#IT MIDWARE_KAFKA_SOCKETRECBUFBYTE|socket.receive.buffer.bytes
#IT MIDWARE_KAFKA_SOCKETREQUESTMAXBYTE|socket.request.max.bytes
#IT MIDWARE_KAFKA_SOCKETSENDBUFBYTE|socket.send.buffer.bytes
#IT MIDWARE_KAFKA_RETENTIONHOURS|log.retention.hours
#IT MIDWARE_KAFKA_KAFKAVERSION|kafkaVersion
#IT MIDWARE_KAFKA_JMXPORT|jmxPort
#IT MIDWARE_KAFKA_ServiceName|ServiceName
#IT MIDWARE_KAFKA_describe|describe
#IT MIDWARE_KAFKA_CurrentTime|CurrentTime
#IT MIDWARE_KAFKA_ifautodis|ifautodis
#IT MIDWARE_KAFKA_type|type
#IT MIDWARE_KAFKA_hostname|hostname
#IT MIDWARE_KAFKA_user|user
#IT MIDWARE_KAFKA_logpath|logpath
#新增指标初始内存,最大内存,broker默认分区数,broker副本数,集群控制器Socket超时时间(毫秒)
#IT MIDWARE_KAFKA_InitialMemory|initial_memory
#IT MIDWARE_KAFKA_MaxMemory|max_memory
#IT MIDWARE_KAFKA_NumPartitions|num_partitions
#IT MIDWARE_KAFKA_OffsetsTopicReplicationFactor|offsets_topic_replication_factor
#IT MIDWARE_KAFKA_ControllerSocketTimeoutMs|controller_socket_timeout_ms
#IT_END
shname=`basename $0`
ATTR="_X(g=$shname,p=cmdb,t=script,f=0)"
tmpfile="/tmp/${shname}_${$}.tmp"
# 获取Kafka进程信息 - 使用更可靠的查找方式
pid=$(ps aux | grep '[s]erver.properties' | head -1 | awk '{print $2}')
user=$(ps aux | grep '[s]erver.properties' | head -1 | awk '{print $1}')
if [ -z "$pid" ]; then
exit 127
fi
# 获取配置文件路径 - 确保正确获取server.properties路径
dir=$(ps aux | grep '[s]erver.properties' | head -1 | awk '{for(i=NF;i>0;i--){if($i~/server.properties$/){print $i;exit}}}')
if [ ! -f "$dir" ]; then
# 如果从进程信息中获取失败,尝试从标准路径查找
install_dir=$(ls -l /proc/$pid/cwd | awk '{print $NF}')
dir=$(find "$install_dir" -name "server.properties" | head -1)
fi
# 获取IP地址 - 使用您提供的可靠方式
if command -v ifconfig >/dev/null 2>&1; then
ip=$(ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v "broadcast 0.0.0.0" | grep -v inet6 | awk '{print $2}' | cut -d ':' -f 2 | head -1)
elif command -v ip >/dev/null 2>&1; then
ip=$(ip addr show | grep inet | grep -v 127.0.0.1 | grep -v "broadcast 0.0.0.0" | grep -v inet6 | awk '{print $2}' | cut -d '/' -f 1 | head -1)
elif command -v hostname >/dev/null 2>&1; then
ip=$(hostname -I | awk '{print $1}')
else
ip="N/A"
fi
# 解析配置文件
if [ -f "$dir" ]; then
# 创建临时配置文件(去除注释和空行)
grep -v '^#' "$dir" | grep -v '^$' > "$tmpfile"
# 基础配置 - 使用更可靠的提取方式
topic=$(grep -E '^auto\.create\.topic\.enable=' "$tmpfile" | tail -1 | cut -d'=' -f2)
broker_id=$(grep -E '^broker\.id=' "$tmpfile" | tail -1 | cut -d'=' -f2)
group_id=$(grep -E '^group\.id=' "$tmpfile" | tail -1 | cut -d'=' -f2)
log_retention_check_interval_ms=$(grep -E '^log\.retention\.check\.interval\.ms=' "$tmpfile" | tail -1 | cut -d'=' -f2)
log_segment_bytes=$(grep -E '^log\.segment\.bytes=' "$tmpfile" | tail -1 | cut -d'=' -f2)
listeners=$(grep -E '^listeners=' "$tmpfile" | tail -1 | cut -d'=' -f2)
socket_receive_buffer_bytes=$(grep -E '^socket\.receive\.buffer\.bytes=' "$tmpfile" | tail -1 | cut -d'=' -f2)
socket_request_max_bytes=$(grep -E '^socket\.request\.max\.bytes=' "$tmpfile" | tail -1 | cut -d'=' -f2)
socket_send_buffer_bytes=$(grep -E '^socket\.send\.buffer\.bytes=' "$tmpfile" | tail -1 | cut -d'=' -f2)
log_retention_hours=$(grep -E '^log\.retention\.hours=' "$tmpfile" | tail -1 | cut -d'=' -f2)
logpath=$(grep -E '^log\.dirs=' "$tmpfile" | tail -1 | cut -d'=' -f2)
num_partitions=$(grep -E '^num\.partitions=' "$tmpfile" | tail -1 | cut -d'=' -f2)
offsets_topic_replication_factor=$(grep -E '^offsets\.topic\.replication\.factor=' "$tmpfile" | tail -1 | cut -d'=' -f2)
controller_socket_timeout_ms=$(grep -E '^controller\.socket\.timeout\.ms=' "$tmpfile" | tail -1 | cut -d'=' -f2)
# 获取监听地址和端口 - 增强处理逻辑
if [ -z "$listeners" ]; then
addr=$(grep -E '^advertised\.host\.name=' "$tmpfile" | tail -1 | cut -d'=' -f2)
port=$(grep -E '^advertised\.port=' "$tmpfile" | tail -1 | cut -d'=' -f2)
else
addr=$(echo "$listeners" | awk -F"//" '{print $2}' | awk -F: '{print $1}')
port=$(echo "$listeners" | awk -F"//" '{print $2}' | awk -F: '{print $2}' | awk -F',' '{print $1}')
fi
# 如果addr为空,使用之前获取的IP
[ -z "$addr" ] && addr="$ip"
fi
# 获取安装目录
install_dir=$(ls -l /proc/$pid/cwd | awk '{print $NF}')
# 获取JVM内存配置
jvm_opts_file=$(find "$install_dir" -name "kafka-server-start.sh" | head -1)
if [ -f "$jvm_opts_file" ]; then
initial_memory=$(grep -oP '(?<=-Xms)[0-9]+[A-Za-z]*' "$jvm_opts_file" | head -1)
max_memory=$(grep -oP '(?<=-Xmx)[0-9]+[A-Za-z]*' "$jvm_opts_file" | head -1)
fi
# 获取Kafka版本 - 更可靠的版本提取方式
kafkaVersion=$(find "$install_dir" -name "*kafka_*.jar" -o -name "*kafka-*.jar" | head -1 | \
awk -F'-' '{for(i=2;i<NF;i++){if($i~/^[0-9]/){print $i;exit}}}')
# 获取JMX端口
jmxPort=$(grep -oP '(?<=jmxremote.port=)[0-9]+' "$jvm_opts_file" 2>/dev/null | head -1)
# 其他信息
hostname=$(hostname 2>/dev/null || echo "N/A")
ServiceName="${hostname}_${port:-N/A}"
describe="${hostname}_${port:-N/A}"
CurrentTime=$(date +"%Y-%m-%d %H:%M:%S")
ifautodis="TRUE"
type="Kafka"
# 输出结果(所有指标都设置默认值)
echo "COL_DETAIL_START:"
echo "ipAddress[$ATTR]|+|${addr:-N/A}"
echo "port[$ATTR]|+|${port:-N/A}"
echo "topic[$ATTR]|+|${topic:-N/A}"
echo "broker_id[$ATTR]|+|${broker_id:-N/A}"
echo "install_dir[$ATTR]|+|${install_dir:-N/A}"
echo "group_id[$ATTR]|+|${group_id:-N/A}"
echo "log.retention.check.interval.ms[$ATTR]|+|${log_retention_check_interval_ms:-N/A}"
echo "log.segment.bytes[$ATTR]|+|${log_segment_bytes:-N/A}"
echo "listeners[$ATTR]|+|${listeners:-N/A}"
echo "socket.receive.buffer.bytes[$ATTR]|+|${socket_receive_buffer_bytes:-N/A}"
echo "socket.request.max.bytes[$ATTR]|+|${socket_request_max_bytes:-N/A}"
echo "socket.send.buffer.bytes[$ATTR]|+|${socket_send_buffer_bytes:-N/A}"
echo "log.retention.hours[$ATTR]|+|${log_retention_hours:-N/A}"
echo "kafkaVersion[$ATTR]|+|${kafkaVersion:-N/A}"
echo "jmxPort[$ATTR]|+|${jmxPort:-N/A}"
echo "ServiceName[$ATTR]|+|${ServiceName:-N/A}"
echo "describe[$ATTR]|+|${describe:-N/A}"
echo "ifautodis[$ATTR]|+|${ifautodis:-N/A}"
echo "CurrentTime[$ATTR]|+|${CurrentTime:-N/A}"
echo "type[$ATTR]|+|${type:-N/A}"
echo "hostname[$ATTR]|+|${hostname:-N/A}"
echo "user[$ATTR]|+|${user:-N/A}"
echo "logpath[$ATTR]|+|${logpath:-N/A}"
# 新增指标
echo "initial_memory[$ATTR]|+|${initial_memory:-N/A}"
echo "max_memory[$ATTR]|+|${max_memory:-N/A}"
echo "num_partitions[$ATTR]|+|${num_partitions:-N/A}"
echo "offsets_topic_replication_factor[$ATTR]|+|${offsets_topic_replication_factor:-N/A}"
echo "controller_socket_timeout_ms[$ATTR]|+|${controller_socket_timeout_ms:-N/A}"
echo "COL_DETAIL_END:"
# 清理临时文件
[ -f "$tmpfile" ] && rm -f "$tmpfile"
10.14.1.101执行结果如下:
[root@omptest101 ~]# sh COLT_CMDB_kafka_20250628.sh
COL_DETAIL_START:
ipAddress[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|10.14.1.101
port[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|9092
topic[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|N/A
broker_id[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|N/A
install_dir[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|/home/ntciom/kafka/kafka_2.12-3.5.1
group_id[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|N/A
log.retention.check.interval.ms[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|300000
log.segment.bytes[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|1073741824
listeners[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|PLAINTEXT://:9092,CONTROLLER://:9093
socket.receive.buffer.bytes[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|102400
socket.request.max.bytes[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|104857600
socket.send.buffer.bytes[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|102400
log.retention.hours[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|168
kafkaVersion[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|3.5.1/libs/kafka
jmxPort[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|9999
ServiceName[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|omptest101_9092
describe[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|omptest101_9092
ifautodis[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|TRUE
CurrentTime[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|2025-06-28 15:53:21
type[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|Kafka
hostname[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|omptest101
user[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|kafka
logpath[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|/tmp/kraft-combined-logs
initial_memory[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|1G
max_memory[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|1G
num_partitions[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|1
offsets_topic_replication_factor[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|1
controller_socket_timeout_ms[_X(g=COLT_CMDB_kafka_20250628.sh,p=cmdb,t=script,f=0)]|+|N/A
COL_DETAIL_END:
补充指标:
JVM内存配置情况(初始/最大内存)
Broker的分区配置(默认分区数)
内部主题的副本配置(__consumer_offsets)
控制器相关参数(Socket超时时间)
# 新增指标中文对照表: initial_memory # JVM初始内存配置(Xms) max_memory # JVM最大内存配置(Xmx) num_partitions # broker默认分区数 offsets_topic_replication_factor # __consumer_offsets主题副本数 controller_socket_timeout_ms # 控制器Socket超时时间(毫秒)