【实战ES】实战 Elasticsearch:快速上手与深度实践-5.1.1热点分片识别与均衡策略

发布于:2025-03-10 ⋅ 阅读:(19) ⋅ 点赞:(0)

👉 点击关注不迷路
👉 点击关注不迷路
👉 点击关注不迷路


5.1.1 Filebeat + Logstash + ES + Kibana 全链路配置实

  • 日志处理全链路架构与数据流向示意图
    • 数据采集(Filebeat):Filebeat 作为轻量级的日志采集器,通过安装在各个数据源所在的主机上的 Filebeat 代理来工作。它会监控指定的日志文件或目录,一旦有新的日志数据产生,就会将其收集起来,并发送给 Logstash 进行进一步处理。
    • 数据处理(Logstash):Logstash 接收来自 Filebeat 的日志数据,通过配置的过滤器(Filter)对数据进行清洗、解析和转换等操作。例如,过滤器可以提取日志中的关键信息(如时间戳、日志级别、消息内容等),并将其转换为结构化的数据格式。
    • 数据存储(Elasticsearch):Elasticsearch 是一个分布式的搜索引擎和数据存储系统,它接收来自 Logstash 的结构化日志数据,并将其存储在索引中。
    • 数据可视化(Kibana):Kibana 是 Elastic 生态系统中的数据可视化工具,它连接到 Elasticsearch 集群,从存储的日志数据中提取信息,并以各种可视化的方式展示出来,如柱状图、折线图、饼图、仪表盘等。用户可以通过 Kibana 对日志数据进行交互式分析,快速发现问题、趋势和模式
数据可视化
数据存储
数据处理
数据采集
数据源
Kibana 仪表盘
Kibana
Elasticsearch 集群
Elasticsearch
Logstash 过滤器
Logstash 转换器
Logstash
Filebeat 代理
Filebeat
系统日志文件
应用程序日志
网络设备日志
数据源

1. 架构设计与组件选型

1.1 技术栈对比分析

  • EPS,代表着 Elasticsearch 集群每秒能够处理的事件数量。20000 EPS 意味着系统具有强大的并发处理能力,能够在每秒内接收、解析、索引和存储大量的事件数据,展示了系统应对高流量数据输入的潜力
  • QPS,每秒查询率(Queries Per Second)。
组件 推荐版本 核心功能 性能基准(单节点)
Filebeat 8.12.2 轻量级日志采集 20,000 EPS(事件/秒)
Logstash 8.12.1 数据清洗与富化 15,000 EPS(含复杂处理)
Elasticsearch 8.12.0 分布式存储与检索 50,000 写入QPS
Kibana 8.12.0 可视化与分析 支持100并发查询

1.2 硬件配置推荐

节点类型 CPU 内存 存储 网络 数量
Filebeat 4核 8GB 100GB SSD 1Gbps 按需
Logstash 16核 32GB 500GB NVMe(临时存储) 10Gbps 3
Elasticsearch 32核 128GB 4TB NVMe x3(RAID0) 25Gbps 5
Kibana 8核 16GB 200GB SSD 1Gbps 2

2. Filebeat 高级配置

2.1 多输入源配置

