Paimon 动态分桶

发布于:2025-07-18 ⋅ 阅读:(17) ⋅ 点赞:(0)

BucketAssigner

这是一个用于在 Paimon 中进行动态 Bucket 分配的工具类。根据其包名 crosspartition (跨分区),我们可以推断它主要服务于需要跨分区协调进行 Bucket 分配的场景。它的核心职责是为进入特定分区的每条记录智能地分配一个 Bucket ID。

BucketAssigner 类是一个状态化的管理器。它内部维护了每个分区(Partition)的 Bucket 分配情况的统计信息。其主要目标是:

  • 动态分配:当有新数据写入时,能够动态地决定数据应该放入哪个 Bucket。
  • 负载均衡:通过限制每个 Bucket 中的记录数量(maxCount),避免单个 Bucket 过大,从而实现负载均衡。
  • 高效复用:优先复用未满的现有 Bucket,当没有可用 Bucket 时再创建新的。

核心数据结构

// ... existing code ...
public class BucketAssigner {

    private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();

// ... existing code ...

这个类只有一个成员变量 stats,但它是整个逻辑的核心:

  • Map<BinaryRow, TreeMap<Integer, Integer>> stats:
    • 外层 Map 的 Key (BinaryRow): 代表一个分区。BinaryRow 是 Paimon 中用于表示一行数据的二进制格式,这里用它来唯一标识一个分区键(Partition Key)。
    • 外层 Map 的 Value (TreeMap<Integer, Integer>): 存储了该分区内所有 Bucket 的统计信息。
      • 内层 TreeMap 的 Key (Integer): 代表 Bucket 的 ID。
      • 内层 TreeMap 的 Value (Integer): 代表当前已分配到该 Bucket 的记录数量。
    • 为什么用 TreeMap?TreeMap 会根据 Key(即 Bucket ID)进行排序。这使得在 assignBucket 方法中遍历现有 Bucket 时,会按照 Bucket ID 从小到大的顺序进行,保证了分配逻辑的确定性。

assignBucket

这是最核心的方法,负责执行 Bucket 分配逻辑。

// ... existing code ...
    public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) {
        TreeMap<Integer, Integer> bucketMap = bucketMap(part);
        // 阶段一:尝试复用现有 Bucket
        for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
            int bucket = entry.getKey();
            int count = entry.getValue();
            if (filter.test(bucket) && count < maxCount) {
                bucketMap.put(bucket, count + 1);
                return bucket;
            }
        }

        // 阶段二:创建新 Bucket
        for (int i = 0; ; i++) {
            if (filter.test(i) && !bucketMap.containsKey(i)) {
                bucketMap.put(i, 1);
                return i;
            }
        }
    }
// ... existing code ...

它的分配策略分为两个阶段:

  1. 阶段一:尝试复用现有 Bucket

    • 它首先遍历当前分区 part 已有的所有 Bucket(bucketMap.entrySet())。
    • 对于每个 Bucket,它会检查两个条件:
      1. filter.test(bucket): 该 Bucket 是否满足外部传入的过滤条件。这个 filter 非常关键,它通常用于确保当前的 Assigner 任务只分配属于它自己管辖范围的 Bucket。例如,在一个拥有 N 个 Assigner 的集群中,可以约定 assignerId 为 i 的任务只负责处理 bucket % N == i 的 Bucket。
      2. count < maxCount: 该 Bucket 中的记录数是否已经达到上限 maxCount
    • 如果两个条件都满足,说明这个 Bucket 可用,就将该 Bucket 的计数值加一,并返回这个 Bucket ID。
  2. 阶段二:创建新 Bucket

    • 如果遍历完所有现有 Bucket 都没有找到合适的,说明需要创建一个新的 Bucket。
    • 它从 i = 0 开始无限循环,依次尝试新的 Bucket ID。
    • 对于每个尝试的 ID i,它同样会检查 filter.test(i) 是否通过,并且 !bucketMap.containsKey(i) 确保这个 Bucket 是全新的。
    • 一旦找到一个满足条件的、全新的 Bucket ID,就将其加入到 bucketMap 中,初始计数值为 1,并返回这个新的 Bucket ID。

bootstrapBucket

这个方法用于在分配开始前“引导”或“预加载”已有的 Bucket 信息。

// ... existing code ...
    public void bootstrapBucket(BinaryRow part, int bucket) {
        TreeMap<Integer, Integer> bucketMap = bucketMap(part);
        Integer count = bucketMap.get(bucket);
        if (count == null) {
            count = 0;
        }
        bucketMap.put(bucket, count + 1);
    }
// ... existing code ...

当系统从之前的状态恢复时,可以通过这个方法将已存在的 Bucket 及其中的大致记录数(这里是简单地+1)加载到 stats 中,以便 assignBucket 能够在其基础上继续分配。

decrement

这个方法用于将指定 Bucket 的计数值减一。

// ... existing code ...
    public void decrement(BinaryRow part, int bucket) {
        bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);
    }
// ... existing code ...

这可能用于一些回滚或者数据迁移的场景,当一条记录被移出某个 Bucket 时,可以通过此方法更新统计信息。使用 compute 方法可以很简洁地处理 Bucket 可能不存在的情况。

bucketMap

这是一个私有的辅助方法,用于获取或创建指定分区的 TreeMap

