Hadoop 3: A YARN Tool Interface Example

Published: 2024-07-06

1. Requirements

We again build on the wordcount example.
As we know, wordcount can be run from the examples jar that ships with Hadoop
with the following command:

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -D mapreduce.job.queuename=hive /input /output1

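In this command, the program name (wordcount) comes first, then the queue is set with the -D option, and finally the input path and the output directory are given.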
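To make the shape of that argument list concrete, here is a minimal sketch in plain Java of how `-D key=value` pairs can be separated from the positional arguments once the program name has been stripped. It only imitates the idea behind Hadoop's generic option handling: the class `GenericOptionsSketch` and its `parse` method are hypothetical names invented for this illustration, not Hadoop APIs, and only the `-D key=value` form with a space after `-D` is handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Hadoop.
class GenericOptionsSketch {

    // Copies every "-D key=value" pair into props and returns the
    // remaining arguments in their original order.
    // A pair without '=' is silently ignored in this sketch.
    static String[] parse(String[] args, Map<String, String> props) {
        List<String> rest = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if ("-D".equals(args[i]) && i + 1 < args.length) {
                String pair = args[++i];
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    props.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            } else {
                rest.add(args[i]);
            }
        }
        return rest.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // The arguments that follow the program name "wordcount" in the command above
        String[] cli = {"-D", "mapreduce.job.queuename=hive", "/input", "/output1"};
        Map<String, String> props = new LinkedHashMap<>();
        String[] rest = parse(cli, props);
        System.out.println(props);                  // the configuration override
        System.out.println(String.join(" ", rest)); // the positional arguments
    }
}
```

After parsing, the queue name lives in the property map and only `/input` and `/output1` remain as positional arguments, which is the split the wordcount program relies on.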

Can the wc.jar we developed ourselves be used the same way?

2. Testing the earlier wc.jar

Command 1

hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver wordcount -D mapreduce.job.queuename=hive /input /output1

This fails with an error.

Command 2

hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver -D mapreduce.job.queuename=hive /input /output1

This fails with an error as well.

So the jar we developed ourselves cannot take additional generic options the way Hadoop's bundled jar can.
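One likely explanation is that the extra tokens shift the argument positions. This assumes the earlier driver reads `args[0]` and `args[1]` directly as the input and output paths; that code is not shown here, so treat it as an assumption. The plain-Java sketch below, with a hypothetical `NaiveDriverSketch` class, shows what such a purely positional driver would see for each of the two commands.

```java
// Hypothetical stand-in for a driver that does no option parsing
// (an assumption about how the earlier wc.jar behaves).
class NaiveDriverSketch {

    // Reports which arguments a purely positional driver would treat as paths.
    static String describe(String[] args) {
        return "input=" + args[0] + ", output=" + args[1];
    }

    public static void main(String[] args) {
        // Arguments after the main class name in command 1
        System.out.println(describe(new String[]{
                "wordcount", "-D", "mapreduce.job.queuename=hive", "/input", "/output1"}));
        // Arguments after the main class name in command 2
        System.out.println(describe(new String[]{
                "-D", "mapreduce.job.queuename=hive", "/input", "/output1"}));
    }
}
```

Under that assumption, neither `/input` nor `/output1` lands where the driver expects it in either command.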

3. A jar that implements the Tool interface

WordCount

package com.atguigu.mapreduce.tool;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

public class WordCount implements Tool {

    private Configuration conf;

    // Core driver logic (conf must be passed in via setConf)
    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {

        return conf;
    }

    // mapper
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text outK = new Text();
        private IntWritable outV = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Example input line: ss cls
            // 1. Read one line
            String line = value.toString();

            // 2. Split it into words
            String[] words = line.split(" ");

            // 3. Loop over the words and emit (word, 1) for each
            for (String word : words) {
                outK.set(word);

                context.write(outK, outV);
            }

        }
    }

    // reducer
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outV = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }

            outV.set(sum);

            context.write(key, outV);
        }
    }
}
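The map and reduce logic above can be tried out without a cluster. The following is a plain-Java sketch, not Hadoop code: the hypothetical `LocalWordCount.count` method splits each line on a single space as the mapper does and sums the counts per word as the reducer does. The two input lines are made-up sample data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for the mapper and reducer; illustration only.
class LocalWordCount {

    // Counts how often each space-separated token occurs across all lines.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            // "map" step: one (word, 1) pair per token
            for (String word : line.split(" ")) {
                // "reduce" step: add up the ones for each word
                totals.merge(word, 1, Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("ss cls", "ss"))); // prints {cls=1, ss=2}
    }
}
```

Because the split is on a single space, as in the mapper above, two consecutive spaces in a line produce an empty-string token that is counted like any other word. Splitting on the regular expression `\\s+` is one way to avoid that.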

WordCountDriver

package com.atguigu.mapreduce.tool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Arrays;

public class WordCountDriver {

    private static Tool tool;

    public static void main(String[] args) throws Exception {

        // Create the configuration
        Configuration conf = new Configuration();

        switch (args[0]){
            case "wordcount":
                tool = new WordCount();
                break;
            default:
                throw new RuntimeException("no such tool "+ args[0]);
        }

        // Run the tool, passing everything after the tool name
        int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));

        System.exit(run);
    }
}
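The driver's dispatch step can be looked at in isolation. The sketch below is plain Java without Hadoop: a hypothetical `MiniTool` interface stands in for `org.apache.hadoop.util.Tool`, and a map from names to tools stands in for the `switch`. It shows how the first argument selects the tool and how `Arrays.copyOfRange` hands over the remaining arguments. It also rejects an empty argument list, which the driver above does not check for.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.hadoop.util.Tool; illustration only.
interface MiniTool {
    int run(String[] args);
}

class DispatchSketch {

    // Registry of tool names; a fake "wordcount" returns 0 when it
    // receives exactly an input path and an output path.
    private static final Map<String, Supplier<MiniTool>> TOOLS = new HashMap<>();
    static {
        TOOLS.put("wordcount", () -> toolArgs -> toolArgs.length == 2 ? 0 : 1);
    }

    // Picks the tool named by args[0] and passes it the remaining arguments.
    static int dispatch(String[] args) {
        if (args.length == 0) {
            throw new IllegalArgumentException("usage: <tool> [args...]");
        }
        Supplier<MiniTool> factory = TOOLS.get(args[0]);
        if (factory == null) {
            throw new IllegalArgumentException("no such tool " + args[0]);
        }
        return factory.get().run(Arrays.copyOfRange(args, 1, args.length));
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new String[]{"wordcount", "/input", "/output1"})); // prints 0
    }
}
```

In the real driver, `ToolRunner.run` additionally parses generic options such as `-D` into the `Configuration` before calling the tool's `run` method with the remaining arguments, which is why `run` can use `args[0]` and `args[1]` as paths.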

Test command

hadoop jar wc.jar com.atguigu.mapreduce.tool.WordCountDriver wordcount -D mapreduce.job.queuename=hive /input /output1
