MapReduce 学习

发布于:2025-07-09 ⋅ 阅读:(31) ⋅ 点赞:(0)

MapReduce 的过程 :

map shuffle reduce 

其中,程序员需要实现的内容是:

程序员手动实现 Map 任务的具体逻辑,将数据根据Map代码进行分割,返回 (key, value) 键值对

然后这些 (Key,Values)键值对 先会被存放到磁盘,然后由 MapReduce 按照 Key ,进行排序,排序原则为,将同一个 Key 的键值对组织到一起,然后将同Key的键值对组,按照Key排序。而后将每个Map节点上 找出 相同的Key ,将 键值对 移送到 Reduce节点 处。即,把分散在不同 Map 节点上的同一个 key 的数据汇集到同一个 Reduce 节点。

程序员手动实现 Reduce 任务的具体逻辑,对有着同样 Key的键值对 做对应操作,最后返回 (key,result)

为什么要做 按照 key 对 每个Map节点的键值对做排序:

因为 如果不对键值对的key进行排序,只是将每个Map节点的相同 key 的键值对顺序组织在一起,那么,在将 相同key的键值对从Map节点进行 shuffle 到 Reduce 节点时,并不清楚何时才能将同一个 key的键值对从各个Map节点都取走了,

例如 : Map 1 节点: (b, 1)  (b, 1)  (b, 1)  (a, 1) (a, 1)  (a, 1)  (a, 1)  (c, 1) (c, 1)

Map 2 节点 :(c, 1) (c, 1) (a, 1) (a, 1)  (a, 1)  (a, 1) (b, 1)  (b, 1)  (b, 1) 

现在想把 所有的 key 为 a 的聚合,那么,得先扫一遍 Map 1节点,把所有的 key=a 挑出来,然后再扫一遍 Map 2节点,把所有的key=a 的挑出来,

如果按照 key 排序,那就不用全扫了,直接是有序列表合并问题了

Map 1 节点: (a, 1) (a, 1)  (a, 1)  (a, 1)  (b, 1)  (b, 1)  (b, 1)  (c, 1) (c, 1)

Map 2 节点 :(a, 1) (a, 1)  (a, 1)  (a, 1) (b, 1)  (b, 1)  (b, 1) (c, 1) (c, 1) 

shuffle 的意义来源于:

  • Reduce 的逻辑是「同一个 key 下所有 value 做汇总」。

  • 如果同一个 key 的 value 在不同机器上,就必须先汇总到一处才能计算。

自定义输入格式、输出格式

  • 默认 TextInputFormat:按行切

  • 但可以自定义:WholeFileInputFormat, SequenceFileInputFormat, Avro...

  • 输出格式也是同理:TextOutputFormat, SequenceFileOutputFormat

  • 用于支持二进制文件、特殊格式等

Map 输出的排序和分组

  • Map 输出本地先排序(spill 时)

  • Reduce 阶段框架再做归并排序

  • 分组:Reduce 是按 key 分组

  • 可以自定义 Comparator,改变排序和分组的方式(比如 secondary sort)

Secondary Sort

  • 有时一个 key 对应的多个 value 要按某个字段再排序

  • 比如 key 是 user_id,value 是 (time, action)

  • 要按 user_id 分组,但希望 value 按 time 排序

  • 这需要自定义 CompositeKey + 自定义分组 Comparator

Combiner 的意义与局限

  • Combiner 是可选的本地 Reduce:在 Map 节点本地先聚合

  • 比如 WordCount,Map 输出大量 ("the", 1),Combiner 可以先本地加总成 ("the", 100)

  • 局限:结果必须是相同的(Combine 结果不影响最终正确性),否则不能用

  • 不能跨 Map 节点,只能本地

Combiner 的意义 :用来减少网络传输量

Partition 与分区策略

  • Map 输出的所有 key,要决定发给哪个 Reduce

  • 默认是 hash(key) % num_reduces

  • 但可以自定义 Partition,比如让 A、M 的 key 到 Reduce0,N、Z 到 Reduce1

  • 用于控制负载均衡或业务语义

数据倾斜、资源调度

  • 数据倾斜:某些 key 特别热门,导致某个 Reduce 很慢

  • 解决:更细粒度分区、预聚合、抽样分区

  • 资源调度:YARN 负责任务调度,合理配置资源

简单的 word count 例子 :

目录结构

src/main/java
└── org.example.wordcount
    ├── WordCountMapper.java
    ├── WordCountReducer.java
    ├── WordCountCombiner.java (可选)
    └── WordCountDriver.java

1) Mapper

package org.example.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper 
     extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key 是行偏移量, value 是行内容
        String line = value.toString();
        String[] words = line.split("\\s+"); // 按空格切
        for (String w : words) {
            word.set(w);
            context.write(word, ONE);
        }
    }
}

2) Reducer

package org.example.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer 
     extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

 3) Combiner(可选)

其实和 Reducer 一样,只是加快本地聚合

package org.example.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner 
     extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

驱动类 WordCountDriver 不写了,需要再查吧,

总结

  • Mapper:对每行做拆词,输出 (word, 1)

  • Reducer:对同一个 word 的所有 1 相加

  • Combiner:在 Map 节点本地先加一遍