Paimon INSERT OVERWRITE

发布于:2025-07-22 ⋅ 阅读:(13) ⋅ 点赞:(0)

在 Paimon 中,INSERT OVERWRITE 是一种原子性的数据替换操作。它不像传统数据库的 DELETE + INSERT,而是一个完整的、不可分割的事务。

  • 原子性 (Atomicity):整个覆盖操作要么完全成功,要么完全失败。不会出现数据被部分删除或部分写入的中间状态。即使作业失败,表数据也会回滚到操作之前的状态。
  • 事务性 (Transactional):每一次 OVERWRITE 都会生成一个新的、完整的表快照 (Snapshot)。这使得数据具备了版本管理能力,支持时间旅行(Time Travel)。
  • 高效性 (Efficiency):对于分区表,它只重写涉及到的分区,未触及的分区数据保持不变,开销可控。

INSERT OVERWRITE 主要分为两种模式:静态分区覆盖 (Static Partition Overwrite) 和 动态分区覆盖 (Dynamic Partition Overwrite)

静态分区覆盖

定义:在 INSERT 语句中明确指定要覆盖哪些分区。Paimon 会清空这些指定分区内的所有数据,然后将新数据写入。

适用场景

  • 清理并重写某个或某几个历史分区的数据。
  • 对整个表进行数据重写(不指定分区时)。

语法示例 (Spark SQL):

-- 假设表 T 有两个分区键: pt1, pt2
CREATE TABLE T (id INT, name STRING, pt1 STRING, pt2 INT) PARTITIONED BY (pt1, pt2);

-- 覆盖单个分区 pt1='a', pt2=1
INSERT OVERWRITE T PARTITION (pt1 = 'a', pt2 = 1)
SELECT id, name FROM source_table WHERE ...;

-- 覆盖所有 pt1='a' 的分区 (pt2 的值不限)
-- 注意:Spark 默认是静态覆盖模式,所以这会覆盖整个表,除非配置为动态模式
-- 更安全的静态覆盖是明确指定所有分区键
INSERT OVERWRITE T PARTITION (pt1 = 'a') -- 这种语法在不同引擎下行为可能不同,需谨慎
SELECT id, name, pt2 FROM source_table WHERE ...;

-- 覆盖整个表 (清空所有分区)
INSERT OVERWRITE T
SELECT id, name, pt1, pt2 FROM source_table;
动态分区覆盖

定义:不直接在 INSERT 语句中指定分区,而是根据 SELECT 查询结果中的数据来动态决定需要覆盖哪些分区。如果查询结果中包含了 pt1='a' 和 pt1='b' 的数据,那么只有这两个分区会被覆盖。

适用场景

  • 日常的 ETL 任务,根据上游数据重跑当天的分区。
  • 数据修正,输入一个包含多个分区修正数据的源,一次性覆盖所有相关分区。

语法示例 (Spark SQL):

-- 必须先开启 Spark 的动态分区覆盖模式
SET spark.sql.sources.partitionOverwriteMode=dynamic;

-- 假设 source_table 中包含 pt1='a' 和 pt1='b' 的数据
-- 那么 T 表中只有 pt1='a' 和 pt1='b' 这两个分区会被覆盖
-- T 表中原有的 pt1='c', pt1='d' 等分区数据不受影响
INSERT OVERWRITE T
SELECT id, name, pt1, pt2 FROM source_table;

在 Flink 中,默认就是动态分区覆盖。如果想改为静态,需要加 Hint:


-- Flink SQL: 静态覆盖整个表
INSERT OVERWRITE T /*+ OPTIONS('dynamic-partition-overwrite'='false') */
SELECT ...;

如何工作:内部实现机制(重点)

INSERT OVERWRITE 的原子性和事务性完全依赖于 Paimon 的元数据和文件管理机制

我们以一个动态分区覆盖为例,追踪其内部流程:

Step 1: 发起写入
  • 当执行 INSERT OVERWRITE 命令时,计算引擎(如 Spark/Flink)的 Sink 任务开始向 Paimon 写入数据。
  • Paimon 的 TableWrite 对象被创建,并且通过 withOverwrite(partition) 方法被标记为覆盖模式。对于动态覆盖,partition 参数为空,表示由数据动态决定。
Step 2: 数据写入与生成新文件
  • Sink 任务读取源数据,根据分区键将数据分组,然后写入到对应分区的 新数据文件(Data File)中。
  • 例如,要覆盖 pt='a' 和 pt='b' 两个分区,Paimon 会在 path/to/table/pt=a/ 和 path/to/table/pt=b/ 目录下创建新的 .orc 或 .parquet 文件。
  • 关键点:此时,旧的数据文件仍然存在,线上读取任务看到的是旧数据。
Step 3: 准备提交 (Prepare Commit)
  • 所有 Sink 任务完成数据写入后,会将它们生成的新文件列表作为 CommitMessage 发送给 Driver/JobManager 端的 Commit 算子。
  • CommitMessage 中包含了每个新数据文件的元信息,如文件名、大小、行数、分区值等。
Step 4: 执行提交 (Commit)

这是 OVERWRITE 的核心所在。TableCommit 对象会整理快照和提交。

Step 5: 清理旧文件 (Cleanup)
  • 旧的数据文件(被 DELETE 标记的)不会立即被物理删除。它们会保留一段时间(由 snapshot.time-retained 配置决定)。
  • Paimon 的后台清理线程或下一次 commit 时会检查过期的快照,并将这些快照引用的、且后续快照不再引用的文件进行物理删除。这为时间旅行和故障恢复提供了保障。

总结

INSERT OVERWRITE 在 Paimon 中是一个精心设计的、高度可靠的操作。

  • 对用户而言,它提供了灵活的静态和动态数据覆盖能力。
  • 在内部,它通过 Copy-on-Write 的思想,结合快照(Snapshot) -> 清单列表(Manifest List) -> 清单(Manifest) 的三级元数据结构,将一个复杂的数据替换操作,最终简化为一次原子性的文件重命名,从而保证了整个过程的原子性、一致性和事务性。

TableCommitImpl 类中的 withOverwrite 

withOverwrite 方法是用来配置“覆盖写”操作的。在数据库和数据仓库中,覆盖写(Overwrite)通常指的是用新的数据集完全替换掉表中已有的数据,或者替换掉特定分区中的数据。这对应了 SQL 中的 INSERT OVERWRITE 语义。

// ... existing code ...
    @Override
    public TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {
        this.overwritePartition = overwritePartitions;
        return this;
    }
// ... existing code ...

overwritePartition 是一个 Map<String, String>,它定义了覆盖写的范围:

  • 如果 overwritePartition 是一个空 Map (例如 Collections.emptyMap()),这表示要覆盖整个表。这通常用于非分区表,或者需要覆盖所有分区的分区表。
  • 如果 overwritePartition 包含了分区键值对 (例如 {"dt": "2024-01-01", "hr": "09"}), 这表示只覆盖符合该分区规范的特定分区。

独特的处理逻辑

当 withOverwrite 被调用后,overwritePartition 字段不再为 null,这会触发 TableCommitImpl 中一系列独特的处理逻辑,主要集中在 commitMultiple 方法中。

让我们看一下 commitMultiple 方法的关键部分:

// ... existing code ...
    public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
        if (overwritePartition == null) {
            // 这是正常的追加(append)或合并(merge)提交逻辑
            for (ManifestCommittable committable : committables) {
                commit.commit(committable, new HashMap<>(), checkAppendFiles);
            }
// ... existing code ...
        } else {
            // **** 这是 overwrite 模式下的特殊处理逻辑 ****
            ManifestCommittable committable;
            if (committables.size() > 1) {
                // 1. 检查:覆盖写操作期望只有一个提交单元(committable)
                throw new RuntimeException(
                        "Multiple committables appear in overwrite mode, this may be a bug, please report it: "
                                + committables);
            } else if (committables.size() == 1) {
                committable = committables.get(0);
            } else {
                // 2. 如果没有新的数据要写入(例如,用一个空数据集覆盖分区),
                //    会创建一个空的 committable 来触发底层的覆盖逻辑。
                // create an empty committable
                // identifier is Long.MAX_VALUE, come from batch job
                // TODO maybe it can be produced by CommitterOperator
                committable = new ManifestCommittable(Long.MAX_VALUE);
            }
            // 3. 核心调用:调用底层的 FileStoreCommit 的 overwrite 方法,
            //    将分区信息和新的文件列表传递下去。
            commit.overwrite(overwritePartition, committable, Collections.emptyMap());
            expire(committable.identifier(), expireMainExecutor);
        }
    }
// ... existing code ...

