【Docker】安装kafka案例

发布于:2025-08-15 ⋅ 阅读:(12) ⋅ 点赞:(0)

【一】环境准备

(1)准备服务器或者虚拟机
(2)安装宝塔控制面板
(3)安装docker
(4)安装jdk

【二】安装kafka

【1】zookeeper作用

(1)保存和管理 Kafka 集群的元数据信息。例如,Kafka 集群中的 Broker 信息、Topic 和 Partition
的信息、Consumer 的偏移量信息等。
(2)Kafka 集群的选举服务。当 Kafka 集群中的某个或某些 Broker 宕机时,ZooKeeper 可以协助 Kafka
集群选举出新的 Leader Broker。
(3)Kafka 集群的状态监控。ZooKeeper 可以监控 Kafka 集群中各个 Broker 的在线状态,并及时通知 Kafka
集群进行相应的处理

【2】拉取镜像

(1)拉取zookeeper镜像

docker pull wurstmeister/zookeeper

(2)拉取kafka镜像

docker pull wurstmeister/kafka

【3】创建容器

(1)创建zookeeper容器

docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper

(2)创建kafka容器

前提要安装好jdk,并配置好环境变量

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest

测试kafka

docker exec -it kafka /bin/bash

exit退出

【4】zookeeper服务启动

(1)找到Zookeeper安装路径

find / -name zkServer.sh

在这里插入图片描述

(2)执行启动

(1)进入zookeeper的config文件

cd /var/lib/docker/overlay2/80f2f1635bed9db5db5e5697053485db509c413aef6a5a6a3264330a7f53e8b4/diff/opt/zookeeper-3.4.13/conf/

(2)文件更名为zoo.cfg
将里面的zoo_xxx.cfg文件更名为zoo.cfg

mv zoo_xxx.cfg zoo.cfg

(3)执行启动脚本
进入zookeeper的bin目录

cd /var/lib/docker/overlay2/80f2f1635bed9db5db5e5697053485db509c413aef6a5a6a3264330a7f53e8b4/diff/opt/zookeeper-3.4.13/bin/

(4)执行./zkServer.sh start

./zkServer.sh start

在这里插入图片描述

(3)查询运行状态

./zkServer.sh status

在这里插入图片描述
该状态显示:以standalone模式运行。standalone模式意味着你只运行了一个Zookeeper服务器实例,而不是一个Zookeeper集群

(4)配置host

进入/etc/hosts 加上最后一行,192.168.x.x zookeeper。代表将将主机名映射到IP地址

在这里插入图片描述

【5】kafka服务启动

(1)找到kafka安装路径

find / -name kafka-topics.sh

在这里插入图片描述
进入到你查出来的kafka-topics.sh路径的上一个bin路径在进入config目录

cd /var/lib/docker/overlay2/8244efe326332818bd6b780a6333a2af67b2731cf32ded8a9e72f2179c7bb7d6/diff/opt/kafka_2.13-2.8.1/config
ll

在这里插入图片描述

(2)修改server.properties

找到config文件下的server.properties并进行修改端口,保存

配置你的服务器ip,确保配置文件中绑定的地址允许外部访问

# 监听地址:默认是localhost,仅允许本地访问,需改为0.0.0.0允许所有IP访问
listeners=PLAINTEXT://0.0.0.0:9092

# 对外公告的地址(若客户端与broker不在同一网络,需配置为公网IP或可访问的内网IP)
advertised.listeners=PLAINTEXT://192.168.19.16:9092

在这里插入图片描述

注意:advertised.listeners必须是客户端能访问到的 IP(不能是localhost或 127.0.0.1),否则客户端会获取到无效地址,远程连接会被拒绝

(3)执行kafka-server-start.sh 命令

/var/lib/docker/overlay2/8244efe326332818bd6b780a6333a2af67b2731cf32ded8a9e72f2179c7bb7d6/diff/opt/kafka_2.13-2.8.1/bin/kafka-server-start.sh /var/lib/docker/overlay2/8244efe326332818bd6b780a6333a2af67b2731cf32ded8a9e72f2179c7bb7d6/diff/opt/kafka_2.13-2.8.1/config/server.properties

也可以进入bin目录,然后

./kafka-server-start.sh  -daemon ../config/server.properties &

在这里插入图片描述

(4)kafka停止和重启

./kafka-server-stop.sh  -daemon ../config/server.properties &

然后重新开启

(5)测试创建主题

