Hadoop MapReduce过程

发布于:2025-08-11 ⋅ 阅读:(18) ⋅ 点赞:(0)

MapReduce 执行流程深度解析

第一部分:客户端与集群的分工 (getSplits vs. createRecordReader)

客户端(Client)
  1. job.submit() / job.waitForCompletion(true) 触发分片计算:客户端首先实例化作业指定的 InputFormat(如 TextInputFormat),然后调用其 getSplits() 方法。
  2. 提交资源:计算出的分片元数据(InputSplit 列表)、作业配置和 JAR 包等资源被提交给 YARN 的 ResourceManager
  • 源码佐证:
    • FileInputFormat.java 中核心的 getSplits(JobContext job) 方法,通过遍历文件、计算 splitSize 来生成 FileSplit 对象列表,这正是分片逻辑的实现。
      // ... existing code ...
        public List<InputSplit> getSplits(JobContext job) throws IOException {
          StopWatch sw = new StopWatch().start();
          long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
          long maxSize = getMaxSplitSize(job);
      
          // generate splits
          List<InputSplit> splits = new ArrayList<InputSplit>();
          List<FileStatus> files = listStatus(job);
          // ... 循环遍历文件并创建分片 ...
          return splits;
        }
      // ... existing code ...
      
    • 在大量的测试用例中(如 TestFixedLengthInputFormat.javaTestCombineTextInputFormat.java),都是在测试代码中直接调用 format.getSplits(job, ...),这模拟了客户端的行为。
集群(YARN)
  1. ResourceManager 接收到作业后,启动 ApplicationMaster (AM)
  2. ApplicationMaster 根据 InputSplit 的数量向 ResourceManager 申请相应数量的容器(Container)来执行 Map 任务。
  3. NodeManager 在容器中启动 MapTask 进程(YarnChild)。
  4. MapTask 进程内部:
    • 反序列化获取分配给自己的那个 InputSplit
    • 加载 InputFormat 类,并调用 createRecordReader(split, context) 方法创建 RecordReader
    • 使用 RecordReader 从分片中读取 <key, value> 对,并传递给用户实现的 map() 方法。
  • 源码佐证:
    • InputSampler.java 是一个在客户端对输入进行采样的工具,它完美地模拟了 Map 任务的行为:它先获取分片,然后为每个分片创建 RecordReader 并读取数据。
      // ... existing code ...
            RecordReader<K,V> reader = inf.createRecordReader(
                splits.get(i), samplingContext);
            reader.initialize(splits.get(i), samplingContext);
            while (reader.nextKeyValue()) {
              samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                               reader.getCurrentKey(), null));
      // ... existing code ...
      
    • MapReduceTutorial.md 也明确指出:RecordReader 的职责是从 InputSplit 提供的面向字节的视图中,转换为 Mapper 需要的面向记录的视图。

第二部分:MapReduce 完整流程

1. 输入(Input)和分片(Split)

  • 描述: 由 InputFormat 的 getSplits() 方法在客户端执行。它将输入数据源逻辑切分为多个 InputSplit,每个 InputSplit 对应一个 Map 任务。分片是逻辑概念,包含位置和长度,而非物理切割。
  • 源码佐证InputFormat.java 接口的 Javadoc 清晰地描述了 getSplits 的作用是“Logically split the set of input files for the job”。

2. 映射(Map)

  • 描述: 每个 Map 任务在一个 InputSplit 上执行。它使用 RecordReader 读取数据,并执行用户定义的 map() 方法,产出中间 <key, value> 对。

3. Shuffle(洗牌)

