Flink batch and streaming WordCount

Published: 2023-01-21

Flink WordCount programs

Batch WordCount
Streaming WordCount

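Batch WordCount

1 Create a Maven project

2 Configure the pom file: the Flink dependencies, the Scala compiler plugin, and the Maven assembly plugin that builds a jar with dependencies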

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.scope>provided</flink.scope>
        <flink.version>1.14.4</flink.version>
        <scala.binary.version>2.11</scala.binary.version>

    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Since Flink 1.11.0 the flink-clients dependency must be added explicitly -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                    <addScalacArgs>-target:jvm-1.7</addScalacArgs>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3 Create the input file wordcount.txt in the src/main/resources directory

hello,wao,yarning
hai,are,you,speak,Chinese
are,you,kidding,me,!
i,like,telling,a,story,!
i,hive,wine,and,do,you,hive,story

4 Create the WordCount object

package com.test.wc

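// Import everything from the Scala API package; otherwise the implicit conversions are missing and compilation fails
import org.apache.flink.api.scala._

/**
 * @ClassName WordCount.scala
 * @author admin
 * @version 1.0.0
 * @Description Batch WordCount using the DataSet API
 * @createTime 2022-08-10 20:50:00
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // Read the input file into a DataSet, one element per line
    val inputPath: String = "C:\\personal\\scala_project\\flink_wordcount\\src\\main\\resources\\wordcount.txt"
    val inputDataSet: DataSet[String] = env.readTextFile(inputPath)

    // Split, pair, group, and sum
    val result: DataSet[(String, Int)] = inputDataSet
      .flatMap(_.split(","))  // split each line on commas and flatten, so every word becomes its own element
      .map((_, 1))  // turn each word into a tuple (word, 1)
      .groupBy(0)  // group by the first tuple field (the word)
      .sum(1)  // sum the second tuple field (the count)

    result.print()  // print the DataSet; in the DataSet API print() also triggers execution
  }
}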

Output

(telling,1)
(a,1)
(wao,1)
(yarning,1)
(hai,1)
(hello,1)
(story,2)
(and,1)
(wine,1)
(i,2)
(speak,1)
(like,1)
(hive,2)
(you,3)
(Chinese,1)
(kidding,1)
(do,1)
(!,2)
(are,2)
(me,1)
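
To make the flatMap / map / groupBy / sum chain easier to follow, here is a minimal sketch of the same steps on plain Scala collections, with no Flink dependency. The object name CollectionWordCount and the helper countWords are made up for this illustration; the input lines are copied from wordcount.txt above. It only mirrors the logic and is not how Flink executes the job.

```scala
object CollectionWordCount {
  // Same content as wordcount.txt in step 3
  val lines: Seq[String] = Seq(
    "hello,wao,yarning",
    "hai,are,you,speak,Chinese",
    "are,you,kidding,me,!",
    "i,like,telling,a,story,!",
    "i,hive,wine,and,do,you,hive,story"
  )

  // Hypothetical helper: counts words the same way the Flink job does
  def countWords(input: Seq[String]): Map[String, Int] =
    input
      .flatMap(_.split(","))  // one element per word
      .map((_, 1))            // (word, 1)
      .groupBy(_._1)          // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }  // sum the ones per word

  def main(args: Array[String]): Unit =
    countWords(lines).foreach(println)
}
```

The resulting map holds the same 20 words and counts as the Flink output above; the print order may differ.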

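Streaming WordCount

1 Create a Maven project

2 Configure the pom file: the Flink dependencies, the Scala compiler plugin, and the Maven assembly plugin that builds a jar with dependencies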

The pom.xml is identical to the one in the batch section above; reuse it unchanged.

3 Create the StreamWordCount object

package com.test.wc

import org.apache.flink.streaming.api.scala._

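/**
 * @ClassName StreamWordCount.scala
 * @author admin
 * @version 1.0.0
 * @Description Streaming WordCount reading from a socket
 * @createTime 2022-08-10 22:26:00
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {

    // Create the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to a socket and use the text lines it sends as input
    val inputSock: DataStream[String] = env.socketTextStream("localhost", 1313)

    // Transform the DataStream
    val resultDataStream: DataStream[(String, Int)] = inputSock
      .flatMap(_.split(" "))  // split each line on spaces
      .map((_, 1))  // (word, 1)
      .keyBy(_._1)  // key by the word; keyBy(0) also works but is deprecated
      .sum(1)  // running sum of the count field per key

    resultDataStream.print()

    // Streaming jobs only start when execute() is called
    env.execute()
  }
}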

On Windows, download netcat from https://eternallybored.org/misc/netcat/, unzip it, and put nc.exe in the Windows/System32 directory.

Open cmd and run the following command to create a listening socket:

nc -l -p 1313

Start the Flink program.

Type the following lines into the nc window:
hello world
hello scala
are you kidding me !
are you kidding you !

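Then check the IDEA console for the output of the Flink job. The number before > identifies the parallel subtask that printed the record: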
5> (hello,1)
9> (world,1)
1> (scala,1)
5> (hello,2)
10> (you,1)
4> (!,1)
12> (kidding,1)
8> (are,1)
8> (me,1)
10> (you,2)
4> (!,2)
10> (you,3)
8> (are,2)
12> (kidding,2)
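
Unlike the batch job, the streaming job emits an updated count every time a word arrives, which is why (hello,1) is later followed by (hello,2). The sketch below imitates that behaviour on plain Scala collections. RunningWordCount and runningCounts are names invented for this illustration, and the code is sequential, so it does not reproduce Flink's parallel subtasks or the interleaved ordering seen above.

```scala
object RunningWordCount {
  // Hypothetical helper: emits one (word, countSoFar) record per incoming word,
  // similar in spirit to keyBy + sum on an unbounded stream
  def runningCounts(lines: Seq[String]): Seq[(String, Int)] = {
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = lines
      .flatMap(_.split(" "))
      .foldLeft(start) { case ((state, out), word) =>
        val updated = state.getOrElse(word, 0) + 1  // per-word state, like keyed state
        (state.updated(word, updated), out :+ ((word, updated)))
      }
    emitted
  }

  def main(args: Array[String]): Unit =
    runningCounts(Seq("hello world", "hello scala")).foreach(println)
}
```

Each input word produces exactly one output record, carrying the count accumulated so far for that word.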

Streaming WordCount with host and port passed as arguments

package com.test.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

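/**
 * @ClassName StreamWordCount.scala
 * @author admin
 * @version 1.0.0
 * @Description Streaming WordCount with host and port taken from program arguments
 * @createTime 2022-08-10 22:26:00
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {

    // Create the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // ParameterTool is a utility class provided by Flink for reading program arguments
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // Read the host argument
    val host: String = params.get("host")

    // Read the port argument
    val port: Int = params.getInt("port")

    // Connect to a socket and use the text lines it sends as input
    val inputSock: DataStream[String] = env.socketTextStream(host, port)

    // Transform the DataStream
    val resultDataStream: DataStream[(String, Int)] = inputSock
      .flatMap(_.split(" "))  // split each line on spaces
      .map((_, 1))  // (word, 1)
      .keyBy(_._1)  // key by the word; keyBy(0) also works but is deprecated
      .sum(1)  // running sum of the count field per key

    resultDataStream.print()

    // Streaming jobs only start when execute() is called
    env.execute()
  }
}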

Passing arguments in IDEA

(The original screenshot of the IDEA run configuration is unavailable.) In the Run/Debug configuration, fill in the Program arguments field, for example: --host localhost --port 1313
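
As a rough illustration of how arguments of the form --key value end up as host and port, the sketch below pairs them into a Map. It is a simplified stand-in written for this post, not Flink's ParameterTool implementation; ArgsSketch and parseArgs are hypothetical names, and the example values localhost and 1313 are taken from the earlier socket example. The real ParameterTool accepts more forms than this sketch handles.

```scala
object ArgsSketch {
  // Hypothetical helper: pairs "--key value" arguments into a Map.
  // Only handles the exact "--key value" form; anything else is ignored.
  def parseArgs(args: Array[String]): Map[String, String] =
    args.grouped(2).collect {
      case Array(key, value) if key.startsWith("--") => key.drop(2) -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    val params = parseArgs(Array("--host", "localhost", "--port", "1313"))
    // Fall back to the values used earlier in this post when an argument is missing
    val host: String = params.getOrElse("host", "localhost")
    val port: Int = params.get("port").map(_.toInt).getOrElse(1313)
    println(s"$host:$port")
  }
}
```

Falling back to defaults, as done here with getOrElse, avoids a failure when the job is started without arguments.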

Output

5> (hello,1)
13> (flink,1)
8> (are,1)
8> (now,1)
6> (doing,1)
10> (you,1)
7> (what,1)
10> (you,2)
7> (what,2)