k8s部署kafka三节点集群

发布于:2025-09-12 ⋅ 阅读:(21) ⋅ 点赞:(0)

本来认为部署kafka很简单,没想到也折腾了2-3天,这水平没治了~

kafka从3.4.0版本之后,可以不依赖zookeeper直接使用KRaft模式部署,也就是说部署kafka可以不安装zookeeper直接部署。

官网上没有找到如何使用yaml文件在k8s上部署,捣鼓了2-3天终于稳定部署了,把步骤记录下来以备后查。

yaml文件内容:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: kafka
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - port: 9092
      targetPort: 9092
      name: kafka-server
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  namespace: kafka
spec:
  type: ClusterIP
  selector:
    app: kafka
  ports:
    - port: 9092
      targetPort: 9092
      name: server
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
  labels:
    app: kafka
spec:
  serviceName: "kafka-hs" # 指向内部无头服务
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
      - name: create-data-dir
        image: docker.m.daocloud.io/library/busybox:latest
        imagePullPolicy: IfNotPresent
        command: ['sh', '-c', 'mkdir -p /host-data/$(POD_NAME) && chmod 755 /host-data/$(POD_NAME) && chown -R 1000:1000 /host-data']
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        volumeMounts:
        - name: data
          mountPath: /host-data
      containers:
      - name: kafka
        image: docker.m.daocloud.io/apache/kafka:4.1.0
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
          protocol: TCP
        - containerPort: 9093
          protocol: TCP
        securityContext:
          runAsUser: 1000
          runAsGroup: 1000
        env:
        - name: KAFKA_ENABLE_KRAFT
          value: "yes"
        - name: KAFKA_PROCESS_ROLES
          value: "broker,controller"  # 节点同时担任 broker 和 controller
        - name: KAFKA_NODE_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.labels['apps.kubernetes.io/pod-index']
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: KAFKA_CONTROLLER_QUORUM_VOTERS
          value: "0@kafka-0.kafka-hs.kafka.svc.cluster.local:9093,1@kafka-1.kafka-hs.kafka.svc.cluster.local:9093,2@kafka-2.kafka-hs.kafka.svc.cluster.local:9093"
        - name: KAFKA_LISTENERS
          value: "PLAINTEXT://:9092,CONTROLLER://:9093"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(POD_NAME).kafka-hs.kafka.svc.cluster.local:9092"
        - name: KAFKA_LOG_DIRS
          value: "/bitnami/kafka/log"
        - name: KAFKA_CLUSTER_ID
          value: "oUp8pYCCRTKwXIc8KiQ2Uw"
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        # 添加存储初始化命令
        command: ["sh", "-c"]
        args:
          - |
            # 首次启动时格式化存储目录
           # KAFKA_CLUSTER_ID="$(/opt/kafka/bin/kafka-storage.sh random-uuid)"
            if [ ! -f /bitnami/kafka/log ]; then
              echo "创建log目录"
              mkdir -p /bitnami/kafka/log
              echo "初始化配置文件"
              sed -i 's/^log.dirs=.*//g' /opt/kafka/config/server.properties
              sed -i 's/^node.id=.*/node.id=$(KAFKA_NODE_ID)/' /opt/kafka/config/server.properties
              sed -i 's/localhost/$(POD_NAME).kafka-hs.kafka.svc.cluster.local/g' /opt/kafka/config/server.properties
              echo 'controller.quorum.voters=$(KAFKA_CONTROLLER_QUORUM_VOTERS)' >> /opt/kafka/config/server.properties
              echo 'log.dirs=$(KAFKA_LOG_DIRS)' >> /opt/kafka/config/server.properties
              sleep 1
              echo "配置文件初始化完毕..."
              echo "cluster.id=$(KAFKA_CLUSTER_ID)" > /bitnami/kafka/cluster.id
              cat /opt/kafka/config/server.properties
              /opt/kafka/bin/kafka-storage.sh format \
                -c /opt/kafka/config/server.properties \
                -t $(KAFKA_CLUSTER_ID) \
                --no-initial-controllers
             echo "格式化log存储目录"
            fi
            sleep 1            
            # 启动 Kafka
             echo "启动 kafka"
            /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
        volumeMounts:
        - name: data
          mountPath: /bitnami/kafka
          subPathExpr: $(POD_NAME)
      volumes:
      - name: data
        hostPath:
          path: /data/juicefs-mnt/kafka
          type: DirectoryOrCreate

这里面有几个关键知识点是以前我所没有涉及的:

1. 通过metadata.labels['apps.kubernetes.io/pod-index']  可以获取使用statefulset部署pod的索引;

2. 在volumeMounts:
        - name: data
          mountPath: /bitnami/kafka
          subPathExpr: $(POD_NAME) 中,可以使用subPathExpr获取环境变量

3. kafka部署是先要进行格式化存储目录的,并且在部署过程中设置传入pod中的环境变量对初始化命令没有直接影响,必须通过更改配置文件server.properties来修改格式化过程。官网只提供了单机模式部署,初始化过程中的kafka-storage format 命令提供了三种模式:[--standalone |
                     --no-initial-controllers |
                     --initial-controllers INITIAL_CONTROLLERS] 其帮助文档如下:

usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID
                     [--add-scram ADD_SCRAM] [--ignore-formatted]
                     [--release-version RELEASE_VERSION]
                     [--feature FEATURE] [--standalone |
                     --no-initial-controllers |
                     --initial-controllers INITIAL_CONTROLLERS]

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.
  --cluster-id CLUSTER_ID, -t CLUSTER_ID
                         The cluster ID to use.
  --add-scram ADD_SCRAM, -S ADD_SCRAM
                         A    SCRAM_CREDENTIAL     to     add     to    the
                         __cluster_metadata log e.g.
                         'SCRAM-SHA-256=[name=alice,password=alice-
                         secret]'
                         'SCRAM-SHA-512=[name=alice,iterations=8192,salt="
                         N3E=",saltedpassword="YCE="]'
  --ignore-formatted, -g
                         When this option  is  passed,  the  format command
                         will  skip  over   already  formatted  directories
                         rather than failing.
  --release-version RELEASE_VERSION, -r RELEASE_VERSION
                         The  release  version  to   use  for  the  initial
                         feature settings.  The  minimum  is  3.3-IV3;  the
                         default is 4.1-IV1
  --feature FEATURE, -f FEATURE
                         The setting to  use  for  a  specific  feature, in
                         feature=level   format.   For   example:   `kraft.
                         version=1`.
  --standalone, -s       Used to initialize a  controller  as a single-node
                         dynamic quorum.
  --no-initial-controllers, -N
                         Used to  initialize  a  server  without  a dynamic
                         quorum topology.
  --initial-controllers INITIAL_CONTROLLERS, -I INITIAL_CONTROLLERS
                         Used  to  initialize  a  server  with  a  specific
                         dynamic quorum topology. The  argument is a comma-
                         separated list of  id@hostname:port:directory. The
                         same values must be used  to format all nodes. For
                         example:
                         0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,
                         1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,
                         2@example.com:8084:07R5amHmR32VDA6jHkGbTA

你若用initial-controllers模式需要先转换directory.id 比较费劲,所以我选择了在部署过程中直接修改/opt/kafka/config/server.properties ,按照设置传递进来的环境变量修改配置文件后,再启动kafka就可以三节点稳定运行了。


网站公告

今日签到

点亮在社区的每一天
去签到