这是 MapReduce 的“心脏”。

  • Map 端 Shuffle:

    • 分区(Partition)Partitioner 决定中间键值对被发送到哪个 Reducer。
    • 排序和溢写(Sort & Spill): 数据首先写入环形内存缓冲区,在缓冲区内排序。当缓冲区满(由 mapreduce.map.sort.spill.percent 控制)时,数据被溢写(Spill)到磁盘上的临时文件。
    • 合并(Merge): 如果产生了多个溢写文件,Map 任务结束前,会将这些文件合并排序成一个单一的、已分区且内部有序的输出文件。这是为 Reduce 端的拉取做准备。
    • Combiner (可选): 在排序之后,合并之前对数据进行本地聚合,减少 I/O。
  • 源码佐证 (Map端Merge)MapTask.java 中有非常明确的合并逻辑。它会收集所有溢写文件(Spill)对应的 Segment,然后调用 Merger.merge 方法。

     
    // ... existing code ...
              //create the segments to be merged
              List<Segment<K,V>> segmentList =
                new ArrayList<Segment<K, V>>(numSpills);
              for(int i = 0; i < numSpills; i++) {
                IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
    // ... existing code ...
              //merge
              @SuppressWarnings("unchecked")
              RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                             keyClass, valClass, codec,
                             segmentList, mergeFactor,
                             new Path(mapId.toString()),
                             job.getOutputKeyComparator(), reporter, sortSegments,
                             null, spilledRecordsCounter, sortPhase.phase(),
                             TaskType.MAP);
    // ... existing code ...
    
  • Reduce 端 Shuffle:

    • 复制(Copy): Reduce 任务启动后,主动从各个 Map 任务的输出位置拉取(Copy)属于自己分区的数据。
    • 合并(Merge/Sort): 在拉取数据的同时,在内存中进行归并排序。如果数据量过大,也会溢写到磁盘。最终,所有相关的 Map 输出被合并成一个统一的、有序的数据集。

4. 规约(Reduce)

  • 描述: 框架将 Shuffle 阶段排序好的数据,以 <key, (list of values)> 的形式喂给 reduce() 方法。对于每一个唯一的 key,reduce() 方法被调用一次。

5. 输出(Output)

  • 描述Reducer 的输出通过 OutputFormat(及其包含的 RecordWriter)写入最终目的地,如 HDFS。

Mapper 和 Reducer 的核心联系:中间键值对

Mapper 和 Reducer 的核心联系是中间键值对 (intermediate key/value pairs)

  1. Mapper 的产出: Mapper 的主要职责是处理输入的记录,并生成一系列中间的 <key, value> 对。这些是 Shuffle 过程的原材料。
  2. Reducer 的输入: Reducer 的输入是经过 Shuffle 和 Sort 之后,将来自所有 Mapper 的、具有相同 key 的中间值聚合在一起形成的 <key, (list of values)>

正如 Mapper.java 的注释所说:

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a {@link Reducer} to determine the final output.

// ... existing code ...
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
// ... existing code ...

Mapper 把数据发往哪里?—— Partitioner 的决策

Mapper 并不直接“知道”要把数据发给哪个具体的 Reducer。这个决策是由 Partitioner 做出的。

  1. 分区(Partitioning): 对于 Mapper 输出的每一个 <key, value> 对,框架都会调用 Partitioner 的 getPartition 方法。
  2. 决定 Reducer 索引getPartition 方法会根据 key(或者 key 的一部分)计算出一个整数,这个整数就是目标 Reducer 的索引(从 0 到 numReduceTasks - 1)。
  3. 写入本地分区文件: Mapper 会将这个键值对写入自己本地磁盘上对应分区的一个缓冲区,最终合并成一个大的、已分区的输出文件。

所以,Mapper 的输出首先是写到自己所在的 Worker Node 的本地磁盘上,而不是直接通过网络发送给 Reducer。它只是根据 Partitioner 的逻辑,将数据整理好,等待 Reducer 来拉取。

MapReduceTutorial.md 对此有清晰的描述:

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.

MapReduceTutorial.md

[Partitioner](../../api/org/apache/hadoop/mapreduce/Partitioner.html) partitions the key space.

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a *hash function*. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the `m` reduce tasks the intermediate key (and hence the record) is sent to for reduction.

网络在何时以及如何表示?—— Reduce 端的拉取 (Copy/Fetch)

网络传输在 MapReduce 中主要用于 Reduce 端的 Shuffle 阶段

