原始 SQL 语句如下:
-- Demo source table backed by the built-in datagen connector.
-- The column list must be closed with ")" before the WITH clause; without
-- that parenthesis the statement does not parse.
CREATE TABLE test_source (
    event_time TIMESTAMP(3), -- event time, millisecond precision
    click INT, -- randomly generated value
    -- Event-time watermark that tolerates 5 seconds of out-of-orderness.
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen', -- use the built-in data generator
    'rows-per-second' = '10', -- emit 10 rows per second
    -- NOTE(review): the datagen documentation lists only the random generator
    -- for TIMESTAMP columns, so the sequence settings below may be rejected at
    -- validation time -- confirm against the Flink version in use.
    'fields.event_time.kind' = 'sequence', -- intended: increasing timestamps (simulated ordered stream)
    'fields.event_time.start' = '2024-01-01 00:00:00',
    'fields.event_time.end' = '2024-01-01 00:10:00',
    'fields.click.min' = '0', -- value range [0, 100]
    'fields.click.max' = '100'
);
在 Flink SQL Client 中执行上述语句时,WITH 前面的右括号 ")" 丢失,原因尚不清楚。
源数据:
-- Streaming demo source: datagen emits one row per second with a random
-- click value and a random event time in the recent past.
CREATE TABLE test_source (
    -- TIMESTAMP_LTZ is used here because, per the author's note, a plain
    -- TIMESTAMP column is filled by datagen with UTC time.
    event_time TIMESTAMP_LTZ(3),
    click INT,
    -- NOTE(review): event times may lag the current time by up to 1000 s while
    -- the watermark allows only 5 s of lateness, so window aggregations may
    -- discard rows as late -- confirm this is intended.
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',

    -- event_time: random timestamps within the 1000 seconds before now
    'fields.event_time.kind' = 'random',
    'fields.event_time.max-past' = '1000s',

    -- click: bounded by min 0 and max 100
    'fields.click.min' = '0',
    'fields.click.max' = '100'
);
将 5 分钟滚动窗口的聚合结果写入 Paimon 表,之后可以基于这些 5 分钟粒度的统计指标再做合并。
-- Paimon sink table: one row per aggregation window.
-- The name is fully qualified so the table is always created in the same
-- catalog and database that the INSERT statement writes to
-- (paimon_catalog.db1), regardless of the session's current catalog.
-- IF NOT EXISTS keeps the script re-runnable.
CREATE TABLE IF NOT EXISTS paimon_catalog.db1.dwd_click (
    window_start STRING, -- window start, stored as text (the writer casts it)
    window_end STRING, -- window end, stored as text (the writer casts it)
    total_value BIGINT, -- sum of click within the window
    record_count BIGINT, -- number of rows within the window
    -- One row per window; Flink does not enforce the key itself.
    PRIMARY KEY (window_start, window_end) NOT ENFORCED
);
-- Aggregate clicks per 5-minute tumbling window and write the result to the
-- Paimon table. The target columns are listed explicitly so the statement
-- does not silently depend on the physical column order of dwd_click.
INSERT INTO paimon_catalog.db1.dwd_click (
    window_start,
    window_end,
    total_value,
    record_count
)
SELECT
    CAST(window_start AS STRING) AS window_start,
    CAST(window_end AS STRING) AS window_end,
    CAST(SUM(click) AS BIGINT) AS total_value,
    COUNT(*) AS record_count
FROM TABLE(
    TUMBLE(
        TABLE default_catalog.default_database.test_source,
        DESCRIPTOR(event_time),
        INTERVAL '5' MINUTE -- tumbling window size
    )
)
GROUP BY window_start, window_end;
使用累积窗口(CUMULATE),每 10 秒输出一次当前一小时窗口内的累计统计结果:
-- Cumulative-window aggregation over test_source: within each 1-hour window
-- a result row is produced for every 10-second step.
SELECT
    window_start,
    window_end,
    SUM(click) AS total_value,
    COUNT(*) AS record_count
FROM
    TABLE(
        CUMULATE(
            TABLE test_source,
            DESCRIPTOR(event_time),
            INTERVAL '10' SECOND, -- step: a result every 10 seconds
            INTERVAL '1' HOUR -- maximum window size
        )
    )
GROUP BY
    window_start,
    window_end;
使用滑动窗口统计
-- Sliding (hopping) window aggregation over test_source: 1-day windows that
-- advance every 10 seconds.
-- The stray "0," argument was removed: the optional fifth argument of HOP is
-- an offset and must be an INTERVAL, and a trailing comma before ")" is a
-- syntax error.
-- NOTE(review): with a 10-second slide and a 1-day size each row belongs to
-- 8640 overlapping windows, which can be expensive -- confirm the sizes.
SELECT
    window_start,
    window_end,
    SUM(click) AS total_value,
    COUNT(*) AS record_count
FROM TABLE(
    HOP(
        TABLE test_source,
        DESCRIPTOR(event_time),
        INTERVAL '10' SECOND, -- slide: a new window starts every 10 seconds
        INTERVAL '1' DAY -- window size
    )
)
GROUP BY window_start, window_end;