Flink SQL Client bug ---datagen connector

发布于:2025-03-29 ⋅ 阅读:(34) ⋅ 点赞:(0)

原始sql语句如下

-- Demo source table fed by the built-in datagen connector.
-- The watermark trails event_time by 5 seconds.
CREATE TABLE test_source (
    event_time TIMESTAMP(3),   -- event time, millisecond precision
    click INT,                 -- generated numeric field
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',                  -- use the built-in data generator
    'rows-per-second' = '10',                 -- emit 10 rows per second
    -- NOTE(review): the datagen documentation lists only numeric and string types
    -- for the sequence generator; confirm it is accepted for a TIMESTAMP column.
    'fields.event_time.kind' = 'sequence',    -- increasing timestamps (simulates an ordered stream)
    'fields.event_time.start' = '2024-01-01 00:00:00',
    'fields.event_time.end' = '2024-01-01 00:10:00',
    'fields.click.min' = '0',                 -- value range [0, 100]
    'fields.click.max' = '100'
);

在 Flink SQL Client 中执行上述语句时,WITH 前面的右括号丢失,原因暂不清楚。

在这里插入图片描述
源数据:

-- Datagen source with random event times and a 5-second watermark delay.
CREATE TABLE test_source (
    -- Author's note: with a plain TIMESTAMP column datagen produces UTC times,
    -- hence TIMESTAMP_LTZ here.
    event_time TIMESTAMP_LTZ(3),
    click      INT,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector'                  = 'datagen',
    'rows-per-second'            = '1',
    -- event_time: random timestamps reaching at most 1000 seconds into the past
    'fields.event_time.kind'     = 'random',
    'fields.event_time.max-past' = '1000s',
    -- click: values bounded to [0, 100]
    'fields.click.min'           = '0',
    'fields.click.max'           = '100'
);

5分钟滚动窗口写入paimon表,然后可以基于5分钟统计指标做合并

-- Sink table holding one row of click aggregates per window,
-- keyed by the window bounds (stored as strings).
-- NOTE(review): no WITH clause is given, so this presumably runs with a
-- Paimon catalog and database already selected — confirm.
CREATE TABLE dwd_click (
    window_start STRING,
    window_end   STRING,
    total_value  BIGINT,
    record_count BIGINT,
    PRIMARY KEY (window_start, window_end) NOT ENFORCED
);



-- Aggregate clicks per 5-minute tumbling window and write the result
-- into the Paimon table. Target columns are listed explicitly so the
-- statement does not depend on the sink's column order.
INSERT INTO paimon_catalog.db1.dwd_click (
    window_start,
    window_end,
    total_value,
    record_count
)
SELECT
    CAST(window_start AS STRING),
    CAST(window_end AS STRING),
    -- Widen before summing: casting the finished sum cannot undo an INT overflow.
    SUM(CAST(click AS BIGINT)) AS total_value,
    COUNT(*) AS record_count
FROM TABLE(
    TUMBLE(
        TABLE default_catalog.default_database.test_source,
        DESCRIPTOR(event_time),
        INTERVAL '5' MINUTE  -- tumbling window size
    )
)
GROUP BY window_start, window_end;

使用累计窗口,每十秒输出一次当前一小时窗口内的累计统计结果

-- Cumulative window: every 10 seconds, emit the running click totals
-- for the current window, which grows up to 1 hour.
SELECT
    window_start,
    window_end,
    SUM(click) AS total_value,
    COUNT(*)   AS record_count
FROM TABLE(CUMULATE(
    TABLE test_source,
    DESCRIPTOR(event_time),
    INTERVAL '10' SECOND,  -- step: a result is emitted every 10 seconds
    INTERVAL '1' HOUR      -- maximum window size
))
GROUP BY
    window_start,
    window_end;

使用滑动窗口统计

-- Sliding (hop) window: 1-day windows that advance every 10 seconds.
-- The optional offset argument is omitted; when supplied it must be an
-- INTERVAL, and the argument list must not end with a comma.
SELECT
    window_start,
    window_end,
    SUM(click) AS total_value,
    COUNT(*) AS record_count
FROM TABLE(
    HOP(
        TABLE test_source,
        DESCRIPTOR(event_time),
        INTERVAL '10' SECOND,   -- slide: a new window starts every 10 seconds
        INTERVAL '1' DAY        -- window size
    )
)
GROUP BY window_start, window_end;