当一个 Reduce 任务启动后,它知道自己需要处理哪个分区的数据(例如,分区 i)。于是,它会向 ApplicationMaster 查询所有已完成的 Map 任务的地址,然后通过 HTTP 协议,从每一个 Map 任务所在的 NodeManager 上拉取(fetch/copy)属于分区 i 的那部分数据。

这个过程可以总结为:

  1. Reduce 任务启动: Reducer 知道自己的分区号。
  2. 查询 Map 输出位置: Reducer 向 ApplicationMaster 询问所有 Map 任务的输出位置。
  3. HTTP 拉取: Reducer 启动多个“复制线程”(copier threads),并行地通过 HTTP GET 请求从各个 Map 任务所在的节点上下载属于自己分区的数据块。
  4. 内存/磁盘合并: 拉取来的数据块首先在 Reducer 的内存中进行合并和排序,如果数据量太大,也会溢写到 Reducer 所在节点的本地磁盘,最终合并成一个有序的数据集,供 reduce() 方法使用。

MapReduceTutorial.md 在描述 Reduce 端的 Shuffle 阶段时提到了这一点:

Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

##### Shuffle

Input to the `Reducer` is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

总结

  • 联系: Mapper 和 Reducer 通过中间键值对联系起来。
  • Mapper 发送目标: Mapper 的输出目标由 Partitioner 决定,数据被写入本地磁盘的不同分区中。
  • 网络表示: 网络主要用于 Reduce 任务通过 HTTP 协议主动拉取各个 Mapper 节点上属于自己分区的数据。这是一个“拉”(Pull)模型,而不是“推”(Push)模型。

这种设计将计算(Map)和网络传输(Reduce Shuffle)解耦,并通过本地磁盘作为缓冲,提高了整个系统的鲁棒性和效率。

Mapper

Mapper 是 MapReduce 编程模型中至关重要的一环,它负责数据处理的第一个阶段,即“Map(映射)”阶段。它的核心职责是将输入的数据集(通常是原始数据)转换成一系列中间的键值对(Key-Value pairs),为后续的“Reduce(规约)”阶段做准备。

下面我们从用法、核心方法、执行流程以及与框架的交互等几个方面来深入分析。

Mapper 的基本用法

正如其 Javadoc 注释和类定义所示,开发者通常不直接实例化 Mapper 类,而是通过继承它来创建自己的 Mapper 实现。

// ... existing code ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
// ... existing code ...

Mapper 是一个泛型类,它有四个泛型参数,分别定义了输入和输出的键值对类型:

  • KEYIN: 输入键(Key)的数据类型。例如,LongWritable,通常表示文件中一行的偏移量。
  • VALUEIN: 输入值(Value)的数据类型。例如,Text,通常表示文件中的一行文本。
  • KEYOUT: 中间输出键的数据类型。由你的业务逻辑决定,例如在单词计数的例子中是 Text(单词)。
  • VALUEOUT: 中间输出值的数据类型。同样由业务逻辑决定,例如在单词计数的例子中是 IntWritable(计数值 1)。

开发者最主要的工作就是重写 Mapper 中的一个或多个核心方法来实现具体的业务逻辑:

  • setup(): 可选。在任务开始前执行一次,用于初始化。
  • map()必须。 对输入的每个键值对执行一次,是 Mapper 的核心逻辑所在。
  • cleanup(): 可选。在任务结束后执行一次,用于清理资源。

代码注释中提供了一个经典的“单词计数”(Word Count)示例,非常清晰地展示了如何使用 Mapper

public class TokenCounterMapper 
    extends Mapper<Object, Text, Text, IntWritable>{
   
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  
  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one); // 输出 <单词, 1>
    }
  }
}

在这个例子中:

  1. TokenCounterMapper 继承了 Mapper<Object, Text, Text, IntWritable>
  2. 输入是 <Object, Text>,即行的偏移量和行的内容。
  3. 输出是 <Text, IntWritable>,即单词和数字 1。
  4. map 方法将一行文本 (value) 切分成多个单词,然后对每个单词,通过 context.write() 方法输出一个 <单词, 1> 的键值对。

