mapreduce计算框架

发布于:2024-07-30 ⋅ 阅读:(62) ⋅ 点赞:(0)
  1. 简述什么是MapReduce ?

MapReduce 是一个分布式运算程序的编程框架,它的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
MapReduce的核心思想是将用户编写的逻辑代码和架构中的各个组件整合成一个分布式运算程序,实现 一定程序的并行处理海量数据,提高效率。
海量数据难以在单机上处理,而一旦将单机版程序扩展到集群上进行分布式运行势必将大大增加程序的 复杂程度。引入MapReduce架构,开发人员可以将精力集中于数据处理的核心业务逻辑上,而将分布式 程序中的公共功能封装成框架,以降低开发的难度。

一个完整的mapreduce程序有三类实例进程
MRAppMaster:负责整个程序的协调过程MapTask:负责map阶段的数据处理
ReduceTask:负责reduce阶段的数据处理

  1. 简述MapReduce优缺点 ?

优点
1)MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使 得 MapReduce 编程变得非常流行。
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行, 不至于这个任务运行失败, 而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
4)适合 PB 级以上海量数据的离线处理可以实现上千台服务器集群并发工作,提供数据处理能力。

缺点
1) 不 擅 长 实 时 计 算MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。2)不擅长流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为
MapReduce 自身的设计特点决定了数据源必须是静态的。
3)不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不 是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘, 会造成大量的磁盘 IO,导致性能非常的低下。

  1. 简述MapReduce架构 ?

MapReduce包含四个组成部分,分别为Client、JobTracker、TaskTracker和Task,下面我们详细介绍这四个组成部分。
1)
每一个 Job 都会在用户端通过 Client 类将应用程序以及配置参数 Configuration 打包成 JAR 文件存储在HDFS,并把路径提交到 JobTracker 的 master 服务,然后由 master 创建每一个 Task(即 MapTask 和ReduceTask) 将它们分发到各个 TaskTracker 服务中去执行。
2)
JobTracke负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息, 并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。
3)
TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker, 同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供Map Task 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。
4)
Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。
Map Task 执行过程如下图所示:由该图可知,Map Task 先将对应的split 迭代解析成一个个key/value 对,依次调用用户 自定义的map() 函数进行处理,最终将临时结果存放到本地磁盘上, 其中临时数据被分成若干个partition,每个partition 将被一个Reduce Task 处理

  1. 简述MapReduce工作原理 ?

1、提交作业
Client提交Job:
Client编写好Job后,调用Job实例的Sumit() 或者 waitForCompletion() 方法提交作业;
从RM(而不是Jobtracker)获取新的作业ID,在YARN命名法中它是一个Application ID(步骤2)。
Job提交到RM:
Client检查作业的输出说明,计算输入分片,并将作业资源(包括作业JAR、配置和分片信息)复 制到HDFS(步骤3);
调用RM的 submitApplication() 方法提交作业(步骤4)。
2、作业初始化
给作业分配ApplicationMaster:
RM收到调用它的 submitApplication() 消息后,便将请求传递给 scheduler (调度器);
scheduler分配一个 Container,然后 RM在该 NM的管理下在 Container中启动 ApplicationMaster(步骤5a & 5b)。
ApplicationMaster初始化作业:
MR作业的ApplicationMaster 是一个Java应用程序,它的主类是 MRAppMaster。它对作业进行初始化:通过创建多个薄记对象以保持对作业进度的跟踪,因为它将接受来自任务的进度和完成报告
(步骤6);
ApplicationMaster 从HDFS中获取 在Client 计算的输入分片(map、reduce任务数)(步骤7)【对每一个分片创建一个 map 任务对象以及由 mapreduce.job.reduces 属性确定的多个 reduce 任务对象】。
注意:

ApplicationMaster决定如何运行构成 MapReduce 作业的各个任务。如果作业很小,就选择在与它同一个
JVM上运行。
3、任务分配
ApplicationMaster 为该作业中的所有 map 任务和 reduce 任务向 RM 请求 Container (步骤8);
【随着心跳信息的请求包括每个map任务的数据本地化信息,特别是输入分片所在的主机和相应机 架信息。理想情况下,它将任务分配到数据本地化的节点,但如果不可能这么做,就会相对于本地 化的分配优先使用机架本地化的分配】
注意:
请求也为任务指定了内存需求。在默认情况下, map任务和reduce任务都分配到 1024MB 的内存,但这可以通过 mapreduce.map.memory.mb 和 mapreduce.reduce.memory.mb来设置。
4、任务执行
一旦 RM 的 scheduler 为任务分配了 Container, ApplicationMaster就通过与 NM通信来启动Container (步骤9a & 9b);
该任务由主类为 YardChild 的Java应用程序执行。在它运行任务之前,首先将任务需要的资源本地化(包括作业的配置、JAR文件和所有来自分布式缓存的文件)(步骤10);
最后,运行 map 任务或 reduce 任务(步骤11)。
5、进度和状态的更新
在YARN下运行,任务每 3s通过 umbilical 接口向 ApplicationMaster 汇报进度和状态(包括计数器),作为作业的汇聚试图(aggregate view)。
6、作业完成
除了向 ApplicationMaster 查询进度外,Client 每 5s还通过调用 Job 的 waitForCompletion() 来检查作业是否完成【查询的间隔可以通过 mapreduce.client.completion.pollinterval 属性进行设置】。
作业完成后, ApplicationMaster 和任务容器清理其工作状态, OutputCommitter 的作业清理方法会被调用。作业历史服务器保存作业的信息供用户需要时查询。

  1. 简述简述MapReduce工作原理(第2版) ?

