Kafka消息中间件安装配置

发布于:2025-09-02 ⋅ 阅读:(19) ⋅ 点赞:(0)

1 docker安装zookeeper集群(可忽略)

zookeeper 集群中的每个节点都需要一个唯一的标识(myid 文件)和统一的集群服务器列表配置(zoo.cfg 中的 server.x 列表)。

  • ZOO_MY_ID:表示当前 zookeeper 实例在集群中的编号,范围为1-255,所以一个 zookeeper 集群最多有 255 个节点
  • ZOO_SERVERS:表示当前 zookeeper 实例所在集群中的所有节点的编号、主机名(或IP地址)、端口

zookeeper 集群需要的端口:

  • 2181:客户端连接端口。
  • 2888:zookeeper 集群中的节点,Follower 与 Leader 之间进行数据同步和消息传递的端口。
  • 3888:用于 Leader 选举的端口。

1.1 创建相关目录

mkdir -p /opt/soft/kafka_cluster/zk1/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk2/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk3/{data,logs}

sudo chmod -R 777 /opt/soft/kafka_cluster/zk1
sudo chmod -R 777 /opt/soft/kafka_cluster/zk2
sudo chmod -R 777 /opt/soft/kafka_cluster/zk3

1.2 zk-docker-compose.yaml

version: '3.2'
 
services:
  zk1:
    image: zookeeper:3.9.3
    container_name: zk1
    restart: always
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - /etc/localtime:/etc/localtime
      - ./zk1/data:/data
      - ./zk1/logs:/datalog
    networks:
      - zk-net
 
  zk2:
    image: zookeeper:3.9.3
    container_name: zk2
    restart: always
    ports:
      - "2182:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - /etc/localtime:/etc/localtime
      - ./zk2/data:/data
      - ./zk2/logs:/datalog
    networks:
      - zk-net
 
  zk3:
    image: zookeeper:3.9.3
    container_name: zk3
    restart: always
    ports:
      - "2183:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
    volumes:
      - /etc/localtime:/etc/localtime
      - ./zk3/data:/data
      - ./zk3/logs:/datalog
    networks:
      - zk-net
 
# 给集群创建一个网络,名称自己随便定义,这里取名为 zk-net
networks:
  zk-net:
    driver: bridge

1.3 验证集群状态

集群启动后(大约需要等待一分钟进行选举),可以通过以下命令检查每个节点的状态:

# 进入节点1容器
docker exec -it zk1 /bin/bash

# 在容器内执行状态检查命令
[root@node05 kafka_cluster]# docker exec -it zk1 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit

[root@node05 kafka_cluster]# docker exec -it zk2 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit

[root@node05 kafka_cluster]# docker exec -it zk3 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

注意:对每个节点执行此操作,输出会显示该节点的模式是 Mode: leader 还是 Mode: follower。一个健康的集群应该有一个 Leader 和两个 Follower。

1.4 启动服务命令

# -f表示指定某个配置文件名   -d:表示后台启动
docker-compose up -d 
# 查看当前服务
docker ps

※2 docker安装kafka集群(基于KRaft模式)

Kafka 4.0 引入了 KRaft 模式(Kafka Raft Metadata Mode),它使 Kafka 集群不再依赖 ZooKeeper 进行元数据管理。KRaft 模式简化了 Kafka 部署和管理,不需要额外配置 ZooKeeper 服务,使得集群的配置和运维更加高效。

2.1 参数讲解

通用KRaft配置

  • KAFKA_ENABLE_KRAFT=yes(允许使用kraft,使用kraft模式)
  • KAFKA_CFG_PROCESS_ROLES=broker,controller(指定Kafka进程的角色,在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。)
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER(指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。)
  • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093(定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。)
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT(将监听器名称映射到安全协议。)
  • KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID} (使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可)
  • KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093(定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。)
  • ALLOW_PLAINTEXT_LISTENER=yes(允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用)

Broker特定配置

  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9194(定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。)
  • KAFKA_BROKER_ID=1(broker.id,必须唯一)

可选:资源与性能

  • KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true(允许自动创建主题,生产环境建议关闭)
  • KAFKA_CFG_NODE_ID=1(node id 唯一并且和broker一致)

2.2 创建相关目录