核心方法与执行流程剖析

Mapper 的执行由其生命周期方法 run() 控制。这个方法定义了一个标准的执行流程模板。

// ... existing code ...
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}
  • setup(Context context):

    • 调用时机: 在 run 方法中,map 循环开始之前被调用一次。
    • 作用: 执行初始化工作。例如,在 TokenCounterMapper 示例中,可以在 setup 方法里初始化 one 和 word 对象,避免在 map 方法中重复创建,提高性能。
  • map(KEYIN key, VALUEIN value, Context context):

    • 调用时机: 在 run 方法的 while 循环中,每当 context.nextKeyValue() 返回 true 时被调用。框架会为输入分片(InputSplit)中的每一条记录调用一次此方法。
    • 作用: 这是 Mapper 的心脏,用于实现核心的转换逻辑。
    • 默认实现Mapper 提供了一个默认的 map 实现:context.write((KEYOUT) key, (VALUEOUT) value);。这是一个“恒等映射”,即将输入原封不动地作为输出。这在某些场景下很有用,比如当 MapReduce 作业只用于排序或数据格式转换时。
  • cleanup(Context context):

    • 调用时机: 在 run 方法的 finally 块中,确保在任务(无论是正常完成还是异常退出)的最后阶段被调用一次。
    • 作用: 执行清理工作,如关闭在 setup 中打开的文件句柄或网络连接,确保资源被正确释放。
  • run(Context context):

    • 作用: 这是驱动整个 Mapper 任务执行的模板方法。它定义了 Mapper 的标准生命周期:setup -> map (循环) -> cleanup
    • 执行流程:
      1. 调用 setup()
      2. 进入 try 块,循环调用 context.nextKeyValue() 来读取新的键值对。
      3. 如果读取成功,则调用 map() 方法处理该键值对。
      4. 循环结束后(或发生异常时),finally 块保证 cleanup() 方法一定会被执行。
    • 高级定制: 如注释所说,高级用户可以重写 run 方法来实现更复杂的逻辑,例如启动多个线程来并行处理 map 任务,以充分利用多核 CPU 资源。

Mapper.Context 的作用

Context 对象是 Mapper 与 Hadoop 框架之间沟通的桥梁。它是一个在 Mapper 类内部定义的抽象类,框架会为其提供一个具体的实现。Context 为 Mapper 提供了执行所需的所有上下文信息和功能。

// ... existing code ...
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
// ... existing code ...

它的主要功能包括:

  • 获取输入context.getCurrentKey() 和 context.getCurrentValue()
  • 迭代控制context.nextKeyValue(),用于在 run 方法中驱动循环。
  • 输出结果context.write(KEYOUT key, VALUEOUT value),将中间键值对写入缓冲区。
  • 获取配置context.getConfiguration(),可以访问作业的 Configuration 对象,读取配置信息。
  • 报告进度和状态context.setStatus() 和 context.progress()
  • 使用计数器context.getCounter(),用于统计自定义的指标,方便调试和监控。

Mapper 与框架的交互

一个 Mapper 实例并非独立运行,而是作为整个 MapReduce 作业流的一部分,与框架的其他组件紧密协作:

  1. 输入: 框架通过 InputFormat 读取数据源,将其切分为多个 InputSplit。每个 InputSplit 会被分配给一个 Map Task 来处理。
  2. 分区 (Partitioning)Mapper 输出的中间键值对会经过 PartitionerPartitioner 根据 KEYOUT 决定这条记录应该发送给哪个 Reducer。默认的分区器是 HashPartitioner,它通过计算 key 的哈希值来分配。
  3. 排序 (Sorting): 在写入磁盘和传给 Reducer 之前,框架会对 Mapper 的输出在缓冲区内按 KEYOUT 进行排序。
  4. 合并 (Combining): 用户可以指定一个 Combiner。它在 Mapper 端对排序后的输出进行一次本地的“预聚合”(mini-reduce)。这可以极大地减少从 Mapper 传输到 Reducer 的数据量,是 MapReduce 中一个非常重要的性能优化点。
  5. 零 Reducer: 如果作业设置的 Reducer 数量为 0,那么 Mapper 的输出将不会经过分区、排序和聚合,而是直接由 OutputFormat 写入到最终的输出文件中。这种作业被称为 "Map-Only Job"。

