说明
kafka
提供了多种安全认证机制,主要分为SASL
和SSL
两大类。
SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka
中启用 SASL_SSL
安全协议时,SASL
用于客户端和服务器之间的身份验证,SSL
则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。
要在 Kubernetes 中为 Kafka 配置 SASL_SSL,你需要先完成以下两大步骤:
第一部分:生成 SSL 证书用于 SASL_SSL(JKS 格式)
Kafka 使用 Java 的 Keystore/Truststore(.jks
文件)作为证书格式。
步骤 1:创建 CA 根证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650 -passout pass:123456 -subj "/CN=Kafka-CA"
步骤 2:为 Kafka 生成 keystore
keytool -genkeypair -keystore kafka.keystore.jks -validity 365 -storepass 123456 -keypass 123456 -dname "CN=kafka" -alias kafka -keyalg RSA
步骤 3:创建证书签名请求(CSR)
keytool -keystore kafka.keystore.jks -alias kafka -certreq -file kafka.csr -storepass 123456
步骤 4:用 CA 签名 Kafka 证书
openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka.csr -out kafka-signed.crt -days 365 -CAcreateserial -passin pass:123456
步骤 5:将 CA 证书导入 Kafka keystore
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt
步骤 6:导入已签名 Kafka 证书
keytool -keystore kafka.keystore.jks -alias kafka -import -file kafka-signed.crt -storepass 123456
步骤 7:创建 truststore(客户端用)
keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt
✅ 生成完成的文件:
kafka.keystore.jks
:服务端使用(含私钥)kafka.truststore.jks
:客户端和服务端都使用(信任CA)- 密码:统一用
123456
(你可以自定义)
✅ 第二部分:Kafka 中配置 SASL_SSL
将上述文件配置到 Kafka 中(假设你在 Kubernetes 中运行):
🗂 配置 1:Kafka 环境变量(在 StatefulSet / Deployment 中)
env:
# --- 控制器配置(KRaft 模式) ---
- name: KAFKA_CFG_NODE_ID
value: "0" # 当前节点的唯一 ID(用于 controller quorum)
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
value: CONTROLLER # 控制器监听使用的 listener 名称,需与 KAFKA_CFG_LISTENERS 中一致
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_NUM
value: "10" # 控制器选举客户端配额窗口数量(流控相关)
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_SIZE_SECONDS
value: "1" # 每个窗口的时长,单位秒
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_REQUEST_TIMEOUT_MS
value: "5000" # 控制器之间选举通信的请求超时时间(毫秒)
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
value: "0@kafka:9093" # controller 选举配置:nodeId@host:port
# --- Kafka 常规配置 ---
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: "true" # 启用自动创建 topic(生产建议关闭)
- name: KAFKA_ENABLE_KRAFT
value: "YES" # 启用 KRaft 模式(即不使用 ZooKeeper)
# --- 节点角色定义 ---
- name: KAFKA_CFG_PROCESS_ROLES
value: "broker,controller" # 当前节点同时担任 broker 和 controller 角色
# --- Listener 与协议映射 ---
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
value: "CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:SASL_SSL" # 每个 listener 使用的安全协议
# CONTROLLER 使用 SASL_PLAINTEXT,用于控制器通信
# PLAINTEXT 实际绑定 SASL_SSL,用于 broker/client 通信(命名不影响协议)
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: PLAINTEXT # Kafka Broker 间通信使用的 listener 名称(上方定义)
- name: KAFKA_CFG_LISTENERS
value: PLAINTEXT://:9092,CONTROLLER://:9093 # Broker 和 Controller 的监听端口及协议标识
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://192.168.1.5:9092 # Kafka 向客户端暴露的访问地址
# --- SSL 配置 ---
- name: KAFKA_SSL_KEYSTORE_LOCATION
value: /bitnami/kafka/config/certs/kafka.keystore.jks # 服务端密钥 + 证书文件路径
- name: KAFKA_SSL_KEYSTORE_PASSWORD
value: 123456 # keystore 文件访问密码
- name: KAFKA_SSL_KEY_PASSWORD
value: 123456 # keystore 内私钥使用的密码
- name: KAFKA_SSL_TRUSTSTORE_LOCATION
value: /bitnami/kafka/config/certs/kafka.truststore.jks # CA 证书文件路径(信任的客户端)
- name: KAFKA_SSL_TRUSTSTORE_PASSWORD
value: 123456 # truststore 文件访问密码
- name: KAFKA_SSL_CLIENT_AUTH
value: required # 启用客户端证书校验(双向认证)
# --- SASL 配置(PLAIN 机制)---
- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
value: PLAIN # 控制器间通信使用 PLAIN SASL 机制
- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
value: PLAIN # Broker 间通信使用 PLAIN SASL 机制
# --- TLS 类型与证书密码 ---
- name: KAFKA_TLS_TYPE
value: JKS # TLS 密钥文件类型(Java KeyStore)
- name: KAFKA_CERTIFICATE_PASSWORD
value: 123456# 证书统一使用的访问密码(用于 SSL 参数)
# --- 监听器角色映射 ---
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
value: SASL_SSL # Broker 间通信使用的安全 listener 名称
- name: KAFKA_CLIENT_LISTENER_NAME
value: SASL_SSL # 客户端连接 Kafka 使用的 listener 名称
# --- SASL 用户配置 ---
- name: KAFKA_CONTROLLER_USER
value: kafka # 控制器之间通信使用的用户名
- name: KAFKA_CONTROLLER_PASSWORD
value: kafka123 # 控制器之间通信使用的密码
- name: KAFKA_INTER_BROKER_USER
value: kafka # Broker 间通信使用的用户名
- name: KAFKA_INTER_BROKER_PASSWORD
value: kafka123 # Broker 间通信使用的密码
- name: KAFKA_CLIENT_USERS
value: kafka # 允许连接的客户端用户名(多个用逗号分隔)
- name: KAFKA_CLIENT_PASSWORDS
value: kafka123 # 客户端对应密码,顺序与用户名保持一致
🗂 配置 2:Kubernetes 中挂载证书和 JAAS 文件
volumeMounts:
- name: kafka-secrets
mountPath: /bitnami/kafka/config/certs
volumes:
- name: kafka-secrets
secret:
secretName: kafka-cert-secret
- name: jaas-config
configMap:
name: kafka-jaas-config
你需要将 .jks
文件和密码打包为 Secret:
kubectl create secret generic kafka-cert-secret -n 命名空间 --from-file=kafka.keystore.jks --from-file=kafka.truststore.jks
✅ Kafka 客户端配置(示例)
yaml 配置文件
spring:
kafka:
bootstrap-servers: 192.168.1.5:9092 # Kafka Broker 的地址和端口
listener:
ack-mode: MANUAL_IMMEDIATE # 消费者手动提交消息确认(ack),立即提交(MANUAL_IMMEDIATE),适合需要精确控制 offset 提交时机的业务场景。
consumer:
custom-environment: dev # 自定义字段,可用于 profile 配置(非 Spring Kafka 标准字段)
auto-offset-reset: latest # 从最新消息开始消费(若无提交 offset)
enable-auto-commit: false # 关闭自动提交,需手动调用 ack.acknowledge()
# auto-commit-interval: 2000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 50 # 每次 poll 最多拉取 50 条记录
max-poll-interval-ms: 600000 # 最大处理时间 10 分钟,超时视为挂掉
producer:
retries: 0 # 不重试(可调高)
batch-size: 16384 # 每批 16KB,达到此大小或 linger.ms 超时才发送
buffer-memory: 33554432 # 缓冲区大小
# String 类型键值序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer #
value-serializer: org.apache.kafka.common.serialization.StringSerializer
ssl:
# Kafka 客户端验证服务端证书是否可信(客户端信任的 CA 证书放在 truststore 中)
# .jks 文件必须放在 resources/ 目录下并打包到 classpath
trust-store-location: classpath:kafka.truststore.jks
trust-store-password: 123456
properties:
sasl:
mechanism: PLAIN # 使用 SASL/PLAIN 机制进行身份验证
jaas:
# 此处填写 SASL登录时分配的用户名密码(注意password结尾;)
# 此处用户名 kafka 和密码 kafka123 必须与服务端 Kafka 设置的 KAFKA_CLIENT_USERS 一致
config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka123";
security:
protocol: SASL_SSL # 通信使用 SASL 认证 + SSL 加密
ssl:
endpoint:
identification:
algorithm: "" # 关闭主机名验证,否则会因 SAN 缺失导致 SSL 握手失败(Java 默认开启)
# ssl.endpoint.identification.algorithm=
# producer.ssl.endpoint.identification.algorithm=
# consumer.ssl.endpoint.identification.algorithm=
生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerSaslSslExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 配置SASL认证方式为SASL_SSL
props.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(ProducerConfig.SASL_MECHANISM, "PLAIN"); // 或者其他支持的SASL机制
// 配置Kerberos认证所需的相关参数
props.put(ProducerConfig.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";");
Producer<String, String> producer = new KafkaProducer<>(props);
// 生产者使用示例
producer.send(new ProducerRecord<>("your-topic", "message-key", "message-value"), (metadata, exception) -> {
if (exception == null) {
System.out.println("消息发送成功");
} else {
exception.printStackTrace();
}
});
producer.close();
}
}
注意
在这个示例中,我们配置了 Kafka 生产者所需的基本参数,并通过ProducerConfig.SECURITY_PROTOCOL_CONFIG
指定了安全协议为 SASL_PLAINTEXT
。然后,我们设置了 SASL_MECHANISM_CONFIG
为 PLAIN
并提供了 JAAS
配置 (SASL_JAAS_CONFIG
),其中包含了用于连接到 Kafka 集群的用户名和密码。
请确保将 <your-username>, <your-password>
, kafka-broker1:9092, kafka-broker2:9092
, 和 your-topic
替换为你的实际用户名、密码、Kafka 代理地址、主题名称。
你的Kafka集群已经配置了SSL和SASL认证,并且相关的安全设置是正确的
消费者
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.max-poll-interval-ms}")
private String maxPollIntervalMs;
@Bean
public ConsumerFactory<String, String> custConsumerConfigFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.endpoint.identification.algorithm", "");
props.put("consumer.ssl.endpoint.identification.algorithm", "");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka123\";");
// SSL配置
props.put("ssl.truststore.location", "D:\\code\\ideaprojects\\zhubay-test\\src\\main\\resources\\kafka.truststore.jks");
props.put("ssl.truststore.password", "123456");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> daConsumerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(custConsumerConfigFactory());
factory.setBatchListener(true); // 启用批量消费
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
在这个示例中,我们配置了KafkaConsumer
以使用SASL_SSL
协议进行通信,并且指定了SASL
的PLAIN
认证机制。我们还需要指定SSL
的信任库和密钥库的位置以及它们的密码。sasl.jaas.config
属性中应该包含有效的JAAS
配置,它定义了用于认证的用户名和密码。