Apache Paimon (此前称为 Flink Table Store)是一种流式数据湖存储技术,采用 LSM(Log-Structured Merge-tree)树结构来存储数据,支持高吞吐、低延迟的数据摄入和实时查询,尤其适用于流式和批量统一的场景。
1. 创建表 (CREATE TABLE)
当执行 CREATE TABLE
语句时(例如通过 Flink SQL 或 Spark SQL),Paimon 会执行以下操作:
CREATE TABLE IF NOT EXISTS user_behavior (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
PRIMARY KEY (ts, user_id, item_id) NOT ENFORCED
) PARTITIONED BY (ts)
WITH (
'bucket-key' = 'user_id',
'bucket' = '4',
'snapshot.time-retained' = '1 h'
)
每个分区进一步分桶(Bucket),桶是读写的最小单元。默认使用列式存储(如 Parquet、ORC),数据文件按主键排序。
SQL 解析与 Catalog: 计算引擎(Flink/Spark)解析 SQL DDL 语句。请求通过 Paimon 的 Catalog 接口传递给 Paimon。Catalog 负责管理数据库和表的元数据。
Schema 管理:
SchemaManager
(源码路径:paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
) 负责处理表的 Schema 信息。它会接收用户定义的列、类型、主键、分区键、表属性等。- Schema 信息会被持久化。在文件系统类型的 Catalog 中,Schema 会以 JSON 文件的形式存储在表的基础路径下的
schema
目录中,例如warehouse/your_db/your_table/schema/schema-0
。"0" 代表 Schema 的版本号。
表对象实例化:
FileStoreTableFactory
(源码路径:paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
) 会根据表的配置(例如是否有主键、分区信息等)创建相应的表实例。- 如果定义了主键,通常会创建
PrimaryKeyFileStoreTable
(源码路径:paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
)。 - 如果没有主键(追加表),则会创建
AppendOnlyFileStoreTable
(源码路径:paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
)。
- 如果定义了主键,通常会创建
- 此时,磁盘上会创建表的基础目录结构,主要包含
schema
目录。snapshot
和manifest
目录此时可能为空或不存在,直到第一次数据提交。
2. Paimon 文件组织 (File Layout)
Paimon 的文件以分层的方式组织,核心元数据包括快照 (Snapshot)、清单列表 (Manifest List) 和清单文件 (Manifest File),它们共同指向实际的数据文件 (Data File)。
可以参考以下理解其文件布局:
/tmp/paimon/default.db/T/
├── schema/ # 存储表结构(schema-0)
├── snapshot/ # 快照目录(初始为空)
├── manifest/ # 清单文件目录(初始为空)
└── ts={partition}/ # 分区目录(按分区键动态生成)
├── bucket-0/ # 桶0目录
│ ├── data-xxx.parquet
│ └── ...
├── bucket-1/ # 桶1目录
│ ├── data-yyy.parquet
│ └── ...
├── bucket-2/
└── bucket-3/
表目录 (Table Directory): 所有与该表相关的文件都存储在这个目录下,例如
file:///tmp/paimon/default.db/T
。schema/
: 存储表的 Schema 文件 (如schema-0
,schema-1
, ...)。snapshot/
: 存储快照文件 (如snapshot-1
,snapshot-2
, ...)。manifest/
: 存储清单列表文件和清单文件 (如manifest-list-xxx
,manifest-file-yyy
, ...)。- 分区目录 (如
dt=2023-01-01/
): 如果表是分区的,数据文件会存储在相应的分区目录下。 - 桶目录 (如
bucket-0/
): 在每个分区内(或表级,如果未分区),数据按桶组织。
Snapshot (快照):
- 每个成功的 commit 操作都会生成一个新的快照。快照代表了表在某个特定时间点的完整、一致的状态。
- 快照文件是一个小的 JSON 文件,存储在
snapshot/
目录下。它包含了该快照的 ID、对应的 Schema ID、指向的 Manifest List 文件名、提交用户、提交时间等元信息。 - 例如
snapshot-1
的内容可能类似:{ "version" : 3, "id" : 1, "schemaId" : 0, "baseManifestList" : "manifest-list-uuid1-0", "deltaManifestList" : "manifest-list-uuid1-1", "commitUser" : "user-uuid", "commitIdentifier" : 1234567890, "commitKind" : "APPEND", "timeMillis" : 1672531200000 }
Manifest List (清单列表):
- 每个快照文件会指向一个 Manifest List 文件。
- Manifest List 文件本身也是一个元数据文件,它记录了一个或多个 Manifest 文件的列表。这些 Manifest 文件共同构成了该快照的数据视图。
- 存储在
manifest/
目录下。
Manifest File (清单文件):
- 每个 Manifest 文件包含了一组数据文件 (Data File) 或 Changelog 文件的元数据。
- 这些元数据包括:数据文件名、文件所属的分区和桶、文件在 LSM 树中的层级 (level)、文件中记录的数量、文件中 key 的统计信息 (min/max key,用于数据跳过)、文件大小等。
- 存储在
manifest/
目录下。
Data File (数据文件):
- 实际存储表记录的文件。Paimon 支持多种列式存储格式,如 Parquet (默认)、ORC、Avro。
- 数据文件根据表的分区键和桶键进行组织。例如,对于按
dt
分区、bucket
为 10 的表,一个数据文件路径可能为your_warehouse/your_db/your_table/dt=2024-01-01/bucket-5/data-file-uuid.parquet
。
Index File (索引文件): (可选)
- Paimon 支持为数据文件创建索引(例如布隆过滤器、MinMax 索引),这些索引信息可以存储在单独的索引文件中,或直接内嵌在 Manifest 文件中(如果索引较小)。
Manifest 文件的组织与存储格式
Paimon 的 Manifest 文件以及 Manifest List 文件是二进制存储的。它们通常使用 Apache Avro 格式进行序列化和存储。Avro 是一种数据序列化系统,它依赖于 Schema。当数据存储时,Schema 也会被存储,这样文件就可以在以后被任何程序读取。这使得 Avro 文件具有良好的可移植性和演化性。
Manifest 文件的组织结构如下:
Snapshot (快照):
- 每个快照文件(JSON 格式)指向一个或多个 Manifest List 文件。
Manifest List (清单列表):
- 存储在
manifest/
目录下,文件名通常包含 UUID,例如manifest-list-uuid-N
。 - 它是一个 Avro 文件。
- 其内容是
ManifestFileMeta
对象的列表。每个ManifestFileMeta
描述了一个 Manifest 文件的元数据,包括:- Manifest 文件名 (
_FILE_NAME
) - 文件大小 (
_FILE_SIZE
) - 该 Manifest 文件中新增的数据文件数量 (
_NUM_ADDED_FILES
) - 该 Manifest 文件中删除的数据文件数量 (
_NUM_DELETED_FILES
) - 分区统计信息 (
_PARTITION_STATS
),用于查询时跳过不相关的 Manifest 文件。 - 写入此 Manifest 文件时使用的 Schema ID (
_SCHEMA_ID
)。
- Manifest 文件名 (
- 相关文档:Manifest List 规范
- 存储在
Manifest File (清单文件):
- 存储在
manifest/
目录下,文件名通常也包含 UUID,例如manifest-file-uuid
。 - 它也是一个 Avro 文件。
- 其内容是
ManifestEntry
对象的列表。每个ManifestEntry
代表对一个数据文件(Data File)或一个 Changelog 文件的变更记录。 ManifestEntry
包含以下关键信息:- 变更类型 (
_KIND
):ADD
(新增文件) 或DELETE
(删除文件)。 - 分区值 (
_PARTITION
): 文件所属的分区。 - 桶号 (
_BUCKET
): 文件所属的桶。 - 数据文件名 (
_FILE_NAME
)。 - 数据文件在 LSM 树中的层级 (
_LEVEL
)。 - Schema ID (
_SCHEMA_ID
): 写入此数据文件时使用的 Schema ID。 - 数据文件的统计信息,如行数、key 的 min/max 值等,用于数据跳过。
- 关联的索引文件列表 (
_EXTRA_FILES
)。
- 变更类型 (
- 相关文档:Manifest 规范
- 存储在
由于它们是二进制的 Avro 文件,需要使用 Avro 工具或者通过 Paimon 的代码来读取和解析它们。
通过源码理解 Manifest 文件的创建
主要关注 ManifestFile.java
这个类,它负责 Manifest 文件的创建和读写。
ManifestFile.Factory
:- 这是创建
ManifestFile
实例的工厂类。 - 当你需要创建一个新的 Manifest 文件对象(逻辑上的,此时物理文件还未写入或只是准备写入)时,会使用这个工厂。
- 关键方法是
create()
。
ManifestFile.java
// ... existing code ... public static class Factory { private final FileIO fileIO; private final SchemaManager schemaManager; private final RowType partitionType; private final FileFormat fileFormat; private final String compression; private final FileStorePathFactory pathFactory; private final long suggestedFileSize; @Nullable private final SegmentsCache<Path> cache; public Factory( FileIO fileIO, SchemaManager schemaManager, RowType partitionType, FileFormat fileFormat, String compression, FileStorePathFactory pathFactory, long suggestedFileSize, @Nullable SegmentsCache<Path> cache) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.partitionType = partitionType; this.fileFormat = fileFormat; this.compression = compression; this.pathFactory = pathFactory; this.suggestedFileSize = suggestedFileSize; this.cache = cache; } public ManifestFile create() { RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA); return new ManifestFile( fileIO, schemaManager, partitionType, new ManifestEntrySerializer(), entryType, fileFormat.createReaderFactory(entryType), fileFormat.createWriterFactory(entryType), compression, pathFactory.manifestFileFactory(), suggestedFileSize, cache); } } // ... existing code ...
在
create()
方法中:ManifestEntry.SCHEMA
定义了 Manifest 文件中每条记录(ManifestEntry
)的 Avro Schema。VersionedObjectSerializer.versionType
可能会对其进行包装以支持版本控制。fileFormat.createReaderFactory(entryType)
和fileFormat.createWriterFactory(entryType)
根据配置的file.format
(通常是 Avro) 创建相应的读写器工厂。pathFactory.manifestFileFactory()
用于生成 Manifest 文件的具体路径和名称。- 最终,它调用
ManifestFile
的构造函数来实例化对象。
- 这是创建
ManifestFile
构造函数:- 接收
Factory
传递过来的参数,初始化ManifestFile
对象。 - 这个对象代表一个逻辑上的 Manifest 文件,它知道如何读写物理文件。
ManifestFile.java
// ... existing code ... public class ManifestFile extends ObjectsFile<ManifestEntry> { private final SchemaManager schemaManager; private final RowType partitionType; private final FormatWriterFactory writerFactory; private final long suggestedFileSize; private ManifestFile( FileIO fileIO, SchemaManager schemaManager, RowType partitionType, ManifestEntrySerializer serializer, RowType schema, FormatReaderFactory readerFactory, FormatWriterFactory writerFactory, String compression, PathFactory pathFactory, long suggestedFileSize, @Nullable SegmentsCache<Path> cache) { super( fileIO, serializer, schema, readerFactory, writerFactory, compression, pathFactory, suggestedFileSize, cache); this.schemaManager = schemaManager; this.partitionType = partitionType; this.writerFactory = writerFactory; this.suggestedFileSize = suggestedFileSize; } // ... existing code ...
- 接收
ManifestFile.ManifestEntryWriter
:- 这是一个内部类,继承自
SingleFileWriter
。它负责将ManifestEntry
对象实际写入到物理的 Manifest 文件中。 - 当 Paimon 的写操作(例如
FileStoreCommitImpl
)准备提交一批数据文件的变更时,它会创建一个ManifestEntryWriter
。 - 通过调用
writer.write(ManifestEntry)
来逐条写入变更记录。 - 调用
writer.close()
时,会完成文件的写入、关闭流,并返回一个ManifestFileMeta
对象,该对象描述了刚刚写入的这个 Manifest 文件的元数据。
ManifestFile.java
// ... existing code ... @Override public ManifestEntrySerializer serializer() { return (ManifestEntrySerializer) super.serializer(); } public ManifestEntryWriter createWriter(String fileCompression) { return new ManifestEntryWriter( writerFactory, pathFactory.newPath(), fileCompression == null ? compression : fileCompression); } /** * Writer for manifest files. * * <p>IMPORTANT: This writer is not thread-safe. */ public class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> { private final SimpleStatsCollector partitionStatsCollector; private final SimpleStatsConverter partitionStatsSerializer; private long numAddedFiles = 0; private long numDeletedFiles = 0; private long schemaId = Long.MIN_VALUE; private int minBucket = Integer.MAX_VALUE; private int maxBucket = Integer.MIN_VALUE; private int minLevel = Integer.MAX_VALUE; private int maxLevel = Integer.MIN_VALUE; ManifestEntryWriter(FormatWriterFactory factory, Path path, String fileCompression) { super( ManifestFile.this.fileIO, factory, path, serializer()::toRow, // 将 ManifestEntry 转换为 Avro 的 GenericRow fileCompression, false); this.partitionStatsCollector = new SimpleStatsCollector(partitionType); this.partitionStatsSerializer = new SimpleStatsConverter(partitionType); } @Override public void write(ManifestEntry entry) throws IOException { super.write(entry); // 调用父类的 write,最终会使用 FormatWriter 写入 // Update stats if (entry.partition().getFieldCount() > 0) { partitionStatsCollector.collect(entry.partition()); } if (entry.kind() == FileKind.ADD) { numAddedFiles++; } else { numDeletedFiles++; } if (schemaId == Long.MIN_VALUE) { schemaId = entry.file().schemaId(); } else if (schemaId != entry.file().schemaId()) { // TODO do we need to support different schema ids in one manifest file? // if so, we should store a list of schema ids, or store the schema id per // entry // for now, just check they are the same // In the future, if we want to support this, we need to change the // ManifestFileMeta.schemaId to a list or remove it. // And, AbstractFileStoreScan.readManifestFileStream should also be changed. checkState( schemaId == entry.file().schemaId(), "Schema id %s in manifest entry is different from previous schema id %s", entry.file().schemaId(), schemaId); } minBucket = Math.min(minBucket, entry.bucket()); maxBucket = Math.max(maxBucket, entry.bucket()); minLevel = Math.min(minLevel, entry.file().level()); maxLevel = Math.max(maxLevel, entry.file().level()); } @Override public ManifestFileMeta result() throws IOException { return new ManifestFileMeta( fileName(), fileSize(), numAddedFiles, numDeletedFiles, partitionStatsSerializer.toBinary(partitionStatsCollector.extract()), schemaId, minBucket, maxBucket, minLevel, maxLevel); } } // ... existing code ...
在
ManifestEntryWriter
中:- 构造函数接收一个
FormatWriterFactory
(例如AvroWriterFactory
) 和目标文件路径。 serializer()::toRow
是一个函数,它将ManifestEntry
Java 对象转换成 Avro 能理解的InternalRow
(或GenericRow
),然后由底层的 Avro writer 写入文件。write(ManifestEntry entry)
方法除了调用父类的write
来实际写入数据外,还会收集统计信息,如新增/删除文件数、分区统计、Schema ID、桶号范围、层级范围等。这些统计信息最终会记录在ManifestFileMeta
中。result()
方法在文件写入完成后被调用,用于生成包含上述统计信息的ManifestFileMeta
对象。
- 这是一个内部类,继承自
ManifestEntry.java
和ManifestEntrySerializer.java
:ManifestEntry.java
(通常是一个 POJO 或记录类) 定义了 Manifest 文件中每条记录的逻辑结构。ManifestEntrySerializer.java
负责将ManifestEntry
对象与 Avro 的InternalRow
进行相互转换。它知道如何从InternalRow
中读取字段来构造ManifestEntry
,以及如何将ManifestEntry
的字段写入InternalRow
。
总结
Manifest 文件的创建流程大致是:
- 当需要记录数据文件变更时(通常在事务提交阶段),Paimon 会通过
ManifestFile.Factory
创建一个ManifestFile
对象。 - 然后调用
manifestFile.createWriter(...)
得到一个ManifestEntryWriter
。 - 将表示数据文件新增或删除的
ManifestEntry
对象逐条写入ManifestEntryWriter
。 ManifestEntryWriter
内部使用 Avro 的FormatWriter
将ManifestEntry
序列化为二进制格式并写入磁盘文件。- 写入完成后,
ManifestEntryWriter
返回一个ManifestFileMeta
,其中包含了这个新生成的 Manifest 文件的元数据。 - 这些
ManifestFileMeta
对象会被收集起来,写入到 Manifest List 文件中,同样使用 Avro 格式。
通过阅读 ManifestFile.java
中的 Factory
和 ManifestEntryWriter
类,以及相关的 ManifestEntrySerializer.java
和 ManifestEntry.SCHEMA
,就能非常清楚地了解 Manifest 文件的创建和内容组织。
3. 数据写入与一致性
总体流程
- Flink SQL 是入口: 当使用 Flink SQL INSERT INTO paimon_table ... 时,Flink SQL 的 Planner 会将这个 SQL 语句转换成一个 Flink DataStream 作业。
- FlinkTableSink 是桥梁: Paimon 通过实现 DynamicTableSink (即 FlinkTableSink) 来告诉 Flink 如何处理对 Paimon 表的写入。
- FlinkSinkBuilder 是构建器: 在 FlinkTableSink.getSinkRuntimeProvider() 中,会使用 FlinkSinkBuilder 来根据表的具体特性(append-only, primary-key, bucket模式等)和配置(log sink, overwrite等)来组装 Sink 逻辑。
- FlinkSink 是核心封装: FlinkSinkBuilder 会创建并调用具体 FlinkSink 子类 (如 AppendOnlyFlinkSink, FixedBucketSink) 的 sinkFrom() 方法。
- FlinkSink.sinkFrom() 启动流程: 这个方法通过调用 doWrite() 和 doCommit(),将 Paimon 的写入算子 (RowDataStoreWriteOperator) 和提交算子 (CommitterOperator) 通过 Flink DataStream API 的 transform() 方法编织到 Flink 的作业图中。
- 所以,FlinkSink 及其子类是 Paimon Flink Sink 实现的核心部分,它们负责定义和构建实际执行写入和提交的 Flink 算子。而 Flink SQL 通过 DynamicTableSink 机制间接触发了 FlinkSinkBuilder 和 FlinkSink 的调用,从而将 Paimon 的写入能力集成到 SQL 定义的作业中。用户通常不需要直接与 FlinkSink 交互,除非是在纯 DataStream API 中自定义 Paimon Sink。
数据的写入方式取决于表是否定义了主键。
Append-Only 表 (无主键):
- 写入操作相对简单。新的数据记录会直接追加到新的数据文件中。
- 每次提交会生成新的数据文件,并更新相关的 Manifest 和 Snapshot 元数据。
Primary Key 表 (有主键):
Paimon 对主键表采用 LSM (Log-Structured Merge-Tree) 树的结构来组织和管理数据文件。这使得 Paimon 能够高效地处理大量的更新 (Update) 和删除 (Delete) 操作。
内存中的写缓冲 (Write Buffer):
- 当数据写入主键表时,记录首先可能被写入内存中的写缓冲区。这个缓冲区通常是排序的。
- 源码中
MemoryFileStoreWrite
(路径:paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
) 及其子类负责处理这部分逻辑,它们会使用内存池进行管理。
刷盘 (Flush) 与 Sorted Runs:
- 当写缓冲区达到一定大小或满足其他刷盘条件时,内存中的数据会被刷写到磁盘,形成一个新的数据文件。这个文件内部的记录是根据主键排序的,称为一个 "Sorted Run"。
- 新生成的 Sorted Run 通常位于 LSM 树的 L0 层。L0 层的文件之间其主键范围可能存在重叠。
LSM 树的层级 (Levels):
- LSM 树将 Sorted Run (数据文件) 组织成多个层级 (Level 0, Level 1, Level 2, ...)。
- L0 层的文件通常是最新写入的,文件较小,且键范围可能重叠。
- 更高层级 (L1+) 的文件通常由低层级文件合并而来,文件较大,并且在同一层级内,文件之间的主键范围通常不重叠。
Levels.java
(源码路径:paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
) 类负责管理这些层级和其中的数据文件 (DataFileMeta
)。
Compaction (合并):
- 随着数据的不断写入和更新,L0 层的文件数量会增加,或者某些层级的文件变得过于碎片化。这会影响查询性能,因为查询可能需要读取和合并多个文件。
- Paimon 会自动或通过专门的作业触发 Compaction 操作。Compaction 会选择一个或多个层级中的若干 Sorted Run,将它们合并成新的、更少的、更大的 Sorted Run,并通常将结果写入到更高的层级。
- 在合并过程中,会处理掉被标记为删除的记录 (DELETE) 和相同主键的旧版本记录 (UPDATE),只保留最新的有效版本。
数据查找 (Lookup):
-
LookupLevels.java
(源码路径:org/apache/paimon/mergetree/LookupLevels.java
) 文件就是 LSM 树读取和查找逻辑的关键部分。 - 当需要根据主键查找一条记录时,
LookupLevels
会从指定的起始层级开始,逐层遍历 LSM 树中的数据文件。 lookup(InternalRow key, int startLevel)
: 这是查找操作的入口。createLookupFile(DataFileMeta file)
: 为了加速查找,特别是对于存储在远程文件系统(如 HDFS, S3)上的数据文件,Paimon 可能会为该数据文件在本地创建一个优化的 "lookup file"。- 这个 lookup file 本质上是一个本地的键值存储,通常包含布隆过滤器 (BloomFilter) 和对键的索引。
- 创建过程会读取原始数据文件,使用
keySerializer
序列化键,使用valueProcessor
处理和序列化值,然后通过LookupStoreWriter
写入本地文件。bfGenerator
用于生成布隆过滤器。
lookupFileCache
: 这是一个 Caffeine Cache,用于缓存已经创建的LookupFile
对象,避免对同一个数据文件重复构建 lookup file,提高后续查找效率。ValueProcessor
接口及其实现 (如KeyValueProcessor
,PositionedKeyValueProcessor
): 定义了如何从磁盘读取值字节数组并将其反序列化为目标类型 (T),以及如何将 KeyValue 持久化到磁盘(在创建 lookup file 时)。例如,PositionedKeyValueProcessor
还会处理行位置信息,这对于 Deletion Vector 等高级功能非常重要。
-
数据写入
在 Paimon 中,数据写入和提交是一个多阶段的过程,旨在确保数据的一致性和可见性。
1. 谁负责接收请求并实际写入数据?
- FlinkSink: 这是 Flink 作业中 Paimon Sink 的入口点和协调者。它负责构建数据写入和提交的 Flink DataStream 转换。
- Write Operator (由
createWriteOperatorFactory
创建):FlinkSink
的doWrite
方法会创建一个特定的写入算子 (Write Operator)。这个算子是 Flink 的一个OneInputStreamOperator
,它在流处理任务中并行运行。此算子接收来自上游的数据流。 StoreSinkWrite
(及其实现类,如StoreSinkWriteImpl
): 在 Write Operator 内部,会使用StoreSinkWrite
的实例。这个组件是实际负责将数据记录写入到物理存储文件(例如 Parquet、ORC 文件)的核心。它处理数据的序列化、文件格式的写入、以及与底层文件系统的交互。
简单来说,Flink 的并行任务实例上运行着 Write Operator,这些 Operator 接收数据,并通过内嵌的 StoreSinkWrite
将数据写入到对应的表文件(data files)中。
代码位置: 在 FlinkSink.java
中,doWrite
方法设置了这个写入流程:
FlinkSink.java
// ...
public DataStream<Committable> doWrite(
DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming = isStreaming(input);
boolean writeOnly = table.coreOptions().writeOnly();
// 1. 创建 Write Operator Factory, 其内部会使用 StoreSinkWrite.Provider
// createWriteOperatorFactory 是一个抽象方法,具体实现由子类提供,例如 AppendOnlyWriterOperatorFactory
// createWriteProvider 会根据配置选择 StoreSinkWrite 的具体实现,如 StoreSinkWriteImpl
SingleOutputStreamOperator<Committable> written =
input.transform(
(writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_WRITE_ONLY_NAME) + " : " + table.name(),
new CommittableTypeInfo(),
createWriteOperatorFactory( // 此工厂创建的算子负责实际写入
createWriteProvider( // 提供 StoreSinkWrite 实例
env.getCheckpointConfig(),
isStreaming,
hasSinkMaterializer(input)),
commitUser));
// ...
return written;
}
// StoreSinkWrite.Provider 的创建逻辑,最终会返回一个 StoreSinkWrite 的实例
private StoreSinkWrite.Provider createWriteProvider(
CheckpointConfig checkpointConfig, boolean isStreaming, boolean hasSinkMaterializer) {
// ...
// 示例:对于非 write-only 且非 full-compaction 的情况
if (coreOptions.laziedLookup()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new AsyncLookupSinkWrite( // 这是一个 StoreSinkWrite 实现
table,
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
isStreaming,
memoryPool,
metricGroup);
};
}
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new StoreSinkWriteImpl( // 这是另一个 StoreSinkWrite 实现,更常见
table,
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
isStreaming,
memoryPool,
metricGroup);
};
}
// 子类需要实现这个方法来提供具体的 Writer Operator Factory
protected abstract OneInputStreamOperatorFactory<T, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser);
// ...
数据写入过程和 Snapshot/Manifest 的创建是紧密相连的,共同构成了 Paimon 的事务提交机制:
- 数据文件 (Data Files):
- 由上述的
StoreSinkWrite
在 Flink Checkpoint 期间写入。这些是包含实际数据的物理文件(例如.parquet
文件)。
- 由上述的
Committable
对象:- 当 Write Operator (通过
StoreSinkWrite
) 完成一个 Checkpoint 的数据写入后,它会生成一个或多个Committable
对象。 Committable
(在 Paimon 中通常是ManifestCommittable
) 包含了这次写入操作的元数据信息,例如:- 新创建的数据文件的列表和它们的统计信息(行数、大小等)。
- 对于主键表,可能还包括需要标记为删除的文件(用于更新或删除操作)。
- 这些
Committable
对象会从并行的 Write Operator 发送到下游的单个 Committer Operator。
- 当 Write Operator (通过
- Committer Operator (提交者):
FlinkSink
的doCommit
方法会创建一个 Committer Operator (通常是CommitterOperatorFactory
创建的CommitterOperator
)。这个算子以单并行度运行。- 它负责收集来自所有并行 Write Operator 的
Committable
对象。 - 当 Flink 触发 Checkpoint 完成时,Committer Operator 会执行提交操作。这个提交操作由
Committer.Factory
(通过createCommitterFactory()
提供) 创建的Committer
(例如StoreCommitter
) 来执行。 Committer
的核心职责是:- 创建 Manifest 文件: 根据收集到的
Committable
中的信息,Committer
会生成新的 Manifest 文件。Manifest 文件是一个元数据文件,它列出了属于本次提交的数据文件(新增的、删除的)。 - 创建 Manifest List 文件: Manifest List 文件指向一个或多个 Manifest 文件。它将本次提交的所有变更(通过 Manifest 文件描述)组织起来。每个 Snapshot 都会指向一个 Manifest List。
- 创建 Snapshot 文件: 这是提交的最后一步,也是原子性的关键。
Committer
会创建一个新的 Snapshot 文件(一个 JSON 文件)。这个文件包含了:- 当前 Snapshot 的 ID。
- 本次提交使用的 Schema ID。
- 指向新创建的 Manifest List 文件的指针。
- 提交用户、提交类型 (APPEND, COMPACT 等)、提交时间等元数据。
- 关于本次提交的统计信息,如总记录数、增量记录数等。
- 创建 Manifest 文件: 根据收集到的
- 原子性: Snapshot 文件的创建(通常是通过文件系统的 rename 操作)是原子性的。一旦 Snapshot 文件成功创建,这次提交写入的数据就对查询可见了。
代码位置: 在 FlinkSink.java
中,doCommit
方法负责提交阶段。
总结:
数据写入阶段 (并行):
- Flink 的并行 Sink Task (Write Operator) 接收数据。
- 每个 Task 内的
StoreSinkWrite
将数据写入到实际的存储介质,生成数据文件 (data files)。 - 每个 Task 在 Checkpoint 时生成
Committable
,描述其写入的文件和元数据。
提交阶段 (串行):
- 单个 Committer Operator 收集所有并行 Writer Task 发送过来的
Committable
。 - 在 Flink Checkpoint 成功后,Committer Operator 执行提交:
- 根据
Committable
创建新的 Manifest 文件 (列出数据文件变更)。 - 创建新的 Manifest List 文件 (指向相关的 Manifest 文件)。
- 最后,原子地创建新的 Snapshot 文件 (指向 Manifest List,并包含提交元数据)。
- 根据
- Snapshot 文件的成功创建标志着一批数据的成功提交和对外可见。
- 单个 Committer Operator 收集所有并行 Writer Task 发送过来的
这个过程类似于两阶段提交(2PC)的思想,确保了数据写入的原子性和一致性。FlinkSink
扮演了协调者的角色,将 Paimon 核心的写入 (StoreSinkWrite
) 和提交 (StoreCommitter
,通常由 createCommitterFactory
间接提供)逻辑编排到 Flink 的流式处理框架中。
FlinkSink
FlinkSink
本身并不是一个 Flink 的算子 (Operator)。它更像是一个构建器 (Builder) 或协调器 (Coordinator),负责将 Paimon 的写入逻辑组装成一个 Flink DataStream 的 Sink 部分。
以下是这个过程:
用户创建
FlinkSink
实例: 用户代码(通常是在 Flink Table API/SQL 层面通过DynamicTableSink
间接创建,或者直接使用FlinkSinkBuilder
)会创建一个FlinkSink
的具体子类实例 (例如AppendOnlyFlinkSink
,PrimaryKeyFlinkSink
),并传入FileStoreTable
对象。FlinkSink.sinkFrom(DataStream<T> input)
被调用: 这是将 Paimon Sink 连接到 Flink DataStream 的入口点。FlinkSink.doWrite()
:- 此方法接收上游的
DataStream<T>
。 - 它调用
createWriteOperatorFactory()
(这是一个抽象方法,由FlinkSink
的子类实现)。这个工厂方法会返回一个StreamOperatorFactory
,例如RowDataStoreWriteOperator.Factory
。 RowDataStoreWriteOperator.Factory
的构造函数接收FileStoreTable
、可选的LogSinkFunction
、以及一个StoreSinkWrite.Provider
(这个 Provider 是由FlinkSink.createWriteProvider()
根据表配置创建的)。- 然后,
doWrite()
方法使用 Flink DataStream API 的transform()
方法,将这个RowDataStoreWriteOperator.Factory
应用到输入流上。transform()
方法会将这个工厂包装成一个 Flink 的转换操作,当 Flink 作业图构建和执行时,Flink 会使用这个工厂来创建实际的RowDataStoreWriteOperator
实例,并在 TaskManager 上并行运行它们。
- 此方法接收上游的
FlinkSink.doCommit()
:doWrite()
返回一个DataStream<Committable>
。doCommit()
方法接收这个Committable
流,并类似地使用transform()
方法应用一个CommitterOperatorFactory
。这个工厂会创建CommitterOperator
实例。CommitterOperator
负责收集Committable
并在 Checkpoint 完成后执行 Paimon 的提交逻辑(创建 Manifest 和 Snapshot 文件)。这个算子通常以单并行度运行。
通过这种方式,FlinkSink
将 Paimon 的写入和提交逻辑封装成了标准的 Flink StreamOperatorFactory
,使得它们可以无缝地集成到 Flink 的 DataStream 作业图中,并由 Flink 的运行时环境来调度和执行。RowDataStoreWriteOperator
就是这个写入阶段被 Flink 实际执行的核心算子。
总结一下这个链条:
FlinkSink.sinkFrom()
->FlinkSink.doWrite()
doWrite()
调用子类实现的createWriteOperatorFactory()
,并将createWriteProvider()
的结果(一个StoreSinkWrite.Provider
)传递给它。- 子类的
createWriteOperatorFactory()
创建一个 Flink 的OneInputStreamOperatorFactory
(例如RowDataStoreWriteOperator.Factory
)。 - 这个 Flink Operator Factory 在创建具体的 Flink Operator (例如
RowDataStoreWriteOperator
) 时,会调用StoreSinkWrite.Provider.provide()
方法。 provide()
方法根据FlinkSink.createWriteProvider()
中的逻辑,实例化一个具体的StoreSinkWrite
实现 (例如StoreSinkWriteImpl
)。此时,FlinkSink
中持有的FileStoreTable
实例会被传递给StoreSinkWrite
的构造函数。StoreSinkWriteImpl
在其构造函数中,使用传入的FileStoreTable
调用table.newWrite(commitUser)
来创建一个TableWriteImpl
实例。这个TableWriteImpl
才是真正与 Paimon 核心写逻辑交互的组件。- 当 Flink Operator 的
processElement()
方法被调用时,它会调用StoreSinkWriteImpl.write(rowData)
。 StoreSinkWriteImpl.write(rowData)
最终会调用其内部持有的TableWriteImpl.write(rowData)
方法,将数据写入文件。- 在 Checkpoint 时,Flink Operator 调用
StoreSinkWriteImpl.prepareCommit()
,后者又调用TableWriteImpl.prepareCommit()
来生成ManifestCommittable
,其中包含了新写入文件的元数据。
核心流程概览
FlinkSink.doWrite()
: 这是发起写入操作的入口。它负责构建 Flink 的DataStream
转换,将输入数据流转换为Committable
对象流。createWriteOperatorFactory()
(抽象方法):FlinkSink
是一个抽象类,这个方法需要由其子类实现(例如FixedBucketSink
,CdcFixedBucketSink
等)。此工厂方法创建了一个 Flink 的OneInputStreamOperatorFactory
。这个工厂最终会生产出在 TaskManager 上实际执行数据写入的 Flink 算子 (Operator)。createWriteProvider()
: 在FlinkSink.doWrite()
内部,会调用createWriteProvider()
方法。这个方法根据FileStoreTable
的配置(例如changelog-producer
,write-only
等)来决定使用哪种StoreSinkWrite.Provider
。StoreSinkWrite.Provider
: 这是一个函数式接口,它的provide()
方法负责实例化一个具体的StoreSinkWrite
实现。这个provide()
方法会在 Flink 的写入算子 (Operator) 初始化时被调用。StoreSinkWrite
(例如StoreSinkWriteImpl
): 这是真正执行数据写入逻辑的核心组件。它接收 Flink 算子传递过来的每一条数据,并利用FileStoreTable
提供的能力将数据写入到存储中。FileStoreTable
的作用:- 提供表元数据:
StoreSinkWrite
通过FileStoreTable
对象获取表的 Schema、分区信息、Bucket 定义、核心选项 (CoreOptions) 等。 - 创建
TableWrite
对象:FileStoreTable
有一个关键方法,如newWrite(commitUser)
,它会返回一个TableWrite
实例 (通常是TableWriteImpl
)。这个TableWrite
对象封装了向特定 Paimon 表写入数据的底层细节,包括文件格式处理、数据文件组织、索引更新(如果适用)等。
- 提供表元数据:
TableWrite
(例如TableWriteImpl
):StoreSinkWrite
内部会持有并使用这个TableWrite
对象来执行具体的写操作,比如调用tableWrite.write(rowData)
。
代码梳理
让我们逐步看代码:
1. FlinkSink.java
- 写入的起点
FlinkSink.java
// ... existing code ...
public DataStream<Committable> doWrite(
DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming = isStreaming(input);
boolean writeOnly = table.coreOptions().writeOnly();
// 1. 调用 createWriteOperatorFactory,这是一个抽象方法,由子类实现。
// 子类的实现会使用 createWriteProvider 返回的 StoreSinkWrite.Provider。
SingleOutputStreamOperator<Committable> written =
input.transform(
(writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " : " + table.name(),
new CommittableTypeInfo(),
createWriteOperatorFactory( // 子类实现此方法
createWriteProvider( // 2. createWriteProvider 决定使用哪个 StoreSinkWrite 实现
env.getCheckpointConfig(),
isStreaming,
hasSinkMaterializer(input)),
commitUser));
if (parallelism == null) {
forwardParallelism(written, input);
} else {
written.setParallelism(parallelism);
}
Options options = Options.fromMap(table.options());
// ... existing code ...
if (!table.primaryKeys().isEmpty() && options.get(PRECOMMIT_COMPACT)) {
// ... existing code ...
}
return written;
}
// ... existing code ...
private StoreSinkWrite.Provider createWriteProvider(
CheckpointConfig checkpointConfig, boolean isStreaming, boolean hasSinkMaterializer) {
SerializableRunnable assertNoSinkMaterializer =
() ->
Preconditions.checkArgument(
!hasSinkMaterializer,
String.format(
"Sink materializer must not be used with Paimon sink. "
+ "Please set '%s' to '%s' in Flink's config.",
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
.key(),
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
Options options = table.coreOptions().toConfiguration();
ChangelogProducer changelogProducer = table.coreOptions().changelogProducer();
boolean waitCompaction;
CoreOptions coreOptions = table.coreOptions();
if (coreOptions.writeOnly()) {
// ... existing code ...
}
// 3. 根据 FileStoreTable 的 coreOptions 决定返回哪种 StoreSinkWrite.Provider
// 例如,如果配置了 laziedLookup,则返回 AsyncLookupSinkWrite 的 Provider
if (coreOptions.laziedLookup()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
// 当 Flink Operator 调用这个 provider 的 provide() 方法时,会创建 AsyncLookupSinkWrite 实例
return new AsyncLookupSinkWrite(
table, // 这个 table 就是 FlinkSink持有的 FileStoreTable
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
isStreaming,
memoryPool,
metricGroup);
};
}
// 默认情况下,返回 StoreSinkWriteImpl 的 Provider
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
// 当 Flink Operator 调用这个 provider 的 provide() 方法时,会创建 StoreSinkWriteImpl 实例
return new StoreSinkWriteImpl(
table, // 这个 table 就是 FlinkSink持有的 FileStoreTable
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
isStreaming,
memoryPool,
metricGroup);
};
}
// ... existing code ...
// 4. 这是一个抽象方法,由 FlinkSink 的子类实现。
// 例如 FixedBucketSink, CdcFixedBucketSink 等。
// 子类的实现会创建一个 Flink 的 OneInputStreamOperatorFactory,
// 这个 Factory 在其内部的 open() 或 createStreamOperator() 方法中,
// 会调用传入的 writeProvider.provide(...) 来获取 StoreSinkWrite 实例。
protected abstract OneInputStreamOperatorFactory<T, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser);
// ... existing code ...
2. FlinkSink
的子类如何实现 createWriteOperatorFactory
(以 FixedBucketSink
为例)
FixedBucketSink
是 FlinkWriteSink
的子类,而 FlinkWriteSink
是 FlinkSink
的子类。
FixedBucketSink.java
// ... existing code ...
public class FixedBucketSink extends FlinkWriteSink<InternalRow> {
// ... existing code ...
public FixedBucketSink(
FileStoreTable table,
@Nullable Map<String, String> overwritePartition,
@Nullable LogSinkFunction logSinkFunction) {
super(table, overwritePartition);
this.logSinkFunction = logSinkFunction;
}
@Override
protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
// 5. FixedBucketSink 实现了 createWriteOperatorFactory。
// 它创建了 RowDataStoreWriteOperator.Factory。
// 这个 Factory 会在其内部创建 RowDataStoreWriteOperator 实例。
// RowDataStoreWriteOperator 在其 open() 方法中会调用 writeProvider.provide()
// 来获取 StoreSinkWrite 实例,并将其保存在成员变量中,用于后续的 processElement() 调用。
return new RowDataStoreWriteOperator.Factory(
table, // 将 FileStoreTable 传递下去
logSinkFunction,
writeProvider, // 将 FlinkSink.createWriteProvider() 返回的 Provider 传递下去
commitUser);
}
}
RowDataStoreWriteOperator
Flink 算子,使用 StoreSinkWrite.Provider
RowDataStoreWriteOperator
是一个 Flink 的流处理算子 (StreamOperator),其主要作用是:
- 接收上游数据: 它是一个
OneInputStreamOperator
,意味着它从上游 Flink 算子接收数据流。在这个特定的类中,它处理的是InternalRow
类型的数据,这是 Flink SQL 和 Table API 中常用的内部行数据表示格式。
// ...
public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
// ...
@Override
public void processElement(StreamRecord<InternalRow> element) throws Exception {
sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
SinkRecord record;
try {
// 调用父类 TableWriteOperator 中持有的 StoreSinkWrite 实例的 write 方法
record = write.write(element.getValue());
} catch (Exception e) {
throw new IOException(e);
}
if (record != null
&& logSinkFunction != null
&& (!logIgnoreDelete || record.row().getRowKind().isAdd())) {
// write to log store, need to preserve original pk (which includes partition fields)
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
}
}
// ...
}
- 实际写入数据到 Paimon 表:
- 它继承自
TableWriteOperator
。在TableWriteOperator
的open()
方法中(或者更早的构造阶段,具体取决于实现),会通过传入的StoreSinkWrite.Provider
来实例化一个StoreSinkWrite
(例如StoreSinkWriteImpl
)。 RowDataStoreWriteOperator
的processElement
方法会调用这个StoreSinkWrite
实例的write(InternalRow)
方法,将接收到的InternalRow
数据实际写入到 Paimon 表的存储文件中(如 Parquet 文件)。
- 它继承自
- 处理 Log Sink (可选):
- 如果配置了
logSinkFunction
(用于将数据变更同步到外部消息队列如 Kafka),RowDataStoreWriteOperator
会在写入主存储后,将相应的SinkRecord
(可能经过转换以保留原始主键) 发送到logSinkFunction
进行处理。 - 它还负责处理 Watermark 并将其传递给
logSinkFunction
。
RowDataStoreWriteOperator.java
// ... public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> { // ... @Nullable private final LogSinkFunction logSinkFunction; // ... @Override public void open() throws Exception { super.open(); this.sinkContext = new SimpleContext(getProcessingTimeService()); if (logSinkFunction != null) { openFunction(logSinkFunction); logCallback = new LogWriteCallback(); logSinkFunction.setWriteCallback(logCallback); logIgnoreDelete = Options.fromMap(table.options()).get(LOG_IGNORE_DELETE); } } // ... @Override public void processElement(StreamRecord<InternalRow> element) throws Exception { // ... (写入主存储的代码) ... if (record != null && logSinkFunction != null && (!logIgnoreDelete || record.row().getRowKind().isAdd())) { // write to log store, need to preserve original pk (which includes partition fields) SinkRecord logRecord = write.toLogRecord(record); logSinkFunction.invoke(logRecord, sinkContext); } } // ... @Override public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); this.currentWatermark = mark.getTimestamp(); if (logSinkFunction != null) { logSinkFunction.writeWatermark( new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp())); } } // ... }
- 如果配置了
- 生成 Committable:
- 在 Flink Checkpoint 触发时,
RowDataStoreWriteOperator
(通过其父类TableWriteOperator
和PrepareCommitOperator
) 的prepareCommit
方法会被调用。 - 这个方法会调用
StoreSinkWrite.prepareCommit()
来获取本次 Checkpoint 写入操作的元数据,封装成Committable
对象。 - 如果配置了
logSinkFunction
,它还会收集来自logCallback
的 Log Offset 信息,并将其也封装成Committable
对象。 - 这些
Committable
对象会被发送到下游的 Committer 算子。
RowDataStoreWriteOperator.java
// ... public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> { // ... @Override protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { // 调用父类的 prepareCommit,它会调用 storeSinkWrite.prepareCommit() List<Committable> committables = super.prepareCommit(waitCompaction, checkpointId); if (logCallback != null) { try { Objects.requireNonNull(logSinkFunction).flush(); } catch (Exception e) { throw new IOException(e); } logCallback .offsets() .forEach( (k, v) -> committables.add( new Committable( checkpointId, Committable.Kind.LOG_OFFSET, new LogOffsetCommittable(k, v)))); } return committables; } // ... }
- 在 Flink Checkpoint 触发时,
- 状态管理: 它还负责管理自身的状态(例如
StoreSinkWriteState
,以及logSinkFunction
的状态),确保在故障恢复时能够正确恢复。
StoreSinkWrite 接口
StoreSinkWrite 接口的核心职责:
- 写入数据: write(InternalRow rowData) 和 write(InternalRow rowData, int bucket) 是核心的写入方法。
- 处理 Compaction: compact(...) 用于触发数据文件的合并。
- 生成 Committable: prepareCommit(...) 在 Flink Checkpoint 时被调用,用于生成包含本次写入元数据的 Committable 对象。
- 状态管理: snapshotState() 用于在 Checkpoint 时保存自身状态。
- 生命周期管理: close() 用于释放资源。
- Schema 变更支持: replace(FileStoreTable newTable) 用于在 CDC 场景下处理 Schema 变更,通过替换内部的 TableWriteImpl 来适应新的 Schema。
在 FlinkSink.createWriteProvider()
方法中,会根据 FileStoreTable
的配置来决定返回哪种 StoreSinkWrite.Provider
。
三个实现类的功能总结
StoreSinkWriteImpl
(父类/基础实现)- 功能:提供了 Paimon Sink 端写入操作的基础框架和通用逻辑。
- 核心职责:
- 管理底层的
TableWriteImpl
对象,该对象负责实际的数据写入和文件操作。 - 处理数据写入请求 (
write
方法),将数据路由到正确的 bucket。 - 处理合并请求 (
compact
方法)。 - 在 Flink checkpoint 时,准备提交物 (
prepareCommit
方法),这些提交物包含了新生成的数据文件和需要更新的元数据信息。 - 管理 Flink 的状态 (
StoreSinkWriteState
),用于在故障恢复时恢复写入进度或相关信息。 - 处理与内存池、IO 管理器等的交互。
- 管理底层的
- 特点:它本身不包含特定的高级合并策略,而是提供了一个可扩展的基础,让子类可以实现更复杂的行为。
AsyncLookupSinkWrite
(子类/特定场景优化)- 功能:专门为配置了异步查找(lookup changelog producer 且
LOOKUP_WAIT
为 false)的 Paimon 表设计的 Sink Write。 - 核心职责:
- 在任务初始化(特别是从 checkpoint 恢复)时,对之前记录为“活跃”(active)的 buckets 执行一次常规合并 (
compact(..., false)
)。 - 在 Flink checkpoint 时,将当前底层
AbstractFileStoreWrite
认为“活跃”的 buckets 列表保存到 Flink 状态中。
- 在任务初始化(特别是从 checkpoint 恢复)时,对之前记录为“活跃”(active)的 buckets 执行一次常规合并 (
- 特点:
- 其合并操作是针对性的(只针对活跃 bucket)和即时的(在恢复时立即执行)。
- 合并类型通常是常规合并,而非全量合并。
- 目的是确保在异步查找场景下,因异步操作可能导致数据处于中间状态的 bucket 能够快速合并,保证数据一致性或为后续查找准备数据。
- 功能:专门为配置了异步查找(lookup changelog producer 且
GlobalFullCompactionSinkWrite
(子类/周期性全局优化)- 功能:实现周期性的、全局性的全量合并。
- 核心职责:
- 跟踪记录所有被写入过的
(partition, bucket)
,并按 checkpointId 进行组织。 - 根据
deltaCommits
参数定义的周期,在某个 checkpoint 触发对所有历史写入过的、且尚未被成功全量合并的 buckets 执行一次全量合并 (compact(..., true)
)。 - 通过检查 Paimon Snapshot 来确认全量合并操作是否成功提交。
- 将需要跟踪的写入 bucket 信息持久化到 Flink 状态。
- 跟踪记录所有被写入过的
- 特点:
- 合并操作是全局性的和周期性的。
- 合并类型是全量合并,更为彻底。
- 目的是定期对整个表进行深度优化,控制小文件,提升整体性能和数据紧凑性。
StoreSinkWriteImpl
StoreSinkWrite
的标准实现,它直接封装了 Paimon 核心的 TableWriteImpl
来执行实际的写操作。
核心功能和实现:
构造与初始化:
- 在构造时,它接收
FileStoreTable
、commitUser
、StoreSinkWriteState
(用于状态管理)、Flink 的IOManager
、内存池 (MemorySegmentPool
或MemoryPoolFactory
) 以及MetricGroup
等。 - 最关键的一步是在构造函数中调用
newTableWrite(table)
方法,该方法会使用传入的FileStoreTable
实例来创建一个TableWriteImpl<?>
实例。这个TableWriteImpl
负责所有与 Paimon 文件存储交互的底层细节。
StoreSinkWriteImpl.java
// ... public class StoreSinkWriteImpl implements StoreSinkWrite { // ... protected final String commitUser; protected final StoreSinkWriteState state; // 用于状态管理 private final IOManagerImpl paimonIOManager; // Flink IOManager 包装 // ... 其他成员 protected TableWriteImpl<?> write; // 核心:持有 TableWriteImpl 实例 public StoreSinkWriteImpl( FileStoreTable table, String commitUser, StoreSinkWriteState state, IOManager ioManager, boolean ignorePreviousFiles, boolean waitCompaction, boolean isStreamingMode, @Nullable MemorySegmentPool memoryPool, @Nullable MemoryPoolFactory memoryPoolFactory, // CDC 场景可能使用 @Nullable MetricGroup metricGroup) { this.commitUser = commitUser; this.state = state; this.paimonIOManager = new IOManagerImpl(ioManager); this.ignorePreviousFiles = ignorePreviousFiles; this.waitCompaction = waitCompaction; this.isStreamingMode = isStreamingMode; this.memoryPool = memoryPool; this.memoryPoolFactory = memoryPoolFactory; this.metricGroup = metricGroup; // 关键:创建 TableWriteImpl 实例 this.write = newTableWrite(table); } private TableWriteImpl<?> newTableWrite(FileStoreTable table) { checkArgument( !(memoryPool != null && memoryPoolFactory != null), "memoryPool and memoryPoolFactory cannot be set at the same time."); TableWriteImpl<?> tableWrite = table.newWrite( // 使用 FileStoreTable 创建 TableWrite commitUser, (part, bucket) -> state.stateValueFilter().filter(table.name(), part, bucket)) .withIOManager(paimonIOManager) .withIgnorePreviousFiles(ignorePreviousFiles) .withExecutionMode(isStreamingMode) .withBucketMode(table.bucketMode()); if (metricGroup != null) { tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup)); } if (memoryPoolFactory != null) { tableWrite.withMemoryPoolFactory(memoryPoolFactory); } else { tableWrite.withMemoryPool( memoryPool != null ? memoryPool : new HeapMemorySegmentPool( // 默认使用堆内存池 table.coreOptions().writeBufferSize(), table.coreOptions().pageSize())); } if (insertOnly != null) { tableWrite.withInsertOnly(insertOnly); } return tableWrite; } // ... }
- 在构造时,它接收
数据写入 (
write
方法):write(InternalRow rowData)
和write(InternalRow rowData, int bucket)
方法直接将调用委托给内部持有的TableWriteImpl
实例的相应writeAndReturn
方法。TableWriteImpl
会处理数据的序列化、分桶、写入到数据文件(例如 Parquet)、以及可能的索引更新等。
StoreSinkWriteImpl.java
// ... @Override @Nullable public SinkRecord write(InternalRow rowData) throws Exception { return write.writeAndReturn(rowData); // 委托给 TableWriteImpl } @Override @Nullable public SinkRecord write(InternalRow rowData, int bucket) throws Exception { return write.writeAndReturn(rowData, bucket); // 委托给 TableWriteImpl } // ...
Compaction (
compact
方法):- 同样,
compact
方法将调用委托给TableWriteImpl.compact()
。
StoreSinkWriteImpl.java
// ... @Override public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception { write.compact(partition, bucket, fullCompaction); // 委托给 TableWriteImpl } // ...
- 同样,
准备提交 (
prepareCommit
方法):- 调用
TableWriteImpl.prepareCommit(waitCompaction, checkpointId)
来获取CommitIncrement
。 CommitIncrement
包含了新写入的数据文件 (newFilesIncrement
) 和合并产生的文件 (compactIncrement
) 的元数据。- 然后将这些元数据封装成
Committable
对象列表返回。这些Committable
会被发送到 Flink 的 Committer 算子。
StoreSinkWriteImpl.java
// ... @Override public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { List<Committable> committables = new ArrayList<>(); if (write != null) { try { // 调用 TableWriteImpl 的 prepareCommit CommitIncrement increment = write.prepareCommit(waitCompaction, checkpointId); if (LOG.isDebugEnabled()) { LOG.debug("Writer {} committed.", ಸ್ಥಾಪನೆ()); } committables.add( new Committable(checkpointId, Committable.Kind.FILE, increment)); } catch (Exception e) { throw new IOException(e); } } return committables; } // ...
- 调用
状态快照 (
snapshotState
方法):- 调用
TableWriteImpl.snapshotState()
来获取TableWriteImpl
内部的状态(例如,当前正在写入的文件、buffer 中的数据等)。 - 将这些状态通过
StoreSinkWriteState.put()
方法保存起来。StoreSinkWriteState
通常由StoreSinkWriteStateImpl
实现,它使用 Flink 的 Operator State (如ListState
) 来持久化状态。
StoreSinkWriteImpl.java
// ... @Override public void snapshotState() throws Exception { if (write == null) { return; } List<TableWriteImpl.State<InternalRow>> writeStates = write.checkpoint(); state.put( write.table().name(), StoreSinkWriteState.WRITER_STATE_NAME, writeStates.stream() .map( s -> new StoreSinkWriteState.StateValue( s.partition(), s.bucket(), SERIALIZER.serialize(s))) .collect(Collectors.toList())); } // ...
- 调用
Schema 变更 (
replace
方法):- 这是为了支持 CDC Sink 在遇到 Schema 变更时能够动态更新写入逻辑。
- 它会先调用当前
write.checkpoint()
获取旧TableWriteImpl
的状态。 - 然后使用新的
FileStoreTable
(包含新的 Schema) 创建一个新的TableWriteImpl
实例。 - 最后调用新
TableWriteImpl
的restore()
方法,将旧的状态恢复到新的TableWriteImpl
中。
StoreSinkWriteImpl.java
// ... @Override public void replace(FileStoreTable newTable) throws Exception { if (write != null) { List<TableWriteImpl.State<InternalRow>> states = write.checkpoint(); write.close(); write = newTableWrite(newTable); write.restore(states); } else { // Committer may not be initialized write = newTableWrite(newTable); } } // ...
其他:
withInsertOnly()
: 通知底层的TableWriteImpl
当前是否为仅插入模式。toLogRecord()
: 将SinkRecord
转换为用于 Log Store 的记录。close()
: 关闭内部的TableWriteImpl
。
总结 StoreSinkWriteImpl
: 它是 Paimon Flink Sink 中数据写入和状态管理的核心执行者,通过组合和委托 TableWriteImpl
来完成大部分工作。它为标准的 Paimon 表写入提供了通用的实现。
GlobalFullCompactionSinkWrite
GlobalFullCompactionSinkWrite
的核心目标是实现一种周期性的、全局性的全量合并(Full Compaction)机制。
想象一下,在一个持续写入数据的 Paimon 表中,数据会不断地以小文件的形式追加。随着时间的推移,小文件会越来越多,这可能会导致以下问题:
- 查询性能下降:查询时需要扫描大量的小文件。
- 元数据管理开销增大:需要跟踪和管理更多的文件。
- 存储空间利用率不高:小文件可能导致存储空间的碎片化。
Paimon 通过 Compaction(合并)机制来解决这些问题,将小文件合并成更大的文件。而 Full Compaction 是一种更彻底的合并,它会读取一个 bucket 下的所有数据,进行排序和去重(如果表有主键),然后写回新的、更紧凑的文件。
GlobalFullCompactionSinkWrite
的特殊之处在于:
- 全局性(Globally):它不是针对单个 bucket 或单个 writer 实例的合并,而是试图在某个时间点(Flink 的 checkpoint)对所有曾经被写入过的 bucket 都触发一次全量合并。
- 周期性(Periodically):它不是每次 checkpoint 都触发,而是根据
deltaCommits
参数来决定触发的频率。例如,如果deltaCommits
设置为 10,那么大约每 10 次 Flink 的 checkpoint commit 之后,它会尝试进行一次全局全量合并。 - 状态化跟踪(Stateful Tracking):
- 它会通过 Flink 的状态机制持久化记录在不同 checkpoint 期间,哪些分区(partition)和桶(bucket)被写入过数据。这是通过
writtenBuckets
(一个NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>>
) 实现的,其中Long
是 checkpointId,Set
包含了该 checkpoint 写入的(partition, bucket)
对。 - 它还会跟踪哪些 checkpoint 触发了全量合并,并等待确认这些合并操作是否真的成功提交(通过检查 Paimon 的 Snapshot)。这是通过
commitIdentifiersToCheck
(一个TreeSet<Long>
) 实现的。
- 它会通过 Flink 的状态机制持久化记录在不同 checkpoint 期间,哪些分区(partition)和桶(bucket)被写入过数据。这是通过
工作流程梳理:
数据写入/常规合并时 (
write
,compact
方法):- 当有数据写入或发生常规合并时,会调用
touchBucket
方法。 touchBucket
将当前操作涉及的(partition, bucket)
记录到currentWrittenBuckets
集合中。
- 当有数据写入或发生常规合并时,会调用
准备提交时 (
prepareCommit
方法,在 Flink Checkpoint 触发):- 检查已成功的合并:调用
checkSuccessfulFullCompaction()
。这个方法会检查 Paimon 的 Snapshot,看之前由这个 SinkWrite 触发的全量合并(其 commit identifier 记录在commitIdentifiersToCheck
中)是否已经成功生成了COMPACT
类型的 Snapshot。如果成功,就从writtenBuckets
和commitIdentifiersToCheck
中移除相关的记录,表示这些 bucket 已经被成功地全量合并过了,不需要再跟踪它们之前的写入状态了。 - 收集当前写入的 Buckets:将
currentWrittenBuckets
(本次 checkpoint 期间写入的 buckets)中的内容添加到writtenBuckets
中,并以当前checkpointId
作为键。然后清空currentWrittenBuckets
。 - 判断是否触发全局全量合并:
- 检查
writtenBuckets
是否为空(即之前是否有过写入)。 - 检查当前
checkpointId
是否满足isFullCompactedIdentifier(checkpointId, deltaCommits)
条件。这个条件通常是checkpointId % deltaCommits == 0
或类似的逻辑,用于控制触发频率。 - 如果以上两个条件都满足,则将
waitCompaction
标志设置为true
。
- 检查
- 执行全局全量合并:
- 如果
waitCompaction
为true
:- 调用
submitFullCompaction(checkpointId)
。这个方法会遍历writtenBuckets
中记录的所有历史 checkpoint 期间写入过的所有唯一的(partition, bucket)
,并对每一个都调用write.compact(partition, bucket, true)
来执行全量合并。 - 将当前的
checkpointId
添加到commitIdentifiersToCheck
,表示我们触发了一次全量合并,后续需要检查其是否成功。
- 调用
- 如果
- 调用父类
prepareCommit
:将(可能被修改过的)waitCompaction
标志和checkpointId
传递给父类,父类会处理实际的提交物(Committable)生成。
- 检查已成功的合并:调用
状态快照时 (
snapshotState
方法,在 Flink Checkpoint 触发):- 将
writtenBuckets
中的内容(记录了哪些 checkpoint 写入了哪些 bucket)序列化并保存到 Flink 的状态后端。这样在作业恢复时,可以恢复这些信息,确保不会丢失需要合并的 bucket 记录。
- 将
主要实现细节:
继承与构造:
- 继承
StoreSinkWriteImpl
。 - 构造函数接收
deltaCommits
参数,这个参数决定了大约每隔多少次 Flink 的 checkpoint commit 之后触发一次全局全量合并。 - 它会从 Flink 的状态(state)中恢复之前记录的
writtenBuckets
。writtenBuckets
是一个NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>>
,key 是 checkpointId,value 是在该 checkpoint 期间被写入过的分区和桶的集合。
GlobalFullCompactionSinkWrite.java
// ... existing code ... public class GlobalFullCompactionSinkWrite extends StoreSinkWriteImpl { private static final Logger LOG = LoggerFactory.getLogger(GlobalFullCompactionSinkWrite.class); private final int deltaCommits; private final String tableName; private final SnapshotManager snapshotManager; private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets; private final NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>> writtenBuckets; private static final String WRITTEN_BUCKETS_STATE_NAME = "paimon_written_buckets"; private final TreeSet<Long> commitIdentifiersToCheck; public GlobalFullCompactionSinkWrite( FileStoreTable table, String commitUser, StoreSinkWriteState state, IOManager ioManager, boolean ignorePreviousFiles, boolean waitCompaction, int deltaCommits, boolean isStreaming, @Nullable MemorySegmentPool memoryPool, MetricGroup metricGroup) { super( table, commitUser, state, ioManager, ignorePreviousFiles, waitCompaction, isStreaming, memoryPool, metricGroup); this.deltaCommits = deltaCommits; this.tableName = table.name(); this.snapshotManager = table.snapshotManager(); currentWrittenBuckets = new HashSet<>(); writtenBuckets = new TreeMap<>(); List<StoreSinkWriteState.StateValue> writtenBucketStateValues = state.get(tableName, WRITTEN_BUCKETS_STATE_NAME); if (writtenBucketStateValues != null) { for (StoreSinkWriteState.StateValue stateValue : writtenBucketStateValues) { writtenBuckets .computeIfAbsent(bytesToLong(stateValue.value()), k -> new HashSet<>()) .add(Tuple2.of(stateValue.partition(), stateValue.bucket())); } } commitIdentifiersToCheck = new TreeSet<>(); } // ... existing code ...
- 继承
追踪写入的 Buckets (
touchBucket
):- 重写了
write
和compact
方法。在调用父类相应方法后,会调用touchBucket
。 touchBucket
方法会将当前写入或合并操作涉及的(partition, bucket)
(分区和桶)记录到currentWrittenBuckets
集合中。为了防止BinaryRow
对象被复用导致的问题,这里会使用partition.copy()
。
GlobalFullCompactionSinkWrite.java
// ... existing code ... @Override @Nullable public SinkRecord write(InternalRow rowData) throws Exception { SinkRecord sinkRecord = super.write(rowData); if (sinkRecord != null) { touchBucket(sinkRecord.partition(), sinkRecord.bucket()); } return sinkRecord; } @Override @Nullable public SinkRecord write(InternalRow rowData, int bucket) throws Exception { SinkRecord sinkRecord = super.write(rowData, bucket); if (sinkRecord != null) { touchBucket(sinkRecord.partition(), bucket); } return sinkRecord; } @Override public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception { super.compact(partition, bucket, fullCompaction); touchBucket(partition, bucket); } private void touchBucket(BinaryRow partition, int bucket) { if (LOG.isDebugEnabled()) { LOG.debug("touch partition {}, bucket {}", partition, bucket); } // partition is a reused BinaryRow // we first check if the tuple exists to minimize copying if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) { currentWrittenBuckets.add(Tuple2.of(partition.copy(), bucket)); } } // ... existing code ...
- 重写了
准备提交 (
prepareCommit
):- 这是核心逻辑所在。
- 首先调用
checkSuccessfulFullCompaction()
检查并清理之前已成功完成的全量合并所对应的状态。 - 将
currentWrittenBuckets
中的记录(当前 checkpoint 写入的 buckets)添加到writtenBuckets
中,并清空currentWrittenBuckets
。 - 判断是否需要触发全量合并:如果
writtenBuckets
不为空(即之前有过写入)并且当前checkpointId
满足isFullCompactedIdentifier(checkpointId, deltaCommits)
条件(即达到了deltaCommits
的间隔),则将waitCompaction
设置为true
。 - 如果
waitCompaction
为true
:- 调用
submitFullCompaction(checkpointId)
来对writtenBuckets
中记录的所有唯一的 bucket 执行全量合并 (write.compact(bucket.f0, bucket.f1, true)
)。 - 将当前的
checkpointId
添加到commitIdentifiersToCheck
,表示这个 checkpoint 触发了全量合并,后续需要检查其是否成功。
- 调用
- 最后调用
super.prepareCommit(waitCompaction, checkpointId)
。
GlobalFullCompactionSinkWrite.java
// ... existing code ... @Override public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { checkSuccessfulFullCompaction(); // collects what buckets we've modified during this checkpoint interval if (!currentWrittenBuckets.isEmpty()) { writtenBuckets .computeIfAbsent(checkpointId, k -> new HashSet<>()) .addAll(currentWrittenBuckets); currentWrittenBuckets.clear(); } if (LOG.isDebugEnabled()) { for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> checkpointIdAndBuckets : writtenBuckets.entrySet()) { LOG.debug( "Written buckets for checkpoint #{} are:", checkpointIdAndBuckets.getKey()); for (Tuple2<BinaryRow, Integer> bucket : checkpointIdAndBuckets.getValue()) { LOG.debug(" * partition {}, bucket {}", bucket.f0, bucket.f1); } } } if (!writtenBuckets.isEmpty() && isFullCompactedIdentifier(checkpointId, deltaCommits)) { waitCompaction = true; } if (waitCompaction) { if (LOG.isDebugEnabled()) { LOG.debug("Submit full compaction for checkpoint #{}", checkpointId); } submitFullCompaction(checkpointId); commitIdentifiersToCheck.add(checkpointId); } return super.prepareCommit(waitCompaction, checkpointId); } // ... existing code ...
检查全量合并是否成功 (
checkSuccessfulFullCompaction
和checkSuccessfulFullCompaction(Snapshot snapshot)
):- 遍历最新的快照(Snapshot)。
- 如果找到一个由当前
commitUser
提交的COMPACT
类型的快照,并且其commitIdentifier
存在于commitIdentifiersToCheck
中,那么就认为由submitFullCompaction
触发的全量合并已成功。 - 成功后,会从
writtenBuckets
中移除该commitIdentifier
及之前的所有记录(因为它们已经被合并了),并从commitIdentifiersToCheck
中移除相应的commitIdentifier
。
GlobalFullCompactionSinkWrite.java
// ... existing code ... private void checkSuccessfulFullCompaction() { if (commitIdentifiersToCheck.isEmpty()) { return; } snapshotManager.traversalSnapshotsFromLatestSafely(this::checkSuccessfulFullCompaction); } private boolean checkSuccessfulFullCompaction(Snapshot snapshot) { if (snapshot.commitUser().equals(commitUser) && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) { long commitIdentifier = snapshot.commitIdentifier(); if (commitIdentifiersToCheck.contains(commitIdentifier)) { // We found a full compaction snapshot triggered by `submitFullCompaction` method. // // Because `submitFullCompaction` will compact all buckets in `writtenBuckets`, thus // a successful commit indicates that all previous buckets have been compacted. // // We must make sure that the compact snapshot is triggered by // `submitFullCompaction`, because normal compaction may also trigger full // compaction, but that only compacts a specific bucket, not all buckets recorded in // `writtenBuckets`. if (LOG.isDebugEnabled()) { LOG.debug( "Found full compaction snapshot #{} with identifier {}", snapshot.id(), commitIdentifier); } writtenBuckets.headMap(commitIdentifier, true).clear(); commitIdentifiersToCheck.headSet(commitIdentifier).clear(); return true; } } return false; } // ... existing code ...
提交全量合并 (
submitFullCompaction
):- 遍历
writtenBuckets
中记录的所有 checkpoint 的所有 bucket。 - 对每个唯一的
(partition, bucket)
调用write.compact(bucket.f0, bucket.f1, true)
来执行全量合并。
GlobalFullCompactionSinkWrite.java
// ... existing code ... private void submitFullCompaction(long currentCheckpointId) { if (LOG.isDebugEnabled()) { LOG.debug("Submit full compaction for checkpoint #{}", currentCheckpointId); } Set<Tuple2<BinaryRow, Integer>> compactedBuckets = new HashSet<>(); writtenBuckets.forEach( (checkpointId, buckets) -> { for (Tuple2<BinaryRow, Integer> bucket : buckets) { if (compactedBuckets.contains(bucket)) { continue; } compactedBuckets.add(bucket); try { write.compact(bucket.f0, bucket.f1, true); } catch (Exception e) { throw new RuntimeException(e); } } }); } // ... existing code ...
- 遍历
状态快照 (
snapshotState
):- 将
writtenBuckets
中的数据序列化并存储到 Flink 的状态后端,以便在故障恢复时能够恢复这些信息。
GlobalFullCompactionSinkWrite.java
// ... existing code ... @Override public void snapshotState() throws Exception { super.snapshotState(); List<StoreSinkWriteState.StateValue> writtenBucketList = new ArrayList<>(); for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> entry : writtenBuckets.entrySet()) { for (Tuple2<BinaryRow, Integer> bucket : entry.getValue()) { writtenBucketList.add( new StoreSinkWriteState.StateValue( bucket.f0, bucket.f1, longToBytes(entry.getKey()))); } } state.put(tableName, WRITTEN_BUCKETS_STATE_NAME, writtenBucketList); } // ... existing code ...
- 将
AsyncLookupSinkWrite
AsyncLookupSinkWrite
继承自 StoreSinkWriteImpl
,主要设计用于处理具有“lookup changelog producer”并且 CoreOptions#LOOKUP_WAIT
设置为 false
的表的写入操作。其核心逻辑围绕着追踪和处理“活跃的 buckets”(active buckets)。
成员变量:
ACTIVE_BUCKETS_STATE_NAME
: 一个静态常量字符串,用作 Flink 状态(state)中存储活跃 bucket 信息的键名。tableName
: 当前操作的表名。
构造函数 AsyncLookupSinkWrite(...)
AsyncLookupSinkWrite.java
// ... existing code ...
public AsyncLookupSinkWrite(
FileStoreTable table,
String commitUser,
StoreSinkWriteState state,
IOManager ioManager,
boolean ignorePreviousFiles,
boolean waitCompaction,
boolean isStreaming,
@Nullable MemorySegmentPool memoryPool,
MetricGroup metricGroup) {
super(
table,
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
isStreaming,
memoryPool,
metricGroup);
this.tableName = table.name();
List<StoreSinkWriteState.StateValue> activeBucketsStateValues =
state.get(tableName, ACTIVE_BUCKETS_STATE_NAME);
if (activeBucketsStateValues != null) {
for (StoreSinkWriteState.StateValue stateValue : activeBucketsStateValues) {
try {
// 对从状态中恢复的每个活跃 bucket 执行一次常规合并
write.compact(stateValue.partition(), stateValue.bucket(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
// ... existing code ...
- 调用父类构造函数:
super(...)
将通用参数传递给StoreSinkWriteImpl
的构造函数,进行基础的初始化。 - 初始化
tableName
:记录当前操作的表名。 - 从状态恢复并处理活跃 Buckets:
state.get(tableName, ACTIVE_BUCKETS_STATE_NAME)
: 尝试从 Flink 的状态后端(state backend)中获取之前保存的活跃 bucket 列表。StoreSinkWriteState
接口负责状态的读取和写入。- 如果
activeBucketsStateValues
不为null
(即状态中存在之前保存的活跃 bucket 信息):- 遍历这些
StateValue
对象。每个StateValue
包含了分区(stateValue.partition()
)和桶号(stateValue.bucket()
)信息。 write.compact(stateValue.partition(), stateValue.bucket(), false)
: 对每一个从状态中恢复出来的活跃 bucket,立即调用其父类(最终是TableWriteImpl
或其子类)的compact
方法。- 关键点:这里的
compact
方法的最后一个参数fullCompaction
被设置为false
。这意味着执行的是一次常规的合并(compaction),而不是全量合并(full compaction)。这可能是为了确保在任务从故障中恢复后,这些之前活跃的 bucket 中的数据能够尽快被合并,以保证数据的一致性或为后续的异步查找操作准备好数据。
- 关键点:这里的
- 遍历这些
snapshotState()
方法
// ... existing code ...
@Override
public void snapshotState() throws Exception {
super.snapshotState(); // 调用父类的状态快照逻辑
List<StoreSinkWriteState.StateValue> activeBucketsList = new ArrayList<>();
// 从底层的 AbstractFileStoreWrite 获取当前活跃的 buckets
for (Map.Entry<BinaryRow, List<Integer>> partitions :
((AbstractFileStoreWrite<?>) write.getWrite()).getActiveBuckets().entrySet()) {
for (int bucket : partitions.getValue()) {
// 将活跃的 bucket 信息(分区和桶号)封装成 StateValue
// 注意:这里的 value 是一个空字节数组,说明对于这个状态,主要关心的是哪些 bucket 是活跃的,
// 而不是与 bucket 关联的具体值。
activeBucketsList.add(
new StoreSinkWriteState.StateValue(
partitions.getKey(), bucket, new byte[0]));
}
}
// 将当前活跃的 bucket 列表存入 Flink 状态
state.put(tableName, ACTIVE_BUCKETS_STATE_NAME, activeBucketsList);
}
}
- 调用父类
snapshotState()
:super.snapshotState()
确保了父类中定义的状态也能被正确地快照。 - 获取当前活跃的 Buckets:
((AbstractFileStoreWrite<?>) write.getWrite()).getActiveBuckets()
: 这一行代码表明,实际的活跃 bucket 信息是由更底层的写入实现(如AbstractFileStoreWrite
或其子类)维护的。write.getWrite()
可能是获取StoreSinkWriteImpl
中封装的实际TableWriteImpl
对象,然后将其转换为AbstractFileStoreWrite
类型来调用getActiveBuckets()
方法。getActiveBuckets()
方法返回一个Map
,其中键是分区 (BinaryRow
),值是该分区下活跃的 bucket 号列表 (List<Integer>
)。
- 构建状态列表:
- 遍历从
getActiveBuckets()
获取到的分区和对应的活跃 bucket 列表。 - 对于每一个活跃的
(partition, bucket)
组合,创建一个新的StoreSinkWriteState.StateValue
对象。 - 值得注意:在创建
StateValue
时,第三个参数value
被设置为new byte[0]
(一个空字节数组)。这表明对于ACTIVE_BUCKETS_STATE_NAME
这个状态,我们主要关心的是哪些分区和桶是活跃的,而不需要存储与它们相关的特定数据值。
- 遍历从
- 将状态存入 Flink:
state.put(tableName, ACTIVE_BUCKETS_STATE_NAME, activeBucketsList)
: 将收集到的当前所有活跃 bucket 的列表存储到 Flink 的状态后端,使用之前定义的ACTIVE_BUCKETS_STATE_NAME
作为键。
总结 AsyncLookupSinkWrite
的工作机制:
- 状态持久化:在 Flink 进行 checkpoint 时,
snapshotState()
方法会将当前被认为是“活跃”的 bucket(由底层的AbstractFileStoreWrite
跟踪)的列表(分区和桶号)保存到 Flink 的状态中。 - 状态恢复与处理:当 Flink 作业从 checkpoint 恢复时(例如,故障重启后),
AsyncLookupSinkWrite
的构造函数会被调用。它会从状态中读取之前保存的“活跃” bucket 列表,并立即对这些 bucket 执行一次常规的合并操作 (compact(..., false)
)。
这种机制可能是为了确保在异步查找的场景下:
- 如果作业失败并恢复,那些在失败前可能正在进行异步更新或查找操作的 bucket 能够被及时合并,以避免数据不一致或确保后续查找操作能获取到最新的、已合并的数据。
- 通过在恢复时主动合并这些 bucket,可以减少后续查找操作的复杂性或提高其效率。
与 GlobalFullCompactionSinkWrite
相比,AsyncLookupSinkWrite
的合并更加有针对性(只针对活跃 bucket)且通常不是全量合并,其主要目的是在特定场景下(异步查找和任务恢复)维护数据状态。
TableWriteImpl
TableWriteImpl<T>
是 Paimon 中负责将数据写入表的具体实现。它扮演着一个承上启下的角色:
- 对上:它实现了
InnerTableWrite
接口 (间接继承自TableWrite
),向上层应用(如 Flink Sink)提供了写入、合并、提交等操作的API。 - 对下:它封装并委托了
FileStoreWrite<T>
对象,后者更接近实际的文件系统操作和数据格式化。
TableWriteImpl
的核心组件和职责
TableWriteImpl.java
// ... (版权和包声明) ...
public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> {
private final RowType rowType; // 表的行类型信息
private final FileStoreWrite<T> write; // 核心的底层写入器
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor; // 用于提取分区、主键和计算桶号
private final RecordExtractor<T> recordExtractor; // 用于从 SinkRecord 中提取待写入的记录 (类型 T)
@Nullable private final RowKindGenerator rowKindGenerator; // 用于生成或获取记录的 RowKind (INSERT, DELETE, etc.)
private final boolean ignoreDelete; // 是否忽略删除操作
private boolean batchCommitted = false; // 标记批处理写入是否已提交
private BucketMode bucketMode; // 表的 Bucket 模式
private final int[] notNullFieldIndex; // 非空字段的索引,用于写入前校验
public TableWriteImpl(
RowType rowType,
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor,
@Nullable RowKindGenerator rowKindGenerator,
boolean ignoreDelete) {
this.rowType = rowType;
this.write = write; // 关键:持有了 FileStoreWrite 实例
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
this.rowKindGenerator = rowKindGenerator;
this.ignoreDelete = ignoreDelete;
// 初始化非空字段索引,用于后续的 null 值检查
List<String> notNullColumnNames =
rowType.getFields().stream()
.filter(field -> !field.type().isNullable())
.map(DataField::name)
.collect(Collectors.toList());
this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
}
// ... (各种 withXXX 方法,用于配置底层 FileStoreWrite) ...
@Override
public BinaryRow getPartition(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
return keyAndBucketExtractor.partition();
}
@Override
public int getBucket(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
return keyAndBucketExtractor.bucket();
}
@Override
public void write(InternalRow row) throws Exception {
writeAndReturn(row); // 最终调用 writeAndReturn
}
@Override
public void write(InternalRow row, int bucket) throws Exception {
writeAndReturn(row, bucket); // 最终调用 writeAndReturn
}
@Override
public void writeBundle(BinaryRow partition, int bucket, BundleRecords bundle)
throws Exception {
// 如果底层的 write 支持 Bundle 写入,则直接调用
if (write instanceof BundleFileStoreWriter) {
((BundleFileStoreWriter) write).writeBundle(partition, bucket, bundle);
} else {
// 否则,逐条写入
for (InternalRow row : bundle) {
write(row, bucket);
}
}
}
@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
checkNullability(row); // 1. 检查非空约束
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); // 2. 确定 RowKind
if (ignoreDelete && rowKind.isRetract()) { // 3. 如果配置了忽略删除且当前是删除操作,则跳过
return null;
}
SinkRecord record = toSinkRecord(row); // 4. 将 InternalRow 包装成 SinkRecord (包含分区、桶、主键等信息)
// 5. 调用底层 FileStoreWrite 的 write 方法
// recordExtractor.extract(...) 会根据 SinkRecord 和 RowKind 提取出真正要写入的类型 T 的数据
write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind));
return record;
}
@Nullable
public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
checkNullability(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
}
SinkRecord record = toSinkRecord(row, bucket); // 使用指定的 bucket
write.write(record.partition(), bucket, recordExtractor.extract(record, rowKind));
return record;
}
private void checkNullability(InternalRow row) {
// 遍历预先计算好的非空字段索引,检查对应列是否为 null
for (int idx : notNullFieldIndex) {
if (row.isNullAt(idx)) {
String columnName = rowType.getFields().get(idx).name();
throw new RuntimeException(
String.format("Cannot write null to non-null column(%s)", columnName));
}
}
}
private SinkRecord toSinkRecord(InternalRow row) {
keyAndBucketExtractor.setRecord(row); // 设置当前行,以便提取分区、桶、主键
return new SinkRecord(
keyAndBucketExtractor.partition(),
keyAndBucketExtractor.bucket(),
keyAndBucketExtractor.trimmedPrimaryKey(),
row);
}
private SinkRecord toSinkRecord(InternalRow row, int bucket) {
keyAndBucketExtractor.setRecord(row);
return new SinkRecord(
keyAndBucketExtractor.partition(),
bucket, // 直接使用传入的 bucket
keyAndBucketExtractor.trimmedPrimaryKey(),
row);
}
// ... (toLogRecord 方法) ...
@Override
public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
// 合并操作直接委托给底层的 FileStoreWrite
write.compact(partition, bucket, fullCompaction);
}
// ... (withMetricRegistry, notifyNewFiles 方法) ...
@Override
public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception {
// 准备提交的操作也直接委托给底层的 FileStoreWrite
return write.prepareCommit(waitCompaction, commitIdentifier);
}
// ... (prepareCommit (for batch), close, checkpoint, restore 方法,大多是委托给 write) ...
@VisibleForTesting
public FileStoreWrite<T> getWrite() {
return write; // 允许测试时获取底层的 FileStoreWrite
}
/** Extractor to extract {@link T} from the {@link SinkRecord}. */
public interface RecordExtractor<T> {
T extract(SinkRecord record, RowKind rowKind);
}
}
核心成员变量解释:
rowType
: 定义了表的 schema,包括字段名、数据类型等。write (FileStoreWrite<T>)
: 这是与底层存储交互的关键。FileStoreWrite
是一个更底层的接口/抽象类,它封装了针对特定文件存储(如 Append-Only, Primary-Key)的写入逻辑,包括如何组织数据到文件、如何处理排序、如何执行合并等。TableWriteImpl
将大部分实际的写入和合并工作都委托给了这个对象。keyAndBucketExtractor
: 负责从输入的InternalRow
中提取分区信息、主键信息,并根据这些信息计算出数据应该写入哪个桶(bucket)。这是数据分发和定位的基础。recordExtractor
: 一个泛型接口,其实现负责将SinkRecord
(包含了原始行数据、分区、桶、主键等元数据)和RowKind
转换为底层FileStoreWrite
期望的记录类型T
。例如,对于 Append-Only 表,T
可能就是InternalRow
;对于 Primary-Key 表,T
可能是包含RowKind
的更复杂结构。rowKindGenerator
: 用于确定每条记录的类型(INSERT
,UPDATE_BEFORE
,UPDATE_AFTER
,DELETE
)。这对于支持 CDC(Change Data Capture)和有主键的表非常重要。ignoreDelete
: 一个布尔标志,指示是否在写入过程中忽略DELETE
或UPDATE_BEFORE
这样的撤回操作。notNullFieldIndex
: 预先计算好的非空字段的索引数组,用于在写入数据前快速检查非空约束。
核心方法分析:
构造函数
TableWriteImpl(...)
:- 接收所有必要的组件作为参数,最重要的是
FileStoreWrite
实例。 - 初始化非空字段检查相关的
notNullFieldIndex
。
- 接收所有必要的组件作为参数,最重要的是
write(InternalRow row)
/write(InternalRow row, int bucket)
:- 这是最主要的写入入口。
- 非空检查:
checkNullability(row)
确保写入的数据符合表的非空约束。 - RowKind 确定:
RowKindGenerator.getRowKind(...)
获取当前行的RowKind
。 - 忽略删除: 如果
ignoreDelete
为true
并且当前行是撤回类型的(如DELETE
),则直接返回,不进行写入。 - 创建
SinkRecord
:toSinkRecord(...)
方法利用keyAndBucketExtractor
将InternalRow
包装成SinkRecord
。SinkRecord
是一个中间对象,它携带了路由和写入所需的所有信息(分区、桶、主键、原始数据)。 - 委托写入: 最关键的一步是
write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind))
。record.partition()
和record.bucket()
告诉底层的FileStoreWrite
数据应该写入哪个具体的分区和桶。recordExtractor.extract(record, rowKind)
将SinkRecord
和RowKind
转换为FileStoreWrite
能够处理的实际数据类型T
。- 最终,
FileStoreWrite
的write
方法负责将这条处理过的数据写入到对应的文件或内存缓冲区中。
compact(BinaryRow partition, int bucket, boolean fullCompaction)
:- 合并操作。它直接将请求委托给
this.write.compact(...)
。这意味着实际的合并逻辑(如何读取旧文件、如何合并数据、如何写新文件)完全由底层的FileStoreWrite
实现。
- 合并操作。它直接将请求委托给
prepareCommit(boolean waitCompaction, long commitIdentifier)
:- 准备提交。当上层(如 Flink Sink)触发 checkpoint 时,会调用此方法。
- 它也直接委托给
this.write.prepareCommit(...)
。底层的FileStoreWrite
会完成诸如刷写内存缓冲区到磁盘文件、生成CommitMessage
(包含了新生成的数据文件列表、删除的文件列表等元数据)等操作。
配置方法 (如
withIOManager
,withMemoryPool
,withIgnorePreviousFiles
等):- 这些方法通常也是直接调用底层
FileStoreWrite
对应的方法,用于配置写入行为的各种参数,如是否忽略历史文件、是否流式模式、使用的内存池等。
- 这些方法通常也是直接调用底层
FileStoreWrite
TableWriteImpl
本身并不直接操作磁盘或管理文件。它依赖于其持有的 FileStoreWrite<T>
实例来完成这些任务。
FileStoreWrite<T>
是 Paimon Core 中定义的一个核心接口,它封装了向文件存储(FileStore)写入数据、执行合并(compaction)以及准备提交(prepareCommit)等操作。
FileStoreWrite.java
public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State<T>>> {
// --- 配置方法 ---
FileStoreWrite<T> withIOManager(IOManager ioManager);
default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) { // ... }
FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory);
void withIgnorePreviousFiles(boolean ignorePreviousFiles);
void withIgnoreNumBucketCheck(boolean ignoreNumBucketCheck);
void withExecutionMode(boolean isStreamingMode); // 流模式或批模式
FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
void withCompactExecutor(ExecutorService compactExecutor); // 用于异步合并的执行器
void withInsertOnly(boolean insertOnly); // 标记后续写入是否为仅插入
// ---核心操作方法 ---
/**
* 将数据写入指定的分区和桶。
*/
void write(BinaryRow partition, int bucket, T data) throws Exception;
/**
* 对指定分区和桶的数据进行合并。
* fullCompaction 为 true 表示全量合并。
*/
void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception;
/**
* 通知有新的数据文件在指定的快照、分区和桶中创建。
* 通常用于独立合并作业通知写入作业新文件的产生。
*/
void notifyNewFiles(long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files);
/**
* 准备提交。这是关键的交互点。
* 返回 CommitMessage 列表,其中包含了本次提交涉及的文件变更信息。
* commitIdentifier 是本次提交的唯一标识。
*/
List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception;
/**
* 关闭写入器。
*/
void close() throws Exception;
/**
* FileStoreWrite 的可恢复状态。
* 用于在作业失败重启后恢复写入状态。
*/
class State<T> {
protected final BinaryRow partition;
protected final int bucket;
protected final int totalBuckets; // 总桶数
protected final long baseSnapshotId; // 基础快照ID
protected final long lastModifiedCommitIdentifier; // 最后修改此 bucket 的提交标识
protected final List<DataFileMeta> dataFiles; // 当前 bucket 中活跃的数据文件
protected final long maxSequenceNumber; // 当前 bucket 中的最大序列号 (用于主键表)
@Nullable protected final IndexMaintainer<T> indexMaintainer; // 索引维护器 (用于主键表)
@Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer; // 删除向量维护器
protected final CommitIncrement commitIncrement; // 提交的增量信息 (新文件、合并文件等)
// ... 构造函数和 toString ...
}
}
关键点:
- 泛型
<T>
: 代表写入记录的实际数据类型。 Restorable
: 表明FileStoreWrite
的状态是可序列化和恢复的,这对于 Flink 等流处理引擎的容错至关重要。其状态由内部类State<T>
定义。- 配置方法 (
withXXX
): 提供了一种链式调用的方式来配置写入器的各种行为,如内存管理、执行模式、合并策略等。 - 核心操作:
write()
: 负责将单条数据写入到正确的分区和桶。compact()
: 触发对特定分区和桶的合并操作。prepareCommit()
: 在 Flink Checkpoint 触发时被调用,用于将内存中的数据刷写到磁盘,生成新的数据文件,并收集这些变更信息形成CommitMessage
。
State<T>
: 定义了需要持久化的状态信息,确保在故障恢复后,FileStoreWrite
可以从正确的状态继续工作,例如,知道哪些文件是当前活跃的,上一次提交到哪里等。
FileStoreWrite
是一个接口,Paimon 根据表的类型(主要是 CoreOptions.STORE_FORMAT
定义的,例如 Append-Only 或 Primary-Key)和配置来选择具体的实现类。这些实现类通常继承自 org.apache.paimon.operation.AbstractFileStoreWrite
。
主要的实现类有:
org.apache.paimon.operation.AppendOnlyFileStoreWrite
:- 用于仅追加(Append-Only)表。
- 写入操作相对简单,直接将新数据追加到文件中。
- 合并操作主要是将小文件合并成大文件。
org.apache.paimon.operation.KeyValueFileStoreWrite
:- 用于有主键(Primary-Key)的表,支持更新和删除 (UPSERT 和 DELETE)。
- 写入和合并逻辑更复杂,需要处理主键的排序、去重、以及应用
RowKind
(INSERT, UPDATE, DELETE)。 - 通常会使用 LSM树(Log-Structured Merge-Tree)类似的结构来组织数据文件(例如,分为不同的 level 或 sorted run),合并时会将这些层级的数据进行归并。
- 可能涉及到索引(
IndexMaintainer
)和删除向量(DeletionVectorsMaintainer
)的管理。
数据和 Manifest 的处理
数据处理:
FileStoreWrite
的具体实现类(如AppendOnlyFileStoreWrite
或KeyValueFileStoreWrite
)直接负责数据的处理和写入。- 当调用
write(partition, bucket, data)
时,实现类会:- 找到或创建对应
(partition, bucket)
的内部RecordWriter
。 RecordWriter
负责将数据序列化(例如,转换为 Parquet 或 ORC 格式)并写入内存缓冲区。- 当缓冲区满、或
prepareCommit
被调用时,缓冲区的数据会被刷写到磁盘,形成新的数据文件(DataFileMeta)。
- 找到或创建对应
- 合并(
compact
)时,会读取已有的数据文件,进行合并处理,然后写出新的数据文件,并标记旧文件为待删除。
Manifest 处理:
FileStoreWrite
不直接生成或修改 Manifest 文件本身。- 它的核心职责是在
prepareCommit()
方法中生成List<CommitMessage>
。 - 每个
CommitMessage
对象封装了一个特定(partition, bucket)
在本次提交中的文件变更信息,主要包括:newFiles()
: 本次新写入的数据文件列表 (DataFileMeta
)。compactBefore()
: 本次合并前被读取的旧数据文件列表。compactAfter()
: 本次合并后新生成的数据文件列表。logOffsets()
: (如果适用)changelog 的偏移量信息。
- 这些
CommitMessage
是构建 Paimon 表元数据(Manifests 和 Snapshot)的原材料。
CommitMessage
如何传递到 FlinkSink 并最终更新 Manifest?
这个过程是一个逐层传递和转换的过程:
FileStoreWrite.prepareCommit()
->List<CommitMessage>
:- 当 Flink 作业进行 Checkpoint 时,Flink Sink 中的
StoreSinkWriteOperator
会调用其持有的StoreSinkWrite
实例的prepareCommit
方法。 StoreSinkWrite
(通常是StoreSinkWriteImpl
) 会进一步调用TableWriteImpl.prepareCommit()
。TableWriteImpl.prepareCommit()
最终会调用其持有的FileStoreWrite
实例的prepareCommit(waitCompaction, commitIdentifier)
方法。FileStoreWrite
执行刷盘等操作,并返回一个List<CommitMessage>
,其中包含了所有发生变更的(partition, bucket)
的文件增量信息。
- 当 Flink 作业进行 Checkpoint 时,Flink Sink 中的
List<CommitMessage>
->Committable
:StoreSinkWriteImpl
(或其父类) 接收到List<CommitMessage>
后,会将这些信息聚合成一个或多个Committable
对象。在 Paimon Flink Sink 中,这个Committable
通常是org.apache.paimon.flink.sink.Committable
类型,其内部可能包装了org.apache.paimon.manifest.ManifestCommittable
。ManifestCommittable
内部就包含了从CommitMessage
转换来的文件变更列表(ManifestEntry
,标记了文件的类型是 ADD 还是 DELETE)、commit identifier、log offsets 等。
Committable
在 Flink 算子间传递:StoreSinkWriteOperator
(作为 Flink 的Writer
角色) 将生成的Committable
对象发送给下游。
CommitterOperator
处理Committable
:- Flink Sink 拓扑中通常会有一个全局单并发的
CommitterOperator
(例如StoreCommitterOperator
)。 - 这个
CommitterOperator
接收上游WriterOperator
发送过来的Committable
对象。 - 在其
notifyCheckpointComplete()
或commit()
方法(取决于 Flink 版本和提交流程)中,它会处理这些Committable
。
- Flink Sink 拓扑中通常会有一个全局单并发的
FileStoreCommit.commit()
更新 Manifest 和 Snapshot:CommitterOperator
内部会通过table.newCommit(commitUser)
获取一个FileStoreCommit
的实例 (通常是FileStoreCommitImpl
)。- 然后,它会调用
fileStoreCommit.commit(manifestCommittable, properties)
方法。 FileStoreCommitImpl
在其commit()
方法中:- 根据
ManifestCommittable
中的ManifestEntry
列表(这些ManifestEntry
源自最初的CommitMessage
中的DataFileMeta
),生成新的 Manifest 文件或更新已有的 Manifest 文件。Manifest 文件记录了表中所有有效数据文件的元数据。 - 创建一个新的 Snapshot 文件,该 Snapshot 文件会指向新生成的 Manifest List(Manifest List 指向具体的 Manifest 文件)。Snapshot 代表了表在某个时间点的一致状态。
- 执行清理操作,如删除旧的、不再需要的 Snapshot 和 Manifest 文件,以及被合并掉的旧数据文件。
- 根据
总结传递路径:
FileStoreWrite.prepareCommit()
-> List<CommitMessage>
(包含文件级变更) -> (由 StoreSinkWriteImpl
包装) Committable
(通常内含 ManifestCommittable
) -> (Flink WriterOperator
发送) -> (Flink CommitterOperator
接收) -> FileStoreCommit.commit(ManifestCommittable, ...)
-> 更新 Manifest 文件 -> 创建新的 Snapshot 文件
因此,FileStoreWrite
是文件操作的执行者和原始变更信息的产生者,而 FileStoreCommit
则是利用这些信息来维护表整体元数据一致性的协调者。Flink Sink 负责编排这个流程,确保在分布式环境下数据写入和元数据提交的原子性和一致性。
MemoryFileStoreWrite
MemoryFileStoreWrite<T>
是一个抽象类,它继承自 AbstractFileStoreWrite<T>
,并为那些需要在内存中进行写缓冲和排序操作的 FileStoreWrite
实现提供了基础功能。
MemoryFileStoreWrite.java
// ... (版权和包声明) ...
public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T> {
private static final Logger LOG = LoggerFactory.getLogger(MemoryFileStoreWrite.class);
protected final CoreOptions options; // 核心配置项
protected final CacheManager cacheManager; // 缓存管理器,主要用于 Lookup
private MemoryPoolFactory writeBufferPool; // 写缓冲内存池工厂
private WriterBufferMetric writerBufferMetric; // 写缓冲相关的指标
public MemoryFileStoreWrite(
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
RowType partitionType,
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName) {
super(
snapshotManager,
scan,
indexFactory,
dvMaintainerFactory,
tableName,
options,
partitionType);
this.options = options;
this.cacheManager =
new CacheManager( // 初始化缓存管理器
options.lookupCacheMaxMemory(), options.lookupCacheHighPrioPoolRatio());
}
@Override
public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
// 设置写缓冲内存池,并添加内存所有者
this.writeBufferPool = memoryPoolFactory.addOwners(this::memoryOwners);
return this;
}
@Override
public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
super.withMetricRegistry(metricRegistry);
// 注册写缓冲相关的指标
if (metricRegistry != null && options.writerBufferSpillable()) {
writerBufferMetric = new WriterBufferMetric(metricRegistry, tableName);
}
return this;
}
@Override
protected void write(
BinaryRow partition, int bucket, T data, @Nullable IndexMaintainer<T> indexMaintainer)
throws Exception {
// 获取或创建对应 partition 和 bucket 的 WriterContainer
WriterContainer<T> container = getWriterContainer(partition, bucket, indexMaintainer);
RecordWriter<T> writer = container.writer;
// 将数据写入 RecordWriter (通常是 MergeTreeWriter 或 AppendOnlyWriter)
writer.write(data);
}
@Override
protected void compact(
BinaryRow partition,
int bucket,
boolean fullCompaction,
@Nullable IndexMaintainer<T> indexMaintainer)
throws Exception {
// 获取或创建对应 partition 和 bucket 的 WriterContainer
WriterContainer<T> container = getWriterContainer(partition, bucket, indexMaintainer);
RecordWriter<T> writer = container.writer;
// 调用 RecordWriter 的 compact 方法
writer.compact(fullCompaction);
}
@Override
protected List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception {
List<CommitMessage> result = new ArrayList<>();
// 遍历所有活动的 WriterContainer
for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> entry : writers.entrySet()) {
for (Map.Entry<Integer, WriterContainer<T>> bucketEntry :
entry.getValue().entrySet()) {
WriterContainer<T> writerContainer = bucketEntry.getValue();
RecordWriter<T> writer = writerContainer.writer;
// 调用 RecordWriter 的 prepareCommit 方法,获取该 writer 的 CommitIncrement
CommitIncrement increment = writer.prepareCommit(waitCompaction);
// 将 CommitIncrement 封装成 CommitMessage
result.add(
new CommitMessageImpl(
entry.getKey(), // partition
bucketEntry.getKey(), // bucket
increment,
writerContainer.newFilesIncrement(),
writerContainer.indexIncrement(),
writerContainer.deletionVectorsIncrement()));
// 清理已提交的增量信息
writerContainer.clearNewFilesIncrement();
writerContainer.clearIndexIncrement();
writerContainer.clearDeletionVectorsIncrement();
}
}
// 清理不再需要的 writer
clearFinishedWriters(commitIdentifier);
return result;
}
// ... (其他方法,如 memoryOwners, createWriterContainer, createWriter, createWriterCleaner 等) ...
/**
* 创建实际的 {@link RecordWriter}。子类必须实现此方法。
*
* @param partition 分区
* @param bucket 桶
* @param restoreFiles 用于恢复的旧数据文件
* @param restoredMaxSeqNumber 恢复的最大序列号
* @param restoreIncrement 恢复的提交增量
* @param compactExecutor 合并执行器
* @param dvMaintainer 删除向量维护器
* @return 创建的 RecordWriter
*/
protected abstract RecordWriter<T> createWriter(
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer);
// ...
}
核心职责与特点【包含父类成员】:
- 内存管理: 提供了对写缓冲内存池 (
writeBufferPool
) 的管理。子类写入数据时,通常会先写入内存缓冲区,达到一定条件后再刷盘。 WriterContainer
管理: 内部通过writers
(一个Map<BinaryRow, Map<Integer, WriterContainer<T>>>
) 来管理每个(partition, bucket)
对应的WriterContainer
。WriterContainer
封装了实际的RecordWriter
以及相关的状态信息(如新文件增量、索引增量等)。- 抽象
createWriter
方法: 这是一个核心的抽象方法,子类必须实现它来创建具体的RecordWriter
实例。这个RecordWriter
才是真正负责数据写入、排序、合并等逻辑的组件。例如,KeyValueFileStoreWrite
会创建MergeTreeWriter
。 - 通用的
write
,compact
,prepareCommit
逻辑: 这些方法实现了通用的流程:write
和compact
: 获取或创建WriterContainer
,然后调用其内部RecordWriter
的相应方法。prepareCommit
: 遍历所有活动的WriterContainer
,调用其RecordWriter
的prepareCommit
方法收集CommitIncrement
,并将其包装成CommitMessage
。
- 缓存管理 (
CacheManager
): 主要用于支持 Lookup 操作时缓存相关数据,以提高性能。
MemoryFileStoreWrite
本身并不直接决定表的类型(有主键或仅追加),它更像是一个框架,为需要内存缓冲和管理的写入场景提供支持。
KeyValueFileStoreWrite
KeyValueFileStoreWrite
是专门为有主键(Primary-Key)表设计的 FileStoreWrite
实现。它继承自 MemoryFileStoreWrite<KeyValue>
,这里的泛型参数 KeyValue
表明它处理的是键值对数据。
KeyValueFileStoreWrite.java
// ... (版权和包声明) ...
public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
// --- 成员变量,很多与 KeyValue 表的特性相关 ---
private final KeyValueFileReaderFactory.Builder readerFactoryBuilder; // KeyValue 文件读取器工厂构造器
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder; // KeyValue 文件写入器工厂构造器
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier; // 主键比较器
private final Supplier<FieldsComparator> udsComparatorSupplier; // 用户定义序列号比较器
private final Supplier<RecordEqualiser> logDedupEqualSupplier; // Changelog 去重比较器
private final MergeFunctionFactory<KeyValue> mfFactory; // 合并函数工厂 (定义了数据如何合并,如 DEDUPLICATE, PARTIAL_UPDATE 等)
private final CoreOptions options;
private final FileIO fileIO;
private final RowType keyType; // 主键类型
private final RowType valueType; // 值类型
// ... 其他成员 ...
@Nullable private Cache<String, LookupFile> lookupFileCache; // Lookup 文件缓存
public KeyValueFileStoreWrite(
// ... 构造函数参数,非常多,用于初始化各种工厂和配置 ...
FileIO fileIO,
SchemaManager schemaManager,
TableSchema schema,
String commitUser,
RowType partitionType,
RowType keyType,
RowType valueType,
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
Supplier<FieldsComparator> udsComparatorSupplier,
Supplier<RecordEqualiser> logDedupEqualSupplier,
MergeFunctionFactory<KeyValue> mfFactory,
// ...
CoreOptions options,
// ...
) {
super(/* ... 调用父类构造函数 ... */);
// ... 初始化各种成员变量 ...
this.keyType = keyType;
this.valueType = valueType;
this.mfFactory = mfFactory;
this.options = options;
// ...
}
@Override
protected MergeTreeWriter createWriter( // 实现了父类的抽象方法
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
// ... (日志) ...
// 1. 构建 KeyValueFileWriterFactory (用于创建实际写入文件的 writer)
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
// 2. 获取主键比较器
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
// 3. 初始化 Levels (管理 LSM-Tree 的层级)
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
// 4. 创建合并策略 (CompactStrategy)
CompactStrategy compactStrategy = createCompactStrategy(options);
// 5. 创建合并管理器 (CompactManager)
CompactManager compactManager =
createCompactManager(
partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
// 6. 创建并返回 MergeTreeWriter
return new MergeTreeWriter(
bufferSpillable(), // 是否允许写缓冲溢写到磁盘
options.writeBufferSpillDiskSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
ioManager, // 从父类继承的 IOManager
compactManager, // 合并管理器
restoredMaxSeqNumber, // 恢复的最大序列号
keyComparator, // 主键比较器
mfFactory.create(), // 创建合并函数 (MergeFunction)
writerFactory, // 文件写入器工厂
options.commitForceCompact(), // 是否在提交时强制合并
options.changelogProducer(), // Changelog 生成策略
restoreIncrement, // 恢复的提交增量
UserDefinedSeqComparator.create(valueType, options)); // 用户定义序列号比较器
}
// --- createCompactStrategy, createCompactManager, createRewriter ---
// 这些方法根据 CoreOptions 中的配置 (如 merge-engine, changelog-producer, lookup 等)
// 创建具体的合并策略 (UniversalCompaction, ForceUpLevel0Compaction)
// 和合并重写器 (MergeTreeCompactRewriter, FullChangelogMergeTreeCompactRewriter, LookupMergeTreeCompactRewriter)
private CompactStrategy createCompactStrategy(CoreOptions options) {
// ... 根据 options.needLookup() 和 options.lookupCompact() 等选择策略 ...
// 例如:UniversalCompaction, ForceUpLevel0Compaction
}
private CompactManager createCompactManager(
// ...
) {
if (options.writeOnly()) { // 如果是只写模式,则使用 NoopCompactManager
return new NoopCompactManager();
} else {
// ... 创建 CompactRewriter ...
CompactRewriter rewriter = createRewriter(/* ... */);
// ... 创建 MergeTreeCompactManager ...
return new MergeTreeCompactManager(/* ... */);
}
}
private MergeTreeCompactRewriter createRewriter(
// ...
) {
// ... 根据 changelogProducer 和 lookupStrategy 等配置选择不同的 Rewriter ...
// 例如:FullChangelogMergeTreeCompactRewriter, LookupMergeTreeCompactRewriter, MergeTreeCompactRewriter
}
// --- createLookupLevels ---
// 如果配置了 Lookup (例如 'lookup.cache-max-memory' > 0), 则创建 LookupLevels 用于加速点查
private <V> LookupLevels<V> createLookupLevels(/* ... */) {
// ... 初始化 LookupStoreFactory, LookupFile 缓存等 ...
return new LookupLevels<>(/* ... */);
}
// ... (其他方法,如 bufferSpillable, createWriterCleanChecker, close) ...
}
核心职责与特点:
- 继承
MemoryFileStoreWrite<KeyValue>
: 复用了内存管理、WriterContainer
管理等基础功能。 - 专门处理
KeyValue
数据: 其泛型参数固定为KeyValue
。 - 实现
createWriter
方法: 这是最关键的部分。它创建并返回一个MergeTreeWriter
实例。MergeTreeWriter
: 这是 Paimon 中用于处理有主键表写入的核心组件。它内部通常采用类似 LSM-Tree (Log-Structured Merge-Tree) 的结构来组织数据。数据首先写入内存中的 MemTable (通常是排序的),当 MemTable 满了之后会刷写到磁盘形成 Sorted Run (L0 层文件)。后台会有合并任务将不同层级的文件进行合并,以保证查询效率和数据一致性(应用更新和删除)。
- 复杂的合并策略和管理 (
CompactStrategy
,CompactManager
,CompactRewriter
):KeyValueFileStoreWrite
包含大量逻辑来根据CoreOptions
中的配置(如merge-engine
,changelog-producer
,compaction.strategy
等)创建合适的合并策略和合并执行组件。- 例如,它可以配置为
DEDUPLICATE
(去重,保留最新的记录)、PARTIAL_UPDATE
(部分列更新)、AGGREGATE
(聚合) 等不同的合并引擎。 - 它还支持不同的 Changelog 生成方式(如
NONE
,INPUT
,LOOKUP
,FULL_COMPACTION
)。 - 它支持多种合并策略,如
UniversalCompaction
。
- 支持 Lookup: 如果配置了 Lookup 相关选项,它可以创建
LookupLevels
来加速基于主键的点查。 - 依赖各种工厂: 如
KeyValueFileReaderFactory
,KeyValueFileWriterFactory
,MergeFunctionFactory
,这些工厂负责创建读写文件和执行合并操作的具体组件。
AbstractFileStoreWrite<T>
类分析
AbstractFileStoreWrite<T>
是 FileStoreWrite<T>
接口的一个核心抽象实现,它为各种具体的文件存储写入操作(如 Append-Only、Primary-Key)提供了通用的框架和基础功能。许多在 FileStoreWrite
接口中定义的方法都在这个抽象类中有了骨架实现或者部分实现。
AbstractFileStoreWrite.java
// ... (版权和包声明) ...
public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
// --- 核心成员变量 ---
protected final SnapshotManager snapshotManager; // 快照管理器,用于读取快照信息
private final FileStoreScan scan; // 文件存储扫描器,用于读取现有文件信息
private final int writerNumberMax; // 允许的最大 writer 数量,超出可能触发溢写
@Nullable private final IndexMaintainer.Factory<T> indexFactory; // 索引维护器工厂 (可选)
@Nullable private final DeletionVectorsMaintainer.Factory dvMaintainerFactory; // 删除向量维护器工厂 (可选)
private final int numBuckets; // 表的总桶数
private final RowType partitionType; // 分区类型
@Nullable protected IOManager ioManager; // IO 管理器,用于溢写等
// 关键数据结构:存储每个 (partition, bucket) 对应的 WriterContainer
// WriterContainer 内部封装了实际的 RecordWriter
protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
private ExecutorService lazyCompactExecutor; // 用于异步合并的执行器
private boolean closeCompactExecutorWhenLeaving = true; // 是否在关闭时关闭合并执行器
private boolean ignorePreviousFiles = false; // 是否忽略历史文件 (用于 overwrite 等场景)
// ... 其他配置和状态变量 ...
protected final String tableName; // 表名
private boolean isInsertOnly; // 是否为仅插入模式
// --- 构造函数 ---
protected AbstractFileStoreWrite(
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName,
CoreOptions options,
RowType partitionType) {
this.snapshotManager = snapshotManager;
this.scan = scan;
// ... 初始化其他成员 ...
this.numBuckets = options.bucket();
this.partitionType = partitionType;
this.writers = new HashMap<>(); // 初始化 writers Map
this.tableName = tableName;
this.writerNumberMax = options.writeMaxWritersToSpill();
// ...
}
// --- 配置方法 (withXXX) ---
// 大部分是设置成员变量的值
@Override
public FileStoreWrite<T> withIOManager(IOManager ioManager) {
this.ioManager = ioManager;
return this;
}
// ... 其他 withXXX 方法 ...
// --- 核心写入逻辑 ---
@Override
public void write(BinaryRow partition, int bucket, T data) throws Exception {
// 1. 获取或创建对应 partition 和 bucket 的 WriterContainer
WriterContainer<T> container = getWriterWrapper(partition, bucket);
// 2. 调用 WriterContainer 内部 RecordWriter 的 write 方法
container.writer.write(data);
// 3. 如果有索引维护器,通知新纪录
if (container.indexMaintainer != null) {
container.indexMaintainer.notifyNewRecord(data);
}
}
// --- 核心合并逻辑 ---
@Override
public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
// 获取 WriterContainer 并调用其内部 RecordWriter 的 compact 方法
getWriterWrapper(partition, bucket).writer.compact(fullCompaction);
}
// --- 通知新文件 (通常用于独立合并作业) ---
@Override
public void notifyNewFiles(
long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
WriterContainer<T> writerContainer = getWriterWrapper(partition, bucket);
// ... (日志) ...
// 如果新文件的快照 ID 大于当前 writerContainer 的基础快照 ID,则将新文件加入 RecordWriter
if (snapshotId > writerContainer.baseSnapshotId) {
writerContainer.writer.addNewFiles(files);
}
}
// --- 准备提交 (核心方法) ---
@Override
public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception {
// 1. 创建 writer 清理检查器 (用于决定哪些不活跃的 writer 可以被关闭和清理)
Function<WriterContainer<T>, Boolean> writerCleanChecker = createWriterCleanChecker();
List<CommitMessage> result = new ArrayList<>();
Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter =
writers.entrySet().iterator();
while (partIter.hasNext()) {
// ... (遍历 partition) ...
Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
partEntry.getValue().entrySet().iterator();
while (bucketIter.hasNext()) {
// ... (遍历 bucket) ...
WriterContainer<T> writerContainer = entry.getValue();
// 2. 调用 RecordWriter 的 prepareCommit,获取文件变更增量 (CommitIncrement)
CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
List<IndexFileMeta> newIndexFiles = new ArrayList<>();
// 3. 如果有索引维护器,准备索引文件的提交
if (writerContainer.indexMaintainer != null) {
newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
}
// ... (处理删除向量文件) ...
// 4. 构建 CommitMessage
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
bucket,
writerContainer.totalBuckets,
increment.newFilesIncrement(), // 新数据文件
increment.compactIncrement(), // 合并相关文件
new IndexIncrement(newIndexFiles)); // 新索引文件
result.add(committable);
// 5. 清理逻辑:如果 CommitMessage 为空且满足清理条件,则关闭并移除该 writer
if (committable.isEmpty()) {
if (writerCleanChecker.apply(writerContainer)) {
// ... (日志和关闭 writer) ...
bucketIter.remove();
}
} else {
// 否则,更新 writerContainer 的最后修改提交标识
writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
}
}
// ... (如果 partition 下所有 bucket 的 writer 都被移除,则移除该 partition) ...
}
return result;
}
// --- 抽象方法:创建 Writer 清理检查器 ---
// 子类需要根据具体场景(如是否感知冲突)实现此方法
protected abstract Function<WriterContainer<T>, Boolean> createWriterCleanChecker();
// --- 关闭操作 ---
@Override
public void close() throws Exception {
// 关闭所有 WriterContainer 中的 RecordWriter
for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) {
for (WriterContainer<T> writerContainer : bucketWriters.values()) {
writerContainer.writer.close();
}
}
writers.clear();
// 关闭合并执行器 (如果需要)
if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
lazyCompactExecutor.shutdownNow();
}
// ...
}
// --- Checkpoint (获取可恢复状态) ---
@Override
public List<State<T>> checkpoint() {
List<State<T>> result = new ArrayList<>();
for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry :
writers.entrySet()) {
// ... (遍历 partition 和 bucket) ...
WriterContainer<T> writerContainer = bucketEntry.getValue();
CommitIncrement increment;
try {
// 获取 RecordWriter 的当前提交增量 (不等待合并完成)
increment = writerContainer.writer.prepareCommit(false);
} catch (Exception e) {
throw new RuntimeException(/* ... */);
}
// 获取 RecordWriter 管理的所有数据文件
Collection<DataFileMeta> dataFiles = writerContainer.writer.dataFiles();
// 创建并添加 State 对象
result.add(
new State<>(
partition,
bucket,
writerContainer.totalBuckets,
writerContainer.baseSnapshotId,
writerContainer.lastModifiedCommitIdentifier,
dataFiles, // 当前活跃的数据文件
writerContainer.writer.maxSequenceNumber(), // 最大序列号
writerContainer.indexMaintainer,
writerContainer.deletionVectorsMaintainer,
increment)); // 未提交的文件增量
}
// ... (日志) ...
return result;
}
// --- Restore (从状态恢复) ---
@Override
public void restore(List<State<T>> states) {
for (State<T> state : states) {
// 1. 调用抽象方法 createWriter,根据恢复的状态创建 RecordWriter
RecordWriter<T> writer =
createWriter(
state.partition,
state.bucket,
state.dataFiles, // 恢复时传入已有的数据文件
state.maxSequenceNumber, // 恢复最大序列号
state.commitIncrement, // 恢复未提交的增量
compactExecutor(),
state.deletionVectorsMaintainer);
// ...
// 2. 创建 WriterContainer 并存入 writers Map
WriterContainer<T> writerContainer =
new WriterContainer<>(/* ... */);
// ...
writers.computeIfAbsent(state.partition, k -> new HashMap<>())
.put(state.bucket, writerContainer);
}
}
// --- 获取或创建 WriterContainer ---
protected WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
if (buckets == null) {
buckets = new HashMap<>();
writers.put(partition.copy(), buckets); // 注意 partition.copy()
}
// 如果不存在,则调用 createWriterContainer 创建新的
return buckets.computeIfAbsent(
bucket, k -> createWriterContainer(partition.copy(), bucket, ignorePreviousFiles));
}
// --- 创建 WriterContainer (内部调用 createWriter) ---
protected WriterContainer<T> createWriterContainer(
BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
// ... (日志和写入器数量检查,可能触发溢写) ...
List<DataFileMeta> restoreFiles = new ArrayList<>();
Snapshot latestSnapshot = null;
if (!ignorePreviousFiles) { // 如果不忽略历史文件
latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null) {
// 扫描已存在的该 partition 和 bucket 的文件
scanExistingFileMetas(latestSnapshot, partition, bucket, restoreFiles);
}
}
// ... (获取总桶数) ...
IndexMaintainer<T> indexMaintainer = null;
if (indexFactory != null) {
indexMaintainer = indexFactory.create(partition, bucket, scan, options);
}
DeletionVectorsMaintainer deletionVectorsMaintainer = null;
if (dvMaintainerFactory != null) {
deletionVectorsMaintainer =
dvMaintainerFactory.create(
tableName, partition, bucket, scan, options, latestSnapshot);
}
// 关键:调用抽象方法 createWriter 创建实际的 RecordWriter
RecordWriter<T> writer =
createWriter(
partition.copy(),
bucket,
restoreFiles, // 传入扫描到的历史文件
getMaxSequenceNumber(restoreFiles), // 从历史文件中获取最大序列号
null, // 新创建时,restoreIncrement 为 null
compactExecutor(),
deletionVectorsMaintainer);
// ...
return new WriterContainer<>(
writer,
totalBuckets,
indexMaintainer,
deletionVectorsMaintainer,
latestSnapshot == null ? null : latestSnapshot.id()); // 基础快照ID
}
// --- 抽象方法:创建 RecordWriter ---
// 子类必须实现此方法,以提供特定类型的 RecordWriter (如 MergeTreeWriter, AppendOnlyWriter)
protected abstract RecordWriter<T> createWriter(
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer);
// --- 内部静态类:WriterContainer ---
// 封装了 RecordWriter 和其他相关状态
@VisibleForTesting
public static class WriterContainer<T> {
public final RecordWriter<T> writer; // 实际的写入器
public final int totalBuckets;
@Nullable public final IndexMaintainer<T> indexMaintainer;
@Nullable public final DeletionVectorsMaintainer deletionVectorsMaintainer;
protected final long baseSnapshotId; // 创建时的基础快照 ID
protected long lastModifiedCommitIdentifier; // 最后修改此 writer 的提交 ID
protected WriterContainer(
RecordWriter<T> writer,
// ...
Long baseSnapshotId) {
this.writer = writer;
// ...
this.baseSnapshotId = baseSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID -1 : baseSnapshotId;
this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
}
// ...
}
}
核心职责与特点:
WriterContainer
管理:- 内部使用
writers
(一个Map<BinaryRow, Map<Integer, WriterContainer<T>>>
) 来为每个分区(BinaryRow
)和桶(Integer
)维护一个WriterContainer<T>
实例。 WriterContainer
是一个内部静态类,它封装了真正的RecordWriter<T>
实例以及与该写入器相关的元数据,如索引维护器 (IndexMaintainer
)、删除向量维护器 (DeletionVectorsMaintainer
)、基础快照ID (baseSnapshotId
) 和最后修改提交ID (lastModifiedCommitIdentifier
)。
- 内部使用
通用的写入、合并、提交流程:
write(partition, bucket, data)
: 定位到对应的WriterContainer
,然后调用其内部RecordWriter
的write
方法。compact(partition, bucket, fullCompaction)
: 类似地,调用RecordWriter
的compact
方法。prepareCommit(waitCompaction, commitIdentifier)
: 这是最复杂的方法之一。它会遍历所有活动的WriterContainer
,调用每个RecordWriter
的prepareCommit
方法来获取CommitIncrement
(包含新文件、合并文件等变更信息),然后将这些信息聚合成CommitMessage
列表返回。此方法还包含了清理不活跃WriterContainer
的逻辑。
状态管理 (Checkpoint 和 Restore):
checkpoint()
: 遍历所有WriterContainer
,从每个RecordWriter
获取其当前状态(包括数据文件、最大序列号、未提交的增量),并封装成FileStoreWrite.State<T>
对象列表。这用于 Flink 等引擎的故障恢复。restore(List<State<T>> states)
: 根据传入的状态列表,为每个状态调用抽象方法createWriter
来重建RecordWriter
,并恢复WriterContainer
。
抽象方法
createWriter(...)
:- 这是
AbstractFileStoreWrite
的核心扩展点。子类(如KeyValueFileStoreWrite
,AppendOnlyFileStoreWrite
)必须实现这个方法,以根据表的类型和配置创建并返回一个具体的RecordWriter<T>
实例。例如,KeyValueFileStoreWrite
会在这里创建MergeTreeWriter
。
- 这是
抽象方法
createWriterCleanChecker()
:- 另一个扩展点,子类需要实现它来提供一个函数,该函数用于判断一个空的(没有新数据写入的)
WriterContainer
是否可以被安全地关闭和清理。这通常涉及到检查该 writer 相关的提交是否已经被持久化。
- 另一个扩展点,子类需要实现它来提供一个函数,该函数用于判断一个空的(没有新数据写入的)
配置传递: 提供了多种
withXXX
方法来配置写入行为,如IOManager
、合并执行器等。
KeyValueFileStoreWrite
如何使用 MergeTreeWriter
写入
KeyValueFileStoreWrite
继承自 MemoryFileStoreWrite<KeyValue>
,而 MemoryFileStoreWrite
又继承自 AbstractFileStoreWrite<KeyValue>
。
关键在于 KeyValueFileStoreWrite
对 createWriter
方法的实现:
KeyValueFileStoreWrite.java
// ...
public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
// ... (其他成员和构造函数) ...
@Override
protected MergeTreeWriter createWriter( // 实现了父类 (MemoryFileStoreWrite -> AbstractFileStoreWrite) 的抽象方法
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles, // 从 AbstractFileStoreWrite.createWriterContainer 传入
long restoredMaxSeqNumber, // 从 AbstractFileStoreWrite.createWriterContainer 传入
@Nullable CommitIncrement restoreIncrement, // 从 AbstractFileStoreWrite.restore 传入 (新建时为 null)
ExecutorService compactExecutor, // 从 AbstractFileStoreWrite 传入
@Nullable DeletionVectorsMaintainer dvMaintainer) {
// ... (日志) ...
// 1. 构建 KeyValueFileWriterFactory (用于创建实际写入文件的 writer, 如 ParquetWriter)
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
// 2. 获取主键比较器
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
// 3. 初始化 Levels (管理 LSM-Tree 的层级,基于 restoreFiles)
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
// 4. 创建合并策略 (CompactStrategy)
CompactStrategy compactStrategy = createCompactStrategy(options);
// 5. 创建合并管理器 (CompactManager),它会使用 compactStrategy 和 compactExecutor
CompactManager compactManager =
createCompactManager(
partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
// 6. 创建并返回 MergeTreeWriter 实例
return new MergeTreeWriter(
bufferSpillable(), // 是否允许写缓冲溢写到磁盘 (来自 MemoryFileStoreWrite)
options.writeBufferSpillDiskSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
ioManager, // IOManager (来自 AbstractFileStoreWrite)
compactManager, // 上面创建的合并管理器
restoredMaxSeqNumber, // 恢复的最大序列号
keyComparator, // 主键比较器
mfFactory.create(), // 创建合并函数 (MergeFunction, 如 DeduplicateMergeFunction)
writerFactory, // 文件写入器工厂
options.commitForceCompact(), // 是否在提交时强制合并
options.changelogProducer(), // Changelog 生成策略
restoreIncrement, // 恢复的提交增量
UserDefinedSeqComparator.create(valueType, options)); // 用户定义序列号比较器
}
// ...
}
写入流程串联:
- 当外部调用
TableWriteImpl.write(row)
时,它会调用FileStoreWrite.write(partition, bucket, data)
。对于有主键的表,这个FileStoreWrite
就是KeyValueFileStoreWrite
的实例。 KeyValueFileStoreWrite.write()
方法本身没有直接实现,它会调用父类MemoryFileStoreWrite.write()
。MemoryFileStoreWrite.write()
进一步调用其父类AbstractFileStoreWrite.write(partition, bucket, data)
。AbstractFileStoreWrite.write()
:- 调用
getWriterWrapper(partition, bucket)
来获取或创建WriterContainer<KeyValue>
。 - 如果
WriterContainer
不存在,getWriterWrapper
会调用createWriterContainer(...)
。 createWriterContainer(...)
内部会调用被KeyValueFileStoreWrite
重写的createWriter(...)
方法。KeyValueFileStoreWrite.createWriter(...)
如上所示,创建并返回一个MergeTreeWriter
实例。这个MergeTreeWriter
被封装在新的WriterContainer
中。- 获取到
WriterContainer
后,调用container.writer.write(data)
,这里的container.writer
就是刚刚创建或之前已存在的MergeTreeWriter
实例。
- 调用
MergeTreeWriter.write(KeyValue data)
:- 数据被写入
MergeTreeWriter
内部的内存缓冲区(通常是一个基于排序结构如跳表或红黑树的 MemTable)。 MergeTreeWriter
会为每条记录分配一个序列号(sequence number)。
- 数据被写入
MergeTreeWriter
这个类是 Paimon 实现 LSM 树(Log-Structured Merge-Tree)结构并进行数据写入的核心组件。通过理解这个类,我们可以清晰地看到 Paimon 是如何组织和管理数据的。
Paimon 的 LSM 树结构,正如其文档中提到的,是一种为写密集型负载优化的数据结构。其核心思想是将写入操作先缓存在内存中(MemTable),达到一定条件后批量刷写到磁盘形成有序的文件(Sorted Run,通常称为 L0 层文件)。后台会不断地将这些文件进行合并(Compaction),形成更大、更少、层级更高的文件(L1, L2...Ln 层),从而优化读取性能并回收空间。
MergeTreeWriter
正是这个过程中的关键执行者,它负责:
- 接收写入的
KeyValue
数据。 - 在内存中缓冲和排序这些数据(使用
WriteBuffer
)。 - 将内存中的数据刷写到磁盘,形成新的 L0 层数据文件。
- 与
CompactManager
协作,触发和处理后台的合并任务。 - 在提交时,收集新生成的文件、因合并而改变的文件等信息,形成
CommitIncrement
。
下面我们详细分析 MergeTreeWriter.java
的主要构成和工作流程:
1. 构造函数和核心成员变量
MergeTreeWriter.java
public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
// --- 配置项和外部依赖 ---
private final boolean writeBufferSpillable; // 写缓冲是否允许溢写到磁盘
private final MemorySize maxDiskSize; // 溢写到磁盘时,单个排序运行文件的最大大小
private final int sortMaxFan; // 外部排序时的最大扇入(合并多少个run)
private final CompressOptions sortCompression; // 溢写文件压缩配置
private final IOManager ioManager; // 用于溢写等IO操作
private final RowType keyType; // 主键类型
private final RowType valueType; // 值类型
private final CompactManager compactManager; // 合并管理器,负责LSM树的层级管理和合并调度
private final Comparator<InternalRow> keyComparator; // 主键比较器
private final MergeFunction<KeyValue> mergeFunction; // 合并函数,定义数据如何合并(如去重、部分更新)
private final KeyValueFileWriterFactory writerFactory; // 文件写入器工厂,用于创建实际写入数据文件的writer
private final boolean commitForceCompact; // 是否在提交时强制等待合并完成
private final ChangelogProducer changelogProducer; // Changelog生成策略
@Nullable private final FieldsComparator userDefinedSeqComparator; // 用户定义的序列比较器 (可选)
// --- 状态追踪:用于记录文件变更 ---
// 这些集合存储了在一个写事务(从上一个commit到下一个prepareCommit)期间发生的文件变化
private final LinkedHashSet<DataFileMeta> newFiles; // 本次写入新生成的L0层数据文件
private final LinkedHashSet<DataFileMeta> deletedFiles; // 本次写入明确删除的文件 (通常在lookup且找到旧文件时)
private final LinkedHashSet<DataFileFileMeta> newFilesChangelog; // 本次写入新生成的Changelog文件 (如果开启了ChangelogProducer.INPUT)
private final LinkedHashMap<String, DataFileMeta> compactBefore; // 合并操作发生前的文件 (将被替换的旧文件)
private final LinkedHashSet<DataFileMeta> compactAfter; // 合并操作完成后生成的新文件
private final LinkedHashSet<DataFileMeta> compactChangelog; // 合并操作生成的Changelog文件
@Nullable private CompactDeletionFile compactDeletionFile; // 记录合并过程中产生的删除标记文件
// --- 内部状态 ---
private long newSequenceNumber; // 当前分配的最大序列号 + 1,用于为新写入的记录分配递增的序列号
private WriteBuffer writeBuffer; // 核心的写缓冲区域,通常是 SortBufferWriteBuffer
private boolean isInsertOnly; // 是否为仅插入模式 (优化手段)
public MergeTreeWriter(
boolean writeBufferSpillable,
MemorySize maxDiskSize,
int sortMaxFan,
CompressOptions sortCompression,
IOManager ioManager,
CompactManager compactManager, // 由外部(如 KeyValueFileStoreWrite)创建并传入
long maxSequenceNumber, // 从已存在的文件中恢复的最大序列号
Comparator<InternalRow> keyComparator,
MergeFunction<KeyValue> mergeFunction, // 由外部创建并传入
KeyValueFileWriterFactory writerFactory, // 由外部创建并传入
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment, // 用于从checkpoint恢复时的增量信息
@Nullable FieldsComparator userDefinedSeqComparator) {
this.writeBufferSpillable = writeBufferSpillable;
this.maxDiskSize = maxDiskSize;
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
this.ioManager = ioManager;
this.keyType = writerFactory.keyType();
this.valueType = writerFactory.valueType();
this.compactManager = compactManager; // 关键:负责LSM的层级管理和合并
this.newSequenceNumber = maxSequenceNumber + 1; // 初始化序列号
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction; // 关键:定义数据如何合并
this.writerFactory = writerFactory; // 关键:创建文件写入器
this.commitForceCompact = commitForceCompact;
this.changelogProducer = changelogProducer;
this.userDefinedSeqComparator = userDefinedSeqComparator;
// 初始化用于追踪文件变更的集合
this.newFiles = new LinkedHashSet<>();
this.deletedFiles = new LinkedHashSet<>();
this.newFilesChangelog = new LinkedHashSet<>();
this.compactBefore = new LinkedHashMap<>();
this.compactAfter = new LinkedHashSet<>();
this.compactChangelog = new LinkedHashSet<>();
// 如果是从 CommitIncrement 恢复,则加载已有的文件变更信息
if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles());
increment
.compactIncrement()
.compactBefore()
.forEach(f -> compactBefore.put(f.fileName(), f)); // 使用文件名作为key存入map
compactAfter.addAll(increment.compactIncrement().compactAfter());
compactChangelog.addAll(increment.compactIncrement().changelogFiles());
updateCompactDeletionFile(increment.compactDeletionFile());
}
}
// ...
}
关键点:
CompactManager
: 这是MergeTreeWriter
与 LSM 树的层级结构和合并过程交互的核心。CompactManager
内部维护了 LSM 树的各个层级(Levels),并决定何时以及如何触发合并。MergeFunction
: 定义了当具有相同主键的多条记录相遇时(无论是在刷盘时还是在合并时),如何产生最终结果。例如,DeduplicateMergeFunction
会保留最新序列号的记录,PartialUpdateMergeFunction
会进行部分列的更新。KeyValueFileWriterFactory
: 负责创建实际写入数据文件(如 Parquet 文件)的RollingFileWriter
。newSequenceNumber
: Paimon 使用序列号(Sequence Number)来区分记录的新旧,实现 MVCC(多版本并发控制)和有序合并。每条写入的记录都会被赋予一个单调递增的序列号。- 文件追踪集合:
newFiles
,compactBefore
,compactAfter
等集合非常重要,它们记录了在一个事务周期内,由于写入和合并操作导致的文件变化。这些信息将在prepareCommit
时用于生成CommitIncrement
。
2. 内存管理和写缓冲 (setMemoryPool
和 WriteBuffer
)
MergeTreeWriter
实现了 MemoryOwner
接口,意味着它需要管理自己的内存使用。
MergeTreeWriter.java
// ...
@Override
public void setMemoryPool(MemorySegmentPool memoryPool) {
this.writeBuffer =
new SortBufferWriteBuffer( // 创建一个基于排序的写缓冲
keyType,
valueType,
userDefinedSeqComparator,
memoryPool, // 传入内存池
writeBufferSpillable, // 是否允许溢写
maxDiskSize, // 溢写文件大小限制
sortMaxFan, // 外部排序扇入
sortCompression, // 溢写文件压缩
ioManager); // IO管理器
}
@Override
public long memoryOccupancy() {
return writeBuffer.memoryOccupancy(); // 返回写缓冲占用的内存大小
}
@Override
public void flushMemory() throws Exception { // 尝试释放内存,如果写缓冲支持
boolean success = writeBuffer.flushMemory();
if (!success) {
// 如果 WriteBuffer 无法仅通过内部操作释放足够内存(例如,它已经溢写了部分数据但仍持有少量数据)
// 则强制执行一次完整的刷盘操作
flushWriteBuffer(false, false);
}
}
// ...
setMemoryPool(MemorySegmentPool memoryPool)
: 这个方法由外部(如MemoryFileStoreWrite
)调用,传入一个内存池。MergeTreeWriter
使用这个内存池来初始化其writeBuffer
。WriteBuffer
(通常是SortBufferWriteBuffer
): 这是数据写入内存的第一个落点,可以看作是 LSM 树的 MemTable。SortBufferWriteBuffer
内部会维护一个排序的数据结构(例如,基于BinaryInMemorySortBuffer
)。- 当数据写入时,会先放入这个内存缓冲区。
- 如果
writeBufferSpillable
为true
且内存不足,SortBufferWriteBuffer
能够将部分已排序的数据溢写(spill)到磁盘上的临时排好序的文件(Sorted Run),从而释放内存。这使得MergeTreeWriter
可以处理远超内存大小的数据写入。
3. 写入路径 (write
方法)
MergeTreeWriter.java
// ...
private long newSequenceNumber() { // 获取并递增序列号
return newSequenceNumber++;
}
@Override
public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber(); // 为这条记录分配一个新的序列号
// 将记录 (序列号, RowKind, Key, Value) 放入写缓冲
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
// 如果 WriteBuffer 满了,无法写入(即使尝试了内部溢写)
// 则先强制刷盘当前的 WriteBuffer 内容
flushWriteBuffer(false, false);
// 再次尝试写入
success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
// 如果再次失败,说明单个元素都放不下,抛出异常
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}
}
// ...
- 每条写入的
KeyValue
记录都会被赋予一个全局唯一的、单调递增的sequenceNumber
。 - 记录被添加到
writeBuffer
中。 - 如果
writeBuffer
返回false
(表示无法接收更多数据,即使尝试了内部溢写),则会触发一次flushWriteBuffer
操作,将当前缓冲区的内容刷到磁盘形成 L0 文件,然后再次尝试写入。
4. 刷盘路径 (flushWriteBuffer
方法)
这是将内存中的 WriteBuffer
(MemTable) 内容持久化到磁盘形成 L0 文件的核心逻辑。
MergeTreeWriter.java
// ...
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
if (writeBuffer.size() > 0) { // 只有当缓冲区有数据时才执行
// 检查是否需要等待上一次的合并完成
if (compactManager.shouldWaitForLatestCompaction()) {
waitForLatestCompaction = true;
}
// 根据 ChangelogProducer 配置,决定是否创建 Changelog 文件写入器
final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
(changelogProducer == ChangelogProducer.INPUT && !isInsertOnly)
? writerFactory.createRollingChangelogFileWriter(0) // Level 0
: null;
// 创建数据文件写入器 (L0 层文件)
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); // Level 0, 追加模式
try {
// 遍历 WriteBuffer 中的数据 (此时数据已在 WriteBuffer 内部排序)
// 应用 MergeFunction 进行记录合并 (例如,处理具有相同key的多条记录)
// 如果 changelogWriter 存在,则将结果写入 Changelog 文件
// 将最终结果写入数据文件
writeBuffer.forEach(
keyComparator, // 主键比较器
mergeFunction, // 合并函数
changelogWriter == null ? null : changelogWriter::write, // 写Changelog
dataWriter::write); // 写数据文件
} finally {
writeBuffer.clear(); // 清空内存缓冲区
if (changelogWriter != null) {
changelogWriter.close();
}
dataWriter.close();
}
// 获取刷盘后生成的数据文件元数据
List<DataFileMeta> dataMetas = dataWriter.result();
if (changelogWriter != null) {
// 如果生成了Changelog文件,将其元数据加入 newFilesChangelog 集合
newFilesChangelog.addAll(changelogWriter.result());
} else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) {
// 特殊情况:如果是 InsertOnly 模式且 ChangelogProducer 为 INPUT,
// 则将数据文件复制一份作为 Changelog 文件
List<DataFileMeta> changelogMetas = new ArrayList<>();
for (DataFileMeta dataMeta : dataMetas) {
String newFileName = writerFactory.newChangelogFileName(0);
DataFileMeta changelogMeta = dataMeta.rename(newFileName);
writerFactory.copyFile(dataMeta, changelogMeta); // 实际复制文件内容
changelogMetas.add(changelogMeta);
}
newFilesChangelog.addAll(changelogMetas);
}
// 将新生成的数据文件元数据加入 newFiles 集合,并通知 CompactManager 有新文件加入 L0
for (DataFileMeta dataMeta : dataMetas) {
newFiles.add(dataMeta);
compactManager.addNewFile(dataMeta); // 通知 CompactManager 新的 L0 文件
}
}
// 尝试同步最新的合并结果 (如果 waitForLatestCompaction 为 true,则会阻塞等待)
trySyncLatestCompaction(waitForLatestCompaction);
// 触发一次合并检查 (如果 forcedFullCompaction 为 true,可能会触发更激进的合并)
compactManager.triggerCompaction(forcedFullCompaction);
}
// ...
流程:
- 检查缓冲区: 只有当
writeBuffer
中有数据时才执行刷盘。 - 创建文件写入器:
- 根据
changelogProducer
的配置,可能会创建一个用于写入 Changelog 文件的RollingFileWriter
。 - 创建一个用于写入 L0 数据文件的
RollingFileWriter
。FileSource.APPEND
表示这是追加产生的新文件。
- 根据
- 遍历并写入:
- 调用
writeBuffer.forEach(...)
,它会迭代writeBuffer
中已经按主键(和用户定义序列号)排序好的数据。 - 在迭代过程中,会应用
mergeFunction
来处理具有相同主key的记录(例如,只保留最新的,或者进行聚合)。 - 处理后的结果会写入数据文件,如果
changelogWriter
存在,也会写入 Changelog 文件。
- 调用
- 清理和关闭: 清理
writeBuffer
,关闭文件写入器。 - 收集元数据:
- 获取生成的数据文件 (
dataMetas
) 和 Changelog 文件 (changelogWriter.result()
) 的元数据 (DataFileMeta
)。 - 将这些元数据分别添加到
newFiles
和newFilesChangelog
集合中。 - 关键: 调用
compactManager.addNewFile(dataMeta)
,将新生成的 L0 文件告知CompactManager
,CompactManager
会将这些文件加入到 LSM 树的 L0 层。
- 获取生成的数据文件 (
- 同步与触发合并:
trySyncLatestCompaction
: 尝试获取并处理已完成的后台合并任务的结果。compactManager.triggerCompaction
: 通知CompactManager
检查是否需要触发新的合并任务(例如,L0 文件过多)。
5. 合并的集成 (compact
方法和 trySyncLatestCompaction
)
MergeTreeWriter
本身不直接执行合并逻辑,而是依赖 CompactManager
。
MergeTreeWriter.java
// ...
@Override
public void compact(boolean fullCompaction) throws Exception {
// 外部调用 compact 时,实际上是先将内存缓冲区刷盘,
// 然后依赖 CompactManager 的合并策略来执行实际的合并。
// fullCompaction 参数会传递给 compactManager.triggerCompaction
flushWriteBuffer(true, fullCompaction);
}
@Override
public void addNewFiles(List<DataFileMeta> files) {
// 通常用于外部合并作业完成后,将合并产生的新文件通知给当前 writer 的 compactManager
files.forEach(compactManager::addNewFile);
}
@Override
public Collection<DataFileMeta> dataFiles() {
// 返回 CompactManager 管理的所有当前有效的数据文件
return compactManager.allFiles();
}
// ...
private void trySyncLatestCompaction(boolean blocking) throws Exception {
// 从 CompactManager 获取已完成的合并任务的结果
// blocking 为 true 时会阻塞等待,直到有结果或超时
Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
result.ifPresent(this::updateCompactResult); // 如果有结果,则更新文件状态
}
private void updateCompactResult(CompactResult result) {
// result 包含了合并前的文件 (before) 和合并后的文件 (after),以及可能的Changelog文件
Set<String> afterFiles =
result.after().stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
for (DataFileMeta file : result.before()) { // 遍历合并前的文件
if (compactAfter.remove(file)) {
// 如果这个 'before' 文件之前是某个合并操作的 'after' 文件 (即中间文件)
// 并且它不是升级文件(upgrade file)的输入或输出,那么它可以被安全删除
if (!compactBefore.containsKey(file.fileName())
&& !afterFiles.contains(file.fileName())) {
writerFactory.deleteFile(file);
}
} else {
// 否则,这个 'before' 文件是原始数据文件或之前L0层刷盘的文件,
// 将其加入 compactBefore 集合,表示它在此次合并中被替换了
compactBefore.put(file.fileName(), file);
}
}
// 将合并后生成的新文件加入 compactAfter 集合
compactAfter.addAll(result.after());
// 将合并生成的Changelog文件加入 compactChangelog 集合
compactChangelog.addAll(result.changelog());
// 更新合并产生的删除标记文件
updateCompactDeletionFile(result.deletionFile());
}
private void updateCompactDeletionFile(@Nullable CompactDeletionFile newDeletionFile) {
if (newDeletionFile != null) {
compactDeletionFile =
compactDeletionFile == null
? newDeletionFile
: newDeletionFile.mergeOldFile(compactDeletionFile); // 合并多个删除文件
}
}
// ...
compact(boolean fullCompaction)
: 当外部(如KeyValueFileStoreWrite
)调用此方法时,它首先会调用flushWriteBuffer
将内存数据刷盘。刷盘操作的最后会调用compactManager.triggerCompaction
,这会使得CompactManager
根据其内部的合并策略(如文件数量、大小等)来决定是否启动新的合并任务。trySyncLatestCompaction(boolean blocking)
: 此方法用于从CompactManager
获取已经完成的合并任务的结果 (CompactResult
)。updateCompactResult(CompactResult result)
: 当获取到合并结果后,此方法会更新MergeTreeWriter
内部的文件追踪集合:- 将被合并掉的旧文件(
result.before()
)加入到compactBefore
集合。 - 将合并产生的新文件(
result.after()
)加入到compactAfter
集合。 - 处理中间文件(即某个合并的输出同时是另一个合并的输入)的删除。
- 记录合并产生的 Changelog 文件和删除标记文件。
- 将被合并掉的旧文件(
6. 提交准备 (prepareCommit
方法)
这是生成最终提交增量信息 (CommitIncrement
) 的地方。
MergeTreeWriter.java
// ...
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
// 1. 先将内存缓冲区的数据刷盘
flushWriteBuffer(waitCompaction, false);
// 2. 根据配置决定是否需要等待合并完成
if (commitForceCompact) { // 如果配置了提交时强制合并
waitCompaction = true;
}
// 再次检查 CompactManager 是否认为需要等待 (例如L0文件过多)
if (compactManager.shouldWaitForPreparingCheckpoint()) {
waitCompaction = true;
}
// 3. 同步最新的合并结果 (如果 waitCompaction 为 true,会阻塞)
trySyncLatestCompaction(waitCompaction);
// 4. 从各个追踪集合中收集文件变更信息,并清空集合,准备下一次事务
return drainIncrement();
}
private CommitIncrement drainIncrement() {
// 构建 DataIncrement: 包含新刷盘的L0文件、删除的文件、新刷盘产生的Changelog文件
DataIncrement dataIncrement =
new DataIncrement(
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
new ArrayList<>(newFilesChangelog));
// 构建 CompactIncrement: 包含合并前的旧文件、合并后的新文件、合并产生的Changelog文件
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore.values()),
new ArrayList<>(compactAfter),
new ArrayList<>(compactChangelog));
CompactDeletionFile drainDeletionFile = compactDeletionFile; // 获取当前的删除标记文件
// 清空所有追踪集合,为下一次提交做准备
newFiles.clear();
deletedFiles.clear();
newFilesChangelog.clear();
compactBefore.clear();
compactAfter.clear();
compactChangelog.clear();
compactDeletionFile = null;
// 返回包含所有文件变更的 CommitIncrement
return new CommitIncrement(dataIncrement, compactIncrement, drainDeletionFile);
}
// ...
流程:
- 刷盘: 首先调用
flushWriteBuffer
确保所有内存中的数据都已刷到磁盘 L0 层。 - 等待合并 (可选): 根据
commitForceCompact
配置和CompactManager
的状态,决定是否需要阻塞等待当前正在进行的或计划中的合并任务完成。 - 同步合并结果: 调用
trySyncLatestCompaction
来获取并处理所有已完成的合并任务的结果,更新compactBefore
和compactAfter
等集合。 drainIncrement()
:- 从
newFiles
,deletedFiles
,newFilesChangelog
构建DataIncrement
对象。 - 从
compactBefore
,compactAfter
,compactChangelog
构建CompactIncrement
对象。 - 获取当前的
compactDeletionFile
。 - 清空所有这些追踪文件变更的集合,以便它们可以用于下一个提交周期。
- 将
DataIncrement
,CompactIncrement
, 和CompactDeletionFile
封装成一个CommitIncrement
对象返回。这个对象会被上层(如AbstractFileStoreWrite
)用于生成CommitMessage
,最终更新表的 Manifest 和 Snapshot。
- 从
7. 关闭操作 (close
方法)
MergeTreeWriter.java
// ...
@Override
public void close() throws Exception {
// 1. 取消正在进行的合并任务,防止阻塞关闭流程
compactManager.cancelCompaction();
// 2. 同步所有已完成的合并任务的结果
sync(); // sync() 内部调用 trySyncLatestCompaction(true)
// 3. 关闭 CompactManager
compactManager.close();
// 4. 清理临时文件和未提交的文件
// 这些是本事务中产生但未被成功提交的文件
List<DataFileMeta> delete = new ArrayList<>(newFiles); // 未提交的新L0文件
newFiles.clear();
deletedFiles.clear(); // deletedFiles 通常在 prepareCommit 时已处理
for (DataFileMeta file : newFilesChangelog) { // 未提交的L0 Changelog文件
writerFactory.deleteFile(file);
}
newFilesChangelog.clear();
for (DataFileMeta file : compactAfter) { // 未提交的合并后产生的新文件
// 如果这个文件不是某个升级操作的输出 (即不是 compactBefore 中已有的),则删除
if (!compactBefore.containsKey(file.fileName())) {
delete.add(file);
}
}
compactAfter.clear();
for (DataFileMeta file : compactChangelog) { // 未提交的合并产生的Changelog文件
writerFactory.deleteFile(file);
}
compactChangelog.clear();
for (DataFileMeta file : delete) { // 执行删除
writerFactory.deleteFile(file);
}
if (compactDeletionFile != null) { // 清理删除标记文件
compactDeletionFile.clean();
}
}
// ...
关闭时,会尝试清理所有在本 MergeTreeWriter
生命周期内产生但未被成功提交(即未包含在返回的 CommitIncrement
中并被上层持久化)的临时文件和数据文件。
总结:MergeTreeWriter
如何组织 LSM
MergeTreeWriter
通过以下机制实现了 LSM 树的核心写入和维护逻辑:
- MemTable (内存缓冲): 使用
WriteBuffer
(通常是SortBufferWriteBuffer
) 作为内存中的 MemTable,接收写入数据并进行排序。支持溢写到磁盘临时文件以处理大数据量。 - Flush (刷盘到 L0):
flushWriteBuffer
方法负责将 MemTable 中的数据经过MergeFunction
处理后,写入到磁盘形成新的、有序的 L0 层数据文件(Sorted Run)。新生成的 L0 文件会通过compactManager.addNewFile()
注册到CompactManager
中。 - Compaction (合并):
MergeTreeWriter
自身不执行合并,而是委托给CompactManager
。CompactManager
内部维护 LSM 树的层级结构 (Levels
),并根据合并策略 (CompactStrategy
) 异步执行合并任务。MergeTreeWriter
通过trySyncLatestCompaction
获取合并结果 (CompactResult
),并使用updateCompactResult
更新其追踪的合并前 (compactBefore
) 和合并后 (compactAfter
) 的文件列表。
- Sequence Number: 为每条记录分配序列号,用于 MVCC 和保证合并的正确性。
- File Tracking: 通过
newFiles
,compactBefore
,compactAfter
等集合精确追踪在一个事务周期内文件的产生和变化。 - Commit Increment: 在
prepareCommit
时,将收集到的文件变更信息打包成CommitIncrement
,供上层写入逻辑生成快照。
因此,MergeTreeWriter
是 Paimon 实现有主键表(Primary-Key Table)高性能写入和数据管理的核心。它巧妙地结合了内存缓冲、溢写、后台合并以及精确的文件状态追踪,有效地实现了 LSM 树的机制。
BinaryInMemorySortBuffer
的核心设计
BinaryInMemorySortBuffer
继承自 BinaryIndexedSortable
,并实现了 SortBuffer
接口。其核心设计思想是:
- 面向二进制行数据 (
BinaryRow
): Paimon 内部大量使用BinaryRow
这种序列化后的二进制格式来表示行数据,以减少序列化和反序列化开销,并能更高效地利用内存。BinaryInMemorySortBuffer
直接操作这些二进制数据。 - 内存段管理 (
MemorySegment
): 数据和索引都存储在从MemorySegmentPool
分配的MemorySegment
列表中。这允许对内存进行更细致的管理和复用。 - 索引与数据分离:
- 数据存储 (
recordBufferSegments
): 实际的InternalRow
数据经过序列化后,紧凑地存储在recordBufferSegments
(一个ArrayList<MemorySegment>
) 中。写入通过SimpleCollectingOutputView
完成,它会自动管理MemorySegment
的分配和切换。 - 排序索引 (
sortIndex
): 这是一个独立的索引结构 (同样基于ArrayList<MemorySegment>
,由父类BinaryIndexedSortable
管理)。每个索引条目包含:- 一个指向
recordBufferSegments
中对应记录起始位置的指针 (offset)。 - 一个规范化键 (Normalized Key),这是从记录的排序列计算出来的前缀,用于快速比较。
- 一个指向
- 数据存储 (
- 写入过程 (
write
方法):- 当一条
InternalRow
写入时,它首先被序列化到recordBufferSegments
中。 - 然后,在
sortIndex
中为这条记录创建一个条目,包含指向其在recordBufferSegments
中位置的指针和其规范化键。 - 这样,实际的数据是按写入顺序追加存储的,而排序信息则记录在
sortIndex
中。
- 当一条
- 排序过程 (
sortedIterator
方法):- 当需要获取排序后的迭代器时,会调用
new QuickSort().sort(this)
。 - 这里的
QuickSort
是一个通用的排序算法,它作用于实现了IndexedSortable
接口的对象(即当前的BinaryInMemorySortBuffer
实例)。 IndexedSortable
接口定义了compare(i, j)
和swap(i, j)
等方法。compare(i, j)
: 比较sortIndex
中第i
个和第j
个条目。它首先比较它们的规范化键。如果规范化键相同或者不能完全区分(normalizedKeyFullyDetermines
为 false),则会根据指针去recordBufferSegments
中反序列化实际的行数据进行完整比较。swap(i, j)
: 只交换sortIndex
中的条目(指针和规范化键),而不需要移动recordBufferSegments
中庞大的实际行数据。这是非常关键的性能优化点。
- 当需要获取排序后的迭代器时,会调用
- 迭代 (
iterator
方法): 排序完成后,迭代器会按照sortIndex
中条目的顺序,通过指针从recordBufferSegments
中读取并反序列化BinaryRow
。
为什么没有使用跳表?
虽然跳表是一种优秀的动态排序数据结构,支持高效的插入、删除和查找(平均时间复杂度 O(log n)),但对于 BinaryInMemorySortBuffer
的特定应用场景,当前的设计可能更为合适:
批量处理和写优化:
BinaryInMemorySortBuffer
主要用于SortBufferWriteBuffer
,其目的是在内存中缓冲一批数据,然后一次性排序并刷写到磁盘(形成 L0 文件)或进行合并。这是一个典型的批量写和批量排序的场景。- 在这种场景下,数据通常是先累积,然后统一排序。快速排序等基于数组的排序算法在数据已经全部加载到内存后,通常具有非常好的平均性能和缓存友好性。
内存布局和序列化成本:
- Paimon 强调对二进制数据的直接操作以避免不必要的序列化/反序列化。跳表的节点通常需要存储指向其他节点的指针,如果每个节点都存储完整的
BinaryRow
对象,在节点间移动或比较时可能会涉及到更复杂的内存操作或间接访问。 - 当前设计将实际数据紧凑存储,排序时仅操作轻量级的索引,这对于二进制数据处理非常高效。
- Paimon 强调对二进制数据的直接操作以避免不必要的序列化/反序列化。跳表的节点通常需要存储指向其他节点的指针,如果每个节点都存储完整的
空间开销:
- 跳表的每个节点除了数据外,还需要存储多个指向不同层级下一个节点的指针,这会带来额外的空间开销。虽然可以通过优化减少,但相对于当前基于数组的紧凑索引,空间利用率可能稍逊。
实现的复杂度:
- 虽然不是主要因素,但相对于经典的快速排序,跳表的实现和维护(尤其是在并发场景下,尽管这里主要是单线程写)通常更为复杂。
主要操作是“写满后排序一次”:
BinaryInMemorySortBuffer
的生命周期通常是:写入数据直到缓冲区满 -> 执行一次排序 -> 数据被消费(例如写入文件)。它不像数据库索引那样需要持续地、动态地维护排序结构并响应随机读写。- 对于这种“一次性”排序的需求,快速排序是一个非常成熟和高效的选择。
总结
BinaryInMemorySortBuffer
的设计目标是在内存中高效地缓冲和排序一批 BinaryRow
数据。它通过索引与数据分离的策略,结合规范化键进行快速比较,并使用快速排序对索引进行排序,从而避免了移动庞大的实际数据。这种设计非常适合其作为写缓冲区的角色,专注于批量数据的处理效率和内存的有效利用。
虽然跳表在动态插入和查找方面表现优异,但对于 BinaryInMemorySortBuffer
这种更侧重于批量写入和一次性排序的场景,当前基于数组索引和快速排序的方案在简单性、内存效率和特定场景下的性能方面可能更具优势。
以下是相关的代码片段,展示了其核心机制:
构造函数和核心成员 (部分在父类 BinaryIndexedSortable
):
BinaryInMemorySortBuffer.java
// ...
public class BinaryInMemorySortBuffer extends BinaryIndexedSortable implements SortBuffer {
// ...
private final AbstractRowDataSerializer<InternalRow> inputSerializer;
private final ArrayList<MemorySegment> recordBufferSegments; // 存储实际序列化的行数据
private final SimpleCollectingOutputView recordCollector; // 用于向 recordBufferSegments 写入数据
// 来自父类 BinaryIndexedSortable
// protected ArrayList<MemorySegment> sortIndex; // 存储排序索引(指针 + 规范化键)
// protected NormalizedKeyComputer normalizedKeyComputer;
// protected RecordComparator comparator;
// protected RandomAccessInputView recordBuffer; // 用于从 recordBufferSegments 读取数据
// ...
public static BinaryInMemorySortBuffer createBuffer(
NormalizedKeyComputer normalizedKeyComputer,
AbstractRowDataSerializer<InternalRow> serializer,
RecordComparator comparator,
MemorySegmentPool memoryPool) {
// ...
ArrayList<MemorySegment> recordBufferSegments = new ArrayList<>(16);
return new BinaryInMemorySortBuffer(
normalizedKeyComputer,
serializer,
comparator,
recordBufferSegments,
new SimpleCollectingOutputView( // 负责将数据写入 recordBufferSegments
recordBufferSegments, memoryPool, memoryPool.pageSize()),
memoryPool);
}
private BinaryInMemorySortBuffer(
NormalizedKeyComputer normalizedKeyComputer,
AbstractRowDataSerializer<InternalRow> inputSerializer,
RecordComparator comparator,
ArrayList<MemorySegment> recordBufferSegments,
SimpleCollectingOutputView recordCollector,
MemorySegmentPool pool) {
super( // 初始化父类 BinaryIndexedSortable
normalizedKeyComputer,
new BinaryRowSerializer(inputSerializer.getArity()), // 内部使用的序列化器
comparator,
recordBufferSegments, // 父类也需要访问 recordBufferSegments 来创建 recordBuffer 视图
pool);
this.inputSerializer = inputSerializer; // 用于序列化用户传入的 InternalRow
this.recordBufferSegments = recordBufferSegments;
this.recordCollector = recordCollector;
// ...
this.clear();
}
// ...
}
写入逻辑:
BinaryInMemorySortBuffer.java
// ...
@Override
public boolean write(InternalRow record) throws IOException {
tryInitialize(); // 确保内存已初始化
// 检查 sortIndex 是否有足够空间
if (!checkNextIndexOffset()) { // 来自父类 BinaryIndexedSortable
return false; // 内存不足
}
// 将记录序列化到 recordBufferSegments
int skip;
try {
// inputSerializer 是用户传入的原始行数据的序列化器
// recordCollector 将序列化后的字节写入 recordBufferSegments
skip = this.inputSerializer.serializeToPages(record, this.recordCollector);
} catch (EOFException e) {
return false; // 内存不足
}
final long newOffset = this.recordCollector.getCurrentOffset(); // 记录在 recordBufferSegments 中的新偏移量
long currOffset = currentDataBufferOffset + skip; // 记录在当前数据段内的相对偏移
// 在 sortIndex 中写入指针 (currOffset) 和规范化键
writeIndexAndNormalizedKey(record, currOffset); // 来自父类 BinaryIndexedSortable
this.sortIndexBytes += this.indexEntrySize; // 更新索引占用的字节数
this.currentDataBufferOffset = newOffset; // 更新数据缓冲区的总偏移
return true;
}
// ...
父类 BinaryIndexedSortable
中的 writeIndexAndNormalizedKey
方法:
BinaryIndexedSortable.java
// ...
protected void writeIndexAndNormalizedKey(InternalRow record, long currOffset) {
// 在 currentSortIndexSegment (sortIndex 的一部分) 的 currentSortIndexOffset 位置写入指针
this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, currOffset);
if (this.numKeyBytes != 0) { // 如果使用规范化键
// 计算并写入规范化键到指针之后的位置
normalizedKeyComputer.putKey(
record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN);
}
this.currentSortIndexOffset += this.indexEntrySize; // 移动到下一个索引条目位置
this.numRecords++;
}
// ...
排序和迭代:
BinaryInMemorySortBuffer.java
// ...
@Override
public final MutableObjectIterator<BinaryRow> sortedIterator() {
if (numRecords > 0) {
new QuickSort().sort(this); // 对 this (BinaryIndexedSortable) 进行排序
}
return iterator(); // 返回一个基于已排序索引的迭代器
}
private MutableObjectIterator<BinaryRow> iterator() {
tryInitialize();
return new MutableObjectIterator<BinaryRow>() {
// ...
private MemorySegment currentIndexSegment = sortIndex.get(0); // 当前迭代的索引段
@Override
public BinaryRow next(BinaryRow target) {
if (this.current < this.size) {
// ... (处理跨索引段) ...
// 从 currentIndexSegment 读取指向实际数据的指针
long pointer = this.currentIndexSegment.getLong(this.currentOffset);
this.currentOffset += indexEntrySize;
try {
// 根据指针从 recordBufferSegments (通过 recordBuffer 视图) 反序列化数据
return getRecordFromBuffer(target, pointer);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
return null;
}
}
// ...
};
}
private BinaryRow getRecordFromBuffer(BinaryRow reuse, long pointer) throws IOException {
// recordBuffer 是一个 RandomAccessInputView,包装了 recordBufferSegments
this.recordBuffer.setReadPosition(pointer); // 定位到数据
// serializer 是 BinaryRowSerializer,用于从二进制数据反序列化为 BinaryRow
return this.serializer.mapFromPages(reuse, this.recordBuffer);
}
// ...
RollingFileWriter
RollingFileWriter
是 Paimon 中一个通用的文件写入工具,它的核心功能是在写入数据时,当当前文件的大小达到预设的目标大小(targetFileSize
)时,自动关闭当前文件并创建一个新的文件继续写入,这个过程称为 "rolling" 或 "滚动"。这有助于将大量数据切分成大小可管理的小文件,便于后续的处理和管理。
RollingFileWriter
的设计是通用的,它只关心两件事:
- 如何创建单个文件的写入器 (
SingleFileWriter<T, R>
)。这是通过构造函数传入的writerFactory
(一个Supplier
) 来实现的。 - 单个文件的目标大小 (
targetFileSize
)。
层级信息是在创建 writerFactory
的地方起作用的,而不是在 RollingFileWriter
内部。
让我们详细分析一下 RollingFileWriter
的主要功能和工作流程:
RollingFileWriter
的主要功能
- 动态文件创建: 当开始写入或上一个文件写满后,通过
writerFactory.get()
创建一个新的SingleFileWriter
实例来写入新文件。 - 文件大小控制: 在每写入一条或一批记录后,会检查当前
SingleFileWriter
写入的数据量是否接近或超过targetFileSize
。 - 文件滚动: 如果当前文件大小达到阈值,则会:
- 关闭当前的
SingleFileWriter
。 - 将已关闭的
SingleFileWriter
的结果(通常是DataFileMeta
,描述了生成的文件元数据)收集到results
列表中。 - 将已关闭的
SingleFileWriter
的AbortExecutor
存储起来,以便在发生错误时可以中止所有已生成的文件。 - 将
currentWriter
置为null
,以便下次写入时创建新的文件写入器。
- 关闭当前的
- 结果收集: 最终关闭
RollingFileWriter
时,会返回一个包含所有成功写入并关闭的文件元数据列表 (List<R>
)。 - 错误处理与中止: 如果在写入过程中发生异常,会调用
abort()
方法,该方法会尝试中止当前正在写入的文件以及所有之前已经滚动关闭的文件。
我们来看一下关键代码段:
构造函数:
RollingFileWriter.java
// ...
public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
// ...
private final Supplier<? extends SingleFileWriter<T, R>> writerFactory; // 单个文件写入器的工厂
private final long targetFileSize; // 目标文件大小
private final List<AbortExecutor> closedWriters; // 已关闭写入器的中止执行器
private final List<R> results; // 收集所有成功写入文件的元数据
private SingleFileWriter<T, R> currentWriter = null; // 当前正在写入的文件写入器
private long recordCount = 0; // 已写入的总记录数
// ...
public RollingFileWriter(
Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
this.writerFactory = writerFactory;
this.targetFileSize = targetFileSize;
this.results = new ArrayList<>();
this.closedWriters = new ArrayList<>();
}
// ...
}
writerFactory
: 这是关键,它是一个函数式接口,提供了创建具体SingleFileWriter
实例的逻辑。层级信息、文件路径、文件格式等特定于单个文件的配置,都是在这个writerFactory
的实现中处理的。targetFileSize
: 控制单个文件的最大大小。
写入逻辑 (write
方法):
RollingFileWriter.java
// ...
@Override
public void write(T row) throws IOException {
try {
// 如果是第一次写入,或者上一个文件已经滚动,则打开新的写入器
if (currentWriter == null) {
openCurrentWriter();
}
currentWriter.write(row); // 写入数据到当前文件
recordCount += 1;
// 检查是否需要滚动文件
if (rollingFile(false)) { // false 表示非强制检查,会根据 CHECK_ROLLING_RECORD_CNT 决定是否真的检查大小
closeCurrentWriter(); // 如果达到大小,关闭当前文件
}
} catch (Throwable e) {
// ... 错误处理 ...
abort();
throw e;
}
}
private void openCurrentWriter() {
currentWriter = writerFactory.get(); // 通过工厂创建新的 SingleFileWriter
}
private void closeCurrentWriter() throws IOException {
if (currentWriter == null) {
return;
}
currentWriter.close(); // 关闭文件写入器
// 存储中止执行器和结果
closedWriters.add(currentWriter.abortExecutor());
results.add(currentWriter.result());
currentWriter = null; // 置空,以便下次 write 时重新 open
}
private boolean rollingFile(boolean forceCheck) throws IOException {
// 调用 SingleFileWriter 的 reachTargetSize 方法判断是否达到目标大小
// forceCheck 为 true,或者写入记录数达到 CHECK_ROLLING_RECORD_CNT 的倍数时,才真正检查文件大小
return currentWriter.reachTargetSize(
forceCheck || recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
}
// ...
openCurrentWriter()
: 当需要新的文件时,调用writerFactory.get()
。rollingFile()
: 决定是否需要关闭当前文件并开始新文件。它会调用currentWriter.reachTargetSize()
。为了避免频繁检查文件大小(这可能是一个昂贵的操作),它引入了CHECK_ROLLING_RECORD_CNT
,即每写入一定数量的记录后才进行一次实际的大小检查,除非forceCheck
为true
(例如在writeBundle
时)。closeCurrentWriter()
: 关闭当前文件,并保存其元数据和中止逻辑。
writeBundle
方法: 这个方法用于一次性写入一批记录 (BundleRecords
),逻辑与 write
类似,但它在检查是否滚动时会设置 forceCheck
为 true
,因为写入一批数据后文件大小变化可能较大。
RollingFileWriter.java
// ...
public void writeBundle(BundleRecords bundle) throws IOException {
try {
if (currentWriter == null) {
openCurrentWriter();
}
currentWriter.writeBundle(bundle);
recordCount += bundle.rowCount();
if (rollingFile(true)) { // 注意这里 forceCheck 为 true
closeCurrentWriter();
}
} catch (Throwable e) {
// ... 错误处理 ...
abort();
throw e;
}
}
// ...
正如前面提到的,层级信息(level)是在 RollingFileWriter
的使用者(通常是创建 writerFactory
的地方)处理的。一个典型的例子是 KeyValueFileWriterFactory
:
KeyValueFileWriterFactory.java
Apply
// ...
public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(
int level, FileSource fileSource) { // 注意这里的 level 参数
WriteFormatKey key = new WriteFormatKey(level, false); // level 被用来创建 WriteFormatKey
return new RollingFileWriter<>(
() -> { // 这个 lambda 表达式就是 writerFactory
// 在这个工厂方法内部,level (通过 key 间接) 被用来决定文件路径等
DataFilePathFactory pathFactory = formatContext.pathFactory(key);
return createDataFileWriter( // 创建具体的 SingleFileWriter (如 RowDataFileWriter)
pathFactory.newPath(), key, fileSource, pathFactory.isExternalPath());
},
suggestedFileSize); // 目标文件大小
}
public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
WriteFormatKey key = new WriteFormatKey(level, true); // level 同样在这里使用
return new RollingFileWriter<>(
() -> {
DataFilePathFactory pathFactory = formatContext.pathFactory(key);
return createDataFileWriter(
pathFactory.newChangelogPath(), // Changelog 文件路径也可能受 level 影响
key,
FileSource.APPEND,
pathFactory.isExternalPath());
},
suggestedFileSize);
}
// ...
在 KeyValueFileWriterFactory
的 createRollingMergeTreeFileWriter
和 createRollingChangelogFileWriter
方法中:
- 传入的
level
参数被用来创建一个WriteFormatKey
对象。 - 这个
WriteFormatKey
随后被用于从formatContext
获取一个DataFilePathFactory
。 DataFilePathFactory
负责根据层级、分区、桶等信息生成具体的文件路径 (例如,L0 层的文件可能在level-0
目录下,L1 层在level-1
目录下)。- 最终,这个生成的路径和
WriteFormatKey
(包含了层级信息) 会被传递给更底层的createDataFileWriter
方法,用于创建实际的SingleFileWriter
(例如RowDataFileWriter
)。
因此,RollingFileWriter
本身是一个通用的滚动写入框架,它将如何创建具体文件写入器的细节委托给了 writerFactory
。而层级信息正是通过这个 writerFactory
的实现逻辑传递到实际的文件创建过程中的。
RollingFileWriter
的核心作用是:
- 管理文件的生命周期:按需创建、写入、关闭文件。
- 控制文件大小:确保生成的文件不会过大,通过滚动到新文件来实现。
- 提供通用性:通过
Supplier<? extends SingleFileWriter<T, R>> writerFactory
使得它可以适用于不同类型的文件和写入逻辑。
层级信息虽然不由 RollingFileWriter
直接处理,但它是通过其构造时传入的 writerFactory
的具体实现来影响最终写入的文件的路径、命名约定或其他与层级相关的属性的。这种设计使得 RollingFileWriter
保持了通用性和灵活性。
层级
层级 (level) 是 Paimon LSM-Tree 架构的核心概念,它通过以下方式影响数据落盘和组织:
- 文件写入: 新写入的数据(通过 MergeTreeWriter 的 writeBuffer 刷写)总是作为 L0 文件生成。KeyValueFileWriterFactory 使用 level=0 来确定这些文件的路径和名称。
- Compaction: CompactManager 基于 Levels 对象中各层级的文件状态来触发和执行 Compaction。Compaction 将低层级(如 L0)的文件与高层级的文件合并,生成新的、更高层级的文件。这些新文件的 DataFileMeta 会记录其新的层级。
- 数据结构: Levels 类维护了不同层级的数据文件。L0 文件被特殊对待(每个文件是一个 Sorted Run),而 L1 及以上层级通常每个层级构成一个大的 Sorted Run。
- 查询: 查询时,层级信息用于优化数据读取,例如优先查找高层级或根据键范围跳过不必要的层级/文件。
因此,层级信息从文件创建(通过 KeyValueFileWriterFactory 传递给 RollingFileWriter)开始,贯穿到 Compaction 过程(由 CompactManager 和 Levels 管理),最终决定了数据文件在存储上的物理布局和逻辑组织。
DataFileMeta
DataFileMeta
类本身是一个 Java 对象,它存储了数据文件(例如 Parquet、ORC 文件)的元数据信息。这个 DataFileMeta
对象本身通常不会以其 Java 对象的形式直接序列化到磁盘上。
相反,DataFileMeta
中的信息会被用来构建清单文件 (Manifest File) 的条目。清单文件是 Paimon 用来跟踪表中所有数据文件及其状态的关键组件。清单文件本身是会被写入磁盘的,通常是以 Avro 格式存储。
所以,可以理解为 DataFileMeta
的内容会被持久化到磁盘,但不是 DataFileMeta
这个 Java 对象本身。
通过查看DataFileMeta.java
源码,可以总结出它主要包含以下信息:
fileName
(String): 数据文件的名称。这是定位文件的关键。fileSize
(long): 数据文件的大小(字节数)。rowCount
(long): 文件中包含的总记录行数(包括增加和删除的记录)。minKey
(BinaryRow): 文件中所有记录的最小主键(或排序键)。maxKey
(BinaryRow): 文件中所有记录的最大主键(或排序键)。keyStats
(SimpleStats): 关于主键列的统计信息(例如 null 计数、最小值、最大值)。valueStats
(SimpleStats): 关于值列的统计信息。minSequenceNumber
(long): 文件中记录的最小序列号。序列号用于 MVCC (多版本并发控制) 和确定记录的新旧。maxSequenceNumber
(long): 文件中记录的最大序列号。schemaId
(long): 该数据文件关联的表模式 (schema) 的 ID。level
(int): 文件所在的 LSM 树的层级。L0 是最新写入的层级。extraFiles
(List<String>): 额外关联的文件列表,例如用于某些索引类型的文件。creationTime
(Timestamp): 文件的创建时间。deleteRowCount
(Long, nullable): 文件中标记为删除的记录行数。embeddedIndex
(byte[], nullable): 内嵌的文件索引(例如 Bloom Filter 的字节数组),如果索引较小,可以直接存储在元数据中。fileSource
(FileSource, nullable): 文件的来源,例如是追加写入 (APPEND
) 还是 Compaction 的输出 (COMPACTION
)。valueStatsCols
(List<String>, nullable): 明确指定了哪些值列收集了统计信息。externalPath
(String, nullable): 如果文件存储在外部位置(例如,不由 Paimon 直接管理的文件路径),则记录其外部路径。
DataFileMeta
如何被使用并间接写入磁盘
- 写入数据文件: 当
MergeTreeWriter
(或其他写入器) 将内存中的数据刷写到磁盘时,会生成一个或多个实际的数据文件(如 Parquet 文件)。 - 生成
DataFileMeta
: 每生成一个数据文件,就会创建一个对应的DataFileMeta
对象来描述这个文件。例如,在KeyValueDataFileWriter
的result()
方法中,会收集统计信息并构建DataFileMeta
。KeyValueDataFileWriter.java
// ... public DataFileMeta result() throws IOException { if (recordCount() == 0) { return null; } long fileSize = outputBytes; Pair<SimpleColStats[], SimpleColStats[]> keyValueStats = fetchKeyValueStats(fieldStats(fileSize)); SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey()); Pair<List<String>, SimpleStats> valueStatsPair = valueStatsConverter.toBinary(keyValueStats.getValue()); DataFileIndexWriter.FileIndexResult indexResult = dataFileIndexWriter == null ? DataFileIndexWriter.EMPTY_RESULT : dataFileIndexWriter.result(fileSize); return new DataFileMeta( // 创建 DataFileMeta 对象 path.getName(), fileSize, recordCount(), minKey == null ? DataFileMeta.EMPTY_MIN_KEY : serializer.toBinaryRow(minKey), maxKey == null ? DataFileMeta.EMPTY_MAX_KEY : serializer.toBinaryRow(maxKey), keyStats, valueStatsPair.getRight(), minSequenceNumber, maxSequenceNumber, schemaId, level, indexResult.extraFiles(), indexResult.deleteRowCount(), indexResult.embeddedIndex(), fileSource, valueStatsPair.getLeft(), null); } // ...
- 提交 (Commit):
MergeTreeWriter
在prepareCommit
方法中收集新生成的DataFileMeta
(L0 文件) 和 Compaction 产生的DataFileMeta
。- 这些
DataFileMeta
对象会包含在CommitIncrement
中。 - 当事务提交时,
CommitIncrement
中的这些DataFileMeta
信息会被写入到新的清单列表 (Manifest List) 指向的清单文件 (Manifest File) 中。
- 清单文件 (Manifest File):
- 清单文件会记录一批
DataFileMeta
信息。每个条目对应一个数据文件,包含了DataFileMeta
中的核心字段。 - Paimon 定义了
DataFileMeta.SCHEMA
,这是一个RowType
,描述了DataFileMeta
信息在清单文件中如何存储。DataFileMeta.java
// ... public static final RowType SCHEMA = new RowType( false, Arrays.asList( new DataField(0, "_FILE_NAME", newStringType(false)), new DataField(1, "_FILE_SIZE", new BigIntType(false)), new DataField(2, "_ROW_COUNT", new BigIntType(false)), new DataField(3, "_MIN_KEY", newBytesType(false)), new DataField(4, "_MAX_KEY", newBytesType(false)), new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA), new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA), new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)), new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)), new DataField(9, "_SCHEMA_ID", new BigIntType(false)), new DataField(10, "_LEVEL", new IntType(false)), new DataField( 11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))), new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), new DataField(15, "_FILE_SOURCE", new TinyIntType(true)), new DataField( 16, "_VALUE_STATS_COLS", DataTypes.ARRAY(DataTypes.STRING().notNull())), new DataField(17, "_EXTERNAL_PATH", newStringType(true)))); // ...
- 当 Paimon 读取表的状态时,它会解析清单文件,并根据这些持久化的信息在内存中重建
DataFileMeta
对象,用于后续的查询和 Compaction 计划。
- 清单文件会记录一批
DataFileMeta
是一个非常重要的内存中对象,它封装了关于单个数据文件的所有元数据。它本身不直接写入磁盘,但其包含的核心信息会根据 DataFileMeta.SCHEMA
的定义被序列化并存储到清单文件 (Manifest File) 中。这些清单文件是 Paimon 表状态持久化的关键部分,使得 Paimon 能够跟踪和管理表中的所有数据文件。
ManifestEntry 使用和包装了 DataFileMeta,并在其基础上增加了操作类型 (kind) 和归属信息(分区、桶、层级)。
总结来说,DataFileMeta 是对数据文件属性的详细描述,而 ManifestEntry 是对这些数据文件在表生命周期中发生的具体变更(如添加、删除)的记录,它通过包含 DataFileMeta 来指明变更作用于哪个文件。两者在 Paimon 的元数据体系中各司其职,共同维护了表的完整状态。
总结数据流和组织
- 写入时:
TableWriteImpl
->KeyValueFileStoreWrite
(继承AbstractFileStoreWrite
) ->MergeTreeWriter.write()
-> 写入MergeTreeWriter
的内存WriteBuffer。 - 刷盘/提交时:
AbstractFileStoreWrite.prepareCommit()
触发MergeTreeWriter.prepareCommit()
。MergeTreeWriter
将内存中的 WriteBuffer 刷到磁盘形成 L0 Sorted Run 文件。MergeTreeWriter
可能执行合并,将 L0 和其他层级的文件合并成新的更高层级文件。MergeTreeWriter
返回包含新文件和合并文件信息的CommitIncrement
。AbstractFileStoreWrite
将其包装成CommitMessage
。FileStoreCommit
使用CommitMessage
更新磁盘上的 Manifest 和 Snapshot 文件。
通过这种方式,AbstractFileStoreWrite
提供了一个通用的写入和提交框架,而 KeyValueFileStoreWrite
通过实现 createWriter
来插入 MergeTreeWriter
,从而实现了对有主键表的特定写入和合并逻辑,并有效地管理内存和磁盘上的数据。
事务一致性
我们来探讨 Paimon 中事务处理和数据一致性保证的机制,以及 Flink 写入 Paimon 时相关的源码实现。
Paimon 通过快照 (Snapshot) 机制来保证数据的一致性和原子性。每一次成功的写入操作(Commit)都会生成一个新的快照。这个快照代表了表在某个特定时间点的完整、一致的状态。如果写入过程中发生任何故障,未完成的写入操作不会产生新的快照,因此不会影响到之前已提交的数据的可见性。这类似于数据库中的原子提交。
Flink 写入 Paimon 的事务处理流程
当使用 Flink 向 Paimon 表写入数据时,其事务处理和一致性保证主要依赖于 Flink 的 Checkpoint 机制和 Paimon 的 Commit 流程。
数据写入 (Write):
- Flink 的 Source 产生数据,经过一系列转换后,由 Paimon 的 Sink Operator (Writer) 接收。
- Writer Operator 会将接收到的数据写入到 Paimon 的数据文件中。这些新写入的数据文件在 Commit 完成之前是对外部查询不可见的。
- 这些数据文件可能是内存缓冲刷盘产生的,也可能是直接写入的。
Flink Checkpoint 触发:
- 当 Flink 的 Checkpoint Barrier 到达 Paimon Sink Operator 时,表示一个 Checkpoint 周期开始。
- Sink Operator (Writer) 会完成当前正在写入的数据文件的刷盘操作,并生成一批
Committable
对象。 Committable
对象包含了这次 Checkpoint 期间新写入的数据文件(DataFileMeta
)和需要删除的旧数据文件(通常是 Compaction 的结果)等信息。- 这些
Committable
对象会被发送给下游的 Committer Operator。
提交 (Commit):
- Committer Operator 收集来自所有 Writer 并行实例的
Committable
对象。 - 当 Flink JobManager 通知 Committer Operator 当前 Checkpoint 已经成功完成时 (即所有 Operator 都已确认 Checkpoint),Committer Operator 才会执行真正的 Paimon Commit 操作。
- Paimon Commit 操作包括:
- 生成新的 Manifest 文件,记录
Committable
中的文件变更(新增和删除的数据文件)。 - 生成新的 Manifest List 文件,指向新的 Manifest 文件。
- 生成一个新的 Snapshot 文件,指向新的 Manifest List 文件。这个 Snapshot 文件一旦成功写入,就代表本次事务提交成功,数据对外部可见。
- 生成新的 Manifest 文件,记录
- 如果 Checkpoint 失败,或者 Committer 在提交过程中失败,那么这个 Snapshot 就不会生成或不会被认为是有效的,之前写入的数据文件虽然可能已经存在于文件系统,但由于没有被任何有效的 Snapshot 引用,它们对查询是不可见的,后续会被清理。
- Committer Operator 收集来自所有 Writer 并行实例的
源码解析 Flink 写入 Paimon 的实现
我们主要关注 paimon-flink/paimon-flink-common
模块下的 FlinkSink.java
类,它是 Flink DataStream API 写入 Paimon 的核心逻辑。
FlinkSink.java
// ... existing code ...
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
// This commitUser is valid only for new jobs.
// After the job starts, this commitUser will be recorded into the states of write and
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
return sinkFrom(input, createCommitUser(table.coreOptions().toConfiguration()));
}
public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
// do the actually writing action, no snapshot generated in this stage
DataStream<Committable> written = doWrite(input, initialCommitUser, null);
// commit the committable to generate a new snapshot
return doCommit(written, initialCommitUser);
}
// ... existing code ...
sinkFrom
方法主要做了三件事:
- 这个类负责构建 Flink DataStream 的 Sink Pipeline,将输入数据流写入 Paimon 表。
- 核心方法是
sinkFrom(DataStream<T> input)
,它将输入的DataStream
转换为DataStreamSink
。 doWrite()
: 处理实际的数据写入,并生成Committable
。doCommit()
: 执行最终的提交操作。
doWrite()
方法: FlinkSink.java
- 这个方法创建并配置了 Writer Operator。
- Writer Operator 负责接收上游数据,并使用 Paimon 的
TableWrite
将数据写入到数据文件中。 - 当 Flink Checkpoint 发生时,Writer Operator 会将写入的文件信息封装成
Committable
对象发送给下游。 createWriteOperatorFactory()
: 这个抽象方法由子类(如RowDataStoreSink
或AppendOnlyTableStoreSink
)实现,返回具体的 Writer Operator 工厂。createWriteProvider()
: 负责创建StoreSinkWrite.Provider
,它会提供实际的TableWrite
实例给 Writer Operator。TableWrite
是 Paimon Core 中负责数据写入的核心接口。
compact
(可选的预提交合并):
如果用户配置了 precommit-compact=true
(通过 FlinkConnectorOptions.PRECOMMIT_COMPACT
),这个方法会添加额外的 Flink Operator (Coordinator 和 Worker) 来对 Writer 生成的小文件进行合并,然后再将合并后的 Committable
传递给 Committer。
这有助于减少小文件问题,尤其是在流式写入且 Checkpoint 间隔较短的场景。
FlinkSink.java doWite内部
if (!table.primaryKeys().isEmpty() && options.get(PRECOMMIT_COMPACT)) {
SingleOutputStreamOperator<Committable> newWritten =
written.transform(
"Changelog Compact Coordinator",
new EitherTypeInfo<>(
new CommittableTypeInfo(), new ChangelogTaskTypeInfo()),
new ChangelogCompactCoordinateOperator(table.coreOptions()))
.forceNonParallel()
.transform(
"Changelog Compact Worker",
new CommittableTypeInfo(),
new ChangelogCompactWorkerOperator(table));
forwardParallelism(newWritten, written);
written = newWritten;
}
doCommit()
方法:
FlinkSink.java
// ... existing code ...
public DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean streamingCheckpointEnabled =
isStreaming(written) && checkpointConfig.isCheckpointingEnabled(); // 检查是否为流模式且开启了 Checkpoint
if (streamingCheckpointEnabled) {
assertStreamingConfiguration(env);
}
Options options = Options.fromMap(table.options());
OneInputStreamOperatorFactory<Committable, Committable> committerOperator =
new CommitterOperatorFactory<>( // 创建 Committer Operator 的工厂
streamingCheckpointEnabled,
true, // global commit
commitUser,
createCommitterFactory(), // 创建 Paimon TableCommit 的工厂
createCommittableStateManager(), // 管理 Committable 状态
options.get(END_INPUT_WATERMARK));
String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
String committerName =
(table.coreOptions().writeOnly()
? GLOBAL_COMMITTER_WRITE_ONLY_NAME
: GLOBAL_COMMITTER_NAME)
+ " : "
+ table.name();
SingleOutputStreamOperator<Committable> committed =
written.transform(committerName, new CommittableTypeInfo(), committerOperator)
.setParallelism(1) // Committer 通常是单并行度
.forceNonParallel(); // 强制非并行
if (uidSuffix != null) {
committed =
committed.uid(
generateCustomUid(GLOBAL_COMMITTER_NAME, table.name(), uidSuffix));
}
return committed.addSink(new DiscardingSink<>()).name("Paimon Sink").setParallelism(1);
}
// ... existing code ...
- 这个方法创建并配置了 Committer Operator。
- Committer Operator 接收来自上游 (Writer 或 Compactor) 的
Committable
对象。 - 它会等待 Flink JobManager 的 Checkpoint 完成通知。
- 收到通知后,它会调用 Paimon 的
TableCommit
来执行实际的提交操作,即创建 Manifest 和 Snapshot 文件。 CommitterOperatorFactory
: 封装了CommitterOperator
的创建逻辑。createCommitterFactory()
: 这个抽象方法由子类实现,返回Committer.Factory
,它会提供 Paimon Core 中的TableCommit
实例。TableCommit
负责执行生成 Snapshot 的原子操作。CommitterOperator
内部会监听 Flink 的notifyCheckpointComplete()
回调。只有当 Checkpoint 成功后,才会调用TableCommit#commit()
方法。
SingleFileWriter.java
- 是一个通用的、用于将记录写入单个文件的工具类。
- Paimon 的 Writer (例如
ManifestFile.ManifestEntryWriter
或数据文件的 Writer) 可能会直接或间接使用SingleFileWriter
或类似逻辑来处理具体的文件写操作。 - 构造函数:
SingleFileWriter.java
这里,// ... existing code ... public SingleFileWriter( FileIO fileIO, FormatWriterFactory factory, // 如 AvroWriterFactory, ParquetWriterFactory Path path, // 目标文件路径 Function<T, InternalRow> converter, // 将输入记录 T 转换为 InternalRow String compression, boolean asyncWrite) { this.fileIO = fileIO; this.path = path; this.converter = converter; try { out = fileIO.newOutputStream(path, false); // 创建文件输出流 if (asyncWrite) { out = new AsyncPositionOutputStream(out); // 可选的异步输出流包装 } writer = factory.create(out, compression); // 通过工厂创建具体的 FormatWriter } catch (IOException e) { LOG.warn( "Failed to open the bulk writer, closing the output stream and throw the error.", e); if (out != null) { abort(); } throw new UncheckedIOException(e); } this.recordCount = 0; this.closed = false; } // ... existing code ...
FormatWriterFactory
(例如AvroWriterFactory
或ParquetWriterFactory
) 会根据文件类型创建相应的FormatWriter
。FormatWriter
负责将InternalRow
对象按照特定格式(Avro, Parquet 等)写入输出流。 writeImpl(T record)
:SingleFileWriter.java
这个方法首先使用// ... existing code ... protected InternalRow writeImpl(T record) throws IOException { if (closed) { throw new RuntimeException("Writer has already closed!"); } try { InternalRow rowData = converter.apply(record); // 1. 将用户记录转换为 InternalRow writer.addElement(rowData); // 2. FormatWriter 将 InternalRow 写入缓冲区或文件 recordCount++; return rowData; } catch (Throwable e) { LOG.warn("Exception occurs when writing file " + path + ". Cleaning up.", e); abort(); // 写入失败则中止,并删除已写入的部分文件 throw e; } } // ... existing code ...
converter
将输入的泛型记录T
转换为 Paimon 内部能处理的InternalRow
,然后调用writer.addElement(rowData)
将其写入。close()
:SingleFileWriter.java
当文件写入完成时调用此方法,它会确保所有数据被刷到磁盘,并关闭底层的// ... existing code ... @Override public void close() throws IOException { if (closed) { return; } if (LOG.isDebugEnabled()) { LOG.debug("Closing file {}", path); } try { if (writer != null) { writer.close(); // 关闭 FormatWriter,确保所有缓冲数据刷盘 writer = null; } if (out != null) { out.flush(); // 确保输出流数据刷盘 outputBytes = out.getPos(); // 获取文件大小 out.close(); // 关闭文件输出流 out = null; } } catch (IOException e) { LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e); abort(); // 关闭失败也尝试中止并删除文件 throw e; } finally { closed = true; } } // ... existing code ...
FormatWriter
和文件输出流。abort()
:SingleFileWriter.java
如果在写入或关闭过程中发生错误,会调用此方法来清理资源并删除可能已部分写入的文件,防止产生脏数据。// ... existing code ... @Override public void abort() { if (writer != null) { IOUtils.closeQuietly(writer); writer = null; } if (out != null) { IOUtils.closeQuietly(out); out = null; } fileIO.deleteQuietly(path); // 删除部分写入的文件 } // ... existing code ...
通过这种两阶段提交的方式(Writer 生成 Committable,Committer 在 Checkpoint 成功后执行 Paimon Commit),Paimon 结合 Flink 的 Checkpoint 机制,能够有效地保证端到端的数据一致性。即使在发生故障时,也能确保只有完整提交的数据才对用户可见。
CommitterOperator
的实现
paimon/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
CommitterOperator 与 Paimon 表数据的交互:
- CommitterOperator 不直接读写 Paimon 的数据文件 (例如 Parquet, ORC 等格式的文件)。这些数据文件的物理写入是由上游的 Paimon 写入算子在数据处理阶段完成的。
- CommitterOperator 的核心工作是处理和提交元数据。它接收描述数据文件变更的 CommitMessage,然后通过 Paimon Core 的 Committer 将这些变更信息持久化到 Paimon 表的元数据中,这主要涉及到更新 Manifest 文件列表和创建新的 Snapshot 文件。
- Paimon 表数据提交的原子性是由 Paimon Core 层的快照机制(例如 RenamingSnapshotCommit 策略)来保证的。该机制依赖于文件系统的原子操作(如重命名)以及可能的外部锁机制,确保每个快照的完整性和一致性。
- 因此,CommitterOperator 只是在 Flink 分布式快照和提交流程的协调下,在正确的时机(即 Flink Checkpoint 成功后)调用 Paimon Core 提供的原子提交流程,确保端到端的数据一致性。
CommitterOperator.java
中的关键组件和流程:
CommitT
和GlobalCommitT
:CommitT
: 通常代表一个Committable
对象,它包含了由上游 Writer 算子写入的数据文件的元数据信息(比如哪些文件是新增的,哪些是为更新而产生的旧文件等)。GlobalCommitT
: 代表了一个 Checkpoint 周期内所有并行 Writer 实例产生的CommitT
的聚合结果。例如,ManifestCommittable
就是一种GlobalCommitT
,它包含了要写入到 Manifest 文件中的所有变更。
inputs
(类型Deque<CommitT>
):- 这是一个双端队列,用于临时存储从上游
TableWriteOperator
发送过来的CommitT
对象。 processElement(StreamRecord<CommitT> element)
方法会将接收到的element.getValue()
(即CommitT
) 添加到这个队列中。
- 这是一个双端队列,用于临时存储从上游
committerFactory
(类型Committer.Factory<CommitT, GlobalCommitT>
):- 这是一个工厂类,负责创建 Paimon 核心库中的
Committer
对象。 - 在
initializeState
方法中,通过committerFactory.create(...)
创建了this.committer
实例。
- 这是一个工厂类,负责创建 Paimon 核心库中的
committer
(类型Committer<CommitT, GlobalCommitT>
):- 这是与 Paimon 表进行交互并执行提交的核心对象。 它不是 Flink 的算子,而是 Paimon Core API 的一部分。
- 它封装了将
Committable
聚合成GlobalCommitT
,并将GlobalCommitT
提交到 Paimon 表的逻辑。
committablesPerCheckpoint
(类型NavigableMap<Long, GlobalCommitT>
):- 这是一个有序的 Map,用于存储按 Checkpoint ID 分组并聚合后的
GlobalCommitT
对象。Key 是 Checkpoint ID,Value 是对应的GlobalCommitT
。
- 这是一个有序的 Map,用于存储按 Checkpoint ID 分组并聚合后的
原子提交流程
接收和缓存
Committable
:CommitterOperator
通过processElement
方法接收上游传来的CommitT
对象,并将它们暂存到inputs
队列中。
轮询和聚合 (在
snapshotState
或endInput
时触发pollInputs
):pollInputs()
方法被调用时(例如,当 Flink 准备做 Checkpoint 时,会调用snapshotState
,进而调用pollInputs
):Map<Long, List<CommitT>> grouped = committer.groupByCheckpoint(inputs);
- 这里调用了 Paimon Core
committer
对象的groupByCheckpoint
方法。这个方法会遍历inputs
队列中的所有CommitT
,并根据每个CommitT
内部携带的 Checkpoint ID 信息将它们分组。
- 这里调用了 Paimon Core
- 然后遍历
grouped
Map:Long cp = entry.getKey();
获取 Checkpoint ID。List<CommitT> committables = entry.getValue();
获取该 Checkpoint 对应的CommitT
列表。committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
toCommittables(cp, committables)
内部调用committer.combine(checkpoint, currentWatermark, inputs)
。committer.combine(...)
也是 Paimon Corecommitter
对象的方法。它负责将同一个 Checkpoint 的多个CommitT
(来自不同并行度的 Writer)聚合成一个单一的GlobalCommitT
对象。这个GlobalCommitT
包含了本次提交所需的所有文件变更的完整信息。- 聚合后的
GlobalCommitT
被存入committablesPerCheckpoint
中,等待后续的提交。
- 最后,
inputs.clear()
清空临时队列。
Flink Checkpoint 完成与触发 Paimon Commit:
- 当 Flink JobManager 通知某个 Checkpoint 成功完成时,会调用
CommitterOperator
的notifyCheckpointComplete(long checkpointId)
方法。 notifyCheckpointComplete
内部调用commitUpToCheckpoint(checkpointId)
。
- 当 Flink JobManager 通知某个 Checkpoint 成功完成时,会调用
执行 Paimon Commit (
commitUpToCheckpoint
方法):NavigableMap<Long, GlobalCommitT> headMap = committablesPerCheckpoint.headMap(checkpointId, true);
- 从
committablesPerCheckpoint
中获取所有小于等于当前已完成checkpointId
的待提交内容。
- 从
List<GlobalCommitT> committables = committables(headMap);
- 将这些
GlobalCommitT
放入一个列表。
- 将这些
- 关键步骤:
committer.commit(committables);
(或者在批处理结束时调用committer.filterAndCommit(...)
)CommitterOperator
(Flink) ->StoreCommitter
/StoreMultiCommitter
(Flink Connector 适配层 paimon-flink/paimon-flink-common) ->TableCommitImpl
(Paimon Core 业务逻辑层) ->FileStoreCommit
(Paimon Core 底层存储交互层) -> 原子性 Snapshot 创建。
headMap.clear();
提交成功后,从committablesPerCheckpoint
中移除已提交的条目。
TableCommitImpl
(paimon-core)
核心职责: 这是 Paimon 表级别提交操作的最终执行者。它封装了与底层文件存储(
FileStoreCommit
)交互的复杂逻辑,包括:- 管理
FileStoreCommit
:TableCommitImpl
持有一个FileStoreCommit
的实例。FileStoreCommit
更接近底层,负责实际的 Manifest 文件读写、Snapshot 文件生成、文件系统操作等。 - 处理
ManifestCommittable
: 接收由上层(如 Flink Committer Operator)传递过来的ManifestCommittable
对象。这个对象聚合了本次提交需要的所有文件变更信息(新增文件、删除文件等)。 - 执行提交: 调用
FileStoreCommit
的commit()
或overwrite()
方法,将ManifestCommittable
中的变更写入新的 Manifest 文件,并最终生成一个新的 Snapshot 文件,从而原子性地完成一次提交。 - 处理覆盖写 (Overwrite): 如果是 Overwrite 操作,它会协调
FileStoreCommit
执行相应的分区覆盖逻辑。 - 快照和分区过期 (Expiration): 管理快照的过期删除 (
expireSnapshots
) 和分区的过期 (partitionExpire
)。这些操作通常在提交成功后异步执行。 - 自动标签管理 (Tag Auto Management): 如果配置了自动创建 Tag,它会协调
TagAutoManager
在提交后创建 Tag。 - 消费者过期管理: 管理消费者的过期信息。
- 指标收集: 通过
CommitMetrics
收集提交相关的指标。 - 文件存在性检查: 在重试提交时(
filterAndCommitMultiple
),会检查待提交的ManifestCommittable
中引用的数据文件是否存在于文件系统中,防止因文件丢失导致恢复失败。
- 管理
关键方法:
commit(ManifestCommittable committable)
: 提交单个ManifestCommittable
。commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles)
: 提交多个ManifestCommittable
。这是流式作业中,一个 Checkpoint 可能包含多个ManifestCommittable
(尽管在StoreCommitter
中通常是一个)时会用到的。filterAndCommitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles)
: 先过滤掉已经提交过的ManifestCommittable
,然后提交剩余的。这在作业恢复时非常重要,确保不会重复提交。expire(...)
: 触发快照、分区、消费者和标签的过期逻辑。
TableCommitImpl.java
// ... existing code ...
public class TableCommitImpl implements InnerTableCommit {
private static final Logger LOG = LoggerFactory.getLogger(TableCommitImpl.class);
private final FileStoreCommit commit; // 底层文件存储提交逻辑
@Nullable private final Runnable expireSnapshots; // 快照过期逻辑
@Nullable private final PartitionExpire partitionExpire; // 分区过期逻辑
@Nullable private final TagAutoManager tagAutoManager; // 自动打标逻辑
@Nullable private final Duration consumerExpireTime; // 消费者过期时间
private final ConsumerManager consumerManager; // 消费者管理器
private final ExecutorService expireMainExecutor; // 过期操作执行器
private final AtomicReference<Throwable> expireError; // 过期操作错误记录
private final String tableName; // 表名
@Nullable private Map<String, String> overwritePartition = null; // 覆盖写的分区
private boolean batchCommitted = false; // 是否为批处理提交(一次性)
private final boolean forceCreatingSnapshot; // 是否强制创建快照
public TableCommitImpl(
FileStoreCommit commit,
@Nullable Runnable expireSnapshots,
@Nullable PartitionExpire partitionExpire,
@Nullable TagAutoManager tagAutoManager,
@Nullable Duration consumerExpireTime,
ConsumerManager consumerManager,
ExpireExecutionMode expireExecutionMode,
String tableName,
boolean forceCreatingSnapshot) {
if (partitionExpire != null) {
commit.withPartitionExpire(partitionExpire);
}
this.commit = commit;
this.expireSnapshots = expireSnapshots;
this.partitionExpire = partitionExpire;
this.tagAutoManager = tagAutoManager;
this.consumerExpireTime = consumerExpireTime;
this.consumerManager = consumerManager;
this.expireMainExecutor =
expireExecutionMode == ExpireExecutionMode.SYNC
? MoreExecutors.newDirectExecutorService()
: Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(
Thread.currentThread().getName() + "expire-main-thread"));
this.expireError = new AtomicReference<>(null);
this.tableName = tableName;
this.forceCreatingSnapshot = forceCreatingSnapshot;
}
// ... 其他方法如 commit, commitMultiple, filterAndCommitMultiple, expire 等 ...
public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
if (overwritePartition == null) {
for (ManifestCommittable committable : committables) {
// 调用 FileStoreCommit 执行实际的提交
commit.commit(committable, new HashMap<>(), checkAppendFiles);
}
if (!committables.isEmpty()) {
// 提交成功后执行过期操作
expire(committables.get(committables.size() - 1).identifier(), expireMainExecutor);
}
} else {
// 处理覆盖写逻辑
ManifestCommittable committable;
if (committables.size() > 1) {
throw new RuntimeException(
"Multiple committables appear in overwrite mode, this may be a bug, please report it: "
+ committables);
} else if (committables.size() == 1) {
committable = committables.get(0);
} else {
committable = new ManifestCommittable(Long.MAX_VALUE); // 空提交用于覆盖写
}
commit.overwrite(overwritePartition, committable, Collections.emptyMap());
expire(committable.identifier(), expireMainExecutor);
}
}
// ... existing code ...
}
FileStoreCommitImpl
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
的实现,探讨其难点、值得学习的设计以及 TableCommitImpl
如何利用其能力。
FileStoreCommitImpl
是 Paimon 中执行原子提交的核心底层实现,直接与文件系统和 Paimon 的元数据文件(Manifests, Snapshots)打交道。它的正确性和鲁棒性对整个 Paimon 系统至关重要。
FileStoreCommitImpl
的核心职责与流程
接收提交信息:
- 通过
commit(ManifestCommittable committable, Map<String, String> properties)
或overwrite(...)
方法接收待提交的变更。ManifestCommittable
对象封装了所有文件级别的变更(新增文件、删除文件、更新文件等)。
- 通过
过滤已提交内容 (幂等性保证):
filterCommitted(List<ManifestCommittable> committables)
: 在实际提交前,会根据commitUser
和SnapshotManager
中记录的最新快照信息,过滤掉那些已经被当前commitUser
提交过的ManifestCommittable
。这是保证提交操作幂等性的关键一步,防止因重试导致重复提交。
冲突检测 (并发控制):
- 在
commit
和overwrite
方法内部,会调用tryCommit
或tryOverwrite
。这些方法在尝试提交前,会进行冲突检测。 - 核心思想: 确保本次提交所依赖的文件状态(例如,要删除的文件确实存在,要修改的文件没有被其他并发提交修改)没有发生变化。
- 实现方式:
- 读取当前最新的快照 (
snapshotManager.latestSnapshot()
)。 - 读取受本次提交影响的分区中的现有文件列表 (
readAllEntriesFromChangedPartitions
或scan.plan().files()
)。 noConflictsOrFail(...)
: 比较当前文件列表和本次提交要做的变更,如果发现冲突(例如,要删除的文件已经不存在,或者新增的文件与现有文件有重叠但元数据不一致),则抛出异常,提交失败。
- 读取当前最新的快照 (
- 优化: 为了减少文件读取次数,代码中包含了一些优化逻辑。例如,在
commit
方法中,如果第一次APPEND
类型的提交成功,并且没有其他并发提交,那么后续的COMPACT
类型提交可以基于之前读取和检查过的文件列表进行冲突检测,避免重复读取。safeLatestSnapshotId
变量用于标记这种安全的、无需重新读取文件进行冲突检查的快照点。
- 在
执行提交 (
tryCommit
/tryOverwrite
):- 这是实际执行提交的核心逻辑,通常会包含多次尝试(重试机制)。
tryCommitOnce(...)
: 单次提交尝试的核心。- 生成 Manifest 文件:
- 将
ManifestCommittable
中的文件变更(deltaFiles
,changelogFiles
)写入新的 Manifest 文件。manifestFile.write(deltaEntries)
。 - 如果涉及到索引文件(如哈希索引、删除向量索引),也会写入新的 Index Manifest 文件。
indexManifestFile.write(indexFileMetas)
。
- 将
- 生成 Manifest List 文件:
- 将新生成的 Manifest 文件和之前快照中仍然活跃的 Manifest 文件列表写入一个新的 Manifest List 文件。
manifestList.write(newManifests)
。
- 将新生成的 Manifest 文件和之前快照中仍然活跃的 Manifest 文件列表写入一个新的 Manifest List 文件。
- 创建 Snapshot 文件 (原子性保证):
- 这是实现原子提交的关键步骤。
snapshotManager.commit(newSnapshot)
:- 首先,在文件系统上创建一个临时的 Snapshot 文件。
- 然后,通过原子性的
rename
操作将临时文件重命名为正式的 Snapshot 文件名 (例如snapshot-N
)。 - 如果提供了外部的
SnapshotCommit
(例如,与外部 Catalog 集成时),则会调用snapshotCommit.commit(newSnapshot)
来委托外部系统保证 Snapshot 的原子性创建。
- 一旦 Snapshot 文件成功创建并对外部可见,就代表本次提交成功。
- 清理过期的 Manifest 文件: 提交成功后,会清理不再被任何 Snapshot 引用的旧 Manifest 文件和 Manifest List 文件。
- 调用回调: 执行注册的
CommitCallback
。
- 生成 Manifest 文件:
重试机制:
tryCommit
和tryOverwrite
方法内部包含一个循环,用于在提交失败时进行重试,直到达到最大重试次数 (commitMaxRetries
) 或超时 (commitTimeout
)。- 失败的原因可能是冲突、文件系统操作异常等。重试时会重新读取最新的快照,并再次进行冲突检测和提交尝试。
处理不同类型的提交:
- APPEND: 追加数据,生成新的数据文件和对应的 Manifest 条目。
- COMPACT: 合并数据文件,会产生新的数据文件,并标记旧的数据文件为删除。
- OVERWRITE: 覆盖分区数据。它会先标记指定分区下的所有现有文件为删除,然后再追加新的数据文件。
- ANALYZE: 提交统计信息,生成一个只包含统计信息文件路径的 Snapshot。
Manifest 文件合并 (Compaction):
compactManifest()
: 为了防止 Manifest 文件过多导致元数据扫描变慢,Paimon 会定期合并 Manifest 文件。FileStoreCommitImpl
提供了这个方法,它会被TableCommitImpl
在提交后调用。ManifestFileMerger.merge(...)
负责实际的合并逻辑。
FileStoreCommitImpl
的难点与挑战
并发控制与冲突解决:
- 这是最核心的难点。多个写入作业或 Compaction 作业可能同时尝试修改同一个表。
FileStoreCommitImpl
必须能够正确检测到这些并发操作之间的冲突,并保证只有一个提交能够成功,避免数据不一致或丢失。 - 其冲突检测逻辑依赖于读取最新的快照和受影响分区的文件列表,这在分布式环境下本身就有一定的复杂性(需要保证读取到的是一致性的视图)。
- 乐观锁的思想被广泛应用:先假设没有冲突,尝试提交,如果失败(例如因为原子
rename
失败或冲突检测失败),则回滚并重试。
- 这是最核心的难点。多个写入作业或 Compaction 作业可能同时尝试修改同一个表。
原子性保证:
- 最终的原子性依赖于文件系统的原子
rename
操作或外部SnapshotCommit
的能力。如果这个原子操作失败,整个提交必须被视为失败,并且不能留下部分提交的中间状态。 - 在提交过程中,如果发生任何异常,必须确保不会产生一个“损坏”的快照或不一致的元数据。代码中大量的
try-catch-finally
和清理逻辑都是为了应对这种情况。
- 最终的原子性依赖于文件系统的原子
幂等性:
filterCommitted
方法是保证幂等性的关键。在分布式系统中,任务可能会重试,如果提交操作不是幂等的,重试会导致数据重复写入或元数据错误。
状态管理与清理:
- 在提交过程中会生成临时的 Manifest 文件和 Snapshot 文件。如果提交失败,这些临时文件需要被正确清理。
- 提交成功后,旧的、不再被任何快照引用的 Manifest 文件和数据文件也需要被清理(这部分逻辑主要由 Expire 相关的类负责,但
FileStoreCommitImpl
会触发或协调这些操作)。
代码复杂性与可维护性:
- 由于需要处理多种提交类型、并发控制、重试、错误恢复等复杂逻辑,
FileStoreCommitImpl
的代码量较大,逻辑分支较多,理解和维护都有一定难度。注释中也强调了修改此类需要非常小心,并进行充分测试。
- 由于需要处理多种提交类型、并发控制、重试、错误恢复等复杂逻辑,
tryCommitOnce
tryCommitOnce
方法。这个方法是单次提交尝试的核心逻辑,负责生成所有必要的元数据文件,并最终尝试原子性地创建一个新的快照 (Snapshot)。
tryCommitOnce
的核心步骤:
- 确定新快照ID: 基于上一个快照ID递增。
- 冲突检测: (可选,根据
ConflictCheck
策略) 确保本次提交的文件变更与当前存储状态不冲突。这是保证并发正确性的关键。如果冲突,会抛出异常。 - 加载基础信息: 从
latestSnapshot
获取总记录数、水位线、旧的 Manifest 文件列表、旧的索引 Manifest 等。 - 合并 Manifest 文件: 将旧的 Manifest 文件进行合并,生成新的
baseManifestList
。这是为了优化元数据大小和读取性能。 - 计算记录数: 根据
deltaFiles
计算增量记录数和新的总记录数。 - 处理增量 Manifest 和索引 Manifest:
- 如果是重试 (
retryResult != null
),尽可能复用上次尝试生成的 Manifest List 和索引 Manifest。 - 如果不是重试,则根据
deltaFiles
,changelogFiles
,indexFiles
生成新的 Manifest List 和索引 Manifest。
- 如果是重试 (
- 构建
Snapshot
对象: 将所有收集到的元数据(Manifest List 路径、记录数、水位线、提交信息等)组装成一个新的Snapshot
对象。 - 原子性提交
Snapshot
: 调用commitSnapshotImpl
,该方法会:- 将
Snapshot
对象序列化为 JSON 文件。 - 尝试通过原子性的文件系统操作(如
rename
)将临时 Snapshot 文件重命名为正式的 Snapshot 文件 (e.g.,snapshot-N
)。 - 更新相关的统计信息。
- 将
- 处理结果:
- 如果原子提交成功,执行回调并返回
SuccessResult
。 - 如果原子提交失败(通常是
rename
失败,暗示并发冲突),清理本次生成的临时文件,并返回RetryResult
,以便外部循环进行重试。RetryResult
会携带本次尝试的部分中间结果,用于优化下一次重试。 - 如果发生其他异常,清理临时文件并向上抛出异常。
- 如果原子提交成功,执行回调并返回
FileStoreCommitImpl.java
// ... existing code ...
@VisibleForTesting
CommitResult tryCommitOnce(
@Nullable RetryResult retryResult, // 如果是重试,则包含上次尝试的部分结果
List<ManifestEntry> deltaFiles, // 本次提交的数据文件变更 (增、删)
List<ManifestEntry> changelogFiles, // 本次提交的 Changelog 文件 (通常用于流式 CDC)
List<IndexManifestEntry> indexFiles, // 本次提交的索引文件变更
long identifier, // 提交标识符,用于幂等性
@Nullable Long watermark, // 本次提交的水位线
Map<Integer, Long> logOffsets, // 日志偏移量 (用于流式场景)
Snapshot.CommitKind commitKind, // 提交类型 (APPEND, OVERWRITE, COMPACT, ANALYZE)
@Nullable Snapshot latestSnapshot, // 当前最新的快照
ConflictCheck conflictCheck, // 冲突检查策略
@Nullable String newStatsFileName) { // 新的统计信息文件名 (如果是 ANALYZE 提交)
long startMillis = System.currentTimeMillis(); // 记录开始时间,用于超时判断
// 1. 计算新快照 ID
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
// 2. 调试日志,打印待提交的文件信息
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId);
for (ManifestEntry entry : deltaFiles) {
LOG.debug(" * {}", entry);
}
LOG.debug("Ready to commit changelog to snapshot {}", newSnapshotId);
for (ManifestEntry entry : changelogFiles) {
LOG.debug(" * {}", entry);
}
}
// 3. 冲突检测 (如果需要)
// - conflictCheck.shouldCheck(latestSnapshot.id()) 判断是否需要基于当前 latestSnapshot 进行冲突检查。
// 如果外部的 tryCommit 循环中,latestSnapshot 没有变化,可能就不需要重复检查。
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
try {
// 获取本次提交涉及变更的分区
List<BinaryRow> changedPartitions =
deltaFiles.stream()
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList());
// 如果是重试 (retryResult != null) 并且重试结果中包含了上一个快照信息
if (retryResult != null && retryResult.latestSnapshot != null) {
// 复用上次重试时已读取的基础数据文件列表
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
// 读取从 retryResult.latestSnapshot 到当前 latestSnapshot 之间的增量变更
List<SimpleFileEntry> incremental =
readIncrementalChanges(
retryResult.latestSnapshot, latestSnapshot, changedPartitions);
if (!incremental.isEmpty()) {
baseDataFiles.addAll(incremental);
// 合并,确保文件列表的正确性
baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
}
} else {
// 非重试或重试但无可用基础数据,则从当前 latestSnapshot 中读取所有涉及变更分区的文件
baseDataFiles =
readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
}
// 执行冲突检查:比较 baseDataFiles (当前分区状态) 和 deltaFiles (本次提交的变更)
// 如果有冲突(如要删除的文件不存在,或新增的文件与现有文件冲突),则抛出异常
noConflictsOrFail(
latestSnapshot.commitUser(), // 上一个提交的用户,用于检测是否是自己引入的冲突
baseDataFiles,
SimpleFileEntry.from(deltaFiles)); // 将 ManifestEntry 转换为 SimpleFileEntry
} catch (Exception e) {
// 冲突检测失败或读取文件异常
if (retryResult != null) {
// 如果是重试,清理掉上次重试产生的临时文件
retryResult.cleanAll();
}
throw e; // 抛出异常,由外层 tryCommit 捕获并决定是否重试
}
}
// 4. 初始化用于构建新快照的元数据变量
Snapshot newSnapshot;
Pair<String, Long> baseManifestList = null; // 指向基础 Manifest List 文件及其大小
Pair<String, Long> deltaManifestList = null; // 指向增量 Manifest List 文件及其大小
List<PartitionEntry> deltaStatistics; // 分区级别的统计信息
Pair<String, Long> changelogManifestList = null; // 指向 Changelog Manifest List 文件及其大小
String oldIndexManifest = null; // 旧的索引 Manifest 文件名
String indexManifest = null; // 新的索引 Manifest 文件名
List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>(); // 合并前的 Manifest 文件列表
List<ManifestFileMeta> mergeAfterManifests = new ArrayList<>(); // 合并后的 Manifest 文件列表
try {
// 5. 准备构建新快照所需的基础信息
long previousTotalRecordCount = 0L; // 上一个快照的总记录数
Long currentWatermark = watermark; // 当前提交的水位线,可能被上一个快照的水位线更新
if (latestSnapshot != null) {
previousTotalRecordCount = scan.totalRecordCount(latestSnapshot);
// 读取上一个快照引用的所有数据 Manifest 文件
mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot);
// 合并当前 logOffsets 和上一个快照的 logOffsets
Map<Integer, Long> latestLogOffsets = latestSnapshot.logOffsets();
if (latestLogOffsets != null) {
latestLogOffsets.forEach(logOffsets::putIfAbsent); // 如果当前没有,则使用上一个的
}
// 更新水位线:取当前提交的水位线和上一个快照水位线的较大值
Long latestWatermark = latestSnapshot.watermark();
if (latestWatermark != null) {
currentWatermark =
currentWatermark == null
? latestWatermark
: Math.max(currentWatermark, latestWatermark);
}
oldIndexManifest = latestSnapshot.indexManifest(); // 获取旧的索引 Manifest
}
// 6. 合并旧的 Manifest 文件,生成新的基础 Manifest List (baseManifestList)
// ManifestFileMerger.merge 会根据配置决定是否以及如何合并 Manifest 文件,
// 目的是控制 Manifest 文件的数量,避免元数据扫描过慢。
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests, // 输入的 Manifest 文件列表
manifestFile, // ManifestFile 工厂,用于读写 Manifest
manifestTargetSize.getBytes(), // Manifest 文件目标大小
manifestMergeMinCount, // 最少需要多少个 Manifest 文件才触发合并
manifestFullCompactionSize.getBytes(), // 触发全量合并的阈值
partitionType, // 分区类型
manifestReadParallelism); // 读取 Manifest 的并行度
baseManifestList = manifestList.write(mergeAfterManifests); // 将合并后的 Manifest 列表写入新的 Manifest List 文件
// 7. 计算记录数变化
long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
// 8. 处理 Manifest List 和索引 Manifest (区分是否重试)
boolean rewriteIndexManifest = true; // 是否需要重新写入索引 Manifest
if (retryResult != null) { // 如果是重试
// 复用上次重试时已生成的统计信息和 Manifest List
deltaStatistics = retryResult.deltaStatistics;
deltaManifestList = retryResult.deltaManifestList;
changelogManifestList = retryResult.changelogManifestList;
// 如果旧的索引 Manifest 没有变化,可以复用上次重试时生成的新的索引 Manifest
if (Objects.equals(oldIndexManifest, retryResult.oldIndexManifest)) {
rewriteIndexManifest = false;
indexManifest = retryResult.newIndexManifest;
LOG.info("Reusing index manifest {} for retry.", indexManifest);
} else {
// 如果旧的索引 Manifest 变了 (说明在两次重试之间有其他提交修改了索引),
// 则需要清理上次重试时生成的临时索引 Manifest 文件。
cleanIndexManifest(retryResult.oldIndexManifest, retryResult.newIndexManifest);
}
} else { // 如果不是重试 (首次尝试)
// 生成新的分区统计信息
deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
// 将 deltaFiles (数据文件变更) 写入新的 Manifest 文件,并生成对应的 Manifest List (deltaManifestList)
deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
// 将 changelogFiles 写入新的 Manifest 文件,并生成对应的 Manifest List (changelogManifestList)
if (changelogFiles.isEmpty()) {
changelogManifestList = null;
} else {
changelogManifestList = manifestList.write(manifestFile.write(changelogFiles));
}
}
// 9. 处理索引文件 (如果需要重写索引 Manifest)
if (rewriteIndexManifest) {
if (indexFiles.isEmpty()) { // 没有索引文件变更
if (commitKind == Snapshot.CommitKind.OVERWRITE) {
// 如果是 OVERWRITE 操作,且没有新的索引文件,则表示清空索引
indexManifest = null;
// 清理旧的索引 Manifest (如果存在)
if (oldIndexManifest != null && indexManifestFile.exists(oldIndexManifest)) {
indexManifestFile.delete(oldIndexManifest);
}
} else {
// 其他提交类型,如果没有新的索引文件,则沿用旧的索引 Manifest
indexManifest = oldIndexManifest;
}
} else { // 有索引文件变更
// 将 indexFiles 写入新的索引 Manifest 文件
indexManifest = indexManifestFile.write(indexFiles);
// 清理旧的索引 Manifest (如果存在且与新的不同)
if (oldIndexManifest != null
&& !oldIndexManifest.equals(indexManifest)
&& indexManifestFile.exists(oldIndexManifest)) {
indexManifestFile.delete(oldIndexManifest);
}
}
}
// 10. 构建新的 Snapshot 对象
newSnapshot =
new Snapshot(
newSnapshotId,
schemaManager.latestSnapshotSchemaId(), // 当前最新的 Schema ID
baseManifestList.getLeft(), // 基础 Manifest List 文件名
baseManifestList.getRight(), // 基础 Manifest List 文件大小
deltaManifestList.getLeft(), // 增量 Manifest List 文件名
deltaManifestList.getRight(), // 增量 Manifest List 文件大小
changelogManifestList == null ? null : changelogManifestList.getLeft(),
changelogManifestList == null ? null : changelogManifestList.getRight(),
indexManifest, // 索引 Manifest 文件名
commitUser, // 提交用户
identifier, // 提交 ID
commitKind, // 提交类型
System.currentTimeMillis(), // 提交时间戳
logOffsets, // 日志偏移量
totalRecordCount, // 总记录数
deltaRecordCount, // 增量记录数
changelogRecordCount(changelogFiles), // Changelog 记录数
currentWatermark, // 水位线
newStatsFileName // 统计文件名 (如果是 ANALYZE)
);
// 11. 原子性地提交 Snapshot
// commitSnapshotImpl 内部会尝试原子性地创建 Snapshot 文件 (通常通过 rename 实现)
// 并更新统计信息。
if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
// 提交成功
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
"Successfully commit snapshot #%d by user %s "
+ "with identifier %s and kind %s.",
newSnapshotId, commitUser, identifier, commitKind.name()));
}
// 执行提交成功后的回调
commitCallbacks.forEach(callback -> callback.call(deltaFiles, newSnapshot));
return new SuccessResult(); // 返回成功结果
}
// 12. 原子提交失败 (例如 rename 失败)
// 这种情况通常意味着有并发冲突,或者文件系统层面发生问题。
// 需要清理本次尝试生成的临时文件,并返回 RetryResult,以便外层循环进行重试。
LOG.info(
"Atomic commit failed for snapshot #{}, user {}, identifier {}. "
+ "Cleaning up and retrying.",
newSnapshotId,
commitUser,
identifier);
// 清理本次尝试生成的 Manifest List 和索引 Manifest 文件
cleanBaseManifestList(baseManifestList);
cleanDeltaManifestList(deltaManifestList);
cleanChangelogManifestList(changelogManifestList);
cleanIndexManifest(oldIndexManifest, indexManifest); // 清理新生成的或未被复用的旧索引 Manifest
// 返回 RetryResult,其中包含了本次尝试生成的部分元数据,
// 以便在下次重试时可以复用,减少重复工作。
return new RetryResult(
deltaStatistics,
deltaManifestList,
changelogManifestList,
oldIndexManifest,
indexManifest,
latestSnapshot, // 记录本次尝试时基于的 latestSnapshot
baseDataFiles, // 记录本次尝试时读取的基础数据文件
startMillis); // 记录本次尝试的开始时间
} catch (Throwable e) {
// 13. 发生未预期的异常
LOG.warn(
String.format(
"Failed to commit snapshot #%d by user %s with identifier %s.",
newSnapshotId, commitUser, identifier),
e);
// 清理本次尝试可能产生的临时文件
cleanBaseManifestList(baseManifestList);
cleanDeltaManifestList(deltaManifestList);
cleanChangelogManifestList(changelogManifestList);
cleanIndexManifest(oldIndexManifest, indexManifest);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
}
}
// ... existing code ...
这个方法的设计充分考虑了原子性、幂等性(通过外部的 filterCommitted
和 identifier
)、并发控制(通过冲突检测和原子 rename
)以及重试优化。它是 Paimon 实现可靠提交的基石。
SnapshotCommit
接口的两个主要实现:RenamingSnapshotCommit
和 CatalogSnapshotCommit
,看看它们是如何保证快照提交的原子性的。
SnapshotCommit
接口的核心方法是 commit(Snapshot snapshot, String branch, List<PartitionStatistics> statistics)
,它负责将给定的 Snapshot
对象原子性地持久化。
1. RenamingSnapshotCommit
这是 Paimon 默认的、基于文件系统的快照提交实现。它的原子性主要依赖于文件系统的原子 rename
操作,并结合了锁机制来处理并发。
我们来看一下它的核心实现逻辑:
RenamingSnapshotCommit.java
// ... existing code ...
public class RenamingSnapshotCommit implements SnapshotCommit {
private final SnapshotManager snapshotManager;
private final FileIO fileIO;
private final Lock lock; // 锁,用于并发控制
public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) {
this.snapshotManager = snapshotManager;
this.fileIO = snapshotManager.fileIO();
this.lock = lock;
}
@Override
public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics> statistics)
throws Exception {
// 1. 确定新快照文件的最终路径
Path newSnapshotPath =
snapshotManager.branch().equals(branch) // 判断是否是主分支
? snapshotManager.snapshotPath(snapshot.id()) // 主分支的快照路径
: snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id()); // 其他分支的快照路径
// 2. 定义实际执行写入和重命名的操作
Callable<Boolean> callable =
() -> {
// 2a. 尝试原子性地写入快照内容到最终路径
// - FileIO.tryToWriteAtomic 通常实现为:先写入到一个临时文件,然后原子性 rename 到目标路径。
// - 如果目标文件已存在,某些文件系统 rename 可能会覆盖,某些可能会失败。
// 这里的具体行为取决于 FileIO 的实现和底层文件系统。
boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
if (committed) {
// 2b. 如果写入成功,更新 LATEST hint 文件,指向这个新的快照 ID
snapshotManager.commitLatestHint(snapshot.id());
}
return committed;
};
// 3. 使用锁来保证并发操作的安全性
return lock.runWithLock( // lock.runWithLock 会获取锁,执行 lambda 表达式,然后释放锁
() ->
// 3a. 在持有锁的情况下,再次检查目标快照文件是否存在。
// 这是为了处理某些文件系统 rename 操作在目标文件已存在时的不确定行为。
// 如果文件已存在,意味着可能有并发冲突或者之前的尝试部分成功,此时不再尝试写入。
!fileIO.exists(newSnapshotPath) && callable.call());
}
@Override
public void close() throws Exception {
this.lock.close();
}
// ... existing code ...
}
实现原子性的关键点:
- 原子
rename
:fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson())
是核心。这个方法通常会先将snapshot.toJson()
的内容写入一个临时文件,然后通过文件系统的rename
操作将临时文件重命名为最终的newSnapshotPath
(例如snapshot-N
)。文件系统的rename
操作在大多数情况下被认为是原子性的,即要么成功完成,要么失败不留下中间状态。 - 锁机制 (
Lock
):- 在多并发写入场景下,仅靠原子
rename
可能不足以完全避免冲突。例如,两个进程可能同时尝试创建同一个snapshot-N
文件。 lock.runWithLock(...)
提供了一个互斥的执行环境。在尝试写入快照之前,会先获取锁。- 在持有锁之后,它会再次检查
!fileIO.exists(newSnapshotPath)
。这个检查非常重要,因为:- 如果
newSnapshotPath
已经存在,说明可能有另一个并发的提交已经成功创建了这个快照,或者上一次尝试因为某种原因(比如rename
成功但后续步骤失败)留下了这个文件。此时,当前提交应该失败,以避免覆盖或产生不一致。 - 如果文件不存在,则执行
callable.call()
,即进行原子写入。
- 如果
- 在多并发写入场景下,仅靠原子
LATEST
Hint 更新:如果快照文件成功写入,snapshotManager.commitLatestHint(snapshot.id())
会更新LATEST
这个提示文件,使其指向新提交的快照 ID。这是一个尽力而为的操作,即使失败,Paimon 也能通过扫描实际的snapshot-N
文件来找到最新的快照。
总结 RenamingSnapshotCommit
:
- 优点:实现相对简单,不依赖外部 Catalog 服务,直接利用文件系统的能力。
- 适用场景:适用于不使用外部 Catalog 或 Catalog 不提供事务性提交快照能力的场景。
注释 // fs.rename may not returns false if target file already exists, or even not atomic as we're relying on external locking, we can first check if file exist then rename to work around this case
解释了为什么需要这个锁:
fs.rename
的行为不确定性:- 目标文件已存在时可能不返回
false
:某些文件系统(尤其是对象存储)的rename
操作在目标文件已存在时,可能不会显式返回false
或抛出错误,而是直接覆盖。如果 Paimon 依赖rename
操作在目标存在时失败来处理冲突,这种行为就会导致问题。 - 非原子性:在对象存储上,
rename
操作通常不是原子的。它可能被实现为“先复制再删除源文件”。如果在这个过程中发生故障(例如,复制完成但删除未完成,或者多个进程同时操作),就可能导致数据不一致,比如产生部分写入的快照文件,或者一个进程的提交覆盖了另一个进程的提交。
- 目标文件已存在时可能不返回
依赖外部锁(
external locking
):- 为了解决上述
rename
操作的不可靠性,Paimon 引入了外部锁机制。这个lock
对象通常是一个目录级别的锁(例如基于 Hive Metastore 或 JDBC 的锁),确保在任意时刻只有一个写入者可以对特定的表执行提交操作。
- 为了解决上述
锁内的操作顺序:
!fileIO.exists(newSnapshotPath)
:在获取锁之后,首先检查目标快照路径newSnapshotPath
是否已经存在。- 如果已存在,则表达式
!fileIO.exists(newSnapshotPath)
为false
,整个runWithLock
的 lambda 表达式返回false
,表示提交失败。这可以防止覆盖已经存在的快照(可能由另一个并发的提交者刚刚完成)。 - 如果不存在,则继续执行
callable.call()
。
- 如果已存在,则表达式
callable.call()
:这个Callable
内部执行了fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson())
。这个方法会尝试原子地写入快照内容。它通常会先将快照数据写入一个临时文件,然后将该临时文件重命名为最终的newSnapshotPath
。- 如果
tryToWriteAtomic
成功(意味着底层的重命名成功),则返回true
。 - 如果成功,还会调用
snapshotManager.commitLatestHint(snapshot.id())
来更新最新的快照提示。
- 如果
总结来说,加锁的原因是:
- 保证原子性:将“检查文件是否存在”和“写入/重命名文件”这两个步骤组合成一个原子操作。如果没有锁,两个并发的提交操作可能会出现以下情况:
- 进程 A 检查到目标路径不存在。
- 进程 B 检查到目标路径不存在。
- 进程 A 成功写入并重命名文件。
- 进程 B 尝试写入并重命名文件,可能会覆盖进程 A 的提交,或者由于底层
rename
的非原子性导致失败或不一致状态。
- 处理文件系统
rename
的限制:特别是在对象存储上,rename
操作的原子性和行为(当目标存在时)不可靠。通过外部锁和显式的存在性检查,Paimon 能够更可靠地管理快照的提交。
这个锁确保了即使底层文件系统的 rename
操作不够健壮,Paimon 依然能够安全地提交快照,避免了并发写入时可能出现的竞态条件和数据损坏。
相关的代码片段也印证了这一点:
RenamingSnapshotCommit.Factory
类中,Lock
对象可以通过CatalogLockFactory
创建,这通常对应了文档中提到的 Hive Metastore 或 JDBC 锁,用于在分布式环境中提供锁服务。FileIO
接口及其各种实现(如LocalFileIO
、HadoopCompliantFileIO
for S3/OSS)处理实际的文件操作。LocalFileIO
的rename
使用了 Java NIO 的Files.move
并配合了本地锁RENAME_LOCK
,而对象存储的实现则依赖其底层驱动。- Paimon 的并发控制文档 (
concurrency-control.md
) 也明确指出,在对象存储上,由于RENAME
非原子性,需要配置外部锁(如 Hive Metastore)。
因此,这里的锁是保证 Paimon 在各种文件系统(尤其是对象存储)上进行快照提交时数据一致性和正确性的关键机制。
2. CatalogSnapshotCommit
这个实现将快照的提交委托给外部的 Paimon Catalog。这意味着原子性保证的责任转移到了 Catalog 的实现上。
CatalogSnapshotCommit.java
// ... existing code ...
public class CatalogSnapshotCommit implements SnapshotCommit {
private final Catalog catalog; // Paimon Catalog 实例
private final Identifier identifier; // 表的标识符
public CatalogSnapshotCommit(Catalog catalog, Identifier identifier) {
this.catalog = catalog;
this.identifier = identifier;
}
@Override
public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics> statistics)
throws Exception {
// 1. 构建带有分支信息的表标识符
Identifier newIdentifier =
new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branch);
// 2. 调用 Catalog 的 commitSnapshot 方法
// 原子性的保证完全依赖于 catalog.commitSnapshot() 的实现。
return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
}
@Override
public void close() throws Exception {
catalog.close(); // 关闭 Catalog 连接(如果需要)
}
// ... existing code ...
}
实现原子性的关键点:
- 委托给 Catalog:核心逻辑是
catalog.commitSnapshot(newIdentifier, snapshot, statistics)
。 - Catalog 的责任:Paimon Catalog 的具体实现(例如
HiveCatalog
,JdbcCatalog
等)需要负责原子地将Snapshot
信息(通常是其元数据,比如指向 Manifest List 的路径等)与表关联起来。- 对于像 Hive Metastore 这样的外部元数据存储,这可能涉及到在一个事务中更新表的属性或元数据记录。
- 如果 Catalog 支持事务,那么
commitSnapshot
应该在一个事务内完成。如果操作成功,事务提交;如果失败,事务回滚,从而保证原子性。
总结 CatalogSnapshotCommit
:
- 优点:
- 可以将快照管理与更强大的外部元数据系统集成,利用其事务能力、一致性保证和管理功能。
- 当 Paimon 表注册到外部 Catalog (如 Hive Metastore) 时,快照信息可以与 Catalog 中的表元数据保持一致。
- 依赖:强依赖外部 Catalog 实现
commitSnapshot
方法的原子性和正确性。不同的 Catalog 实现其保证级别可能不同。 - 适用场景:当 Paimon 与支持事务性元数据操作的外部 Catalog (如 Hive Metastore, JDBC Catalog) 集成时使用。
实际上只有 RESTCatalog 实现了commitSnapshot,意味着通过REST,外部委托给一个文件系统(
RESTCatalogServer接受请求),再在这个文件系统使用类似RenamingSnapshotCommit的提交。
选择哪个实现?
在 AbstractFileStore.newCommit()
方法中,Paimon 会根据配置决定使用哪个 SnapshotCommit
实现:
AbstractFileStore.java
// ... existing code ...
public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
SnapshotManager snapshotManager = snapshotManager();
// catalogEnvironment.snapshotCommit() 会根据配置返回合适的 SnapshotCommit 实例
SnapshotCommit snapshotCommit = catalogEnvironment.snapshotCommit(snapshotManager);
if (snapshotCommit == null) {
// 如果没有配置外部 Catalog 的 SnapshotCommit,或者 Catalog 不支持,
// 则默认使用 RenamingSnapshotCommit
snapshotCommit =
new RenamingSnapshotCommit(
snapshotManager, Lock.emptyFactory().createLock(null));
}
// ... 其他初始化 ...
return new FileStoreCommitImpl(
// ...
snapshotCommit, // 将选择的 SnapshotCommit 传递给 FileStoreCommitImpl
// ...
);
}
// ... existing code ...
如果 catalogEnvironment.snapshotCommit(snapshotManager)
返回 null
(意味着没有配置或 Catalog 不提供特定的 SnapshotCommit
实现),则会默认创建一个 RenamingSnapshotCommit
。否则,会使用 Catalog 提供的实现(通常是 CatalogSnapshotCommit
的一个实例,其内部持有的 Catalog
对象会负责实际的提交)。
总而言之,两种实现都旨在提供原子性的快照提交,但它们依赖的机制不同:RenamingSnapshotCommit
依赖文件系统和本地锁,而 CatalogSnapshotCommit
依赖外部 Catalog 的事务能力。
TableCommitImpl
如何使用 FileStoreCommitImpl
的能力
TableCommitImpl
是 FileStoreCommitImpl
的直接用户和封装者。它利用 FileStoreCommitImpl
的核心能力来完成对 Paimon 表的实际数据变更提交。
执行提交:
- 当
TableCommitImpl
的commit(ManifestCommittable committable)
或commitMultiple(...)
方法被调用时,它最终会调用其持有的FileStoreCommit
实例 (即FileStoreCommitImpl
) 的commit(...)
方法。 - 对于覆盖写操作,
TableCommitImpl.overwrite(...)
会调用FileStoreCommitImpl.overwrite(...)
。
- 当
过滤已提交内容:
- 在
TableCommitImpl.filterAndCommitMultiple(...)
方法中,它会先调用fileStoreCommit.filterCommitted(committables)
来过滤掉已经提交的ManifestCommittable
,然后再调用fileStoreCommit.commit(...)
提交剩余的部分。
- 在
传递配置和上下文:
TableCommitImpl
在创建FileStoreCommitImpl
实例时,会传递必要的配置信息,如commitUser
、SnapshotManager
、ManifestFile.Factory
等。
触发 Manifest 合并:
TableCommitImpl
在其expire(...)
方法中,可能会调用fileStoreCommit.compactManifests()
来触发 Manifest 文件的合并操作,以优化元数据。
处理统计信息提交:
TableCommitImpl.commitStatistics(...)
会调用FileStoreCommitImpl.commitStatistics(...)
来提交表的统计信息,这会生成一个ANALYZE
类型的 Snapshot。
简单来说,TableCommitImpl
扮演了一个协调者和业务逻辑封装者的角色,它将上层应用(如 Flink Sink)的提交请求转化为对 FileStoreCommitImpl
的调用,并处理一些表级别的管理任务(如过期、打标)。而 FileStoreCommitImpl
则专注于提供底层的、原子的、可靠的文件存储层提交服务。
StoreCommitter
(paimon-flink-common)
核心职责: 这是 Flink Sink 中针对单个 Paimon 表的 Committer 实现。它实现了 Flink 的
Committer<Committable, ManifestCommittable>
接口。- 适配 Flink: 将 Flink 的
Committable
对象(通常由TableWriteOperator
生成,包含了单个并行实例写入的数据文件信息)聚合成 Paimon Core 能理解的ManifestCommittable
。 - 管理
TableCommitImpl
: 每个StoreCommitter
实例持有一个对应 Paimon 表的TableCommitImpl
实例。 - 调用
TableCommitImpl
: 在 Flink Checkpoint 完成后,CommitterOperator
会调用StoreCommitter
的commit(ManifestCommittable)
方法,该方法进而调用其内部TableCommitImpl
实例的commit()
或filterAndCommit()
(在恢复时) 方法来执行实际的 Paimon 表提交。 - 处理分区监听: 如果配置了分区监听器,它会触发相应的监听逻辑。
- 指标桥接: 将 Flink 的 MetricGroup 传递给
TableCommitImpl
,以便 Paimon Core 层的指标能够注册到 Flink 的指标系统中。
- 适配 Flink: 将 Flink 的
关键方法:
combine(long checkpointId, long watermark, List<Committable> committables)
: 将来自 Flink Writer 的多个Committable
(通常对应一个 Checkpoint 的所有并行实例)聚合成一个ManifestCommittable
。commit(ManifestCommittable globalCommittable)
: 调用TableCommitImpl.commit(globalCommittable)
。filterAndCommit(Map<Long, ManifestCommittable> globalCommittables)
: 调用TableCommitImpl.filterAndCommitMultiple(...)
。
StoreCommitter.java
// ... existing code ...
public class StoreCommitter implements Committer<Committable, ManifestCommittable> {
private final TableCommitImpl commit; // 持有 TableCommitImpl 实例
@Nullable private final CommitterMetrics committerMetrics;
private final PartitionListeners partitionListeners;
private final boolean allowLogOffsetDuplicate;
public StoreCommitter(FileStoreTable table, TableCommit tableCommit, Context context) {
// 注意这里的 tableCommit 通常就是 TableCommitImpl 的实例
this.commit = (TableCommitImpl) tableCommit;
if (context.metricGroup() != null) {
this.commit.withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
this.committerMetrics = new CommitterMetrics(context.metricGroup().getIOMetricGroup());
} else {
this.committerMetrics = null;
}
// ... existing code ...
}
@Override
public ManifestCommittable combine(
long checkpointId, long watermark, List<Committable> committables) {
ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark);
Map<Integer, Long> logOffsets = new HashMap<>();
for (Committable committable : committables) {
manifestCommittable.addFileCommittable(committable.wrappedCommittable());
if (committable.logOffsets() != null && !committable.logOffsets().isEmpty()) {
// ... 处理 log offsets ...
}
}
manifestCommittable.setLogOffsets(logOffsets);
return manifestCommittable;
}
@Override
public void commit(ManifestCommittable globalCommittable) {
// 调用 TableCommitImpl 执行提交
this.commit.commit(globalCommittable);
notifyPartitionCreated(globalCommittable);
if (committerMetrics != null) {
committerMetrics.commitDuration(
globalCommittable.identifier(),
this.commit.getLatestMetrics().getCommitDuration());
}
}
@Override
public Map<Long, ManifestCommittable> filterAndCommit(
Map<Long, ManifestCommittable> globalCommittables) {
// 调用 TableCommitImpl 执行过滤和提交
int committed =
this.commit.filterAndCommitMultiple(new ArrayList<>(globalCommittables.values()));
if (committerMetrics != null) {
committerMetrics.updateNumCommits(committed);
}
// ...
return globalCommittables; // 返回实际提交的内容 (或根据 TableCommitImpl 的返回值调整)
}
// ... existing code ...
}
StoreMultiCommitter
(paimon-flink-common)
核心职责: 这是 Flink Sink 中针对多个 Paimon 表的 Committer 实现。它实现了 Flink 的
Committer<MultiTableCommittable, WrappedManifestCommittable>
接口。这通常用于 CDC(Change Data Capture)场景,一个 Flink 作业可能同时向多个 Paimon 表写入数据。- 管理多个
StoreCommitter
:StoreMultiCommitter
内部维护一个Map<Identifier, StoreCommitter>
,其中 Key 是表标识符 (database.table),Value 是对应表的StoreCommitter
实例。 - 分发提交: 当接收到
MultiTableCommittable
(它内部包装了针对特定表的Committable
)时,它会根据表标识符找到对应的StoreCommitter
,并将提交任务分发给它。 - 聚合
WrappedManifestCommittable
:WrappedManifestCommittable
是对ManifestCommittable
的一层包装,增加了表标识符信息,使得在多表场景下能够区分不同表的提交。 - 目录和表过滤: 可以配置表过滤器 (
TableFilter
) 来确定哪些数据库和表需要被处理。
- 管理多个
关键方法:
combine(long checkpointId, long watermark, List<MultiTableCommittable> committables)
: 将多个MultiTableCommittable
按照表进行分组,然后对每个表调用其对应的StoreCommitter
的combine
方法,最后将结果包装成WrappedManifestCommittable
。commit(WrappedManifestCommittable globalCommittable)
: 从WrappedManifestCommittable
中提取表标识符和实际的ManifestCommittable
,然后调用对应表的StoreCommitter
的commit
方法。filterAndCommit(Map<Long, WrappedManifestCommittable> globalCommittables)
: 类似地,按表分发给各个StoreCommitter
的filterAndCommit
方法。
StoreMultiCommitter.java
// ... existing code ...
public class StoreMultiCommitter
implements Committer<MultiTableCommittable, WrappedManifestCommittable> {
private final Catalog catalog;
private final Context context;
private final Map<Identifier, StoreCommitter> tableCommitters; // 管理多个表的 StoreCommitter
// ... 其他成员变量 ...
public StoreMultiCommitter(
CatalogLoader catalogLoader,
Context context,
boolean ignoreEmptyCommit,
Map<String, String> dynamicOptions,
boolean eagerInit,
TableFilter tableFilter) {
this.catalog = catalogLoader.load();
this.context = context;
this.ignoreEmptyCommit = ignoreEmptyCommit;
this.dynamicOptions = dynamicOptions;
this.tableCommitters = new HashMap<>();
this.tableFilter = tableFilter;
// ... 初始化逻辑,可能会预先加载表的 StoreCommitter ...
}
@Override
public WrappedManifestCommittable combine(
long checkpointId, long watermark, List<MultiTableCommittable> committables) {
WrappedManifestCommittable wrappedCommittable =
new WrappedManifestCommittable(checkpointId, watermark);
// 按表分组
Map<Identifier, List<Committable>> grouped = new HashMap<>();
for (MultiTableCommittable committable : committables) {
Identifier tableId = Identifier.create(committable.database(), committable.table());
grouped.computeIfAbsent(tableId, k -> new ArrayList<>())
.add(new Committable(committable.kind(), committable.wrappedCommittable()));
}
for (Map.Entry<Identifier, List<Committable>> entry : grouped.entrySet()) {
Identifier tableId = entry.getKey();
List<Committable> tableCommittables = entry.getValue();
StoreCommitter storeCommitter = getStoreCommitter(tableId); // 获取或创建对应表的 StoreCommitter
// 调用对应表的 StoreCommitter 的 combine 方法
ManifestCommittable manifestCommittable =
storeCommitter.combine(checkpointId, watermark, tableCommittables);
wrappedCommittable.addCommittable(tableId, manifestCommittable);
}
return wrappedCommittable;
}
@Override
public void commit(WrappedManifestCommittable globalCommittable) {
for (Map.Entry<Identifier, ManifestCommittable> entry :
globalCommittable.getCommittables().entrySet()) {
Identifier tableId = entry.getKey();
ManifestCommittable manifestCommittable = entry.getValue();
StoreCommitter storeCommitter = getStoreCommitter(tableId);
// 调用对应表的 StoreCommitter 的 commit 方法
storeCommitter.commit(manifestCommittable);
}
}
@Override
public Map<Long, WrappedManifestCommittable> filterAndCommit(
Map<Long, WrappedManifestCommittable> globalCommittables) {
// 遍历每个 checkpoint 的 WrappedManifestCommittable
for (WrappedManifestCommittable wrapped : globalCommittables.values()) {
for (Map.Entry<Identifier, ManifestCommittable> entry :
wrapped.getCommittables().entrySet()) {
Identifier tableId = entry.getKey();
ManifestCommittable manifestCommittable = entry.getValue();
StoreCommitter storeCommitter = getStoreCommitter(tableId);
// 为每个表单独调用 filterAndCommit
// 注意:这里简化了逻辑,实际 filterAndCommit 需要更精细处理 checkpointId 和返回结果
storeCommitter.filterAndCommit(
Collections.singletonMap(wrapped.checkpointId(), manifestCommittable));
}
}
return globalCommittables; // 返回实际提交的内容
}
// ... existing code ...
}
总结:如何进行 Commit
结合上述组件,Paimon Flink Sink 的提交流程如下(以单表 StoreCommitter
为例):
数据写入与
Committable
生成 (FlinkTableWriteOperator
):- Flink 的
TableWriteOperator
(或类似的写入算子) 并行地将数据写入 Paimon 表的数据文件 (如 Parquet, ORC)。 - 在每个 Flink Checkpoint 准备阶段 (
prepareSnapshotPreBarrier
),每个并行 Writer 实例会生成一个Committable
对象。这个对象包含了该实例在该 Checkpoint 周期内新写入的数据文件列表、需要删除的旧文件列表(用于更新场景)等元数据。
- Flink 的
Committable
发送至CommitterOperator
:- 这些
Committable
对象通过 Flink 的网络层发送给下游的CommitterOperator
。CommitterOperator
通常设置为单并行度。
- 这些
CommitterOperator
聚合Committable
:CommitterOperator
接收到来自所有并行 Writer 实例的Committable
。- 在其
snapshotState
方法中(或endInput
对于批处理),它会调用其持有的Committer
(即StoreCommitter
) 的combine
方法。 StoreCommitter.combine()
将这些Committable
聚合成一个单一的ManifestCommittable
对象。这个对象代表了整个 Checkpoint 周期的所有变更。- 这个
ManifestCommittable
被保存在CommitterOperator
的 Flink 状态中,与 Checkpoint ID 关联。
Flink Checkpoint 完成通知:
- 当 Flink JobManager 确认一个 Checkpoint (
checkpointId
) 成功完成后,会调用CommitterOperator
的notifyCheckpointComplete(checkpointId)
方法。
- 当 Flink JobManager 确认一个 Checkpoint (
CommitterOperator
触发 Paimon Commit:CommitterOperator
从其状态中获取对应checkpointId
的ManifestCommittable
。- 调用其持有的
Committer
(即StoreCommitter
) 的commit(ManifestCommittable)
方法。 - (如果是作业恢复,则会调用
filterAndCommit
方法)。
StoreCommitter
调用TableCommitImpl
:StoreCommitter.commit(ManifestCommittable)
方法会直接调用其内部持有的TableCommitImpl
实例的commit(ManifestCommittable)
方法。
TableCommitImpl
执行核心提交逻辑:TableCommitImpl.commit(ManifestCommittable)
方法接收到ManifestCommittable
。- 它会调用其内部的
FileStoreCommit
实例的commit
方法。 FileStoreCommit
的原子提交:- 写入 Manifest 文件: 根据
ManifestCommittable
中的信息,生成一个新的 Manifest 文件。这个文件记录了哪些数据文件是新增的,哪些是因覆盖而删除的。 - 写入 Manifest List 文件: 生成一个新的 Manifest List 文件,它指向了上一步生成的 Manifest 文件以及之前快照可能仍然需要的旧 Manifest 文件。
- 原子性地创建 Snapshot 文件: 这是原子性的关键。Paimon 会创建一个新的 Snapshot 文件(例如
snapshot-N
)。这个 Snapshot 文件是一个小的 JSON 文件,它记录了快照的元数据,包括快照ID、时间戳、以及指向的 Manifest List 文件名。这个 Snapshot 文件的创建被设计为原子操作(通常通过文件系统的原子rename
实现)。
- 写入 Manifest 文件: 根据
- 一旦新的 Snapshot 文件成功生成并对外部可见,就代表整个提交完成了。
后续操作 (过期等):
TableCommitImpl
在提交成功后,会异步(或同步,根据配置)触发快照过期、分区过期、自动打标等逻辑。
总结: CommitterOperator
(Flink) -> StoreCommitter
/ StoreMultiCommitter
(Flink Connector 适配层) -> TableCommitImpl
(Paimon Core 业务逻辑层) -> FileStoreCommit
(Paimon Core 底层存储交互层) -> 原子性 Snapshot 创建。
这个分层设计使得 Flink 的 Checkpoint 机制能够与 Paimon Core 的事务提交机制解耦并协同工作,实现了端到端的数据一致性和 Exactly-Once 语义。