【WordCount】MR、Spark、Flink实现WordCount

发布于:2023-01-04 ⋅ 阅读:(333) ⋅ 点赞:(0)

目录

1、需求

2、MapReduce

3、Spark

4、Flink批处理

5、Flink流处理


1、需求

        在给定的文本文件中统计输出每一个单词出现的总次数

输入数据

hello java
hello scala
hello hadoop
hello spark
hello flink
hello spark
hello flink

输出数据

hello    7
java     1
scala    1
hadoop   1
spark    2
flink    2

2、MapReduce

Mapper

(1)将MapTask传送的文本内容转换成String

        hello java

(2)根据空格将这一行分成单词

        hello

        java

(3)将单词输出为<单词,1>

        hello,1

        java,1

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map phase of the word count job.
 *
 * <p>Each call to {@link #map} receives one line of text as {@code value} and
 * writes a {@code <word, 1>} pair to the context for every non-empty word in it.
 * The input key is not used.</p>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    // Output key/value objects are held as fields so the same instances are
    // reused for every record instead of allocating new ones per word.
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    /**
     * Splits one input line into words and emits {@code <word, 1>} for each.
     *
     * @param key     input key supplied by the framework (ignored)
     * @param value   one line of input text
     * @param context sink for the emitted {@code <word, 1>} pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
        // 1. Get the line as a String
        String line = value.toString();
        // 2. Split on runs of whitespace so repeated spaces or tabs do not
        //    produce empty tokens between words
        String[] words = line.split("\\s+");
        // 3. Emit <word, 1>
        for (String word : words) {
            // A blank line or leading whitespace still yields one empty token;
            // skip it so "" is never counted as a word.
            if (word.isEmpty()) {
                continue;
            }
            k.set(word);
            context.write(k, v);
        }
    }
}

Reducer

(1)汇总各个key的个数

        hello,1

        hello,1

(2)输出该key的总次数

        hello,2

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce phase of the word count job: adds up all counts received for one key
 * and writes {@code <key, total>} to the context.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    // Total computed by the most recent reduce() call.
    int sum;
    // Output value object, reused across reduce() calls.
    IntWritable v = new IntWritable();

    /**
     * Sums the counts for a single key and emits the total.
     *
     * @param key     the word being reduced
     * @param values  the counts collected for {@code key}
     * @param context sink for the emitted {@code <key, total>} pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        // 1. Accumulate the counts for this key
        int total = 0;
        for (IntWritable partial : values) {
            total = total + partial.get();
        }
        sum = total;

        // 2. Emit <key, total>
        v.set(total);
        context.write(key, v);
    }
}

Driver

(1)获取配置信息,获取job对象实例

(2)指定本程序的jar包所在的本地路径 

(3)关联Mapper/Reducer业务类

(4)指定Mapper输出数据的kv类型

(5)指定最终输出的数据的kv类型

(6)指定job的输入原始文件所在目录

(7)指定job的输出结果所在目录

(8)提交作业

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Entry point that configures and submits the word count MapReduce job.
 *
 * <p>Usage: {@code WordCountDriver <input path> <output path>}</p>
 */
public class WordCountDriver {

    /**
     * Builds the job, wires in {@code WordCountMapper} and {@code WordCountReducer},
     * submits it, waits for completion, and exits with status 0 on success or 1 on failure.
     * Exits with status 2 when fewer than two arguments are supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            propagated from job setup or submission
     * @throws ClassNotFoundException propagated from {@code waitForCompletion}
     * @throws InterruptedException   propagated from {@code waitForCompletion}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a clear usage message instead of an ArrayIndexOutOfBoundsException
        // when the input/output paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }

        // 1. Load the configuration and create the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate the jar that contains this driver class
        job.setJarByClass(WordCountDriver.class);

        // 3. Register the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the key/value types emitted by the Mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and wait; map the result to the process exit code
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3、Spark

// Create the Spark configuration object
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// Create the Spark context (the connection object)
val sc : SparkContext = new SparkContext(sparkConf)

try {
  // Read the file as an RDD of lines
  val fileRDD: RDD[String] = sc.textFile("input/word.txt")

  // Tokenize each line on runs of whitespace; drop the empty token that a
  // blank line or leading whitespace produces so "" is never counted as a word
  val wordRDD: RDD[String] = fileRDD.flatMap( _.split("\\s+") ).filter(_.nonEmpty)

  // Reshape the data: word => (word, 1)
  val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))

  // Aggregate the counts of identical words
  val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)

  // Collect the aggregated result into driver memory
  val word2Count: Array[(String, Int)] = word2CountRDD.collect()

  // Print the result
  word2Count.foreach(println)
} finally {
  // Close the Spark connection even if one of the steps above throws
  sc.stop()
}

4、Flink批处理

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count: reads {@code input/words.txt}, counts how many times each
 * word occurs, and prints the {@code (word, count)} pairs.
 */
public class BatchWordCount {
    /**
     * Runs the batch word count pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from the Flink calls
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file line by line (each element is one line of text)
        DataSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. Convert every line into (word, 1L) tuples
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
            .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                // Split on runs of whitespace so repeated spaces or tabs do not
                // create empty tokens between words.
                String[] words = line.split("\\s+");
                for (String word : words) {
                    // A blank line or leading whitespace still yields one empty
                    // token; skip it so "" is never counted as a word.
                    if (!word.isEmpty()) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }
            })
            // A lambda that uses Java generics loses its type information to
            // erasure, so the result type has to be declared explicitly.
            .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. Print the result
        sum.print();
    }
}

5、Flink流处理

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;

/**
 * Streaming word count: reads lines of text from a socket on
 * {@code hadoop102:7777}, keeps a running count per word, and prints each
 * updated {@code (word, count)} pair.
 */
public class StreamWordCount {
    /**
     * Builds and executes the streaming word count pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from the Flink calls
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the text stream from the socket
        DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 7777);

        // 3. Convert every line into (word, 1L) tuples
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
            .flatMap((String line, Collector<String> words) -> {
                // Split on runs of whitespace and drop the empty token that a
                // blank line or leading whitespace produces, so "" is never
                // counted as a word.
                Arrays.stream(line.split("\\s+"))
                    .filter(word -> !word.isEmpty())
                    .forEach(words::collect);
            })
            // Explicit type declarations: the generic lambda's type information
            // is lost to erasure.
            .returns(Types.STRING)
            .map(word -> Tuple2.of(word, 1L))
            .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Key the stream by the word (tuple field f0)
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
            .keyBy(t -> t.f0);

        // 5. Sum the counts (tuple field 1) per key
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
            .sum(1);

        // 6. Print the result
        result.print();

        // 7. Execute the job
        env.execute();
    }
}

本文含有隐藏内容,请 开通VIP 后查看