// ... existing code ...
    private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
        TreeMap<Integer, Integer> map = stats.get(part);
        if (map == null) {
            map = new TreeMap<>();
            stats.put(part.copy(), map);
        }
        return map;
    }
}
  • 它实现了 "get-or-create" 模式。如果 stats 中已经存在该分区的 TreeMap,直接返回。
  • 如果不存在,则创建一个新的 TreeMap,并将其放入 stats 中。
  • 关键点: 在 stats.put 时,它使用了 part.copy()BinaryRow 对象在 Paimon 的处理流程中经常被复用,其底层的字节数组可能会被修改。如果不进行 copy,那么下一次使用同一个 BinaryRow 对象但代表不同分区时,就会污染 stats 中已有的 Key。通过 copy() 创建一个独立的副本,保证了 HashMap 中 Key 的稳定性和正确性。

总结

BucketAssigner 是一个设计精巧的动态 Bucket 分配器。它通过内存中的统计信息,结合外部的过滤逻辑和容量限制,实现了对数据写入的智能分桶。这对于 Paimon 的动态 Bucket 表(Dynamic Bucket Table)功能至关重要,因为它允许系统根据数据量自动扩展 Bucket 数量,而无需用户预先指定一个固定的 Bucket 数,从而提高了灵活性和资源利用率。

GlobalIndexAssigner

GlobalIndexAssigner 是 Paimon 实现跨分区 UPSERT 和动态分桶(Dynamic Bucket)功能的核心组件,它的设计比较复杂。我们来一步步、有条理地详细解析它。

首先,我们要明白它的核心使命是什么。在一个支持 UPSERT 的主键表里,当一条新数据到来时,系统必须回答两个问题:

  1. 这条数据是全新的吗?
  2. 如果不是全新的(即主键已存在),它之前在哪?

对于普通的主键表,数据写入时会根据主键计算一个固定的 bucket,所以新数据和老数据一定在同一个分区、同一个 bucket 下,问题很简单。

但对于跨分区 UPSERTcross-partition-upsert)的场景,一条数据的主键可能不变,但分区键变了(比如,一个用户的归属城市从“北京”变成了“上海”)。这意味着新数据和老数据在不同的分区里。为了正确地执行 UPSERT(即删除老分区的数据,插入新分区),系统必须有一个全局的视角,能够根据主key快速定位到它之前所在的分区和 bucket

GlobalIndexAssigner 就是为了提供这个全局主键索引而生的。它会为每一条到来的数据,分配一个最终的 bucket,并在这个过程中处理跨分区的 UPDATE_BEFORE 消息。

主要成员变量解析

我们先来理解它内部的关键成员变量

// ... existing code ...
public class GlobalIndexAssigner implements Serializable, Closeable {

    // ...

    // State & Storage
    private transient RocksDBStateFactory stateFactory;
    private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;
    private transient BinaryExternalSortBuffer bootstrapKeys;
    private transient RowBuffer bootstrapRecords;

    // Logic Components
    private transient BucketAssigner bucketAssigner;
    private transient ExistingProcessor existingProcessor;
    private transient IDMapping<BinaryRow> partMapping;

    // Configuration & Context
    private final FileStoreTable table;
    private transient int targetBucketRowNumber;
    private transient int numAssigners;
    private transient int assignId;
    private transient BiConsumer<InternalRow, Integer> collector;

    // ...
}
  • State & Storage (状态与存储)

    • stateFactory 和 keyIndex: 这是核心中的核心keyIndex 是一个基于 RocksDB 的键值存储。
      • Key: 表的主键 (InternalRow)。
      • Value: 一个 PositiveIntInt 对象,包含了两个信息:分区ID (partId) 和 bucket号
      • 它的作用就是维护一个 主键 -> (分区ID, bucket) 的全局映射。通过它,我们可以用 O(1) 的时间复杂度查到一个主键的全局位置。
    • bootstrapKeys: 一个外部排序缓冲。在引导阶段,用来缓存并排序所有存量数据的 (主键, (分区ID, bucket)) 信息,最后通过 BulkLoader 高效地批量载入 RocksDB。
    • bootstrapRecords: 一个行缓冲。在引导阶段,用来缓存所有流入的原始数据记录。因为在引导阶段结束前,我们还不能处理它们。
  • Logic Components (逻辑组件)

    • bucketAssigner: 就是我们之前详细讨论过的桶分配器。它负责在确定了分区后,为数据分配一个具体的 bucket
    • existingProcessor存量数据处理器。当发现一条新数据的主键已经存在于另一个分区时,由它来决定如何处理。比如,对于 deduplicate 引擎,它会生成一条 UPDATE_BEFORE 消息来删除老数据;对于 first-row 引擎,它可能会直接忽略新数据。
    • partMapping: 一个 分区 -> 整数ID 的映射。因为 RocksDB 中存储分区信息用整数ID比用完整的 BinaryRow 更高效,所以用这个组件来做转换。
  • Configuration & Context (配置与上下文)

    • table: 当前操作的 Paimon 表对象。
    • targetBucketRowNumber: 动态分桶的目标行数,即一个桶里期望存放多少条数据。
    • numAssigners 和 assignId: Flink 算子的并行度和当前子任务的ID。用于实现我们之前讨论过的无锁并发分桶
    • collector: 一个回调函数(消费者),用来将处理完成的 (数据, bucket) 对发送给下游。

GlobalIndexAssigner 的工作流程可以清晰地分为两个阶段:引导 (Bootstrap) 和 处理 (Process)