/kafka-verifiable-producer.sh --broker-list localhost:9092 --topic mykafka 

【6】检查zookeeper和kafka的状态

(1)查看ZooKeeper 容器运行状态

首先确认 ZooKeeper 容器是否正常启动:

# 查看所有运行的容器
docker ps

# 若未运行,查看所有容器(包括停止的)
docker ps -a

正常状态:ZooKeeper 容器的STATUS应为Up(运行中)。
若未运行,检查启动命令或日志:

# 查看ZooKeeper容器日志(替换为实际容器名或ID)
docker logs zookeeper

(2)连接 ZooKeeper 验证服务可用性

通过 ZooKeeper 客户端工具(容器内已内置)连接服务,验证是否可正常交互:

# 进入ZooKeeper容器
docker exec -it zookeeper /bin/bash

# 使用zkCli.sh连接本地ZooKeeper(默认端口2181)
./bin/zkCli.sh -server localhost:2181

成功连接后会显示Connected to localhost:2181,并进入[zk: localhost:2181(CONNECTED)]交互模式。
执行简单命令测试(如查看根节点):

[zk: localhost:2181(CONNECTED)] ls /
# 正常输出应包含默认节点(如/zookeeper)

输入quit退出客户端。

(3)查看 Kafka 容器运行状态

# 查看Kafka容器状态
docker ps | grep kafka

正常状态:STATUS为Up,且无频繁重启(Restarting)。
若异常,查看 Kafka 日志(关键!):

# 查看Kafka容器日志(替换为实际容器名或ID)
docker logs kafka

无ERROR级日志(如连接 ZooKeeper 失败的错误)即为正常。

(4)验证 Kafka broker 可用性

通过 Kafka 内置工具创建测试主题(Topic),验证 broker 是否正常工作:

# 进入Kafka容器
docker exec -it kafka /bin/bash

# 创建测试主题(名称为test,1个分区,1个副本)
./bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic test \
  --partitions 1 \
  --replication-factor 1

成功输出:Created topic test.
若失败,检查日志中是否有 “无法连接 broker” 或 “权限不足” 等错误。

(5)测试 Kafka 生产消费功能

进一步验证 Kafka 能否正常收发消息:

# (在Kafka容器内)启动消费者监听test主题
./bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic test \
  --from-beginning

打开另一个终端,进入 Kafka 容器启动生产者:

# (新终端)进入Kafka容器
docker exec -it kafka /bin/bash

# 启动生产者发送消息
./bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic test

在生产者终端输入消息(如hello kafka),消费者终端应实时收到消息,说明 Kafka 服务正常。

(6)验证 Kafka 与 ZooKeeper 的连接是否正常

Kafka 依赖 ZooKeeper 存储元数据(如主题、分区信息),两者连接正常的核心标志是:Kafka 成功在 ZooKeeper 中注册元数据。

方法 1:通过 ZooKeeper 客户端查看 Kafka 注册的节点
(1)进入 ZooKeeper 容器并连接客户端(如前文步骤):

docker exec -it zookeeper /bin/bash
./bin/zkCli.sh -server localhost:2181

(2)查看 Kafka 在 ZooKeeper 中创建的节点:

# 查看Kafka相关节点(Kafka默认会在/zookeeper下注册元数据)
[zk: localhost:2181(CONNECTED)] ls /brokers
# 正常输出应包含/ids、/topics等子节点,例如:
# [ids, topics, seqid]

# 查看已注册的Kafka broker ID
[zk: localhost:2181(CONNECTED)] ls /brokers/ids
# 输出示例:[0](表示ID为0的broker已注册,即当前Kafka容器)

若/brokers节点存在且包含内容,说明 Kafka 已成功连接 ZooKeeper 并注册元数据。

方法 2:通过 Kafka 日志验证连接
查看 Kafka 容器日志,搜索与 ZooKeeper 相关的信息:

docker logs kafka | grep -i zookeeper

正常日志应包含 “成功连接 ZooKeeper” 的信息,例如:

INFO Successfully connected to ZooKeeper (org.apache.kafka.common.utils.AppInfoParser)
INFO ZooKeeper session established, sessionid: 0x10000000000000, negotiated timeout: 6000 (org.apache.zookeeper.ClientCnxn)

若日志中出现Connection to zookeeper failed或ZooKeeper session expired,说明连接失败(需检查网络或配置)。

方法 3:通过 Kafka 工具查看 broker 信息
使用 Kafka 工具查看 broker 是否正常注册(依赖 ZooKeeper):

