消费 Kafka 一个 Topic 的数据,插入到另一个 Kafka 的 Topic

发布于:2025-07-15 ⋅ 阅读:(19) ⋅ 点赞:(0)

从 Kafka 消费 CDC 数据(变更捕获,需 Upsert 语义)
用 kafka 连接器 + 主键 + 处理函数 模拟 Upsert,示例:
-- Source table: reads Debezium CDC change events (changelog stream) from Kafka.
--
-- Fix 1: bare VARCHAR in Flink SQL means VARCHAR(1) and silently truncates
--        every value to a single character — use STRING instead.
-- Fix 2: the plain 'kafka' connector combined with the 'debezium-json' format
--        does NOT allow a PRIMARY KEY constraint (Flink rejects the DDL because
--        the source cannot guarantee key semantics). The Debezium changelog
--        itself carries the insert/update/delete events; upsert deduplication
--        is provided by the PRIMARY KEY declared on the upsert-kafka sink.
CREATE TABLE `KAFKA_TEST_0002` (
  `LGL_PERN_CODE` STRING COMMENT 'LGL_PERN_CODE',
  `LBLTY_ACCT_NUM` STRING COMMENT 'LBLTY_ACCT_NUM',
  `ACCT_NM` STRING COMMENT 'ACCT_NM',
  `CUST_NUM` STRING COMMENT 'CUST_NUM',
  `NAT_CODE` STRING COMMENT 'NAT_CODE'
) WITH (
  'connector' = 'kafka',  -- plain kafka connector; debezium-json turns it into a changelog source
  'topic' = 'KAFKA_TEST_0002',
  'properties.bootstrap.servers' = '10.57.48.38:21007,10.57.48.37:21007,10.57.48.36:21007',
  'properties.group.id' = '7a074dd07bfb4d4da39eb0f5773b952b',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',  -- decode Debezium CDC envelope
  'debezium-json.ignore-parse-errors' = 'true',  -- skip malformed records instead of failing the job
  'debezium-json.schema-include' = 'true',  -- messages embed the Debezium "schema" field
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.124dba82_3b54_0125_81e4_110652049a41.com',
  'properties.sasl.kerberos.service.name' = 'kafka'
);

-- 如需 Upsert 输出,再通过 Sink 写入 upsert-kafka
-- Sink table: writes an upserted view of the stream to Kafka.
-- upsert-kafka REQUIRES a PRIMARY KEY; the key columns are serialized with
-- 'key.format' and used for log-compaction-style upsert/delete semantics.
-- NOTE: bare VARCHAR in Flink SQL is VARCHAR(1) (truncates to one character),
-- so STRING is used for all text columns.
CREATE TABLE KafkaUpsertSink (
  `LBLTY_ACCT_NUM` STRING,
  `LGL_PERN_CODE` STRING,
  `ACCT_NM` STRING,
  PRIMARY KEY (`LBLTY_ACCT_NUM`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',  -- upsert semantics on the sink side
  'topic' = 'sink_topic',
  'properties.bootstrap.servers' = '...',  -- placeholder: fill in real brokers before running
  -- NOTE(review): if this cluster is the same Kerberized cluster as the source,
  -- the SASL/Kerberos properties must be added here too — confirm.
  'key.format' = 'json',
  'value.format' = 'json'
);

-- 业务逻辑:从 Kafka 读 CDC 数据,处理后 Upsert 写入
-- Pipeline: stream the CDC changelog into the upsert sink; the sink's
-- primary key (LBLTY_ACCT_NUM) provides the upsert/deduplication semantics.
INSERT INTO KafkaUpsertSink
SELECT
    `LBLTY_ACCT_NUM`,
    `LGL_PERN_CODE`,
    `ACCT_NM`
FROM `KAFKA_TEST_0002`;


网站公告

今日签到

点亮在社区的每一天
去签到