1. SSL 认证配置示例
sql
INSERT INTO kafka_configs ( cluster_name, description, bootstrap_servers, auth_type, security_protocol, truststore_data, truststore_password, keystore_data, keystore_password, additional_properties ) VALUES ( 'kafka-ssl-cluster', 'SSL认证的Kafka集群', 'kafka1:9093,kafka2:9093', 'SSL', 'SSL', -- 证书二进制数据 decode('MIIDHzCCAgegAwIBAgIJAK...', 'base64'), crypt('truststore_password', gen_salt('bf')), decode('MIIDHzCCAgegAwIBAgIJAK...', 'base64'), crypt('keystore_password', gen_salt('bf')), '{ "ssl.endpoint.identification.algorithm": "https", "ssl.protocol": "TLSv1.3", "ssl.keystore.type": "PKCS12", "ssl.truststore.type": "JKS" }' );
2. SASL/PLAIN 认证配置示例
sql
INSERT INTO kafka_configs ( cluster_name, description, bootstrap_servers, auth_type, security_protocol, sasl_mechanism, sasl_username, sasl_password_hash, sasl_salt, additional_properties ) VALUES ( 'kafka-sasl-plain-cluster', 'SASL/PLAIN认证的Kafka集群', 'kafka1:9094,kafka2:9094', 'SASL_PLAIN', 'SASL_SSL', 'PLAIN', 'admin', -- 密码哈希: SHA-256 + 盐值 encode(digest('password' || 'random_salt', 'sha256'), 'hex'), 'random_salt', '{ "ssl.endpoint.identification.algorithm": "https", "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"password\";" }' );
3. SASL/SCRAM 认证配置示例
sql
INSERT INTO kafka_configs ( cluster_name, description, bootstrap_servers, auth_type, security_protocol, sasl_mechanism, sasl_username, sasl_password_hash, sasl_salt, sasl_iterations, additional_properties ) VALUES ( 'kafka-sasl-scram-cluster', 'SASL/SCRAM认证的Kafka集群', 'kafka1:9094,kafka2:9094', 'SASL_SCRAM', 'SASL_SSL', 'SCRAM-SHA-512', 'admin', -- SCRAM哈希值 (需要使用SCRAM算法生成) 'nThb8a8vT7QJZQJZQJZQJZQJZQJZQJZQJZQJZQ==', 'random_salt_123456', 4096, '{ "ssl.endpoint.identification.algorithm": "https", "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"password\";" }' );
4. SASL/Kerberos (GSSAPI) 认证配置示例
sql
INSERT INTO kafka_configs ( cluster_name, description, bootstrap_servers, auth_type, security_protocol, sasl_mechanism, kerberos_config, service_name, additional_properties ) VALUES ( 'kafka-kerberos-cluster', 'Kerberos认证的Kafka集群', 'kafka1:9093,kafka2:9093', 'SASL_KERBEROS', 'SASL_SSL', 'GSSAPI', -- krb5.conf配置内容 ' [libdefaults] default_realm = EXAMPLE.COM dns_lookup_realm = false dns_lookup_kdc = false [realms] EXAMPLE.COM = { kdc = kerberos.example.com:88 admin_server = kerberos.example.com:749 } [domain_realm] .example.com = EXAMPLE.COM example.com = EXAMPLE.COM ', 'kafka', '{ "ssl.endpoint.identification.algorithm": "https", "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/security/keytabs/kafka_client.keytab\" principal=\"kafka-client@EXAMPLE.COM\";", "sasl.kerberos.service.name": "kafka", "kerberos.principal.to.local.rules": "DEFAULT" }' );
5. OAUTHBEARER 认证配置示例
sql
INSERT INTO kafka_configs ( cluster_name, description, bootstrap_servers, auth_type, security_protocol, sasl_mechanism, additional_properties ) VALUES ( 'kafka-oauth-cluster', 'OAuth认证的Kafka集群', 'kafka1:9096,kafka2:9096', 'SASL_OAUTHBEARER', 'SASL_SSL', 'OAUTHBEARER', '{ "ssl.endpoint.identification.algorithm": "https", "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"client-id\" password=\"client-secret\" metadataServerUrls=\"https://auth-server:8080/realms/kafka/protocol/openid-connect/token\";", "sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler" }' );
使用说明
敏感信息安全:
- 密码字段(如 truststore_password)使用 PostgreSQL 的
crypt()
函数进行 BCrypt 加密- SASL 密码使用 SHA-256 加盐哈希(实际生产环境建议使用更强的算法如 Argon2)
二进制数据存储:
- 证书数据使用
decode()
函数将 Base64 字符串转换为二进制存储- 大文件建议存储在文件系统,数据库仅存储路径和元数据
配置验证:
- 插入数据前需验证配置完整性(如选择 SSL 认证时,SSL 相关字段不能为空)
- 对于复杂配置(如 Kerberos),建议在应用层进行格式验证
证书管理:
- 证书有效期需通过程序定期检查(可利用 certificates 表的 expires_at 字段)
- 证书更新时需同时更新数据库记录和文件存储
这些示例展示了如何为不同认证方式填充配置表,实际应用中需根据具体环境调整参数。