Map端:

每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分 片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大 小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由
io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个 文件。
在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任 务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到 很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的 数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combia操作,这样做的目的是让尽可 能少的数据写入到磁盘。
当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中 会不断地进行排序和combia操作,目的有两个:1.尽量减少每次写入磁盘的数据量;2.尽量减少下一复 制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这 里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。
将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是 哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置 就ok了哦。
到这里,map端就分析完了。那到底什么是Shuule呢?Shuule的中文意思是“洗牌”,如果我们这样看: 一个map产生的数据,结果通过hash过程分区却分配给了不同的reduce任务,是不是一个对数据洗牌的 过程呢?呵呵。

Reduce端:
Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的 数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuule.input.buuer.percent属性控制, 表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由
mapred.job.shuule.merge.percent决定),则对数据合并后溢写到磁盘中。
随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节 省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白 了有些人为什么会说:排序是hadoop的灵魂。
合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少, 并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数

  1. 简述MapReduce哪个阶段最费时间 ?

Shuule阶段

  1. 简述MapReduce中的Combine机制 ?

在MapReduce中,Combine 阶段是当Map阶段所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机 读取带来的开销。
Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出k-v应该跟reducer的输入 k-v类型要对应起来

  1. 简述MapReduce为什么一定要有环型缓冲区 ?

在MapReduce的流程中,环形缓冲区是在溢写到磁盘之前的操作

我们先来剖析下环形缓冲区

环形缓冲区分为三块,空闲区、数据区、索引区。初始位置取名叫做“赤道”,就是圆环上的白线那个位 置。初始状态的时候,数据和索引都为0,所有空间都是空闲状态。
tips:这里有一个调优参数,可以设置环形缓冲区的大小:
mapreduce.task.io.sort.mb,默认100M,可以稍微设置大一些,但不要太大,因为每个spilt就128M。

环形缓冲区写入的时候,有个细节:数据是从赤道的右边开始写入,索引(每次申请4kb)是从赤道是 左边开始写。这个设计很有意思,这样两个文件各是各的,互不干涉。
在数据和索引的大小到了mapreduce.map.sort.spill.percent参数设置的比例时(默认80%,这个是调优的 参数),会有两个动作:
1、对写入的数据进行原地排序,并把排序好的数据和索引spill到磁盘上去;
2、在空闲的20%区域中,重新算一个新的赤道,然后在新赤道的右边写入数据,左边写入索引;
3、当20%写满了,但是上一次80%的数据还没写到磁盘的时候,程序就会panding一下,等80%空间腾 出来之后再继续写。
如此循环往复,永不停歇,直到所有任务全部结束。整个操作都在内存,形状像一个环,所以才叫环形 缓冲区。

  1. 简述MR为什么要有环形缓冲区 ?

主要有以下几点
1、使用环形缓冲区,便于写入缓冲区和写出缓冲区同时进行。
2、为了防止阻塞,所以环型缓冲区不会等缓冲区满了再spill
3、每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中,使用环形数据结构是为 了更有效地使用内存空间,在内存中放置尽可能多的数据。
4、环形缓冲区不需要重新申请新的内存,始终用的都是这个内存空间。大家知道MR是用java写的,而
Java有一个最讨厌的机制就是Full GC。Full GC总是会出来捣乱,这个bug也非常隐蔽,发现了也不好处理。环形缓冲区从头到尾都在用那一个内存,不断重复利用,因此完美的规避了Full GC导致的各种问题,同时也规避了频繁申请内存引发的其他问题。
5、环形缓冲区同时做了两件事情:1、排序;2、索引。在这里一次排序,将无序的数据变为有序,写 磁盘的时候顺序写,读数据的时候顺序读,效率高非常多!
在这里设置索引区也是为了能够持续的处理任务。每读取一段数据,就往索引文件里也写一段,这样在 排序的时候能加快速度。

  1. 简述MapReduce为什么一定要有ShuGle过程 ?

MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负 责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过ShuGle 来获取数据。
从Map输出到Reduce输入的整个过程可以广义地称为Shuule。ShuGle横跨Map端和Reduce端,在Map端 包括Spill过程,在Reduce端包括copy和sort过程

  1. 简述MapReduce的ShuGle过程及其优化 ?

