BucketAssigner
这是一个用于在 Paimon 中进行动态 Bucket 分配的工具类。根据其包名 crosspartition
(跨分区),我们可以推断它主要服务于需要跨分区协调进行 Bucket 分配的场景。它的核心职责是为进入特定分区的每条记录智能地分配一个 Bucket ID。
BucketAssigner
类是一个状态化的管理器。它内部维护了每个分区(Partition)的 Bucket 分配情况的统计信息。其主要目标是:
- 动态分配:当有新数据写入时,能够动态地决定数据应该放入哪个 Bucket。
- 负载均衡:通过限制每个 Bucket 中的记录数量(
maxCount
),避免单个 Bucket 过大,从而实现负载均衡。 - 高效复用:优先复用未满的现有 Bucket,当没有可用 Bucket 时再创建新的。
核心数据结构
// ... existing code ...
public class BucketAssigner {
private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();
// ... existing code ...
这个类只有一个成员变量 stats
,但它是整个逻辑的核心:
Map<BinaryRow, TreeMap<Integer, Integer>> stats
:- 外层 Map 的 Key (
BinaryRow
): 代表一个分区。BinaryRow
是 Paimon 中用于表示一行数据的二进制格式,这里用它来唯一标识一个分区键(Partition Key)。 - 外层 Map 的 Value (
TreeMap<Integer, Integer>
): 存储了该分区内所有 Bucket 的统计信息。- 内层 TreeMap 的 Key (
Integer
): 代表 Bucket 的 ID。 - 内层 TreeMap 的 Value (
Integer
): 代表当前已分配到该 Bucket 的记录数量。
- 内层 TreeMap 的 Key (
- 为什么用
TreeMap
?:TreeMap
会根据 Key(即 Bucket ID)进行排序。这使得在assignBucket
方法中遍历现有 Bucket 时,会按照 Bucket ID 从小到大的顺序进行,保证了分配逻辑的确定性。
- 外层 Map 的 Key (
assignBucket
这是最核心的方法,负责执行 Bucket 分配逻辑。
// ... existing code ...
public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) {
TreeMap<Integer, Integer> bucketMap = bucketMap(part);
// 阶段一:尝试复用现有 Bucket
for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
int bucket = entry.getKey();
int count = entry.getValue();
if (filter.test(bucket) && count < maxCount) {
bucketMap.put(bucket, count + 1);
return bucket;
}
}
// 阶段二:创建新 Bucket
for (int i = 0; ; i++) {
if (filter.test(i) && !bucketMap.containsKey(i)) {
bucketMap.put(i, 1);
return i;
}
}
}
// ... existing code ...
它的分配策略分为两个阶段:
阶段一:尝试复用现有 Bucket
- 它首先遍历当前分区
part
已有的所有 Bucket(bucketMap.entrySet()
)。 - 对于每个 Bucket,它会检查两个条件:
filter.test(bucket)
: 该 Bucket 是否满足外部传入的过滤条件。这个filter
非常关键,它通常用于确保当前的Assigner
任务只分配属于它自己管辖范围的 Bucket。例如,在一个拥有 N 个 Assigner 的集群中,可以约定assignerId
为i
的任务只负责处理bucket % N == i
的 Bucket。count < maxCount
: 该 Bucket 中的记录数是否已经达到上限maxCount
。
- 如果两个条件都满足,说明这个 Bucket 可用,就将该 Bucket 的计数值加一,并返回这个 Bucket ID。
- 它首先遍历当前分区
阶段二:创建新 Bucket
- 如果遍历完所有现有 Bucket 都没有找到合适的,说明需要创建一个新的 Bucket。
- 它从
i = 0
开始无限循环,依次尝试新的 Bucket ID。 - 对于每个尝试的 ID
i
,它同样会检查filter.test(i)
是否通过,并且!bucketMap.containsKey(i)
确保这个 Bucket 是全新的。 - 一旦找到一个满足条件的、全新的 Bucket ID,就将其加入到
bucketMap
中,初始计数值为 1,并返回这个新的 Bucket ID。
bootstrapBucket
这个方法用于在分配开始前“引导”或“预加载”已有的 Bucket 信息。
// ... existing code ...
public void bootstrapBucket(BinaryRow part, int bucket) {
TreeMap<Integer, Integer> bucketMap = bucketMap(part);
Integer count = bucketMap.get(bucket);
if (count == null) {
count = 0;
}
bucketMap.put(bucket, count + 1);
}
// ... existing code ...
当系统从之前的状态恢复时,可以通过这个方法将已存在的 Bucket 及其中的大致记录数(这里是简单地+1)加载到 stats
中,以便 assignBucket
能够在其基础上继续分配。
decrement
这个方法用于将指定 Bucket 的计数值减一。
// ... existing code ...
public void decrement(BinaryRow part, int bucket) {
bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);
}
// ... existing code ...
这可能用于一些回滚或者数据迁移的场景,当一条记录被移出某个 Bucket 时,可以通过此方法更新统计信息。使用 compute
方法可以很简洁地处理 Bucket 可能不存在的情况。
bucketMap
这是一个私有的辅助方法,用于获取或创建指定分区的 TreeMap
。
// ... existing code ...
private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
TreeMap<Integer, Integer> map = stats.get(part);
if (map == null) {
map = new TreeMap<>();
stats.put(part.copy(), map);
}
return map;
}
}
- 它实现了 "get-or-create" 模式。如果
stats
中已经存在该分区的TreeMap
,直接返回。 - 如果不存在,则创建一个新的
TreeMap
,并将其放入stats
中。 - 关键点: 在
stats.put
时,它使用了part.copy()
。BinaryRow
对象在 Paimon 的处理流程中经常被复用,其底层的字节数组可能会被修改。如果不进行copy
,那么下一次使用同一个BinaryRow
对象但代表不同分区时,就会污染stats
中已有的 Key。通过copy()
创建一个独立的副本,保证了HashMap
中 Key 的稳定性和正确性。
总结
BucketAssigner
是一个设计精巧的动态 Bucket 分配器。它通过内存中的统计信息,结合外部的过滤逻辑和容量限制,实现了对数据写入的智能分桶。这对于 Paimon 的动态 Bucket 表(Dynamic Bucket Table)功能至关重要,因为它允许系统根据数据量自动扩展 Bucket 数量,而无需用户预先指定一个固定的 Bucket 数,从而提高了灵活性和资源利用率。
GlobalIndexAssigner
GlobalIndexAssigner
是 Paimon 实现跨分区 UPSERT
和动态分桶(Dynamic Bucket)功能的核心组件,它的设计比较复杂。我们来一步步、有条理地详细解析它。
首先,我们要明白它的核心使命是什么。在一个支持 UPSERT
的主键表里,当一条新数据到来时,系统必须回答两个问题:
- 这条数据是全新的吗?
- 如果不是全新的(即主键已存在),它之前在哪?
对于普通的主键表,数据写入时会根据主键计算一个固定的 bucket
,所以新数据和老数据一定在同一个分区、同一个 bucket
下,问题很简单。
但对于跨分区 UPSERT
(cross-partition-upsert
)的场景,一条数据的主键可能不变,但分区键变了(比如,一个用户的归属城市从“北京”变成了“上海”)。这意味着新数据和老数据在不同的分区里。为了正确地执行 UPSERT
(即删除老分区的数据,插入新分区),系统必须有一个全局的视角,能够根据主key快速定位到它之前所在的分区和 bucket
。
GlobalIndexAssigner
就是为了提供这个全局主键索引而生的。它会为每一条到来的数据,分配一个最终的 bucket
,并在这个过程中处理跨分区的 UPDATE_BEFORE
消息。
主要成员变量解析
我们先来理解它内部的关键成员变量
// ... existing code ...
public class GlobalIndexAssigner implements Serializable, Closeable {
// ...
// State & Storage
private transient RocksDBStateFactory stateFactory;
private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;
private transient BinaryExternalSortBuffer bootstrapKeys;
private transient RowBuffer bootstrapRecords;
// Logic Components
private transient BucketAssigner bucketAssigner;
private transient ExistingProcessor existingProcessor;
private transient IDMapping<BinaryRow> partMapping;
// Configuration & Context
private final FileStoreTable table;
private transient int targetBucketRowNumber;
private transient int numAssigners;
private transient int assignId;
private transient BiConsumer<InternalRow, Integer> collector;
// ...
}
State & Storage (状态与存储)
stateFactory
和keyIndex
: 这是核心中的核心。keyIndex
是一个基于 RocksDB 的键值存储。- Key: 表的主键 (
InternalRow
)。 - Value: 一个
PositiveIntInt
对象,包含了两个信息:分区ID (partId
) 和 bucket号。 - 它的作用就是维护一个
主键 -> (分区ID, bucket)
的全局映射。通过它,我们可以用 O(1) 的时间复杂度查到一个主键的全局位置。
- Key: 表的主键 (
bootstrapKeys
: 一个外部排序缓冲。在引导阶段,用来缓存并排序所有存量数据的(主键, (分区ID, bucket))
信息,最后通过BulkLoader
高效地批量载入 RocksDB。bootstrapRecords
: 一个行缓冲。在引导阶段,用来缓存所有流入的原始数据记录。因为在引导阶段结束前,我们还不能处理它们。
Logic Components (逻辑组件)
bucketAssigner
: 就是我们之前详细讨论过的桶分配器。它负责在确定了分区后,为数据分配一个具体的bucket
。existingProcessor
: 存量数据处理器。当发现一条新数据的主键已经存在于另一个分区时,由它来决定如何处理。比如,对于deduplicate
引擎,它会生成一条UPDATE_BEFORE
消息来删除老数据;对于first-row
引擎,它可能会直接忽略新数据。partMapping
: 一个分区 -> 整数ID
的映射。因为 RocksDB 中存储分区信息用整数ID比用完整的BinaryRow
更高效,所以用这个组件来做转换。
Configuration & Context (配置与上下文)
table
: 当前操作的 Paimon 表对象。targetBucketRowNumber
: 动态分桶的目标行数,即一个桶里期望存放多少条数据。numAssigners
和assignId
: Flink 算子的并行度和当前子任务的ID。用于实现我们之前讨论过的无锁并发分桶。collector
: 一个回调函数(消费者),用来将处理完成的(数据, bucket)
对发送给下游。
GlobalIndexAssigner
的工作流程可以清晰地分为两个阶段:引导 (Bootstrap) 和 处理 (Process)。
阶段一:引导 (Bootstrap)
这个阶段的目标是用表的存量数据构建 RocksDB 全局索引。
open(...)
: 初始化所有组件,包括创建 RocksDB 实例、初始化BucketAssigner
等。此时bootstrap
标志位为true
。bootstrapKey(InternalRow value)
:- 这个方法被外部调用,逐条喂入从 Paimon 表快照中读出的存量数据。
- 它从
value
中提取主键
、分区
、bucket
。 - 调用
bucketAssigner.bootstrapBucket(...)
来累积每个分区的桶内记录数。 - 将
(序列化的主键, 序列化的(分区ID, bucket))
写入bootstrapKeys
这个外部排序缓冲中。
processInput(InternalRow value)
: 在引导阶段 (inBootstrap()
为true
),所有新流入的数据(不是存量数据)都会被临时存入bootstrapRecords
这个行缓冲中,等待引导结束后再处理。endBoostrap(boolean isEndInput)
:- 标志着引导阶段的结束,
bootstrap
标志位被设为false
。 - 核心动作:将
bootstrapKeys
中缓存的所有主键索引信息,通过BulkLoader
批量、高效地载入 RocksDB。这比一条条put
快得多。 - 处理
bootstrapRecords
中缓存的数据:将它们一条条拿出,交给processInput
方法进行正式处理。
- 标志着引导阶段的结束,
阶段二:处理 (Process)
引导结束后,GlobalIndexAssigner
进入常规处理模式。
processInput(InternalRow value)
:- 从
value
中提取分区 (partition
) 和主键 (key
)。 - 用
key
去查询keyIndex
(RocksDB)。 - Case 1:
keyIndex.get(key)
返回null
- 说明这是一个全新的主键。
- 调用
processNewRecord(...)
->assignBucket(...)
为它在新分区中分配一个新bucket
。 - 将
(key, (新分区ID, 新bucket))
存入keyIndex
。 - 通过
collector
将(value, 新bucket)
发往下游。
- Case 2:
keyIndex.get(key)
返回一个已存在的(分区ID, bucket)
- Sub-case 2.1: 新老分区相同。说明这是一次普通的分区内更新,直接将
value
和老的bucket
发往下游即可。 - Sub-case 2.2: 新老分区不同。这是跨分区更新的关键场景。
- 调用
existingProcessor.processExists(...)
。ExistingProcessor
会根据表的merge-engine
(合并引擎)策略来处理。例如,对于deduplicate
,它会生成一条UPDATE_BEFORE
记录,标记为删除,并使用老的分区和bucket
发往下游。 - 如果
processExists
返回true
(意味着新数据需要被处理),则接着调用processNewRecord
为这条新数据在新的分区中分配bucket
,并更新keyIndex
。
- 调用
- Sub-case 2.1: 新老分区相同。说明这是一次普通的分区内更新,直接将
- 从
并发与分布式处理
GlobalIndexAssigner
本身是可序列化的,它会被分发到 Flink 的多个并行 TaskManager 上执行。
- 并发控制: 通过
isAssignBucket
方法中的computeAssignId(bucket) == assignId
逻辑(作为filter),保证了每个并行实例只负责分配一部分bucket
ID,从而实现了无锁的并发分配。 - 状态独立: 每个并行实例都有自己独立的 RocksDB 实例。这看起来似乎违背了“全局索引”的初衷,但实际上 Paimon 通过 Flink 的 KeyedStream 机制,保证了相同主键的数据总是被路由到同一个并行实例。因此,虽然物理上有多个 RocksDB 实例,但逻辑上,对于任何一个主键,它的索引信息只存在于其中一个固定的实例中,从而保证了全局索引的一致性。
总结
GlobalIndexAssigner
是一个集成了本地状态存储 (RocksDB)、外部排序、缓存和分布式计算思想的复杂但高效的组件。它通过引导+处理的两阶段模式,以及基于主键路由的分布式状态,巧妙地解决了跨分区 UPSERT
场景下的全局索引难题,是 Paimon 动态表功能能够强大、高效运行的基石。
GlobalIndexAssignerOperator
GlobalIndexAssignerOperator
是 GlobalIndexAssigner
在 Flink 计算引擎中的宿主和执行器。如果说 GlobalIndexAssigner
是一个功能强大的“引擎”,那么 GlobalIndexAssignerOperator
就是为这个引擎量身打造的“座驾”。它负责将 GlobalIndexAssigner
的逻辑无缝地集成到 Flink 的流处理拓扑中,处理生命周期、数据流转和状态管理。
public class GlobalIndexAssignerOperator
extends AbstractStreamOperator<Tuple2<InternalRow, Integer>>
implements OneInputStreamOperator<
Tuple2<KeyPartOrRow, InternalRow>, Tuple2<InternalRow, Integer>>,
BoundedOneInput {
// ...
}
深入分析:
extends AbstractStreamOperator<...>
: 表明它是一个标准的 Flink 流处理算子。它的输出类型是Tuple2<InternalRow, Integer>
,即(数据行, bucket号)
,这正是GlobalIndexAssigner
处理后的结果。implements OneInputStreamOperator<...>
: 表明这是一个单输入流的算子。它的输入类型是Tuple2<KeyPartOrRow, InternalRow>
。这个KeyPartOrRow
是一个枚举,用来区分输入的数据是存量数据(KEY_PART
)还是增量数据(ROW
),这对于实现两阶段工作模式至关重要。implements BoundedOneInput
: 表明这个算子可以处理有界流(批处理作业)。它需要实现endInput()
方法,当输入流结束时 Flink 会调用该方法。
核心成员变量
// ...
private final GlobalIndexAssigner assigner;
private transient IOManager ioManager;
// ...
深入分析:
assigner
: 这是最重要的成员,它持有一个GlobalIndexAssigner
的实例。Operator
的所有核心逻辑都是通过委托给这个assigner
对象来完成的。ioManager
: 一个transient
的 IO 管理器。transient
关键字意味着它不会被序列化。它在initializeState
方法中被创建,用于管理GlobalIndexAssigner
可能需要的磁盘溢写操作(例如 RocksDB 和外部排序缓冲)。
生命周期与初始化 (initializeState
)
这是 Operator
在 Flink TaskManager 上启动时最先执行的方法之一,负责准备好 GlobalIndexAssigner
的运行环境。
// ...
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
org.apache.flink.runtime.io.disk.iomanager.IOManager flinkIoManager =
getContainingTask().getEnvironment().getIOManager();
ioManager = IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
assigner.open(
computeManagedMemory(this),
ioManager,
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
this::collect);
}
// ...
深入分析:
- 获取 Flink 资源: 它从 Flink 的
Task
环境中获取底层的IOManager
,并用其临时目录来创建 Paimon 自己封装的IOManager
。 - 调用
assigner.open(...)
: 这是关键的桥接步骤。它将 Flink 的运行时信息传递给GlobalIndexAssigner
:computeManagedMemory(this)
: 计算并传入 Flink 为该算子分配的托管内存(Managed Memory),assigner
会用这部分内存来配置 RocksDB 的 Block Cache 等。ioManager
: 传入 IO 管理器。getNumberOfParallelSubtasks(...)
: 传入作业的并行度。getIndexOfThisSubtask(...)
: 传入当前子任务的 ID。this::collect
: 传入一个方法引用作为回调函数。当assigner
处理完一条数据后,会调用这个回调,Operator
再通过它将结果发送到下游。
通过这一步,GlobalIndexAssigner
就被完全“激活”并准备好处理数据了。
数据处理 (processElement
)
这是算子的核心数据处理逻辑,每当一条数据记录到达时,Flink 就会调用这个方法。
// ...
@Override
public void processElement(StreamRecord<Tuple2<KeyPartOrRow, InternalRow>> streamRecord)
throws Exception {
Tuple2<KeyPartOrRow, InternalRow> tuple2 = streamRecord.getValue();
InternalRow value = tuple2.f1;
switch (tuple2.f0) {
case KEY_PART:
assigner.bootstrapKey(value);
break;
case ROW:
assigner.processInput(value);
break;
}
}
// ...
深入分析:
- 数据分发:
processElement
的逻辑非常清晰,它就像一个交通警察。它解析输入Tuple2
的第一个字段KeyPartOrRow
:- 如果值是
KEY_PART
,说明这是来自上游IndexBootstrapOperator
的存量数据,应该用于引导。于是它调用assigner.bootstrapKey(value)
。 - 如果值是
ROW
,说明这是来自数据源的增量数据(或在引导结束后重新处理的缓存数据),应该进行正常处理。于是它调用assigner.processInput(value)
。
- 如果值是
这个 switch
语句完美地实现了对 GlobalIndexAssigner
两阶段工作模式的驱动。
Checkpoint 与批处理结束
这两个方法处理了流处理中的 Checkpoint 和批处理中的结束信号,它们都与 GlobalIndexAssigner
的引导阶段结束有关。
// ...
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
endBootstrap(false);
}
@Override
public void endInput() throws Exception {
endBootstrap(true);
}
private void endBootstrap(boolean isEndInput) throws Exception {
if (assigner.inBoostrap()) {
assigner.endBoostrap(isEndInput);
}
}
// ...
深入分析:
prepareSnapshotPreBarrier
: 在 Flink 中,当 Checkpoint Barrier 到达一个算子之前,会先调用这个方法。这是一个结束引导阶段的完美时机。当第一个 Checkpoint Barrier 到达时,意味着所有存量数据(KEY_PART
)都已经处理完毕。此时调用endBootstrap(false)
,GlobalIndexAssigner
就会执行批量加载 RocksDB、处理缓存数据等操作,并切换到正常处理模式。endInput
: 在批处理模式下,当所有输入数据都处理完毕后,Flink 会调用此方法。这也标志着引导阶段(如果存在)必须结束。此时调用endBootstrap(true)
,参数isEndInput=true
会触发GlobalIndexAssigner
中针对批处理的特殊优化路径。endBootstrap
: 这个私有方法封装了公共逻辑,即只有在assigner
仍处于引导模式时 (inBoostrap()
为true
),才调用endBoostrap
,避免了重复调用。
当 Flink 作业结束或取消时,会调用 close
方法来释放资源。
// ...
@Override
public void close() throws Exception {
this.assigner.close();
if (ioManager != null) {
ioManager.close();
}
}
// ...
深入分析:
- 它会依次关闭
assigner
(这会关闭 RocksDB 并删除临时文件)和ioManager
,确保所有本地资源都被正确清理,不会造成泄露。
总结
GlobalIndexAssignerOperator
是一个典型的适配器模式应用。它本身不包含复杂的业务逻辑,而是作为一个轻量级的“外壳”,将 GlobalIndexAssigner
这个通用的、与计算引擎无关的核心组件,适配到 Flink 的流处理模型中。它通过实现 Flink 的算子接口,巧妙地利用 Flink 的生命周期(initializeState
, close
)、数据处理(processElement
)和事件机制(prepareSnapshotPreBarrier
, endInput
)来驱动 GlobalIndexAssigner
的两阶段工作流程,是 Paimon 动态分桶功能在 Flink 上得以实现的关键连接点。
GlobalDynamicBucketSink
GlobalDynamicBucketSink
是 Paimon Flink Sink 体系中,专门用于处理启用了全局索引的动态分桶表(即 bucket-mode = 'cross-partition'
)的总装配车间。它的核心职责不是执行具体的写入逻辑,而是构建和编排 Flink 的 DataStream
作业拓扑。它像一个总工程师,将 IndexBootstrapOperator
、GlobalIndexAssignerOperator
、DynamicBucketRowWriteOperator
等一系列专用的算子(Operator)按照正确的顺序和数据分发逻辑连接起来,形成一个完整、高效的数据写入流水线。
public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow, Integer>> {
// ...
}
深入分析:
extends FlinkWriteSink<Tuple2<InternalRow, Integer>>
:- 它继承自
FlinkWriteSink
,这是一个通用的 Paimon Flink Sink 基类,封装了创建Writer
和Committer
的通用逻辑。 - 泛型参数
Tuple2<InternalRow, Integer>
非常关键,它定义了这个 Sink 流水线中,写入算子 (Writer
) 的输入数据类型是(数据行, bucket号)
。这与我们之前分析的GlobalIndexAssignerOperator
的输出类型完全一致,表明了数据流的衔接关系。
- 它继承自
核心方法 build
build
方法是这个类的灵魂,它定义了整个 Flink 作业的拓扑结构。让我们分步解析这个方法的实现。
// ...
public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
// ...
// Topology:
// input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket -->
// writer --> committer
// ...
}
// ...
代码注释已经清晰地勾勒出了整个数据流拓扑,下面我们逐一解析每个阶段:
阶段 0: 输入 (input: DataStream<InternalRow>
)
这是数据流的起点,代表着从各种数据源(如 Kafka, CDC Source)流入的原始数据。
阶段 1: 引导 (bootstrap
)
// ...
SingleOutputStreamOperator<Tuple2<KeyPartOrRow, InternalRow>> bootstraped =
input.transform(
"INDEX_BOOTSTRAP",
new InternalTypeInfo<>(
new KeyWithRowSerializer<>(
bootstrapSerializer, rowSerializer)),
new IndexBootstrapOperator.Factory<>(
new IndexBootstrap(table), r -> r))
.setParallelism(input.getParallelism());
// ...
- 目的: 为后续的
GlobalIndexAssigner
准备引导数据。 - 实现:
- 通过
input.transform
添加了一个名为INDEX_BOOTSTRAP
的算子,这个算子就是IndexBootstrapOperator
。 IndexBootstrapOperator
内部持有一个IndexBootstrap
实例,它会读取 Paimon 表的当前快照,将所有存量数据作为KeyPartOrRow.KEY_PART
类型输出。- 同时,它也会将增量数据(即
input
流中的数据)直接作为KeyPartOrRow.ROW
类型透传下去。 - 这样,输出流
bootstraped
中就混合了两种类型的数据,为后续的GlobalIndexAssignerOperator
的两阶段工作模式做好了准备。
- 通过
阶段 2: 第一次 Shuffle (shuffle by key hash
)
// ...
// 1. shuffle by key hash
Integer assignerParallelism =
MathUtils.max(
options.dynamicBucketInitialBuckets(),
options.dynamicBucketAssignerParallelism());
if (assignerParallelism == null) {
assignerParallelism = parallelism;
}
KeyPartRowChannelComputer channelComputer =
new KeyPartRowChannelComputer(rowType, bootstrapType, primaryKeys);
DataStream<Tuple2<KeyPartOrRow, InternalRow>> partitionByKeyHash =
partition(bootstraped, channelComputer, assignerParallelism);
// ...
- 目的: 保证相同主键的数据(无论是存量还是增量)都被发送到同一个
bucket-assigner
并行实例上。这是GlobalIndexAssigner
能够正确维护全局索引状态的前提。 - 实现:
partition
是一个工具方法,它本质上是调用了 Flink 的partitionCustom
API。KeyPartRowChannelComputer
是一个自定义的分区器。它会从输入数据中提取主键,计算其哈希值,然后根据哈希值决定数据应该被发送到哪个下游分区。
阶段 3: 分配桶 (bucket-assigner
)
// ...
// 2. bucket-assigner
TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
SingleOutputStreamOperator<Tuple2<InternalRow, Integer>> bucketAssigned =
partitionByKeyHash
.transform(
"cross-partition-bucket-assigner",
rowWithBucketType,
GlobalIndexAssignerOperator.forRowData(table))
.setParallelism(partitionByKeyHash.getParallelism());
// declare managed memory for RocksDB
declareManagedMemory(
bucketAssigned, options.toConfiguration().get(SINK_CROSS_PARTITION_MANAGED_MEMORY));
// ...
- 目的: 为每一条数据分配一个最终的
bucket
号。 - 实现:
- 在
partitionByKeyHash
流上再次调用transform
,添加了GlobalIndexAssignerOperator
。 - 这个算子接收上游按主键 shuffle 好的数据,利用其内部的 RocksDB 状态和
BucketAssigner
逻辑,为每条数据计算出bucket
。 - 输出流
bucketAssigned
的类型变成了Tuple2<InternalRow, Integer>
,即(数据行, bucket号)
。 declareManagedMemory
为这个算子声明了需要的 Flink 托管内存,这部分内存主要会被 RocksDB 用于缓存。
- 在
阶段 4: 第二次 Shuffle (shuffle by bucket
)
// ...
// 3. shuffle by bucket
DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
partition(bucketAssigned, new RowWithBucketChannelComputer(schema), parallelism);
// ...
- 目的: 将数据按照分区和桶进行重新分发,确保同一个桶的数据被发送到同一个
writer
实例。这是 Paimon 文件写入的基本要求,因为一个数据文件只能由一个writer
写入。 - 实现:
- 再次调用
partition
工具方法。 RowWithBucketChannelComputer
是另一个自定义分区器。它会从(数据行, bucket号)
中提取分区键和bucket
号,然后根据这两者的组合哈希值来决定下游分区。
- 再次调用
阶段 5: 写入与提交 (writer --> committer
)
// ...
// 4. writer and committer
return sinkFrom(partitionByBucket, createCommitUser(options.toConfiguration()));
}
// ...
@Override
protected OneInputStreamOperatorFactory<Tuple2<InternalRow, Integer>, Committable>
createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
return new DynamicBucketRowWriteOperator.Factory(table, writeProvider, commitUser);
}
// ...
- 目的: 将数据写入 Paimon 的数据文件,并在 Checkpoint 完成时生成可提交的元数据(
Committable
)。 - 实现:
sinkFrom
是FlinkWriteSink
基类提供的方法,它会组装Writer
和Committer
算子。createWriteOperatorFactory
方法被重写,返回一个DynamicBucketRowWriteOperator.Factory
。这告诉sinkFrom
方法,应该使用DynamicBucketRowWriteOperator
作为写入算子。DynamicBucketRowWriteOperator
接收按桶 shuffle 好的(数据行, bucket号)
数据,并调用底层的StoreSinkWrite
将其写入文件。
什么是存量数据和增量数据?
为了更好地理解,我们可以用一个仓库管理的比喻:
存量数据 (Stock Data):
- 比喻: 想象一下,你今天第一天接管一个仓库。在开始记录今天新进来的货物之前,你首先需要做的,是把仓库里已经存在的所有货物都清点一遍,登记在册。这个“已经存在的货物”就是存量。
- 在 Paimon 中: 当你启动一个 Flink 作业去写入一张 Paimon 表时,这张表里可能已经通过之前的作业写入了很多数据。这些在 Flink 作业启动那一刻已经存在于 Paimon 表文件里的数据,就是存量数据。
- 代码关联:
IndexBootstrap
这个类的核心职责,就是去扫描 Paimon 表的最新快照,把这些存量数据读取出来。
增量数据 (Incremental Data):
- 比喻: 在你清点完库存之后,今天陆陆续续有新的货车开到仓库门口,卸下新的货物。这些“新来的货物”就是增量。
- 在 Paimon 中: 当你的 Flink 作业启动并运行后,从数据源(比如 Kafka、CDC)实时流过来的新数据,就是增量数据。
- 代码关联:
IndexBootstrapOperator
的processElement
方法,以及GlobalIndexAssignerOperator
在引导结束后处理的数据,都属于增量数据。
总结一下:存量是“过去时”,是启动时的快照;增量是“现在进行时”,是启动后源源不断流入的数据流。
为什么 GlobalIndexAssignerOperator
需要处理存量数据?
为了保证主键的全局唯一性,并正确处理跨分区的更新(UPSERT)。
GlobalIndexAssignerOperator
的核心使命是为每一条数据分配一个 bucket
。对于启用了全局索引的动态分桶表,它必须知道一个主键(Primary Key)当前到底在哪一个分区、哪一个 bucket 里。如果不知道这个信息,它就无法做出正确的判断。
让我们通过一个具体的例子来理解这个过程:
场景:
- 一张用户表,以
user_id
为主键,login_date
为分区键。 merge-engine
设置为deduplicate
(去重,保留最新)。
初始状态 (存量数据):
- 表里已经有一条数据:
{user_id: 101, name: 'Alice', login_date: '2023-10-01'}
。这条数据位于分区pt='2023-10-01'
,假设在bucket-3
。
现在,Flink 作业启动了,GlobalIndexAssignerOperator
开始工作。
第一步:处理存量数据(引导阶段 Bootstrap)
IndexBootstrapOperator
运行IndexBootstrap
逻辑,扫描全表,找到了存量数据{user_id: 101, ...}
。- 它将这条数据标记为
KEY_PART
,发送给GlobalIndexAssignerOperator
。 GlobalIndexAssignerOperator
收到后,调用assigner.bootstrapKey()
。GlobalIndexAssigner
将这个信息加载到它的本地 RocksDB 索引中,记录下:Key(101) -> Value(partition_id_of_2023-10-01, bucket_3)
。
至此,引导阶段完成。GlobalIndexAssigner
已经知道了所有“老”数据的位置。
第二步:处理增量数据
- 现在,从数据源来了一条新的增量数据:
{user_id: 101, name: 'Alice_New', login_date: '2023-10-05'}
。 - 这条数据流经
IndexBootstrapOperator
,被标记为ROW
,发送给GlobalIndexAssignerOperator
。 GlobalIndexAssignerOperator
调用assigner.processInput()
。GlobalIndexAssigner
拿到主键101
,去查询它的 RocksDB 索引。- 它查到了! 索引告诉它,
user_id=101
的数据之前存在于分区pt='2023-10-01'
的bucket-3
中。 - 因为新数据的分区是
'2023-10-05'
,与老数据不同,这触发了跨分区更新逻辑。 - 根据
deduplicate
引擎的规则,它会执行以下操作:- 生成一条
DELETE
记录,指向老的位置:{user_id: 101, ..., RowKind: DELETE}
,并给它分配bucket-3
,发往下游的 Writer。 - 为新数据
{user_id: 101, name: 'Alice_New', ...}
分配一个新的 bucket(比如bucket-8
),并更新 RocksDB 索引为Key(101) -> Value(partition_id_of_2023-10-05, bucket_8)
,然后将这条INSERT
记录发往下游。
- 生成一条
如果没有第一步对存量数据的处理,GlobalIndexAssigner
的 RocksDB 索引就是空的。当第二步的增量数据到来时,它会认为 user_id=101
是一个全新的主键,直接为其分配一个新的 bucket 并插入。这样做的后果是灾难性的:Paimon 表里会同时存在两条 user_id=101
的数据(一条在 2023-10-01
分区,一条在 2023-10-05
分区),主键约束被破坏了。
因此,GlobalIndexAssignerOperator
必须先通过 IndexBootstrap
加载和处理所有“之前的数据”(存量数据),构建起一个完整的、反映历史状态的全局索引。只有在这个坚实的基础上,它才能正确地处理后续的增量数据,实现精准的跨分区 UPSERT
。
总结
GlobalDynamicBucketSink
通过其 build
方法,以一种声明式的方式,完美地编排了一个复杂的多阶段 Flink 流处理作业。这个作业拓扑通过两次关键的 Shuffle 操作,解决了全局索引和数据写入的核心数据分发问题:
- 第一次 Shuffle (by Key): 保证了状态计算的正确性(相同主键到同个 assigner)。
- 第二次 Shuffle (by Bucket): 保证了数据写入的正确性(相同桶到同个 writer)。
整个流程清晰、高效,充分利用了 Flink 的数据流和算子模型,是 Paimon 实现跨分区动态分桶这一高级功能的基石。
IndexBootstrap
和 IndexBootstrapOperator
它们共同完成了为全局索引提供存量数据这个关键任务。
IndexBootstrap
是逻辑执行者,负责从 Paimon 表中读取存量数据。 IndexBootstrapOperator
是物理执行者,它在 Flink 环境中运行 IndexBootstrap
的逻辑,并将读取到的存量数据和流经的增量数据一起发送给下游。
IndexBootstrap
的核心使命是:扫描 Paimon 表的最新快照,读取所有存量数据,并提取出构建全局索引所必需的信息。
我们来看它的核心方法 bootstrap(int numAssigners, int assignId)
:
// ... existing code ...
public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throws IOException {
RowType rowType = table.rowType();
List<String> fieldNames = rowType.getFieldNames();
int[] keyProjection =
table.primaryKeys().stream()
.map(fieldNames::indexOf)
.mapToInt(Integer::intValue)
.toArray();
// 1. 强制使用 latest 模式扫描
ReadBuilder readBuilder =
table.copy(Collections.singletonMap(SCAN_MODE.key(), LATEST.toString()))
.newReadBuilder()
.withProjection(keyProjection);
// 2. 规划并过滤 Splits
DataTableScan tableScan = (DataTableScan) readBuilder.newScan();
List<Split> splits =
tableScan
.withBucketFilter(bucket -> bucket % numAssigners == assignId)
.plan()
.splits();
// 3. (可选) 根据 TTL 过滤 Splits
CoreOptions options = CoreOptions.fromMap(table.options());
Duration indexTtl = options.crossPartitionUpsertIndexTtl();
if (indexTtl != null) {
// ... filter splits by TTL ...
}
// 4. 并行读取并拼接数据
RowDataToObjectArrayConverter partBucketConverter =
new RowDataToObjectArrayConverter(
TypeUtils.concat(
TypeUtils.project(rowType, table.partitionKeys()),
RowType.of(DataTypes.INT())));
return parallelExecute(
TypeUtils.project(rowType, keyProjection),
s -> readBuilder.newRead().createReader(s),
splits,
options.pageSize(),
options.crossPartitionUpsertBootstrapParallelism(),
split -> {
DataSplit dataSplit = ((DataSplit) split);
int bucket = dataSplit.bucket();
return partBucketConverter.toGenericRow(
new JoinedRow(dataSplit.partition(), GenericRow.of(bucket)));
},
(row, extra) -> new JoinedRow().replace(row, extra));
}
// ... existing code ...
深入分析:
强制
latest
扫描: 它创建了一个新的ReadBuilder
,并强制将扫描模式(scan.mode
)设置为latest
。这意味着它总是读取表的最新快照,确保获取到所有最新的存量数据。同时,它只投影(withProjection
)了主键字段,因为引导阶段只需要主键信息。规划并过滤 Splits:
- 它调用
tableScan.plan()
来规划出所有需要读取的数据文件(Splits
)。 - 最关键的一步是
.withBucketFilter(bucket -> bucket % numAssigners == assignId)
。这行代码实现了引导任务的并行化。numAssigners
是下游GlobalIndexAssignerOperator
的并行度,assignId
是当前实例的 ID。这个过滤器确保了每个IndexBootstrap
实例只读取一部分bucket
的数据,避免了重复工作。
- 它调用
TTL 过滤 (可选): 如果用户配置了
cross-partition-upsert.index-ttl
,它会进一步过滤Splits
。filterSplit
方法会检查Split
中的文件创建时间,如果所有文件都已经超出了 TTL,那么这个Split
就会被丢弃,因为它的索引数据已经过期,无需加载。并行读取与数据拼接:
parallelExecute
是一个工具方法,它会启动一个线程池(并行度由cross-partition-upsert.bootstrap-parallelism
控制)来并行地读取所有规划好的Splits
。- 对于每个
Split
,它会提取出分区信息和 bucket 号。 - 对于从文件中读取的每一行数据(只包含主键),它会和上一步提取的
(分区, bucket)
信息拼接(JoinedRow
)起来。 - 最终,它返回一个
RecordReader
,这个 Reader 产出的每一行InternalRow
都包含了(主键字段..., 分区字段..., bucket号)
,这正是下游GlobalIndexAssigner
进行引导所需要的全部信息。
IndexBootstrapOperator
: Flink 中的“调度器”
IndexBootstrapOperator
是一个 Flink 算子,它为 IndexBootstrap
提供了运行环境,并负责将引导数据和增量数据进行“标记”和“转发”。
public class IndexBootstrapOperator<T> extends AbstractStreamOperator<Tuple2<KeyPartOrRow, T>>
implements OneInputStreamOperator<T, Tuple2<KeyPartOrRow, T>> {
// ...
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// 1. 启动引导过程
bootstrap.bootstrap(
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
this::collect);
}
@Override
public void processElement(StreamRecord<T> streamRecord) throws Exception {
// 2. 处理增量数据
output.collect(new StreamRecord<>(new Tuple2<>(KeyPartOrRow.ROW, streamRecord.getValue())));
}
private void collect(InternalRow row) {
// 3. 处理引导数据
output.collect(
new StreamRecord<>(new Tuple2<>(KeyPartOrRow.KEY_PART, converter.apply(row))));
}
// ...
}
深入分析:
启动引导 (
initializeState
):- 在算子初始化时,它会立即调用
bootstrap.bootstrap(...)
方法。 - 它将 Flink 的并行度(
getNumberOfParallelSubtasks
)和当前子任务 ID(getIndexOfThisSubtask
)传递给IndexBootstrap
,这样IndexBootstrap
才知道自己应该读取哪些bucket
的数据。 - 它将自己的
collect
方法作为回调函数传进去。
- 在算子初始化时,它会立即调用
处理增量数据 (
processElement
):- 这个方法处理从上游流过来的增量数据(比如来自 Kafka 的新消息)。
- 它将每一条增量数据包装成
Tuple2(KeyPartOrRow.ROW, data)
的形式,然后发送给下游。KeyPartOrRow.ROW
这个标记告诉下游的GlobalIndexAssignerOperator
:“这是一条增量数据,请按正常流程处理”。
处理引导数据 (
collect
):- 这个方法是
IndexBootstrap
的回调函数。当IndexBootstrap
在后台读取并拼接好一条存量数据后,就会调用这个collect
方法。 - 它将收到的存量数据(
row
)包装成Tuple2(KeyPartOrRow.KEY_PART, data)
的形式,然后发送给下游。KeyPartOrRow.KEY_PART
这个标记告诉下游的GlobalIndexAssignerOperator
:“这是一条用于引导的存量数据,请调用bootstrapKey
方法”。
- 这个方法是
总结
IndexBootstrap
和 IndexBootstrapOperator
的关系可以总结如下:
IndexBootstrap
是一个数据源。它负责**“生产”** 用于构建全局索引的存量数据。它通过扫描 Paimon 表文件,并利用bucket
过滤来实现并行化读取。IndexBootstrapOperator
是一个转换器和转发器。它在 Flink 作业启动时,触发IndexBootstrap
开始生产数据。然后,它扮演一个交通枢纽的角色:- 对于
IndexBootstrap
生产的存量数据,它打上KEY_PART
标签。 - 对于从上游流经的增量数据,它打上
ROW
标签。 - 最后,它将这两种打好标签的数据流合并在一起,发送给下游的
GlobalIndexAssignerOperator
进行统一处理。
- 对于
这个设计将存量数据读取和增量数据处理巧妙地统一到了一个 Flink 数据流中,为后续的全局索引构建和动态分桶奠定了基础。