Simple MapReduce Applications (3): Advanced WordCount



  The Apache Hadoop source code quoted in this article is covered by the Apache License 2.0; see the Apache License 2.0 for details.

1. Advanced WordCount

  The input text is the code in Section 2.3 below. The goal is to count each token in the text, output the count first and the token second, and sort the counts in descending order.

1.1 Sorting IntWritable in Descending Order

  The IntWritable class already contains a comparator that sorts in ascending order; its code is quoted below. To sort IntWritable in descending order, you only need to define a new class that extends IntWritable.Comparator and overrides public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) so that it returns the negation of the parent method's result. In addition, for IntWritable keys to actually be sorted in descending order, the comparator must be registered in the driver code with job.setSortComparatorClass(YourComparator.class); a standalone sketch follows the quoted source.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(IntWritable.class);
    }
    
    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }
  }
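
  Building on this, a descending comparator is just the ascending one with its result negated. The sketch below mirrors the IntWritableDecreaseingComparator class used in Section 2.3 (the class name here is only illustrative):

import org.apache.hadoop.io.IntWritable;

// A minimal sketch of a descending comparator for IntWritable keys: it
// negates the result of the built-in ascending byte-level comparison
// (the same idea as IntWritableDecreaseingComparator in Section 2.3).
public class DescendingIntWritableComparator extends IntWritable.Comparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}

// Registered in the driver, for example:
// job.setSortComparatorClass(DescendingIntWritableComparator.class);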

1.2 Input and Output Formats

| Java class | Role | Description |
| --- | --- | --- |
| org.apache.hadoop.mapreduce.lib.input.TextInputFormat | MapReduce's default input format | Splits the input file by line; each line becomes a <key, value> pair, where key is the offset of the line within the file (starting at 0) and value is the line's content |
| org.apache.hadoop.mapreduce.lib.output.TextOutputFormat | MapReduce's default output format | Writes the output as a text file, one <key, value> pair per line, with key and value separated by a tab (\t) |
| org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat | Input format for SequenceFile | Reads Hadoop's binary SequenceFile format |
| org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat | Output format for SequenceFile | Writes the output in Hadoop's binary SequenceFile format |

  (SequenceFile is an efficient, splittable binary file format defined by Hadoop, with support for compression.)
  (Hadoop defines many more input and output formats; since I have not used them in detail, they are not covered here.)
  When several MapReduce jobs are chained, the intermediate results can be stored as SequenceFile to speed up the overall run; a minimal configuration sketch follows.
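
  The sketch below assumes job1 and job2 are the two Job instances and tempPath is whatever intermediate directory you choose; the wiring only involves the format classes and the shared path:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileChaining {
    // Sketch: job1 writes its intermediate result as a SequenceFile and
    // job2 reads the same directory back, avoiding a text round-trip.
    static void chainThroughSequenceFile(Job job1, Job job2, String tempPath) throws IOException {
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job1, new Path(tempPath));

        job2.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job2, new Path(tempPath));
    }
}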

1.3 Processing Flow

  Like the basic WordCount, the advanced WordCount first has to count the words, so the Reduce function's input key/value type is <Text, IntWritable>. The required final output, however, is <IntWritable, Text>. If you simply turn the Reduce output into <IntWritable, Text> and try to do everything in a single job, you will find that the descending IntWritable sort never happens (even though you may have set SortComparatorClass). The reason is that the shuffle sort takes place after the Map phase and before the Reduce phase, and at that point the key type is still Text rather than IntWritable.
  To solve this, two jobs are needed. The first job does the counting and writes its output as a SequenceFile: the Map output and the Reduce input and output are all <Text, IntWritable>, and the job's output format is SequenceFileOutputFormat. The second job swaps the keys and values, reading the intermediate result back in SequenceFile format, and then sorts the keys in descending order. The swap uses Hadoop's built-in org.apache.hadoop.mapreduce.lib.map.InverseMapper, which exchanges keys and values (a sketch of its effect follows). This job's input format is SequenceFileInputFormat, its Map input is <Text, IntWritable> and its Map output is <IntWritable, Text>; with the key type now IntWritable, setting SortComparatorClass makes the descending sort take effect.
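
  For reference, InverseMapper simply emits every record with its key and value swapped. A mapper with the equivalent effect for this job's types would look roughly like the sketch below (illustrative only, not the actual Hadoop source):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: for <Text, IntWritable> input, the effect of InverseMapper is to
// emit each (word, count) record as (count, word), so the counts become the
// keys that the shuffle phase sorts.
public class SwapMapper extends Mapper<Text, IntWritable, IntWritable, Text> {
    @Override
    protected void map(Text key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(value, key); // swap key and value
    }
}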

2. Code and Results

2.1 Dependency Configuration in pom.xml

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.6</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.3.6</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>3.3.6</version>
    </dependency>
  </dependencies>

2.2 The Utility Class util

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class util {
    // Returns a FileSystem handle for the given URI (e.g. hdfs://localhost:9000).
    public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {
        URI add = new URI(uri);
        return FileSystem.get(add, conf);
    }

    // Recursively deletes the given path if it exists.
    public static void removeALL(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        if (fs.exists(new Path(path))) {
            boolean isDeleted = fs.delete(new Path(path), true);
            System.out.println("Delete Output Folder? " + isDeleted);
        }
    }

    // Recursively deletes every path in pathList that exists.
    public static void removeALL(String uri, Configuration conf, String[] pathList) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        for (String path : pathList) {
            if (fs.exists(new Path(path))) {
                boolean isDeleted = fs.delete(new Path(path), true);
                System.out.println(String.format("Delete %s? %s", path, isDeleted));
            }
        }
    }

    // Prints the contents of every part-r-* file under the given output directory.
    public static void showResult(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        String regex = "part-r-";
        Pattern pattern = Pattern.compile(regex);

        if (fs.exists(new Path(path))) {
            FileStatus[] files = fs.listStatus(new Path(path));
            for (FileStatus file : files) {
                Matcher matcher = pattern.matcher(file.getPath().toString());
                if (matcher.find()) {
                    System.out.println(file.getPath() + ":");
                    FSDataInputStream openStream = fs.open(file.getPath());
                    IOUtils.copyBytes(openStream, System.out, 1024);
                    openStream.close();
                }
            }
        }
    }
}

2.3 Advanced WordCount

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class App {
    public static class IntWritableDecreaseingComparator extends IntWritable.Comparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splitStr = value.toString().split("\\s+");
            for (String str : splitStr) {
                context.write(new Text(str), new IntWritable(1));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String tempPath = "hdfs://localhost:9000/user/developer/Temp";
        String[] myArgs = {
                "file:///home/developer/CodeArtsProjects/advanced-word-count/AdvancedWordCount.txt",
                "hdfs://localhost:9000/user/developer/AdvancedWordCount/output"
        };
        util.removeALL("hdfs://localhost:9000", conf, new String[] { tempPath, myArgs[myArgs.length - 1] });
        Job job = Job.getInstance(conf, "AdvancedWordCount");
        job.setJarByClass(App.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setNumReduceTasks(2);
        for (int i = 0; i < myArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(myArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(tempPath));
        int res1 = job.waitForCompletion(true) ? 0 : 1;
        if (res1 == 0) {
            Job sortJob = Job.getInstance(conf, "Sort");
            sortJob.setJarByClass(App.class);
            sortJob.setMapperClass(InverseMapper.class);
            sortJob.setInputFormatClass(SequenceFileInputFormat.class);
            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(Text.class);
            sortJob.setSortComparatorClass(IntWritableDecreaseingComparator.class);
            FileInputFormat.addInputPath(sortJob, new Path(tempPath));
            FileOutputFormat.setOutputPath(sortJob, new Path(myArgs[myArgs.length - 1]));
            int res2 = sortJob.waitForCompletion(true) ? 0 : 1;
            if (res2 == 0) {
                System.out.println("高级WordCount结果为:");
                util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
            }
            System.exit(res2);
        }
        System.exit(res1);
    }
}

2.4 Results

  The result file contains the following:

64
14      {
13      }
12      import
8       int
7       public
7       =
4       static
4       class
4       -
4       new
4       @Override
3       for
3       :
3       void
3       throws
3       extends
2       l1,
2       1;
2       0;
2       String[]
2       s2,
2       s1,
2       i
2       context.write(new
2       context)
2       conf,
2       InterruptedException
2       key,
2       IntWritable,
2       return
2       IOException,
2       b2,
2       sum
2       Context
2       protected
2       myArgs[myArgs.length
2       Text,
2       1]);
1       };
1       values,
1       values)
1       value.toString().split("\\s+");
1       value,
1       val.get();
1       val
1       util.showResult("hdfs://localhost:9000",
1       util.removeALL("hdfs://localhost:9000",
1       str
1       splitStr)
1       splitStr
1       res
1       reduce(Text
1       org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
1       org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
1       org.apache.hadoop.mapreduce.Reducer;
1       org.apache.hadoop.mapreduce.Mapper;
1       org.apache.hadoop.mapreduce.Job;
1       org.apache.hadoop.io.WritableComparable;
1       org.apache.hadoop.io.Text;
1       org.apache.hadoop.io.LongWritable;
1       org.apache.hadoop.io.IntWritable;
1       org.apache.hadoop.fs.Path;
1       org.apache.hadoop.conf.Configuration;
1       myArgs.length
1       myArgs
1       map(LongWritable
1       main(String[]
1       l2);
1       l2)
1       key);
1       job.waitForCompletion(true)
1       job.setSortComparatorClass(IntWritableDecreaseingComparator.class);
1       job.setReducerClass(MyReducer.class);
1       job.setOutputValueClass(Text.class);
1       job.setOutputKeyClass(IntWritable.class);
1       job.setMapperClass(MyMapper.class);
1       job.setJarByClass(App.class);
1       job.setCombinerClass(MyReducer.class);
1       job
1       java.io.IOException;
1       if
1       i++)
1       compare(byte[]
1       compare(WritableComparable
1       byte[]
1       b1,
1       b);
1       b)
1       args)
1       a,
1       WritableComparable
1       Text>
1       Text(str),
1       Text
1       System.out.println("高级WordCount结果为:");
1       System.exit(res);
1       Reducer<Text,
1       Path(myArgs[myArgs.length
1       Path(myArgs[i]));
1       MyReducer
1       MyMapper
1       Mapper<LongWritable,
1       Job.getInstance(conf,
1       Job
1       Iterable<IntWritable>
1       IntWritableDecreaseingComparator
1       IntWritable>
1       IntWritable.Comparator
1       IntWritable(sum),
1       IntWritable(1));
1       FileOutputFormat.setOutputPath(job,
1       FileInputFormat.addInputPath(job,
1       Exception
1       Configuration();
1       Configuration
1       App
1       ?
1       ==
1       <
1       1]));
1       0)
1       0
1       -super.compare(b1,
1       -super.compare(a,
1       +=
1       (res
1       (int
1       (String
1       (IntWritable
1       "hdfs://localhost:9000/user/developer/AdvancedWordCount/output"
1       "file:///home/developer/CodeArtsProjects/AdvancedWordCount.txt",
1       "AdvancedWordCount");
1       conf