# 定义 Filebeat 的输入配置,可包含多个不同类型的输入源
filebeat.inputs:
  # 第一个输入配置,类型为 filestream,用于收集文件流数据
  - type: filestream
    # 为该输入配置指定一个唯一的 ID,方便后续管理和识别
    id: nginx-access
    # 指定要收集的文件路径,可以使用通配符
    # 这里表示收集 /var/log/nginx/ 目录下所有以 access.log 开头的文件
    paths:
      - /var/log/nginx/access.log*
    # 为收集到的日志数据添加自定义字段
    # 这里添加了 log_type 字段,值为 "nginx_access",方便后续分析和过滤
    fields:
      log_type: "nginx_access"
    # 定义解析器,用于解析收集到的数据
    # 这里使用 ndjson 解析器,~ 表示使用默认配置
    parsers:
      - ndjson: ~

  # 第二个输入配置,类型为 container,用于收集容器日志
  - type: container
    # 为该输入配置指定一个唯一的 ID
    id: docker-logs
    # 指定要收集的容器日志文件路径
    # 这里表示收集 /var/lib/docker/containers/ 目录下所有容器的日志文件
    paths:
      - '/var/lib/docker/containers/*/*.log'
    # 定义处理器,用于对收集到的数据进行预处理
    # 这里使用 add_docker_metadata 处理器,~ 表示使用默认配置
    # 该处理器会为日志数据添加 Docker 容器的元数据,如容器 ID、名称等
    processors:
      - add_docker_metadata: ~

  # 第三个输入配置,类型为 syslog,用于通过 UDP 协议收集 Syslog 数据
  - type: syslog
    # 配置 Syslog 的 UDP 协议相关参数
    protocol.udp:
      # 指定监听的主机和端口
      # 0.0.0.0 表示监听所有可用的网络接口,端口为 5140
      host: "0.0.0.0:5140"
    # 为收集到的日志数据添加标签
    # 这里添加了 "syslog" 标签,方便后续筛选和分类
    tags: ["syslog"]

2.2 性能优化参数

# 配置 Filebeat 的内存队列相关参数
queue.mem:
  # 定义内存队列中可以存储的最大事件数量
  # 这里设置为 4096,表示内存队列最多能容纳 4096 个事件
  # 当队列中的事件数量达到此上限时,新的事件可能会被阻塞或丢弃(取决于具体策略)
  events: 4096
  # 触发内存队列刷新的最小事件数量
  # 当队列中的事件数量达到 1024 个时,Filebeat 会尝试将这些事件刷新到输出端
  flush.min_events: 1024
  # 内存队列刷新的超时时间
  # 即使队列中的事件数量未达到 flush.min_events 的设定值,每经过 5 秒,Filebeat 也会将队列中的事件刷新到输出端
  flush.timeout: 5s

# 配置 Filebeat 的输出,将收集到的事件发送到 Logstash
output.logstash:
  # 指定 Logstash 服务的主机和端口信息
  # "logstash-prod:5044" 表示 Logstash 服务所在的主机名为 logstash-prod,端口为 5044
  hosts: ["logstash-prod:5044"]
  # 设置并行工作线程的数量
  # 这里设置为 8,表示 Filebeat 会使用 8 个并行线程将事件发送到 Logstash,以提高发送效率
  worker: 8
  # 设置数据传输时的压缩级别
  # 取值范围通常是 0 - 9,0 表示不压缩,9 表示最高压缩比
  # 这里设置为 3,是一个在压缩率和性能之间的平衡选择,能在一定程度上减少网络传输的数据量
  compression_level: 3
  # 启用负载均衡功能
  # 当 hosts 配置中有多个 Logstash 实例时,Filebeat 会自动在这些实例之间进行负载均衡,避免某个实例负载过高
  loadbalance: true

# 配置 Filebeat 的日志记录级别
# 设置为 warning 表示 Filebeat 只会记录警告级别及以上的日志信息
# 这样可以减少日志文件的大小,只关注可能影响系统正常运行的重要信息
logging.level: warning

3. Logstash 数据处理管道

3.1 多阶段处理流程

  • ruby代码实现
# 定义 Logstash 的输入部分,用于接收数据
input {
  # 使用 beats 输入插件,用于接收来自 Filebeat 等 Beats 系列工具发送的数据
  beats {
    # 指定监听的端口,Filebeat 会将数据发送到这个端口
    port => 5044
    # 启用 SSL 加密,确保数据在传输过程中的安全性
    ssl => true
    # 指定 SSL 证书的路径,用于 SSL 加密通信
    ssl_certificate => "/etc/pki/tls/certs/logstash.crt"
    # 指定 SSL 私钥的路径,与证书配合完成 SSL 加密
    ssl_key => "/etc/pki/tls/private/logstash.key"
  }
}

