👉 点击关注不迷路
👉 点击关注不迷路
👉 点击关注不迷路
文章大纲
5.1.1 Filebeat + Logstash + ES + Kibana 全链路配置实
日志处理全链路架构与数据流向示意图
数据采集(Filebeat):Filebeat 作为轻量级的日志采集器,通过安装在各个数据源所在的主机上的 Filebeat 代理来工作
。它会监控指定的日志文件或目录,一旦有新的日志数据产生,就会将其收集起来,并发送给Logstash
进行进一步处理。数据处理(Logstash)
:Logstash 接收来自 Filebeat 的日志数据,通过配置的过滤器(Filter)对数据进行清洗、解析和转换等操作
。例如,过滤器可以提取日志中的关键信息(如时间戳、日志级别、消息内容等),并将其转换为结构化的数据格式。数据存储(Elasticsearch)
:Elasticsearch 是一个分布式的搜索引擎和数据存储系统
,它接收来自 Logstash 的结构化日志数据,并将其存储在索引中。数据可视化(Kibana)
:Kibana 是 Elastic 生态系统中的数据可视化工具,它连接到 Elasticsearch 集群,从存储的日志数据中提取信息,并以各种可视化的方式展示出来,如柱状图、折线图、饼图、仪表盘等
。用户可以通过 Kibana 对日志数据进行交互式分析,快速发现问题、趋势和模式
。
1. 架构设计与组件选型
1.1 技术栈对比分析
EPS
,代表着 Elasticsearch 集群每秒能够处理的事件数量。20000 EPS
意味着系统具有强大的并发处理能力,能够在每秒内接收、解析、索引和存储
大量的事件数据,展示了系统应对高流量数据输入的潜力
。QPS
,每秒查询率(Queries Per Second
)。
组件 | 推荐版本 | 核心功能 | 性能基准(单节点) |
---|---|---|---|
Filebeat |
8.12.2 | 轻量级日志采集 | 20,000 EPS (事件/秒) |
Logstash |
8.12.1 | 数据清洗与富化 | 15,000 EPS (含复杂处理) |
Elasticsearch |
8.12.0 | 分布式存储与检索 | 50,000 写入QPS |
Kibana |
8.12.0 | 可视化与分析 | 支持100并发查询 |
1.2 硬件配置推荐
节点类型 | CPU | 内存 | 存储 | 网络 | 数量 |
---|---|---|---|---|---|
Filebeat |
4核 | 8GB | 100GB SSD | 1Gbps | 按需 |
Logstash |
16核 | 32GB | 500GB NVMe(临时存储) | 10Gbps | 3 |
Elasticsearch |
32核 | 128GB | 4TB NVMe x3(RAID0) | 25Gbps | 5 |
Kibana |
8核 | 16GB | 200GB SSD | 1Gbps | 2 |
2. Filebeat 高级配置
2.1 多输入源配置
# 定义 Filebeat 的输入配置,可包含多个不同类型的输入源
filebeat.inputs:
# 第一个输入配置,类型为 filestream,用于收集文件流数据
- type: filestream
# 为该输入配置指定一个唯一的 ID,方便后续管理和识别
id: nginx-access
# 指定要收集的文件路径,可以使用通配符
# 这里表示收集 /var/log/nginx/ 目录下所有以 access.log 开头的文件
paths:
- /var/log/nginx/access.log*
# 为收集到的日志数据添加自定义字段
# 这里添加了 log_type 字段,值为 "nginx_access",方便后续分析和过滤
fields:
log_type: "nginx_access"
# 定义解析器,用于解析收集到的数据
# 这里使用 ndjson 解析器,~ 表示使用默认配置
parsers:
- ndjson: ~
# 第二个输入配置,类型为 container,用于收集容器日志
- type: container
# 为该输入配置指定一个唯一的 ID
id: docker-logs
# 指定要收集的容器日志文件路径
# 这里表示收集 /var/lib/docker/containers/ 目录下所有容器的日志文件
paths:
- '/var/lib/docker/containers/*/*.log'
# 定义处理器,用于对收集到的数据进行预处理
# 这里使用 add_docker_metadata 处理器,~ 表示使用默认配置
# 该处理器会为日志数据添加 Docker 容器的元数据,如容器 ID、名称等
processors:
- add_docker_metadata: ~
# 第三个输入配置,类型为 syslog,用于通过 UDP 协议收集 Syslog 数据
- type: syslog
# 配置 Syslog 的 UDP 协议相关参数
protocol.udp:
# 指定监听的主机和端口
# 0.0.0.0 表示监听所有可用的网络接口,端口为 5140
host: "0.0.0.0:5140"
# 为收集到的日志数据添加标签
# 这里添加了 "syslog" 标签,方便后续筛选和分类
tags: ["syslog"]
2.2 性能优化参数
# 配置 Filebeat 的内存队列相关参数
queue.mem:
# 定义内存队列中可以存储的最大事件数量
# 这里设置为 4096,表示内存队列最多能容纳 4096 个事件
# 当队列中的事件数量达到此上限时,新的事件可能会被阻塞或丢弃(取决于具体策略)
events: 4096
# 触发内存队列刷新的最小事件数量
# 当队列中的事件数量达到 1024 个时,Filebeat 会尝试将这些事件刷新到输出端
flush.min_events: 1024
# 内存队列刷新的超时时间
# 即使队列中的事件数量未达到 flush.min_events 的设定值,每经过 5 秒,Filebeat 也会将队列中的事件刷新到输出端
flush.timeout: 5s
# 配置 Filebeat 的输出,将收集到的事件发送到 Logstash
output.logstash:
# 指定 Logstash 服务的主机和端口信息
# "logstash-prod:5044" 表示 Logstash 服务所在的主机名为 logstash-prod,端口为 5044
hosts: ["logstash-prod:5044"]
# 设置并行工作线程的数量
# 这里设置为 8,表示 Filebeat 会使用 8 个并行线程将事件发送到 Logstash,以提高发送效率
worker: 8
# 设置数据传输时的压缩级别
# 取值范围通常是 0 - 9,0 表示不压缩,9 表示最高压缩比
# 这里设置为 3,是一个在压缩率和性能之间的平衡选择,能在一定程度上减少网络传输的数据量
compression_level: 3
# 启用负载均衡功能
# 当 hosts 配置中有多个 Logstash 实例时,Filebeat 会自动在这些实例之间进行负载均衡,避免某个实例负载过高
loadbalance: true
# 配置 Filebeat 的日志记录级别
# 设置为 warning 表示 Filebeat 只会记录警告级别及以上的日志信息
# 这样可以减少日志文件的大小,只关注可能影响系统正常运行的重要信息
logging.level: warning
3. Logstash 数据处理管道
3.1 多阶段处理流程
ruby
代码实现
# 定义 Logstash 的输入部分,用于接收数据
input {
# 使用 beats 输入插件,用于接收来自 Filebeat 等 Beats 系列工具发送的数据
beats {
# 指定监听的端口,Filebeat 会将数据发送到这个端口
port => 5044
# 启用 SSL 加密,确保数据在传输过程中的安全性
ssl => true
# 指定 SSL 证书的路径,用于 SSL 加密通信
ssl_certificate => "/etc/pki/tls/certs/logstash.crt"
# 指定 SSL 私钥的路径,与证书配合完成 SSL 加密
ssl_key => "/etc/pki/tls/private/logstash.key"
}
}
# 定义 Logstash 的过滤部分,用于对输入的数据进行处理和转换
filter {
# 针对 NGINX 访问日志进行解析
# 检查数据中的 [fields][log_type] 字段是否为 "nginx_access"
if [fields][log_type] == "nginx_access" {
# 使用 grok 过滤器,它可以根据正则表达式模式从日志消息中提取字段
grok {
# 定义匹配模式,用于解析 NGINX 访问日志的每一行
# 将不同的部分提取到对应的字段中,如客户端 IP、请求方法、响应状态码等
match => { "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:useragent}"' }
}
# 使用 date 过滤器,将时间戳字符串转换为 Logstash 内部的日期对象
date {
# 指定时间戳字段和对应的日期格式,用于解析时间
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
# 使用 useragent 过滤器,解析用户代理字符串
# 提取浏览器、操作系统等信息到新的 "ua" 字段中
useragent {
source => "useragent"
target => "ua"
}
}
# 通用字段处理部分,对所有接收到的数据都进行操作
mutate {
# 将 "response" 和 "bytes" 字段的数据类型转换为整数类型
convert => {
"response" => "integer"
"bytes" => "integer"
}
# 从数据中移除 "message" 字段,因为它的信息可能已经被解析到其他字段中
remove_field => ["message"]
}
}
# 定义 Logstash 的输出部分,用于将处理后的数据发送到目标位置
output {
# 使用 elasticsearch 输出插件,将数据发送到 Elasticsearch 集群
elasticsearch {
# 指定 Elasticsearch 集群的节点地址
hosts => ["es - node1:9200","es - node2:9200"]
# 定义索引名称,采用动态索引的方式,根据日期每天创建一个新的索引
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
# 指定索引模板的路径,用于在 Elasticsearch 中创建索引时的初始配置
template => "/etc/logstash/templates/nginx - template.json"
# 指定索引模板的名称
template_name => "nginx - access"
# 启用索引生命周期管理(ILM),可以自动管理索引的生命周期,如滚动、删除等
ilm_enabled => true
}
# 调试输出部分,在生产环境中建议关闭
# 将处理后的数据以易读的 Ruby 调试格式输出到标准输出
stdout {
codec => rubydebug { metadata => true }
}
}
3.2 Grok
性能优化表
Grok
是一个非常实用的工具,通常结合Logstash
一起使用,用于解析非结构化的日志数据并将其转换为结构化的数据,以便于在 Elasticsearch 中进行索引、搜索和分析。- 是一种基于正则表达式的模式匹配语言。
- 常用预定义模式
%{IPORHOST}
:用于匹配 IP 地址或主机名。%{USER}
:匹配用户名。%{HTTPDATE}
:匹配 HTTP 日期格式,如 01/Jan/2024:12:00:00 +0800。%{WORD}
:匹配一个单词。%{NUMBER}
:匹配一个数字。
模式复杂度 |
原始性能(事件/秒) | 优化策略 |
优化后性能 |
提升幅度 |
---|---|---|---|---|
简单模式 | 12,000 | 预编译正则表达式 | 15,000 | 25%↑ |
中等模式 | 8,500 | 使用Oniguruma 引擎 |
11,200 | 32%↑ |
复杂模式 | 3,200 | 模式分解+条件判断 | 5,800 | 81%↑ |
4. Elasticsearch 索引设计
4.1 ILM生命周期策略
PUT _ilm/policy/logs-policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "7d"
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"forcemerge": {
"max_num_segments": 1
},
"shrink": {
"number_of_shards": 1
}
}
},
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
}
4.2 索引模板配置
// 创建一个名为 logs-template 的索引模板
PUT _index_template/logs-template
{
// 定义索引模板匹配的索引名称模式
// 这里表示该模板将应用于所有以 "logs-" 开头的索引
"index_patterns": ["logs-*"],
// 定义当创建匹配此模板的索引时所使用的设置和映射
"template": {
// 索引的设置部分,包含一些与索引性能、存储、生命周期等相关的参数
"settings": {
// 指定索引的主分片数量为 3
// 主分片用于存储索引数据,多个主分片可以实现数据的分布式存储和并行处理
"number_of_shards": 3,
// 指定每个主分片的副本分片数量为 1
// 副本分片是主分片的复制,用于提高数据的可用性和容错性
"number_of_replicas": 1,
// 指定该索引将使用名为 "logs-policy" 的索引生命周期管理策略
// 索引生命周期管理可以自动管理索引的各个阶段,如热、温、冷、删除等
"index.lifecycle.name": "logs-policy",
// 指定索引使用的压缩编解码器为 "best_compression"
// 该编解码器会以较高的压缩率对索引数据进行压缩,以节省磁盘空间,但可能会增加一定的 CPU 开销
"index.codec": "best_compression"
},
// 索引的映射部分,定义了索引中字段的类型和结构
"mappings": {
// 设置动态映射规则为 "strict"
// 这意味着只有在映射中显式定义的字段才能被索引,新字段不会自动添加到映射中
"dynamic": "strict",
// 定义索引中各个字段的具体属性
"properties": {
// 定义 "@timestamp" 字段的类型为日期类型
// 该字段通常用于存储日志事件的时间戳,方便进行时间范围的查询和分析
"@timestamp": { "type": "date" },
// 定义 "message" 字段的类型为文本类型
// 文本类型适用于存储较长的文本内容,支持全文搜索
"message": { "type": "text" },
// 定义 "response" 字段的类型为整数类型
// 该字段可能用于存储响应状态码等整数值
"response": { "type": "integer" },
// 定义 "geoip" 字段为对象类型
// 对象类型可以包含多个子字段,用于组织相关的信息
"geoip": {
"type": "object",
// 定义 "geoip" 对象中的子字段
"properties": {
// 定义 "location" 字段的类型为地理点类型
// 地理点类型用于存储地理位置信息,如经纬度,方便进行地理空间查询
"location": { "type": "geo_point" }
}
}
}
}
}
}
5. Kibana
可视化实战
5.1 仪表板配置要点
// 向 Kibana 的 API 发起 POST 请求,用于创建一个新的仪表盘(dashboard)对象
POST /api/saved_objects/dashboard
{
// 定义仪表盘对象的属性
"attributes": {
// 仪表盘的标题,这里设置为 "Nginx访问监控"
"title": "Nginx访问监控",
// 仪表盘的描述信息,说明该仪表盘用于实时流量分析
"description": "实时流量分析仪表板",
// 仪表盘上的面板配置,包含多个可视化面板
"panelsJSON": [
{
// 第一个面板的类型为时间序列图(timeseries)
"type": "timeseries",
// 该面板的标题为 "请求量趋势"
"title": "请求量趋势",
// 该面板的参数配置
"params": {
// 指定要查询的索引模式,这里表示查询所有以 "logs-nginx-" 开头的索引
"index": "logs-nginx-*",
// 指定时间字段,用于按照时间进行数据聚合和展示,这里使用 "@timestamp" 字段
"time_field": "@timestamp",
// 指定时间间隔为 1 小时,即按每小时进行数据聚合
"interval": "1h",
// 时间序列图的系列配置
"series": [
{
// 系列的名称为 "总请求量"
"name": "总请求量",
// 聚合方式为计数,即统计每个时间间隔内的请求数量
"aggregation": "count"
}
]
}
},
{
// 第二个面板的类型为饼图(pie)
"type": "pie",
// 该面板的标题为 "HTTP状态码分布"
"title": "HTTP状态码分布",
// 该面板的参数配置
"params": {
// 指定要查询的索引模式,同样查询所有以 "logs-nginx-" 开头的索引
"index": "logs-nginx-*",
// 分割模式为按词项(terms)分割,即根据某个字段的值进行分组
"split_mode": "terms",
// 指定用于分组的字段为 "response",通常这个字段存储的是 HTTP 状态码
"terms_field": "response",
// 显示前 5 个分组的结果,即只展示出现次数最多的 5 个 HTTP 状态码的分布情况
"size": 5
}
}
]
}
}
5.2 关键可视化类型对比
图表类型 | 适用场景 |
性能影响 | 数据精度 | 交互性 |
---|---|---|---|---|
时间序列图 | 流量趋势分析 | 低 | 高 | 强 |
热力图 | 异常检测 | 中 | 中 | 中 |
地理地图 |
IP分布分析 |
高 | 高 | 弱 |
数据表 | 原始日志查看 | 低 | 最高 | 弱 |
6. 全链路监控与调优
6.1 性能监控指标
组件 | 关键指标 |
健康阈值 | 告警阈值 |
监控工具 |
---|---|---|---|---|
Filebeat | Harvester活跃数 | < 1000 | > 2000 | Metricbeat |
Logstash | Pipeline延迟 | < 500ms | > 2s | Prometheus |
Elasticsearch | 索引延迟 | < 1s | > 5s | Elastic监控 |
Kibana | 查询响应时间 | < 3s | > 10s | APM |
Harvester
是 Filebeat 中负责实际读取文件内容的核心组件- Harvester 是 Filebeat 里的一个文件读取器,当 Filebeat 监测到有符合采集规则的文件时,会为每个文件启动一个 Harvester 实例。
- 它的主要任务是逐行读取文件内容,将读取到的行封装成事件,然后发送给 Filebeat 的 spooler 进行后续处理。
APM(Application Performance Monitoring)
- 用于对应用程序的性能进行监控和分析。
- 功能特点: 性能指标监控、分布式追踪、错误分析、用户体验监控。
6.2 典型瓶颈解决方案
场景:日志处理延迟突增
-
- 问题定位:
# 查看Logstash节点状态 GET _nodes/stats/pipelines?filter_path=nodes.*.pipelines # 检查热点线程 GET _nodes/hot_threads
-
- 优化策略:
# 管道配置用于控制 Logstash 处理事件的方式和效率 pipeline: # 批处理相关配置,影响 Logstash 一次处理的事件数量和处理时间间隔 batch: # 每次批处理的事件数量 # 原值为 250,现调整为 125 # 减小批量大小可能会降低每次处理的数据量,使处理更加灵活,适用于事件产生速度不均匀或者对实时性要求较高的场景 size: 125 # 批处理的延迟时间(单位:毫秒) # 原值为 100,现调整为 50 # 缩短延迟时间可以让 Logstash 更快地处理事件,提高数据处理的实时性 delay: 50 # 工作线程数量 # 原值为 4,现调整为 8 # 增加工作线程数量可以提高 Logstash 的并发处理能力,加快事件处理速度,但同时也会增加系统资源的消耗 workers: 8 # 是否按顺序处理事件 # 设置为 false 表示不按顺序处理,这样可以提高处理效率,但可能会导致事件处理的顺序与输入顺序不一致 # 对于对事件顺序要求不高的场景,关闭顺序处理可以提升性能 ordered: false # 以下是对 Logstash 运行时所使用的 Java 虚拟机(JVM)堆内存进行配置 # LS_JAVA_OPTS 是一个环境变量,用于设置 JVM 的启动参数 LS_JAVA_OPTS: "-Xms8g -Xmx8g" # -Xms8g 表示 JVM 堆内存的初始大小为 8GB # -Xmx8g 表示 JVM 堆内存的最大大小为 8GB # 增加堆内存可以让 Logstash 有更多的内存空间来处理大量的数据,减少因内存不足导致的性能问题和错误 # 但同时也需要确保系统有足够的物理内存支持,否则可能会导致系统性能下降或出现内存溢出错误
LS_JAVA_OPTS
通常是指与 Logstash(LS 可能是 Logstash 的缩写)相关的 Java 虚拟机(JVM)选项配置变量。Logstash 是基于 Java 开发的开源数据处理引擎,用于采集、处理和转发数据。- 通过合理配置
LS_JAVA_OPTS
,可以优化 Logstash 的性能、提高稳定性,并满足不同的运行需求和场景。
-
- 优化效果:
指标 优化前 优化后 提升比例 处理延迟 4.2s 0.8s 81%↓ CPU使用率
95% 68% 28%↓ 吞吐量 8K EPS 14K EPS 75%↑
7. 安全加固方案
7.1 传输层加密配置
# Filebeat配置部分,用于设置 Filebeat 输出数据的目标及相关安全配置
output.logstash:
# 指定 Logstash 服务的主机和端口,这里表示将数据发送到名为 "logstash" 的主机的 5044 端口
hosts: ["logstash:5044"]
# SSL 相关配置,用于启用和配置 Filebeat 与 Logstash 之间的 SSL 加密通信
ssl:
# 启用 SSL 加密,设置为 true 表示开启
enabled: true
# 证书颁发机构(CA)证书路径,用于验证 Logstash 服务器证书的合法性
# Filebeat 使用该 CA 证书来确认它连接的 Logstash 服务器是受信任的
certificate_authorities: ["/etc/pki/ca.crt"]
# Filebeat 客户端证书路径,用于向 Logstash 服务器进行身份验证
# Logstash 可以使用该证书来确认连接的客户端是合法的
certificate: "/etc/pki/client.crt"
# Filebeat 客户端私钥路径,与客户端证书配合使用,用于加密和解密通信数据
key: "/etc/pki/client.key"
# Elasticsearch配置部分,主要涉及 X-Pack 安全模块中传输层的 SSL 配置
xpack.security.transport.ssl:
# 启用传输层的 SSL 加密,设置为 true 表示开启
enabled: true
# 证书验证模式,这里设置为 "certificate" 表示进行证书验证
# 这意味着 Elasticsearch 会验证连接的对等方(如其他节点或客户端)的证书
verification_mode: certificate
# 密钥库路径,密钥库包含了 Elasticsearch 用于 SSL 通信的私钥和证书
# "certs/elastic-certificates.p12" 是存储密钥和证书的文件路径
keystore.path: certs/elastic-certificates.p12
# 信任库路径,信任库包含了 Elasticsearch 信任的证书颁发机构的证书
# 这里信任库路径与密钥库路径相同,表明使用相同的证书文件来验证对等方的证书
truststore.path: certs/elastic-certificates.p12
7.2 权限控制矩阵
角色 | 数据访问范围 | 操作权限 |
适用场景 |
---|---|---|---|
log_viewer | logs-* | read, view_index_metadata | 普通运维人员 |
log_admin | logs-* | manage, create_index | 系统管理员 |
alert_manager | .kibana-event-log-* | read, index | 监控告警系统 |
report_user |
特定索引模式 | read | 审计与报表生成 |
8. 实战案例:电商大促日志监控
8.1 场景参数
{
"业务场景": "双11大促监控",
"日志规模": {
"峰值QPS": "120,000 EPS",
"单日数据量": "8TB",
"保留周期": "30天"
},
"架构特性": [
"自动扩缩容",
"多级缓存",
"实时异常检测"
]
}
8.2 性能测试结果
测试阶段 | 写入延迟(p95) |
查询响应时间 | 系统可用性 |
---|---|---|---|
预热阶段 | 23ms | 280ms | 100% |
峰值压力 | 89ms | 1.2s | 99.98% |
故障恢复 | 自动切换5.8s | - | 99.95% |
p95
P95 指的是第 95 百分位数
。它是一种统计指标,将一组数据从小到大排序后,处于第 95% 位置的值就是 P95。- 例如,有 100 个数据,将它们按从小到大排列,第 95 个数据的值就是 P95。在性能测试和数据分析中,
P95 常用于衡量数据的分布和性能表现
。
附录:常用诊断命令速查表
功能 | Filebeat命令 | ES API端点 |
---|---|---|
查看采集状态 | filebeat test output |
GET _cat/indices?v |
检查管道状态 | journalctl -u filebeat |
GET _nodes/stats/ingest |
监控队列积压 | filebeat -e -d "*" |
GET _cat/thread_pool?v |
验证配置语法 | filebeat test config |
GET _cluster/pending_tasks |
最佳实践建议:
- 建议采用「
先索引模板后数据写入
」的流程- 日志类数据优先使用
时间序列索引模式
- 定期执行
_forcemerge
优化存储空间- 重要操作需通过变更管理系统审批