paimon的四种changelog模式(2)-none模式

发布于:2024-11-29 ⋅ 阅读:(43) ⋅ 点赞:(0)

# 请先了解input模式

环境创建

 CREATE CATALOG fs_catalog WITH (
    'type'='paimon',
    'warehouse'='file:/data/soft/paimon/catalog'
);

USE CATALOG fs_catalog;

drop table if exists t_changelog_none ;
 
 CREATE TABLE t_changelog_none (
      age BIGINT,
      money BIGINT,
      hh STRING,
      PRIMARY KEY (hh) NOT ENFORCED
)WITH (
    'merge-engine' = 'deduplicate',
    'changelog-producer' = 'none' -- 不写也可以,默认的changelog模式
);

paimon的snapshot和checkpoint之间的关系

  • 一次snapshot会产生一个data文件
  • 一次checkpoint会产生1-2个snapshot文件,要看这次checkpoint是否触发compaction,触发了就是2个data文件(一个是合并后的数据,一个本次checkpoint写入数据),否则只有一个(本次checkpoint写入数据)
  • 流式写入根据checkpoint间隔,定期进行checkpoint
  • 批写(手动执行sql脚本)每一个sql会立即生成一次checkpoint效果

执行一次插入操作

insert into t_changelog_none values(10,1000,'1');

root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_none/bucket-0# ll
total 4
-rw-r–r-- 1 root root 1217 Nov 27 15:40 data-6dc0afc3-377d-4c20-83c0-a8522cf42052-0.parquet

只有data文件
在这里插入图片描述

相同主键,再执行一次插入操作

insert into t_changelog_none values(10,2000,'1');

只有data文件

root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_none/bucket-0# ll
total 8
-rw-r–r-- 1 root root 1217 Nov 27 15:40 data-6dc0afc3-377d-4c20-83c0-a8522cf42052-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 15:56 data-a27c3b1c-5527-4656-b3e2-5b98eb15ef50-0.parquet

在这里插入图片描述

sqlclient 流式查询

-- Flink SQL
SET 'execution.runtime-mode' = 'streaming';

--设置检查点的间隔为10秒
SET 'execution.checkpointing.interval'='10 s';
set parallelism.default=1;

-- 使用changelog,展示op,操作类型
SET 'sql-client.execution.result-mode'='changelog';

-- 从ID=1的快照开始,读取变化情况 
SELECT * FROM t_changelog_none  /*+ OPTIONS('scan.snapshot-id' = '1') */;

在这里插入图片描述

sqlclient 批查询

Flink SQL> SET 'sql-client.execution.result-mode'='tableau';
[INFO] Execute statement succeed.

Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeed.

Flink SQL> SELECT * FROM t_changelog_none ;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
|  10 |  2000 |  1 |
+-----+-------+----+
1 row in set

Flink SQL> SELECT * FROM t_changelog_none /*+ OPTIONS('scan.snapshot-id' = '1') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
|  10 |  1000 |  1 |
+-----+-------+----+
1 row in set

Flink SQL> SELECT * FROM t_changelog_none /*+ OPTIONS('scan.snapshot-id' = '2') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
|  10 |  2000 |  1 |
+-----+-------+----+
1 row in set

结论

  • changelog=node的情况下,一次操作产生一个data文件的,不会产生一个changelog文件
  • changelog=node的情况下,流式读取结果是正确的,虽然不像input模式有changelog,但是paimon会记录每次操作产生的快照,根据不同版本的快照数据,经过汇总能够推断出changelog,这个changelog是每次流式处理该表时,加载到flink状态中的(内存)
  • 因此这个none模式的changelog实际上是不够稳定的,而且比较耗费运算资源,但是非常省存储资源(不需要额外存储changelog文件)
  • 由于批处理模式不会使用changelog,因此批处理模式和none模式是比较搭配的

应用场景

  • 用于只进行批处理,不进行流式处理的表