# 请先了解input模式
环境创建
CREATE CATALOG fs_catalog WITH (
'type'='paimon',
'warehouse'='file:/data/soft/paimon/catalog'
);
USE CATALOG fs_catalog;
drop table if exists t_changelog_none ;
CREATE TABLE t_changelog_none (
age BIGINT,
money BIGINT,
hh STRING,
PRIMARY KEY (hh) NOT ENFORCED
)WITH (
'merge-engine' = 'deduplicate',
'changelog-producer' = 'none' -- 不写也可以,默认的changelog模式
);
paimon的snapshot和checkpoint之间的关系
- 一次snapshot会产生一个data文件
- 一次checkpoint会产生1-2个snapshot文件,要看这次checkpoint是否触发compaction,触发了就是2个data文件(一个是合并后的数据,一个本次checkpoint写入数据),否则只有一个(本次checkpoint写入数据)
- 流式写入根据checkpoint间隔,定期进行checkpoint
- 批写(手动执行sql脚本)每一个sql会立即生成一次checkpoint效果
执行一次插入操作
insert into t_changelog_none values(10,1000,'1');
root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_none/bucket-0# ll
total 4
-rw-r–r-- 1 root root 1217 Nov 27 15:40 data-6dc0afc3-377d-4c20-83c0-a8522cf42052-0.parquet
只有data文件
相同主键,再执行一次插入操作
insert into t_changelog_none values(10,2000,'1');
只有data文件
root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_none/bucket-0# ll
total 8
-rw-r–r-- 1 root root 1217 Nov 27 15:40 data-6dc0afc3-377d-4c20-83c0-a8522cf42052-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 15:56 data-a27c3b1c-5527-4656-b3e2-5b98eb15ef50-0.parquet
sqlclient 流式查询
-- Flink SQL
SET 'execution.runtime-mode' = 'streaming';
--设置检查点的间隔为10秒
SET 'execution.checkpointing.interval'='10 s';
set parallelism.default=1;
-- 使用changelog,展示op,操作类型
SET 'sql-client.execution.result-mode'='changelog';
-- 从ID=1的快照开始,读取变化情况
SELECT * FROM t_changelog_none /*+ OPTIONS('scan.snapshot-id' = '1') */;
sqlclient 批查询
Flink SQL> SET 'sql-client.execution.result-mode'='tableau';
[INFO] Execute statement succeed.
Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeed.
Flink SQL> SELECT * FROM t_changelog_none ;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
| 10 | 2000 | 1 |
+-----+-------+----+
1 row in set
Flink SQL> SELECT * FROM t_changelog_none /*+ OPTIONS('scan.snapshot-id' = '1') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
| 10 | 1000 | 1 |
+-----+-------+----+
1 row in set
Flink SQL> SELECT * FROM t_changelog_none /*+ OPTIONS('scan.snapshot-id' = '2') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
| 10 | 2000 | 1 |
+-----+-------+----+
1 row in set
结论
- changelog=node的情况下,一次操作产生一个data文件的,不会产生一个changelog文件
- changelog=node的情况下,流式读取结果是正确的,虽然不像input模式有changelog,但是paimon会记录每次操作产生的快照,根据不同版本的快照数据,经过汇总能够推断出changelog,这个changelog是每次流式处理该表时,加载到flink状态中的(内存)
- 因此这个none模式的changelog实际上是不够稳定的,而且比较耗费运算资源,但是非常省存储资源(不需要额外存储changelog文件)
- 由于批处理模式不会使用changelog,因此批处理模式和none模式是比较搭配的
应用场景
- 用于只进行批处理,不进行流式处理的表