Shuule的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。
MapReduce中的Shuule更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数 据。
为什么MapReduce计算模型需要Shuule过程?我们都知道MapReduce计算模型一般包括两个重要的阶 段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于
Map,Map的输出即是Reduce的输入,Reduce需要通过Shuule来获取数据。
从Map输出到Reduce输入的整个过程可以广义地称为Shuule。Shuule横跨Map端和Reduce端,在Map端 包括Spill过程,在Reduce端包括copy和sort过程

【Spill过程】
Spill过程包括输出、排序、溢写、合并等步骤

【collect过程】
每个Map任务不断地以对的形式把数据输出到在内存中构造的一个环形数据结构中。使用环形数据结构 是为了更有效地使用内存空间,在内存中放置尽可能多的数据。
注意:关于环形缓冲区,也可以看看前面的题目解答
这个数据结构其实就是个字节数组,叫Kvbuuer, 名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuuer的一块区域上穿了一个
IntBuuer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuuer中是相邻不 重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次 Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长
Kvbuuer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之 后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuuer中的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、
value的长度, 占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如 Kvindex初始位置是-4,当第一个写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、 (Kvindex+2)的位置存放partition的
值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二 个和索引写完之后,Kvindex 跳到-32位置。
Kvbuuer的大小虽然可以通过参数设置,但是总共就那么大,和索引不断地增加,加着加着,Kvbuuer总 有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把 Kvbuuer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法,内存中的数据满了就自动地spill到具有更大空间的磁盘。
关于Spill 触发的条件,也就是Kvbuuer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuuer用得死死得,一点缝都不剩的时候再开始 Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数
据;如果Kvbuuer只是满到一定程度,比如80%的时候就开始 Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后 者。
Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫
SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

【sort过程】
先把Kvbuuer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是
Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。
Spill过程
Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录, 找到之后在其中创建一个类似于 “spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍 历完。一个partition在文件中对应的数据也叫段(segment)。
所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition在这 个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个 文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。然后 把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所 有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out.index” 的文件,文件中不光存储了索引数据,还存储了crc32的校验数据。 (spill12.out.index不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,和 spill12.out文件也不一定在同一个目录下。)
每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。

在Spill线程如火如荼的进行SortAndSpill工作的同时,Map任务不会因此而停歇,而是一无既往地进行着 数据输出。Map还是把数据写到kvbuuer中,那问题就来了:只顾着闷头按照bufindex指针向上增长,
kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢,还是另谋它路?如果保持指 针起始位置不变,很快bufindex和Kvindex就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不 可取。Map取 kvbuuer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,Kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据
了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:
Map任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数 据刷到磁盘上。

【merge过程】
Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同 的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。

Merge过程怎么知道产生的Spill文件都在哪了呢?从所有的本地目录上扫描得到产生的Spill文件,然后把 路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢?没错,也是从所有的本地目录上扫 描得到Index文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。 在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是
Spill的索引数据,之前当内存超限之后就把数 据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时kvbuuer这个内存大户已经不再使用可以回 收,有内存空间来装这些数据了。(对于内存空间较大的土豪来说,用内存来省却这两个io步骤还是值得考虑 的。)

然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引。
一 个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有索引信 息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文 件名、起始位置、长度等等。
然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个 partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最 小堆,然后从最小 堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出 来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。
最终的索引数据仍然输出到Index文件中。
Map端的Shuule过程到此结束。

2、copy过程
Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server, 其中一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。
Reduce任务拖取某个Map对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间 达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中。
如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文 件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K。拖一个Map数据过来就会创建一个 文件,当文件数量达到一定阈值时,开始启动磁盘文件merge,把这些文件合并输出到一个文件。
有些Map的数据较小是可以放在内存中的,有些Map的数据较大需要放在磁盘上,这样最后Reduce任务 拖过来的数据有些放在内存中了有些放在磁盘上,最后会对这些来一个全局合并。

3、merge sort过程

这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排 序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个 阶段是重叠而不是完全分开的。
Reduce端的Shuule过程至此结束。
MapReduce ShuGle后续优化方向
压缩:对数据进行压缩,减少写读数据量;
减少不必要的排序:并不是所有类型的Reduce需要的数据都是需要排序的,排序这个nb的过程如 果不需要最好还是不要的好;
内存化:Shuule的数据不放在磁盘而是尽量放在内存中,除非逼不得已往磁盘上放;当然了如果有 性能和内存相当的第三方存储系统,那放在第三方存储系统上也是很好的;这个是个大招;
网络框架:netty的性能据说要占优了;
本节点上的数据不走网络框架:对于本节点上的Map输出,Reduce直接去读吧,不需要绕道网络框 架。

  1. 简述Reduce怎么知道去哪里拉Map结果集 ?

每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据。Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce

  1. 简述Reduce阶段都发生了什么,有没有进行分组 ?

1) Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一 定阈值,则写到磁盘上,否则直接放到内存中。
2) Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合 并,以防止内存使用过多或磁盘上文件过多。
3) Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了 将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理 结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
4) Reduce阶段:reduce()函数将计算结果写到HDFS上。