阶段一:引导 (Bootstrap)

这个阶段的目标是用表的存量数据构建 RocksDB 全局索引

  1. open(...): 初始化所有组件,包括创建 RocksDB 实例、初始化 BucketAssigner 等。此时 bootstrap 标志位为 true

  2. bootstrapKey(InternalRow value):

    • 这个方法被外部调用,逐条喂入从 Paimon 表快照中读出的存量数据
    • 它从 value 中提取 主键分区bucket
    • 调用 bucketAssigner.bootstrapBucket(...) 来累积每个分区的桶内记录数。
    • 将 (序列化的主键, 序列化的(分区ID, bucket)) 写入 bootstrapKeys 这个外部排序缓冲中。
  3. processInput(InternalRow value): 在引导阶段 (inBootstrap() 为 true),所有新流入的数据(不是存量数据)都会被临时存入 bootstrapRecords 这个行缓冲中,等待引导结束后再处理。

  4. endBoostrap(boolean isEndInput):

    • 标志着引导阶段的结束,bootstrap 标志位被设为 false
    • 核心动作:将 bootstrapKeys 中缓存的所有主键索引信息,通过 BulkLoader 批量、高效地载入 RocksDB。这比一条条 put 快得多。
    • 处理 bootstrapRecords 中缓存的数据:将它们一条条拿出,交给 processInput 方法进行正式处理。

阶段二:处理 (Process)

引导结束后,GlobalIndexAssigner 进入常规处理模式。

  1. processInput(InternalRow value):
    • 从 value 中提取分区 (partition) 和主键 (key)。
    • 用 key 去查询 keyIndex (RocksDB)。
    • Case 1: keyIndex.get(key) 返回 null
      • 说明这是一个全新的主键
      • 调用 processNewRecord(...) -> assignBucket(...) 为它在新分区中分配一个新 bucket
      • 将 (key, (新分区ID, 新bucket)) 存入 keyIndex
      • 通过 collector 将 (value, 新bucket) 发往下游。
    • Case 2: keyIndex.get(key) 返回一个已存在的 (分区ID, bucket)
      • Sub-case 2.1: 新老分区相同。说明这是一次普通的分区内更新,直接将 value 和老的 bucket 发往下游即可。
      • Sub-case 2.2: 新老分区不同。这是跨分区更新的关键场景。
        • 调用 existingProcessor.processExists(...)ExistingProcessor 会根据表的 merge-engine(合并引擎)策略来处理。例如,对于 deduplicate,它会生成一条 UPDATE_BEFORE 记录,标记为删除,并使用老的分区和 bucket 发往下游。
        • 如果 processExists 返回 true(意味着新数据需要被处理),则接着调用 processNewRecord 为这条新数据在新的分区中分配 bucket,并更新 keyIndex

并发与分布式处理

GlobalIndexAssigner 本身是可序列化的,它会被分发到 Flink 的多个并行 TaskManager 上执行。

  • 并发控制: 通过 isAssignBucket 方法中的 computeAssignId(bucket) == assignId 逻辑(作为filter),保证了每个并行实例只负责分配一部分 bucket ID,从而实现了无锁的并发分配。
  • 状态独立: 每个并行实例都有自己独立的 RocksDB 实例。这看起来似乎违背了“全局索引”的初衷,但实际上 Paimon 通过 Flink 的 KeyedStream 机制,保证了相同主键的数据总是被路由到同一个并行实例。因此,虽然物理上有多个 RocksDB 实例,但逻辑上,对于任何一个主键,它的索引信息只存在于其中一个固定的实例中,从而保证了全局索引的一致性。

总结

GlobalIndexAssigner 是一个集成了本地状态存储 (RocksDB)外部排序缓存分布式计算思想的复杂但高效的组件。它通过引导+处理的两阶段模式,以及基于主键路由的分布式状态,巧妙地解决了跨分区 UPSERT 场景下的全局索引难题,是 Paimon 动态表功能能够强大、高效运行的基石。

GlobalIndexAssignerOperator

GlobalIndexAssignerOperator 是 GlobalIndexAssigner 在 Flink 计算引擎中的宿主执行器。如果说 GlobalIndexAssigner 是一个功能强大的“引擎”,那么 GlobalIndexAssignerOperator 就是为这个引擎量身打造的“座驾”。它负责将 GlobalIndexAssigner 的逻辑无缝地集成到 Flink 的流处理拓扑中,处理生命周期、数据流转和状态管理。

public class GlobalIndexAssignerOperator
        extends AbstractStreamOperator<Tuple2<InternalRow, Integer>>
        implements OneInputStreamOperator<
                        Tuple2<KeyPartOrRow, InternalRow>, Tuple2<InternalRow, Integer>>,
                BoundedOneInput {
// ...
}

深入分析:

  • extends AbstractStreamOperator<...>: 表明它是一个标准的 Flink 流处理算子。它的输出类型是 Tuple2<InternalRow, Integer>,即 (数据行, bucket号),这正是 GlobalIndexAssigner 处理后的结果。
  • implements OneInputStreamOperator<...>: 表明这是一个单输入流的算子。它的输入类型是 Tuple2<KeyPartOrRow, InternalRow>。这个 KeyPartOrRow 是一个枚举,用来区分输入的数据是存量数据KEY_PART)还是增量数据ROW),这对于实现两阶段工作模式至关重要。
  • implements BoundedOneInput: 表明这个算子可以处理有界流(批处理作业)。它需要实现 endInput() 方法,当输入流结束时 Flink 会调用该方法。

