1.简介
Flashcat 对接日志可以使用如下链路:
categraf -> kafka -> logstash -> es -> n9e
采集日志并在 n9e 上可视化和配置告警的逻辑如下:
使用categraf从 file/journald/tcp/udp 中采集日志数据,作为生产者推送到 kafka 中;
使用 logstash 作为消费者,消费 kafka 中的日志数据,按规则处理完后存入 es;
n9e 配置 es 的日志数据源,在日志分析模块查看和配置告警规则。
注意:从 kafka 消费日志数据存入 es 的 "kafka->logstash->es" 链路不是categraf封装的功能,需要自行下载相关工具配置,这也仅是数据消费存入es推荐的链路,读者可选择其它方式将kafka中存储的日志数据消费至es或其它n9e支持的日志存储源中
2.实际架构
架构服务设计, 相关信息已脱敏处理
服务 |
主机 |
categraf |
被采集客户端 |
Kafka |
消息队列服务器或集群 |
Logstash |
日志处理服务器 |
Elasticsearch |
Es服务器或集群 |
N9e |
夜莺服务器 |
3.操作步骤
3.1 Kafka环境准备
使用Kafka新版内置集群KRaft
`部署JDK21版本
# wget https://download.oracle.com/java/21/latest/jdk-21_linux-x64_bin.deb
# dpkg -i jdk-21_linux-x64_bin.deb
`确认java版本
# java --version
`安装kafka
# wget "https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz"
# tar -xzf kafka_2.13-3.9.0.tgz
# cd kafka_2.13-3.9.0
`生成集群ID, 使用 KRaft 部署 Kafka
# bin/kafka-storage.sh random-uuid
7XBm8Eb3To-BSvtzIRpdcw
`格式化存储目录
# bin/kafka-storage.sh format -t 7XBm8Eb3To-BSvtzIRpdcw -c config/kraft/server.properties
`启动Kafka
# bin/kafka-server-start.sh config/kraft/server.properties &
3.2 安装logstash
# apt -y install logstash
` 安装后可能找不到运行二进制文件,需要建立软链接
# ln -sv /usr/share/logstash/bin/logstash /usr/local/bin/
`查看版本可执行,说明安装成功
# logstash --version
3.3 安装Elasticsearch
`Debain10安装ES, 需要提前安装JDK
`导入官方GPG密钥并添加仓库
# wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
# sh -c 'echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" > /etc/apt/sources.list.d/elastic-8.x.list'
`安装Elasticsearch
`此命令将自动安装最新版本(根据仓库配置
# apt update && apt install elasticsearch
# systemctl daemon-reload
# systemctl start elasticsearch.service
`修改Es的配置文件, 使放通所有地址
# vim /etc/elasticsearch/elasticsearch.yml
...取消注释或修改....
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
.......
# systemctl restart elasticsearch.service
3.4 修改 categraf 配置文件
logs.toml 文件描述: https://flashcat.cloud/docs/content/flashcat-monitor/categraf/4-logs-agent/
# vim /opt/categraf/conf/logs.toml
[logs]
## just a placholder
api_key = "ef4ahfbwzwwtlwfpbertgq1i6mq0ab1q"
## enable log collect or not
enable = true
## the server receive logs, http/tcp/kafka, only kafka brokers can be multiple ip:ports with concatenation character ","
#send_to = "127.0.0.1:17878"
send_to = "kafka_server" # kafka服务器地址, 此处已做脱敏处理
## send logs with protocol: http/tcp/kafka
send_type = "kafka"
topic = "nginx_server_access" # kafka 主题
## send logs with compression or not
use_compress = false
## use ssl or not
send_with_tls = false
## send logs in batchs
batch_wait = 5
## save offset in this path
run_path = "/opt/categraf/run"
## max files can be open
open_files_limit = 100
## scan config file in 10 seconds
scan_period = 10
## read buffer of udp
frame_size = 9000
## channal size, default 100
## 读取日志缓冲区,行数
chan_size = 1000
## pipeline num , default 4
## 有多少线程处理日志
pipeline=4
## configuration for kafka
## 指定kafka版本
kafka_version="3.3.2"
# 默认0 表示串行,如果对日志顺序有要求,保持默认配置
batch_max_concurrence = 0
# 最大并发批次, 默认100
batch_max_size=100
# 每次最大发送的内容上限 默认1000000
batch_max_contentsize=1000000
# client timeout in seconds
producer_timeout= 10
# 是否开启sasl模式
sasl_enable = false
sasl_user = "admin"
sasl_password = "admin"
# PLAIN
sasl_mechanism= "PLAIN"
# v1
sasl_version=1
# set true
sasl_handshake = true
# optional
# sasl_auth_identity=""
#
##
# v0.3.39以上版本新增,是否开启pod日志采集
enable_collect_container=false
# 是否采集所有pod的stdout stderr
collect_container_all = true
## glog processing rules
# [[logs.Processing_rules]]
## single log configure
[[logs.items]]
## file/journald/tcp/udp
type = "file"
## type=file, path is required; type=journald/tcp/udp, port is required
#path = "/opt/tomcat/logs/*.txt"
path = "/data/log/nginx/access.log"
source = "www1"
service = "nginx_service"
# systemctl restart categraf
# systemctl status categraf
以上这个报错表示外部主机作为生产者无法连接到 Kafka 服务器并向其发送消息。你遇到的错误信息指出尝试连接 [::1]:9092
被拒绝,这表示连接尝试到了 IPv6 的回环地址
这里附上全中文配置文件参数:
############################# 服务器基础配置 #############################
# 设置服务器角色。设置此项会启用KRaft模式
process.roles=broker,controller
# 与此实例角色关联的节点ID
node.id=1
# 控制器仲裁的连接字符串
controller.quorum.voters=1@kafka_server # kafka服务器地址, 此处已做脱敏处理
############################# Socket服务器设置 #############################
# Socket服务器监听的地址
# 对于同时作为broker和controller的节点(即设置了`process.roles=broker,controller`),必须至少在这里列出controller监听器
listeners=PLAINTEXT://kafka_server,CONTROLLER://kafka_server # kafka服务器地址, 此处已做脱敏处理
# broker之间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT
# broker或controller向客户端广播的监听器名称、主机名和端口
advertised.listeners=PLAINTEXT://kafka_server,CONTROLLER://kafka_server # kafka服务器地址, 此处已做脱敏处理
# 控制器使用的监听器名称列表
controller.listener.names=CONTROLLER
# 监听器名称到安全协议的映射,默认情况下它们是相同的
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
############################# 日志基础配置 #############################
# 存储日志文件的目录列表,逗号分隔
log.dirs=/tmp/kraft-combined-logs
# 默认每个主题的日志分区数
num.partitions=1
# 每个数据目录用于在启动时恢复日志和关闭时刷新的日程数量
num.recovery.threads.per.data.dir=1
############################# 内部主题设置 #############################
# 组元数据内部主题 "__consumer_offsets" 和 "__transaction_state" 的复制因子
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# 日志保留策略 #############################
# 根据时间确定日志文件是否符合删除条件的最小年龄
log.retention.hours=168
# 日志段文件的最大大小
log.segment.bytes=1073741824
# 根据保留策略检查日志段是否可以删除的时间间隔
log.retention.check.interval.ms=300000
############################# 其他配置 #############################
# 根据你的网络和性能需求调整以下参数
num.network.threads=3
num.io.threads=8
# 缓冲区大小
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
3.5 使用logstash消费
使用 logstash 作为消费者,消费 kafka 数据存入es. 修改配置文件config.yaml 文件
# vim /etc/logstash/config.yaml
input {
kafka {
# Kafka服务地址
bootstrap_servers => "kafka_server" # kafka服务器地址, 此处已做脱敏处理
# 要订阅的主题
topics => ["tps_server_proxy"]
# 数据格式转换为json
codec => json
# 自定义类型标记
type => n9e
}
}
filter {
# 这里可以为空,也可以添加任何需要的数据过滤或转换逻辑
}
output {
elasticsearch {
# Elasticsearch服务地址
hosts => ["Elasticsearch_server"]
# 索引名称,这里使用了动态索引名称,每日创建一个新的索引,方便按日期进行数据管理
index => "flashcatcloud-n9e-%{+YYYY.MM.dd}"
}
}
# logstash -f /etc/logstash/config.yaml &> /root/log/logstash_start.log &
4.夜莺对接Es
4.1 n9e 配置 es 的日志数据源,在日志分析模块查看
但是导入查询后出现网络无法访问的情况.
这个时候使用es_serverip:9200访问发现, 无一例外全部都返回了 "curl: (52) Empty reply from server"
# curl -Lv "http://Elasticsearch_server/" ---> 使用外网IP
# curl -Lv "http://127.0.0.1:9200/" ---> 使用内网IP
这个时候通过查询Elasticesearch日志得知
分析日志
这两条日志的关键信息如下:
日志级别为WARN,表示有异常情况
日志来自o.e.h.n.Netty4HttpServerTransport,即Elasticsearch的HTTP服务器组件
日志内容提示"
received plaintext http traffic on an https channel
",即在HTTPS通道上接收到了明文HTTP流量Elasticsearch的应对措施是"closing connection",即关闭了这个连接
所以可以判断,Elasticsearch服务器期望客户端使用HTTPS发送请求,但实际收到的是明文HTTP请求,因此拒绝了这些请求。
同时日志中也提到了请求的源地址和目标地址,这里都是[0:0:0:0:0:0:0:1]
,即IPv6环回地址,相当于IPv4的127.0.0.1
,表示请求来自本机。目标端口为9200
,即Elasticsearch的默认端口
结论为:
Elasticsearch正在通过HTTPS监听请求,但是接收到了一个明文HTTP请求,因此它关闭了连接。这意味着你的Elasticsearch配置为仅接受HTTPS请求,而你尝试使用HTTP进行访问
解决方式:
使用HTTPS方式访问
需要部署证书, 方法参考如下: https://cloud.tencent.com/developer/article/2457640
设置允许HTTP明文通信(不推荐)
具体操作步骤如下:
将所有有关SSL证书的true改为False# vim /etc/elasticsearch/elasticsearch.yml ... # Enable security features xpack.security.enabled: false xpack.security.enrollment.enabled: false # Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents xpack.security.http.ssl: enabled: false #enabled: true #keystore.path: certs/http.p12 # Enable encryption and mutual authentication between cluster nodes xpack.security.transport.ssl: enabled: false #enabled: true ... `重启elasticsearch # systemctl restart elasticsearch.service
再次测试访问:
Es测试
` 由于之前ES启动错误, 修复后需要重启logstash
# logstash -f /etc/logstash/config.yaml &> /root/log/logstash_start.log &
` 测试日志是否成功上报
# curl -X GET "Elasticsearch_server/flashcatcloud-n9e/_search" -H 'Content-Type: application/json' -d'
{
"size": 1,
"query": {
"match_all": {}
}
}'
{"took":89,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":5450,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"flashcatcloud-n9e","_id":"5OwihJUBrZI6SU62G6wh","_score":1.0,"_ignored":["event.original.keyword"],"_source":{"message":"2025-03-11 11:30:05.741\tINFO\t81.70.170.27:53726 - \"CONNECT www.baidu.com:443 HTTP/1.1\\r\\nHost: www.baidu.com:443\\r\\nProxy-Authorization: Basic dHBzdGVzdDp4a2Nwa2I=\\r\\nUser-Agent: Go-http-client/1.1\\r\\n\\r\\n\"","timestamp":1741663805829,"msg_key":"tps56/","event":{"original":"{\"message\":\"2025-03-11 11:30:05.741\\tINFO\\t81.70.170.27:53726 - \\\"CONNECT www.baidu.com:443 HTTP/1.1\\\\r\\\\nHost: www.baidu.com:443\\\\r\\\\nProxy-Authorization: Basic dHBzdGVzdDp4a2Nwa2I=\\\\r\\\\nUser-Agent: Go-http-client/1.1\\\\r\\\\n\\\\r\\\\n\\\"\",\"status\":\"info\",\"timestamp\":1741663805829,\"agent_hostname\":\"tps56\",\"fcservice\":\"tps_service\",\"fcsource\":\"tps56\",\"fctags\":\"{\\\"filename\\\":\\\"proxy.log\\\"}\",\"topic\":\"tps_server_proxy\",\"msg_key\":\"tps56/\"}"},"status":"info","@version":"1","type":"n9e","fctags":"{\"filename\":\"proxy.log\"}","fcsource":"tps56","topic":"tps_server_proxy","@timestamp":"2025-03-11T07:36:19.606912093Z","agent_hostname":"tps56","fcservice":"tps_service"}}]}}
使用夜莺查询
日志分析->即时查询中,选择查询类型为Elasticsearch、数据源为上图中设置的,即可查询日志
相关日志信息已做脱敏处理
4.2 配置日志告警规则
配置日志告警规则时,规则配置选择log指标,数据源选择es(用户也可以自行按需选择n9e支持的其它日志数据源),关联数据源选择4.1中设置的。接下来是重点,查询统计负责统计日志的数量,当统计的日志数量触发了告警条件,就会产生告警。下图中查询统计、告警条件配置的含义为,当your_index索引中,status字段为error的日志,在一分钟内出现的次数大于10条,就触发二级告警
5. 总结
资源消耗方面
服务名称 |
内存 |
CPU |
categraf |
83.4M |
3.2% |
Kafka |
1213M |
0.9% |
Logstash |
1539M |
7.0% |
Elasticsearch |
32.5G + 1.2G + 115M |
~ 10.0% |
N9e |
233.6M |
74.5 % |
5.1 优点
无缝集成现有监控平台:该方案无需额外搭建新的数据采集器和展示工具,能够直接利用现有的基础设施进行日志和指标的收集、处理与展示。这不仅简化了部署流程,还减少了学习成本和技术债务
强大的搜索和分析能力:Elasticsearch 提供了强大且灵活的数据查询和分析功能,支持复杂的搜索需求以及实时数据分析。这对于需要深度分析系统性能、用户行为等场景特别有用
扩展性强:通过构建Kafka集群用于消息队列管理和Elasticsearch集群用于数据存储与检索,可以轻松应对大规模数据量的增长,并保证系统的高可用性和可扩展性
5.2 缺点及改进建议
资源消耗大:
问题描述:Elasticsearch 对内存有较高要求,而Kafka作为中间件需要独立于Elasticsearch运行以确保稳定性和性能。这意味着至少需要两台服务器来分别承载这两个组件,为了实现高可用性和可扩展性,则需进一步构建各自的集群,增加了硬件成本和维护复杂度
改进建议:考虑采用更高效的资源配置策略,比如使用云服务提供的自动扩展功能来动态调整资源;或者评估是否可以通过优化索引设计、减少副本数量等方式降低单节点的压力
KQL使用难度较大:
问题描述:虽然Kibana Query Language (KQL) 提供了相对简单的语法,但对于初学者来说仍然存在一定的学习曲线,尤其是在处理复杂查询时。此外,Elasticsearch 默认要求输入为JSON格式,如果要接入如TPS/JDE/Nginx等不同来源的日志数据,则可能需要在Logstash中进行格式转换,增加了实施难度。
改进建议:提供详细的文档和支持材料帮助团队快速上手KQL;探索预定义查询模板或插件,简化常见查询的创建过程;针对非JSON格式的数据源,开发标准化的数据转换管道,减少手动配置的工作量
技术栈依赖单一语言(Java)导致的挑战:
问题描述:Elasticsearch、Logstash 和 Kafka 均基于Java开发,对于不熟悉Java的技术团队而言,在遇到性能瓶颈或错误排查时可能会面临较大困难。
改进建议:加强对团队成员关于Java相关知识的培训,提高对底层架构的理解;同时,考虑引入一些跨语言的监控工具或脚本,以便从不同的角度诊断问题。另外,也可以关注社区动态,及时获取最新的优化建议和技术更新