kafka SASL/PLAIN 认证及 ACL 权限控制

发布于:2025-05-28 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、Zookeeper 配置 SASL/PLAIN 认证(每个zookeeper节点都要做)

1.1 在 zookeeper 的 conf 目录下,创建 zk_server_jaas.conf 文件,内容如下

Server {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin"
   user_kafka="kafka";
};

username="admin" 是 zookeeper 实例之间通信的用户;user_kafka="kafka" 是 kafka broker 与 zookeeper 连接的时候的认证用户,密码为=后面的值;

1.2 修改 zoo.cfg ,添加以下内容

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.3 因为认证的时候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 这个是 kafka-client.jar 里面,所有需要将相应的 jar 包拷贝到 zookeeper 安装根目录的 lib 目录下, 大概要 copy 以下这些 jar 包

mv /opt/module/kafka_2.12-2.4.1/libs/kafka-clients-2.4.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/lz4-java-1.6.0.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/osgi-resource-locator-1.0.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/slf4j-api-1.7.28.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/snappy-java-1.1.7.3.jar /opt/module/zookeeper/lib/

1.4 修改 zookeeper 启动命令参数,在文件末尾追加以下内容

SERVER_JVMFLAGS="-Djava.security.auth.login.config=/opt/module/zookeeper/conf/zk_server_jaas.conf"

1.5 重启 zookeeper 服务。

二、Kafka Server 配置 SASL/PLAIN 认证(每个kafka节点都要做)

2.1 修改 kafka server.properties 文件,添加以下内容(各个node修改为自己的hostname)

listeners=SASL_PLAINTEXT://node2:9092
advertised.listeners=SASL_PLAINTEXT://node2:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer  
# default false | true to accept all the users to use it.
# allow.everyone.if.no.acl.found=true 
# 设置本例中admin为超级用户
super.users=User:admin

2.2 在kafka config 目录下创建 kafka_server_jass.conf 文件,内容如下

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_consumer="consumer"
};


Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka";
};

KafkaServer 段里面配置了 broker 之间的认证配置以及 client 和 broker 之间的认证配置

KafkaServer.username, KafkaServer.password 用于broker之间的相互认证

KafkaServer.user_admin和KafkaServer.user_consumer 用于 client 和broker 之间的认证, 下面我们 client 里面都用用户 consumer 进行认证

Client 段里面定义 username 和 password 用于 broker 与 zookeeper 连接的认证;

2.3 修改 kafka 启动命令脚本工具 kafka-server-start.sh,添加 java.security.auth.login.config 环境变量

将最后一句

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

修改为

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

2.4 修改完所有节点后,重启 kafka 服务,查看启动日志。

三、 Kafka client 的认证配置

3.1 在 Kafka 的 config 目录下创建 kafka_client_jaas.conf 文件,内容如下

KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="consumer"
      password="consumer";
};

3.2 修改 kafka 的 consumer.properties 和 producer.properties,添加以下内容

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

3.3 修改 producer 启动脚本参数,修改 kafka-console-producer.sh 文件

将最后一行

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

修改为

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"

3.4 修改 consumer 启动脚本参数,修改 kafka-console-consumer.sh 文件

将最后一行

exec $(dirname $0)/kafka-run-class.sh  kafka.tools.ConsoleConsumer "$@"

修改为

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"

四、ACL 配置

4.1 因为我们有个超级用户 admin,所以可以用 admin 做生产者,admin不需要做 ACL 配置

4.2 为 test topic 添加消费者用户 consumer

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:consumer --consumer --topic test --group test_group

4.3 测试

生产者测试

kafka-console-producer.sh --broker-list 
node2 --topic test --producer.config /opt/module/kafka_2.12-2.4.1/config/producer.properties

消费者测试

kafka-console-consumer.sh --bootstrap-server 
node2:9092 --topic test --from-beginning --consumer.config 
/opt/module/kafka_2.12-2.4.1/config/consumer.properties  

4.4 Flink 代码消费测试

object TestKafkaSource {
  // Kafka Broker 地址
  private val BOOTSTRAP_SERVERS = "node2:9092"


  // SASL/PLAIN 认证配置
  private val SASL_USERNAME = "consumer" 


  private val SASL_PASSWORD = "consumer"


  // Topic 名称
  private val ALLOWED_TOPIC = "test"


  private val DENIED_TOPIC = "test1"


  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment


    val props = new Properties
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")


    // SASL/PLAIN 配置
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "PLAIN")
    props.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD))




    val dataSource: DataStreamSource[String] = env.addSource(new FlinkKafkaConsumer[String](ALLOWED_TOPIC , new SimpleStringSchema(), props))
    dataSource.print()


    env.execute()
  }
}

五、kafka-exporter 配置(使用了 kafka-exporter 监控 kafka 集群才做)

5.1 在kafka_server_jaas.conf 文件中加入 kafka-exporter 用户信息,内容如下

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_consumer="consumer"
    user_kafka-exporter="kafka-exporter"
};


Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka";
};

5.2 分发到所有 kafka broker ,并重启 kafka 服务

5.3 为 kafka-exporter 用户配置权限

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --group '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic __consumer_offsets

5.4 配置 kafka_export 服务启动参数,在 kafka-export 安装目录下创建一个文件 start_kafka_export ,内容如下

#!/bin/bash
export KAFKA_SASL_USERNAME="kafka-exporter"
export KAFKA_SASL_PASSWORD="kafka-exporter"
export KAFKA_SASL_MECHANISM="PLAIN"


/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181/kafka241 \
  --sasl.enabled \
  --sasl.username=$KAFKA_SASL_USERNAME \
  --sasl.password=$KAFKA_SASL_PASSWORD \
  --sasl.mechanism=$KAFKA_SASL_MECHANISM

5.5 修改 kafka_export.service 文件,内容如下

[Unit]
Description=kafka_exporter
Wants=prometheus.service
After=network.target prometheus.service


[Service]
Type=simple
User=bigdata
Group=bigdata
Restart=on-failure
WorkingDirectory=/opt/module/kafka_exporter-1.7.0
#ExecStart=/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181
ExecStart=/opt/module/kafka_exporter-1.7.0/start_kafka_exporter
[Install]
WantedBy=multi-user.target

5.6 重新加载 systemctl 管理的服务

sudo systemctl daemon-reload

5.7 启动 kafka_exporter 服务

sudo systemctl start kafka_exporter

5.8 上 grafana 查看是否能获取 kafka 状态

六、 参考文档

6.1 官网文档 Apache Kafka


网站公告

今日签到

点亮在社区的每一天
去签到