# 进入Kafka容器
docker exec -it kafka /bin/bash

# 查看broker列表(需通过ZooKeeper地址查询)
./bin/kafka-brokers.sh --zookeeper zookeeper:2181 --list

正常输出:0(表示 ID 为 0 的 broker 已通过 ZooKeeper 注册)。

总结
验证流程:
ZooKeeper 正常:容器运行 +zkCli.sh可连接并查看节点。
Kafka 正常:容器运行 + 能创建主题 + 生产消费消息正常。
Kafka 连接 ZooKeeper 正常:ZooKeeper 中存在/brokers节点 + Kafka 日志无连接错误 +kafka-brokers.sh能列出 broker。

【7】docker的基本操作

(1)拉取镜像

# 拉取指定镜像(默认latest标签)
docker pull [镜像名]
# 示例:拉取ubuntu最新镜像
docker pull ubuntu

# 拉取指定版本镜像(通过标签)
docker pull [镜像名]:[标签]
# 示例:拉取mysql 8.0版本
docker pull mysql:8.0

(2)查看本地镜像

# 列出所有本地镜像
docker images
# 或(简洁格式)
docker image ls

# 过滤查看指定镜像
docker images [镜像名]
# 示例:查看所有ubuntu镜像
docker images ubuntu

(3)创建容器

# 基本格式:创建并启动容器(交互式,退出后容器停止)
docker run -it [镜像名] /bin/bash

# 常用参数说明:
# -d:后台运行(守护模式)
# -p:端口映射(宿主端口:容器端口)
# -v:挂载数据卷(宿主路径:容器路径)
# --name:指定容器名称
# --rm:容器停止后自动删除

# 示例1:后台启动nginx,映射80端口,命名为mynginx
docker run -d -p 80:80 --name mynginx nginx

# 示例2:交互式启动ubuntu,退出后自动删除
docker run -it --rm ubuntu /bin/bash

# 示例3:启动mysql,挂载数据卷,设置环境变量
docker run -d -p 3306:3306 \
  -v /my/mysql/data:/var/lib/mysql \
  -e MYSQL_ROOT_PASSWORD=123456 \
  --name mymysql \
  mysql:8.0

(4)查看本地容器

# 列出所有运行中的容器
docker ps

# 列出所有容器(包括停止的)
docker ps -a

# 查看容器详细信息(通过容器名或ID)
docker inspect [容器名/ID]

# 查看容器运行状态(简洁)
docker stats [容器名/ID]

进入容器

# 进入运行中的容器(交互式,支持命令行操作)
docker exec -it [容器名/ID] /bin/bash
# 示例:进入mynginx容器
docker exec -it mynginx /bin/bash

# 若容器内无bash,可用sh
docker exec -it [容器名/ID] /bin/sh

查看容器内部进程

# 查看容器实时日志(默认最后10行)
docker logs [容器名/ID]

# 查看实时日志并持续输出(类似tail -f)
docker logs -f [容器名/ID]

# 查看指定行数的日志
docker logs -n 100 [容器名/ID]  # 最后100行

# 查看包含时间戳的日志
docker logs -t [容器名/ID]

查看容器端口映射

# 查看容器的端口映射关系
docker port [容器名/ID]

(5)启动和关闭容器

# 启动已停止的容器
docker start [容器名/ID]

# 停止运行中的容器(优雅关闭)
docker stop [容器名/ID]

# 强制停止容器
docker kill [容器名/ID]

# 重启容器
docker restart [容器名/ID]

(6)删除容器

# 删除已停止的容器
docker rm [容器名/ID]

# 强制删除运行中的容器
docker rm -f [容器名/ID]

# 删除所有已停止的容器
docker container prune

(7)删除镜像

# 通过镜像ID删除(需先删除依赖该镜像的容器)
docker rmi [镜像ID]
# 或通过镜像名:标签删除
docker rmi [镜像名]:[标签]

# 强制删除(即使有容器依赖,慎用)
docker rmi -f [镜像ID]

# 删除所有未使用的镜像(无标签的虚悬镜像)
docker image prune

(7)查看日志

# 查看容器实时日志(默认最后10行)
docker logs [容器名/ID]

# 查看实时日志并持续输出(类似tail -f)
docker logs -f [容器名/ID]

# 查看指定行数的日志
docker logs -n 100 [容器名/ID]  # 最后100行