mkdir -p /opt/soft/kafka_cluster/kafka1
mkdir -p /opt/soft/kafka_cluster/kafka2
mkdir -p /opt/soft/kafka_cluster/kafka3

sudo chmod -R 777 /opt/soft/kafka_cluster/kafka1
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka2
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka3

2.3 docker-compose.yaml

  1. 部署前准备
# 1. 编辑.env 文件 注意HOST替换为主机ip
CLUSTER_ID=y2OnVDLzRYyxh3V156gnxw
HOST=192.168.1.151
  1. 编写docker-compose.yaml文件
version: "3.2"
services:
  kafka1:
    image: "bitnami/kafka:3.8.0"
    container_name: kafka11
    restart: always
    user: root
    ports:
      - 9192:9092 # 外部客户端访问端口
      - 9193:9093 # Controller监听端口,内部通信
    environment:
      ### 通用KRaft配置 ###
      # 允许使用kraft,使用kraft模式
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,同时做为broker和controller[指定Kafka进程的角色。在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。]
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。]
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 将监听器名称映射到安全协议。
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}      
      # 定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      ### Broker特定配置 ###
      # 定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9192
      # broker.id,必须唯一
      - KAFKA_BROKER_ID=1
      ### 可选:资源与性能 ###
      # 允许自动创建主题,建议生产环境关闭
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # node id 唯一并且和broker一致
      - KAFKA_CFG_NODE_ID=1
    volumes:
      - /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读
      - /opt/soft/kafka_cluster/kafka1:/bitnami/kafka
    networks:
      - kafka-net

  kafka2:
    image: "bitnami/kafka:3.8.0"
    container_name: kafka22
    restart: always
    user: root
    ports:
      - 9292:9092
      - 9293:9093
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9292
      - KAFKA_BROKER_ID=2
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_NODE_ID=2
    volumes:
      - /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读
      - /opt/soft/kafka_cluster/kafka2:/bitnami/kafka
    networks:
      - kafka-net

  kafka3:
    image: "bitnami/kafka:3.8.0"
    container_name: kafka33
    restart: always
    user: root
    ports:
      - 9392:9092
      - 9393:9093
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9392
      - KAFKA_BROKER_ID=3
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_NODE_ID=3
    volumes:
      - /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读
      - /opt/soft/kafka_cluster/kafka3:/bitnami/kafka
    networks:
      - kafka-net

  # 可选:Kafka UI 管理工具
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    restart: always
    ports:
      - "9080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local-kraft-cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
      # 如果Kafka配置了认证,需在此设置
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.30.0.0/16

2.4 验证集群状态

  1. 进入其中一个容器,使用Kafka命令行工具验证集群是否正常运行。
docker exec -it kafka11 /bin/bash

# 进入容器后,列出主题(此时应为空)
kafka-topics.sh --bootstrap-server localhost:9092 --list
  1. 创建一个测试主题
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 3 --replication-factor 3
  1. 查看主题详情,确认分区和副本分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic

Topic: test-topic       TopicId: 2ChsTY_IT1KiuC0AP8-25w PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: test-topic       Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3      Elr:    LastKnownElr:
        Topic: test-topic       Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1      Elr:    LastKnownElr:
        Topic: test-topic       Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2      Elr:    LastKnownElr:
  1. 测试消息生产与消费
# 在一个终端运行生产者
docker exec -it kafka11 /bin/bash

kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

docker exec -it kafka11 /bin/sh

# 在另一个终端运行消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

在生产端输入消息,应该在消费端能看到。
5. 访问Kafka UI(如果部署了):
打开浏览器访问 http://your-host-ip:9080,应该能看到Kafka UI界面,并可以查看集群、主题、消费者组等信息。

2.5 开放相关端口

# kafka1相关端口
firewall-cmd --zone=public --add-port=9192/tcp --permanent
firewall-cmd --zone=public --add-port=9193/tcp --permanent

# kafka2相关端口
firewall-cmd --zone=public --add-port=9292/tcp --permanent
firewall-cmd --zone=public --add-port=9293/tcp --permanent

## kafka3相关端口
firewall-cmd --zone=public --add-port=9392/tcp --permanent
firewall-cmd --zone=public --add-port=9393/tcp --permanent

firewall-cmd --zone=public --add-port=9080/tcp --permanent

# 重启防火墙
firewall-cmd --reload

# 查看开放的端口
firewall-cmd --list-port