# 定义 Logstash 的过滤部分,用于对输入的数据进行处理和转换
filter {
  # 针对 NGINX 访问日志进行解析
  # 检查数据中的 [fields][log_type] 字段是否为 "nginx_access"
  if [fields][log_type] == "nginx_access" {
    # 使用 grok 过滤器,它可以根据正则表达式模式从日志消息中提取字段
    grok {
      # 定义匹配模式,用于解析 NGINX 访问日志的每一行
      # 将不同的部分提取到对应的字段中,如客户端 IP、请求方法、响应状态码等
      match => { "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:useragent}"' }
    }
    # 使用 date 过滤器,将时间戳字符串转换为 Logstash 内部的日期对象
    date {
      # 指定时间戳字段和对应的日期格式,用于解析时间
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    # 使用 useragent 过滤器,解析用户代理字符串
    # 提取浏览器、操作系统等信息到新的 "ua" 字段中
    useragent {
      source => "useragent"
      target => "ua"
    }
  }

  # 通用字段处理部分,对所有接收到的数据都进行操作
  mutate {
    # 将 "response" 和 "bytes" 字段的数据类型转换为整数类型
    convert => {
      "response" => "integer"
      "bytes" => "integer"
    }
    # 从数据中移除 "message" 字段,因为它的信息可能已经被解析到其他字段中
    remove_field => ["message"]
  }
}

# 定义 Logstash 的输出部分,用于将处理后的数据发送到目标位置
output {
  # 使用 elasticsearch 输出插件,将数据发送到 Elasticsearch 集群
  elasticsearch {
    # 指定 Elasticsearch 集群的节点地址
    hosts => ["es - node1:9200","es - node2:9200"]
    # 定义索引名称,采用动态索引的方式,根据日期每天创建一个新的索引
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    # 指定索引模板的路径,用于在 Elasticsearch 中创建索引时的初始配置
    template => "/etc/logstash/templates/nginx - template.json"
    # 指定索引模板的名称
    template_name => "nginx - access"
    # 启用索引生命周期管理(ILM),可以自动管理索引的生命周期,如滚动、删除等
    ilm_enabled => true
  }

  # 调试输出部分,在生产环境中建议关闭
  # 将处理后的数据以易读的 Ruby 调试格式输出到标准输出
  stdout {
    codec => rubydebug { metadata => true }
  }
}

3.2 Grok性能优化表

  • Grok是一个非常实用的工具,通常结合 Logstash 一起使用,用于解析非结构化的日志数据并将其转换为结构化的数据,以便于在 Elasticsearch 中进行索引、搜索和分析。
    • 是一种基于正则表达式的模式匹配语言。
    • 常用预定义模式
      • %{IPORHOST}:用于匹配 IP 地址或主机名。
      • %{USER}:匹配用户名。
      • %{HTTPDATE}:匹配 HTTP 日期格式,如 01/Jan/2024:12:00:00 +0800。
      • %{WORD}:匹配一个单词。
      • %{NUMBER}:匹配一个数字。
模式复杂度 原始性能(事件/秒) 优化策略 优化后性能 提升幅度
简单模式 12,000 预编译正则表达式 15,000 25%↑
中等模式 8,500 使用Oniguruma引擎 11,200 32%↑
复杂模式 3,200 模式分解+条件判断 5,800 81%↑

4. Elasticsearch 索引设计

4.1 ILM生命周期策略

PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "7d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "forcemerge": {
            "max_num_segments": 1
          },
          "shrink": {
            "number_of_shards": 1
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

4.2 索引模板配置

// 创建一个名为 logs-template 的索引模板
PUT _index_template/logs-template
{
    // 定义索引模板匹配的索引名称模式
    // 这里表示该模板将应用于所有以 "logs-" 开头的索引
    "index_patterns": ["logs-*"],
    // 定义当创建匹配此模板的索引时所使用的设置和映射
    "template": {
        // 索引的设置部分,包含一些与索引性能、存储、生命周期等相关的参数
        "settings": {
            // 指定索引的主分片数量为 3
            // 主分片用于存储索引数据,多个主分片可以实现数据的分布式存储和并行处理
            "number_of_shards": 3,
            // 指定每个主分片的副本分片数量为 1
            // 副本分片是主分片的复制,用于提高数据的可用性和容错性
            "number_of_replicas": 1,
            // 指定该索引将使用名为 "logs-policy" 的索引生命周期管理策略
            // 索引生命周期管理可以自动管理索引的各个阶段,如热、温、冷、删除等
            "index.lifecycle.name": "logs-policy",
            // 指定索引使用的压缩编解码器为 "best_compression"
            // 该编解码器会以较高的压缩率对索引数据进行压缩,以节省磁盘空间,但可能会增加一定的 CPU 开销
            "index.codec": "best_compression"
        },
        // 索引的映射部分,定义了索引中字段的类型和结构
        "mappings": {
            // 设置动态映射规则为 "strict"
            // 这意味着只有在映射中显式定义的字段才能被索引,新字段不会自动添加到映射中
            "dynamic": "strict",
            // 定义索引中各个字段的具体属性
            "properties": {
                // 定义 "@timestamp" 字段的类型为日期类型
                // 该字段通常用于存储日志事件的时间戳,方便进行时间范围的查询和分析
                "@timestamp": { "type": "date" },
                // 定义 "message" 字段的类型为文本类型
                // 文本类型适用于存储较长的文本内容,支持全文搜索
                "message": { "type": "text" },
                // 定义 "response" 字段的类型为整数类型
                // 该字段可能用于存储响应状态码等整数值
                "response": { "type": "integer" },
                // 定义 "geoip" 字段为对象类型
                // 对象类型可以包含多个子字段,用于组织相关的信息
                "geoip": {
                    "type": "object",
                    // 定义 "geoip" 对象中的子字段
                    "properties": {
                        // 定义 "location" 字段的类型为地理点类型
                        // 地理点类型用于存储地理位置信息,如经纬度,方便进行地理空间查询
                        "location": { "type": "geo_point" }
                    }
                }
            }
        }
    }
}

5. Kibana 可视化实战

5.1 仪表板配置要点

// 向 Kibana 的 API 发起 POST 请求,用于创建一个新的仪表盘(dashboard)对象
POST /api/saved_objects/dashboard
{
    // 定义仪表盘对象的属性
    "attributes": {
        // 仪表盘的标题,这里设置为 "Nginx访问监控"
        "title": "Nginx访问监控",
        // 仪表盘的描述信息,说明该仪表盘用于实时流量分析
        "description": "实时流量分析仪表板",
        // 仪表盘上的面板配置,包含多个可视化面板
        "panelsJSON": [
            {
                // 第一个面板的类型为时间序列图(timeseries)
                "type": "timeseries",
                // 该面板的标题为 "请求量趋势"
                "title": "请求量趋势",
                // 该面板的参数配置
                "params": {
                    // 指定要查询的索引模式,这里表示查询所有以 "logs-nginx-" 开头的索引
                    "index": "logs-nginx-*",
                    // 指定时间字段,用于按照时间进行数据聚合和展示,这里使用 "@timestamp" 字段
                    "time_field": "@timestamp",
                    // 指定时间间隔为 1 小时,即按每小时进行数据聚合
                    "interval": "1h",
                    // 时间序列图的系列配置
                    "series": [
                        {
                            // 系列的名称为 "总请求量"
                            "name": "总请求量",
                            // 聚合方式为计数,即统计每个时间间隔内的请求数量
                            "aggregation": "count"
                        }
                    ]
                }
            },
            {
                // 第二个面板的类型为饼图(pie)
                "type": "pie",
                // 该面板的标题为 "HTTP状态码分布"
                "title": "HTTP状态码分布",
                // 该面板的参数配置
                "params": {
                    // 指定要查询的索引模式,同样查询所有以 "logs-nginx-" 开头的索引
                    "index": "logs-nginx-*",
                    // 分割模式为按词项(terms)分割,即根据某个字段的值进行分组
                    "split_mode": "terms",
                    // 指定用于分组的字段为 "response",通常这个字段存储的是 HTTP 状态码
                    "terms_field": "response",
                    // 显示前 5 个分组的结果,即只展示出现次数最多的 5 个 HTTP 状态码的分布情况
                    "size": 5
                }
            }
        ]
    }
}

5.2 关键可视化类型对比

图表类型 适用场景 性能影响 数据精度 交互性
时间序列图 流量趋势分析
热力图 异常检测
地理地图 IP分布分析
数据表 原始日志查看 最高

6. 全链路监控与调优

6.1 性能监控指标

组件 关键指标 健康阈值 告警阈值 监控工具
Filebeat Harvester活跃数 < 1000 > 2000 Metricbeat
Logstash Pipeline延迟 < 500ms > 2s Prometheus
Elasticsearch 索引延迟 < 1s > 5s Elastic监控
Kibana 查询响应时间 < 3s > 10s APM
  • Harvester 是 Filebeat 中负责实际读取文件内容的核心组件
    • Harvester 是 Filebeat 里的一个文件读取器,当 Filebeat 监测到有符合采集规则的文件时,会为每个文件启动一个 Harvester 实例。
    • 它的主要任务是逐行读取文件内容,将读取到的行封装成事件,然后发送给 Filebeat 的 spooler 进行后续处理。
  • APM(Application Performance Monitoring)
    • 用于对应用程序的性能进行监控和分析。
    • 功能特点: 性能指标监控、分布式追踪、错误分析、用户体验监控。

6.2 典型瓶颈解决方案

  • 场景:日志处理延迟突增

    1. 问题定位
    # 查看Logstash节点状态
    GET _nodes/stats/pipelines?filter_path=nodes.*.pipelines
    
    # 检查热点线程
    GET _nodes/hot_threads
    
    1. 优化策略
     # 管道配置用于控制 Logstash 处理事件的方式和效率
     pipeline:
       # 批处理相关配置,影响 Logstash 一次处理的事件数量和处理时间间隔
       batch:
         # 每次批处理的事件数量
         # 原值为 250,现调整为 125
         # 减小批量大小可能会降低每次处理的数据量,使处理更加灵活,适用于事件产生速度不均匀或者对实时性要求较高的场景
         size: 125
         # 批处理的延迟时间(单位:毫秒)
         # 原值为 100,现调整为 50
         # 缩短延迟时间可以让 Logstash 更快地处理事件,提高数据处理的实时性
         delay: 50
       # 工作线程数量
       # 原值为 4,现调整为 8
       # 增加工作线程数量可以提高 Logstash 的并发处理能力,加快事件处理速度,但同时也会增加系统资源的消耗
       workers: 8
       # 是否按顺序处理事件
       # 设置为 false 表示不按顺序处理,这样可以提高处理效率,但可能会导致事件处理的顺序与输入顺序不一致
       # 对于对事件顺序要求不高的场景,关闭顺序处理可以提升性能
       ordered: false
    
     # 以下是对 Logstash 运行时所使用的 Java 虚拟机(JVM)堆内存进行配置
     # LS_JAVA_OPTS 是一个环境变量,用于设置 JVM 的启动参数
     LS_JAVA_OPTS: "-Xms8g -Xmx8g"
     # -Xms8g 表示 JVM 堆内存的初始大小为 8GB
     # -Xmx8g 表示 JVM 堆内存的最大大小为 8GB
     # 增加堆内存可以让 Logstash 有更多的内存空间来处理大量的数据,减少因内存不足导致的性能问题和错误
     # 但同时也需要确保系统有足够的物理内存支持,否则可能会导致系统性能下降或出现内存溢出错误
    
    • LS_JAVA_OPTS 通常是指与 Logstash(LS 可能是 Logstash 的缩写)相关的 Java 虚拟机(JVM)选项配置变量。Logstash 是基于 Java 开发的开源数据处理引擎,用于采集、处理和转发数据。
    • 通过合理配置 LS_JAVA_OPTS,可以优化 Logstash 的性能、提高稳定性,并满足不同的运行需求和场景。
    1. 优化效果
    指标 优化前 优化后 提升比例
    处理延迟 4.2s 0.8s 81%↓
    CPU使用率 95% 68% 28%↓
    吞吐量 8K EPS 14K EPS 75%↑

7. 安全加固方案

7.1 传输层加密配置

# Filebeat配置部分,用于设置 Filebeat 输出数据的目标及相关安全配置
output.logstash:
  # 指定 Logstash 服务的主机和端口,这里表示将数据发送到名为 "logstash" 的主机的 5044 端口
  hosts: ["logstash:5044"]
  # SSL 相关配置,用于启用和配置 Filebeat 与 Logstash 之间的 SSL 加密通信
  ssl:
    # 启用 SSL 加密,设置为 true 表示开启
    enabled: true
    # 证书颁发机构(CA)证书路径,用于验证 Logstash 服务器证书的合法性
    # Filebeat 使用该 CA 证书来确认它连接的 Logstash 服务器是受信任的
    certificate_authorities: ["/etc/pki/ca.crt"]
    # Filebeat 客户端证书路径,用于向 Logstash 服务器进行身份验证
    # Logstash 可以使用该证书来确认连接的客户端是合法的
    certificate: "/etc/pki/client.crt"
    # Filebeat 客户端私钥路径,与客户端证书配合使用,用于加密和解密通信数据
    key: "/etc/pki/client.key"

# Elasticsearch配置部分,主要涉及 X-Pack 安全模块中传输层的 SSL 配置
xpack.security.transport.ssl:
  # 启用传输层的 SSL 加密,设置为 true 表示开启
  enabled: true
  # 证书验证模式,这里设置为 "certificate" 表示进行证书验证
  # 这意味着 Elasticsearch 会验证连接的对等方(如其他节点或客户端)的证书
  verification_mode: certificate
  # 密钥库路径,密钥库包含了 Elasticsearch 用于 SSL 通信的私钥和证书
  # "certs/elastic-certificates.p12" 是存储密钥和证书的文件路径
  keystore.path: certs/elastic-certificates.p12
  # 信任库路径,信任库包含了 Elasticsearch 信任的证书颁发机构的证书
  # 这里信任库路径与密钥库路径相同,表明使用相同的证书文件来验证对等方的证书
  truststore.path: certs/elastic-certificates.p12

7.2 权限控制矩阵

角色 数据访问范围 操作权限 适用场景
log_viewer logs-* read, view_index_metadata 普通运维人员
log_admin logs-* manage, create_index 系统管理员
alert_manager .kibana-event-log-* read, index 监控告警系统
report_user 特定索引模式 read 审计与报表生成

8. 实战案例:电商大促日志监控

8.1 场景参数

{
  "业务场景": "双11大促监控",
  "日志规模": {
    "峰值QPS": "120,000 EPS",
    "单日数据量": "8TB",
    "保留周期": "30天"
  },
  "架构特性": [
    "自动扩缩容",
    "多级缓存",
    "实时异常检测"
  ]
}

8.2 性能测试结果

测试阶段 写入延迟(p95) 查询响应时间 系统可用性
预热阶段 23ms 280ms 100%
峰值压力 89ms 1.2s 99.98%
故障恢复 自动切换5.8s - 99.95%
  • p95
    • P95 指的是第 95 百分位数。它是一种统计指标,将一组数据从小到大排序后,处于第 95% 位置的值就是 P95。
    • 例如,有 100 个数据,将它们按从小到大排列,第 95 个数据的值就是 P95。在性能测试和数据分析中,P95 常用于衡量数据的分布和性能表现

附录:常用诊断命令速查表

功能 Filebeat命令 ES API端点
查看采集状态 filebeat test output GET _cat/indices?v
检查管道状态 journalctl -u filebeat GET _nodes/stats/ingest
监控队列积压 filebeat -e -d "*" GET _cat/thread_pool?v
验证配置语法 filebeat test config GET _cluster/pending_tasks

最佳实践建议

  1. 建议采用「先索引模板后数据写入」的流程
  2. 日志类数据优先使用时间序列索引模式
  3. 定期执行_forcemerge优化存储空间
  4. 重要操作需通过变更管理系统审批