目录
前言:
对 Flink有了基本的了解后,接下来就要理论联系实际,真正上手写代码了。Flink底层是以Java编写的,并为开发人员同时提供了完整的Java和ScalaAPI。在本书中,代码示例将全部用Java 实现;而在具体项目应用中,可以根据需要选择合适语言的API进行开发。在这一章,我们将会以大家最熟悉的 IntelliJIDEA 作为开发工具,用实际项目中最常见的Maven 作为包管理工具,在开发环境中编写一个简单的 Flink项目,实现零基础快速上手。
一、环境准备
1、小编的本地测试系统环境为 Windows 11。
2、需提前安装 Java 8。
3、集成开发环境(IDE)使用IntelliJIDEA,具体的安装流程参见IntelliJ官网。
4、安装 IntelliJIDEA之后,还需要安装一些插件--Maven和Git。Maven 用来管理项目依赖;通过Git 可以轻松获取我们的示例代码,并进行本地代码的版本控制。
二、创建项目
1.创建工程
打开IntellJIDEA,创建一个 Maven 工程,如图 1-1 所示
图:1-1
导入Maven仓库,File-> Settings->Maven,如图 1-2 所示
图:1-2
2、添加项目依赖
在项目的 pom 文件中,增加<properties>标签设置属性,然后增加<denpendencies>标签引入需要的依赖。我们需要添加的依赖最重要的就是Flink的相关组件,包括fink-java、flink-streaming-java,以及 ink-clients(客户端,也可以省略)。另外,为了方便查看运行日志,我们引入 slf4i 和 log4i 进行日志管理。
将下面依赖导入刚新建项目的pom.xml中
<properties>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
如下图 1-3:
图 1-3
三、WordCount代码编写(有界流)
需求:统计一段文字中,每个单词出现的频次。
环境准备:在 src/main/java目录下,新建一个包,命名为cn.konne.wc。
1、批处理和流处理
批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
流处理基本思路:一条一条的读取数据,每读到一条数据后计算结果,保存本地内存的状态,下一条数据进来后,从内存中读取之前状态,并进行计算。
2、数据准备
在工程根目录下新建一个input 文件夹,并在下面创建文本文件 words.txte。
在words.txt中输入一些文字,例如:
hello flink
hello worlde
hello iava
3、编写代码
在cn.konne.wc下面新建WordCountTest类,在类中编写main方法。并且将待会需要操作的每个步骤进行梳理,见图1-4:
图 1-4
具体代码如下:
1、DataSet API (不推荐)(批处理)
package cn.konne.wc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @author MJ
* @date 2025/3/19
*/
public class WordCountTest {
public static void main(String[] args) throws Exception {
// TODO 1.创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// TODO 2.读取数据:从文件中读取
DataSource<String> lonDs = env.readTextFile("/input/words.txt");
// TODO 3.切分、转换(word,1)
FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lonDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// TODO 按照空格 切分单词、
String[] words = value.split(" ");
// TODO 将单词转为为(word,1)
for (String word : words) {
Tuple2<String, Integer> stringIntegerTuple2 = Tuple2.of(word, 1);
// TODO 使用 Collector 向下游发送消息
out.collect(stringIntegerTuple2);
}
}
});
// TODO 4.按照 word 分组
UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordAndOne.groupBy(0);
// TODO 5.各分组内聚台
AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);
// TODO 6.输出/
sum.print();
}
}
输出结构如下:
(java,1)
(flink,1)
(world,1)
(he11o,3)
需要注意的是,这种代码的实现方式,是基于DataSetAPI的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用 DataStreamAP,在提交任务时通过将执行模式设为 BATCH来进行批处理:←S bin/flink run -Dexecution,runtime-mode=BATCH BatchWordCount.jar。
这样,DataSet API就没什么用了,在实际应用中我们只要维护一套 DataStream API就可以。这里只是为了方便大家理解,我们依然用 DataSetAPI做了批处理的实现。
2、DataStreaming(流处理)
package cn.konne.wc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* DataStreaming
*/
public class WordCountStream {
public static void main(String[] args) throws Exception {
// TODO 1、创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2、读取数据
DataStreamSource<String> lineDs = env.readTextFile("D:\\JAVA\\konne\\words.txt");
// TODO 3、处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] s = value.split(" ");
for (String string : s) {
out.collect(new Tuple2<>(string, 1));
}
}
});
// TODO 4、分组
KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
// TODO 5、聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);
// TODO 6、输出数据
sum.print();
// TODO 7、执行
env.execute();
}
}
输出:
7> (worlde,1)
7> (flink,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
8> (iava,1)
总结:
对于批处理和流处理输出的结果,可以发现批处理的hello是一次性计算出的结果,这就是批处理的特点。而流处理的hello输出了三次,体现了流处理数据一条一条计算的结果。
四、WordCount代码编写(无界流)
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。
将 StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取 socket文本流的方法socketTextStream。具体代码实现如下:
1、代码编写
package cn.konne.un;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountUn {
public static void main(String[] args) throws Exception {
// TODO 1、创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2、读取数据 从socket读取数据
DataStreamSource<String> socketDs = env.socketTextStream("hadoop102", 9999);
// TODO 3、处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = socketDs
.flatMap(
(String value, Collector<Tuple2<String, Integer>> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
).returns(Types.TUPLE(Types.STRING, Types.INT));
// TODO 4、聚合
KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = tuple2SingleOutputStreamOperator.keyBy((Tuple2<String, Integer> value) -> {
return value.f0;
});
// TODO 5、输出数据
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);
sum.print();
// TODO 6、执行
env.execute();
}
}