Reducer

与 Mapper 类似,开发者通过继承 Reducer 抽象类并重写其方法来定义自己的业务逻辑。

// ... existing code ...
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
// ... existing code ...

Reducer 类同样有四个泛型参数,定义了其输入和输出的键值类型:

  • KEYIN: 输入键的类型。这个类型必须与 Mapper 输出的键类型 KEYOUT 一致。
  • VALUEIN: 输入值的类型。这个类型必须与 Mapper 输出的值类型 VALUEOUT 一致。
  • KEYOUT: 最终输出结果的键类型。
  • VALUEOUT: 最终输出结果的值类型。

Javadoc 中提供了一个非常经典的 IntSumReducer 示例,用于对 Mapper 产生的数字进行求和。这在单词计数的场景中非常常见。

// ... existing code ...
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
 *                                                 Key,IntWritable>; {
 *   private IntWritable result = new IntWritable();
 * 
 *   public void reduce(Key key, Iterable<IntWritable> values,
 *                      Context context) throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable val : values) {
 *       sum += val.get();
 *     }
 *     result.set(sum);
 *     context.write(key, result);
 *   }
 * }
 * </pre></blockquote>
// ... existing code ...

在这个例子中:

  1. IntSumReducer 继承了 Reducer
  2. 输入是 <Key, IntWritable>,例如 <"hello", 1>
  3. 输出是 <Key, IntWritable>,例如 <"hello", 100>
  4. reduce 方法接收一个键(如 "hello")和与该键关联的所有值的集合(一个包含很多 1 的 Iterable)。它遍历这个集合,将所有值累加起来,最后通过 context.write() 输出最终的键和总和。

Reducer 的核心阶段

Reducer 的工作过程比 Mapper 要复杂一些,其 Javadoc 中明确地将其划分为三个主要阶段:Shuffle、Sort 和 Reduce。

a. Shuffle (混洗)
  • 作用: 这是 Reduce 任务的第一个阶段。在此阶段,Reducer 任务通过网络(HTTP)从所有已完成的 Mapper 任务中拉取(copy)属于自己的那部分中间输出数据。
  • 分区 (Partitioning)Mapper 的输出在发送前会经过 PartitionerPartitioner 会根据 key 计算出一个分区号,确保所有相同的 key 会被发送到同一个 Reducer 任务。这就是为什么 Reducer 能收到一个 key 对应的所有 values。
b. Sort (排序)
  • 作用: 当 Reducer 任务拉取数据时,框架会在后台对这些数据进行归并排序(merge sort)。这个排序是根据键 (KEYIN) 来进行的。
  • 分组 (Grouping): 排序的最终目的是为了分组。排序完成后,所有具有相同 key 的 value 自然地聚集在一起,形成一个 <key, (list of values)> 的结构,这正是 reduce 方法的输入形式。
  • Shuffle 和 Sort 的并发: Javadoc 中提到,这两个阶段是同时进行的。也就是说,框架一边从 Mapper 拉取数据,一边就在内存和磁盘上进行归并排序,以提高效率。
c. Reduce (规约)
  • 作用: 这是 Reducer 的核心阶段,也是用户定义业务逻辑的地方。
  • 调用: 当 Shuffle 和 Sort 阶段完成后,框架会开始遍历排好序的键。对于每一个唯一的键及其关联的值列表,框架会调用一次用户实现的 reduce 方法。
  • 输出: 在 reduce 方法中,用户通过 Context.write(Object, Object) 将最终结果写入到输出文件系统(如 HDFS)。注意:Reducer 的输出是不会被再次排序的。

核心方法与执行流程

Reducer 的生命周期由其 run 方法控制,这个方法定义了一个标准的执行模板。

