Mapreduce初使用

发布于:2025-05-15 ⋅ 阅读:(13) ⋅ 点赞:(0)

MapReduce基本介绍

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序(例如:jar包),并发运行在一个Hadoop集群上。

MapReduce程序被分为Map(映射) 阶段和Reduce(化简)阶段。把计算任务分发到数据节点进行运算;Map会处理本节点的原始数据,产生的数据会临时保存到本地磁盘,那么每个节点会得到一部分结果(因为节点上的数据是一部分数据)。Reduce是会跨节点fetch属于自己的数据,并进行处理,把结果进行汇总,存储到HDFS。

核心思想

分而治之,并行计算

移动计算,而非移动数据。数据在各个节点上,我们把计算任务移动上去,而不是移动数据。

MapReduce工作过程

我们来通过一个例子来看看mapreduce的大致工作过程。假设很多份的英文资料,我们要对英语单词进行分拣:统计以a-p,或者q-z开头的单词,单独把他们放在两个不同的文件中。

说明如下:

  1. 绿色的文件有200M,并分成两个块。红色的文件有100M,所以一共分成了3个块。启动3个Map任务。
  2. 每个Map任务读取数据,按行处理,按空格进行切分,组成KV键值对,单词为键,1为值,将键值对保存到磁盘。其他的mapTask也会去生成这样的文件,这个文件的内容会有两个部分:a-p是一部分,q-z是另一部分。
  3. reducer任务。根据具体的需求去启对应的数量的reducerTask(这里需要两个),每个ReducerTash会去每个节点上去拉取自己需要的数据。运行reduce程序,保存数据。

MapReduce的两个阶段

第一阶段,也称之为 Map 阶段。这个阶段会有若干个 MapTask 实例,完全并行运行,互不相干。每个 MapTask 会读取分析一个 Inputsplit (输入分片,简称分片) 对应的原始数据。计算的结果数据会临时保存到所在节点的本地磁盘里。

该阶段的编程模型中会有一个 map 函数需要开发人员重写,map 函数的输入是一个 < key,value > 对,map 函数的输出也是一个 < key,value > 对,key和value的类型需要开发人员指定。

第二阶段,也称为 Reduce 阶段。这个阶段会有若干个 ReduceTask 实例并发运行,互不相干。但是它们的数据依赖于上一个阶段所有 mapTask 并发实例的输出。一个 ReduceTask 会从多个 MapTask 运行节点上 fetch 自己要处理的分区数据,经过处理后,输出到 HDFS 上。

该阶段编程模型中有一个 reduce 函数需要开发人员重写,reduce 函数的输入也是一个 <key, value> 对,reduce 函数的输出也是一个 < key,List<value>> 对。需要强调的是,reduce 的输入其实就是 map 的输出,只不过 map 的输出经过 shuffle 技术后变成了<key, List<Value>>而已。参考下图:

注意:MapReduce编程模型只能包含一个map

MapReduce的编程规范

用户编写MapReduce程序的时候,需要设计至少三个类:Mapper, Reducer, Driver(用于提交MR的任务)。

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

1. Mapper

(1) 用户自定义类,继承org.apache.hadoop.mapreduce.Mapper类

