下载对应版本的 Kafka 连接器 JAR 文件(如 flink-sql-connector-kafka-<版本号>.jar,版本需与所用 Flink 版本匹配),并放置到 Flink 的 lib/ 目录下。
Source
-- Kafka source table: consumes JSON events from topic 'test' and derives
-- an event-time attribute with a 5-second out-of-orderness watermark.
CREATE TABLE kafka_source (
    user_id    STRING,
    event_time TIMESTAMP(3),
    action     STRING,
    -- Event-time attribute: tolerate records arriving up to 5 seconds late
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',                         -- connector type
    'format' = 'json',                             -- payload format (JSON, Avro, ...)
    'topic' = 'test',                              -- Kafka topic to consume
    'properties.bootstrap.servers' = 'chb1:9092',  -- Kafka broker address
    'properties.group.id' = 'flink-sql-group',     -- consumer group id
    'scan.startup.mode' = 'earliest-offset'        -- initial offset (alternative: latest-offset)
);
Sink
-- IF EXISTS makes the script idempotent: a plain DROP fails with
-- "table does not exist" on the very first run of this file.
DROP TABLE IF EXISTS kafka_sink;
-- Kafka sink table: writes the aggregated per-window counts as JSON to
-- 'output_topic' with exactly-once delivery (requires checkpointing enabled).
-- NOTE(review): 'properties.group.id' and 'properties.auto.offset.reset' are
-- consumer settings — they only matter when this table is also SELECTed from
-- for inspection (see issue 2 at the bottom of this file); the write path ignores them.
CREATE TABLE kafka_sink (
user_id STRING,
window_start TIMESTAMP(3),
action_count BIGINT
) WITH (
'connector' = 'kafka', -- connector type
'topic' = 'output_topic', -- target topic
'properties.group.id' = 'ksink-01', -- consumer group id (read-back path only)
'properties.bootstrap.servers' = 'chb1:9092', -- Kafka broker address
'properties.auto.offset.reset' = 'earliest', -- reset policy when the group has no committed offset (read-back path)
'format' = 'json', -- payload format
'sink.partitioner' = 'round-robin', -- partitioning strategy (round-robin, key-hash)
-- Exactly-once transactions: producer transaction.timeout.ms must not exceed
-- the broker's transaction.max.timeout.ms (see issue 1 at the bottom of this file)
'properties.transaction.timeout.ms'='300000',
'sink.transactional-id-prefix' = 'ksink-tid-', -- unique prefix for Kafka transactional ids
'sink.delivery-guarantee' = 'exactly-once' -- delivery semantics (requires checkpointing)
);
写入到Sink
-- Continuous query: count each user's actions per 1-minute tumbling
-- event-time window and append the results to the Kafka sink.
INSERT INTO kafka_sink
SELECT
    s.user_id,
    TUMBLE_START(s.event_time, INTERVAL '1' MINUTE) AS window_start,
    COUNT(s.action) AS action_count
FROM kafka_source AS s
GROUP BY s.user_id, TUMBLE(s.event_time, INTERVAL '1' MINUTE);
问题
1、 The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
原因:Flink Kafka Sink 在 exactly-once 模式下默认将 Producer 的 transaction.timeout.ms 设为 1 小时(3600000 ms),超过了 broker 端 transaction.max.timeout.ms 的默认上限 15 分钟(900000 ms),因此事务初始化被 broker 拒绝。在 Flink SQL 中配置 Kafka 的 transaction.timeout.ms 时,需通过 properties. 前缀传递 Kafka Producer 的专属参数,将其调低到不超过 broker 上限即可。
解决:在 sink 端的 WITH 子句中添加 'properties.transaction.timeout.ms'='300000'(即 5 分钟)。
2、查询kafka_sink报错: Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [output_topic-0]
原因:查询 kafka_sink 时,Flink 会作为消费者读取 output_topic;默认的 scan.startup.mode 为 group-offsets,当该消费者组没有已提交的位点、又未设置 auto.offset.reset 重置策略时,就会抛出 NoOffsetForPartitionException。解决:在 Flink SQL 的 WITH 子句中明确指定 auto.offset.reset 的值,例如:
'properties.auto.offset.reset' = 'earliest',