MapReduce工作机制&源码解析

发布于:2024-10-12 ⋅ 阅读:(7) ⋅ 点赞:(0)

1. MapTask工作机制

MapTask一共分为五个阶段:Read、Map、Collect、溢写、Merge阶段。
在这里插入图片描述
在第4步MrAppMaster启动之前都是job的提交流程,从MrAppMaster启动之后开启对应的MapTask才真正进入MapTask阶段。
这里由于是200M数据,块大小是128M,会开启两个MapTask。
在这里插入图片描述

  • Read 阶段
    默认用 TextInputFormat 去读取数据,TextInputFormat 会调用 RecorderReader,RecorderReader再调用 reader() 去读取数据,解析出一个个 key/value,其中K对应偏移量,v 对应一行内容。(如果采用其他的 InputFormat则读取逻辑不一样,比如CombineTextInputFormat是读取一个文件)读完之后返回给map。

  • Map 阶段
    执行用户自己写的 map() 函数逻辑,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

  • Collect 阶段——分区、排序
    环形缓冲区一侧存数据,一侧存索引,默认100M,这里的所有数据会按分区的方式进行存储,到达80%(默认阈值)后进行反向溢写,溢写之前会进行快速排序。(先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。)

  • 溢写阶段
    环形缓冲区到达阈值后会进行溢写(将数据写到本地磁盘上),生成大量的溢写文件(临时文件)(分区且区内有序)。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。(如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。)

  • Merge 阶段——归并排序
    溢写完之后会对所有的临时文件进行归并排序并合并,以确保每个MapTask 最终只会生成一个数据文件。
    在这里插入图片描述

2. ReduceTask工作机制

ReduceTask一共分为三个阶段:Copy、Sort、Reduce阶段。
在这里插入图片描述
每个MapTask已经把数据持久化到磁盘后,等待Reduce端拉取。ReduceTask闪亮登场。

  • Copy 阶段
    ReduceTask 拉取自己指定分区的数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

  • Sort 阶段
    对拉取过来的数据进行归并排序。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

  • Reduce 阶段
    相同的key进入reduce()函数,reduce()函数将计算结果写到HDFS上。

3. ReduceTask并行度决定机制

前面提到,MapTask并行度由切片个数决定,切片个数由输入文件和切片规则决定。
思考:ReduceTask并行度由谁决定?
(1) ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
(2) ReduceTask默认值就是1,所以输出文件个数为一个。
(3) 如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜。
(4) ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下需要计算全局汇总结果,就只能有1个ReduceTask。
(5) 具体多少个ReduceTask,需要根据集群性能而定。
(6) 如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。

4. MapTask & ReduceTask源码解析

MapTask源码解析流程

=================== MapTask ===================
context.write(k, NullWritable.get());   //自定义的map方法的写出,进入
output.write(key, value);  
	//MapTask727行,收集方法,进入两次 
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
	HashPartitioner(); //默认分区器
collect()  //MapTask1082行 map端所有的kv全部写出后会走下面的close方法
	close() //MapTask732行
	collector.flush() // 溢出刷写方法,MapTask735行,提前打个断点,进入
sortAndSpill() //溢写排序,MapTask1505行,进入
	sorter.sort()   QuickSort //溢写排序方法,MapTask1625行,进入
mergeParts(); //合并文件,MapTask1527行,进入
	 
collector.close(); //MapTask739行,收集器关闭,即将进入ReduceTask

ReduceTask源码解析流程

=================== ReduceTask ===================
if (isMapOrReduce())  //reduceTask324行,提前打断点
initialize()   // reduceTask333行,进入
init(shuffleContext);  // reduceTask375行,走到这需要先给下面的打断点
        totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl第120行,提前打断点
         merger = createMergeManager(context); //合并方法,Shuffle第80行
			// MergeManagerImpl第232 235行,提前打断点
			this.inMemoryMerger = createInMemoryMerger(); //内存合并
			this.onDiskMerger = new OnDiskMerger(this); //磁盘合并
rIter = shuffleConsumerPlugin.run();
		eventFetcher.start();  //开始抓取数据,Shuffle第107行,提前打断点
		eventFetcher.shutDown();  //抓取结束,Shuffle第141行,提前打断点
		copyPhase.complete();   //copy阶段完成,Shuffle第151行
		taskStatus.setPhase(TaskStatus.Phase.SORT);  //开始排序阶段,Shuffle第152行
	sortPhase.complete();   //排序阶段完成,即将进入reduce阶段 reduceTask382行
reduce();  //reduce阶段调用的就是我们自定义的reduce方法,会被调用多次
	cleanup(context); //reduce完成之前,会最后调用一次Reducer里面的cleanup方法


Over ~