k8s 配置 Kafka SASL_SSL双重认证

发布于:2025-05-21 ⋅ 阅读:(14) ⋅ 点赞:(0)

说明

kafka提供了多种安全认证机制,主要分为SASLSSL两大类。

SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。


要在 Kubernetes 中为 Kafka 配置 SASL_SSL,你需要先完成以下两大步骤:


第一部分:生成 SSL 证书用于 SASL_SSL(JKS 格式)

Kafka 使用 Java 的 Keystore/Truststore(.jks 文件)作为证书格式。

步骤 1:创建 CA 根证书

openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650 -passout pass:123456 -subj "/CN=Kafka-CA"

在这里插入图片描述


步骤 2:为 Kafka 生成 keystore

keytool -genkeypair -keystore kafka.keystore.jks  -validity 365 -storepass 123456 -keypass 123456 -dname "CN=kafka"   -alias kafka  -keyalg RSA

在这里插入图片描述

步骤 3:创建证书签名请求(CSR)

keytool -keystore kafka.keystore.jks -alias kafka  -certreq -file kafka.csr -storepass 123456

在这里插入图片描述


步骤 4:用 CA 签名 Kafka 证书

openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka.csr -out kafka-signed.crt -days 365 -CAcreateserial -passin pass:123456

在这里插入图片描述


步骤 5:将 CA 证书导入 Kafka keystore

keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt

在这里插入图片描述


步骤 6:导入已签名 Kafka 证书

keytool -keystore kafka.keystore.jks -alias kafka -import -file kafka-signed.crt -storepass 123456

在这里插入图片描述


步骤 7:创建 truststore(客户端用)

keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt

在这里插入图片描述


✅ 生成完成的文件:

  • kafka.keystore.jks:服务端使用(含私钥)
  • kafka.truststore.jks:客户端和服务端都使用(信任CA)
  • 密码:统一用 123456(你可以自定义)
    在这里插入图片描述

✅ 第二部分:Kafka 中配置 SASL_SSL

将上述文件配置到 Kafka 中(假设你在 Kubernetes 中运行):


🗂 配置 1:Kafka 环境变量(在 StatefulSet / Deployment 中)

