基于 Fluent-Bit 和 Fluentd 的分布式日志采集与处理方案

发布于:2025-04-02 ⋅ 阅读:(14) ⋅ 点赞:(0)

#作者:任少近

需求描述

Fluent Bit 将日志传输到 Fluentd,Fluentd 再将日志写入 Kafka
背景: 随着系统日志量的增加,尤其是在微服务架构下,日志的收集、处理和传输变得愈加复杂。为了实现高效的日志收集和处理,需要一个可靠且可扩展的日志管道。Fluent Bit、Fluentd 和 Kafka 的结合可以为这一需求提供强大的支持。

系统目标

Fluent Bit 作为日志收集器,从不同来源(如容器、应用程序、系统日志文件等)收集日志。
将收集到的日志实时转发到 Fluentd,Fluentd 对日志进行进一步的处理(如解析、过滤、增强等)。
Fluentd 将处理后的日志通过 Kafka 传输,Kafka 作为消息队列,提供日志的高吞吐量传输和存储。

系统组件

Fluent Bit

功能:负责日志的采集和转发,能够高效地从各种日志源收集日志数据,并将其发送到 Fluentd。
特点:轻量级、高效、低资源消耗,适用于边缘设备和容器环境。

Fluentd

功能:接收来自 Fluent Bit 的日志数据,对日志进行进一步的处理,如过滤、格式转换、增强等。
特点:支持丰富的插件生态系统,能够灵活地扩展和配置,适用于复杂的日志处理和存储需求。

Kafka

功能:作为日志数据的消息队列,提供高吞吐量、可靠的日志传输机制。Fluentd 将日志数据发送到 Kafka,Kafka 作为缓冲区存储和传递日志数据,确保日志的可靠性和可扩展性。
特点:高吞吐量、可扩展、容错能力强。

数据流与处理流程

日志采集

Fluent Bit 部署在日志源所在的节点或容器中,实时监控指定的日志文件(如 /var/log/test.log 等)。
Fluent Bit 使用 tail 插件采集日志,并将其转换为指定的格式(如 JSON)。

日志转发到 Fluentd

Fluent Bit 使用 forward 输出插件将日志数据转发到 Fluentd,通过指定 Fluentd 的 IP 地址和端口进行连接。

日志处理与转发到 Kafka

Fluentd 接收到日志后,可以进行各种处理(如过滤、解析、增强、格式转换等)。
处理后的日志通过 Kafka 输出插件将日志发送到指定的 Kafka 集群。
Kafka 将日志存储在其主题中,以便进行后续的分析、查询和处理。

Kafka 作为消息队列

Kafka 将日志数据持久化到其分区中,提供可靠的消息存储和高吞吐量的数据传输。
消费者 可以从 Kafka 中读取数据进行进一步处理或存储到其他系统(如 Elasticsearch、OpenSearch、数据库等)。

具体配置

Fluent-Bit的CM配置

 fluent-bit.conf: |
    [SERVICE]
        Flush 1
        Parsers_File parsers.conf
        HTTP_Server  On
        HTTP_Listen  0.0.0.0
        HTTP_PORT    3302
    [INPUT]
        Name         tail
        Tag          regex-fluent
        DB           /var/log/regex-fluent.db
        Read_from_Head true
        Path  /var/log/test.log
        Path_Key  pod_log_path
    [OUTPUT]
        Name        forward
        Match       *
        Host        fluentd-service.logging.svc
        Port        24224
        Retry_Limit 5

Fluent-Bit的DS配置

注意:日志目录的挂载

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: logging
  labels:
    k8s-app: fluent-bit-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluent-bit-logging
  template:
    metadata:
      labels:
        k8s-app: fluent-bit-logging
        version: v1
        kubernetes.io/cluster-service: "true"
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "2020"
        prometheus.io/path: /api/v1/metrics/prometheus
    spec:
      nodeSelector:
        zk-app: app
      containers:
      - name: fluent-bit
        image: registry.cn-hangzhou.aliyuncs.com/ali_cloud_images/fluent-bit:1.9
        imagePullPolicy: Never
        ports:
          - containerPort: 2020
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc/
        - name: db
          mountPath: /tail-db/
      terminationGracePeriodSeconds: 10
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config
      - name: db
        hostPath:
          path: /home/chb/hundun/fluent-bit
          type: Directory
      serviceAccountName: fluent-bit
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      - operator: "Exists"
        effect: "NoExecute"
      - operator: "Exists"
        effect: "NoSchedule"

Fluentd的CM配置

 fluent.conf: |-
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
      tag test
    </source>
    <match test>
      @type kafka2
      brokers 192.168.123.100:9092  # Kafka broker 地址
      topic fluentd_topic
      <format>
       @type json
      </format>
    </match>

Fluentd的DS配置

注意:端口的暴露

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-kafka
  namespace: logging
  labels:
    k8s-app: fluentd-kafka
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-kafka
  template:
    metadata:
      labels:
        k8s-app: fluentd-kafka
        kubernetes.io/cluster-service: "true"
      # 此注释确保如果节点被驱逐,fluentd不会被驱逐,支持关键的基于 pod 注释的优先级方案。
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      initContainers:
        - name: init-permission
          image: busybox
          imagePullPolicy: Never
          command: ["sh", "-c", "mkdir -p /var/log/td-agent && chown -R 1000:1000 /var/log/td-agent"]
      nodeSelector:
        zk-app: app
      serviceAccountName: fluentd-kafka
      containers:
      - name: fluentd-kafka
        image: fluentd-kafka:latest
        imagePullPolicy: Never
        securityContext:
          runAsUser: 0
        ports: 
          - containerPort: 24224
            name: forward-port
        volumeMounts:
          - name: fluentd-config-volume
            mountPath: /opt/bitnami/fluentd/conf/fluentd.conf
            subPath: fluent.conf
          - name: varlog
            mountPath: /var/log
          - name: pos
            mountPath: /var/log/td-agent
      volumes:
      - name: fluentd-config-volume
        configMap:
          name: fluentd-config
      - name: varlog
        hostPath:
          path: /home/chb/test
      - name: pos
        hostPath:
          path: /var/log/td-agent
      imagePullSecrets:
        - name: default-secret
      tolerations:
      - operator: Exists
      terminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: fluentd-service
  namespace: logging
spec:
  selector:
    k8s-app: fluentd-kafka
  ports:
    - protocol: TCP
      port: 24224
      targetPort: 24224
  clusterIP: None

Kafka查询结果

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到