核心成员变量

// ...
    private final GlobalIndexAssigner assigner;

    private transient IOManager ioManager;
// ...

深入分析:

  • assigner: 这是最重要的成员,它持有一个 GlobalIndexAssigner 的实例。Operator 的所有核心逻辑都是通过委托给这个 assigner 对象来完成的。
  • ioManager: 一个 transient 的 IO 管理器。transient 关键字意味着它不会被序列化。它在 initializeState 方法中被创建,用于管理 GlobalIndexAssigner 可能需要的磁盘溢写操作(例如 RocksDB 和外部排序缓冲)。

生命周期与初始化 (initializeState)

这是 Operator 在 Flink TaskManager 上启动时最先执行的方法之一,负责准备好 GlobalIndexAssigner 的运行环境。

// ...
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        org.apache.flink.runtime.io.disk.iomanager.IOManager flinkIoManager =
                getContainingTask().getEnvironment().getIOManager();
        ioManager = IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
        assigner.open(
                computeManagedMemory(this),
                ioManager,
                RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
                RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
                this::collect);
    }
// ...

深入分析:

  • 获取 Flink 资源: 它从 Flink 的 Task 环境中获取底层的 IOManager,并用其临时目录来创建 Paimon 自己封装的 IOManager
  • 调用 assigner.open(...): 这是关键的桥接步骤。它将 Flink 的运行时信息传递给 GlobalIndexAssigner
    • computeManagedMemory(this): 计算并传入 Flink 为该算子分配的托管内存(Managed Memory),assigner 会用这部分内存来配置 RocksDB 的 Block Cache 等。
    • ioManager: 传入 IO 管理器。
    • getNumberOfParallelSubtasks(...): 传入作业的并行度。
    • getIndexOfThisSubtask(...): 传入当前子任务的 ID。
    • this::collect: 传入一个方法引用作为回调函数。当 assigner 处理完一条数据后,会调用这个回调,Operator 再通过它将结果发送到下游。

通过这一步,GlobalIndexAssigner 就被完全“激活”并准备好处理数据了。

数据处理 (processElement)

这是算子的核心数据处理逻辑,每当一条数据记录到达时,Flink 就会调用这个方法。

// ...
    @Override
    public void processElement(StreamRecord<Tuple2<KeyPartOrRow, InternalRow>> streamRecord)
            throws Exception {
        Tuple2<KeyPartOrRow, InternalRow> tuple2 = streamRecord.getValue();
        InternalRow value = tuple2.f1;
        switch (tuple2.f0) {
            case KEY_PART:
                assigner.bootstrapKey(value);
                break;
            case ROW:
                assigner.processInput(value);
                break;
        }
    }
// ...

深入分析:

  • 数据分发processElement 的逻辑非常清晰,它就像一个交通警察。它解析输入 Tuple2 的第一个字段 KeyPartOrRow
    • 如果值是 KEY_PART,说明这是来自上游 IndexBootstrapOperator 的存量数据,应该用于引导。于是它调用 assigner.bootstrapKey(value)
    • 如果值是 ROW,说明这是来自数据源的增量数据(或在引导结束后重新处理的缓存数据),应该进行正常处理。于是它调用 assigner.processInput(value)

这个 switch 语句完美地实现了对 GlobalIndexAssigner 两阶段工作模式的驱动。

Checkpoint 与批处理结束

这两个方法处理了流处理中的 Checkpoint 和批处理中的结束信号,它们都与 GlobalIndexAssigner 的引导阶段结束有关。

// ...
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        endBootstrap(false);
    }

    @Override
    public void endInput() throws Exception {
        endBootstrap(true);
    }

    private void endBootstrap(boolean isEndInput) throws Exception {
        if (assigner.inBoostrap()) {
            assigner.endBoostrap(isEndInput);
        }
    }
// ...

深入分析:

  • prepareSnapshotPreBarrier: 在 Flink 中,当 Checkpoint Barrier 到达一个算子之前,会先调用这个方法。这是一个结束引导阶段的完美时机。当第一个 Checkpoint Barrier 到达时,意味着所有存量数据(KEY_PART)都已经处理完毕。此时调用 endBootstrap(false)GlobalIndexAssigner 就会执行批量加载 RocksDB、处理缓存数据等操作,并切换到正常处理模式。
  • endInput: 在批处理模式下,当所有输入数据都处理完毕后,Flink 会调用此方法。这也标志着引导阶段(如果存在)必须结束。此时调用 endBootstrap(true),参数 isEndInput=true 会触发 GlobalIndexAssigner 中针对批处理的特殊优化路径。
  • endBootstrap: 这个私有方法封装了公共逻辑,即只有在 assigner 仍处于引导模式时 (inBoostrap() 为 true),才调用 endBoostrap,避免了重复调用。

当 Flink 作业结束或取消时,会调用 close 方法来释放资源。

// ...
    @Override
    public void close() throws Exception {
        this.assigner.close();
        if (ioManager != null) {
            ioManager.close();
        }
    }
// ...

深入分析:

  • 它会依次关闭 assigner(这会关闭 RocksDB 并删除临时文件)和 ioManager,确保所有本地资源都被正确清理,不会造成泄露。

总结