// ... existing code ...
  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }
// ... existing code ...
  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}
  • setup(Context context): 在任务开始时调用一次,用于初始化操作。
  • reduce(KEYIN key, Iterable<VALUEIN> values, Context context):
    • 调用时机: 在 run 方法的 while 循环中,每当 context.nextKey() 成功移动到下一个唯一的 key 时,该方法被调用一次。
    • 参数key 是当前处理的键,values 是一个迭代器,包含了所有与该 key 关联的值。
    • 默认实现: 默认的 reduce 方法是一个恒等函数,它会遍历 values 迭代器,并将每个 <key, value> 对原样输出。
  • cleanup(Context context): 在任务结束时调用一次,用于资源清理。
  • run(Context context):
    • 执行流程:
      1. 调用 setup()
      2. 进入 try 块,循环调用 context.nextKey() 来遍历所有唯一的、排好序的键。
      3. 如果 nextKey() 返回 true,则调用 reduce() 方法,并传入当前的键 context.getCurrentKey() 和对应的值迭代器 context.getValues()
      4. 循环结束后(或发生异常),finally 块保证 cleanup() 方法一定会被执行。

Reducer.Context 的作用

Context 对象是 Reducer 与 Hadoop 框架沟通的桥梁,它提供了 Reducer 运行所需的所有上下文信息和功能。

  • 迭代控制context.nextKey() 用于驱动 run 方法的循环,移动到下一个键。
  • 获取输入context.getCurrentKey() 获取当前键,context.getValues() 获取与当前键关联的值的 Iterable
  • 输出结果context.write(KEYOUT key, VALUEOUT value) 将最终结果写入输出。
  • 其他功能: 与 Mapper.Context 类似,它也提供了获取配置 (getConfiguration)、获取计数器 (getCounter)、报告状态 (setStatus) 等功能。

二次排序 (Secondary Sort)

Javadoc 中还提到了一个高级用法:二次排序。 当我们需要对同一个 key 对应的 values 列表也进行排序时,就需要用到二次排序。 实现方式是:

  1. 自定义组合键: 创建一个包含主键和次键的自定义 WritableComparable 类。
  2. 自定义排序比较器Job.setSortComparatorClass()。让框架在排序阶段按照整个组合键(主键+次键)进行排序。
  3. 自定义分组比较器Job.setGroupingComparatorClass()。这个比较器告诉框架,哪些键应该被分到同一个 reduce 调用中。它只比较组合键中的主键部分。

这样,框架在排序时会考虑次键,但在调用 reduce 方法时,会将主键相同的所有记录(尽管次键不同)都分到同一次 reduce 调用中,而此时 values 迭代器中的值就是按照次键排好序的了。

Partitioner

Partitioner(分区器)是 MapReduce 流程中一个非常关键的组件。它的核心职责是在 Map 阶段结束、Reduce 阶段开始之前,决定 Mapper 输出的每一个键值对(key-value pair)应该被发送到哪一个 Reducer 任务去处理

Partitioner 是一个抽象类,我们通常必须通过继承它来创建自定义的分区逻辑。

我们来看一下它的定义:

