k8s学习--k8s集群部署kafka详细过程

发布于:2024-07-02 ⋅ 阅读:(16) ⋅ 点赞:(0)


前言

kafka简介

kafka是什么

Kafka 是一个由 Apache 软件基金会开发的分布式流处理平台,主要用于实时数据流的处理和传输。它最初由 LinkedIn 开发,并于 2011 年开源。

kafka主要特点

1高吞吐量: Kafka 能够处理大量的数据流,适用于需要高吞吐量的场景。

2.分布式: Kafka 具有分布式架构,能够在多个节点上运行,提供高可用性和容错性。

3.持久化: Kafka 使用磁盘存储消息,确保数据的持久性。

4.高可靠性: 通过复制机制,Kafka 可以在节点故障时保持数据的完整性。

5可扩展性: Kafka 可以通过增加节点来轻松扩展其处理能力。

kafka主要组件

1.Producer(生产者): 负责将数据发布到 Kafka 的 topic 中。

2.Consumer(消费者): 负责从 Kafka 的 topic 中读取数据。

3.Broker(代理): Kafka 集群中的每个节点称为 broker,负责存储和转发数据。

4.Topic(主题): 消息的分类和组织单元,每个 topic 可以分成多个 partition(分区),每个分区存储部分数据。

5.Zookeeper: Kafka 使用 Zookeeper 进行分布式协调和管理集群状态。

kafka使用场景

**1.日志聚合:**集中收集和管理应用程序的日志。

**2.实时数据流处理:**用于实时分析和处理数据流。

**3.事件源驱动架构:**支持事件源驱动的应用程序架构。

**4.消息队列:**作为高吞吐量的消息队列系统,用于跨系统的数据传输。

kafka工作流程

1.生产者 将消息发送到指定的 topic。

2.Broker 接收到消息后,将其存储在磁盘上,并按照 topic 和 partition 进行组织。

3.消费者 从指定的 topic 和 partition 中读取消息。

小结

Kafka 的高吞吐量、低延迟、高可用性和扩展性使其成为处理大规模数据流的理想选择,广泛应用于数据管道、实时分析、日志聚合等领域。

k8s集群部署kafka

环境

虚拟机

Ip 主机名 cpu 内存 硬盘
192.168.10.11 master01 2cpu双核 4G 100G
192.168.10.12 worker01 2cpu双核 4G 100G
192.168.10.13 worker02 2cpu双核 4G 100G
192.168.10.18 nfs 1cpu一核 2G 40G

版本 centos7.9
已部署k8s-1.27

需要做的前置配置
storageclass
详情请看
链接: k8s练习–StorageClass详细解释与应用
区别仅有ip不一致,按本次ip来

本章不再过多叙述

kafka部署过程

master配置
编写yaml文件

vim kafka.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: default
            - name: SHARE_DIR
              value: /mnt/kafka
            - name: CLUSTER_ID
              value: oh-sxaDRTcyAr6pFRbXyzA
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
            - name: localtime
              mountPath: /etc/localtime
      volumes:
      - name: localtime
        hostPath:
          path: /etc/localtime
          type: ''

  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        storageClassName: nfs-client
        resources:
          requests:
            storage: "1Gi"

yaml文件拆分解释
这个 YAML 文件定义了一个 Kafka 服务和一个有状态集合 (StatefulSet)。以下是对每个部分的解释:
service部分

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app

解释
apiVersion: v1: 指定 API 版本。
kind: Service: 资源类型为服务 (Service)。
metadata: 元数据,包括名称和标签。
name: kafka-svc: 服务的名称。
labels: 服务的标签,用于标识和选择服务。
app: kafka-app: 应用标签。
spec: 服务的规格。
clusterIP: None: 指定服务为无头服务(Headless Service),没有单一的 ClusterIP。
ports: 服务暴露的端口。
name: ‘9092’: 端口名称。
port: 9092: 服务端口号。
protocol: TCP: 协议为 TCP。
targetPort: 9092: 目标容器的端口号。
selector: 服务选择器,用于选择匹配的 Pod。
app: kafka-app: 应用标签。