env:
  # --- 控制器配置(KRaft 模式) ---
  - name: KAFKA_CFG_NODE_ID
    value: "0"  # 当前节点的唯一 ID(用于 controller quorum)
  - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
    value: CONTROLLER  # 控制器监听使用的 listener 名称,需与 KAFKA_CFG_LISTENERS 中一致
  - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_NUM
    value: "10"  # 控制器选举客户端配额窗口数量(流控相关)
  - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_SIZE_SECONDS
    value: "1"  # 每个窗口的时长,单位秒
  - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_REQUEST_TIMEOUT_MS
    value: "5000"  # 控制器之间选举通信的请求超时时间(毫秒)
  - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
    value: "0@kafka:9093"  # controller 选举配置:nodeId@host:port

  # --- Kafka 常规配置 ---
  - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
    value: "true"  # 启用自动创建 topic(生产建议关闭)
  - name: KAFKA_ENABLE_KRAFT
    value: "YES"  # 启用 KRaft 模式(即不使用 ZooKeeper)

  # --- 节点角色定义 ---
  - name: KAFKA_CFG_PROCESS_ROLES
    value: "broker,controller"  # 当前节点同时担任 broker 和 controller 角色

  # --- Listener 与协议映射 ---
  - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
    value: "CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:SASL_SSL"  # 每个 listener 使用的安全协议
      # CONTROLLER 使用 SASL_PLAINTEXT,用于控制器通信
      # PLAINTEXT 实际绑定 SASL_SSL,用于 broker/client 通信(命名不影响协议)
  - name: KAFKA_INTER_BROKER_LISTENER_NAME
    value: PLAINTEXT  # Kafka Broker 间通信使用的 listener 名称(上方定义)
  - name: KAFKA_CFG_LISTENERS
    value: PLAINTEXT://:9092,CONTROLLER://:9093   # Broker 和 Controller 的监听端口及协议标识
  - name: KAFKA_CFG_ADVERTISED_LISTENERS
    value: PLAINTEXT://192.168.1.5:9092   # Kafka 向客户端暴露的访问地址  


  # --- SSL 配置 ---
  - name: KAFKA_SSL_KEYSTORE_LOCATION
    value: /bitnami/kafka/config/certs/kafka.keystore.jks  # 服务端密钥 + 证书文件路径
  - name: KAFKA_SSL_KEYSTORE_PASSWORD
    value: 123456  # keystore 文件访问密码
  - name: KAFKA_SSL_KEY_PASSWORD
    value: 123456   # keystore 内私钥使用的密码
  - name: KAFKA_SSL_TRUSTSTORE_LOCATION
    value: /bitnami/kafka/config/certs/kafka.truststore.jks  # CA 证书文件路径(信任的客户端)
  - name: KAFKA_SSL_TRUSTSTORE_PASSWORD
    value: 123456  # truststore 文件访问密码
  - name: KAFKA_SSL_CLIENT_AUTH
    value: required  # 启用客户端证书校验(双向认证)

  # --- SASL 配置(PLAIN 机制)---
  - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
    value: PLAIN  # 控制器间通信使用 PLAIN SASL 机制
  - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
    value: PLAIN  # Broker 间通信使用 PLAIN SASL 机制

  # --- TLS 类型与证书密码 ---
  - name: KAFKA_TLS_TYPE
    value: JKS  # TLS 密钥文件类型(Java KeyStore)
  - name: KAFKA_CERTIFICATE_PASSWORD
    value: 123456# 证书统一使用的访问密码(用于 SSL 参数)

  # --- 监听器角色映射 ---
  - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
    value: SASL_SSL  # Broker 间通信使用的安全 listener 名称
  - name: KAFKA_CLIENT_LISTENER_NAME
    value: SASL_SSL  # 客户端连接 Kafka 使用的 listener 名称

  # --- SASL 用户配置 ---
  - name: KAFKA_CONTROLLER_USER
    value: kafka  # 控制器之间通信使用的用户名
  - name: KAFKA_CONTROLLER_PASSWORD
    value: kafka123  # 控制器之间通信使用的密码
  - name: KAFKA_INTER_BROKER_USER
    value: kafka  # Broker 间通信使用的用户名
  - name: KAFKA_INTER_BROKER_PASSWORD
    value: kafka123  # Broker 间通信使用的密码
  - name: KAFKA_CLIENT_USERS
    value: kafka  # 允许连接的客户端用户名(多个用逗号分隔)
  - name: KAFKA_CLIENT_PASSWORDS
    value: kafka123  # 客户端对应密码,顺序与用户名保持一致


🗂 配置 2:Kubernetes 中挂载证书和 JAAS 文件

volumeMounts:
  - name: kafka-secrets
    mountPath: /bitnami/kafka/config/certs

volumes:
  - name: kafka-secrets
    secret:
      secretName: kafka-cert-secret
  - name: jaas-config
    configMap:
      name: kafka-jaas-config

你需要将 .jks 文件和密码打包为 Secret:

kubectl create secret generic kafka-cert-secret  -n 命名空间 --from-file=kafka.keystore.jks  --from-file=kafka.truststore.jks

✅ Kafka 客户端配置(示例)

yaml 配置文件