在ReduceTask的Shuule阶段,会获取到比较器,将从Map获取到的数据中key相同的分为同一组
1 public RawComparator getOutputValueGroupingComparator() {
2 // 从配置中读取mapreduce.job.output.group.comparator.class,
3 Class theClass = getClass(JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
4 //如果没有设置,默认使用MapTask对key排序时,key的比较器
5 if (theClass == null) {
6 return getOutputKeyComparator();
7 }
8 // 否则用户设置了,就使用用户自定义的比较器
9 return ReflectionUtils.newInstance(theClass, this); 10 }

  1. 简述MapReduce ShuGle的排序算法 ?

1、排序的简单介绍
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中 的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放在环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对 缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁 盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写 磁盘上,否则存储在内存中,如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大 文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数 据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
2、排序的分类
部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处 理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行框架。
辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个活几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
3、自定义排序

原理分析
bean对象做为key传输,需要实现 WritableComparable 接口重写compareTo方法,就可以实现排序

  1. 简述shuGle为什么要排序 ?

shuule排序,按字典顺序排序的,目的是把相同的的key可以提前一步放到一起。
sort是用来shuule的,shuule就是把key相同的东西弄一起去,其实不一定要sort也能shuule,那为什么要
sort排序呢?
sort是为了通过外排(外部排序)降低内存的使用量:因为reduce阶段需要分组,将key相同的放在一起进 行规约,使用了两种算法:hashmap和sort,如果在reduce阶段sort排序(内部排序),太消耗内存,而
map阶段的输出是要溢写到磁盘的,在磁盘中外排可以对任意数据量分组(只要磁盘够大),所以,map
端排序(shuule阶段),是为了减轻reduce端排序的压力

  1. 简述map是怎么到reduce的 ?

Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuule来获取数据。
从Map输出到Reduce输入的整个过程可以广义地称为Shuule。Shuule横跨Map端和Reduce端,在Map端 包括Spill过程,在Reduce端包括copy和sort过程

  1. 简述你了解的用哪几种shuGle机制 ?

Map端:Spill机制,Reduce端:copy和sort机制

  1. 简述MapReduce的数据处理过程 ?

1、任务切分:对文件进行逻辑切片,切片按照范围划分,默认128M一片。
一个文件至少有一个切片,每个切片运行一个maptask,如果文件超过128M,同一个输入文件会有多个
maptask运行;为减少资源浪费,如果最后一个切片大小小于1.1*128M,将不会被切分处理。
2、输入对象:FileInputFormat.setInputPaths()方法,指定数据输入路径;输入目录中可以有单个或多个 文件。
读取数据、生成K-V对:由继承RecordReader的LineRecordReader类中的readLine()方法从输入的切片中读 取数据;每读取一行执行一次,生成一组K-V。
3、map()方法:以单词统计为例,自定义的WordCountMapper类继承父类Mapper,接收K-V对,重写
map()方法的业务逻辑。
map()的业务逻辑中,对数据进行切分,遍历数组,生成新的K-V对;由context.write(nk,nv)方法输出新的 K-V。
map()方法执行时机:一对K-V执行一次。
4、map()的输出:context.write()被调用时,OutputController组件会将新的K-V输出到数组缓存区,写入 数组缓存区中的还有新K-V的元数据;
5、KV分区:
MapOutputBuuer类调用collect(nk,nv,partition)方法接收新K-V;partition调用HashPartitioner组件; HashPartitioner.getPartition(nk,nv){
nk.hashcode%numberReduceTasks;
}
获得分区,得到区号,返回给partition。
6、区内排序:①按照分区排序;②区内数据再按照K进行排序。
7、溢出:spiller,当缓冲区中的数据到达80%时,进行分区、排序,将数据溢出,当前处于阻塞状态, 防止写入数据。(根据数据量大小溢出,至少一次)
8、归并、区内排序:将数组缓冲区中分区排序完的数据,用Merger组件进行归并,写入磁盘;同时进 行区内排序。
9、局部聚合:调用Combiner组件,根据相同K进行数据聚合。
10、写入本地磁盘:将归并排序完成的数据写入本地磁盘,此阶段提供http下载,便于数据传输。
11、拉取数据:reducetask分别拉取属于自己的数据(本地–>分区经由网络传输)
12、归并排序:调用Merger组件,按照K进行排序。
13、分组:调用GroupingComparator组件中的CompareTo(preK,postK)方法,将相同K的分到一组,放入 同一迭代器中。
14、聚合:reduce(K,iterator<>,context)方法中将相同K的数据进行聚合操作,聚合一次得到一组K-V。
15、输出:由TextOutputFormat的write方法,写出到HDFS(或本地磁盘)

  1. 简述map join的原理(实现)?应用场景 ?

map join流程
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多 份,让每个map task内存中存在一份
(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。减少昂贵的shuGle操作及
reduce操作
MapJoin分为两个阶段:
通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会HashTableFiles进行压缩。
MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
使用场景
MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数
hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。
Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7
版本之后,默认自动会转换Map Join,由参数hive.auto.convert.join来控制,默认为true.
假设a表为一张大表,b为小表,并且hive.auto.convert.join=true,那么Hive在执行时候会自动转化为
MapJoin。
应用场景
Map Join 实现方式一:分布式缓存
使用场景:一张表十分小、一张表很大。用法:
在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表 进行join (比如放到Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支 撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到
DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另 外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性

  1. 简述reduce join如何执行(原理) ?

reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数 据打一个标签> (tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。> 在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list,
然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接 操作。

  1. 简述MapReduce为什么不能产生过多小文件 ?

默认情况下,TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会单独交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
MapReduce大量小文件的优化策略:
最优方案:在数据处理的最前端(预处理、采集),就将小文件合并成大文件,在上传到HDFS做后续 的分析
补救措施:如果HDFS中已经存在大量的小文件了,可以使用另一种Inputformat来做切片
(CombineFileInputformat),它的切片逻辑跟FileInputformat不同,它可以将多个小文件从逻辑上规划 到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。

  1. 简述MapReduce分区及作用 ?

1、默认分区
系统自动调用HashPartitioner类进行分区,默认分区是根据key的hashCode对ReduceTasks个数取模得到 的。用户没法控制哪个key存储到哪个分区。

1 public class HashPartitioner extends Partitioner {
2 public int getPartition(K key, V value, int numReduceTasks) {
3 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 4 }
5 }

2、自定义分区
(1)自定义类继承Partitioner,重写getPartition()方法
1 public class CustomPartitioner extends Partitioner {
2 @Override
3 public int getPartition(Text key, FlowBean value, int numPartitions) {
4 // 控制分区代码逻辑
5 … …
6 return partition; 7 }
8 }

(2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(num);

3、全局排序
全局排序是通过将进入map端之前的数据进行随机采样,在采取的样本中设置分割点,通过分割点将数 据进行分区,将设置的分割点保存在二叉树中,Map Task每输出一个数据就会去查找其对应的区间,以此来达到分区效果。
作用
根据业务实际需求将统计结果按照条件产生多个输出文件(分区) 多个reduce任务运行,提高整体job的运行效率

  1. 简述ReduceTask数量和分区数量关系 ?

1)如果ReduceTask的数量 > getPartition的结果数(ReduceTask > 分区数量),则会多产生几个空的输出文件part-r-000xx;
2)如果1 3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000。
案例分析

例如:假设自定义分区数为5,则

1)job.setNumReduceTasks(1); => 会正常运行,只不过会产生一个输出文件
2)job.setNumReduceTasks(2); => 会报错
3)job.setNumReduceTasks(6); => 大于5,程序会正常运行,会产生空文件

  1. 简述Map的分片有多大 ?