StatefulSet部分

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: default
            - name: SHARE_DIR
              value: /mnt/kafka
            - name: CLUSTER_ID
              value: oh-sxaDRTcyAr6pFRbXyzA
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
            - name: localtime
              mountPath: /etc/localtime
      volumes:
      - name: localtime
        hostPath:
          path: /etc/localtime
          type: ''
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        storageClassName: nfs-client
        resources:
          requests:
            storage: "1Gi"

解释
apiVersion: apps/v1: 指定 API 版本。
kind: StatefulSet: 资源类型为有状态集合 (StatefulSet)。
metadata: 元数据,包括名称和标签。
name: kafka: StatefulSet 的名称。
labels: StatefulSet 的标签。
app: kafka-app: 应用标签。
spec: StatefulSet 的规格。
serviceName: kafka-svc: 关联的服务名称。
replicas: 3: 副本数。
selector: 选择器,用于选择匹配的 Pod。
matchLabels:
app: kafka-app: 应用标签。
template: Pod 模板定义。
metadata: 模板的元数据,包括标签。
labels:
app: kafka-app: 应用标签。
spec: Pod 的规格。
containers: 容器定义。
name: kafka-container: 容器名称。
image: doughgle/kafka-kraft: 使用的镜像。
imagePullPolicy: IfNotPresent: 拉取镜像策略。
ports: 容器暴露的端口。
containerPort: 9092
containerPort: 9093
env: 环境变量。
REPLICAS: 副本数量。
SERVICE: 服务名称。
NAMESPACE: 命名空间。
SHARE_DIR: 共享目录。
CLUSTER_ID: 集群 ID。
DEFAULT_REPLICATION_FACTOR: 默认副本因子。
DEFAULT_MIN_INSYNC_REPLICAS: 默认最小同步副本数。
volumeMounts: 挂载卷。
name: data: 挂载的卷名称。
mountPath: /mnt/kafka: 挂载路径。
name: localtime: 挂载的本地时间卷。
mountPath: /etc/localtime: 挂载路径。
volumes: 卷定义。
name: localtime: 卷名称。
hostPath:
path: /etc/localtime: 主机路径。
type: ‘’: 类型为空。
volumeClaimTemplates: 卷声明模板。
metadata: 元数据,包括名称。
name: data: 卷名称。
spec: 卷规格。
accessModes:
“ReadWriteOnce”: 访问模式。
storageClassName: nfs-client: 存储类名称。
resources: 资源请求。
requests:
storage: “1Gi”: 存储请求大小。

这个定义确保了 Kafka 集群的高可用性和持久化存储,通过 StatefulSet 管理 Kafka 实例,并使用无头服务来保证每个实例都有一个固定的网络标识。

应用yaml文件

kubectl apply -f kafka.yaml

下载需要vpn
速度较慢,耐心等待
在这里插入图片描述
查看service

kubectl get svc

kafka应用测试

创建客户端pod

kubectl run kafka-client --rm -it --image bitnami/kafka:3.1.0 -- bash

过程较慢,稍等
报错
在这里插入图片描述

等半分钟,重试即可
在这里插入图片描述

查看默认存在的topic(Topic,即消息主题。)

 kafka-topics.sh --list --bootstrap-server kafka-svc.default.svc.cluster.local:9092

创建topic

kafka-topics.sh --bootstrap-server kafka-svc.default.svc.cluster.local:9092 --topic test01 --create --partitions 3 --replication-factor 2

在这里插入图片描述

kafka-topics.sh --list --bootstrap-server kafka-svc.default.svc.cluster.local:9092

在这里插入图片描述
创建数据生产者,添加数据

kafka-console-producer.sh --topic test1 --request-required-acks all --bootstrap-server kafka-svc.default.svc.cluster.local:9092
hello world

ctrl +c 退出

在这里插入图片描述在当前终端或另一个终端中创建数据消费者,消费数据

 kafka-console-consumer.sh --topic test1 --from-beginning --bootstrap-server kafka-svc.default.svc.cluster.local:9092

在这里插入图片描述
查看默认test topic相关描述信息

kafka-topics.sh --describe --topic test1 --bootstrap-server kafka-svc.default.svc.cluster.local:9092

在这里插入图片描述
查看test01 topic相关描述信息

kafka-topics.sh --describe --topic test01  --bootstrap-server kafka-svc.default.svc.cluster.local:9092

在这里插入图片描述