Hadoop3.x从入门到放弃,第五章:MapReduce基本概念与操作
一、MapReduce概述
1、MapReduce定义
MapReduce是一个【分布式运算程序】的编程框架,是基于Hadoop的数据分析应用的核心框架。
MapReduce核心功能是将【用户编写的业务逻辑代码】和【自带默认组件】整合成一个完整的【分布式运算程序】,
【并发】运行在一个Hadoop集群上
2、优缺点
优点:
1、易于编程。用户只关心,业务逻辑。实现框架的接口。
2、良好扩展性:可以动态增加服务器,解决计算资源不够的问题
3、高容错性。任何一台机器挂掉,可以将任务转移到其他节点
4、适合海量数据计算(TB/PB)几千台服务器共同计算。
缺点:
1、不擅长实时计算。 Mysgl
2、不擅长流式计算。 Sparkstreaming flink3、不擅长DAG有向无环图计算。spark
3、核心思想
1、MapReduce运算程序一般需要分成2个阶段【Map阶段】和【Reduce阶段】
2、Map阶段的并发MapTask,完全并行运行,互不相干
3、Reduce阶段的并发ReduceTask,完全互不相干但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
4、MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行
##思考
1、MapTask如何工作
2、ReduceTask 如何工作
3、MapTask如何控制分区、排序等
4、MapTask和ReduceTask之间如何衔接
4、MapReduce进程
一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
(1)、MrAppMaster:负责整个程序的过程调度及状态协调。
(2)、MapTask:负责Map阶段的整个数据处理流程。
(3)、ReduceTask:负责Reduce阶段的整个数据处理流程。←
5、WordCount源码
##源码位置
/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar
public class WordCount {
/**
* Mapper: 入参:前两个Object Text 是 要读取的 key value
* 后两个Text IntWritable 是 要写入的 key value
**/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
/**
* Reducer: 入参:前两个Text IntWritable 是 要读取的 key value (对应 Mapper的输出)
* 后两个Text IntWritable 是 要写入的 key value
**/
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
6、常用的数据序列化类型
Java类型 | Hadoop Writable类型 (org.apache.hadoop.io.*) |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
7、MapReduce编程规范
1>、Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(MapTask进程)对每一个<K,V>调用一次
2>、Reducer阶段
(1)用户自定义的Reducer要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
3>、Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的
job对象
(1)获取配置信息,获取job对象实例
(2)指定本程序的jar包所在的本地路径--->hadoop-mapreduce-examples-3.1.3.jar
(3)关联Mapper/Reducer业务类--->TokenizerMapper/IntSumReducer
(4)指定Mapper输出数据的kv类型
(5)指定最终输出的数据的kv类型
(6)指定job的输入原始文件所在目录--->wcinput
(7)指定job的输出结果所在目录提交作业--->wcoutput
(8)提交作业
8、单词计数案例
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>
输入: 输出:
lee fei lee 2 an 1
fei ren fei 2 ning 1
ya ling ren 1
lee an ya 1
ning ling 1
package com.lee.hadoop.mapReduce.wordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @description 单词计数Mapper
* @author Lee
* @date 2024/4/8 16:57
* @version 1.0
* @入参:
* KEYIN, Mapper的输入Key, 这里是 输入文字的偏移量用,LongWritable
* VALUEIN, Mapper的输入value,这里是输入的文字,Text
* KEYOUT, Mapper的输出Key,这里是 输出的文字 Text
* VALUEOUT Mapper的输出value,这里是输出文件的数 IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//读取一行数据
String line = value.toString();
//拆分,有的行的内容如 lee lee
String[] words = line.split(" ");
for (String word : words) {
outKey.set(word);
context.write(outKey,outValue);
}
}
}
package com.lee.hadoop.mapReduce.wordCount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @description 单词计数Reducer
* @author Lee
* @date 2024/4/8 16:57
* @version 1.0
* @入参:
* KEYIN, Mapper的输入Key, 这里是 输入的文字,Text
* VALUEIN, Mapper的输入value,这里是输入文字的数量,IntWritable
* KEYOUT, Mapper的输出Key,这里是 输出的文字 Text
* VALUEOUT Mapper的输出value,这里是输出文件的数 IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count = count + value.get();
}
outValue = new IntWritable(count);
//写出
context.write(key,outValue);
}
}
package com.lee.hadoop.mapReduce.wordCount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @description 单词计数Driver
* @author Lee
* @date 2024/4/8 16:57
* @version 1.0
*/
public class WordCountDriver {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
//1、获取Job
Configuration configuration = new Configuration();
Job job = new Job(configuration);
//2、设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3、设置执行的mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4、设置mapper的key value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5、设置输出的key value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6、设置输入文件的路径 和 输出的文件路径
FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\myWords.txt"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output1"));
//7、提交
job.waitForCompletion(true);
}
}
集群环境运行
hadoop jar wc.jar com.lee.hadoop.mapReduce.wordCount.WordCountDriver /input/myWords.txt /output
二、序列化
1、什么是序列化和反序列化
>序列化:将内存中的对象,转换成字节序列以便于存储到磁盘(持久化)和网络传输
(将内存中的对象序列化成字节码方便传输)
>反序列化:将收到的字节序列 或 磁盘的持久化数据 转换成内存中的对象
2、为什么要序列化
一般来说,“活的”对象只能存在于内存里,关机断电就没有了。 且只能被本地的进程使用,不能被发送到另外
的机器上。序列化却可以存储“活的”对象,也可以将其发送到远程计算机上。
【简单说就是:一个系统的内存对象没法往另一个系统传,只能先转成字节码传过去然后再转成内存对象】
【比如 MapTask 和 ReduceTask不在一台服务器上】
3、为什么不使用Java的序列化
Java的序列化是一个重量级序列化框架,对象被序列化后会附带很多额外信息(校验信息、Header、继承体系)。不便于网络高效传输。
4、优点
结构紧凑:存储空间少
快速:传输速度快
互操作性:多语言
5、序列化接口
@Public
@Stable
public interface Writable {
void write(DataOutput var1) throws IOException;
void readFields(DataInput var1) throws IOException;
}
6、自定义序列化
##1、必须实现Writable接口
##2、反序列化时,需要反射调用空参构造函数,所以必须有空参构造函数
##3、重写序列化方法
##4、重写反序列化方法
##5、注意反序列化的顺序和序列化的顺序要完全一致
##6、想要把结果显示在文件中,许重写toString(),可以用"\t"分开,方便后续用
##7、如果需要将自定义的bean放在 key 中传输,则还需要实现 Comparable 接口,因为MapReduce框中的
Shutle 过程要求对 key 必须能排序。详见后面排序案例。←
7、操作案例
##需求:
统计每个手机号耗费的上行流量、下行流量、总流量
> 数据格式:
id 手机号 ip 上行流量 下行流量 网络状态码
> 期望输出格式:
手机号码 上行流量 下行流量 总流量
> 输入数据:
1 18310103915 www.baidu.com 1024 20480 200
2 13930071233 www.bilibili.com 2048 30970 200
3 15097846807 www.hao123.com 1024 37829 200
4 18310103915 www.bilibili.com 1076 10520 200
5 15097846807 www.baidu.com 1024 10000 200
package com.lee.hadoop.mapReduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author Lee
* @version 1.0
* @description 流量统计bean (序列化)
* @date 2024/4/9 13:40
* 1、implements Writable序列化接口
* 2、实现write 和 read 方法
* 3、实现无参构造
* 4、实现toString方法
*/
public class FlowBean implements Writable {
private long upFlow;//上行流量
private long downFlow;//下行流量
private long totalFlow;//总流量
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getTotalFlow() {
return totalFlow;
}
public void setTotalFlow(long totalFlow) {
this.totalFlow = totalFlow;
}
public void setTotalFlow() {
this.totalFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(totalFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.totalFlow = dataInput.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + totalFlow;
}
}
package com.lee.hadoop.mapReduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author Lee
* @version 1.0
* @description 序列化Mapper
* @date 2024/4/9 13:46
* id 手机号 ip 上行流量 下行流量 网络状态码
* 1 18310103915 www.baidu.com 1024 20480 200
* 2 13930071233 www.bilibili.com 2048 30970 200
* 3 15097846807 www.hao123.com 1024 37829 200
* 4 18310103915 www.bilibili.com 1076 10520 200
* 5 15097846807 www.baidu.com 1024 10000 200
*/
public class WritableMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private FlowBean flowBean = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("\\s+");
flowBean.setUpFlow(Long.parseLong(words[3]));
flowBean.setDownFlow(Long.parseLong(words[4]));
flowBean.setTotalFlow();
context.write(new Text(words[1]), flowBean);
}
}
package com.lee.hadoop.mapReduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author Lee
* @version 1.0
* @description
* @date 2024/4/9 14:13
*/
public class WritableReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private FlowBean flowBean = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
long upFlow = 0;
long downFlow = 0;
for (FlowBean flowBean : values) {
upFlow = upFlow + flowBean.getUpFlow();
downFlow = downFlow + flowBean.getDownFlow();
}
flowBean.setUpFlow(upFlow);
flowBean.setDownFlow(downFlow);
flowBean.setTotalFlow();
context.write(key, flowBean);
}
}
package com.lee.hadoop.mapReduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import java.io.IOException;
/**
* @author Lee
* @version 1.0
* @description
* @date 2024/4/9 14:18
*/
public class WritableDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、初始化Job
Configuration configuration = new Configuration();
Job job = new Job(configuration);
//2、设置jar包位置
job.setJarByClass(WritableDriver.class);
//3、设置mapper和reducer
job.setMapperClass(WritableMapper.class);
job.setReducerClass(WritableReducer.class);
//4、设置mapper输出格式的key value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5、设置输出的k-v
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6、设置input和output位置
// FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\input\\phoneFlow.txt"));
// FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\ouput-phone"));
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7、提交
boolean res = job.waitForCompletion(true);
System.out.println("jobId:"+job.getJobID());
System.out.println("jobName:"+job.getJobName());
System.out.println("jobFile:"+job.getJobFile());
System.out.println("jobState:"+job.getJobState());
System.out.println("jobHistoryUrl:"+job.getHistoryUrl());
System.out.println(res);
}
}
三、核心框架原理
1、InputFormat输入数据
1.1、切片与MapTask 并行度决定机制
###问题引出
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个Job 的处理速度。
思考:
1G 的数据,启动8个MapTask,可以提高集群的并发处理能力。
那么 1K的数据,也启动8个 MapTask,会提高集群性能吗?
MapTask 并行任务是否越多越好呢?
哪些因素影响了 MapTask 并行度
###MapTask并行度决定机制
数据块: Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位(默认128m一块)
数据切片: 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
数据切片是MapReduce程序计算输入数据的单位,【一个切片会对应启动一个MapTask】
1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
2)每一个Split切片分配一个MapTask并行实例处理
3)默认情况下,切片大小=BlockSize (集群一般是128M 或 256M)(本地运行模式是32M)
4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
(比如要分析2个文件,一个300M 一个 100M,那么就会切成 4片,分别是128M,128M,44M,100M)
(不是所有的文件格式都支持切片的,比如某些压缩算法就不支持)
1.2、Job提交流程源码 + 切片源码分析
Job提交流程源码【重要】
1、job.waitForCompletion(true);
2、submit()
3、this.ensureState(Job.JobState.DEFINE)//校验JOB状态
4、this.setUseNewAPI(); //判断是使用哪个版本的API,根据mapper和reducer的包路径判断
3、建立连接
connect():
//1)创建提交JOB的代理
new Cluster(Job.this.getConfiguration())
//1) 判断是本地环境运行 (LocalJobRunner) 还是 yarn集群环境 (YarnRunner)
initialize(InetSocketAddress jobTrackAddr, Configuration conf)
4、提交JOB
submitter.submitJobInternal(Job.this, Job.this.cluster);
//0)输出路径校验
checkSpecs(Job job)
//1)创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//2)获取JobId,并创建Job路径
JobID jobId = this.submitClient.getNewJobID();
//3)拷贝jar包到集群
this.copyAndConfigureFiles(job, submitJobDir);
//4)计算切片,生成切片规划文件
int maps = this.writeSplits(job, submitJobDir);
conf.setInt("mapreduce.job.maps", maps);
//5)向Stag路径写XML配置文件(各种集群默认配置)
this.writeConf(conf, submitJobFile);
conf.writeXml(out);
//6)提交JOB,返回提交状态
status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
切片源码【重要】
####计算切片,生成切片规划文件
int maps = this.writeSplits(job, submitJobDir);
conf.setInt("mapreduce.job.maps", maps);
//获取切片信息(input 读取 FileInputFormat)
List<InputSplit> splits = input.getSplits(job);
//1)程序先找到数据存储的目录
//2) 开始遍历处理(规划切片)目录下的每个文件
List<FileStatus> files = this.listStatus(job);
Iterator var9 = files.iterator();
while(var9.hasNext()) {
}
//3)遍历第一个文件
> 获取文件大小 long length = file.getLen();
> 计算切片大小 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
> 默认情况下,切片大小=blocksize配置 128M
> 开始切
(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分为一块切片)
> 将切片信息写到切片规划文件中
> 整个切片的核心过程在getSplits()方法中完成
> InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在节点列表等
//4)提交切片规划文件到Yarn上,Yarn上的MrAppMaster就可以根据切片规划文件计算开启的MapTask个数
(一个MapTask开启少说要消耗1G内存1个CPU)
1.3、FileInputFormat切片机制
##切片机制
(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于Block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
##案例分析
比如输入两个文件file1.txt 300M file2.txt 10M
经过FileInputFormat切片后,切片信息如下
file1.txt.split1=0~128
file1.txt.split2=129~256
file1.txt.split3=257~300
file2.txt.split1=0~10
##源码中计算切片大小的公式
Math.max(minSize, Math.min(maxSize,blockSize));
mapreduce.input.fileinputformat.split.minsize=1默认值为1mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blockSize
##切片大小设置
maxsize(切片最大值): 参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
minsize(切片最小值): 参数调的比blockSize大,则可以让切片变得比blockSize还大
##获取切片信息API
// 获取切片的文件名称
String name =inputSplit.getPath().getName();
//根据文件类型获取切片信息
FileSplit inputSplit=(FileSplit)context.getInputSplit();
思考:
在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。
那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?
FileInputFormat 常见的接口实现类包括:
TextInputFormat、KeyValueTextInputFormat、
NLineInputFormat、CombineTextInputFormat 和 自定义InputFormat 等。
##TextInputFormat
TextInputFormat是Hadoop的默认InputFormat实现方式。且是FileInputFormat的一个实现类。
【按任务文件规划切片,不管文件多小(不超过128M),都会是一个单独的切片】
##CombineTextInputFormat
CombineTextlnputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到个切片中,
这样,多个小文件就可以交给一个MapTask 处理。
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值
WordCountDriver增加如下代码:
//如果不设置 InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitsize(job,4194304);
看日志:
number of splites:3 表示有3个切片
package com.lee.hadoop.mapReduce.combineTextInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author Lee
* @version 1.0
* @description 单词计数Driver
* @date 2024/4/8 16:57
*/
public class CombineWordCountDriver {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
//1、获取Job
Configuration configuration = new Configuration();
Job job = new Job(configuration);
//2、设置jar包路径
job.setJarByClass(CombineWordCountDriver.class);
//3、设置执行的mapper和reducer
job.setMapperClass(CombineWordCountMapper.class);
job.setReducerClass(CombineWordCountReducer.class);
//4、设置mapper的key value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5、设置输出的key value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置使用哪个inputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job,4194304);
//6、设置输入文件的路径 和 输出的文件路径
FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\inputcombine"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output1"));
//7、提交
job.waitForCompletion(true);
}
}
2、MapReduce工作流程
3、Shuffle机制【重点】
Map方法之后,Reduce方法之前的数据处理过程 称之为 Shuffle.
3.1、Partition分区
##问题引出:
要求将统计结果按照条件输出到不同文件中(分区)。
比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
##默认Partitioner分区
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
//根据key进行hash,用户没办法控制那个key进哪个分区,影响生成多少个output文件
public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
【numReduceTasks 可以 在driver中进行设置:job.setNumReduceTasks(2);】
源码如何走到默认的partitioner的
写的mapper里 context.write(outKey,outValue);--->
WrappedMapper-->TaskInputOutputContextImpl-->MapTask
this.collector.collect(key, value, this.partitioner.getPartition(key, value, this.partitions)); 【这里就是shuffle的环形缓冲区】--->当numReduceTasks大于1时默认为HashPartitioner.getPartition(),如果numReduceTasks=1直接给值0
3.2、自定义partitioner
## 自定义集成Partitioner,重写getPartition()方法
## job.setPartitionerClass(xxxx.class);
## 根据自定义partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);//不设置默认是1,就不会走 上面自定义的partitioner
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
/**
*
* @param text key
* @param flowBean value
* @param i partition数量
* @return
*/
public int getPartition(Text text, FlowBean flowBean, int i) {
String phone = text.toString();
String prefix = phone.substring(0, 3);
if(prefix.equals("183")){
return 0;
}else if(prefix.equals("150")){
return 1;
} else if (prefix.equals("139")) {
return 2;
}else{
return 3;
}
}
}
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(4);
【总结】:
(1)如果ReduceTask的数量>getPartition的结果数,则会多空的输出文件part-r-000xx;
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
(4)分区号必须从零开始,逐一累加。
3.3、WritableComparable排序
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。
任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
##为什么要进行排序:
方便Reduce里进行合并(不过map里不进行排序,Reduce里也得进行排序,反而效率变慢)
> 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,
(到达一定数量之后,要往磁盘溢写之前进行排序,且是在内存中完成的排序)
再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,
它会对磁盘上所有文件进行归并排序。
> 对于ReduceTask它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,
否则存储在内存中。如果磁盘上文件数目达到定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文
件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask
统一对内存和磁盘上的所有数据进行一次归并排序。
##排序分类
> 部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序.
> 全排序【慎用】
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型
文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
>辅助排序(GroupingComparator分组)【用的比较少】
在Reduce端对key进行分组。应用于:在接收的keygbean对象时,想让一个或几个字段相同(全部字段比
较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
>二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
##需要实现 WritableComparable接口重写 compareTo 方法
###案例 对上面 流量统计(序列化的)案例,按总流量大小倒序进行排序
需要将FlowBean 对象 实现WraitableComparable接口重写CompareTo方法
@Override
public int compareTo(FlowBean o){
//倒序排列,按照总流量从大到小
return this.sumFlow > o.getSumFlow() ? -1:1;
}
Mapper类
context.write(bean,手机号);
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;//上行流量
private long downFlow;//下行流量
private long totalFlow;//总流量
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getTotalFlow() {
return totalFlow;
}
public void setTotalFlow(long totalFlow) {
this.totalFlow = totalFlow;
}
public void setTotalFlow() {
this.totalFlow = this.upFlow + this.downFlow;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(totalFlow);
}
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.totalFlow = dataInput.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + totalFlow;
}
public int compareTo(FlowBean o) {
if (this.totalFlow > o.getTotalFlow()) {
return -1;
} else if (this.totalFlow < o.getTotalFlow()) {
return 1;
}
return 0;
}
}
public class WritableMapper extends Mapper<LongWritable, Text, FlowBean,Text> {
private FlowBean outKey = new FlowBean();
private Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("\\s+");
String phone = words[0];
outValue.set(phone);
outKey.setUpFlow(Long.parseLong(words[1]));
outKey.setDownFlow(Long.parseLong(words[2]));
outKey.setTotalFlow(Long.parseLong(words[3]));
context.write(outKey,outValue);
}
}
public class WritableReducer extends Reducer<FlowBean,Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value,key);
}
}
}
public class WritableDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、初始化Job
Configuration configuration = new Configuration();
Job job = new Job(configuration);
//2、设置jar包位置
job.setJarByClass(WritableDriver.class);
//3、设置mapper和reducer
job.setMapperClass(WritableMapper.class);
job.setReducerClass(WritableReducer.class);
//4、设置mapper输出格式的key value
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
//5、设置输出的k-v
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6、设置input和output位置
FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\ouput-phone4"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output5"));
//7、提交
boolean res = job.waitForCompletion(true);
System.out.println("jobId:"+job.getJobID());
System.out.println("jobName:"+job.getJobName());
System.out.println("jobFile:"+job.getJobFile());
System.out.println("jobState:"+job.getJobState());
System.out.println("jobHistoryUrl:"+job.getHistoryUrl());
System.out.println(res);
}
}
###结果输出:
15097846807 2048 47829 49877
18310103915 2100 31000 33100
13930071233 2048 30970 33018
3.4、二次排序
##接上面2.3的案例,如果 有两个手机号的总流量相等,那么按 上行流量进行升序
public int compareTo(FlowBean o) {
if (this.totalFlow > o.getTotalFlow()) {
return -1;
} else if (this.totalFlow < o.getTotalFlow()) {
return 1;
} else {
if(this.upFlow > o.getUpFlow()) {
return 1;
}else if(this.upFlow<o.getUpFlow()){
return -1;
}else{
return 0;
}
}
}
2.5、区内排序
##接上面2.4的案例,现在想 将 手机号 进行分区,183一个区,139和150一个区,区内按总流量降序。
总流量相等按上行流量升序
##此时除了上面的实现 WritableComparator的compareTo接口,还需要 自定义分区
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
/**
*
* @param text key
* @param flowBean value
* @param i partition数量
* @return
*/
public int getPartition(Text text, FlowBean flowBean, int i) {
String phone = text.toString();
String prefix = phone.substring(0, 3);
if(prefix.equals("183")){
return 0;
}else if(prefix.equals("150") || prefix.equals("139")){
return 1;
}else{
return 2;
}
}
}
3.5、Combiner合并
(1)、Combiner是MR程序中Mappe1和Reducer之外的一种组件。(环形缓冲区溢写之前做的处理)
(2)、Combiner组件的父类就是Reducer。
(3)、Combiner和Reduce1的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行
Reducer是接收全局所有Mapper的输出结果;
(4)、Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)、Combiner能够应用的前提是不能影响最终的业务逻辑,而且,
Combiner的输出kv应该跟Reduicer的输入kv类型要对应起来。
##如 Mapper Reducer
3 5 7 --> (3+5+7)/3=5 (3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
2 6 --> (2+6)/2=4
上面的内容就影响了最终的业务逻辑,所以 一般Combiner一般不做 求平均值
##自定义Combiner实现步骤:
1、自定义一个Combiner继承Reducer,重写Reduce方法
2、在Job驱动类中设置
job.setCombinerClass(xxxx.class);
###【注意通常可以直接使用自己的Reducer来代替combiner】
如:job.setCombinerClass(CombineWordCountReducer.class);
4、输出的数据OutputFormat
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。
【默认的实现类是TextOutputFormat,按行写】
【应用场景】:
eg: 输出数据到MYSQL / HBASE / ES
【自定义OutputFormat步骤】:
> 自定义一个类 继承FileOutputFormat.
> 改写RecordWriter,具体改写输出数据的方法write().
4.1、实例
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//读取一行数据
String line = value.toString();
//拆分,有的行的内容如 lee lee
String[] words = line.split(" ");
for (String word : words) {
outKey.set(word);
context.write(outKey,outValue);
}
}
}
public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count = count + value.get();
}
outValue = new IntWritable(count);
//写出
context.write(key,outValue);
}
}
package com.lee.hadoop.mapReduce.outputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author Lee
* @version 1.0
* @description 自定义outputFormat, 功能就是将 各个单词 分开输出,如Lee单独写到一个文件里,其他的单独写一个文件中,
* @date 2024/4/17 9:12
*/
public class MyCustomOutputFormat extends FileOutputFormat<Text, IntWritable> {
public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
MyRecordWriter myRecordWriter = new MyRecordWriter(taskAttemptContext);
return myRecordWriter;
}
}
package com.lee.hadoop.mapReduce.outputFormat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
/**
* @author Lee
* @version 1.0
* @description
* @date 2024/4/17 9:15
*/
public class MyRecordWriter extends RecordWriter<Text, IntWritable> {
private FSDataOutputStream guiguOut;
private FSDataOutputStream otherOut;
public MyRecordWriter(TaskAttemptContext context) throws IOException {
FileSystem fs = FileSystem.get(context.getConfiguration());
guiguOut = fs.create(new Path("D:\\hadoop\\myrecord\\guigu.txt"));
otherOut = fs.create(new Path("D:\\hadoop\\myrecord\\other.txt"));
}
public void write(Text text, IntWritable intWritable) throws IOException, InterruptedException {
String word = text.toString();
if("lee".equals(word)){
guiguOut.write((word+" "+intWritable+"\n\t").getBytes());
}else{
otherOut.write((word+" "+intWritable+"\n\t").getBytes());
}
}
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
IOUtils.closeStream(guiguOut);
IOUtils.closeStream(otherOut);
}
}
/**
* @author Lee
* @version 1.0
* @description 单词计数Driver
* @date 2024/4/8 16:57
*/
public class WordCountDriver {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
//1、获取Job
Configuration configuration = new Configuration();
Job job = new Job(configuration);
//2、设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3、设置执行的mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4、设置mapper的key value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5、设置输出的key value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(MyCustomOutputFormat.class);
//6、设置输入文件的路径 和 输出的文件路径
FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\myWords.txt"));
//虽然我们自定义了outputFormat,但是因为我们的outputFormat继承子fileOutputFormat
//而fileOutputFormat要输出一个_SUCCESS文件、所以在这害的制定一个输出目录
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output1111"));
//7、提交
job.waitForCompletion(true);
}
}
5、MapTask工作机制
(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,
它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,
先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
6、ReduceTask工作机制
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。
由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。
7、ReduceTask并行度决定机制
## MapTask并行度由切片个数决定,切片个数由输入文件和切片规则决定。
## ReduceTask并行度由谁决定?
// 默认值是1
job.setNumReduceTasks(4);
(1)ReduceTask=0,表示没有Redce阶段,输出文件个数和Map个数 致。
(2)ReduceTask默认值就是1,所以输出文件个数为一个。
(3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
需要计算全
(4)ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,局汇总结果,就只能有1个ReciceTask。
(5)具体多少个ReduceTask,需要根据集群性能而定。
(6)如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,
执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
四、Join
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,
其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于
不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
订单数据表:
id | pid | amount |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 2 |
1003 | 03 | 3 |
1004 | 01 | 4 |
1005 | 02 | 5 |
1006 | 03 | 6 |
商品表
pid | pname |
---|---|
01 | 小米 |
02 | 华为 |
03 | 格力 |
最终要的格式
id | pname | amount |
---|---|---|
1001 | 小米 | 1 |
1004 | 小米 | 4 |
1002 | 华为 | 2 |
1005 | 华为 | 5 |
1003 | 格力 | 3 |
1006 | 格力 | 6 |
1、Reduce Join
public class TableBean implements Writable {
private String id;
private String pid;
private int amount;
private String pname;
private String tflag;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getTflag() {
return tflag;
}
public void setTflag(String tflag) {
this.tflag = tflag;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id);
dataOutput.writeUTF(pid);
dataOutput.writeInt(amount);
dataOutput.writeUTF(pname);
dataOutput.writeUTF(tflag);
}
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readUTF();
this.pid = dataInput.readUTF();
this.amount = dataInput.readInt();
this.pname = dataInput.readUTF();
this.tflag = dataInput.readUTF();
}
@Override
public String toString() {
return id + '\t'+" " + pname +'\t'+" " + amount ;
}
}
public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> {
private String fileName;
private Text outKey = new Text();
private TableBean outValue = new TableBean();
/**
* 根据inputSplit获取切面名(文件名)
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit) context.getInputSplit();
fileName = inputSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("\t");
if(fileName.contains("order")) {
outKey.set(split[1]);
outValue.setId(split[0]);
outValue.setPid(split[1]);
outValue.setAmount(Integer.valueOf(split[2]));
outValue.setPname("");
outValue.setTflag("order");
}else {
outKey.set(split[0]);
outValue.setId("");
outValue.setPid(split[0]);
outValue.setAmount(0);
outValue.setPname(split[1]);
outValue.setTflag("pd");
}
context.write(outKey,outValue);
}
}
public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) {
List<TableBean> resList = new ArrayList<TableBean>();
Map<String,String> nameMap = new HashMap <String,String>();
Iterator<TableBean> iterator = values.iterator();
while(iterator.hasNext()){
TableBean tableBean = iterator.next();
if("pd".equals(tableBean.getTflag())){
nameMap.put(tableBean.getPid(),tableBean.getPname());
}else{
TableBean dest = new TableBean();
try {
BeanUtils.copyProperties(dest,tableBean);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
resList.add(dest);
}
}
for (TableBean value : resList) {
value.setPname(nameMap.get(value.getPid()));
try {
context.write(value,NullWritable.get());
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class TableDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\reduceJoinInput"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\reduceJoinOutput"));
job.waitForCompletion(true);
}
}
缺点:
这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,
Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
解决方案:
Map端实现数据合并。
2、Map Join
Map Join适用于一张表十分小、一张表很大的场景。
在Reduce端处理过多的表,非常容易产生数据倾斜。
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
###具体办法:采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在Driver驱动类中加载缓存。
//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
//Map端join的逻辑不需要Reduce阶段,设置ReduceTask数量为0
job.setNumReduceTasks(0);
//读取缓存文件数据
mapper--->setup读取
public class TableMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
private Map<String,String> proMap = new HashMap<String,String>();
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
URI[] cacheFiles = context.getCacheFiles();
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
while(StringUtils.isNotBlank(line = reader.readLine())){
String[] split = line.split("\t");
proMap.put(split[0],split[1]);
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("\t");
String result = split[0]+" "+"\t"+proMap.get(split[1])+" "+"\t"+split[2];
context.write(new Text(result),NullWritable.get());
}
}
public class TableDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
//没有reducerClass
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//将小文件读取到缓存
job.addCacheFile(new URI("file:///D:/hadoop/reduceJoinInput/pd.txt"));
FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\reduceJoinInput\\order.txt"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\reduceJoinOutput1\\"));
//Map端join的逻辑不需要Reduce阶段,设置ReduceTask数量为0
job.setNumReduceTasks(0);
job.waitForCompletion(true);
}
}
五、ETL清洗
“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过"
抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。
ETL一词较常用在数据仓库,但其对象并不限于数据仓库.
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。
清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
public class EtlLogMapper extends Mapper<LongWritable,Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
//读取一行数据
String line = value.toString();
//数据清洗,判断数据是不是符合格式
Boolean judgeRes = validateLine(line);
//不满足格式 直接 跳过
if(!judgeRes)return;
//输出
context.write(value, NullWritable.get());
}
private Boolean validateLine(String line) {
//判断数据是否满足格式需要
return true;
}
}
public class EtlLogDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EtlLogDriver.class);
job.setMapperClass(EtlLogMapper.class);
//没有reducerClass
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\reduceJoinInput\\log.txt"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\etlOutput\\"));
job.waitForCompletion(true);
}
}
六、MapReduce开发总结
1、输入数据接口:InputFormat
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
(3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
2、逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map()业务处理逻辑 setup()初始化 cleanup ()关闭资源
3、Partitioner分区
(1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;
key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。
4、Comparable排序
(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个Reduce。 (job.setNumReduceTasks(1))
(4)二次排序:排序的条件有两个。
5、Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。但是使用前提是必须不能影响原有的业务处理结果。
在map阶段处理===>同时也是解决数据倾斜的一个方法
6、逻辑处理接口:Reducer
用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()
7、输出数据接口:OutputFormat
(1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
(2)用户还可以自定义OutputFormat。
七、压缩
7.1、概念
##优缺点:
压缩的优点: 以减少磁盘IO、减少磁盘存储空间。
压缩的缺点: 增加CPU开销。
##使用原则
运算密集型的Job,少用压缩
IO密集型的Job,多用压缩
压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。
压缩格式 | Hadoop自带? | 算法 | 文件扩展名 | 是否可切片 | 换成压缩格式后,原来的程序是否需要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 是,直接使用 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
Snappy | 250MB/s | 500MB/s |
Gzip压缩
优点:压缩率比较高;
缺点:不支持Split;压缩/解压速度一般;
Bzip2压缩
优点:压缩率高;支持Split;
缺点:压缩/解压速度慢。
Lzo压缩
优点:压缩/解压速度比较快;支持Split;
缺点:压缩率一般;想支持切片需要额外创建索引。
Snappy压缩
优点:压缩和解压缩速度快;
缺点:不支持Split;压缩率一般;
压缩位置选择
压缩可以在MapReduce作用的任意阶段启用。
参数 | 位置 | 默认值 | 阶段 | 建议 |
---|---|---|---|---|
io.compression.codecs | core-site.xml | 无,这个需要在命令行输入hadoop checknative查看 | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress | mapred-site.xml | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec | mapred-site.xml | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 企业多使用LZO或Snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress | mapred-site.xml | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec | mapred-site.xml | org.apache.hadoop.io.compress.DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
7.2、Map输出端压缩
即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,
因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Hadoop源码支持的压缩格式有:BZip2Codec、DefaultCodec
**/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 开启map端输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 循环写出
for(String word:words){
k.set(word);
context.write(k, v);
}
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
// 1 汇总
for(IntWritable value:values){
sum += value.get();
}
v.set(sum);
// 2 输出
context.write(key, v);
}
}
7.3、Reduce输出端压缩
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Mapper和Reducer保持不变
**/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}