Hadoop中在 在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
源码中切片大小:
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize。
Hadoop 2.7.2版本及之前默认64MB,Hadoop 2.7.3版本及之后默认128M,可以在hdfs-site.xml中设置
dfs.block.size,注意单位是byte。
分片大小范围可以在mapred-site.xml中设置,mapred.min.split.size和mapred.max.split.size,
minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807
切片大小设置:
maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数 的值。
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大

  1. 简述MapReduce join两个表的流程 ?

1、map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuule阶段要进行大量的数据传输。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份
(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如 果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的
NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个
TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取 相应的文件。
2、SemiJoin
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side
join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join
操作的数据,则可以大大节省网络IO。
实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3 文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上, 然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
3、reduce side join + BloomFilter
在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler 以节省空间。
BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和
contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。
因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过 滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。

  1. 简述reduce任务什么时候开始 ?

只要有map任务完成,就可以开始reduce任务

  1. 简述MapReduce的reduce使用的是什么排序 ?

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对 缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁 盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写 磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大 文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

  1. 简述MapReduce怎么确定MapTask的数量 ?

MapTask数量影响因素
影响map个数(split个数)的主要因素有:
文件的大小。当块(dfs.block.size)为128m时,如果输入文件为128m,会被划分为1个split;当块为
256m,会被划分为2个split。
文件的个数。FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小 的文件。如果HDFS中dfs.block.size设置为128m,而输入的目录中文件有100个,则划分后的split个数至 少为100个。
splitSize的大小。分片是按照splitszie的大小进行分割的,一个split的大小在没有设置的情况下,默认等 于hdfs block的大小。已知以下参数
1 input_file_num : 输入文件的个数
2 block_size : hdfs的文件块大小,2.7.3默认为128M,可以通过hdfs-site.xml中的dfs.block.size
参数进行设置
3 total_size : 输入文件整体的大小,由框架循环叠加计算
MapTask的数量计算原则为:
(1)默认map个数
如果不进行任何设置,默认的map个数是和blcok_size相关的
default_num = total_size / block_size;
(2)自定义设置分片的minSize、maxSize
如果在MapReduce的驱动程序(main方法)中通过以下方法设置了分片的最小或最大的大小
1 //设置最小分片大小,单位byte
2 FileInputFormat.setMinInputSplitSize(job,1024102410L); //10MB
3 //设置最大分片大小,单位byte
4 FileInputFormat.setMaxInputSplitSize(job,1024L); //1KB
应用程序可以通过数据分片的最大和最小大小两个参数来对splitsize进行调节,则计算方式就变成了
splitSize=Math.max(minSize, Math.min(maxSize, blockSize)
其中maxSize即方法 setMaxInputSplitSize 设置的值,minSize即方法·setMinInputSplitSize·设置的值。
其设置原则就是
1 要增加map的个数,调整maxSize 2 要减小map的个数,调整minSize>blockSize。
总结:默认block为128M,当输入文件大于128M时,则会进行分块,分块后剩余文件大小大于128*1.1 时,则会继续分块,否则不再分块,有多少块,就有多少MapTask

  1. 简述Map数量由什么决定 ?

影响map个数(split个数)的主要因素有:
文件的大小。当块(dfs.block.size)为128m时,如果输入文件为128m,会被划分为1个split;当块为
256m,会被划分为2个split。
文件的个数。FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小 的文件。如果HDFS中dfs.block.size设置为128m,而输入的目录中文件有100个,则划分后的split个数至 少为100个。
splitSize的大小。分片是按照splitszie的大小进行分割的,一个split的大小在没有设置的情况下,默认等 于hdfs block的大小。已知以下参数
1 input_file_num : 输入文件的个数
2 block_size : hdfs的文件块大小,2.7.3默认为128M,可以通过hdfs-site.xml中的dfs.block.size
参数进行设置
3 total_size : 输入文件整体的大小,由框架循环叠加计算

  1. MapReduce的map进程和reducer进程的jvm垃圾回收器怎么选择可以提高吞吐量?

开启JVM重用
属性:mapred.job.reuse.jvm.num.tasks,默认值是1,在一个taskTracker上对于给定的作业的每个jvm上 可以运行任务最大数。-1表示无限制,即同一个jvm可以被该作业的所有任务使用
1 conf.set(“mapreduce.job.jvm.numtasks”, “-1”); //开启jvm重用
2 job.getConfiguration().setInt(job.JVM_NUMTASKS_TORUN, -1); //开启jvm重用

  1. 简述MapReduce的task数目划分 ?

在MapReduce当中,每个mapTask处理一个切片split的数据量,要注意切片与block块的概念很像,但是
block块是HDFS当中存储数据的单位,切片split是MapReduce当中每个MapTask处理数据量的单位。
在介绍map task的数量及切片机制之前先了解这两个概念:
block块(数据块,物理划分)

block是HDFS中的基本存储单位,hadoop1.x默认大小为64M,而hadoop2.x默认块大小为128M。文件上 传到HDFS,就要划分数据成块,这里的划分属于物理的划分(实现机制也就是设置一个read方法,每次 限制最多读128M的数据后调用write进行写入到hdfs),块的大小可通过 dfs.block.size配置。block采用冗余机制保证数据的安全:默认为3份,可通过dfs.replication配置。
注意:当更改块大小的配置后,新上传的文件的块大小为新配置的值,以前上传的文件的块大小为以前 的配置值。
split分片(数据分片,逻辑划分)
Hadoop中split划分属于逻辑上的划分,目的只是为了让map task更好地获取数据。split是通过hadoop中的 InputFormat接口中的getSplits()方法得到的。数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
综上所述
Split切片数就是我们的MapTask数量,切片的大小是可以自行设置的

2、如何控制mapTask的个数
MapTask数量设置不当带来的问题:
Map Task数量过多的话,会产生大量的小文件, 过多的Mapper创建和初始化都会消耗大量的硬件资源 。
Map Task数量过少,就会导致并发度过小,Job执行时间过长,无法充分利用分布式硬件资源。
那么,如果需要控制maptask的个数,我们只需要调整maxSize和minsize这两个值,那么切片的大小就会 改变,切片大小改变之后,mapTask的个数就会改变,我们可以在MapReduce的驱动程序(main方法)中 通过以下方法设置了分片的最小或最大的大小
1 //设置最小分片大小,单位byte
2 FileInputFormat.setMinInputSplitSize(job,1024102410L); //10MB
3 //设置最大分片大小,单位byte
4 FileInputFormat.setMaxInputSplitSize(job,1024L); //1KB
设置原则如下
1 要增加map的个数,调整maxSize 2 要减小map的个数,调整minSize>blockSize

三、ReduceTask数量的确定
Reduce任务是一个数据聚合的步骤,数量默认为1。使用过多的Reduce任务则意味着复杂的shuule,并使输出文件数量激增。而reduce的个数设置相比map的个数设置就要简单的多

  1. MapReduce作业执行的过程中,中间的数据会存在什么地方?不会存在内存中么 ?

不会存在内存,存在本地磁盘

  1. Mapper端进行combiner之后,除了速度会提升,那从Mapper端到Reduece 端的数据量会怎么变 ?

数据量会减少,因为combiner之后,会将相同的key进行一次聚合,数据量会在这时候减少一部分

  1. 简述map输出的数据如何超出它的小文件内存之后,是落地到磁盘还是落地到 HDFS中 ?

数据会落地到磁盘中,因为map和reduce操作,就是一次次的I/O请求

  1. 简述Map到Reduce默认的分区机制是什么 ?

默认分区是根据key的hashCode对ReduceTask个数取模得到的。用户没法控制哪个key存储到哪个分区

  1. 简述结合wordcount述说MapReduce,具体各个流程,map怎么做,reduce怎么做 ?

MapReduce各个阶段做了什么
spliting :Documents会根据切割规则被切成若干块,
map阶段:然后进行Map过程,Map会并行读取文本,对读取的单词进行单词分割,并且每个词以键值 对形式生成。
1 例如:读取到”Hello World Hello Java“,分割单词形成Map
2

combine阶段:接下来Combine(该阶段是可以选择的,Combine其实也是一种reduce)会对每个片相同 的词进行统计。
shuule阶段:将Map输出作为reduce的输入的过程就是shuule,次阶段是最耗时间,也是重点需要优化的阶 段。shuule阶段会对数据进行拉取,对最后得到单词进行统计,每个单词的位置会根据Hash来确定所在 的位置,
reduce阶段:对数据做最后的汇总,最后结果是存储在hdfs上。

  1. 简述MapReduce数据倾斜产生的原因及其解决方案 ?

1、数据倾斜现象
数据倾斜就是数据的key的分化严重不均,造成一部分数据很多,一部分数据很少的局面。 数据频率倾斜——某一个区域的数据量要远远大于其他区域。
数据大小倾斜——部分记录的大小远远大于平均值。
2、数据倾斜产生的原因
(1)Hadoop框架的特性
Job数多的作业运行效率会相对比较低;
countdistinct、group by、join等操作,触发了Shuule动作,导致全部相同key的值聚集在一个或几个节点上,很容易发生单点问题。
(2)具体原因
key 分布不均匀,某一个key的条数比其他key多太多; 业务数据自带的特性;
建表时考虑不全面;
可能某些 HQL 语句自身就存在数据倾斜问题。
3、数据倾斜解决方案
从业务和数据方面解决数据倾斜有损的方法:找到异常数据。无损的方法:
对分布不均匀的数据,进行单独计算,首先对key做一层hash,把数据打散,让它的并行度变大,之后进行汇集
数据预处理
Hadoop平台的解决方法 1)针对join产生的数据倾斜
场景一:大表和小表join产生的数据倾斜
① 在多表关联情况下,将小表(关联键记录少的表)依次放到前面,这样能够触发reduce端减少操作次数,从而减少运行时间。
② 同时使用Map Join让小表缓存到内存。在map端完成join过程,这样就能省掉redcue端的工作。需要注意:这一功能使用时,需要开启map-side join的设置属性:set hive.auto.convert.join=true(默认是false)
③ 还可以对使用这个优化的小表的大小进行设置:
set hive.mapjoin.smalltable.filesize=25000000(默认值25M)
场景二:大表和大表的join产生的数据倾斜
① 将异常值赋一个随机值,以此来分散key,均匀分配给多个reduce去执行
② 如果key值都是有效值的情况下,需要设置以下几个参数来解决
set hive.exec.reducers.bytes.per.reducer = 1000000000

也就是每个节点的reduce,其 默认是处理数据地大小为1G,如果join 操作也产生了数据倾斜,那么就在
hive 中设定
1 set hive.optimize.skewjoin = true;
2 set hive.skewjoin.key = skew_key_threshold (default = 100000)

2) group by 造成的数据倾斜
解决方式相对简单:
1 hive.map.aggr=true (默认true) 这个配置项代表是否在map端进行聚合,相当于Combiner
2 hive.groupby.skewindata

