接kafka4.0部署 https://blog.51cto.com/mapengfei/14081048
本文适用于:
- 需要在生产环境或测试环境下实现 Kafka 全链路 SSL 加密通信的团队和个人
- 希望通过证书实现不同客户端精细化权限控制(ACL),可以精确到每个客户端能操作哪些topic已经具体操作,提升安全等级
- 需要一键自动化、批量证书签发、权限分离、运维规范的 Kafka 用户
文章内容涵盖:
- Kafka 服务端与客户端证书链全流程(含 SAN、CA、自签、PKCS12、Truststore)
- Kafka 端到端 SSL 配置、server.properties 推荐模板
- 每个客户端独立证书、基于证书 CN 的 ACL 权限精细化控制
- 一键自动化与可视化流程图
适合 Kafka 安全运维、DevOps、数据平台、微服务架构等场景,助你快速落地企业级安全实践。
文章共分为2部分,kafka的server配置和客户端认证ACL控制部分
一、Kafka Server的配置部分
1. 生成服务端证书相关目录和 openssl 配置
mkdir -p /etc/kafka/server
cat > /etc/kafka/server/san.cnf <<EOF
[ req ]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = v3_req
prompt = no
[ req_distinguished_name ]
C = CN
ST = BJ
L = BJ
O = test
OU = test
CN = 10.0.70.189
[ v3_req ]
subjectAltName = @alt_names
[ alt_names ]
IP.1 = 10.0.70.189
DNS.1 = 10.0.70.189
EOF
2. 生成服务端私钥和 CSR
cd /etc/kafka/server
openssl req -new -nodes -out server.csr -keyout server.key -config san.cnf
3. 生成 CA
openssl genrsa -out ca.key 2048
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/C=CN/ST=BJ/L=BJ/O=test/OU=test/CN=Kafka-CA"
4. 用 CA 签发服务端证书
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 3650 -extensions v3_req -extfile san.cnf
5. 生成 PKCS12 证书
openssl pkcs12 -export -in server.crt -inkey server.key -out kafka.server.p12 -name kafka-server -CAfile ca.crt -caname root -password pass:mafei2025
6. 生成 Truststore
rm -f kafka.server.truststore.p12
keytool -importcert -trustcacerts -alias CARoot -file ca.crt -keystore kafka.server.truststore.p12 -storetype PKCS12 -storepass mafei2025 -noprompt
7. 生成 server-ssl.properties
cat > /etc/kafka/server/server-ssl.properties <<EOF
security.protocol=SSL
ssl.truststore.location=/etc/kafka/server/kafka.server.truststore.p12
ssl.truststore.password=mafei2025
ssl.keystore.location=/etc/kafka/server/kafka.server.p12
ssl.keystore.password=mafei2025
ssl.key.password=mafei2025
EOF
8.生成/覆盖 Kafka server.properties 主要SSL配置
cat > /opt/kafka/config/server.properties <<EOF
process.roles=broker,controller
node.id=1
controller.quorum.bootstrap.servers=10.0.70.189:9093
listeners=CONTROLLER://10.0.70.189:9092,BROKER://10.0.70.189:9093
inter.broker.listener.name=BROKER
advertised.listeners=CONTROLLER://10.0.70.189:9092,BROKER://10.0.70.189:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL,BROKER:SSL
ssl.keystore.type=PKCS12
ssl.keystore.location=/etc/kafka/server/kafka.server.p12
ssl.keystore.password=mafei2025
ssl.key.password=mafei2025
ssl.truststore.type=PKCS12
ssl.truststore.location=/etc/kafka/server/kafka.server.truststore.p12
ssl.truststore.password=mafei2025
ssl.client.auth=required
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:root;User:CN=10.0.70.189,OU=test,O=test,L=BJ,ST=BJ,C=CN
log.dirs=/data/kafka
num.partitions=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
EOF
echo "已自动生成 /opt/kafka/config/server.properties,包含SSL和KRaft主要配置。"
9.启动kafka Server
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
二、客户端A配置
1.生成 clientA 证书
mkdir -p /etc/kafka/clientA
cd /etc/kafka/clientA
openssl genrsa -out clientA.key 2048
openssl req -new -key clientA.key -out clientA.csr -subj "/CN=clientA"
openssl x509 -req -in clientA.csr -CA /etc/kafka/server/ca.crt -CAkey /etc/kafka/server/ca.key -CAcreateserial -out clientA.crt -days 3650 -sha256
openssl pkcs12 -export -in clientA.crt -inkey clientA.key -certfile /etc/kafka/server/server.crt -name clientA -out clientA.p12 -password pass:mafei2025
2. 创建 topic
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.70.189:9093 --create --topic t_mafei --partitions 1 --replication-factor 1 --command-config /etc/kafka/server/server-ssl.properties
3. 添加 ACL,限制clientA只能操作t_mafei 这个topic
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093 --add --allow-principal "User:CN=clientA" --operation All --topic t_mafei --command-config /etc/kafka/server/server-ssl.properties
4.测试生产消息( /root/kafka_test_amd64 工具就是kafka producer客户端,文章最后有贴)
/root/kafka_test_amd64 --ca=/etc/kafka/server/ca.crt --cert=/etc/kafka/clientA/clientA.crt --key=/etc/kafka/clientA/clientA.key --topic t_mafei --broker 10.0.70.189:9093
5.换一个topic应该就能看到报错信息了, 发送消息失败: kafka server: The client is not authorized to access this topic
/root/kafka_test_amd64 --ca=/etc/kafka/server/ca.crt --cert=/etc/kafka/clientA/clientA.crt --key=/etc/kafka/clientA/clientA.key --topic test1 --broker 10.0.70.189:9093
6.其他命令
1).删除其他多余的acl
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093 --remove --allow-principal "User:CN=clientA" --operation Create --topic test2 --command-config /etc/kafka/server/server-ssl.properties
2).查看当前所有acl
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093 --list --command-config /etc/kafka/server/server-ssl.properties
3).为每个客户端生成 PKCS12 keystore(如需)
cd /etc/kafka/clientA
openssl pkcs12 -export -in clientA.crt -inkey clientA.key -certfile /etc/kafka/server/server.crt -name clientA -out clientA.p12 -password pass:mafei2025
每个客户端用自己的证书和私钥(或 p12 文件)连接 Kafka,配置如下:
● clientA 用 clientA.crt + clientA.key(或 clientA.p12)
● clientB 用 clientB.crt + clientB.key(或 clientB.p12)
● 都用同一个 ca.crt 作为 truststore
4)ACL授权以t_mafei开头的所有topic
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093 --add --allow-principal "User:CN=clientA" --operation Create --topic t_mafei --resource-pattern-type PREFIXED --command-config /etc/kafka/server-ssl.properties
这样,User:CN=clientA 将拥有所有以 t_mafei 开头的 topic 的 Create 权限(如 t_mafei1、t_mafei_test 等)。
说明:
- --resource-pattern-type PREFIXED 是 Kafka 2.0 及以上版本支持的功能。
- 你也可以用 --operation All 授权所有操作。
三、总结
● 每个客户端独立生成私钥和证书请求,由同一个 CA 签发。
● 服务端只需信任 CA。
● 客户端用自己的证书和私钥连接。
● 可通过证书 CN 字段实现精细化权限控制。