以下是 Kafka 不同认证方式的配置示例,结合前面的单表设计方案,展示如何为每种认证方式填充配置表

发布于:2025-07-02 ⋅ 阅读:(17) ⋅ 点赞:(0)

1. SSL 认证配置示例

sql

INSERT INTO kafka_configs (
    cluster_name,
    description,
    bootstrap_servers,
    auth_type,
    security_protocol,
    truststore_data,
    truststore_password,
    keystore_data,
    keystore_password,
    additional_properties
) VALUES (
    'kafka-ssl-cluster',
    'SSL认证的Kafka集群',
    'kafka1:9093,kafka2:9093',
    'SSL',
    'SSL',
    -- 证书二进制数据
    decode('MIIDHzCCAgegAwIBAgIJAK...', 'base64'),
    crypt('truststore_password', gen_salt('bf')),
    decode('MIIDHzCCAgegAwIBAgIJAK...', 'base64'),
    crypt('keystore_password', gen_salt('bf')),
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "ssl.protocol": "TLSv1.3",
        "ssl.keystore.type": "PKCS12",
        "ssl.truststore.type": "JKS"
    }'
);

2. SASL/PLAIN 认证配置示例

sql

INSERT INTO kafka_configs (
    cluster_name,
    description,
    bootstrap_servers,
    auth_type,
    security_protocol,
    sasl_mechanism,
    sasl_username,
    sasl_password_hash,
    sasl_salt,
    additional_properties
) VALUES (
    'kafka-sasl-plain-cluster',
    'SASL/PLAIN认证的Kafka集群',
    'kafka1:9094,kafka2:9094',
    'SASL_PLAIN',
    'SASL_SSL',
    'PLAIN',
    'admin',
    -- 密码哈希: SHA-256 + 盐值
    encode(digest('password' || 'random_salt', 'sha256'), 'hex'),
    'random_salt',
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"password\";"
    }'
);

3. SASL/SCRAM 认证配置示例

sql

INSERT INTO kafka_configs (
    cluster_name,
    description,
    bootstrap_servers,
    auth_type,
    security_protocol,
    sasl_mechanism,
    sasl_username,
    sasl_password_hash,
    sasl_salt,
    sasl_iterations,
    additional_properties
) VALUES (
    'kafka-sasl-scram-cluster',
    'SASL/SCRAM认证的Kafka集群',
    'kafka1:9094,kafka2:9094',
    'SASL_SCRAM',
    'SASL_SSL',
    'SCRAM-SHA-512',
    'admin',
    -- SCRAM哈希值 (需要使用SCRAM算法生成)
    'nThb8a8vT7QJZQJZQJZQJZQJZQJZQJZQJZQJZQ==',
    'random_salt_123456',
    4096,
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"password\";"
    }'
);

4. SASL/Kerberos (GSSAPI) 认证配置示例

sql

INSERT INTO kafka_configs (
    cluster_name,
    description,
    bootstrap_servers,
    auth_type,
    security_protocol,
    sasl_mechanism,
    kerberos_config,
    service_name,
    additional_properties
) VALUES (
    'kafka-kerberos-cluster',
    'Kerberos认证的Kafka集群',
    'kafka1:9093,kafka2:9093',
    'SASL_KERBEROS',
    'SASL_SSL',
    'GSSAPI',
    -- krb5.conf配置内容
    '
[libdefaults]
    default_realm = EXAMPLE.COM
    dns_lookup_realm = false
    dns_lookup_kdc = false

[realms]
    EXAMPLE.COM = {
        kdc = kerberos.example.com:88
        admin_server = kerberos.example.com:749
    }

[domain_realm]
    .example.com = EXAMPLE.COM
    example.com = EXAMPLE.COM
    ',
    'kafka',
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/security/keytabs/kafka_client.keytab\" principal=\"kafka-client@EXAMPLE.COM\";",
        "sasl.kerberos.service.name": "kafka",
        "kerberos.principal.to.local.rules": "DEFAULT"
    }'
);

5. OAUTHBEARER 认证配置示例

sql

INSERT INTO kafka_configs (
    cluster_name,
    description,
    bootstrap_servers,
    auth_type,
    security_protocol,
    sasl_mechanism,
    additional_properties
) VALUES (
    'kafka-oauth-cluster',
    'OAuth认证的Kafka集群',
    'kafka1:9096,kafka2:9096',
    'SASL_OAUTHBEARER',
    'SASL_SSL',
    'OAUTHBEARER',
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"client-id\" password=\"client-secret\" metadataServerUrls=\"https://auth-server:8080/realms/kafka/protocol/openid-connect/token\";",
        "sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
    }'
);

使用说明

  1. 敏感信息安全

    • 密码字段(如 truststore_password)使用 PostgreSQL 的crypt()函数进行 BCrypt 加密
    • SASL 密码使用 SHA-256 加盐哈希(实际生产环境建议使用更强的算法如 Argon2)
  2. 二进制数据存储

    • 证书数据使用decode()函数将 Base64 字符串转换为二进制存储
    • 大文件建议存储在文件系统,数据库仅存储路径和元数据
  3. 配置验证

    • 插入数据前需验证配置完整性(如选择 SSL 认证时,SSL 相关字段不能为空)
    • 对于复杂配置(如 Kerberos),建议在应用层进行格式验证
  4. 证书管理

    • 证书有效期需通过程序定期检查(可利用 certificates 表的 expires_at 字段)
    • 证书更新时需同时更新数据库记录和文件存储
 

这些示例展示了如何为不同认证方式填充配置表,实际应用中需根据具体环境调整参数。


网站公告

今日签到

点亮在社区的每一天
去签到