总结一下其独特处理:

  1. 逻辑分支commitMultiple 方法通过检查 overwritePartition 是否为 null 来区分是常规提交还是覆盖写提交,并进入不同的逻辑分支。

  2. 单一提交限制:在覆盖写模式下,系统预期一次 commit 操作只对应一个 ManifestCommittable。这是因为覆盖写是一个原子性的替换操作,将多个独立的提交单元合并到一个覆盖写操作中没有明确的语义,且容易出错。

  3. 空提交处理:即使没有新的数据写入(committables 为空),只要指定了 withOverwrite,Paimon 依然会生成一个快照。这用于处理“清空分区”或“清空表”的场景(INSERT OVERWRITE ... SELECT ... WHERE false)。

  4. 调用底层实现:它最终不会调用常规的 commit.commit(),而是调用 commit.overwrite()。这个 overwrite 方法(在 FileStoreCommitImpl 中实现)会负责:

    • 标记指定分区(或全表)下的所有旧数据文件为已删除。
    • 添加本次提交带来的新数据文件。
    • 将这些变更原子性地提交为一个新的快照(Snapshot)。
  5. 强制生成快照:覆盖写操作总是会生成一个新的快照,即使没有写入新数据。这体现在 forceCreatingSnapshot 方法中:

    // ... existing code ...
    public boolean forceCreatingSnapshot() {
        if (this.forceCreatingSnapshot) {
            return true;
        }
        if (overwritePartition != null) {
            return true;
        }
        return tagAutoManager != null
    

如何使用

在实践中,withOverwrite 通常由 BatchWriteBuilder 来调用,如下面的代码所示:

// ... existing code ...
    @Override
    public BatchTableCommit newCommit() {
        InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
        commit.ignoreEmptyCommit(true);
        return commit;
    }
// ... existing code ...

当用户执行 INSERT OVERWRITE SQL 语句时,Paimon 的 Flink 或 Spark Connector 会构建一个 BatchWriteBuilder,并通过 withOverwrite 方法设置要覆盖的分区,最终在提交时触发上述的特殊逻辑。

总而言之,withOverwrite 是 Paimon 中实现原子性覆盖写功能的关键入口。它通过设置一个内部状态(overwritePartition),在提交时触发一套完全不同于常规追加/合并的逻辑,以确保数据替换的原子性和正确性。

FileStoreCommitImpl 中的 overwrite 

这个方法是 Paimon 执行 INSERT OVERWRITE 语义的底层核心实现。当上层的 TableCommitImpl 确定这是一个覆盖写操作后,就会调用这个方法。它的主要职责是计算出需要删除的旧文件和需要添加的新文件,并将这些变更原子地提交为一个新的快照(Snapshot)。

我们来分解一下这个方法的具体处理流程:

// ... existing code ...
    @Override
    public void overwrite(
            Map<String, String> partition,
            ManifestCommittable committable,
            Map<String, String> properties) {
        if (LOG.isDebugEnabled()) {
// ... existing code ...
        }

        long started = System.nanoTime();
        int generatedSnapshot = 0;
        int attempts = 0;
        // 1. 收集本次提交带来的所有文件变更
        List<ManifestEntry> appendTableFiles = new ArrayList<>();
        List<ManifestEntry> appendChangelog = new ArrayList<>();
        List<ManifestEntry> compactTableFiles = new ArrayList<>();
        List<ManifestEntry> compactChangelog = new ArrayList<>();
        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
        List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
        collectChanges(
                committable.fileCommittables(),
                appendTableFiles,
                appendChangelog,
                compactTableFiles,
                compactChangelog,
                appendHashIndexFiles,
                compactDvIndexFiles);

        // 2. 覆盖写模式下不处理 changelog 文件,并打印警告
        //    因为覆盖写是破坏性操作,会破坏流读的连续性。
        if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
            StringBuilder warnMessage =
                    new StringBuilder(
                            "Overwrite mode currently does not commit any changelog.\n"
                                    + "Please make sure that the partition you're overwriting "
                                    + "is not being consumed by a streaming reader.\n"
                                    + "Ignored changelog files are:\n");
// ... existing code ...
            LOG.warn(warnMessage.toString());
        }

        try {
            boolean skipOverwrite = false;
            // 3. 根据静态/动态分区模式,创建分区过滤器,用于确定要删除哪些分区的数据
            PartitionPredicate partitionFilter = null;
            if (dynamicPartitionOverwrite) {
                // 动态分区覆盖模式
                if (appendTableFiles.isEmpty()) {
                    // 如果没有新数据写入,则跳过覆盖操作,不会删除任何数据
                    skipOverwrite = true;
                } else {
                    // 根据新写入数据涉及的分区,来确定要覆盖哪些分区
                    Set<BinaryRow> partitions =
                            appendTableFiles.stream()
                                    .map(ManifestEntry::partition)
                                    .collect(Collectors.toSet());
                    partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);
                }
            } else {
                // 静态分区覆盖模式 (或者非分区表)
                Predicate partitionPredicate =
                        createPartitionPredicate(partition, partitionType, partitionDefaultName);
                partitionFilter =
                        PartitionPredicate.fromPredicate(partitionType, partitionPredicate);
                // 安全检查:确保所有新写入的文件都属于要覆盖的分区
                if (partitionFilter != null) {
                    for (ManifestEntry entry : appendTableFiles) {
                        if (!partitionFilter.test(entry.partition())) {
                            throw new IllegalArgumentException(
// ... existing code ...
                                            + " does not belong to this partition");
                        }
                    }
                }
            }

            // 4. 调用 tryOverwrite 执行核心的覆盖写逻辑
            if (!skipOverwrite) {
                attempts +=
                        tryOverwrite(
                                partitionFilter,
                                appendTableFiles,
                                appendHashIndexFiles,
                                committable.identifier(),
                                committable.watermark(),
                                committable.logOffsets());
                generatedSnapshot += 1;
            }

            // 5. 如果本次提交还包含数据压缩(compaction)产生的文件,则单独提交一次 COMPACT 类型的快照
            if (!compactTableFiles.isEmpty() || !compactDvIndexFiles.isEmpty()) {
                attempts +=
                        tryCommit(
                                compactTableFiles,
                                emptyList(),
                                compactDvIndexFiles,
                                committable.identifier(),
                                committable.watermark(),
                                committable.logOffsets(),
                                Snapshot.CommitKind.COMPACT,
                                mustConflictCheck(),
                                null);
                generatedSnapshot += 1;
            }
        } finally {
            // 6. 报告提交指标
            long commitDuration = (System.nanoTime() - started) / 1_000_000;
            if (this.commitMetrics != null) {
                reportCommit(
                        appendTableFiles,
                        emptyList(),
                        compactTableFiles,
                        emptyList(),
                        commitDuration,
                        generatedSnapshot,
                        attempts);
            }
        }
    }