GlobalIndexAssignerOperator 是一个典型的适配器模式应用。它本身不包含复杂的业务逻辑,而是作为一个轻量级的“外壳”,将 GlobalIndexAssigner 这个通用的、与计算引擎无关的核心组件,适配到 Flink 的流处理模型中。它通过实现 Flink 的算子接口,巧妙地利用 Flink 的生命周期(initializeStateclose)、数据处理(processElement)和事件机制(prepareSnapshotPreBarrierendInput)来驱动 GlobalIndexAssigner 的两阶段工作流程,是 Paimon 动态分桶功能在 Flink 上得以实现的关键连接点。

GlobalDynamicBucketSink 

GlobalDynamicBucketSink 是 Paimon Flink Sink 体系中,专门用于处理启用了全局索引的动态分桶表(即 bucket-mode = 'cross-partition')的总装配车间。它的核心职责不是执行具体的写入逻辑,而是构建和编排 Flink 的 DataStream 作业拓扑。它像一个总工程师,将 IndexBootstrapOperatorGlobalIndexAssignerOperatorDynamicBucketRowWriteOperator 等一系列专用的算子(Operator)按照正确的顺序和数据分发逻辑连接起来,形成一个完整、高效的数据写入流水线。

public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow, Integer>> {
// ...
}

深入分析:

  • extends FlinkWriteSink<Tuple2<InternalRow, Integer>>:
    • 它继承自 FlinkWriteSink,这是一个通用的 Paimon Flink Sink 基类,封装了创建 Writer 和 Committer 的通用逻辑。
    • 泛型参数 Tuple2<InternalRow, Integer> 非常关键,它定义了这个 Sink 流水线中,写入算子 (Writer) 的输入数据类型是 (数据行, bucket号)。这与我们之前分析的 GlobalIndexAssignerOperator 的输出类型完全一致,表明了数据流的衔接关系。

核心方法 build

build 方法是这个类的灵魂,它定义了整个 Flink 作业的拓扑结构。让我们分步解析这个方法的实现。

// ...
    public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
// ...
        // Topology:
        // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket -->
        // writer --> committer
// ...
    }
// ...

代码注释已经清晰地勾勒出了整个数据流拓扑,下面我们逐一解析每个阶段:

阶段 0: 输入 (input: DataStream<InternalRow>)

这是数据流的起点,代表着从各种数据源(如 Kafka, CDC Source)流入的原始数据。

阶段 1: 引导 (bootstrap)
// ...
        SingleOutputStreamOperator<Tuple2<KeyPartOrRow, InternalRow>> bootstraped =
                input.transform(
                                "INDEX_BOOTSTRAP",
                                new InternalTypeInfo<>(
                                        new KeyWithRowSerializer<>(
                                                bootstrapSerializer, rowSerializer)),
                                new IndexBootstrapOperator.Factory<>(
                                        new IndexBootstrap(table), r -> r))
                        .setParallelism(input.getParallelism());
// ...
  • 目的: 为后续的 GlobalIndexAssigner 准备引导数据。
  • 实现:
    • 通过 input.transform 添加了一个名为 INDEX_BOOTSTRAP 的算子,这个算子就是 IndexBootstrapOperator
    • IndexBootstrapOperator 内部持有一个 IndexBootstrap 实例,它会读取 Paimon 表的当前快照,将所有存量数据作为 KeyPartOrRow.KEY_PART 类型输出。
    • 同时,它也会将增量数据(即 input 流中的数据)直接作为 KeyPartOrRow.ROW 类型透传下去。
    • 这样,输出流 bootstraped 中就混合了两种类型的数据,为后续的 GlobalIndexAssignerOperator 的两阶段工作模式做好了准备。
阶段 2: 第一次 Shuffle (shuffle by key hash)
// ...
        // 1. shuffle by key hash
        Integer assignerParallelism =
                MathUtils.max(
                        options.dynamicBucketInitialBuckets(),
                        options.dynamicBucketAssignerParallelism());
        if (assignerParallelism == null) {
            assignerParallelism = parallelism;
        }

        KeyPartRowChannelComputer channelComputer =
                new KeyPartRowChannelComputer(rowType, bootstrapType, primaryKeys);
        DataStream<Tuple2<KeyPartOrRow, InternalRow>> partitionByKeyHash =
                partition(bootstraped, channelComputer, assignerParallelism);
// ...
  • 目的: 保证相同主键的数据(无论是存量还是增量)都被发送到同一个 bucket-assigner 并行实例上。这是 GlobalIndexAssigner 能够正确维护全局索引状态的前提。
  • 实现:
    • partition 是一个工具方法,它本质上是调用了 Flink 的 partitionCustom API。
    • KeyPartRowChannelComputer 是一个自定义的分区器。它会从输入数据中提取主键,计算其哈希值,然后根据哈希值决定数据应该被发送到哪个下游分区。
阶段 3: 分配桶 (bucket-assigner)
// ...
        // 2. bucket-assigner
        TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
                new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
        SingleOutputStreamOperator<Tuple2<InternalRow, Integer>> bucketAssigned =
                partitionByKeyHash
                        .transform(
                                "cross-partition-bucket-assigner",
                                rowWithBucketType,
                                GlobalIndexAssignerOperator.forRowData(table))
                        .setParallelism(partitionByKeyHash.getParallelism());

        // declare managed memory for RocksDB
        declareManagedMemory(
                bucketAssigned, options.toConfiguration().get(SINK_CROSS_PARTITION_MANAGED_MEMORY));
