实时同步
注意:MySQL 表的主键要与 Flink SQL 表中声明的主键保持一致
-- Job configuration.
-- Option keys must be written without spaces around '-'; a key such as
-- "checkpoints - after - tasks - finish.enabled" is not a valid Flink option name.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
-- Checkpoint interval in milliseconds.
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
-- NOTE(review): cluster-level option; it may only take effect when set on the cluster
-- itself rather than per job — confirm.
SET cluster.evenly-spread-out-slots = true;
-- Change-data-capture source over the MySQL table db_eugene_test.study_score.
-- cid is the primary key and must match the primary key of the MySQL table.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid   INT,
    sid   INT,
    cls   STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector'        = 'mysql-cdc',
    'hostname'         = '192.168.20.23',
    'port'             = '3306',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'         = 'root',
    'password'         = 'ccphl123!',
    'database-name'    = 'db_eugene_test',
    'table-name'       = 'study_score',
    -- Session time zone of the MySQL server.
    'server-time-zone' = 'Asia/Shanghai'
    -- Optional settings, currently disabled:
    -- 'server-time-zone' = 'UTC',
    -- 'scan.incremental.snapshot.enabled' = 'true',
    -- 'debezium.snapshot.mode' = 'initial', -- alternatively use the key scan.startup.mode: 'initial' also reads the existing (historical) rows, 'latest-offset' reads only new changes
    -- 'debezium.datetime.format.date' = 'yyyy-MM-dd',
    -- 'debezium.datetime.format.time' = 'HH-mm-ss',
    -- 'debezium.datetime.format.datetime' = 'yyyy-MM-dd HH-mm-ss',
    -- 'debezium.datetime.format.timestamp' = 'yyyy-MM-dd HH-mm-ss',
    -- 'debezium.datetime.format.timestamp.zone' = 'UTC+8',
);
-- JDBC sink writing into MySQL db_eugene_test_01.study_score_cdc_sink.
-- The Flink table definition is dropped and re-created so that schema edits are picked up.
-- Presumably this drops only the Flink catalog entry, not the MySQL table — confirm
-- for non-default catalogs.
DROP TABLE IF EXISTS study_score_cdc_sink;
CREATE TABLE IF NOT EXISTS study_score_cdc_sink (
    cid   INT,
    sid   INT,
    cls   STRING,
    score INT,
    -- Declaring a primary key makes the JDBC sink write in upsert mode; it must match the MySQL key.
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'   = 'root',
    'password'   = 'redrcd@123',
    'table-name' = 'study_score_cdc_sink'
);
-- Continuously replicate every change captured from study_score into the MySQL sink table.
INSERT INTO study_score_cdc_sink
SELECT
    cid,
    sid,
    cls,
    score
FROM study_score_cdc;
实时统计(无历史记录)
MySQL 目标表:
-- MySQL target table holding one score value per cls.
-- The primary key (cls) must match the PRIMARY KEY declared on the Flink sink table.
CREATE TABLE `score_statistics` (
    `cls`   VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `score` INT DEFAULT NULL,
    PRIMARY KEY (`cls`) USING BTREE
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
注意:MySQL 表的主键要与 Flink SQL 表中声明的主键保持一致
-- Job configuration.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
-- Checkpoint interval in milliseconds.
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
-- NOTE(review): cluster-level option; it may only take effect when set on the cluster
-- itself rather than per job — confirm.
SET cluster.evenly-spread-out-slots = true;
-- NOTE(review): legacy SQL-client key; presumably ignored by current Flink versions
-- (processing time needs no configuration in Flink SQL) — confirm.
SET execution.time-characteristic = 'ProcessingTime';
-- Session time zone used when converting TIMESTAMP_LTZ values (e.g. CURRENT_TIMESTAMP).
-- The Flink option is table.local-time-zone; table.exec.timezone is not a Flink option.
SET table.local-time-zone = 'Asia/Shanghai';
-- Change-data-capture source over the MySQL table db_eugene_test.study_score.
-- cid is the primary key and must match the primary key of the MySQL table.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid   INT,
    sid   INT,
    cls   STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector'        = 'mysql-cdc',
    'hostname'         = '192.168.20.23',
    'port'             = '3306',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'         = 'root',
    'password'         = 'ccphl123!',
    'database-name'    = 'db_eugene_test',
    'table-name'       = 'study_score',
    -- Session time zone of the MySQL server.
    'server-time-zone' = 'Asia/Shanghai'
);
-- JDBC upsert sink writing into MySQL db_eugene_test_01.score_statistics.
-- The primary key is the business key cls and must match the MySQL table's primary key.
CREATE TABLE IF NOT EXISTS study_score_cdc_statistics_sink (
    cls   STRING,
    score INT,
    PRIMARY KEY (cls) NOT ENFORCED
) WITH (
    'connector'                  = 'jdbc',
    'url'                        = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'                   = 'root',
    'password'                   = 'redrcd@123',
    'table-name'                 = 'score_statistics',
    -- Flush after every buffered row so each change is written to MySQL immediately.
    'sink.buffer-flush.max-rows' = '1'
);
-- Maintain the total score per cls. The source is a changelog, so the aggregate for a cls
-- is recomputed and upserted (keyed on cls) whenever one of its source rows changes.
INSERT INTO study_score_cdc_statistics_sink
SELECT
    cls,
    SUM(score) AS score