3) count(distinct)或者其他参数不当造成的数据倾斜
① reduce个数太少
set mapred.reduce.tasks=800
② HiveQL中包含count(distinct)时
使用sum…group byl来替代。例如select a,sum(1) from (select a, b from t group by a,b) group by a;

  1. 简述Map Join为什么能解决数据倾斜 ?

Map Join概念:将其中做连接的小表(全量数据)分发到所有 MapTask 端进行 Join,从而避免了reduceTask,前提要求是内存足以装下该全量数据。
Map Join通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数
hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。 一般默认就够了,无须修改。
1)在多表关联情况下,将小表(关联键记录少的表)依次放到前面,这样能够触发reduce端减少操作次 数,从而减少运行时间。
2)同时使用Map Join让小表缓存到内存。在map端完成join过程,这样就能省掉redcue端的工作。需要注意:这一功能使用时,需要开启map-side join的设置属性:set hive.auto.convert.join=true(默认是false)
注意: 大表放硬盘,小表放内存( 前提要求是内存足以装下该全量数据 )

  1. 简述MapReduce运行过程中会发生OOM,OOM发生的位置 ?

背景:Hive中跑MapReduce Job出现OOM问题

该异常发生既不是map阶段,也不是reduce阶段,发现不是执行过程,而是driver提交job阶段就OOM
1 Hive中XMLEncoder序列化MapredWork引发OutOfMemoryError
2 XMLEncoder导致java.lang.OutOfMemoryError: GC overhead limit exceeded

