MapOutputBuffer
MapOutputBuffer
是 MapOutputCollector
接口的一个实现。它的核心职责是:在内存中高效地收集、分区、排序 Mapper
的输出,并在内存不足时,将数据 溢写(Spill)到磁盘。当 Map 任务结束时,它还要负责将所有溢写文件合并(Merge) 成一个最终的、对 Reduce 任务友好的输出文件。
开发者通常不会直接与 MapOutputBuffer
交互。它的使用是框架自动完成的:
- 在
MapTask
的runNewMapper
或runOldMapper
方法中,框架会检查作业的 Reduce 任务数量 (job.getNumReduceTasks()
)。 - 如果存在 Reduce 任务(即
numReduceTasks > 0
),框架就会创建一个MapOutputBuffer
实例(通过createSortingCollector
方法)。 - 这个
MapOutputBuffer
实例会被包装进一个RecordWriter
(例如NewOutputCollector
)。 - 这个
RecordWriter
会被传递给Mapper
的Context
对象。 - 当用户在
Mapper
代码中调用context.write(key, value)
时,这个调用链最终会触发MapOutputBuffer
的collect
方法,从而将数据写入环形缓冲区。
// ... existing code ...
// get an output object
if (job.getNumReduceTasks() == 0) {
// 如果没有 Reducer,数据直接写入 HDFS,不使用 MapOutputBuffer
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// 如果有 Reducer,则使用 NewOutputCollector,其内部封装了 MapOutputBuffer
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
// ... existing code ...
// 在 NewOutputCollector 的构造函数中,创建了 MapOutputBuffer
NewOutputCollector(/*...*/) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter); // collector 就是 MapOutputBuffer
// ...
}
// ... existing code ...
// 在 NewOutputCollector 的 write 方法中,调用了 MapOutputBuffer 的 collect
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
// ... existing code ...
核心实现机制:环形缓冲区与元数据
MapOutputBuffer
的设计精髓在于它对内存的极致利用。它不存储 Java 对象,而是将键值对序列化后存入一个巨大的字节数组 kvbuffer
。这个数组被当作一个环形缓冲区使用。
为了快速排序和查找,它并没有在 kvbuffer
中移动庞大的序列化数据,而是采用了一种更聪明的方法:元数据与数据分离。
kvbuffer
(byte[]): 实际存储序列化后的 key/value 数据的字节数组。kvmeta
(IntBuffer): 这是一个IntBuffer
,它覆盖在kvbuffer
的同一块内存上。它不存实际数据,而是存储每条记录的元数据。
每条记录的元数据占用 NMETA
(值为4) 个整数,也就是 METASIZE
(值为16) 个字节。
// ... existing code ...
private static final int VALSTART = 0; // value 在 kvbuffer 中的起始位置
private static final int KEYSTART = 1; // key 在 kvbuffer 中的起始位置
private static final int PARTITION = 2; // 这条记录所属的分区号
private static final int VALLEN = 3; // value 的长度
private static final int NMETA = 4; // 元数据的整数个数
private static final int METASIZE = NMETA * 4; // 元数据占用的字节数
// ... existing code ...
这个环形缓冲区由一系列指针管理,元数据区域和序列化数据区域从缓冲区的两端相向增长,最大化地利用空间。
kvmeta
和 kvbuffer
的关系:同一块内存,不同的视图
从类的成员变量定义来看,它们确实是两个独立的变量:
// ... existing code ...
// k/v accounting
private IntBuffer kvmeta; // metadata overlay on backing store
// ...
byte[] kvbuffer; // main output buffer
// ... existing code ...
但是,关键在于它们的初始化过程。在 MapOutputBuffer
的 init
方法中,你会看到这样一段代码:
// ... existing code ...
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage]; // 1. 首先,分配一块大的字节数组
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer) // 2. 然后,将这个字节数组包装成一个 ByteBuffer
.order(ByteOrder.nativeOrder())
.asIntBuffer(); // 3. 最后,从这个 ByteBuffer 派生出一个 IntBuffer 视图
setEquator(0);
// ... existing code ...
让我们一步步解析这个过程:
kvbuffer = new byte[maxMemUsage];
: 这里,程序向 JVM 申请了一大块连续的内存,并用一个字节数组kvbuffer
来引用它。这块内存是所有数据存储的物理基础。ByteBuffer.wrap(kvbuffer)
:java.nio.ByteBuffer
是 Java NIO 库中的一个核心类,它可以将一个已有的byte[]
数组“包装”起来,提供更丰富的读写操作。重要的是,wrap
操作不会复制数组,ByteBuffer
实例和原始的kvbuffer
共享同一块底层内存。对ByteBuffer
的任何修改都会直接反映在kvbuffer
上,反之亦然。.asIntBuffer()
: 这是最关键的一步。ByteBuffer
对象有一个asIntBuffer()
方法,它可以创建一个IntBuffer
视图(View)。这个IntBuffer
同样不会分配新的内存,它和ByteBuffer
(也就是和kvbuffer
)共享同一块底层内存。
这种设计的优点是:
- 节省内存:不需要为元数据额外分配内存空间。
- 缓存友好:数据和它的元数据在物理上是相邻的,这有利于 CPU 缓存的命中率,提高访问速度。
- 高效:通过 IntBuffer 直接以整数为单位读写元数据,比从字节数组中手动转换要快得多。
byte[] b0 = new byte[0];
的作用
这个定义看起来非常奇怪:一个长度为0的、不可变的空字节数组。它的作用体现在 collect
方法的这段代码中:
// ... existing code ...
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();
// ... existing code ...
这里的 bb
是 BlockingBuffer
的实例,它是一个内部类,充当了向 kvbuffer
写入数据的代理。
核心目的:处理值(value)为空或长度为0的特殊情况,并统一处理逻辑。
在 MapReduce 中,一个 <key, value>
对的 value 完全有可能是空的(例如,NullWritable
)或者是一个空的 BytesWritable
。当 valSerializer.serialize(value)
执行时,如果 value 是空的,它可能不会向底层的 kvbuffer
写入任何字节。
这会带来一个问题:MapOutputBuffer
的很多内部逻辑,特别是环形缓冲区的指针管理,都依赖于每次 write
操作来更新状态。如果一次 serialize
操作没有发生任何实际的写入,那么一些边界检查和指针更新的逻辑可能就不会被触发,从而导致状态不一致。
bb.write(b0, 0, 0);
的作用就是:
无论 valSerializer
是否真的写入了数据,代码都强制执行一次“写入长度为0”的操作。这个操作本身不会向 kvbuffer
添加任何字节,但它会触发 BlockingBuffer
内部的 write
方法。在这个方法里,可以统一处理环形缓冲区的边界检查、指针回绕(wrap-around)等逻辑。
通过这种方式,代码避免了为“零长度写入”编写一套单独的、复杂的 if-else
判断逻辑,使得 collect
方法的主流程更加清晰和健壮。定义一个 static final
的 b0
数组可以避免在每次调用时都创建一个新的空数组对象,是一种微小的性能优化。
简单来说,b0
是一个“占位符”,用于确保即使在写入空值的情况下,缓冲区的管理逻辑也能被正确地执行一次。
collect
方法:数据收集与溢写触发
collect
方法是数据进入缓冲区的入口。这是一个 synchronized
方法,保证了线程安全。
// ... existing code ...
public synchronized void collect(K key, V value, final int partition
) throws IOException {
reporter.progress();
// ... 类型和分区检查 ...
checkSpillException(); // 检查后台溢写线程是否有异常
bufferRemaining -= METASIZE; // 预留元数据空间
if (bufferRemaining <= 0) {
// 缓冲区空间不足,触发溢写判断逻辑
spillLock.lock();
try {
do {
if (!spillInProgress) {
// ... 计算缓冲区使用情况 ...
final boolean bufsoftlimit = bUsed >= softLimit;
// ...
if (bufsoftlimit && kvindex != kvend) {
// 如果已用空间超过软限制 (io.sort.spill.percent, 默认80%)
// 并且有数据可写,则启动溢写
startSpill();
// ... 重新计算剩余空间 ...
}
}
} while (false);
} finally {
spillLock.unlock();
}
}
// ... 序列化 key 和 value 到 kvbuffer ...
// ... 将元数据 (partition, key_start, val_start, val_len) 写入 kvmeta ...
// ... 更新 kvindex 和 bufindex 指针 ...
}
// ... existing code ...
核心流程:
- 检查空间:每次收集数据前,先检查剩余空间
bufferRemaining
。 - 触发溢写:当
bufferRemaining
小于等于0时,进入一个复杂的判断逻辑。如果当前没有正在进行的溢写,并且已用空间超过了softLimit
(由mapreduce.map.sort.spill.percent
配置,默认 80%),就会调用startSpill()
启动后台溢写线程。 - 序列化:将 key 和 value 对象序列化成字节,写入
kvbuffer
。 - 记录元数据:将这条记录的分区号、key 的起始位置、value 的起始位置和 value 的长度这四个整数写入
kvmeta
。 - 更新指针:移动
kvindex
和bufindex
指针,标记新的数据边界。
SpillThread
:后台排序与溢写
MapOutputBuffer
最重要的优化之一就是将排序和磁盘I/O操作放在一个独立的后台线程 SpillThread
中执行,这样 Mapper
的 collect
方法就不会因为磁盘操作而长时间阻塞,可以继续接收新的数据。
启动:
collect
方法调用startSpill()
,它会获取spillLock
锁,然后唤醒正在等待spillReady
条件的SpillThread
。排序:
SpillThread
醒来后,执行sortAndSpill()
方法。该方法的第一步就是对内存中的数据进行排序。MapOutputBuffer
实现了IndexedSortable
接口,提供了compare
和swap
方法。sorter.sort(...)
方法(默认是QuickSort
)只对kvmeta
中的元数据进行排序,而不是移动kvbuffer
中庞大的实际数据。compare
方法的逻辑是排序的核心:首先按分区号(PARTITION)排序,如果分区号相同,再按 key 排序。key的比较是直接在kvbuffer
字节数组上进行的,效率很高。
// ... existing code ... @Override public int compare(final int mi, final int mj) { // ... 获取两条记录的元数据 ... final int kvip = kvmeta.get(kvi + PARTITION); final int kvjp = kvmeta.get(kvj + PARTITION); // sort by partition if (kvip != kvjp) { return kvip - kvjp; } // sort by key return comparator.compare(kvbuffer, kvmeta.get(kvi + KEYSTART), kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART), kvbuffer, kvmeta.get(kvj + KEYSTART), kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART)); } // ... existing code ...
Combiner (可选):如果用户配置了
Combiner
,在排序之后、写入磁盘之前,sortAndSpill
会对每个分区内排序好的数据运行Combiner
。这可以看作是一次 "mini-reduce",能有效减少写入磁盘和后续网络传输的数据量。溢写到文件:
sortAndSpill
遍历排序后的元数据,将每个分区的数据顺序写入一个临时的溢写文件(spill file)。同时,它会为这个溢写文件生成一个SpillRecord
,记录下每个分区数据在这个文件中的偏移量、原始长度和压缩后长度等索引信息。
flush
和 close
:最终合并
当 MapTask
处理完所有输入数据后,会调用 collector.close()
,这会触发 MapOutputBuffer
的 flush()
和 close()
方法。
flush()
:- 强制执行最后一次溢写,将缓冲区中剩余的所有数据都写入一个新的溢写文件。
- 等待
SpillThread
完全结束。 - 调用
mergeParts()
(该方法在MapTask
中,但由MapOutputBuffer
的生命周期触发)。mergeParts
会将磁盘上所有的溢写文件进行多路归并排序,生成一个最终的、全局有序(分区内key有序)的输出文件file.out
和一个总索引文件file.out.index
。这个索引文件将被 Reduce 任务用来确定需要拉取的数据范围。
总结
MapOutputBuffer
是 MapReduce 框架的性能心脏。它通过以下一系列精巧的设计,实现了在有限内存下对海量中间数据的高效处理:
- 环形内存缓冲区:最大化利用配置的内存(
io.sort.mb
)。 - 元数据与数据分离:通过只排序轻量级的元数据指针,极大地提高了内存排序的速度。
- 后台溢写线程:将耗时的排序和I/O操作与
Mapper
的数据收集过程解耦,实现了并发执行。 - 分区内排序:在 Map 端就完成了大部分排序工作,为 Reduce 端的归并排序奠定了基础。
- 可选的 Combiner:在数据落盘前进行聚合,显著减少了 I/O 负载。