// ...
  • 目的: 为每一条数据分配一个最终的 bucket 号。
  • 实现:
    • 在 partitionByKeyHash 流上再次调用 transform,添加了 GlobalIndexAssignerOperator
    • 这个算子接收上游按主键 shuffle 好的数据,利用其内部的 RocksDB 状态和 BucketAssigner 逻辑,为每条数据计算出 bucket
    • 输出流 bucketAssigned 的类型变成了 Tuple2<InternalRow, Integer>,即 (数据行, bucket号)
    • declareManagedMemory 为这个算子声明了需要的 Flink 托管内存,这部分内存主要会被 RocksDB 用于缓存。
阶段 4: 第二次 Shuffle (shuffle by bucket)
// ...
        // 3. shuffle by bucket
        DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
                partition(bucketAssigned, new RowWithBucketChannelComputer(schema), parallelism);
// ...
  • 目的: 将数据按照分区进行重新分发,确保同一个桶的数据被发送到同一个 writer 实例。这是 Paimon 文件写入的基本要求,因为一个数据文件只能由一个 writer 写入。
  • 实现:
    • 再次调用 partition 工具方法。
    • RowWithBucketChannelComputer 是另一个自定义分区器。它会从 (数据行, bucket号) 中提取分区键和 bucket 号,然后根据这两者的组合哈希值来决定下游分区。
阶段 5: 写入与提交 (writer --> committer)
// ...
        // 4. writer and committer
        return sinkFrom(partitionByBucket, createCommitUser(options.toConfiguration()));
    }
// ...
    @Override
    protected OneInputStreamOperatorFactory<Tuple2<InternalRow, Integer>, Committable>
            createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
        return new DynamicBucketRowWriteOperator.Factory(table, writeProvider, commitUser);
    }
// ...
  • 目的: 将数据写入 Paimon 的数据文件,并在 Checkpoint 完成时生成可提交的元数据(Committable)。
  • 实现:
    • sinkFrom 是 FlinkWriteSink 基类提供的方法,它会组装 Writer 和 Committer 算子。
    • createWriteOperatorFactory 方法被重写,返回一个 DynamicBucketRowWriteOperator.Factory。这告诉 sinkFrom 方法,应该使用 DynamicBucketRowWriteOperator 作为写入算子。
    • DynamicBucketRowWriteOperator 接收按桶 shuffle 好的 (数据行, bucket号) 数据,并调用底层的 StoreSinkWrite 将其写入文件。

什么是存量数据和增量数据?

为了更好地理解,我们可以用一个仓库管理的比喻:

  • 存量数据 (Stock Data):

    • 比喻: 想象一下,你今天第一天接管一个仓库。在开始记录今天新进来的货物之前,你首先需要做的,是把仓库里已经存在的所有货物都清点一遍,登记在册。这个“已经存在的货物”就是存量
    • 在 Paimon 中: 当你启动一个 Flink 作业去写入一张 Paimon 表时,这张表里可能已经通过之前的作业写入了很多数据。这些在 Flink 作业启动那一刻已经存在于 Paimon 表文件里的数据,就是存量数据
    • 代码关联IndexBootstrap 这个类的核心职责,就是去扫描 Paimon 表的最新快照,把这些存量数据读取出来。
  • 增量数据 (Incremental Data):

    • 比喻: 在你清点完库存之后,今天陆陆续续有新的货车开到仓库门口,卸下新的货物。这些“新来的货物”就是增量
    • 在 Paimon 中: 当你的 Flink 作业启动并运行后,从数据源(比如 Kafka、CDC)实时流过来的新数据,就是增量数据
    • 代码关联IndexBootstrapOperator 的 processElement 方法,以及 GlobalIndexAssignerOperator 在引导结束后处理的数据,都属于增量数据。

总结一下:存量是“过去时”,是启动时的快照;增量是“现在进行时”,是启动后源源不断流入的数据流。

为什么 GlobalIndexAssignerOperator 需要处理存量数据?

为了保证主键的全局唯一性,并正确处理跨分区的更新(UPSERT)。

GlobalIndexAssignerOperator 的核心使命是为每一条数据分配一个 bucket。对于启用了全局索引的动态分桶表,它必须知道一个主键(Primary Key)当前到底在哪一个分区、哪一个 bucket 里。如果不知道这个信息,它就无法做出正确的判断。

让我们通过一个具体的例子来理解这个过程:

场景:

  • 一张用户表,以 user_id 为主键,login_date 为分区键。
  • merge-engine 设置为 deduplicate(去重,保留最新)。

初始状态 (存量数据):

  • 表里已经有一条数据:{user_id: 101, name: 'Alice', login_date: '2023-10-01'}。这条数据位于分区 pt='2023-10-01',假设在 bucket-3

现在,Flink 作业启动了,GlobalIndexAssignerOperator 开始工作。

第一步:处理存量数据(引导阶段 Bootstrap)

  1. IndexBootstrapOperator 运行 IndexBootstrap 逻辑,扫描全表,找到了存量数据 {user_id: 101, ...}
  2. 它将这条数据标记为 KEY_PART,发送给 GlobalIndexAssignerOperator
  3. GlobalIndexAssignerOperator 收到后,调用 assigner.bootstrapKey()
  4. GlobalIndexAssigner 将这个信息加载到它的本地 RocksDB 索引中,记录下:Key(101) -> Value(partition_id_of_2023-10-01, bucket_3)