先来看下,Hive中出现OOM的异常原因大致分为以下几种:
Map 阶 段 OOM Reduce阶段OOM
Driver提交Job阶段OOM
1、Map阶段OOM
发生OOM的几率很小,除非你程序的逻辑不正常,亦或是程序写的不高效,产生垃圾太多。
2、Reduce阶段OOM

1) data skew 数据倾斜
data skew是引发这个的一个原因。
key分布不均匀,导致某一个reduce所处理的数据超过预期,导致jvm频繁GC。
2) value对象过多或者过大
某个reduce中的value堆积的对象过多,导致jvm频繁GC。
解决方案:
① 增加reduce个数,set mapred.reduce.tasks=300,。
② 在hive-site.xml中设置,或者在hive shell里设置 set mapred.child.java.opts = -Xmx512m
或者只设置reduce的最大heap为2G,并设置垃圾回收器的类型为并行标记回收器,这样可以显著减少GC 停顿,但是稍微耗费CPU。
set mapred.reduce.child.java.opts=-Xmx2g -XX:+UseConcMarkSweepGC;
③ 使用map join 代替 common join. 可以set hive.auto.convert.join = true
④ 设置 hive.optimize.skewjoin = true 来解决数据倾斜问题
3、Driver提交Job阶段OOM
job产生的执行计划的条目太多,比如扫描的分区过多,上到4k-6k个分区的时候,并且是好几张表的分 区都很多时,这时做join。
究其原因,是因为序列化时,会将这些分区,即hdfs文件路径,封装为Path对象,这样,如果对象太多 了,而且Driver启动的时候设置的heap size太小,则会导致在Driver内序列化这些MapRedWork时,生成的对象太多,导致频繁GC,则会引发如下异常:
1 java.lang.OutOfMemoryError: GC overhead limit exceeded
2 at sun.nio.cs.UTF_8.newEncoder(UTF_8.java:53)
3 at java.beans.XMLEncoder.createString(XMLEncoder.java:572)

