K8S部署ELK(二):部署Kafka消息队列

发布于:2025-08-03 ⋅ 阅读:(16) ⋅ 点赞:(0)

目录

1. Kafka 简介

1.1 Kafka 核心概念

(1)消息系统 vs. 流处理平台

(2)核心组件

1.2 Kafka 核心特性

(1)高吞吐 & 低延迟

(2)持久化存储

(3)分布式 & 高可用

(4)水平扩展

(5)流处理能力

1.3 Kafka 典型应用场景

1.4 Kafka 架构示例

数据流示例(订单处理)

1.5 Kafka vs 其他消息队列

2. kafka部署

2.1 创建Namespace

2.2 创建ConfigMap

2.3 创建Headless Service

2.4 创建Statefulset

2.5 部署所有资源

2.6 检查kafka Pod状态


1. Kafka 简介

Apache Kafka 是一个 分布式流处理平台,主要用于构建 高吞吐量、低延迟、可扩展 的实时数据管道和流式应用程序。它最初由 LinkedIn 开发,后成为 Apache 顶级开源项目,广泛应用于大数据、日志聚合、事件驱动架构等领域。


1.1 Kafka 核心概念

(1)消息系统 vs. 流处理平台

  • 传统消息队列(如 RabbitMQ):主要用于解耦生产者和消费者,保证消息可靠传递。

  • Kafka

    • 不仅是一个消息队列,还是一个 分布式流存储系统,支持持久化存储和流式计算。

    • 适用于 高吞吐、大规模数据流 场景(如日志、指标、事件数据)。

(2)核心组件

组件 说明
Producer(生产者) 向 Kafka 发送消息(如日志、交易数据)。
Consumer(消费者) 从 Kafka 读取并处理消息。
Broker(代理) Kafka 服务器,负责存储和转发消息。
Topic(主题) 消息的分类(类似数据库表),如 orderslogs
Partition(分区) 每个 Topic 可分成多个 Partition,提高并行处理能力。
Offset(偏移量) 每条消息在 Partition 中的唯一 ID(类似数据库主键)。
Consumer Group(消费者组) 多个消费者共同消费一个 Topic,实现负载均衡。
ZooKeeper 管理 Kafka 集群元数据(新版本 Kafka 已逐步移除依赖)。

1.2 Kafka 核心特性

(1)高吞吐 & 低延迟

  • 支持每秒百万级消息处理(取决于硬件配置)。

  • 采用 顺序 I/O(相比随机 I/O 更快)和 零拷贝 技术优化性能。

(2)持久化存储

  • 消息默认持久化到磁盘(可配置保留时间),支持 重放(replay) 数据。

  • 适用于 事件溯源(Event Sourcing)审计日志

(3)分布式 & 高可用

  • 支持 多副本(Replication),防止数据丢失。

  • 自动故障转移(Leader/Follower 机制)。

(4)水平扩展

  • 可动态增加 Broker 和 Partition,提升吞吐量。

(5)流处理能力

  • 配合 Kafka StreamsksqlDB 可实现实时流计算(如聚合、窗口计算)。


1.3 Kafka 典型应用场景

场景 说明
日志聚合 收集应用日志(替代 ELK 中的 Logstash)。
消息队列 解耦微服务,如订单系统 → 库存系统。
实时数据处理 结合 Flink/Spark Streaming 做实时分析。
事件驱动架构 如用户行为追踪、IoT 设备数据采集。
Commit Log(提交日志) 数据库变更捕获(CDC),如 Debezium + Kafka。

1.4 Kafka 架构示例

生产者(Producer) → Kafka Cluster(Broker1, Broker2...)
                      ↓
消费者(Consumer Group)→ 实时处理(Flink/Spark)
                      ↓
                    存储(HDFS/DB)

数据流示例(订单处理)

  1. 订单服务(Producer)发送消息到 orders Topic。

  2. 库存服务(Consumer)读取 orders 消息,扣减库存。

  3. 分析服务(Consumer)统计实时销售额。


1.5 Kafka vs 其他消息队列

