一、Zookeeper 配置 SASL/PLAIN 认证(每个zookeeper节点都要做)
1.1 在 zookeeper 的 conf 目录下,创建 zk_server_jaas.conf 文件,内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_kafka="kafka";
};
username="admin" 是 zookeeper 实例之间通信的用户;user_kafka="kafka" 是 kafka broker 与 zookeeper 连接的时候的认证用户,密码为=后面的值;
1.2 修改 zoo.cfg ,添加以下内容
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
1.3 因为认证的时候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 这个是 kafka-client.jar 里面,所有需要将相应的 jar 包拷贝到 zookeeper 安装根目录的 lib 目录下, 大概要 copy 以下这些 jar 包
mv /opt/module/kafka_2.12-2.4.1/libs/kafka-clients-2.4.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/lz4-java-1.6.0.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/osgi-resource-locator-1.0.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/slf4j-api-1.7.28.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/snappy-java-1.1.7.3.jar /opt/module/zookeeper/lib/
1.4 修改 zookeeper 启动命令参数,在文件末尾追加以下内容
SERVER_JVMFLAGS="-Djava.security.auth.login.config=/opt/module/zookeeper/conf/zk_server_jaas.conf"
1.5 重启 zookeeper 服务。
二、Kafka Server 配置 SASL/PLAIN 认证(每个kafka节点都要做)
2.1 修改 kafka server.properties 文件,添加以下内容(各个node修改为自己的hostname)
listeners=SASL_PLAINTEXT://node2:9092
advertised.listeners=SASL_PLAINTEXT://node2:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# default false | true to accept all the users to use it.
# allow.everyone.if.no.acl.found=true
# 设置本例中admin为超级用户
super.users=User:admin
2.2 在kafka config 目录下创建 kafka_server_jass.conf 文件,内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_consumer="consumer"
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka";
};
KafkaServer 段里面配置了 broker 之间的认证配置以及 client 和 broker 之间的认证配置
KafkaServer.username, KafkaServer.password 用于broker之间的相互认证
KafkaServer.user_admin和KafkaServer.user_consumer 用于 client 和broker 之间的认证, 下面我们 client 里面都用用户 consumer 进行认证
Client 段里面定义 username 和 password 用于 broker 与 zookeeper 连接的认证;
2.3 修改 kafka 启动命令脚本工具 kafka-server-start.sh,添加 java.security.auth.login.config 环境变量
将最后一句
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改为
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
2.4 修改完所有节点后,重启 kafka 服务,查看启动日志。
三、 Kafka client 的认证配置
3.1 在 Kafka 的 config 目录下创建 kafka_client_jaas.conf 文件,内容如下
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="consumer";
};
3.2 修改 kafka 的 consumer.properties 和 producer.properties,添加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3.3 修改 producer 启动脚本参数,修改 kafka-console-producer.sh 文件
将最后一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
修改为
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
3.4 修改 consumer 启动脚本参数,修改 kafka-console-consumer.sh 文件
将最后一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
修改为
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
四、ACL 配置
4.1 因为我们有个超级用户 admin,所以可以用 admin 做生产者,admin不需要做 ACL 配置
4.2 为 test topic 添加消费者用户 consumer
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:consumer --consumer --topic test --group test_group
4.3 测试
生产者测试
kafka-console-producer.sh --broker-list
node2 --topic test --producer.config /opt/module/kafka_2.12-2.4.1/config/producer.properties
消费者测试
kafka-console-consumer.sh --bootstrap-server
node2:9092 --topic test --from-beginning --consumer.config
/opt/module/kafka_2.12-2.4.1/config/consumer.properties
4.4 Flink 代码消费测试
object TestKafkaSource {
// Kafka Broker 地址
private val BOOTSTRAP_SERVERS = "node2:9092"
// SASL/PLAIN 认证配置
private val SASL_USERNAME = "consumer"
private val SASL_PASSWORD = "consumer"
// Topic 名称
private val ALLOWED_TOPIC = "test"
private val DENIED_TOPIC = "test1"
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// SASL/PLAIN 配置
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
props.put("sasl.mechanism", "PLAIN")
props.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD))
val dataSource: DataStreamSource[String] = env.addSource(new FlinkKafkaConsumer[String](ALLOWED_TOPIC , new SimpleStringSchema(), props))
dataSource.print()
env.execute()
}
}
五、kafka-exporter 配置(使用了 kafka-exporter 监控 kafka 集群才做)
5.1 在kafka_server_jaas.conf 文件中加入 kafka-exporter 用户信息,内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_consumer="consumer"
user_kafka-exporter="kafka-exporter"
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka";
};
5.2 分发到所有 kafka broker ,并重启 kafka 服务
5.3 为 kafka-exporter 用户配置权限
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:kafka-exporter --operation Describe --cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:kafka-exporter --operation Describe --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:kafka-exporter --operation Describe --group '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:kafka-exporter --operation Read --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:kafka-exporter --operation Read --topic __consumer_offsets
5.4 配置 kafka_export 服务启动参数,在 kafka-export 安装目录下创建一个文件 start_kafka_export ,内容如下
#!/bin/bash
export KAFKA_SASL_USERNAME="kafka-exporter"
export KAFKA_SASL_PASSWORD="kafka-exporter"
export KAFKA_SASL_MECHANISM="PLAIN"
/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181/kafka241 \
--sasl.enabled \
--sasl.username=$KAFKA_SASL_USERNAME \
--sasl.password=$KAFKA_SASL_PASSWORD \
--sasl.mechanism=$KAFKA_SASL_MECHANISM
5.5 修改 kafka_export.service 文件,内容如下
[Unit]
Description=kafka_exporter
Wants=prometheus.service
After=network.target prometheus.service
[Service]
Type=simple
User=bigdata
Group=bigdata
Restart=on-failure
WorkingDirectory=/opt/module/kafka_exporter-1.7.0
#ExecStart=/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181
ExecStart=/opt/module/kafka_exporter-1.7.0/start_kafka_exporter
[Install]
WantedBy=multi-user.target
5.6 重新加载 systemctl 管理的服务
sudo systemctl daemon-reload
5.7 启动 kafka_exporter 服务
sudo systemctl start kafka_exporter
5.8 上 grafana 查看是否能获取 kafka 状态
六、 参考文档
6.1 官网文档 Apache Kafka