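Flink WordCount programs
Batch WordCount
1 Create a Maven project
2 In pom.xml, configure the dependencies, the Scala compiler plugin, and the Maven plugin that builds a jar with dependencies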
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink_wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.scope>provided</flink.scope>
<flink.version>1.14.4</flink.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Since Flink 1.11.0 the flink-clients dependency has to be added explicitly -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
<addScalacArgs>-target:jvm-1.7</addScalacArgs>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
3 Create the input file wordcount.txt in the resources directory (src/main/resources)
hello,wao,yarning
hai,are,you,speak,Chinese
are,you,kidding,me,!
i,like,telling,a,story,!
i,hive,wine,and,do,you,hive,story
4 Create an object WordCount
package com.test.wc
// Import the Scala API package so the implicit conversions are in scope; without it the job fails to compile
import org.apache.flink.api.scala._
/**
 * @ClassName WordCount.scala
 * @author admin
 * @version 1.0.0
 * @Description TODO
 * @createTime 2022-08-10 20:50:00
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Read the input file as a DataSet
    val inputPath: String = "C:\\personal\\scala_project\\flink_wordcount\\src\\main\\resources\\wordcount.txt"
    val inputDataSet: DataSet[String] = env.readTextFile(inputPath)
    val result: DataSet[(String, Int)] = inputDataSet
      .flatMap(_.split(",")) // split each line on commas and flatten, so every word becomes its own element
      .map((_, 1)) // turn each word into a tuple (word, 1)
      .groupBy(0) // group by the first tuple field (the word)
      .sum(1) // sum the second tuple field (the count)
    result.print() // print the DataSet; for a DataSet, print() also triggers execution
  }
}
Output
(telling,1)
(a,1)
(wao,1)
(yarning,1)
(hai,1)
(hello,1)
(story,2)
(and,1)
(wine,1)
(i,2)
(speak,1)
(like,1)
(hive,2)
(you,3)
(Chinese,1)
(kidding,1)
(do,1)
(!,2)
(are,2)
(me,1)
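To see what the four operators compute without starting Flink, the same steps can be sketched on ordinary Scala collections. This is only an illustration: the object name LocalWordCount and the method count are made up for this sketch, and the Flink job above runs the steps in parallel rather than on an in-memory Seq.

```scala
object LocalWordCount {
  // Sample lines copied from wordcount.txt above.
  val lines: Seq[String] = Seq(
    "hello,wao,yarning",
    "hai,are,you,speak,Chinese",
    "are,you,kidding,me,!",
    "i,like,telling,a,story,!",
    "i,hive,wine,and,do,you,hive,story"
  )

  // Same four steps as the batch job, applied to a plain Seq instead of a DataSet.
  def count(input: Seq[String]): Map[String, Int] =
    input
      .flatMap(_.split(","))   // split each line into words and flatten
      .map(word => (word, 1))  // pair every word with a count of 1
      .groupBy(_._1)           // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts per word

  def main(args: Array[String]): Unit =
    count(lines).toSeq.sortBy(_._1).foreach(println)
}
```

The sketch sorts the result only to make the printed order stable. The Flink output above is not sorted, because each parallel subtask prints its own groups.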
Streaming WordCount
1 Create a Maven project
2 In pom.xml, configure the dependencies, the Scala compiler plugin, and the Maven plugin that builds a jar with dependencies. The pom.xml is identical to the one used for the batch WordCount.
3 Create an object StreamWordCount
package com.test.wc
import org.apache.flink.streaming.api.scala._
/**
 * @ClassName StreamWordCount.scala
 * @author admin
 * @version 1.0.0
 * @Description TODO
 * @createTime 2022-08-10 22:26:00
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from a socket as the input stream
    val inputSock: DataStream[String] = env.socketTextStream("localhost", 1313)
    // Transform the DataStream
    val resultDataStream: DataStream[(String, Int)] = inputSock
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1)) // turn each word into a tuple (word, 1)
      .keyBy(_._1) // key by the word; the index-based keyBy(0) is deprecated
      .sum(1) // running sum of the count for each word
    resultDataStream.print()
    // A streaming job only starts when execute() is called
    env.execute()
  }
}
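On Windows, download netcat from https://eternallybored.org/misc/netcat/, unzip it, and copy nc.exe into the Windows/System32 directory.
Open cmd and run the following command to open a listening socket:
nc -l -p 1313
Start the Flink program.
Type the following lines into the nc window:
===>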
hello world
hello scala
are you kidding me !
are you kidding you !
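===> Check the IDEA console for the Flink output. The number before > is the index of the parallel subtask that printed the record, so the order of the lines can differ between runs.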
5> (hello,1)
9> (world,1)
1> (scala,1)
5> (hello,2)
10> (you,1)
4> (!,1)
12> (kidding,1)
8> (are,1)
8> (me,1)
10> (you,2)
4> (!,2)
10> (you,3)
8> (are,2)
12> (kidding,2)
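The output above contains one record per incoming word, not one per distinct word, because the keyed sum emits the updated count every time a word arrives. A minimal single-threaded sketch of that behaviour on plain Scala collections follows. The names RunningWordCount and updates are hypothetical, and the mutable map only stands in for the per-key state that Flink manages itself.

```scala
import scala.collection.mutable

object RunningWordCount {
  // Returns every update a keyed running sum would emit, in arrival order.
  def updates(lines: Seq[String]): Seq[(String, Int)] = {
    // Stand-in for the per-key state: the latest count seen for each word.
    val state = mutable.Map.empty[String, Int]
    for {
      line <- lines
      word <- line.split(" ")
    } yield {
      val next = state.getOrElse(word, 0) + 1
      state(word) = next // update the state for this key
      (word, next)       // emit the new running count
    }
  }

  def main(args: Array[String]): Unit =
    updates(Seq("hello world", "hello scala")).foreach(println)
}
```

In the real job the updates for different words are printed by different subtasks, so their relative order is not fixed. The counts for any single word still increase one at a time, as in this sketch.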
Streaming WordCount with command-line arguments
package com.test.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
 * @ClassName StreamWordCount.scala
 * @author admin
 * @version 1.0.0
 * @Description TODO
 * @createTime 2022-08-10 22:26:00
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // ParameterTool is a utility class provided by Flink for reading program arguments
    val params: ParameterTool = ParameterTool.fromArgs(args)
    // Read the host from the arguments
    val host: String = params.get("host")
    // Read the port from the arguments
    val port: Int = params.getInt("port")
    // Read lines from a socket as the input stream
    val inputSock: DataStream[String] = env.socketTextStream(host, port)
    // Transform the DataStream
    val resultDataStream: DataStream[(String, Int)] = inputSock
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1)) // turn each word into a tuple (word, 1)
      .keyBy(_._1) // key by the word; the index-based keyBy(0) is deprecated
      .sum(1) // running sum of the count for each word
    resultDataStream.print()
    // A streaming job only starts when execute() is called
    env.execute()
  }
}
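Passing the arguments in IDEA: enter them in the Program arguments field of the Run/Debug configuration, in the --key value form that ParameterTool.fromArgs parses, for example --host localhost --port 1313 to match the nc listener used earlier.
Output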
5> (hello,1)
13> (flink,1)
8> (are,1)
8> (now,1)
6> (doing,1)
10> (you,1)
7> (what,1)
10> (you,2)
7> (what,2)
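The argument format may be easier to follow with a small stand-alone sketch that pairs up --key value tokens. This is a simplified stand-in written for illustration, not Flink's ParameterTool. The names ArgsSketch and parse are hypothetical, and the sketch ignores the other forms and the error handling that the real class provides.

```scala
object ArgsSketch {
  // Pairs up "--key value" tokens into a map; tokens that do not fit the pattern are skipped.
  def parse(args: Array[String]): Map[String, String] =
    args.grouped(2).collect {
      case Array(key, value) if key.startsWith("--") => key.drop(2) -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    // Example arguments, assumed to match the nc listener from the earlier section.
    val params = parse(Array("--host", "localhost", "--port", "1313"))
    val host: String = params("host")
    val port: Int = params("port").toInt // the port arrives as text and is converted to Int
    println(s"$host:$port")
  }
}
```

In the job itself, params.get("host") and params.getInt("port") play the role of the map lookups in this sketch.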