# 查看包含时间戳的日志
docker logs -t [容器名/ID]

(8)其他操作

# 保存镜像为本地文件(.tar格式)
docker save -o [保存路径/文件名.tar] [镜像名]:[标签]
# 示例:保存ubuntu镜像为ubuntu.tar
docker save -o ./ubuntu.tar ubuntu:latest

# 加载本地镜像文件
docker load -i [镜像文件.tar]

# 给镜像打标签(用于推送至仓库)
docker tag [原镜像名:标签] [新标签,如仓库地址/镜像名:标签]
# 示例:标记本地镜像为私有仓库镜像
docker tag ubuntu:latest myrepo/ubuntu:v1

【三】服务快速重启

通过shell脚本实现对zookeeper和kafka的状态检查和重启

#!/bin/bash
# 脚本功能:检查Docker中的ZooKeeper和Kafka容器状态,未运行则重启
# 使用方法:chmod +x zk_kafka_monitor.sh && ./zk_kafka_monitor.sh

# 配置容器名称(根据实际情况修改)
ZOOKEEPER_CONTAINER="zookeeper"
KAFKA_CONTAINER="kafka"

# 日志文件路径
LOG_FILE="/var/log/zk_kafka_monitor.log"

# 记录日志函数
log() {
    local timestamp=$(date "+%Y-%m-%d %H:%M:%S")
    echo "[$timestamp] $1" >> $LOG_FILE
    echo "[$timestamp] $1"  # 同时输出到控制台
}

# 检查容器是否运行
check_container() {
    local container_name=$1
    # 检查容器是否存在且状态为运行中
    if docker ps --filter "name=^/$container_name$" --filter "status=running" --format "{{.Names}}" | grep -q "$container_name"; then
        return 0  # 运行中
    else
        return 1  # 未运行
    fi
}

# 重启容器
restart_container() {
    local container_name=$1
    log "开始重启容器: $container_name"
    
    # 尝试停止容器(若正在运行)
    if docker ps --filter "name=^/$container_name$" --format "{{.Names}}" | grep -q "$container_name"; then
        if docker stop $container_name; then
            log "成功停止容器: $container_name"
        else
            log "警告:停止容器 $container_name 失败,尝试强制停止"
            docker kill $container_name > /dev/null 2>&1
        fi
    fi
    
    # 启动容器
    if docker start $container_name; then
        log "成功启动容器: $container_name"
        # 等待3秒后再次检查状态
        sleep 3
        if check_container $container_name; then
            log "容器 $container_name 重启后状态正常"
            return 0
        else
            log "错误:容器 $container_name 重启后仍未正常运行"
            return 1
        fi
    else
        log "错误:启动容器 $container_name 失败"
        return 1
    fi
}

# 主逻辑
main() {
    log "===== 开始检查ZooKeeper和Kafka状态 ====="
    
    # 先检查ZooKeeper(Kafka依赖ZooKeeper)
    if check_container $ZOOKEEPER_CONTAINER; then
        log "ZooKeeper容器运行正常"
    else
        log "ZooKeeper容器未运行,需要重启"
        restart_container $ZOOKEEPER_CONTAINER
        # 若ZooKeeper重启失败,直接退出(Kafka依赖它)
        if [ $? -ne 0 ]; then
            log "ZooKeeper重启失败,终止检查"
            exit 1
        fi
    fi
    
    # 检查Kafka
    if check_container $KAFKA_CONTAINER; then
        log "Kafka容器运行正常"
    else
        log "Kafka容器未运行,需要重启"
        restart_container $KAFKA_CONTAINER
    fi
    
    log "===== 状态检查与重启操作完成 ====="
    echo "----------------------------------------"
}

# 执行主逻辑
main

添加权限

chmod +x zk_kafka_monitor.sh

运行脚本

./zk_kafka_monitor.sh

日志查看

tail -f /var/log/zk_kafka_monitor.log

脚本关键点说明
容器检查逻辑:通过docker ps过滤运行状态的容器,确保准确性
依赖处理:先检查并重启 ZooKeeper,再处理 Kafka,避免依赖问题
错误处理:停止容器失败时尝试强制终止,启动后再次验证状态
日志记录:同时输出到控制台和日志文件,方便调试和追溯

【四】安装kafka控制面板

【五】远程连接和测试案例

【1】测试端口接通

(1)关闭防火墙或者开发端口9092
(2)远程连接测试端口

telnet 192.168.19.16 9092

在这里插入图片描述