MapReduce的思想
MapReduce的思想核心是“先分再合,分而治之”。具体来说,MapReduce通过将复杂任务分解为若干个简单的子任务,这些子任务可以并行处理,彼此之间相对独立,没有依赖关系。MapReduce运行在yarn之上,一旦所有子任务完成,再将中间结果合并,形成对原始问题的完整解答。
MapReduce的工作流程主要包括两个阶段:Map阶段和Reduce阶段
- Map阶段: 负责将一个大的任务划分成小的任务,小任务之间不能有依赖关系
- Reduce阶段: 负责将Map阶段的结果进行汇总
MapReduce的八个步骤
- InputFormat:输入数据被分割成多个小数据块(splits),每个数据块可以在不同的服务器上并行处理12。
- Map阶段:Map任务对每个数据块进行处理,生成一系列的key/value对。Map任务通过RecordReader读取InputSplit,并调用用户定义的map函数处理这些数据12。
- Shuffle阶段:Map任务处理完数据后,将结果写入环形缓冲区。当缓冲区满时,数据会被写入本地磁盘,生成临时文件,并进行排序和合并操作3。
- Partition阶段:对Map输出的key/value对进行分区,决定每个key/value对应该由哪个Reduce任务处理12。
- Copy&Merge阶段:将分区后的数据复制到Reduce任务所在的节点,并进行合并操作1。
- Sort阶段:对Reduce任务接收到的数据进行全局排序3。
- Reduce阶段:Reduce任务对排序后的数据进行处理,生成最终结果12。
- OutputFormat:将Reduce任务的输出写入最终的输出文件中12。
每个步骤的详细解释和作用:
- InputFormat:将输入数据分割成多个小数据块,以便并行处理。例如,TextInputFormat会将文本文件按行分割12。
- Map阶段:Map任务读取分割后的数据块,执行用户定义的map函数,生成key/value对。这些结果存储在环形缓冲区中12。
- Shuffle阶段:当环形缓冲区满时,数据会被写入本地磁盘,生成临时文件。在写入前,会对数据进行排序和合并操作。如果设置了Combiner,还会在写入前对数据进行局部聚合3。
- Partition阶段:根据key或value及Reduce任务的数量,决定每个key/value对应该由哪个Reduce任务处理。默认使用HashPartitioner进行分区12。
- Copy&Merge阶段:将分区后的数据复制到Reduce任务所在的节点,并进行合并操作,生成全局排序后的数据集1。
- Sort阶段:Reduce任务接收到的数据进行全局排序,确保最终结果的有序性3。
- Reduce阶段:Reduce任务对排序后的数据进行处理,生成最终结果12。
- OutputFormat:将Reduce任务的输出写入最终的输出文件中,完成整个MapReduce过程12。
简单概述
- Map阶段2个步骤:
- 1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
- 2. 自定义 Map 逻辑(自己写代码), 将第一步的结果转换成另外的 Key-Value(K2和V2) 键值对对, 并输出结果
- Shuffle 阶段 4 个步骤
- 3. 分区(Partition)
- 4. 排序(Sort)
- 5. 规约(Combiner)
- 6. 分组(Group By)
- Reduce阶段2个步骤
-
7. 对多个 Map 任务的结果进行排序以及合并 , 编写 Reduce 函数实现自己的逻辑 , 对输入的Key-Value 进行处理 , 转为新的 Key-Value ( K3 和 V3 )输出
-
8. 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
词频统计实例
实现思路
代码实现
自定义WordCountMapper类,继承Mapper类,并重写Map方法将读取的<k1,v1>转换为<k2,v2>,并将 k2和v2写入上下文 发送到下一个阶段进行处理。
package cn.itcast.mapreuce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
四个泛型解释:
KEYIN :K1的类型 --java(Long)-->(LongWritable)
VALUEIN:VI的类型 --java(String)-->(Text)
KEYOUT:K2的类型 --java(String)-->(Text)
VALUEOUT:V2的类型 --java(Long)-->(LongWritable)
*/
// 1.继承 Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
// 2.重写 Map方法
// 将 <k1,v1> 转为 <k2,v2>
/* 参数
* key : k1 行偏移量
* value :v1 每一行的文本数据
* context :表示上下文对象,桥梁作用
* */
/*
如何将 <k1,v1> 转为 <k2,v2>
K1 V1
0 hello,world,hadoop
15 hdfs,hive,hello
---------------------------
K2 V2
hello 1
world 1
hdfs 1
hadoop 1
hello 1
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
LongWritable longWritable = new LongWritable();
// 1.将一行数据的文本数据进行拆分
String[] split = value.toString().split(",");
// 2.遍历数组,组装 k2和v2
for (String word : split) {
// 3.将 k2和v2写入上下文 --将数据发送到下一个阶段进行处理
// context.write(new Text(word),new L
// ongWritable(1));
text.set(word);
longWritable.set(1);
context.write(text, longWritable);
}
}
}
自定义WordCountReduce类,继承Reduce类,并重写reduce方法将读取的<k2,v2>转换为<k3,v3>,并将 k3和v3写入上下文。
package cn.itcast.mapreuce;
/*
四个泛型解释:
KEYIN: K2类型
VALULEIN: V2类型
KEYOUT: K3类型
VALUEOUT:V3类型
*/
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable,Text, LongWritable> {
//reduce方法作用: 将新的K2和V2转为 K3和V3 ,将K3和V3写入上下文中
/*
参数:
key : 新K2
values: 集合 新 V2
context :表示上下文对象
----------------------
如何将新的K2和V2转为 K3和V3
新 K2 V2
hello <1,1,1>
world <1,1>
hadoop <1>
------------------------
K3 V3
hello 3
world 2
hadoop 1
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
// 1.遍历集合将集合中的数字相加,得到v3
for (LongWritable value : values) {
count +=value.get();
}
// 2.将k3和v3写入上下文
context.write(key,new LongWritable(count));
}
}
定义JobMain主类用于启动MapReduce程序,将上述自定义的WordCountMapper类和WordCountReduce联系起来,并设置文件的输入输出流以及MapReduce程序的八个步骤。
package cn.itcast.mapreuce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
//该方法用于指定一个job任务
@Override
public int run(String[] strings) throws Exception {
// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称
Job job = Job.getInstance(super.getConf(), "wordcount");
// 打包运行出错添加
job.setJarByClass(JobMain.class);
// 2.配置 job任务对象(八个步骤)
// 2.1 读取文件 ---指定读取类
job.setInputFormatClass(TextInputFormat.class);
// 指点源文件路径
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/wordcount"));
// --本地测试--
// TextInputFormat.addInputPath(job,new Path("file:///D:\\input_javaword"));
// 2.2 进入指定map阶段处理方式和数据类型
// 设置map阶段用的类
job.setMapperClass(WordCountMapper.class);
// 设置Map阶段K2的类型 --- 单词(字符串)
job.setMapOutputKeyClass(Text.class);
// 设置Map阶段V2的类型 --- 数字(long)
job.setMapOutputValueClass(LongWritable.class);
// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理
// 2.7 指定Reduce阶段的处理方式和数据类型
job.setReducerClass(WordCountReducer.class);
// 设置Reduce阶段K3的类型 --- 单词(字符串)
job.setOutputKeyClass(Text.class);
// 设置Reduce阶段v3的类型 --- 单词(字符串)
job.setOutputValueClass(LongWritable.class);
// 2.8 设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));
// 判断目标目录是否存在,存在则删除
Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/wordcount_out");
TextOutputFormat.setOutputPath(job,path);
// 获取hdfs文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());
// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());
// 判断目录是否存在
boolean exists = fileSystem.exists(path);
if (exists){
// 删除目标目录
fileSystem.delete(path,true);
}
// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job任务 --记录任务执行状态 0表示成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
程序启动
将源文件上传到上述定义的HDFS源文件夹中,并将MapReduce程序打包上传到Liunx系统中使用hadoop命令执行jar包运行。如下图所示先清空jar包再重新打包,在Liunx系统中上传压缩后的jab包即可。
运行指令:hadoop jar jar包名称 待运行主类的名称
hadoop jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar demo1.JobMain
总结
上述实例实现了简单的MapReduce程序,对于Shuffle阶段的分区、排序、规约、分组均采用的是默认的方式。
分区实例
在 MapReduce 中, 通过我们指定分区, 会将同一个分区的数据发送到同一个 Reduce 当中进行处理
例如: 为了数据的统计, 可以把一批类似的数据发送到同一个 Reduce 当中, 在同一个 Reduce 当中统计相同类型的数据, 就可以实现类似的数据分区和统计等,其实就是相同类型的数据, 有共性的数据, 送到一起去处理。Reduce 当中默认的分区只有一个。
实现思路
与默认分区不同的是,自定义分区需自定义MyPartitioner类继承Partitioner并重新getPartition方法,之后在主类中需设置分区类和ReduceTask个数。指定的分区类在指定的Mapper类之后Reducer类之前,设置ReduceTack的个数在Reducer类之后
// 指定分区类 job.setPartitionerClass(MyPartitioner.class); // ------设置ReduceTack的个数------- job.setNumReduceTasks(2);
代码实现
// 自定义分区规则
package partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<Text, NullWritable> {
// 实现功能 定义分区规则并返回对应的分区编号
@Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
// 拆分行文本数据--k2数据,获取中奖字段的值
String[] split = text.toString().split("\t");
String numStr = split[5];
// 判断中奖字段的值和15的关系,返回对应的分区编号
if (Integer.parseInt(numStr)>15){
// 表示分区编号为 1
return 1;
}else {
// 表示分区编号为 0
return 0;
}
}
}
// 定义PartitionMapper类
package partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
四个泛型解释:
KEYIN :K1的类型 --java(Long)-->(LongWritable)
VALUEIN:VI的类型 --java(String)-->(Text)
KEYOUT:K2的类型 --java(String)-->(Text)
VALUEOUT:V2的类型 --NullWritable 站位
*/
// 2.重写 Map方法
// 将 <k1,v1> 转为 <k2,v2>
/* 参数
* key : k1 行偏移量
* value :v1 每一行的文本数据
* context :表示上下文对象,桥梁作用
* */
public class PartitionMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/*将 <k1,v1> 转为 <k2,v2>*/
context.write(value,NullWritable.get());
}
}
// 定义PartitionReducer 类
package partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
四个泛型解释:
KEYIN: K2类型 --Text
VALULEIN: V2类型 --NullWritable
KEYOUT: K3类型 --Text
VALUEOUT:V3类型 --NullWritable
*/
public class PartitionReducer extends Reducer<Text, NullWritable,Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// K2和V2转为 K3和V3
context.write(key,NullWritable.get());
}
}
// 定义启动类
package partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
// 固定模版 继承类实现接口
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称
Job job = Job.getInstance(super.getConf(), "partition_maperduce");
// 打包运行出错添加
job.setJarByClass(partition.JobMain.class);
// 2.配置 job任务对象(八个步骤)
// 2.1 读取文件 ---指定读取类
job.setInputFormatClass(TextInputFormat.class);
// 指点源文件路径
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/partition"));
// 2.2 进入指定map阶段处理方式和数据类型
// 设置map阶段用的类
job.setMapperClass(PartitionMapper.class);
// 设置Map阶段K2的类型 --- 单词(字符串)
job.setMapOutputKeyClass(Text.class);
// 设置Map阶段V2的类型 --- 数字(long)
job.setMapOutputValueClass(NullWritable.class);
// 2.3(4,5,6) 进入Shuffle阶段 分区-排序-规约-分组
// 指定分区类
job.setPartitionerClass(MyPartitioner.class);
// 4,5,6采用默认
// 2.7 指定Reduce阶段的处理方式和数据类型
job.setReducerClass(PartitionReducer.class);
// 设置Reduce阶段K3的类型 --- 单词(字符串)
job.setOutputKeyClass(Text.class);
// 设置Reduce阶段v3的类型 --- 单词(字符串)
job.setOutputValueClass(NullWritable.class);
// ------设置ReduceTack的个数-------
job.setNumReduceTasks(2);
// 2.8 设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径
Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/partition_out");
TextOutputFormat.setOutputPath(job,path);
// 获取hdfs文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());
// 判断目录是否存在
boolean exists = fileSystem.exists(path);
if (exists){
// 删除目标目录
fileSystem.delete(path,true);
}
// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job任务 --记录任务执行状态 0表示成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
排序实例
- 序列化 (Serialization) 是指把结构化对象转化为字节流。
- 反序列化 (Deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取的字节流转换为对象, 就要进行反序列化。
- Java 的序列化 (Serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, Hadoop 自己开发了一套序列化机制(Writable), 精简高效. 不用像 Java 对象类一样传输多层的父子关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销。
- Writable 是 Hadoop 的序列化格式, Hadoop 定义了这样一个 Writable 接口. 一个类要支持可序列化只需实现这个接口即可。
- Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现排序功能。
a 1
a 9
b 3
a 7
b 8
b 10
a 5要求 :第一列按照字典顺序进行排列第一列相同的时候 , 第二列按照升序进行排列
实现思路
- 将 Map 端输出的 <key,value> 中的 key 和 value 组合成一个新的 key (newKey), value值不变
- 这里就变成 <(key,value),value> , 在针对 newKey 排序的时候, 如果 key 相同, 就再对 value进行排序
与默认排序不同的是,自定义排序需自定义SortBean类并重写WritableComparable接口,compareTo方法中实现自定义排序逻辑,write(readFields)方法实现序列化(反序列化)方法为固定格式写法,主类方面无需增添排序规则,会默认执行自定义的排序规则。
代码实现
// 自定义排序规则
package sort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// 序列化和排序
public class SortBean implements WritableComparable<SortBean> {
// 自定义成员变量
private String word;
private int num;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return word + "\t" + num;
}
// 自定义比较器方法,指定升序排序规则 先排word——再排num
@Override
public int compareTo(SortBean o) {
// 字符串比较
int result = this.word.compareTo(o.word);
// 如果字符串相同则按照第二列排序
if (result == 0){
return this.num - o.num;
// 降序 return o.num - this.num;
}
return result;
}
// 序列化方法 固定格式
@Override
public void write(DataOutput dataOutput) throws IOException {
// 自定义字符串进行序列化
dataOutput.writeUTF(word);
// 自定义数字进行序列化
dataOutput.writeInt(num);
}
// 反序列化方法
@Override
public void readFields(DataInput dataInput) throws IOException {
// 自定义字符串进行反序列化
this.word = dataInput.readUTF();
// 自定义数字进行反序列化
this.num = dataInput.readInt();
}
}
// SortMapper 类
package sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
map方法将K1和V1转为K2和V2:
K1 V1
0 a 3
5 b 7
----------------------
K2 V2
SortBean(a 3) NullWritable
SortBean(b 7) NullWritable
*/
public class SortMapper extends Mapper<LongWritable, Text,SortBean, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2
String[] split = value.toString().split("\t");
SortBean sortBean = new SortBean();
sortBean.setWord(split[0]);
sortBean.setNum(Integer.parseInt(split[1]));
//2:将K2和V2写入上下文中
context.write(sortBean,NullWritable.get());
}
}
// SortReducer 类
package sort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortReducer extends Reducer<SortBean, NullWritable,SortBean,NullWritable> {
@Override
protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//reduce方法将新的K2和V2转为K3和V3 ---直接向后传递
context.write(key, NullWritable.get());
}
}
// JobMain 主类
package sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
// 固定模版 继承类实现接口
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称
Job job = Job.getInstance(super.getConf(), "sort_maperduce");
// 打包运行出错添加
job.setJarByClass(sort.JobMain.class);
// 2.配置 job任务对象(八个步骤)
// 2.1 读取文件 ---指定读取类
job.setInputFormatClass(TextInputFormat.class);
// 指点源文件路径
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/sort"));
// 2.2 进入指定map阶段处理方式和数据类型
// 设置map阶段用的类
job.setMapperClass(SortMapper.class);
// 设置Map阶段K2的类型 --- 单词(字符串)
job.setMapOutputKeyClass(SortBean.class);
// 设置Map阶段V2的类型 --- 数字(long)
job.setMapOutputValueClass(NullWritable.class);
// 2.3(4,5,6) 进入Shuffle阶段 分区-排序-规约-分组
// 排序只需指定排序规则即可
// 2.7 指定Reduce阶段的处理方式和数据类型
job.setReducerClass(SortReducer.class);
// 设置Reduce阶段K3的类型 --- 单词(字符串)
job.setOutputKeyClass(SortBean.class);
// 设置Reduce阶段v3的类型 --- 单词(字符串)
job.setOutputValueClass(NullWritable.class);
// 2.8 设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径
Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/sort_out");
TextOutputFormat.setOutputPath(job,path);
// 获取hdfs文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());
// 判断目录是否存在
boolean exists = fileSystem.exists(path);
if (exists){
// 删除目标目录
fileSystem.delete(path,true);
}
// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job任务 --记录任务执行状态 0表示成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
规约实例
每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce的一种优化手段之一。
1. combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
2. combiner 组件的父类就是 Reducer
3. combiner 和 reducer 的区别在于运行的位置
Combiner 是在每一个 maptask 所在的节点运行
Reducer 是接收全局所有 Mapper 的输出结果
4. combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
实现思路
- 1. 自定义一个 combiner 继承 Reducer,重写 reduce 方法
- 2. 在 job 中设置 job.setCombinerClass(CustomCombiner.class)
代码实现
下述代码继续以词频统计为实现对象,添加combine规约处理。与默认规约不同的是,自定义规约需自定义MyCombiner类重写Reducer类(本案例添加规约即提前执行一遍Reducer流程),之后在主类中需设置规约类 job.setCombinerClass(MyCombiner.class),在指定的Mapper类之后Reducer类之前。
// 自定义规约
package combiner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyCombiner extends Reducer<Text, LongWritable,Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
// 1.遍历集合将集合中的数字相加,得到v3
for (LongWritable value : values) {
count +=value.get();
}
// 2.将k3和v3写入上下文
context.write(key,new LongWritable(count));
}
}
package combiner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
四个泛型解释:
KEYIN :K1的类型 --java(Long)-->(LongWritable)
VALUEIN:VI的类型 --java(String)-->(Text)
KEYOUT:K2的类型 --java(String)-->(Text)
VALUEOUT:V2的类型 --java(Long)-->(LongWritable)
*/
// 1.继承 Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
// 2.重写 Map方法
// 将 <k1,v1> 转为 <k2,v2>
/* 参数
* key : k1 行偏移量
* value :v1 每一行的文本数据
* context :表示上下文对象,桥梁作用
* */
/*
如何将 <k1,v1> 转为 <k2,v2>
K1 V1
0 hello,world,hadoop
15 hdfs,hive,hello
---------------------------
K2 V2
hello 1
world 1
hdfs 1
hadoop 1
hello 1
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
LongWritable longWritable = new LongWritable();
// 1.将一行数据的文本数据进行拆分
String[] split = value.toString().split(",");
// 2.遍历数组,组装 k2和v2
for (String word : split) {
// 3.将 k2和v2写入上下文 --将数据发送到下一个阶段进行处理
// context.write(new Text(word),new L
// ongWritable(1));
text.set(word);
longWritable.set(1);
context.write(text, longWritable);
}
}
}
package combiner;
/*
四个泛型解释:
KEYIN: K2类型
VALULEIN: V2类型
KEYOUT: K3类型
VALUEOUT:V3类型
*/
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable,Text, LongWritable> {
//reduce方法作用: 将新的K2和V2转为 K3和V3 ,将K3和V3写入上下文中
/*
参数:
key : 新K2
values: 集合 新 V2
context :表示上下文对象
----------------------
如何将新的K2和V2转为 K3和V3
新 K2 V2
hello <1,1,1>
world <1,1>
hadoop <1>
------------------------
K3 V3
hello 3
world 2
hadoop 1
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
// 1.遍历集合将集合中的数字相加,得到v3
for (LongWritable value : values) {
count +=value.get();
}
// 2.将k3和v3写入上下文
context.write(key,new LongWritable(count));
}
}
package combiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
//该方法用于指定一个job任务
@Override
public int run(String[] strings) throws Exception {
// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称
Job job = Job.getInstance(super.getConf(), "add_combiner_wordcount");
// 打包运行出错添加
job.setJarByClass(JobMain.class);
// 2.配置 job任务对象(八个步骤)
// 2.1 读取文件 ---指定读取类
job.setInputFormatClass(TextInputFormat.class);
// 指点源文件路径
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/add_combiner_wordcount"));
// --本地测试--
// TextInputFormat.addInputPath(job,new Path("file:///D:\\input_javaword"));
// 2.2 进入指定map阶段处理方式和数据类型
// 设置map阶段用的类
job.setMapperClass(WordCountMapper.class);
// 设置Map阶段K2的类型 --- 单词(字符串)
job.setMapOutputKeyClass(Text.class);
// 设置Map阶段V2的类型 --- 数字(long)
job.setMapOutputValueClass(LongWritable.class);
// 2.3(4,,6) 进入Shuffle阶段 --先采用默认方式处理
// 2.5 ---添加规约---
job.setCombinerClass(MyCombiner.class);
// 2.7 指定Reduce阶段的处理方式和数据类型
job.setReducerClass(WordCountReducer.class);
// 设置Reduce阶段K3的类型 --- 单词(字符串)
job.setOutputKeyClass(Text.class);
// 设置Reduce阶段v3的类型 --- 单词(字符串)
job.setOutputValueClass(LongWritable.class);
// 2.8 设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));
// 判断目标目录是否存在,存在则删除
Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/out_combiner_wordcount");
TextOutputFormat.setOutputPath(job,path);
// 获取hdfs文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());
// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());
// 判断目录是否存在
boolean exists = fileSystem.exists(path);
if (exists){
// 删除目标目录
fileSystem.delete(path,true);
}
// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job任务 --记录任务执行状态 0表示成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
分组实例
分组是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义分组实现不同的key作为同一个组,调用一次reduce逻辑。
实例需求
求出每一个订单中成交金额最大的一笔交易,订单如下图所示。
实现思路
- 利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce
- 在reduce端利用分组将订单id相同的kv聚合成组,然后取第一个即是最大值
- 首先第一步定义一个OrderBean,里面定义两个字段,第一个字段是我们的orderId,第二个字段是我们的金额(注意金额一定要使用Double或者DoubleWritable类型,否则没法按照金额顺序排序)定义好排序规则。
- 第二步定义Mapper类封装OrderBean,得到K2,v2是传递过来的v1并将<k2,v2>写入上下文进行后续处理。
- 第三步自定义分区,按照订单id进行分区,把所有订单id相同的数据,都发送到同一个reduce中去。
- 第四步自定义分组按照我们自己的逻辑进行分组,通过比较相同的订单id,将相同的订单id放到一个组里面去,进过分组之后当中的数据,已经全部是排好序的数据,我们只需要取前topN即可
- 第五步定义Reduce类将每个分区中的第一条记录取出即实现需求
- 第六步定义JobMain类将上述步骤串起来打包放在Hadoop上运行查看结果
// 第一步
package mygrouping;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
// 成员变量 订单id
private String orderid;
// 成交金额
private Double price;
public String getOrderid() {
return orderid;
}
public void setOrderid(String orderid) {
this.orderid = orderid;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return orderid + '\t' + price ;
}
// 指定排序规则
@Override
public int compareTo(OrderBean o) {
// 1.比较订单id,若订单id一致则进行金额比较排序(降序)compareTo一致返回0
int i = this.orderid.compareTo(o.orderid);
if (i==0){
// 行金额比较排序(降序)
i = this.price.compareTo(o.price) * -1;
}
return i;
}
// 实现对象序列化
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(orderid);
dataOutput.writeDouble(price);
}
// 实现对象反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
this.orderid = dataInput.readUTF();
this.price = dataInput.readDouble();
}
}
// 第二步
package mygrouping;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable, Text,OrderBean,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.拆分行文本数据,得到订单id和订单金额
String[] split = value.toString().split("\t");
// 2.封装OrderBean得到 K2
OrderBean orderBean = new OrderBean();
orderBean.setOrderid(split[0]);
orderBean.setPrice(Double.valueOf(split[2]));
// 3.写入上下文
context.write(orderBean,value);
}
}
// 第三步
package mygrouping;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class OrderPatition extends Partitioner<OrderBean, Text> {
// 分区规则 ---> 根据订单id实现分区
/*
* orderBean -----> k2
* text -----> v2
* i -----> ReduceTask个数
* 返回分区编号
* */
@Override
public int getPartition(OrderBean orderBean, Text text, int i) {
return (orderBean.getOrderid().hashCode() & 2147483647) % i;
}
}
// 第四步
package mygrouping;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/*
* 分组--->默认按照k2分组,k2相同的分到一个组
* 实现步骤:
* 1.继承 WritableComparator类
* 2.调用父类的有参构造
* 3.指定分组规则(重写方法)
* */
public class OrderGroupComparator extends WritableComparator {
// 自定义无参构造
public OrderGroupComparator() {
// 调用父类的有参构造 true允许创建 OrderBean实例
super(OrderBean.class,true);
}
// 指定分组规则
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 1.对形参做强类型转换
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
// 2.指定分组规则 first.getOrderid()与second.getOrderid()是否相同相同则分到同一组中
return first.getOrderid().compareTo(second.getOrderid());
}
}
// 第五步
package mygrouping;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupReducer extends Reducer<OrderBean, Text,Text, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// <k2,v2>转为<k3,v3>
int i = 0;
for (Text value : values) {
context.write(value,NullWritable.get()); // 默认输出集合中所有数据
i++;
if (i>1){
break;
}
}
}
}
// 第六步
package mygrouping;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
//该方法用于指定一个job任务
@Override
public int run(String[] strings) throws Exception {
// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称
Job job = Job.getInstance(super.getConf(), "mygrouping");
// 打包运行出错添加
job.setJarByClass(JobMain.class);
// 2.配置 job任务对象(八个步骤)
// 2.1 读取文件 ---指定读取类
job.setInputFormatClass(TextInputFormat.class);
// 指点源文件路径
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/mygrouping"));
// 2.2 进入指定map阶段处理方式和数据类型
// 设置map阶段用的类
job.setMapperClass(GroupMapper.class);
// 设置Map阶段K2的类型 --- 单词(字符串)
job.setMapOutputKeyClass(OrderBean.class);
// 设置Map阶段V2的类型 --- 数字(long)
job.setMapOutputValueClass(Text.class);
// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理
// -----分区-----
job.setPartitionerClass(OrderPatition.class);
// -----排序(定义好自动执行)-----
// -----分组-----
job.setGroupingComparatorClass(OrderGroupComparator.class);
// 2.7 指定Reduce阶段的处理方式和数据类型
job.setReducerClass(GroupReducer.class);
// 设置Reduce阶段K3的类型 --- 单词(字符串)
job.setOutputKeyClass(Text.class);
// 设置Reduce阶段v3的类型 --- 单词(字符串)
job.setOutputValueClass(NullWritable.class);
// ----设置 ReduceTesk个数(默认一个)
// 2.8 设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));
// 判断目标目录是否存在,存在则删除
Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/mygrouping_out");
TextOutputFormat.setOutputPath(job,path);
// 获取hdfs文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());
// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());
// 判断目录是否存在
boolean exists = fileSystem.exists(path);
if (exists){
// 删除目标目录
fileSystem.delete(path,true);
}
// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job任务 --记录任务执行状态 0表示成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}