spring:
  kafka:
    bootstrap-servers: 192.168.1.5:9092 # Kafka Broker 的地址和端口
    listener:
      ack-mode: MANUAL_IMMEDIATE # 消费者手动提交消息确认(ack),立即提交(MANUAL_IMMEDIATE),适合需要精确控制 offset 提交时机的业务场景。
    consumer:
      custom-environment: dev  # 自定义字段,可用于 profile 配置(非 Spring Kafka 标准字段)
      auto-offset-reset: latest # 从最新消息开始消费(若无提交 offset)
      enable-auto-commit: false # 关闭自动提交,需手动调用 ack.acknowledge()
      #      auto-commit-interval: 2000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50 # 每次 poll 最多拉取 50 条记录
      max-poll-interval-ms: 600000 # 最大处理时间 10 分钟,超时视为挂掉
    producer:
      retries: 0 # 不重试(可调高)
      batch-size: 16384 # 每批 16KB,达到此大小或 linger.ms 超时才发送
      buffer-memory: 33554432 # 缓冲区大小
      # String 类型键值序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    ssl:
      # Kafka 客户端验证服务端证书是否可信(客户端信任的 CA 证书放在 truststore 中)
      # .jks 文件必须放在 resources/ 目录下并打包到 classpath
      trust-store-location: classpath:kafka.truststore.jks
      trust-store-password: 123456

    properties:
      sasl:
        mechanism: PLAIN # 使用 SASL/PLAIN 机制进行身份验证
        jaas:
          # 此处填写 SASL登录时分配的用户名密码(注意password结尾;)
          # 此处用户名 kafka 和密码 kafka123 必须与服务端 Kafka 设置的 KAFKA_CLIENT_USERS 一致
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka123"; 
      security:
        protocol: SASL_SSL # 通信使用 SASL 认证 + SSL 加密
      ssl:
        endpoint:
          identification:
            algorithm: "" # 关闭主机名验证,否则会因 SAN 缺失导致 SSL 握手失败(Java 默认开启)
      # ssl.endpoint.identification.algorithm=
      # producer.ssl.endpoint.identification.algorithm=
      # consumer.ssl.endpoint.identification.algorithm= 

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
  
import java.util.Properties;
  
public class KafkaProducerSaslSslExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         
        // 配置SASL认证方式为SASL_SSL
        props.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(ProducerConfig.SASL_MECHANISM, "PLAIN"); // 或者其他支持的SASL机制
         
        // 配置Kerberos认证所需的相关参数
        props.put(ProducerConfig.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 生产者使用示例
        producer.send(new ProducerRecord<>("your-topic", "message-key", "message-value"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println("消息发送成功");
            } else {
                exception.printStackTrace();
            }
        });
         
        producer.close();
    }
}

注意
在这个示例中,我们配置了 Kafka 生产者所需的基本参数,并通过ProducerConfig.SECURITY_PROTOCOL_CONFIG 指定了安全协议为 SASL_PLAINTEXT。然后,我们设置了 SASL_MECHANISM_CONFIGPLAIN 并提供了 JAAS 配置 (SASL_JAAS_CONFIG),其中包含了用于连接到 Kafka 集群的用户名和密码。

请确保将 <your-username>, <your-password>, kafka-broker1:9092, kafka-broker2:9092, 和 your-topic 替换为你的实际用户名、密码、Kafka 代理地址、主题名称。
你的Kafka集群已经配置了SSL和SASL认证,并且相关的安全设置是正确的

消费者

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;

    @Bean
    public ConsumerFactory<String, String> custConsumerConfigFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);

        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("consumer.ssl.endpoint.identification.algorithm", "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka123\";");

        // SSL配置
        props.put("ssl.truststore.location", "D:\\code\\ideaprojects\\zhubay-test\\src\\main\\resources\\kafka.truststore.jks");
        props.put("ssl.truststore.password", "123456");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> daConsumerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(custConsumerConfigFactory());
        factory.setBatchListener(true); // 启用批量消费
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

在这个示例中,我们配置了KafkaConsumer以使用SASL_SSL协议进行通信,并且指定了SASLPLAIN认证机制。我们还需要指定SSL的信任库和密钥库的位置以及它们的密码。sasl.jaas.config 属性中应该包含有效的JAAS配置,它定义了用于认证的用户名和密码。


网站公告

今日签到

点亮在社区的每一天
去签到