// ... existing code ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {
  
  /** 
   * Get the partition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   *   
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
  
}

从代码中可以看出:

  1. 泛型: 它有两个泛型参数 <KEY, VALUE>,这对应了 Mapper 输出的键值类型。
  2. 核心抽象方法: 它只定义了一个核心的抽象方法 getPartition。任何子类都必须实现这个方法。

getPartition 方法详解

public abstract int getPartition(KEY key, VALUE value, int numPartitions);

  • keyMapper 输出的键。
  • valueMapper 输出的值。
  • numPartitions: 分区的总数,这个值等于你为这个 Job 设置的 Reducer 任务的数量 (job.setNumReduceTasks(int))。
  • 返回值: 一个整数,范围必须是 0 到 numPartitions - 1。这个返回值就是分区号,它直接决定了这条记录会被发送到哪个 Reducer(例如,返回 0 就发送给第一个 Reducer,返回 1 就发送给第二个,以此类推)。

注意: 正如 Javadoc 中提到的,只有当你设置的 Reducer 数量大于 1 时,Partitioner 才会被创建和使用。如果只有一个 Reducer 或者没有 Reducer,分区是没有意义的。

默认的 Partitioner: HashPartitioner

在你的工程中,如果你不通过 job.setPartitionerClass(...) 来指定一个自定义的 Partitioner,Hadoop 会使用默认的 HashPartitioner。从MapReduceTutorial.md 中也可以看到这一点。

HashPartitioner 的逻辑非常简单:

  1. 获取 key 的 hashCode()
  2. 用一个很大的正数(Integer.MAX_VALUE)进行按位与操作,确保结果为正。
  3. 用 numPartitions (Reducer 的数量) 取模。

// HashPartitioner 的核心逻辑伪代码
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;

这种默认方式在大多数情况下能很好地工作,它可以相对均匀地将键分散到不同的 Reducer 中,实现负载均衡。

为什么要自定义 Partitioner?

既然有默认的实现,为什么我们还需要自定义呢?主要有以下几个原因:

  1. 数据倾斜 (Data Skew): 默认的 hashCode 方法可能无法均匀地分布你的特定数据集。例如,某些 key 的哈希值可能恰好都聚集在少数几个结果上,导致少数 Reducer 任务过重,而其他 Reducer 很空闲,拖慢整个作业的执行效率。通过自定义分区逻辑,可以根据数据特点进行更均匀的分配。

  2. 业务逻辑要求: 有些业务场景要求具有相同特征的 key 被发送到同一个 Reducer。例如,假设你正在处理订单数据,你可能希望所有来自同一个省份的订单都由同一个 Reducer 处理。这时,你可以自定义一个 Partitioner,它不根据整个 key(可能是订单 ID)来分区,而是根据 key 对象中的“省份”字段来分区。

如何实现自定义 Partitioner

TeraSort.java 提供了一个很好的例子:

// ... existing code ...
  /**
   * A total order partitioner that assigns keys based on their first 
   * PREFIX_LENGTH bytes, assuming a flat distribution.
   */
  public static class SimplePartitioner extends Partitioner<Text, Text>
      implements Configurable {
    int prefixesPerReduce;
    private static final int PREFIX_LENGTH = 3;
    private Configuration conf = null;
    public void setConf(Configuration conf) {
      this.conf = conf;
      prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) / 
        (float) conf.getInt(MRJobConfig.NUM_REDUCES, 1));
    }
    
// ... existing code ...

这个 SimplePartitioner 继承了 Partitioner,并且还实现了 Configurable 接口。实现 Configurable 接口是为了让 Partitioner 能够获取到 Job 的 Configuration 对象,从而读取一些配置信息(比如这里的 NUM_REDUCES)。

高级 Partitioner: TotalOrderPartitioner

还存在一个更高级的实现:TotalOrderPartitioner。它用于实现全局排序。普通的 MapReduce 作业只能保证 Reducer 的输出在各自的文件内部是有序的,但 Reducer 0 的输出和 Reducer 1 的输出之间没有顺序关系。

TotalOrderPartitioner 通过读取一个预先生成的分区文件(包含了 key 的分割点),来保证所有被发送到 Reducer i 的 key 都小于被发送到 Reducer i+1 的 key。这样,当所有 Reducer 完成后,将它们的输出文件按顺序拼接起来,就得到了一个全局有序的大文件。

总结

  • Partitioner 是连接 Map 和 Reduce 阶段的桥梁,负责数据分发。
  • 它是一个抽象类,你必须继承它并实现 getPartition 方法来创建自定义分区器。
  • Hadoop 默认使用 HashPartitioner,它基于 key 的哈希值进行分区,适用于大多数场景。
  • 当需要解决数据倾斜或根据业务逻辑对数据进行分组时,就需要自定义 Partitioner
  • 更高级的 TotalOrderPartitioner 可以用来实现输出结果的全局排序。


网站公告

今日签到

点亮在社区的每一天
去签到