// ... existing code ...
    private int tryOverwrite(
            @Nullable PartitionPredicate partitionFilter,
            List<ManifestEntry> changes,
            List<IndexManifestEntry> indexFiles,
            long identifier,
            @Nullable Long watermark,
            Map<Integer, Long> logOffsets) {
        // 7. tryOverwrite 的核心逻辑
        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
        List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
        List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
        if (latestSnapshot != null) {
            // 读取最新快照中所有的文件列表
            List<ManifestEntry> currentEntries = readAllEntries(latestSnapshot);
            for (ManifestEntry entry : currentEntries) {
                // 如果文件属于被覆盖的分区 (partitionFilter.test),则标记为 DELETE
                // 否则,保留原样
                if (partitionFilter == null || !partitionFilter.test(entry.partition())) {
                    changesWithOverwrite.add(entry);
                } else {
                    changesWithOverwrite.add(
                            new ManifestEntry(
                                    FileKind.DELETE,
                                    entry.partition(),
                                    entry.bucket(),
                                    entry.totalBuckets(),
                                    entry.file()));
                }
            }
// ... existing code ...
        }

        // 8. 将本次要新增的文件(来自参数 changes)加入列表
        changesWithOverwrite.addAll(changes);
        indexChangesWithOverwrite.addAll(indexFiles);

        // 9. 使用包含“删除旧文件”和“添加新文件”的完整列表,进行一次 OVERWRITE 类型的提交
        return tryCommit(
                changesWithOverwrite,
                emptyList(),
                indexChangesWithOverwrite,
                identifier,
                watermark,
                logOffsets,
                Snapshot.CommitKind.OVERWRITE,
                mustConflictCheck(),
                null);
    }
// ... existing code ...

总结 overwrite 方法的处理流程:

  1. 收集变更:首先,它将传入的 ManifestCommittable 中的文件变更信息进行分类,比如哪些是新增的数据文件,哪些是compaction产生的文件等。
  2. 忽略Changelogoverwrite 操作会明确地忽略所有 changelog 文件。这是因为 overwrite 是一个全量替换操作,会破坏数据的增量变更历史,与流式读取的语义不兼容。因此系统会打印警告,提醒用户不要在流读任务正在消费的分区上执行覆盖操作。
  3. 确定覆盖范围:这是最关键的步骤之一。它通过 dynamicPartitionOverwrite 配置项来决定如何确定要覆盖的分区范围。
    • 动态覆盖 (true):覆盖的范围由本次写入的新数据所在的分区决定。如果写入了 p1 和 p2 两个分区的数据,那么只有 p1 和 p2 的旧数据会被删除。如果本次写入没有产生任何新数据,则不会删除任何分区。
    • 静态覆盖 (false):覆盖的范围由传入的 partition 参数决定,这通常对应 SQL 中的 PARTITION (...) 子句。所有新写入的数据必须属于这个静态指定的分区。
  4. 执行原子替换:它调用私有的 tryOverwrite 方法。这个方法: a. 读取当前最新的快照,获取全部分区的所有数据文件列表。 b. 遍历这些文件,如果一个文件属于上一步确定的覆盖范围,就将其标记为 DELETE。不属于覆盖范围的文件则保持不变。 c. 将本次提交要新增的数据文件标记为 ADD。 d. 将所有这些变更(DELETE旧文件 + ADD新文件)作为一个整体,通过 tryCommit 方法以 Snapshot.CommitKind.OVERWRITE 的类型原子性地提交,生成一个新的快照。
  5. 处理其他变更:如果 committable 中还包含了数据压缩(compaction)的结果,会在 overwrite 操作之后,再进行一次独立的 COMPACT 类型的提交。
  6. 特殊用例truncateTable(清空表)和 dropPartitions(删除分区)方法内部也复用了 tryOverwrite 逻辑。它们可以看作是特殊的 overwrite,即用一个空的数据集去覆盖整个表或指定分区。

FileStoreCommitImpl.overwrite 是一个精心设计的底层方法,它通过精确地计算文件变更集(哪些删除、哪些新增),并利用 Paimon 的快照机制,实现了分区或整表的原子性覆盖写。


网站公告

今日签到

点亮在社区的每一天
去签到