目录
1. Kafka 简介
Apache Kafka 是一个 分布式流处理平台,主要用于构建 高吞吐量、低延迟、可扩展 的实时数据管道和流式应用程序。它最初由 LinkedIn 开发,后成为 Apache 顶级开源项目,广泛应用于大数据、日志聚合、事件驱动架构等领域。
1.1 Kafka 核心概念
(1)消息系统 vs. 流处理平台
传统消息队列(如 RabbitMQ):主要用于解耦生产者和消费者,保证消息可靠传递。
Kafka:
不仅是一个消息队列,还是一个 分布式流存储系统,支持持久化存储和流式计算。
适用于 高吞吐、大规模数据流 场景(如日志、指标、事件数据)。
(2)核心组件
组件 | 说明 |
---|---|
Producer(生产者) | 向 Kafka 发送消息(如日志、交易数据)。 |
Consumer(消费者) | 从 Kafka 读取并处理消息。 |
Broker(代理) | Kafka 服务器,负责存储和转发消息。 |
Topic(主题) | 消息的分类(类似数据库表),如 orders 、logs 。 |
Partition(分区) | 每个 Topic 可分成多个 Partition,提高并行处理能力。 |
Offset(偏移量) | 每条消息在 Partition 中的唯一 ID(类似数据库主键)。 |
Consumer Group(消费者组) | 多个消费者共同消费一个 Topic,实现负载均衡。 |
ZooKeeper | 管理 Kafka 集群元数据(新版本 Kafka 已逐步移除依赖)。 |
1.2 Kafka 核心特性
(1)高吞吐 & 低延迟
支持每秒百万级消息处理(取决于硬件配置)。
采用 顺序 I/O(相比随机 I/O 更快)和 零拷贝 技术优化性能。
(2)持久化存储
消息默认持久化到磁盘(可配置保留时间),支持 重放(replay) 数据。
适用于 事件溯源(Event Sourcing) 和 审计日志。
(3)分布式 & 高可用
支持 多副本(Replication),防止数据丢失。
自动故障转移(Leader/Follower 机制)。
(4)水平扩展
可动态增加 Broker 和 Partition,提升吞吐量。
(5)流处理能力
配合 Kafka Streams 或 ksqlDB 可实现实时流计算(如聚合、窗口计算)。
1.3 Kafka 典型应用场景
场景 | 说明 |
---|---|
日志聚合 | 收集应用日志(替代 ELK 中的 Logstash)。 |
消息队列 | 解耦微服务,如订单系统 → 库存系统。 |
实时数据处理 | 结合 Flink/Spark Streaming 做实时分析。 |
事件驱动架构 | 如用户行为追踪、IoT 设备数据采集。 |
Commit Log(提交日志) | 数据库变更捕获(CDC),如 Debezium + Kafka。 |
1.4 Kafka 架构示例
生产者(Producer) → Kafka Cluster(Broker1, Broker2...) ↓ 消费者(Consumer Group)→ 实时处理(Flink/Spark) ↓ 存储(HDFS/DB)
数据流示例(订单处理)
订单服务(Producer)发送消息到
orders
Topic。库存服务(Consumer)读取
orders
消息,扣减库存。分析服务(Consumer)统计实时销售额。
1.5 Kafka vs 其他消息队列
特性 | Kafka | RabbitMQ | Pulsar |
---|---|---|---|
吞吐量 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ |
延迟 | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
持久化 | 支持(磁盘) | 可选(内存/磁盘) | 支持 |
流处理 | 原生支持(Kafka Streams) | 不支持 | 支持(Pulsar Functions) |
适用场景 | 大数据、日志 | 任务队列、RPC | 多租户、云原生 |
✅ 适用 Kafka 的场景:
需要高吞吐、持久化存储的实时数据流(如日志、事件)。
流处理(如实时分析、监控)。
❌ 不适用 Kafka 的场景:
需要复杂路由(RabbitMQ 更合适)。
低延迟任务队列(Redis Streams/RabbitMQ 更好)。
Kafka 已成为现代数据架构的核心组件,广泛应用于大数据、微服务、实时计算等领域。
2. kafka部署
2.1 创建Namespace
kubectl create namespace elk
2.2 创建ConfigMap
vim kafka-configmap.yaml
apiVersion: v1 kind: ConfigMap metadata: name: ldc-kafka-scripts namespace: elk data: setup.sh: |- #启动脚本 #!/bin/bash export KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
2.3 创建Headless Service
vim kafka-headless.yaml
apiVersion: v1 kind: Service metadata: name: kafka-headless namespace: elk spec: clusterIP: None selector: app: kafka ports: - name: broker port: 9092 - name: controller port: 9093
2.4 创建Statefulset
vim kafka-statefulset.yaml
apiVersion: apps/v1 kind: StatefulSet metadata: name: kafka namespace: elk labels: app: kafka spec: selector: matchLabels: app: kafka serviceName: kafka-headless podManagementPolicy: Parallel replicas: 1 #根据资源情况设置实例数,推荐3个副本 updateStrategy: type: RollingUpdate template: metadata: labels: app: kafka spec: affinity: nodeAffinity: #这里做了节点亲和性调度到master节点 requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-role.kubernetes.io/control-plane operator: Exists #values: #- master tolerations: - key: "node-role.kubernetes.io/control-plane" operator: "Exists" effect: "NoSchedule" containers: - name: kafka image: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.4.0 imagePullPolicy: "IfNotPresent" command: - /opt/leaderchain/setup.sh env: - name: BITNAMI_DEBUG value: "true" #详细日志 # KRaft settings - name: MY_POD_NAME # 用于生成KAFKA_CFG_NODE_ID valueFrom: fieldRef: fieldPath: metadata.name - name: KAFKA_CFG_PROCESS_ROLES value: "controller,broker" - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS value: "0@kafka-0.kafka-headless:9093" #修改实例数时要更新 - name: KAFKA_KRAFT_CLUSTER_ID value: "Jc7hwCMorEyPprSI1Iw4sW" # Listeners - name: KAFKA_CFG_LISTENERS value: "PLAINTEXT://:9092,CONTROLLER://:9093" - name: KAFKA_CFG_ADVERTISED_LISTENERS value: "PLAINTEXT://:9092" - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES value: "CONTROLLER" - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME value: "PLAINTEXT" - name: ALLOW_PLAINTEXT_LISTENER value: "yes" ports: - containerPort: 9092 name: broker - containerPort: 9093 name: controller protocol: TCP volumeMounts: - mountPath: /bitnami/kafka name: kafka-data - mountPath: /opt/leaderchain/setup.sh name: scripts subPath: setup.sh readOnly: true securityContext: fsGroup: 1001 runAsUser: 1001 volumes: - configMap: defaultMode: 493 name: ldc-kafka-scripts #ConfigMap的名字 name: scripts volumeClaimTemplates: - apiVersion: v1 kind: PersistentVolumeClaim metadata: name: kafka-data spec: accessModes: [ "ReadWriteOnce" ] storageClassName: nfs-client #存储类的名称 resources: requests: storage: 1Gi
2.5 部署所有资源
[root@master1 Kafka]# ls kafka-configmap.yaml kafka-headless.yaml kafka-statefulset.yaml [root@master1 Kafka]# kubectl apply -f ./ configmap/ldc-kafka-scripts created service/kafka-headless created statefulset.apps/kafka created
2.6 检查kafka Pod状态
[root@master1 Kafka]# kubectl get pod -n elk NAME READY STATUS RESTARTS AGE filebeat-6db9l 1/1 Running 0 62m filebeat-qllxg 1/1 Running 0 62m filebeat-r5hw7 1/1 Running 0 62m kafka-0 1/1 Running 0 2m2s