特性 Kafka RabbitMQ Pulsar
吞吐量 ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐⭐
延迟 ⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐
持久化 支持(磁盘) 可选(内存/磁盘) 支持
流处理 原生支持(Kafka Streams) 不支持 支持(Pulsar Functions)
适用场景 大数据、日志 任务队列、RPC 多租户、云原生

适用 Kafka 的场景

  • 需要高吞吐、持久化存储的实时数据流(如日志、事件)。

  • 流处理(如实时分析、监控)。

不适用 Kafka 的场景

  • 需要复杂路由(RabbitMQ 更合适)。

  • 低延迟任务队列(Redis Streams/RabbitMQ 更好)。

Kafka 已成为现代数据架构的核心组件,广泛应用于大数据、微服务、实时计算等领域。

2. kafka部署

2.1 创建Namespace

kubectl create namespace elk

2.2 创建ConfigMap

vim kafka-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: ldc-kafka-scripts
  namespace: elk
data:
  setup.sh: |-  #启动脚本
    #!/bin/bash
    export KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} 
    exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh

2.3 创建Headless Service

vim kafka-headless.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: elk
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: broker
      port: 9092
    - name: controller
      port: 9093

2.4 创建Statefulset

vim kafka-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: elk
  labels:
    app: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-headless
  podManagementPolicy: Parallel
  replicas: 1  #根据资源情况设置实例数,推荐3个副本
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        nodeAffinity:  #这里做了节点亲和性调度到master节点
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: node-role.kubernetes.io/control-plane
                operator: Exists
               #values:
               #- master
      tolerations:
      - key: "node-role.kubernetes.io/control-plane"
        operator: "Exists"
        effect: "NoSchedule"
      containers:
      - name: kafka
        image: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.4.0
        imagePullPolicy: "IfNotPresent"
        command:
        - /opt/leaderchain/setup.sh
        env:
        - name: BITNAMI_DEBUG
          value: "true" #详细日志
        # KRaft settings 
        - name: MY_POD_NAME # 用于生成KAFKA_CFG_NODE_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name            
        - name: KAFKA_CFG_PROCESS_ROLES
          value: "controller,broker"
        - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
          value: "0@kafka-0.kafka-headless:9093"  #修改实例数时要更新
        - name: KAFKA_KRAFT_CLUSTER_ID
          value: "Jc7hwCMorEyPprSI1Iw4sW"  
        # Listeners            
        - name: KAFKA_CFG_LISTENERS
          value: "PLAINTEXT://:9092,CONTROLLER://:9093"
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: "PLAINTEXT://:9092"
        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
          value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
          value: "CONTROLLER"
        - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
          value: "PLAINTEXT"
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        ports:
        - containerPort: 9092
          name: broker
        - containerPort: 9093
          name: controller
          protocol: TCP                     
        volumeMounts:
        - mountPath: /bitnami/kafka
          name: kafka-data
        - mountPath: /opt/leaderchain/setup.sh
          name: scripts
          subPath: setup.sh
          readOnly: true      
      securityContext:
        fsGroup: 1001
        runAsUser: 1001
      volumes:    
      - configMap:
          defaultMode: 493
          name: ldc-kafka-scripts  #ConfigMap的名字
        name: scripts                   
  volumeClaimTemplates:
  - apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: kafka-data
    spec:
      accessModes: [ "ReadWriteOnce" ] 
      storageClassName: nfs-client  #存储类的名称
      resources:
        requests:
          storage: 1Gi

2.5 部署所有资源

[root@master1 Kafka]# ls
kafka-configmap.yaml  kafka-headless.yaml  kafka-statefulset.yaml
[root@master1 Kafka]# kubectl apply -f ./
configmap/ldc-kafka-scripts created
service/kafka-headless created
statefulset.apps/kafka created

2.6 检查kafka Pod状态

[root@master1 Kafka]# kubectl get pod -n elk 
NAME             READY   STATUS    RESTARTS   AGE
filebeat-6db9l   1/1     Running   0          62m
filebeat-qllxg   1/1     Running   0          62m
filebeat-r5hw7   1/1     Running   0          62m
kafka-0          1/1     Running   0          2m2s

网站公告

今日签到

点亮在社区的每一天
去签到