一.ELK架构
Elasticsearch + Logstash + Kibana
数据库+日志处理+日志显示
1.logstash的使用
(1)input:输入
(2)filter:处理
(3)output:输出
二.ELFK架构
Filebeat-->Elasticsearch-->Logstash-->Kibana
Filebeat 部署在节点上轻量采集日志,Logstash 集中处理。
ELK 的弊端:
NFS数据共享会成为流量瓶颈,不适合大规模集群
是否可以在每台 Web 服务器上安装 Logstash
Logstash占用资源多,在节点部署会争抢应用的资源
ELFK 优势:
Filebeat占用资源非常小,可以在所有节点部署
在每台Web服务器上安装客户端,通过网络发送日志,没有单点故障,没有流量瓶颈
filebeat的配置文件
test可以设置为业务名
filebeat直接区分不同服务的日志
示例filebeat:
filebeat.inputs:
- type: filestream
id: oss-logs
paths:
- /var/oss/*.log
fields:
log_source: "oss"
fields_under_root: true
- type: filestream
id: dps-logs
paths:
- /var/dps/*.log
fields:
log_source: "dps"
fields_under_root: true
output.logstash: # 输出到 Logstash
hosts: ["your-logstash-host:5044"] # 替换为 Logstash 地址
# 示例2
# ============================== Filebeat inputs ===============================
filebeat.inputs:
# 收集 OSS 服务日志
- type: filestream
enabled: true
id: oss-service
paths:
- /var/log/oss/*.log
- /var/log/oss/*.log.gz
fields:
log_source: "oss" # 关键字段,用于标识日志来源
environment: "production" # 可选:添加环境标识
fields_under_root: true # 将 fields 提升到事件顶级
# 可选:为OSS日志添加特定标签
tags: ["oss", "application"]
# 收集 DPS 服务日志
- type: filestream
enabled: true
id: dps-service
paths:
- /var/log/dps/*.log
- /var/log/dps/*.log.gz
fields:
log_source: "dps" # 关键字段,用于标识日志来源
environment: "production"
fields_under_root: true
tags: ["dps", "application"]
# ======================== Filebeat 模块(禁用避免干扰) ========================
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
# ================================= Outputs ====================================
output.logstash:
hosts: ["logstash-server-ip:5044"] # 替换为你的 Logstash 服务器 IP
# 可选:开启负载均衡,如果有多个 Logstash 节点
# loadbalance: true
# ================================== Logging ===================================
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
示例logstash:
# 例如在 /etc/logstash/conf.d/ 下创建 log_processing.conf
input {
beats {
port => 5044
}
}
# 这里也可以不过滤
filter {
# 此处可添加任意过滤解析规则,如Grok解析消息体:cite[10]
# 所有日志都会经过这里
}
output {
# 根据 log_source 字段值判断输出到哪个索引
if [log_source] == "oss" {
elasticsearch {
hosts => ["http://your-es-host:9200"] #Elasticsearch
index => "oss-logs-%{+YYYY.MM.dd}" # 定义OSS索引格式
}
} else if [log_source] == "dps" {
elasticsearch {
hosts => ["http://your-es-host:9200"]
index => "dps-logs-%{+YYYY.MM.dd}" # 定义DPS索引格式
}
}
# 可选: stdout { codec => rubydebug } # 调试时可在终端输出结果
}
# 示例2
input {
beats {
port => 5044
host => "0.0.0.0"
# 可选:增加并发处理能力
# threads => 4
}
}
filter {
# 通用处理:添加主机信息
mutate {
add_field => {
"host_ip" => "%{[host][ip]}"
"host_name" => "%{[host][name]}"
}
}
# 根据 log_source 字段进行路由和特定处理,log_source是filebeat里面设置的
if [log_source] == "oss" {
# OSS 服务日志的特定处理
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
# 解析OSS特定字段(示例)
grok {
match => { "message" => "Bucket: %{NOTSPACE:bucket_name}, Object: %{NOTSPACE:object_key}, Action: %{WORD:action}" }
}
date {
match => [ "timestamp", "ISO8601" ]
remove_field => [ "timestamp" ]
}
# 为OSS日志添加特定标签
mutate {
add_tag => [ "processed_oss" ]
}
} else if [log_source] == "dps" {
# DPS 服务日志的特定处理
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel} \[%{NOTSPACE:thread}\] %{JAVACLASS:class} - %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
# 解析DPS特定字段(示例)
grok {
match => { "message" => "TransactionID: %{NOTSPACE:transaction_id}, Duration: %{NUMBER:duration}ms" }
}
date {
match => [ "timestamp", "ISO8601" ]
remove_field => [ "timestamp" ]
}
# 转换持续时间字段为数字
mutate {
convert => { "duration" => "integer" }
add_tag => [ "processed_dps" ]
}
}
# 通用后处理:移除不必要的字段
mutate {
remove_field => [ "ecs", "agent", "input", "log" ]
}
}
output {
# 根据 log_source 路由到不同的 Elasticsearch 索引
if [log_source] == "oss" {
elasticsearch {
hosts => ["http://elasticsearch-host:9200"]
index => "oss-logs-%{+YYYY.MM.dd}" # OSS 索引格式
# 可选:为OSS索引指定不同的身份验证或配置
# user => "oss_writer"
# password => "${OSS_ES_PASSWORD}"
}
} else if [log_source] == "dps" {
elasticsearch {
hosts => ["http://elasticsearch-host:9200"]
index => "dps-logs-%{+YYYY.MM.dd}" # DPS 索引格式
# 可选:为DPS索引指定不同的配置
# document_type => "dps_log"
}
}
# 调试输出(生产环境可注释掉)
stdout {
codec => rubydebug
# 可选:只在调试时开启特定标签的日志
# if "debug" in [tags]
}
# 可选:对于解析失败的日志,发送到死信队列
if "_grokparsefailure" in [tags] or "_dateparsefailure" in [tags] {
elasticsearch {
hosts => ["http://elasticsearch-host:9200"]
index => "failed-logs-%{+YYYY.MM.dd}"
}
}
}
kibana界面日志设置
(1)查看filebeat采集过来的日志
可以查看到索引日志说明成功采集过来了
(2)创建数据视图,查看日志
(3)选择创建了哪些数据视图(不同业务日志)
(4)日志检索
三.ELFK架构搭建步骤
1.搭建filebeat
(1)docker-compose 配置文件
services:
filebeat:
image: filebeat:8.11.0
container_name: filebeat
#user: root
restart: unless-stopped
volumes:
# 主配置文件:宿主机 ./filebeat.yml → 容器内 /usr/share/filebeat/filebeat.yml 只读
- ./filebeat.yml:/usr/share/filebeat/filebeat.ymlk
# 宿主机系统日志:/var/log 下 *.log 等
- /var/log:/var/log
# 业务日志目录:假设你的应用日志都放在 /app/logs
# - /app/logs:/app/logs
# 环境变量:在 filebeat.yml 里可通过 ${LOGSTASH_HOSTS} 引用
environment:
- LOGSTASH_HOSTS=logstash:5044
# 确保 logstash 服务先启动,避免 Filebeat 启动时找不到 5044 端口
#depends_on:
# - logstash
(2)服务配置文件
# 日志输入
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/vmware-network.*.log
fields:
service: vmware # 关键
env: prod
fields_under_root: true
# 添加以下配置来采集旧文件
ignore_older: 8760h # 忽略超过365天的文件(默认是24h)
close_older: 1h # 1小时后关闭文件句柄
scan_frequency: 30s # 扫描频率
- type: log
enabled: true
paths: ["/var/log/hawkey.log-*"]
fields:
service: hawkey # 关键
env: prod
fields_under_root: true
#---------------
filebeat.config:
modules:
path: ${path.config}/modules.d/*.yml #加载modules目录下的所有配置文件
reload.enabled: false # 是否开启热加载
processors: # 在日志发出去之前,给每条事件追加额外的元数据字段。
- add_cloud_metadata: ~
- add_docker_metadata: ~
# 输出到elasticsearch
#output.elasticsearch:
# hosts: '${ELASTICSEARCH_HOSTS:elasticsearch:9200}'
# username: '${ELASTICSEARCH_USERNAME:}'
# password: '${ELASTICSEARCH_PASSWORD:}'
# 输出到logsash
output.logstash:
hosts: ["192.168.88.53:5044"] # Logstash 地址/端口
loadbalance: true # 多台 Logstash 时轮询
ssl.enabled: false # 如启用 TLS 改成 true,并配证书
2.搭建logstash
(1)docker-compose配置文件
services:
logstash:
image: logstash:8.11.0
container_name: logstash
restart: unless-stopped
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
- ./logstash.yml:/usr/share/logstash/config/logstash.yml
ports:
- "5044:5044" # Beats 输入
- "5000:5000" # TCP 输入(可选)
# environment:
# - LS_JAVA_OPTS=-Xmx512m -Xms256m
# depends_on:
# - elasticsearch
(3)服务配置文件logstash.conf (业务日志)
input {
beats {
port => 5044
}
}
#
可以过滤
#
output {
elasticsearch {
hosts => ["http://192.168.88.53:9200"] # ← ES 地址
index => "%{[service]}-%{[env]}-%{+YYYY.MM.dd}"
}
# 可选:同时输出到控制台用于调试
stdout {
codec => rubydebug
}
}
(4)服务配置文件logstash.yml(监控指标)
#写监控指标
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.88.53:9200" ]
3.搭建elasticsearch
(1)docker-compose文件
services:
elasticsearch:
image: elasticsearch:8.11.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=true # 开启密码
- xpack.security.transport.ssl.enabled=false
# - ES_JAVA_OPTS=-Xms512m -Xmx512m
ports:
- "9200:9200"
- "9300:9300"
volumes:
- ./elasticsearch_data:/usr/share/elasticsearch/data
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
(2)elasticsearch.yml文件
cluster.name: "docker-cluster"
network.host: 0.0.0.0
xpack.security.enabled: true # 启用安全功能,密码
xpack.security.transport.ssl.enabled: false # 启用传输层SSL
http.cors.enabled: true # 如需跨域则启用
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
(3)密码设置
(3.1)上述配置完成后,重启 Elasticsearch 容器使配置生效。
docker restart elasticsearch
(3.2)设置内置用户密码:再次进入 Elasticsearch 容器,执行以下命令为内置用户(如 elastic
、kibana_system
等)设置密码。交互式命令会提示你为每个用户设置密码:
./bin/elasticsearch-setup-passwords interactive
# 仅给 elastic 设密码
./bin/elasticsearch-users userpasswd elastic -p
输入密码:elastic_passwd@8
若以后需要其他内置账号,再跑
./bin/elasticsearch-users userpasswd <用户名>
即可单独启用。
内置账号:
elastic
(超级管理员)kibana_system
(Kibana 连接用)官方专用账号,权限最小够用(只读集群状态、写 .kibana* 索引)。logstash_system
、beats_system
、apm_system
、remote_monitoring_user(
这些是 Elasticsearch 内置专用账号,每个账号只负责把对应组件的 监控/系统数据 写入 ES,权限被严格限制,不能随意登录控制台或读写业务索引。)
4.搭建kibana
(1)docker-compose文件
services:
kibana:
image: kibana:8.11.0
container_name: kibana
ports:
- "5601:5601"
volumes:
- ./config:/usr/share/kibana/config
environment:
- "ELASTICSEARCH_HOSTS=http://192.168.88.53:9200"
- I18N_LOCALE=zh-CN # 设置中文界面
(2)config/kibana.yml配置文件
#
# ** THIS IS AN AUTO-GENERATED FILE **
#
# Default Kibana configuration for docker target
server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://192.168.88.53:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
#开启用户密码
xpack.security.enabled: true
#官方专用账号,权限最小够用(只读集群状态、写 .kibana* 索引)。
elasticsearch.username: "kibana_system" # 推荐使用 kibana_system 用户:cite[2]
elasticsearch.password: "elastic_passwd@8888" # 替换为 kibana_system 用户的密码
(3)搜索界面的KQL语法
(3.1)键值对:field1: value1
(3.1)and:field1: value1 (空格)field2: value2
(3.1)or:field1: value1 or field2: value2
四.EFK架构
Fluentd 或 Fluent-bit 替代 Logstash,资源占用极低(<10MB),支持插件扩展。
Fluent-bit 更适合容器环境,如 Kubernetes 中的 DaemonSet 部署
ELFK 的弊端:
ELFK 组件较多,需要配置 Logstash 处理数据
Logstash 体积大,占用资源多,不适用放在容器内运行
EFK 优势:
Fluent 整合了 Filebeat 和 Logstash 的功能
Fluent 组件更少,占用资源更小,非常适合容器部署
容器部署的优选方案