FROM study_score_cdc
GROUP BY cls;
拉链表
注意:MySQL 目标表的联合主键要与 Flink SQL 表中声明的主键保持一致
-- MySQL zipper (slowly changing dimension) table for per-cls scores.
-- Rows are versioned by dw_start_time / dw_end_time; is_current flags the active version.
CREATE TABLE `dim_score_statistics` (
    `id`            VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `cls`           VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `score`         INT DEFAULT NULL,
    `update_time`   VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
    `dw_start_time` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
    `dw_end_time`   VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    -- Keep the (1) display width: presumably the MySQL JDBC driver maps TINYINT(1) to the
    -- Flink BOOLEAN column (tinyInt1isBit) — confirm before changing the type.
    `is_current`    TINYINT(1) NOT NULL,
    -- Important: this composite key must be exactly the PRIMARY KEY declared on the Flink
    -- sink table (cls, dw_end_time, is_current).
    PRIMARY KEY (`cls`, `dw_end_time`, `is_current`) USING BTREE
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
实时统计(有历史记录:每次启动作业时,目标表中已有的当前有效记录都会被关闭并转为历史数据)
-- Job configuration.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
-- Checkpoint interval in milliseconds.
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
-- NOTE(review): cluster-level option; it may only take effect when set on the cluster
-- itself rather than per job — confirm.
SET cluster.evenly-spread-out-slots = true;
-- NOTE(review): legacy SQL-client key; presumably ignored by current Flink versions
-- (processing time needs no configuration in Flink SQL) — confirm.
SET execution.time-characteristic = 'ProcessingTime';
-- Session time zone used when converting TIMESTAMP_LTZ values such as CURRENT_TIMESTAMP.
-- This determines the zone of strings produced by DATE_FORMAT(CURRENT_TIMESTAMP, ...).
-- The Flink option is table.local-time-zone; table.exec.timezone is not a Flink option.
SET table.local-time-zone = 'Asia/Shanghai';
-- Change-data-capture source over the MySQL table db_eugene_test.study_score.
-- cid is the primary key and must match the primary key of the MySQL table.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid   INT,
    sid   INT,
    cls   STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector'        = 'mysql-cdc',
    'hostname'         = '192.168.20.23',
    'port'             = '3306',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'         = 'root',
    'password'         = 'ccphl123!',
    'database-name'    = 'db_eugene_test',
    'table-name'       = 'study_score',
    -- Session time zone of the MySQL server.
    'server-time-zone' = 'Asia/Shanghai'
);
-- JDBC upsert sink writing into the MySQL zipper table dim_score_statistics.
-- The key columns are listed first. Do not reorder the columns: INSERT ... SELECT into this
-- table maps the selected columns by position.
CREATE TABLE IF NOT EXISTS dim_score_statistics_sink (
    cls           STRING,
    dw_end_time   STRING,
    is_current    BOOLEAN,
    id            STRING,
    score         INT,
    update_time   STRING,
    dw_start_time STRING,
    -- Must match the MySQL composite primary key exactly.
    PRIMARY KEY (cls, dw_end_time, is_current) NOT ENFORCED
) WITH (
    'connector'                  = 'jdbc',
    'url'                        = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'                   = 'root',
    'password'                   = 'redrcd@123',
    'table-name'                 = 'dim_score_statistics',
    -- Flush after every buffered row so each change is written to MySQL immediately.
    'sink.buffer-flush.max-rows' = '1'
);
-- Zipper refresh that re-versions every cls:
--   * newa_records: the current aggregated score per cls, emitted as the active row with
--     dw_end_time '9999-12-31 23:59:59' and is_current TRUE. When the previous active row
--     has that same key, the JDBC upsert overwrites it in place.
--   * closed_olda_records: a copy of every row that is active in the target table, re-keyed
--     as (cls, <current time>, FALSE), so it is written as a separate history row.
-- NOTE(review): the sink table is also read here. The JDBC read is presumably a one-off
-- bounded scan at job start. If so, only rows active at startup are closed, and later score
-- changes just overwrite the active row — confirm this is the intended history granularity.
-- NOTE(review): UUID() and CURRENT_TIMESTAMP are non-deterministic inside an updating
-- (aggregated) stream, so id, update_time and dw_start_time are regenerated on every
-- update of a cls.
INSERT INTO dim_score_statistics_sink
WITH
-- New records (the currently active version per cls).
newa_records AS (
SELECT
cls,
'9999-12-31 23:59:59' AS dw_end_time,
TRUE AS is_current,
UUID() AS id,
SUM(score) AS score,
DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS update_time,
DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_start_time
FROM study_score_cdc
GROUP BY cls
),
-- All rows currently marked active (is_current = TRUE) in the target table.
olda_records AS (
SELECT
cls,
dw_end_time,
is_current,
id,
score,
update_time,
dw_start_time
FROM dim_score_statistics_sink
WHERE is_current = TRUE
),
-- Closed copies of the old active rows; their primary key becomes (cls, <current time>, FALSE).
closed_olda_records AS (
SELECT
olda.cls AS cls,
DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_end_time, -- closing time = current time
FALSE AS is_current,
olda.id AS id,
olda.score AS score,
olda.update_time AS update_time,
olda.dw_start_time AS dw_start_time
FROM olda_records AS olda
)
-- Combine the new active rows with the closed history rows.
SELECT
cls,
dw_end_time,
is_current,
id,
score,
update_time,
dw_start_time
FROM newa_records
UNION ALL
SELECT
cls,
dw_end_time,
is_current,
id,
score,
update_time,
dw_start_time
FROM closed_olda_records;
实时统计(有历史记录,仅当 score 发生变化时才新增版本)
-- Job configuration.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
-- Checkpoint interval in milliseconds.
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
-- NOTE(review): cluster-level option; it may only take effect when set on the cluster
-- itself rather than per job — confirm.
SET cluster.evenly-spread-out-slots = true;
-- NOTE(review): legacy SQL-client key; presumably ignored by current Flink versions
-- (processing time needs no configuration in Flink SQL) — confirm.
SET execution.time-characteristic = 'ProcessingTime';
-- Session time zone used when converting TIMESTAMP_LTZ values such as CURRENT_TIMESTAMP.
-- This determines the zone of strings produced by DATE_FORMAT(CURRENT_TIMESTAMP, ...).
-- The Flink option is table.local-time-zone; table.exec.timezone is not a Flink option.
SET table.local-time-zone = 'Asia/Shanghai';
-- Change-data-capture source over the MySQL table db_eugene_test.study_score.
-- cid is the primary key and must match the primary key of the MySQL table.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid   INT,
    sid   INT,
    cls   STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector'        = 'mysql-cdc',
    'hostname'         = '192.168.20.23',
    'port'             = '3306',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'         = 'root',
    'password'         = 'ccphl123!',
    'database-name'    = 'db_eugene_test',
    'table-name'       = 'study_score',
    -- Session time zone of the MySQL server.
    'server-time-zone' = 'Asia/Shanghai'
);
-- JDBC upsert sink writing into the MySQL zipper table dim_score_statistics.
-- The primary key (cls, dw_end_time, is_current) must match the MySQL composite key.
-- Do not reorder the columns: INSERT ... SELECT into this table maps the selected
-- columns by position.
CREATE TABLE IF NOT EXISTS dim_score_statistics_sink (
    cls           STRING,
    dw_end_time   STRING,
    is_current    BOOLEAN,
    id            STRING,
    score         INT,
    update_time   STRING,
    dw_start_time STRING,
    PRIMARY KEY (cls, dw_end_time, is_current) NOT ENFORCED
) WITH (
    'connector'                  = 'jdbc',
    'url'                        = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    -- NOTE(review): credentials are hard-coded in plain text.
    'username'                   = 'root',
    'password'                   = 'redrcd@123',
    'table-name'                 = 'dim_score_statistics',
    -- Flush after every buffered row so each change is written to MySQL immediately.
    'sink.buffer-flush.max-rows' = '1'
);
-- Zipper refresh that only creates a new version for a cls whose score actually changed:
--   * a cls whose aggregated score differs from its active row, or that has no active row
--     yet, gets a new active row (dw_end_time '9999-12-31 23:59:59', is_current TRUE);
--   * the previous active row of such a cls is re-emitted as a closed history row
--     (dw_end_time = current time, is_current FALSE).
-- Score comparisons use IS DISTINCT FROM so that a change to or from NULL counts as a
-- change. The MySQL score column is nullable, and SUM over only NULL scores is NULL.
-- With <> such a comparison is UNKNOWN and the change would be silently skipped.
-- NOTE(review): the read of dim_score_statistics_sink is presumably a one-off bounded scan
-- at job start — confirm.
-- NOTE(review): UUID() and CURRENT_TIMESTAMP are non-deterministic in this updating stream.
-- If a cls's score later returns to its stored value, the earlier new row is retracted.
-- That retraction may delete the active row in MySQL or miss the closed row's key — confirm
-- (see table.optimizer.non-deterministic-update.strategy).
INSERT INTO dim_score_statistics_sink
WITH
-- Current total score per cls from the CDC source.
newa_score AS (
    SELECT
        cls,
        SUM(score) AS new_score
    FROM study_score_cdc
    GROUP BY cls
),
-- Rows currently marked active in the target table.
olda_current AS (
    SELECT
        cls,
        score AS old_score,
        dw_start_time,
        dw_end_time,
        is_current,
        id,
        update_time
    FROM dim_score_statistics_sink
    WHERE is_current = TRUE
),
-- cls values that need a new version: the score changed (NULL-safe) or no active row exists yet.
changed_cls AS (
    SELECT
        newa.cls,
        newa.new_score
    FROM newa_score AS newa
    LEFT JOIN olda_current AS olda
        ON newa.cls = olda.cls
    WHERE olda.cls IS NULL
       OR newa.new_score IS DISTINCT FROM olda.old_score
),
-- New active row for every changed cls.
newa_records AS (
    SELECT
        cls,
        '9999-12-31 23:59:59' AS dw_end_time,
        TRUE AS is_current,
        UUID() AS id,
        new_score AS score,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS update_time,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_start_time
    FROM changed_cls
),
-- Closed copy of the previous active row for every changed cls that had one.
-- The new key (cls, <current time>, FALSE) makes it a separate history row.
closed_olda_records AS (
    SELECT
        olda.cls AS cls,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_end_time,
        FALSE AS is_current,
        olda.id AS id,
        olda.old_score AS score,
        olda.update_time AS update_time,
        -- Keep the original version's start time.
        olda.dw_start_time AS dw_start_time
    FROM changed_cls AS changed
    INNER JOIN olda_current AS olda
        ON changed.cls = olda.cls
    WHERE changed.new_score IS DISTINCT FROM olda.old_score
)
-- New active rows plus closed history rows.
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM newa_records
UNION ALL
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM closed_olda_records;