至此,引导阶段完成。GlobalIndexAssigner 已经知道了所有“老”数据的位置。

第二步:处理增量数据

  1. 现在,从数据源来了一条新的增量数据{user_id: 101, name: 'Alice_New', login_date: '2023-10-05'}
  2. 这条数据流经 IndexBootstrapOperator,被标记为 ROW,发送给 GlobalIndexAssignerOperator
  3. GlobalIndexAssignerOperator 调用 assigner.processInput()
  4. GlobalIndexAssigner 拿到主键 101,去查询它的 RocksDB 索引。
  5. 它查到了! 索引告诉它,user_id=101 的数据之前存在于分区 pt='2023-10-01' 的 bucket-3 中。
  6. 因为新数据的分区是 '2023-10-05',与老数据不同,这触发了跨分区更新逻辑。
  7. 根据 deduplicate 引擎的规则,它会执行以下操作:
    • 生成一条 DELETE 记录,指向老的位置:{user_id: 101, ..., RowKind: DELETE},并给它分配 bucket-3,发往下游的 Writer。
    • 为新数据 {user_id: 101, name: 'Alice_New', ...} 分配一个新的 bucket(比如 bucket-8),并更新 RocksDB 索引为 Key(101) -> Value(partition_id_of_2023-10-05, bucket_8),然后将这条 INSERT 记录发往下游。

如果没有第一步对存量数据的处理,GlobalIndexAssigner 的 RocksDB 索引就是空的。当第二步的增量数据到来时,它会认为 user_id=101 是一个全新的主键,直接为其分配一个新的 bucket 并插入。这样做的后果是灾难性的:Paimon 表里会同时存在两条 user_id=101 的数据(一条在 2023-10-01 分区,一条在 2023-10-05 分区),主键约束被破坏了

因此,GlobalIndexAssignerOperator 必须先通过 IndexBootstrap 加载和处理所有“之前的数据”(存量数据),构建起一个完整的、反映历史状态的全局索引。只有在这个坚实的基础上,它才能正确地处理后续的增量数据,实现精准的跨分区 UPSERT

总结

GlobalDynamicBucketSink 通过其 build 方法,以一种声明式的方式,完美地编排了一个复杂的多阶段 Flink 流处理作业。这个作业拓扑通过两次关键的 Shuffle 操作,解决了全局索引和数据写入的核心数据分发问题:

  1. 第一次 Shuffle (by Key): 保证了状态计算的正确性(相同主键到同个 assigner)。
  2. 第二次 Shuffle (by Bucket): 保证了数据写入的正确性(相同桶到同个 writer)。

整个流程清晰、高效,充分利用了 Flink 的数据流和算子模型,是 Paimon 实现跨分区动态分桶这一高级功能的基石。

IndexBootstrap 和 IndexBootstrapOperator 

它们共同完成了为全局索引提供存量数据这个关键任务。

IndexBootstrap 是逻辑执行者,负责从 Paimon 表中读取存量数据。 IndexBootstrapOperator 是物理执行者,它在 Flink 环境中运行 IndexBootstrap 的逻辑,并将读取到的存量数据和流经的增量数据一起发送给下游。

IndexBootstrap 的核心使命是:扫描 Paimon 表的最新快照,读取所有存量数据,并提取出构建全局索引所必需的信息

我们来看它的核心方法 bootstrap(int numAssigners, int assignId)

// ... existing code ...
    public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throws IOException {
        RowType rowType = table.rowType();
        List<String> fieldNames = rowType.getFieldNames();
        int[] keyProjection =
                table.primaryKeys().stream()
                        .map(fieldNames::indexOf)
                        .mapToInt(Integer::intValue)
                        .toArray();

        // 1. 强制使用 latest 模式扫描
        ReadBuilder readBuilder =
                table.copy(Collections.singletonMap(SCAN_MODE.key(), LATEST.toString()))
                        .newReadBuilder()
                        .withProjection(keyProjection);

        // 2. 规划并过滤 Splits
        DataTableScan tableScan = (DataTableScan) readBuilder.newScan();
        List<Split> splits =
                tableScan
                        .withBucketFilter(bucket -> bucket % numAssigners == assignId)
                        .plan()
                        .splits();

        // 3. (可选) 根据 TTL 过滤 Splits
        CoreOptions options = CoreOptions.fromMap(table.options());
        Duration indexTtl = options.crossPartitionUpsertIndexTtl();
        if (indexTtl != null) {
            // ... filter splits by TTL ...
        }

        // 4. 并行读取并拼接数据
        RowDataToObjectArrayConverter partBucketConverter =
                new RowDataToObjectArrayConverter(
                        TypeUtils.concat(
                                TypeUtils.project(rowType, table.partitionKeys()),
                                RowType.of(DataTypes.INT())));

        return parallelExecute(
                TypeUtils.project(rowType, keyProjection),
                s -> readBuilder.newRead().createReader(s),
                splits,
                options.pageSize(),
                options.crossPartitionUpsertBootstrapParallelism(),
                split -> {
                    DataSplit dataSplit = ((DataSplit) split);
                    int bucket = dataSplit.bucket();
                    return partBucketConverter.toGenericRow(
                            new JoinedRow(dataSplit.partition(), GenericRow.of(bucket)));
                },
                (row, extra) -> new JoinedRow().replace(row, extra));
    }