(2) 定义K1,V1,K2,V2的泛型(K1,V1是Mapper的输入数据;K2,V2Mapper的输出数据类型

(3) 写map方法的处理逻辑 

LongWritable: 即K1的数据类型,表示读取到的一行数据的行偏移量,只能设置为LongWritable类型。

注意:Map方法,每一个键值对都会调用一次。

2. Reducer

(1) 用户自定义类,继承org.apache.hadoop.mapreduce.Reducer 

(2) 定义K2,V2,K3,V3的泛型(K2,V2是Reducer的输入数据数据;K3,V3Reducer的输出数据类型

(3) reduce方法的处理逻辑

3.Driver

MapReduce的程序,需要进行执行之前的属性配置与任务的提交,这些操作都需要在Driver类中来完成。相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

WordCount案例分析

给定一个路径,统计这个路径下所有的文件中的每一个单词的出现次数。

其中,需要我们去实现代码的部分是:map函数和reduce函数。它们各自的作用是:

map函数的入参是kv结构,k是偏移量,v是一行的具体内容。map函数的返回值格式也是kv结构,k是每个单词,v是数字1。

reduce函数的入参是kv结构,k是单词,v是集合,每个元素值都是1。reduce函数的返回值格式也是kv结构,k是每个单词,v是汇总之后的数字。

第二课时

WordCount案例实操-编码实现

准备maven工程,具体要求和之前的一致。具体操作如下:

1.新建一个空白项目

2.配置maven

3.创建三个类。

每个类的具体内容如下。

1.编写Mapper类

核心要点是:

  1. 继承Mapper类。约定泛型<keyIn,ValueIn,KeyOut,ValueOut>
  2. 重写map方法(keyIn, ValueIn,Content<KeyOut, Key>)

我们来看下代码。

package com.example.mapreduce;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// 继承Mapper

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 获取一行数据,用空格拆分为一个个单词

        String[] words = value.toString().split(" ");

        // 遍历单词,设置键值对,值为1

        for (String word : words) {

            context.write(new Text(word), new LongWritable(1));

        }

    }

}

代码说明:

  1. LongWritable是固定写法。它表示读取到一行的偏移量。
  2. LongWritable, Text是hadoop的数据类型。

编写Reducer类

核心要点是:

  1. 继承Reducer类。约定泛型<keyIn,ValueIn,KeyOut,ValueOut>
  2. 重写reduce方法(keyIn, Iterable<ValueIn>,Content<KeyOut, Key>)

代码如下:

package com.example.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// 继承 reducer类
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 对 values中的值进行累加求和
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // 输出结果
        context.write(key, new LongWritable(sum));
    }
}

请注意Mapper的来源,它是mapreduce.Mapper,而不是mapred.Mapper。后者是hadoop的老版本用法。

编写Driver驱动类

Driver类负责提交job。它的核心代码有7个步骤,属于固定写法。这七个步骤分别如下:

  1. 获取job对象
  2. 关联本地Driver类的jar
  3. 关联map和reduce
  4. 设置map的输出kv类型
  5. 设置reduce的输出kv类型
  6. 设置输入数据和输出结果的地址
  7. 提交job。

下面我们一起来编写这份代码。

参考代码如下。

package com.example.mapreduce;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

    // mapreduce的Driver

    // 提交job

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // 1. 获取配置信息以及获取job对象

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // 2. 关联本地的jar

        job.setJarByClass(WordCountDriver.class);

        // 3. 关联Mapper和Reducer

        job.setMapperClass(WordCountMapper.class);

        job.setReducerClass(WordCountReducer.class);

        // 4. 设置Mapper输出的KV类型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(LongWritable.class);

        // 5. 设置最终输出KV类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(LongWritable.class);

        // 6. 设置输入和输出路径。请确保wcinput是存在的,并且下面有待统计词频的单词文件。
        //                         output1会自动被创建,如果它已经存在,程序会报错!

        FileInputFormat.setInputPaths(job, new Path("D://vm//wcinput"));

        FileOutputFormat.setOutputPath(job, new Path("D://vm//output1"));

        // 7. 提交job

        boolean b = job.waitForCompletion(true);

        System.exit(b ? 0 : 1);

    }

}

WordCount代码运行

在本地运行代码,在本地查看效果。此时要注意,我们的程序并没有使用集群中的资源,在yarn中看不到运行的任务,我们也没有把结果保存在hdfs中。

三、课堂小结

通过本堂课的学习,我们学习了mapreduce的基本概念,并编写了第一个mapreduce程序。

四、实训内容

任务:完成wordcount程序的编写

[巡班并及时解决学生问题]


网站公告

今日签到

点亮在社区的每一天
去签到