Flink SQL-Client Kafka connector

Published: 2025-04-01

Download the Kafka connector JAR matching your Flink version (e.g. flink-sql-connector-kafka-.jar) and place it in Flink's lib/ directory.

Source

CREATE TABLE kafka_source (
    user_id STRING,
    event_time TIMESTAMP(3),
    action STRING,
    -- Define the event-time attribute and watermark
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',                         -- connector type
    'topic' = 'test',                              -- Kafka topic name
    'properties.bootstrap.servers' = 'chb1:9092',  -- Kafka broker address
    'properties.group.id' = 'flink-sql-group',     -- consumer group ID
    'scan.startup.mode' = 'earliest-offset',       -- startup offset (alternative: latest-offset)
    'format' = 'json'                              -- payload format (JSON, Avro, etc.)
);
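
For reference, an input record matching this schema might look like the following (topic name and field values are illustrative; the default json format parses TIMESTAMP(3) from the SQL-style pattern shown):

```json
{"user_id": "u_001", "event_time": "2025-04-01 10:00:00.000", "action": "click"}
```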

Sink


DROP TABLE IF EXISTS kafka_sink;
CREATE TABLE kafka_sink (
    user_id STRING,
    window_start TIMESTAMP(3),
    action_count BIGINT
) WITH (
    'connector' = 'kafka',                         -- connector type
    'topic' = 'output_topic',                      -- target topic
    'properties.group.id' = 'ksink-01',            -- group ID used when reading this table back
    'properties.bootstrap.servers' = 'chb1:9092',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'json',                             -- payload format
    'sink.partitioner' = 'round-robin',            -- partition strategy (default, fixed, round-robin)
    -- transaction settings
    'properties.transaction.timeout.ms' = '300000',
    'sink.transactional-id-prefix' = 'ksink-tid-',
    'sink.delivery-guarantee' = 'exactly-once'     -- delivery semantics (requires checkpointing)
);
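
Exactly-once delivery only takes effect when checkpointing is enabled. In the SQL client, enable it before running the insert (the 10 s interval is an illustrative value; tune it for your workload):

```sql
SET 'execution.checkpointing.interval' = '10s';
```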

Writing to the Sink

-- Count user actions per tumbling window
INSERT INTO kafka_sink
SELECT 
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    COUNT(action) AS action_count
FROM kafka_source
GROUP BY 
    user_id,
    TUMBLE(event_time, INTERVAL '1' MINUTE);
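
With the 1-minute tumbling window above, each committed record in output_topic is a JSON object of the sink schema, for example (illustrative values):

```json
{"user_id": "u_001", "window_start": "2025-04-01 10:00:00.000", "action_count": 3}
```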

Troubleshooting

1. The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
The Flink Kafka producer defaults transaction.timeout.ms to 1 hour, while Kafka brokers cap transactions at 15 minutes by default (transaction.max.timeout.ms = 900000). In Flink SQL, producer-specific Kafka options must be passed with the properties. prefix, so either lower the timeout on the sink side by adding 'properties.transaction.timeout.ms' = '300000', or raise transaction.max.timeout.ms on the broker.

2. Querying kafka_sink fails with: Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [output_topic-0]
The consumer group has no committed offset and no reset policy, so the consumer cannot determine where to start reading. Set auto.offset.reset explicitly in the table's WITH clause:

 'properties.auto.offset.reset' = 'earliest',