解决方案:
经过查询, 是因为扫描的表的分区太多,上到3千到6千个分区,这样在对计划进行序列化时,仅仅是路径对象Path就会耗去大半Driver,如果Driver设置的heap太小,甚至都会OOM。
解决思路

① 减少分区数量,将历史数据做成一张整合表,做成增量数据表,这样分区就很少了。
② 调大Hive CLI Driver的heap size, 默认是256MB,调节成512MB或者更大。具体做法是在bin/hive bin/hive-config里可以找到启动CLI的JVM OPTIONS。这里我们设置
export HADOOP_HEAPSIZE=512

双管齐下, 即做成了整合,方便使用,又调节了Hive CLI Driver的heap size,保证线上的运行稳定。
遇到这种问题:
一是HiveQL的写法上,尽量少的扫描同一张表,并且尽量少的扫描分区。扫太多,一是job数多, 慢,二是耗费网络资源,慢。
二是Hive的参数调优和JVM的参数调优,尽量在每个阶段,选择合适的jvm max heap size来应对
OOM的问题。

  1. 简述MapReduce用了几次排序,分别是什么 ?

在Map任务和Reduce任务的过程中,一共发生了3次排序
1)当map函数产生输出时,会首先写入内存的环形缓冲区,当达到设定的阀值,在刷写磁盘之前,后 台线程会将缓冲区的数据划分成相应的分区。在每个分区中,后台线程按键进行内排序
2)在Map任务完成之前,磁盘上存在多个已经分好区,并排好序的,大小和缓冲区一样的溢写文件,这 时溢写文件将被合并成一个已分区且已排序的输出文件。由于溢写文件已经经过第一次排序,所有合并 文件只需要再做一次排序即可使输出文件整体有序。
3)在reduce阶段,需要将多个Map任务的输出文件copy到ReduceTask中后合并,由于经过第二次排序, 所以合并文件时只需再做一次排序即可使输出文件整体有序
在这3次排序中第一次是内存缓冲区做的内排序,使用的算法使快速排序,第二次排序和第三次排序都 是在文件合并阶段发生的,使用的是归并排序

  1. MapReduce中怎么处理一个大文件 ?

1、输入大文件时
1 // 小于这个数据时进行合并
2 conf.setLong(FileInputFormat.SPLIT_MINSIZE,10241024256L);
3 // 大于这个数据时进行切分
4 conf.setLong(FileInputFormat.SPLIT_MAXSIZE,102410241024);

2、输入大量小文件时
方式一:小文件先进行Merge操作再使用MapReduce
方式二:使用FileInputFormat子类CombineFileInputFormat重写RecordReader()将多个input path合并成一个InputSplit

Hadoop中用到了那些缓存机制? 问过的一些公司:大华(2021.07) 参考答案:
分布式缓存
就是在job任务执行前,将需要的文件拷贝到Task机器上进行缓存,提高mapreduce的执行效率