// ... existing code ...

深入分析:

  1. 强制 latest 扫描: 它创建了一个新的 ReadBuilder,并强制将扫描模式(scan.mode)设置为 latest。这意味着它总是读取表的最新快照,确保获取到所有最新的存量数据。同时,它只投影(withProjection)了主键字段,因为引导阶段只需要主键信息。

  2. 规划并过滤 Splits:

    • 它调用 tableScan.plan() 来规划出所有需要读取的数据文件(Splits)。
    • 最关键的一步是 .withBucketFilter(bucket -> bucket % numAssigners == assignId)。这行代码实现了引导任务的并行化numAssigners 是下游 GlobalIndexAssignerOperator 的并行度,assignId 是当前实例的 ID。这个过滤器确保了每个 IndexBootstrap 实例只读取一部分 bucket 的数据,避免了重复工作。
  3. TTL 过滤 (可选): 如果用户配置了 cross-partition-upsert.index-ttl,它会进一步过滤 SplitsfilterSplit 方法会检查 Split 中的文件创建时间,如果所有文件都已经超出了 TTL,那么这个 Split 就会被丢弃,因为它的索引数据已经过期,无需加载。

  4. 并行读取与数据拼接:

    • parallelExecute 是一个工具方法,它会启动一个线程池(并行度由 cross-partition-upsert.bootstrap-parallelism 控制)来并行地读取所有规划好的 Splits
    • 对于每个 Split,它会提取出分区信息和 bucket 号
    • 对于从文件中读取的每一行数据(只包含主键),它会和上一步提取的 (分区, bucket) 信息拼接(JoinedRow)起来。
    • 最终,它返回一个 RecordReader,这个 Reader 产出的每一行 InternalRow 都包含了 (主键字段..., 分区字段..., bucket号),这正是下游 GlobalIndexAssigner 进行引导所需要的全部信息。

IndexBootstrapOperator: Flink 中的“调度器”

IndexBootstrapOperator 是一个 Flink 算子,它为 IndexBootstrap 提供了运行环境,并负责将引导数据和增量数据进行“标记”和“转发”。

public class IndexBootstrapOperator<T> extends AbstractStreamOperator<Tuple2<KeyPartOrRow, T>>
        implements OneInputStreamOperator<T, Tuple2<KeyPartOrRow, T>> {
// ...
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // 1. 启动引导过程
        bootstrap.bootstrap(
                RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
                RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
                this::collect);
    }

    @Override
    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        // 2. 处理增量数据
        output.collect(new StreamRecord<>(new Tuple2<>(KeyPartOrRow.ROW, streamRecord.getValue())));
    }

    private void collect(InternalRow row) {
        // 3. 处理引导数据
        output.collect(
                new StreamRecord<>(new Tuple2<>(KeyPartOrRow.KEY_PART, converter.apply(row))));
    }
// ...
}

深入分析:

  1. 启动引导 (initializeState):

    • 在算子初始化时,它会立即调用 bootstrap.bootstrap(...) 方法。
    • 它将 Flink 的并行度(getNumberOfParallelSubtasks)和当前子任务 ID(getIndexOfThisSubtask)传递给 IndexBootstrap,这样 IndexBootstrap 才知道自己应该读取哪些 bucket 的数据。
    • 它将自己的 collect 方法作为回调函数传进去。
  2. 处理增量数据 (processElement):

    • 这个方法处理从上游流过来的增量数据(比如来自 Kafka 的新消息)。
    • 它将每一条增量数据包装成 Tuple2(KeyPartOrRow.ROW, data) 的形式,然后发送给下游。KeyPartOrRow.ROW 这个标记告诉下游的 GlobalIndexAssignerOperator:“这是一条增量数据,请按正常流程处理”。
  3. 处理引导数据 (collect):

    • 这个方法是 IndexBootstrap 的回调函数。当 IndexBootstrap 在后台读取并拼接好一条存量数据后,就会调用这个 collect 方法。
    • 它将收到的存量数据(row)包装成 Tuple2(KeyPartOrRow.KEY_PART, data) 的形式,然后发送给下游。KeyPartOrRow.KEY_PART 这个标记告诉下游的 GlobalIndexAssignerOperator:“这是一条用于引导的存量数据,请调用 bootstrapKey 方法”。

总结

IndexBootstrap 和 IndexBootstrapOperator 的关系可以总结如下:

  • IndexBootstrap 是一个数据源。它负责**“生产”** 用于构建全局索引的存量数据。它通过扫描 Paimon 表文件,并利用 bucket 过滤来实现并行化读取。
  • IndexBootstrapOperator 是一个转换器和转发器。它在 Flink 作业启动时,触发 IndexBootstrap 开始生产数据。然后,它扮演一个交通枢纽的角色:
    • 对于 IndexBootstrap 生产的存量数据,它打上 KEY_PART 标签。
    • 对于从上游流经的增量数据,它打上 ROW 标签。
    • 最后,它将这两种打好标签的数据流合并在一起,发送给下游的 GlobalIndexAssignerOperator 进行统一处理。

这个设计将存量数据读取增量数据处理巧妙地统一到了一个 Flink 数据流中,为后续的全局索引构建和动态分桶奠定了基础。


网站公告

今日签到

点亮在社区的每一天
去签到