#作者:任少近
文章目录
需求描述
Fluent Bit 将日志传输到 Fluentd,Fluentd 再将日志写入 Kafka
背景: 随着系统日志量的增加,尤其是在微服务架构下,日志的收集、处理和传输变得愈加复杂。为了实现高效的日志收集和处理,需要一个可靠且可扩展的日志管道。Fluent Bit、Fluentd 和 Kafka 的结合可以为这一需求提供强大的支持。
系统目标
Fluent Bit 作为日志收集器,从不同来源(如容器、应用程序、系统日志文件等)收集日志。
将收集到的日志实时转发到 Fluentd,Fluentd 对日志进行进一步的处理(如解析、过滤、增强等)。
Fluentd 将处理后的日志通过 Kafka 传输,Kafka 作为消息队列,提供日志的高吞吐量传输和存储。
系统组件
Fluent Bit
功能:负责日志的采集和转发,能够高效地从各种日志源收集日志数据,并将其发送到 Fluentd。
特点:轻量级、高效、低资源消耗,适用于边缘设备和容器环境。
Fluentd
功能:接收来自 Fluent Bit 的日志数据,对日志进行进一步的处理,如过滤、格式转换、增强等。
特点:支持丰富的插件生态系统,能够灵活地扩展和配置,适用于复杂的日志处理和存储需求。
Kafka
功能:作为日志数据的消息队列,提供高吞吐量、可靠的日志传输机制。Fluentd 将日志数据发送到 Kafka,Kafka 作为缓冲区存储和传递日志数据,确保日志的可靠性和可扩展性。
特点:高吞吐量、可扩展、容错能力强。
数据流与处理流程
日志采集
Fluent Bit 部署在日志源所在的节点或容器中,实时监控指定的日志文件(如 /var/log/test.log 等)。
Fluent Bit 使用 tail 插件采集日志,并将其转换为指定的格式(如 JSON)。
日志转发到 Fluentd
Fluent Bit 使用 forward 输出插件将日志数据转发到 Fluentd,通过指定 Fluentd 的 IP 地址和端口进行连接。
日志处理与转发到 Kafka
Fluentd 接收到日志后,可以进行各种处理(如过滤、解析、增强、格式转换等)。
处理后的日志通过 Kafka 输出插件将日志发送到指定的 Kafka 集群。
Kafka 将日志存储在其主题中,以便进行后续的分析、查询和处理。
Kafka 作为消息队列
Kafka 将日志数据持久化到其分区中,提供可靠的消息存储和高吞吐量的数据传输。
消费者 可以从 Kafka 中读取数据进行进一步处理或存储到其他系统(如 Elasticsearch、OpenSearch、数据库等)。
具体配置
Fluent-Bit的CM配置
fluent-bit.conf: |
[SERVICE]
Flush 1
Parsers_File parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_PORT 3302
[INPUT]
Name tail
Tag regex-fluent
DB /var/log/regex-fluent.db
Read_from_Head true
Path /var/log/test.log
Path_Key pod_log_path
[OUTPUT]
Name forward
Match *
Host fluentd-service.logging.svc
Port 24224
Retry_Limit 5
Fluent-Bit的DS配置
注意:日志目录的挂载
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluent-bit
namespace: logging
labels:
k8s-app: fluent-bit-logging
version: v1
kubernetes.io/cluster-service: "true"
spec:
selector:
matchLabels:
k8s-app: fluent-bit-logging
template:
metadata:
labels:
k8s-app: fluent-bit-logging
version: v1
kubernetes.io/cluster-service: "true"
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "2020"
prometheus.io/path: /api/v1/metrics/prometheus
spec:
nodeSelector:
zk-app: app
containers:
- name: fluent-bit
image: registry.cn-hangzhou.aliyuncs.com/ali_cloud_images/fluent-bit:1.9
imagePullPolicy: Never
ports:
- containerPort: 2020
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: fluent-bit-config
mountPath: /fluent-bit/etc/
- name: db
mountPath: /tail-db/
terminationGracePeriodSeconds: 10
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: fluent-bit-config
configMap:
name: fluent-bit-config
- name: db
hostPath:
path: /home/chb/hundun/fluent-bit
type: Directory
serviceAccountName: fluent-bit
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
- operator: "Exists"
effect: "NoExecute"
- operator: "Exists"
effect: "NoSchedule"
Fluentd的CM配置
fluent.conf: |-
<source>
@type forward
port 24224
bind 0.0.0.0
tag test
</source>
<match test>
@type kafka2
brokers 192.168.123.100:9092 # Kafka broker 地址
topic fluentd_topic
<format>
@type json
</format>
</match>
Fluentd的DS配置
注意:端口的暴露
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd-kafka
namespace: logging
labels:
k8s-app: fluentd-kafka
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
spec:
selector:
matchLabels:
k8s-app: fluentd-kafka
template:
metadata:
labels:
k8s-app: fluentd-kafka
kubernetes.io/cluster-service: "true"
# 此注释确保如果节点被驱逐,fluentd不会被驱逐,支持关键的基于 pod 注释的优先级方案。
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ''
spec:
initContainers:
- name: init-permission
image: busybox
imagePullPolicy: Never
command: ["sh", "-c", "mkdir -p /var/log/td-agent && chown -R 1000:1000 /var/log/td-agent"]
nodeSelector:
zk-app: app
serviceAccountName: fluentd-kafka
containers:
- name: fluentd-kafka
image: fluentd-kafka:latest
imagePullPolicy: Never
securityContext:
runAsUser: 0
ports:
- containerPort: 24224
name: forward-port
volumeMounts:
- name: fluentd-config-volume
mountPath: /opt/bitnami/fluentd/conf/fluentd.conf
subPath: fluent.conf
- name: varlog
mountPath: /var/log
- name: pos
mountPath: /var/log/td-agent
volumes:
- name: fluentd-config-volume
configMap:
name: fluentd-config
- name: varlog
hostPath:
path: /home/chb/test
- name: pos
hostPath:
path: /var/log/td-agent
imagePullSecrets:
- name: default-secret
tolerations:
- operator: Exists
terminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: fluentd-service
namespace: logging
spec:
selector:
k8s-app: fluentd-kafka
ports:
- protocol: TCP
port: 24224
targetPort: 24224
clusterIP: None