这里的写入实际上只是sort run的临时文件
之后整理索引,和写Parquet、远程存储
BinaryExternalSortBuffer
BinaryExternalSortBuffer
是 Paimon 中用于处理大规模数据排序的核心组件。当待排序的数据量超过内存限制时,它能够利用磁盘空间完成排序,这种技术通常被称为“外部排序”(External Sorting)。
其核心思想是经典的 外部归并排序(External Merge Sort) 算法,主要分为两个阶段:
- 排序与溢写(Sort and Spill): 在内存中尽可能多地加载数据,对这部分数据进行排序,然后将排好序的“顺串”(run)写入临时的磁盘文件。重复此过程,直到所有数据都被处理完,形成多个有序的磁盘文件。
- 归并(Merge): 将所有在磁盘上的有序文件进行多路归并,生成一个最终的、全局有序的数据流。
BinaryExternalSortBuffer
实现了 SortBuffer
接口,其主要职责是:接收 InternalRow
类型的记录,并在所有记录写入完成后,提供一个能够按序迭代出所有记录的迭代器 MutableObjectIterator<BinaryRow>
。
它与 BinaryInMemorySortBuffer
协同工作,前者负责处理超出内存容量的情况,而后者则是一个纯粹的内存排序缓冲区。
// ... existing code ...
/** A spillable {@link SortBuffer}. */
public class BinaryExternalSortBuffer implements SortBuffer {
// 内存中的排序缓冲区
private final BinaryInMemorySortBuffer inMemorySortBuffer;
// IO管理器,用于创建和管理磁盘文件
private final IOManager ioManager;
// 记录所有溢写到磁盘的文件的元信息
private final List<ChannelWithMeta> spillChannelIDs;
// 外部归并器,负责合并多个溢写文件
private final BinaryExternalMerger merger;
// 限制最多能同时打开的文件句柄数
private final int maxNumFileHandles;
// 磁盘空间使用上限
private final MemorySize maxDiskSize;
// ... existing code ...
核心组件与成员变量
BinaryExternalSortBuffer
内部组合了几个关键对象来完成其功能:
inMemorySortBuffer
: 这是一个纯内存排序缓冲区。所有新写入的数据首先会进入这里。ioManager
: I/O 管理器,负责创建、删除和读写临时文件,是与磁盘交互的入口。spillChannelIDs
: 一个List
,用于追踪所有已经溢写到磁盘的临时文件(channel)的元数据(ID、大小等)。merger
: 一个BinaryExternalMerger
实例。这是实现多路归并的核心。当需要从所有溢写文件中读取最终排序结果时,由它来负责。maxNumFileHandles
: 一个整数,用于限制同时打开的溢写文件数量。当溢写文件过多时,会触发一次“中间归并”,将部分小文件合并成一个大文件,以减少文件句柄的占用。maxDiskSize
: 一个MemorySize
对象,用于限制该排序器能使用的最大磁盘空间,防止无限膨胀。compression...
: 相关的压缩配置,可以在数据写入磁盘前进行压缩,以节省磁盘空间和 I/O。
数据写入 (write
方法)
这是向排序器添加数据的主要入口。
// ... existing code ...
@Override
public boolean write(InternalRow record) throws IOException {
while (true) {
// 1. 尝试写入内存缓冲区
boolean success = inMemorySortBuffer.write(record);
if (success) {
this.numRecords++;
return true;
}
// 2. 如果内存缓冲区为空还写不进去,说明单个记录过大
if (inMemorySortBuffer.isEmpty()) {
// did not fit in a fresh buffer, must be large...
throw new IOException("The record exceeds the maximum size of a sort buffer.");
} else {
// 3. 内存缓冲区已满,触发溢写(spill)操作
spill();
// 4. 如果溢写文件数量达到句柄上限,触发中间合并
if (spillChannelIDs.size() >= maxNumFileHandles) {
List<ChannelWithMeta> merged = merger.mergeChannelList(spillChannelIDs);
spillChannelIDs.clear();
spillChannelIDs.addAll(merged);
}
}
}
}
// ... existing code ...
流程解读:
- 首先尝试将记录写入
inMemorySortBuffer
。如果成功,则直接返回。 - 如果写入失败,说明内存缓冲区满了。
- 此时会调用
spill()
方法,将内存中的数据排序后写入一个磁盘文件。 spill()
执行完后,内存被清空,while(true)
循环会再次尝试写入刚才失败的记录,这次通常会成功。- 代码还考虑了两种边界情况:
- 如果一条记录的大小超过了整个内存缓冲区的容量,会直接抛出异常。
- 如果溢写的临时文件太多,超过了
maxNumFileHandles
限制,会调用merger.mergeChannelList()
进行一次中间归并,将多个小文件合并成一个大文件,从而减少文件句柄的占用。
内存溢写到磁盘 (spill
方法)
这是执行“排序与溢写”阶段的核心逻辑。
// ... existing code ...
private void spill() throws IOException {
if (inMemorySortBuffer.isEmpty()) {
return;
}
// 1. 获取一个新的文件通道
FileIOChannel.ID channel = enumerator.next();
channelManager.addChannel(channel);
ChannelWriterOutputView output = null;
int blockCount;
try {
// 2. 创建带压缩的输出流
output =
FileChannelUtil.createOutputView(
ioManager, channel, compressionCodecFactory, compressionBlockSize);
// 3. 对内存中的数据进行排序
new QuickSort().sort(inMemorySortBuffer);
// 4. 将排好序的数据写入磁盘文件
inMemorySortBuffer.writeToOutput(output);
output.close();
blockCount = output.getBlockCount();
} catch (IOException e) {
// ... 异常处理 ...
}
// 5. 记录溢写文件的元信息
spillChannelIDs.add(new ChannelWithMeta(channel, blockCount, output.getWriteBytes()));
// 6. 清空内存缓冲区,为下一批数据做准备
inMemorySortBuffer.clear();
}
// ... existing code ...
流程解读:
- 如果内存缓冲区是空的,则无需溢写。
- 通过
ioManager
创建一个新的临时文件。 - 对
inMemorySortBuffer
中的数据执行快速排序。 - 将排好序的数据通过
ChannelWriterOutputView
写入磁盘。 - 将这个新生成的溢写文件的元数据(ID、块数、字节数)存入
spillChannelIDs
列表。 - 清空
inMemorySortBuffer
,使其可以接收新的数据。
获取排序结果 (sortedIterator
方法)
当所有数据都 write
完成后,调用此方法可以获得一个全局有序的迭代器。
BinaryInMemorySortBuffer 的内存容量是动态的,其上限由外部传入的 MemorySegmentPool 决定。当内存池耗尽,write() 方法返回 false,标志着内存缓冲区已满,这通常会触发溢写操作,然后清空并重用内存。
// ... existing code ...
@Override
public final MutableObjectIterator<BinaryRow> sortedIterator() throws IOException {
// 情况一:没有发生溢写,所有数据都在内存中
if (spillChannelIDs.isEmpty()) {
return inMemorySortBuffer.sortedIterator();
}
// 情况二:发生了溢写,需要进行归并
return spilledIterator();
}
private MutableObjectIterator<BinaryRow> spilledIterator() throws IOException {
// 1. 将内存中剩余的数据最后一次溢写到磁盘
spill();
List<FileIOChannel> openChannels = new ArrayList<>();
// 2. 使用 merger 创建一个多路归并迭代器
BinaryMergeIterator<BinaryRow> iterator =
merger.getMergingIterator(spillChannelIDs, openChannels);
channelManager.addOpenChannels(openChannels);
// 3. 返回一个包装后的迭代器
return new MutableObjectIterator<BinaryRow>() {
// ... existing code ...
@Override
public BinaryRow next() throws IOException {
BinaryRow row = iterator.next();
// 因为底层迭代器会复用对象,所以这里需要拷贝
return row == null ? null : row.copy();
}
};
}
// ... existing code ...
流程解读:
- 首先判断是否发生过溢写(
spillChannelIDs
是否为空)。 - 如果未发生溢写,说明所有数据都在
inMemorySortBuffer
中,直接调用其sortedIterator()
方法返回内存排序的结果即可。 - 如果发生过溢写,则进入
spilledIterator()
逻辑:- 先调用
spill()
做最后一次刷新,确保内存中所有数据都被写入磁盘。 - 然后,核心步骤是调用
merger.getMergingIterator()
。这个方法会为spillChannelIDs
列表中的每一个溢写文件创建一个读取器,并构造一个 最小堆(Min-Heap) 来进行 K-路归并。每次从迭代器中next()
,它都会从所有文件中取出最小的记录返回。 - 最后返回的迭代器对
BinaryMergeIterator
做了包装。一个重要的细节是row.copy()
,因为底层的归并迭代器为了性能会复用BinaryRow
对象,如果不拷贝,上层逻辑拿到的所有记录都会是同一个对象的引用,内容会被不断覆盖。
- 先调用
总结
BinaryExternalSortBuffer
是一个设计精良的外部排序实现,它完美地体现了“分而治之”的思想:
- 分(Spill): 将无法一次性载入内存的大问题,分解为一个个可以在内存中解决的小问题(对
inMemorySortBuffer
排序),并将中间结果(有序的顺串)存到磁盘。 - 治(Merge): 将所有子问题的解(磁盘上的有序文件)通过高效的多路归并算法,合并成最终的全局解(一个全局有序的数据流)。
通过内存与磁盘的协同工作,并辅以压缩、文件句柄管理等优化手段,BinaryExternalSortBuffer
能够高效地处理远超物理内存大小的数据排序任务。
PartialOrderPriorityQueue<T>
一个基于数组实现的最小堆 (Min-Heap)。和普通最小堆没有区别,只是在数组满了的时候有一个特定的判断规则:如果新加入元素不小于堆顶,替换堆顶元素。
构造函数
PartialOrderPriorityQueue(Comparator<T> comparator, int capacity)
:接收一个比较器
comparator
,用于决定堆中元素的顺序(在这个场景下,是“小于”关系,所以是最小堆)。capacity
指定了堆的最大容量。内部数组heap
的大小是capacity + 1
,因为堆的实现通常从索引 1 开始,方便计算父子节点。
lessThan(T a, T b)
: 使用传入的comparator
来比较两个元素。put(T element)
/offer(T element)
:put
是一个内部方法(但实际上是public,没有检查数组长度),直接将元素添加到堆的末尾,然后调用upHeap()
来维护堆的性质(新元素向上调整到正确位置)。offer
是公共接口:如果堆未满,调用
put
添加元素。如果堆已满,但新元素不小于堆顶元素(即新元素大于等于堆顶元素),则用新元素替换堆顶元素,并调用
adjustTop()
(即downHeap()
)来维护堆的性质。
peek()
: 返回堆顶元素(最小元素),但不移除它。poll()
: 移除并返回堆顶元素(最小元素)。将堆的最后一个元素移到堆顶,然后调用downHeap()
来维护堆的性质。adjustTop()
: 实际上就是downHeap()
,当堆顶元素被替换后,调用此方法使其下沉到正确位置。upHeap()
: 堆的上浮操作。当新元素添加到末尾时,将其与其父节点比较,如果小于父节点则交换,直到找到正确位置或到达堆顶。downHeap()
: 堆的下沉操作。当堆顶元素被移除或替换时,将一个元素(通常是原末尾元素或新替换的元素)放到堆顶,然后将其与其子节点中较小的那个比较,如果大于子节点则交换,直到找到正确位置或成为叶子节点。
BinaryMergeIterator
BinaryMergeIterator
是 Apache Paimon 项目中用于对多个已排序的二进制数据流进行归并排序的核心迭代器。它的主要作用是将多个迭代器(MutableObjectIterator
)的数据合并成一个单一的、全局有序的输出流。这在处理大量数据、特别是需要从多个文件或内存缓冲区进行合并的场景中非常关键,例如在外部排序(External Sorting)的归并阶段。
下面我们来分部分解析这个类:
// ... existing code ...
import java.util.Comparator;
import java.util.List;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Binary version of {@code MergeIterator}. Use {@code RecordComparator} to compare record. */
public class BinaryMergeIterator<Entry> implements MutableObjectIterator<Entry> {
// ... existing code ...
public class BinaryMergeIterator<Entry> implements MutableObjectIterator<Entry>
:- 这个类是一个泛型类,
Entry
代表了迭代器中处理的数据记录的类型。在 Paimon 的上下文中,这通常是BinaryRow
,一种紧凑的、二进制格式的行数据。 - 它实现了
MutableObjectIterator<Entry>
接口。这个接口定义了迭代器的基本行为,特别是next(E reuse)
方法,它允许在迭代过程中重用对象,从而减少垃圾回收(GC)的压力,这对于高性能数据处理系统至关重要。
- 这个类是一个泛型类,
核心数据结构:最小堆
// ... existing code ...
public class BinaryMergeIterator<Entry> implements MutableObjectIterator<Entry> {
// heap over the head elements of the stream
private final PartialOrderPriorityQueue<HeadStream<Entry>> heap;
private HeadStream<Entry> currHead;
public BinaryMergeIterator(
// ... existing code ...
private final PartialOrderPriorityQueue<HeadStream<Entry>> heap;
:- 这是
BinaryMergeIterator
的核心。它使用了一个 最小堆(PartialOrderPriorityQueue
)来管理所有的输入迭代器。 - 堆中存储的元素是
HeadStream<Entry>
对象,而不是直接的数据记录Entry
。HeadStream
是一个内部类,它封装了每个输入迭代器以及该迭代器当前的头部元素(即最小的元素)。 - 通过维护一个最小堆,
BinaryMergeIterator
可以非常高效地在所有输入流的当前头部元素中找到全局最小的那个,这个元素永远位于堆顶。
- 这是
构造函数:初始化
// ... existing code ...
private HeadStream<Entry> currHead;
public BinaryMergeIterator(
List<MutableObjectIterator<Entry>> iterators,
List<Entry> reusableEntries,
Comparator<Entry> comparator)
throws IOException {
checkArgument(iterators.size() == reusableEntries.size());
this.heap =
new PartialOrderPriorityQueue<>(
(o1, o2) -> comparator.compare(o1.getHead(), o2.getHead()),
iterators.size());
for (int i = 0; i < iterators.size(); i++) {
this.heap.add(new HeadStream<>(iterators.get(i), reusableEntries.get(i)));
}
}
@Override
// ... existing code ...
public BinaryMergeIterator(...)
:List<MutableObjectIterator<Entry>> iterators
: 接收一个迭代器列表,每个迭代器代表一个已经排好序的数据源。List<Entry> reusableEntries
: 接收一个可重用对象的列表,其大小与iterators
列表相同。每个reusableEntries
中的对象会与一个iterator
对应,用于在从该迭代器读取数据时承载数据,避免重复创建新对象。Comparator<Entry> comparator
: 一个比较器,用于比较两个Entry
对象的大小,这是维持堆有序性的关键。- 初始化过程:
- 创建一个
PartialOrderPriorityQueue
(最小堆),其比较逻辑基于传入的comparator
,比较的是HeadStream
的头部元素。 - 遍历所有输入的
iterators
,为每一个iterator
创建一个HeadStream
包装对象。 - 在创建
HeadStream
时,会立即从对应的iterator
中读取第一个元素作为head
。 - 将所有创建好的
HeadStream
对象添加到最小堆中。初始化完成后,堆顶的HeadStream
就包含了所有输入流中最小的那个元素。
- 创建一个
核心方法:next()
// ... existing code ...
@Override
public Entry next() throws IOException {
if (currHead != null) {
if (currHead.noMoreHead()) {
this.heap.poll();
} else {
this.heap.adjustTop();
}
}
if (this.heap.size() > 0) {
currHead = this.heap.peek();
return currHead.getHead();
} else {
return null;
}
}
private static final class HeadStream<Entry> {
// ... existing code ...
public Entry next() throws IOException
: 这是迭代器的核心逻辑,用于获取全局有序流中的下一个元素。if (currHead != null)
:currHead
保存了上一次next()
调用返回的那个元素所在的HeadStream
。如果不是第一次调用,就需要处理这个HeadStream
。if (currHead.noMoreHead())
: 调用noMoreHead()
方法,尝试从该HeadStream
对应的原始迭代器中读取下一个元素,并更新其head
。如果原始迭代器已经没有更多元素了(返回true
),说明这个流已经耗尽。this.heap.poll()
: 如果流已耗尽,就将这个HeadStream
从堆中移除。else { this.heap.adjustTop(); }
: 如果流中还有数据,那么它的head
已经更新为下一个元素。由于head
变了,它在堆中的位置可能不再正确,因此需要调用adjustTop()
来调整堆,确保堆顶依然是全局最小的元素。【这里巧妙的利用条件,如果还有元素,会进入 else;没有就弹出这个Stream】if (this.heap.size() > 0)
: 在处理完上一个元素后,检查堆是否为空。currHead = this.heap.peek()
: 如果堆不为空,那么堆顶的元素 (peek()
) 就是当前所有流中最小的元素。将其赋给currHead
。return currHead.getHead()
: 返回这个最小元素。else { return null; }
: 如果堆为空,说明所有输入流都已耗尽,归并过程结束,返回null
。
内部类:HeadStream
// ... existing code ...
}
}
private static final class HeadStream<Entry> {
private final MutableObjectIterator<Entry> iterator;
private Entry head;
private HeadStream(MutableObjectIterator<Entry> iterator, Entry head) throws IOException {
this.iterator = iterator;
this.head = head;
if (noMoreHead()) {
throw new IllegalStateException();
}
}
private Entry getHead() {
return this.head;
}
private boolean noMoreHead() throws IOException {
return (this.head = this.iterator.next(head)) == null;
}
}
}
private static final class HeadStream<Entry>
: 这是一个辅助类,非常关键。iterator
: 持有原始的输入迭代器。head
: 缓存了该迭代器当前的头部元素。这个head
对象就是从构造函数传入的reusableEntries
之一,实现了对象的复用。getHead()
: 返回当前缓存的头部元素。noMoreHead()
: 这是推进迭代器的核心。它调用原始迭代器的next(head)
方法,将下一个元素读入到head
对象中,并返回next
的结果。如果next
返回null
,表示这个流结束了。
总结
BinaryMergeIterator
通过一个最小堆(PartialOrderPriorityQueue
)和对每个输入流的 HeadStream
封装,实现了一个高效、低GC开销的多路归并排序迭代器。其工作流程可以概括为:
- 初始化:将每个排好序的输入流的第一个元素(通过
HeadStream
)放入最小堆。 - 迭代: a. 从堆顶取出(
peek
)当前全局最小的元素作为本次迭代的结果。 b. 从该元素所属的输入流中读取下一个元素,更新其在堆中的HeadStream
。 c. 调整堆,使新的堆顶仍然是全局最小元素。 d. 如果某个输入流被耗尽,则从堆中移除。 - 结束:当堆为空时,表示所有输入流都已处理完毕,迭代结束。
这种设计是处理大规模数据排序和合并的经典算法(K-way merge sort)的实现,在 Paimon 的数据合并(Compaction)等场景中扮演着至关重要的角色。
AbstractBinaryExternalMerger
AbstractBinaryExternalMerger<Entry>
这个抽象类。它是 Paimon 外部排序(External Sort)机制中负责 归并(Merge) 阶段的核心逻辑实现。
当内存中的数据(BinaryExternalSortBuffer
)被写满后,会先在内存中排序,然后作为一个有序的“顺串(run)”溢出到磁盘上的一个临时文件中。当所有数据都处理完毕后,磁盘上会存在多个这样的有序文件。AbstractBinaryExternalMerger
的任务就是将这些小的有序文件逐步归并成一个最终的、全局有序的大文件(或数据流)。
AbstractBinaryExternalMerger
是一个模板方法模式的应用。它定义了多路归并的核心算法和流程,但将一些与具体数据类型(Entry
)相关的操作(如比较、序列化/反序列化)抽象出去,交由子类实现。
其核心职责是:
- 多路归并策略: 实现一个高效的多路归并算法,能够一次性合并多个(最多
maxFanIn
个)已排序的磁盘文件。 - 迭代式归并: 如果溢出文件数量超过了
maxFanIn
,它能执行多轮归并,逐步减少文件数量,直到最终只剩下一个文件。 - 资源管理: 与
IOManager
和SpillChannelManager
紧密协作,负责创建归并后的新文件、打开待归并的旧文件,并在归并完成后及时关闭和删除旧文件,防止资源泄漏。 - 提供归并结果: 最终可以提供一个迭代器(
BinaryMergeIterator
),让调用者能够以流式的方式消费最终完全排序好的数据。
核心属性
// ... existing code ...
public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable {
// ... existing code ...
private final int maxFanIn;
private final SpillChannelManager channelManager;
private final BlockCompressionFactory compressionCodecFactory;
private final int compressionBlockSize;
protected final int pageSize;
protected final IOManager ioManager;
// ... existing code ...
}
ioManager
: I/O 服务门面,用于创建文件通道。pageSize
: 内存页大小,用于创建读写缓冲区。maxFanIn
: 最大扇入。这是归并算法的一个关键参数,表示一次归并操作最多可以同时合并多少个文件。更大的maxFanIn
可以减少归并的轮次,但会消耗更多的内存(因为需要为每个输入文件都分配一个读缓冲区)。channelManager
: 溢出通道管理器,用于追踪和清理归并过程中涉及的临时文件。compressionCodecFactory
/compressionBlockSize
: 压缩相关的配置,用于在读写磁盘文件时进行解压和压缩,以节省 I/O。
getMergingIterator
方法本身
这个方法是整个归并排序(Merge Sort)过程的“发动机”,它的核心作用是:为一组已排序的磁盘文件(Channels),创建一个统一的、能够按顺序逐一吐出全局最小元素的归并迭代器(BinaryMergeIterator)。
// ... existing code ...
public BinaryMergeIterator<Entry> getMergingIterator(
List<ChannelWithMeta> channelIDs, List<FileIOChannel> openChannels) throws IOException {
// create one iterator per channel id
if (LOG.isDebugEnabled()) {
LOG.debug("Performing merge of " + channelIDs.size() + " sorted streams.");
}
// 1. 创建一个列表,用于存放每个文件的迭代器
final List<MutableObjectIterator<Entry>> iterators = new ArrayList<>(channelIDs.size() + 1);
// 2. 遍历每个输入文件 (Channel)
for (ChannelWithMeta channel : channelIDs) {
// 2a. 为每个文件创建输入视图 (InputView)
ChannelReaderInputView view =
FileChannelUtil.createInputView(
ioManager,
channel,
openChannels,
compressionCodecFactory,
compressionBlockSize);
// 2b. 将输入视图包装成特定类型的迭代器,并添加到列表中
iterators.add(channelReaderInputViewIterator(view));
}
// 3. 将所有单个文件的迭代器聚合到一个归并迭代器中
return new BinaryMergeIterator<>(
iterators, mergeReusedEntries(channelIDs.size()), mergeComparator());
}
// ... existing code ...
方法流程分解:
- 初始化: 创建一个
ArrayList
(iterators
),用于收集接下来为每个输入文件创建的独立迭代器。 - 循环处理每个 Channel:
- 步骤 2a: 调用
FileChannelUtil.createInputView(...)
。这是一个关键的辅助方法,它负责打开物理文件,处理解压缩,并返回一个ChannelReaderInputView
对象。这个view
对象可以被看作是一个从磁盘文件到内存的、带缓冲和解压功能的数据流。openChannels
列表在这里被传入,createInputView
会将新打开的FileIOChannel
添加到这个列表中,以便上层调用者(mergeChannels
方法)后续可以统一关闭和删除它们。 - 步骤 2b: 调用抽象方法
channelReaderInputViewIterator(view)
。这一步是将通用的字节流(view
)转换为特定数据类型Entry
的迭代器。具体的转换逻辑(即反序列化)由AbstractBinaryExternalMerger
的子类实现。例如,BinaryExternalMerger
会在这里返回一个ChannelReaderInputViewIterator
,它能从view
中读取字节并反序列化成BinaryRow
。
- 步骤 2a: 调用
- 创建最终的归并迭代器:
- 将包含所有单个文件迭代器的
iterators
列表,连同用于对象重用的reusableEntries
列表和用于比较元素的comparator
,一起传递给BinaryMergeIterator
的构造函数。 mergeReusedEntries
和mergeComparator
同样是子类实现的抽象方法。- 最终返回一个
BinaryMergeIterator
实例。
- 将包含所有单个文件迭代器的
mergeChannels(List<ChannelWithMeta> channelIDs)
这是执行单次归并操作的核心私有方法。它接收一批有序文件的元数据(channelIDs
),将它们合并成一个新的、更大的有序文件,并返回新文件的元数据。
// ... existing code ...
private ChannelWithMeta mergeChannels(List<ChannelWithMeta> channelIDs) throws IOException {
// 1. 获取归并迭代器
List<FileIOChannel> openChannels = new ArrayList<>(channelIDs.size());
final BinaryMergeIterator<Entry> mergeIterator =
getMergingIterator(channelIDs, openChannels);
// 2. 创建一个新的输出文件
final FileIOChannel.ID mergedChannelID = ioManager.createChannel();
channelManager.addChannel(mergedChannelID);
ChannelWriterOutputView output = null;
int numBlocksWritten;
try {
// 3. 创建输出视图并写入数据
output =
FileChannelUtil.createOutputView(
ioManager,
mergedChannelID,
compressionCodecFactory,
compressionBlockSize);
writeMergingOutput(mergeIterator, output); // <--- 调用抽象方法
output.close();
numBlocksWritten = output.getBlockCount();
} catch (IOException e) {
// ... 异常处理,确保关闭和删除失败的输出文件 ...
} finally {
// 4. 清理输入的旧文件
for (FileIOChannel channel : openChannels) {
channelManager.removeChannel(channel.getChannelID());
try {
channel.closeAndDelete();
} catch (Throwable ignored) {
}
}
}
// 5. 返回新文件的元数据
return new ChannelWithMeta(mergedChannelID, numBlocksWritten, output.getWriteBytes());
}
// ... existing code ...
流程分解:
- 获取归并迭代器: 调用
getMergingIterator
,它会为每个输入的channelID
创建一个文件读取器,并将这些读取器包装成一个BinaryMergeIterator
。这个迭代器内部使用一个最小堆,每次调用next()
都能返回所有输入文件中的最小(或最大,取决于比较器)的下一条记录。 - 创建输出文件: 通过
ioManager
创建一个新的临时文件ID,并通知channelManager
开始追踪这个新文件。 - 写入数据: 创建一个
ChannelWriterOutputView
用于向新文件写入数据。然后调用抽象方法writeMergingOutput
,该方法会不断从mergeIterator
中取出已排序的记录,并将其序列化写入到output
中。 - 清理输入: 在
finally
块中,确保所有用于本次归并的输入文件(openChannels
)都被关闭和删除。这是非常关键的资源回收步骤。 - 返回结果: 返回新创建的、更大的有序文件的元数据
ChannelWithMeta
。
mergeChannelList
这个方法是外部排序中多轮归并(multi-pass merging)的调度核心。当初始的有序文件(我们称之为“顺串”或“run”)数量非常多,以至于超过了单次归并的最大扇入(maxFanIn
)时,就需要进行多轮归并。mergeChannelList
的职责就是制定并执行一个高效的归并计划,分批次地将大量的输入文件逐步合并,直到文件数量减少到一个合理的范围。
核心思想:优先处理“零头”
方法注释中有一句非常关键的话:
It is most efficient to perform the partial round first. (最有效的方式是首先执行部分归并轮次。)
这是整个算法设计的核心思想。假设我们有 N
个文件,最大扇入为 k
(maxFanIn
)。一个“满”的归并操作是合并 k
个文件生成 1 个文件。如果 N
不是 k
的整数倍,那么必然会有一次或多次归并操作的输入文件数小于 k
,这就是“部分归并”(partial merge)。
这个算法认为,与其在最后一轮留下一个“零头”进行部分归并,不如在第一轮就先处理掉这些“零头”,使得后续的每一轮归并都是“满”的。这样可以最大化地利用 I/O 和计算资源,减少总的归并轮次。
我们结合代码来一步步分析这个归并计划是如何制定和执行的。
public List<ChannelWithMeta> mergeChannelList(List<ChannelWithMeta> channelIDs)
throws IOException {
// ...
// 1. 计算最终需要归并到的文件数量 (numEnd)
final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(maxFanIn)) - 1;
final int numStart = channelIDs.size();
final int numEnd = (int) Math.pow(maxFanIn, scale);
// 2. 计算第一轮归并的详细计划
final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (maxFanIn - 1));
final int numNotMerged = numEnd - numMerges;
final int numToMerge = numStart - numNotMerged;
// 3. 将不需要在第一轮归并的文件直接加入结果列表
final List<ChannelWithMeta> mergedChannelIDs = new ArrayList<>(numEnd);
mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));
// 4. 计算第一轮每次归并操作需要合并的文件数
final int channelsToMergePerStep = (int) Math.ceil(numToMerge / (double) numMerges);
// 5. 执行第一轮归并
final List<ChannelWithMeta> channelsToMergeThisStep =
new ArrayList<>(channelsToMergePerStep);
int channelNum = numNotMerged;
while (!closed && channelNum < channelIDs.size()) {
channelsToMergeThisStep.clear();
for (int i = 0;
i < channelsToMergePerStep && channelNum < channelIDs.size();
i++, channelNum++) {
channelsToMergeThisStep.add(channelIDs.get(channelNum));
}
// 核心调用:将一批文件合并成一个新文件
mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep));
}
// 6. 返回第一轮归并后的文件列表
return mergedChannelIDs;
}
步骤 1: 计算目标文件数 numEnd
scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(maxFanIn)) - 1;
Math.log(N) / Math.log(k)
计算的是以k
为底N
的对数,即log_k(N)
。这代表了如果每次都进行k
路归并,需要多少轮才能把N
个文件合并成 1 个。Math.ceil(...)
向上取整,得到完成所有归并需要的总轮次数i
。- 1
得到i-1
。
numEnd = (int) Math.pow(maxFanIn, scale);
- 计算
k^(i-1)
。这个值的含义是:经过第一轮归并后,我们期望剩下多少个文件,以便后续的i-1
轮归并都是“满”的k
路归并。
- 计算
举例: 假设有 19 个文件 (numStart=19
),maxFanIn=4
。
log4(19)
约等于 2.12。ceil(2.12)
= 3。总共需要 3 轮归并。scale
= 3 - 1 = 2。numEnd
= 4^2 = 16。- 这意味着,算法的目标是通过第一轮归并,将 19 个文件变成 16 个文件。这样,接下来的第二轮可以将 16 个文件合并成 4 个(4次4路归并),第三轮可以将 4 个文件合并成 1 个(1次4路归并)。
步骤 2: 计算第一轮的归并计划
numMerges = (int) Math.ceil((numStart - numEnd) / (double) (maxFanIn - 1));
numStart - numEnd
是第一轮需要“消灭”的文件数 (19 - 16 = 3)。- 每次归并操作会减少
maxFanIn - 1
个文件(例如,4个文件合并成1个,净减少3个)。 - 所以
(19-16) / (4-1)
= 1。ceil(1)
= 1。这说明第一轮只需要进行 1 次归并操作。
numNotMerged = numEnd - numMerges;
16 - 1
= 15。这表示有 15 个文件在第一轮中不需要参与归并,它们可以直接“晋级”到下一轮。
numToMerge = numStart - numNotMerged;
19 - 15
= 4。这表示第一轮需要拿出 4 个文件来进行归并。
计划总结: 从 19 个文件中,拿出 4 个进行一次 4 路归并,生成 1 个新文件。另外 15 个文件不动。最终,15 + 1 = 16 个文件进入下一轮。
步骤 3: “晋级”无需归并的文件
mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));
- 将列表前
numNotMerged
(15) 个文件直接添加到结果列表中。
- 将列表前
步骤 4 & 5: 执行归并
channelsToMergePerStep = (int) Math.ceil(numToMerge / (double) numMerges);
ceil(4 / 1)
= 4。计算出每次归并操作需要合并的文件数。
while
循环和内部的for
循环负责从输入列表的numNotMerged
(15) 位置开始,取出channelsToMergePerStep
(4) 个文件。mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep));
- 这是核心动作。调用
mergeChannels
方法,将这 4 个文件合并成一个更大的有序文件。 mergeChannels
的返回值(一个ChannelWithMeta
对象,代表新生成的文件)被添加到结果列表中。
- 这是核心动作。调用
步骤 6: 返回结果
- 函数返回
mergedChannelIDs
。此时,这个列表包含了 15 个原始文件和 1 个新生成的合并文件,总共 16 个文件。这个列表将作为下一轮mergeChannelList
的输入(如果需要的话),或者直接用于最终的归并迭代器。
抽象方法 (Template Methods)
这是留给子类实现的扩展点,使得这个归并器可以处理不同类型的数据。
// ... existing code ...
/** @return entry iterator reading from inView. */
protected abstract MutableObjectIterator<Entry> channelReaderInputViewIterator(
ChannelReaderInputView inView);
/** @return merging comparator used in merging. */
protected abstract Comparator<Entry> mergeComparator();
/** @return reused entry object used in merging. */
protected abstract List<Entry> mergeReusedEntries(int size);
/** read the merged stream and write the data back. */
protected abstract void writeMergingOutput(
MutableObjectIterator<Entry> mergeIterator, AbstractPagedOutputView output)
throws IOException;
}
channelReaderInputViewIterator(...)
: 如何从一个输入视图(ChannelReaderInputView
)中反序列化出Entry
对象的迭代器。mergeComparator()
: 提供一个Comparator
来比较两个Entry
对象的大小,这是归并排序的核心。mergeReusedEntries(...)
: 为了减少GC,归并迭代器会重用对象。此方法提供这些可重用的Entry
对象。writeMergingOutput(...)
: 定义了如何将从mergeIterator
中取出的Entry
对象序列化并写入到输出视图(AbstractPagedOutputView
)中。
例如,BinaryExternalMerger
子类实现了这些方法来处理 BinaryRow
类型的数据。
合并后写入一页占多数KB?
Merger 是通过在初始化时接收一个 compressionBlockSize 参数,然后用这个参数去配置一个 ChannelWriterOutputView,最终由这个 ChannelWriterOutputView 来保证合并后写出的每一“页”(数据块)的(未压缩)大小都符合这个预设值。
Merger 负责“生产”有序数据,而 ChannelWriterOutputView 负责将这些数据“打包”成固定大小的页。
总结
AbstractBinaryExternalMerger
是 Paimon 外部排序算法中一个设计精良、高度可复用的核心组件。它通过模板方法模式定义了通用的、高效的多轮多路归并算法框架,同时将与具体数据类型相关的逻辑(比较、序列化)解耦出去。它与 IOManager
、SpillChannelManager
和 BinaryMergeIterator
等组件无缝协作,共同完成从大量无序数据到单一有序数据流的转化过程中最关键的“归并”步骤,并保证了临时磁盘资源的有效管理和回收。
BinaryExternalMerger
首先,从类名和注释 /** Record merger for sort of BinaryRow. */
可以看出:
Binary
: 它处理的是二进制格式的数据,具体来说是 Paimon 中的BinaryRow
对象。BinaryRow
是一种紧凑的、序列化后的行数据格式,用于高效的内存操作和存储。External
: 这个词是关键,它表明这个类用于处理 "外部" 数据,即存储在磁盘上的数据。这通常与外排序 (External Sorting) 算法相关。当数据量大到无法一次性装入内存时,就需要将数据分块排序后写入磁盘临时文件(称为 "run" 或 "spill file"),然后再将这些有序的临时文件合并成一个最终的有序文件。Merger
: 它的核心功能是 "合并"(Merge)。
总结:BinaryExternalMerger
是 Paimon 外排序框架中的一个核心组件,其唯一职责是:将多个存储在磁盘上、已经排好序的 BinaryRow
数据文件,高效地合并成一个单一的、全局有序的数据流。
public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRow> {
- 它继承自
AbstractBinaryExternalMerger<BinaryRow>
。这是一个典型的模板方法模式。 - 父类
AbstractBinaryExternalMerger
封装了外排序合并的通用逻辑和骨架,例如:- 管理磁盘上的临时文件(spill files)。
- 控制合并的 "扇入"(fan-in),即一次最多能合并多少个文件。
- 构建一个最小堆(Min-Heap)或优先队列,用于从多个输入流中实时找出最小的记录。
- 处理文件的读写、压缩和解压等 I/O 操作。
- 子类
BinaryExternalMerger
则负责实现与具体数据类型BinaryRow
相关的特定逻辑,通过重写父类的抽象方法来 "填充" 模板。
核心成员变量
private final BinaryRowSerializer serializer;
private final RecordComparator comparator;
这个类只有两个成员变量,它们是实现 BinaryRow
合并逻辑的关键:
serializer
(类型:BinaryRowSerializer
): 这是一个序列化器。因为磁盘上存储的是原始字节,BinaryExternalMerger
在从文件读取数据时,需要用serializer
将字节反序列化成BinaryRow
对象。同样,在将合并后的结果写回磁盘时,也需要用它将BinaryRow
对象序列化成字节。comparator
(类型:RecordComparator
): 这是一个记录比较器。合并算法的核心是不断地从所有输入文件中比较 "头部" 的记录,选出最小的一个。comparator
就定义了如何比较两个BinaryRow
对象的大小,从而确定排序的依据。
构造函数
public BinaryExternalMerger(
IOManager ioManager,
int pageSize,
int maxFanIn,
SpillChannelManager channelManager,
BinaryRowSerializer serializer,
RecordComparator comparator,
BlockCompressionFactory compressionCodecFactory,
int compressionBlockSize) {
super(
ioManager,
pageSize,
maxFanIn,
channelManager,
compressionCodecFactory,
compressionBlockSize);
this.serializer = serializer;
this.comparator = comparator;
}
构造函数接收所有必要的配置和组件,并将通用参数(如 IOManager
、pageSize
、压缩配置等)传递给父类 super(...)
进行初始化。然后,它保存特定于 BinaryRow
的 serializer
和 comparator
。
channelReaderInputViewIterator
@Override
protected MutableObjectIterator<BinaryRow> channelReaderInputViewIterator(
ChannelReaderInputView inView) {
return new ChannelReaderInputViewIterator(inView, null, serializer.duplicate());
}
- 作用: 当父类需要读取一个磁盘文件(
ChannelReaderInputView
)时,会调用此方法来获取一个记录迭代器。 - 实现: 它返回一个
ChannelReaderInputViewIterator
。这个迭代器内部会使用传入的serializer
,不断地从输入流inView
中读取字节数据,并将其反序列化成BinaryRow
对象,从而实现逐条记录的读取。MutableObjectIterator
是一种可复用对象的迭代器,有助于减少GC开销。
mergeComparator
@Override
protected Comparator<BinaryRow> mergeComparator() {
return comparator::compare;
}
- 作用: 父类的合并逻辑(通常是优先队列)需要一个标准的
java.util.Comparator
来比较元素。此方法用于提供这个比较器。 - 实现: 它直接将 Paimon 内部的
RecordComparator
接口适配为标准的Comparator
接口。comparator::compare
是一个方法引用,指向了RecordComparator
的compare
方法。
mergeReusedEntries
@Override
protected List<BinaryRow> mergeReusedEntries(int size) {
ArrayList<BinaryRow> reused = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
reused.add(serializer.createInstance());
}
return reused;
}
- 作用: 为了提高性能,合并迭代器会为每个输入文件(流)预先分配一个
BinaryRow
对象,用于承载从文件中读取的数据,避免在迭代过程中频繁创建新对象。此方法就是用来创建这些可复用的BinaryRow
对象的。 - 实现: 它根据需要的数量
size
(即合并文件的数量),调用serializer.createInstance()
创建相应数量的空BinaryRow
对象,并放入一个列表中返回。
writeMergingOutput
@Override
protected void writeMergingOutput(
MutableObjectIterator<BinaryRow> mergeIterator, AbstractPagedOutputView output)
throws IOException {
// read the merged stream and write the data back
BinaryRow rec = serializer.createInstance();
while ((rec = mergeIterator.next(rec)) != null) {
serializer.serialize(rec, output);
}
}
- 作用: 当父类准备好最终合并后的有序数据流(
mergeIterator
)后,调用此方法将这个流中的所有记录写入最终的输出目标(output
)。 - 实现: 它循环遍历
mergeIterator
,每取出一个BinaryRow
记录,就立即使用serializer
将其序列化并写入到output
视图中。这里同样复用了rec
对象来接收迭代器返回的数据。
总结
BinaryExternalMerger
是一个设计优雅且职责单一的类。它完美地利用了模板方法模式,将通用的外部合并算法框架与具体 BinaryRow
类型的处理逻辑解耦。它通过提供序列化器和比较器,并实现读、写、比较和对象复用这四个关键操作,使得父类 AbstractBinaryExternalMerger
能够顺利地完成对 BinaryRow
类型数据的外部归并排序。