日志ELK、ELFK、EFK

发布于:2025-09-02 ⋅ 阅读:(13) ⋅ 点赞:(0)

一.ELK架构

Elasticsearch + Logstash + Kibana
数据库+日志处理+日志显示

1.logstash的使用

(1)input:输入

(2)filter:处理

(3)output:输出

二.ELFK架构

Filebeat-->Elasticsearch-->Logstash-->Kibana

Filebeat 部署在节点上轻量采集日志,Logstash 集中处理。

ELK 的弊端:

  • NFS数据共享会成为流量瓶颈,不适合大规模集群

  • 是否可以在每台 Web 服务器上安装 Logstash

    • Logstash占用资源多,在节点部署会争抢应用的资源

ELFK 优势:

  • Filebeat占用资源非常小,可以在所有节点部署

  • 在每台Web服务器上安装客户端,通过网络发送日志,没有单点故障,没有流量瓶颈

filebeat的配置文件

test可以设置为业务名

filebeat直接区分不同服务的日志

示例filebeat:

filebeat.inputs:
  - type: filestream
    id: oss-logs
    paths:
      - /var/oss/*.log
    fields:
      log_source: "oss"
    fields_under_root: true

  - type: filestream
    id: dps-logs
    paths:
      - /var/dps/*.log
    fields:
      log_source: "dps"
    fields_under_root: true

output.logstash: # 输出到 Logstash
  hosts: ["your-logstash-host:5044"] # 替换为 Logstash 地址



# 示例2
# ============================== Filebeat inputs ===============================
filebeat.inputs:

  # 收集 OSS 服务日志
  - type: filestream
    enabled: true
    id: oss-service
    paths:
      - /var/log/oss/*.log
      - /var/log/oss/*.log.gz
    fields:
      log_source: "oss"  # 关键字段,用于标识日志来源
      environment: "production" # 可选:添加环境标识
    fields_under_root: true # 将 fields 提升到事件顶级
    # 可选:为OSS日志添加特定标签
    tags: ["oss", "application"]

  # 收集 DPS 服务日志
  - type: filestream
    enabled: true
    id: dps-service
    paths:
      - /var/log/dps/*.log
      - /var/log/dps/*.log.gz
    fields:
      log_source: "dps"  # 关键字段,用于标识日志来源
      environment: "production"
    fields_under_root: true
    tags: ["dps", "application"]

# ======================== Filebeat 模块(禁用避免干扰) ========================
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# ================================= Outputs ====================================
output.logstash:
  hosts: ["logstash-server-ip:5044"]  # 替换为你的 Logstash 服务器 IP
  # 可选:开启负载均衡,如果有多个 Logstash 节点
  # loadbalance: true

# ================================== Logging ===================================
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644

示例logstash:

# 例如在 /etc/logstash/conf.d/ 下创建 log_processing.conf
input {
  beats {
    port => 5044
  }
}

# 这里也可以不过滤
filter {
  # 此处可添加任意过滤解析规则,如Grok解析消息体:cite[10]
  # 所有日志都会经过这里
}

output {
  # 根据 log_source 字段值判断输出到哪个索引
  if [log_source] == "oss" {
    elasticsearch {
      hosts => ["http://your-es-host:9200"] #Elasticsearch
      index => "oss-logs-%{+YYYY.MM.dd}" # 定义OSS索引格式
    }
  } else if [log_source] == "dps" {
    elasticsearch {
      hosts => ["http://your-es-host:9200"]
      index => "dps-logs-%{+YYYY.MM.dd}" # 定义DPS索引格式
    }
  }

  # 可选: stdout { codec => rubydebug } # 调试时可在终端输出结果
}




# 示例2
input {
  beats {
    port => 5044
    host => "0.0.0.0"
    # 可选:增加并发处理能力
    # threads => 4
  }
}

filter {
  # 通用处理:添加主机信息
  mutate {
    add_field => {
      "host_ip" => "%{[host][ip]}"
      "host_name" => "%{[host][name]}"
    }
  }
  
  # 根据 log_source 字段进行路由和特定处理,log_source是filebeat里面设置的
  if [log_source] == "oss" {
    # OSS 服务日志的特定处理
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    
    # 解析OSS特定字段(示例)
    grok {
      match => { "message" => "Bucket: %{NOTSPACE:bucket_name}, Object: %{NOTSPACE:object_key}, Action: %{WORD:action}" }
    }
    
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
    
    # 为OSS日志添加特定标签
    mutate {
      add_tag => [ "processed_oss" ]
    }
    
  } else if [log_source] == "dps" {
    # DPS 服务日志的特定处理
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel} \[%{NOTSPACE:thread}\] %{JAVACLASS:class} - %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    
    # 解析DPS特定字段(示例)
    grok {
      match => { "message" => "TransactionID: %{NOTSPACE:transaction_id}, Duration: %{NUMBER:duration}ms" }
    }
    
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
    
    # 转换持续时间字段为数字
    mutate {
      convert => { "duration" => "integer" }
      add_tag => [ "processed_dps" ]
    }
  }
  
  # 通用后处理:移除不必要的字段
  mutate {
    remove_field => [ "ecs", "agent", "input", "log" ]
  }
}

output {
  # 根据 log_source 路由到不同的 Elasticsearch 索引
  if [log_source] == "oss" {
    elasticsearch {
      hosts => ["http://elasticsearch-host:9200"]
      index => "oss-logs-%{+YYYY.MM.dd}"  # OSS 索引格式
      # 可选:为OSS索引指定不同的身份验证或配置
      # user => "oss_writer"
      # password => "${OSS_ES_PASSWORD}"
    }
  } else if [log_source] == "dps" {
    elasticsearch {
      hosts => ["http://elasticsearch-host:9200"]
      index => "dps-logs-%{+YYYY.MM.dd}"  # DPS 索引格式
      # 可选:为DPS索引指定不同的配置
      # document_type => "dps_log"
    }
  }
  
  # 调试输出(生产环境可注释掉)
  stdout {
    codec => rubydebug
    # 可选:只在调试时开启特定标签的日志
    # if "debug" in [tags]
  }
  
  # 可选:对于解析失败的日志,发送到死信队列
  if "_grokparsefailure" in [tags] or "_dateparsefailure" in [tags] {
    elasticsearch {
      hosts => ["http://elasticsearch-host:9200"]
      index => "failed-logs-%{+YYYY.MM.dd}"
    }
  }
}

kibana界面日志设置


(1)查看filebeat采集过来的日志

可以查看到索引日志说明成功采集过来了

(2)创建数据视图,查看日志

(3)选择创建了哪些数据视图(不同业务日志)

(4)日志检索

三.ELFK架构搭建步骤

1.搭建filebeat

(1)docker-compose 配置文件

services:
  filebeat:
    image: filebeat:8.11.0
    container_name: filebeat
    #user: root
    restart: unless-stopped
    volumes:
      # 主配置文件:宿主机 ./filebeat.yml → 容器内 /usr/share/filebeat/filebeat.yml 只读
      - ./filebeat.yml:/usr/share/filebeat/filebeat.ymlk
      # 宿主机系统日志:/var/log 下 *.log 等
      - /var/log:/var/log
      # 业务日志目录:假设你的应用日志都放在 /app/logs
     # - /app/logs:/app/logs
    # 环境变量:在 filebeat.yml 里可通过 ${LOGSTASH_HOSTS} 引用
    environment:
      - LOGSTASH_HOSTS=logstash:5044
    # 确保 logstash 服务先启动,避免 Filebeat 启动时找不到 5044 端口
    #depends_on:
    #  - logstash

(2)服务配置文件

# 日志输入
filebeat.inputs:
  - type: log
    enabled: true
    paths: 
      - /var/log/vmware-network.*.log
    fields:
      service: vmware   # 关键
      env: prod
    fields_under_root: true
        # 添加以下配置来采集旧文件
    ignore_older: 8760h  # 忽略超过365天的文件(默认是24h)
    close_older: 1h      # 1小时后关闭文件句柄
    scan_frequency: 30s  # 扫描频率
  - type: log
    enabled: true
    paths: ["/var/log/hawkey.log-*"]
    fields:
      service: hawkey   # 关键
      env: prod
    fields_under_root: true


#---------------
filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml  #加载modules目录下的所有配置文件
    reload.enabled: false   # 是否开启热加载

processors:  # 在日志发出去之前,给每条事件追加额外的元数据字段。
  - add_cloud_metadata: ~
  - add_docker_metadata: ~


# 输出到elasticsearch
#output.elasticsearch:
#  hosts: '${ELASTICSEARCH_HOSTS:elasticsearch:9200}'
#  username: '${ELASTICSEARCH_USERNAME:}'
#  password: '${ELASTICSEARCH_PASSWORD:}'

# 输出到logsash
output.logstash:
  hosts: ["192.168.88.53:5044"]          # Logstash 地址/端口
  loadbalance: true                 # 多台 Logstash 时轮询
  ssl.enabled: false                # 如启用 TLS 改成 true,并配证书

2.搭建logstash

(1)docker-compose配置文件

services:
  logstash:
    image: logstash:8.11.0
    container_name: logstash
    restart: unless-stopped
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml
    ports:
      - "5044:5044"   # Beats 输入
      - "5000:5000"   # TCP 输入(可选)
   # environment:
   #   - LS_JAVA_OPTS=-Xmx512m -Xms256m
   # depends_on:
   #   - elasticsearch

(3)服务配置文件logstash.conf  (业务日志)

input {
  beats {
    port => 5044
  }
}
#
可以过滤
#
output {
  elasticsearch {
    hosts => ["http://192.168.88.53:9200"]   # ← ES 地址
    index  => "%{[service]}-%{[env]}-%{+YYYY.MM.dd}"
  }
  # 可选:同时输出到控制台用于调试
  stdout {
    codec => rubydebug
  }
}

(4)服务配置文件logstash.yml(监控指标)

#写监控指标
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.88.53:9200" ]

3.搭建elasticsearch

(1)docker-compose文件

services:
 elasticsearch:
    image: elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true  # 开启密码
      - xpack.security.transport.ssl.enabled=false
    #  - ES_JAVA_OPTS=-Xms512m -Xmx512m
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - ./elasticsearch_data:/usr/share/elasticsearch/data
      - ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml

(2)elasticsearch.yml文件

cluster.name: "docker-cluster"
network.host: 0.0.0.0
xpack.security.enabled: true  # 启用安全功能,密码
xpack.security.transport.ssl.enabled: false  # 启用传输层SSL
http.cors.enabled: true  # 如需跨域则启用
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization

(3)密码设置

(3.1)上述配置完成后,重启 Elasticsearch 容器使配置生效。

docker restart elasticsearch

(3.2)设置内置用户密码:再次进入 Elasticsearch 容器,执行以下命令为内置用户(如 elastickibana_system 等)设置密码。交互式命令会提示你为每个用户设置密码:

./bin/elasticsearch-setup-passwords interactive
# 仅给 elastic 设密码
./bin/elasticsearch-users userpasswd elastic -p


输入密码:elastic_passwd@8
  • 若以后需要其他内置账号,再跑 ./bin/elasticsearch-users userpasswd <用户名> 即可单独启用。

内置账号:

  • elastic(超级管理员)

  • kibana_system(Kibana 连接用)官方专用账号,权限最小够用(只读集群状态、写 .kibana* 索引)。

  • logstash_systembeats_systemapm_systemremote_monitoring_user(这些是 Elasticsearch 内置专用账号,每个账号只负责把对应组件的 监控/系统数据 写入 ES,权限被严格限制,不能随意登录控制台或读写业务索引。

4.搭建kibana

(1)docker-compose文件

services:
  kibana:
    image: kibana:8.11.0
    container_name: kibana
    ports:
      - "5601:5601"
    volumes:
      - ./config:/usr/share/kibana/config
    environment:
      - "ELASTICSEARCH_HOSTS=http://192.168.88.53:9200"
      - I18N_LOCALE=zh-CN  # 设置中文界面

(2)config/kibana.yml配置文件

#
# ** THIS IS AN AUTO-GENERATED FILE **
#

# Default Kibana configuration for docker target
server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://192.168.88.53:9200" ]
monitoring.ui.container.elasticsearch.enabled: true

#开启用户密码
xpack.security.enabled: true

#官方专用账号,权限最小够用(只读集群状态、写 .kibana* 索引)。
elasticsearch.username: "kibana_system"  # 推荐使用 kibana_system 用户:cite[2]
elasticsearch.password: "elastic_passwd@8888"  # 替换为 kibana_system 用户的密码

(3)搜索界面的KQL语法

(3.1)键值对:field1: value1

(3.1)and:field1: value1 (空格)field2: value2

(3.1)or:field1: value1 or  field2: value2

四.EFK架构


FluentdFluent-bit 替代 Logstash,资源占用极低(<10MB),支持插件扩展。

Fluent-bit 更适合容器环境,如 Kubernetes 中的 DaemonSet 部署

ELFK 的弊端:

  • ELFK 组件较多,需要配置 Logstash 处理数据

  • Logstash 体积大,占用资源多,不适用放在容器内运行

EFK 优势:

  • Fluent 整合了 Filebeat 和 Logstash 的功能

  • Fluent 组件更少,占用资源更小,非常适合容器部署

  • 容器部署的优选方案


网站公告

今日签到

点亮在社区的每一天
去签到