Flink基础
今日课程内容目标
- Flink的执行环境
- 基本source算子
- 数据的转换操作(Transformation算子)
- 基本sink算子
Flink的执行环境
编程入口
- 流式计算入口(用于测试/生产)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 流批一体入口(用于测试/生产)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为env设置环境参数
ExecutionConfig config = env.getConfig();
// 设置为批处理模式
config.setExecutionMode(ExecutionMode.BATCH);
- 开启webui的本地运行环境(用于测试)
Configuration conf = new Configuration();
conf.setInteger("rest.port",8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
要开启本地webui功能,需要添加依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version> </dependency>
执行模式
- 流执行(STREAMING)模式。
- STREAMING模式是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。在默认情况下,程序使用的就是STREAMING模式。
- 批执行(BATCH)模式。
- BATCH模式是专门用于批处理的执行模式,在这种模式下,Flink处理作业的方式类似于MapReduce框架。对于不会持续计算的有界数据,用这种模式处理会更方便。
- 自动(AUTOMATIC)模式。
- 在AUTOMATIC模式下,将由程序根据输入数据源是否有界来自动选择执行模式。
执行模式的配置方法(以BATCH为例,默认是STREAMING模式):
通过命令行配置(在提交作业时,增加execution.runtime-mode参数,进行指定)
bin/flink run -Dexecution.runtime-mode=BATCH ...
通过代码配置(在代码中调用setRuntimeMode方法指定)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); env.setRuntimeMode(RuntimeExecutionMode.BATCH);
不要在代码中配置,而是使用命令行。这同设置并行度是类似的,在提交作业时指定参数可以更加灵活,同一段应用在程序写好之后,既可以用于批处理,又可以用于流处理,而在代码中进行硬编码的方式的可扩展性比较差,一般都不推荐。
基本source算子
基于集合的Source(测试用)
可将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据流DataStream;
fromElements
非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。
public class FromElementDemo { public static void main(String[] args) throws Exception { //创建流计算执行上下文环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //指定多个相同类型的数据创建DataStream DataStreamSource<String> words = env.fromElements("flink", "hadoop", "flink"); //调用Sink将数据在控制台打印 words.print(); //执行 env.execute("FromElementDemo"); } }
fromCollection
非并行的Source,可以将一个Collection作为参数传入到该方法中,返回一个DataStreamSource。
//创建一个List List<String> wordList = Arrays.asList("flink", "spark", "hadoop", "flink"); //将List并行化成DataStream DataStreamSource<String> words = env.fromCollection(wordList); // fromParallelCollection fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); //设置并行度为3//调用env的fromParallelCollection创建并行的生成数据的DataStreamSource DataStreamSource<Long> numbers = env.fromParallelCollection( new NumberSequenceIterator(1L, 10L), // 生成数组的range Long.class //输出数据的类型 );
generateSequence
并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)通过指定的起始值和结束值来生成数据序列流;
//调用env的generateSequence生成并行的DataSource,输出的数字是1到100 DataStreamSource<Long> numbers = env.generateSequence(1L, 100L).setParallelism(3);
基于Socket的Source(测试用)
非并行的Source,通过socket通信来获取数据得到数据流;
该方法还有多个重载的方法,如:
socketTextStream(String hostname, int port, String delimiter, long maxRetry)
可以指定行分隔符和最大重新连接次数。
//调用env的socketTextStream方法,从指定的Socket地址和端口创建DataStream
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
注意:socketSource是一个非并行source,如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 9999启动一个Socket服务并在命令行中向该Socket服务发送数据。
基于文件的Source(测试用)
基于文件的Source,本质上就是使用指定的FileInputFormat组件读取数据,可以指定TextInputFormat、CsvInputFormat、BinaryInputFormat等格式;
底层都是ContinuousFileMonitoringFunction,这个类继承了RichSourceFunction
,都是非并行的Source;
readFile
readFile(FileInputFormat inputFormat, String filePath) 方法可以指定读取文件的FileInputFormat 格式,参数FileProcessingMode,可取值:
- PROCESS_ONCE,只读取文件中的数据一次,读取完成后,程序退出
- PROCESS_CONTINUOUSLY,会一直监听指定的文件,文件的内容发生变化后,会将以前的内容和新的内容全部都读取出来,进而造成数据重复读取
String path = "D://word.txt"; //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化 DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
readTextFile
readTextFile(String filePath) 可以从指定的目录或文件读取数据,默认使用的是TextInputFormat格式读取数据,还有一个重载的方法readTextFile(String filePath, String charsetName)可以传入读取文件指定的字符集,默认是UTF-8编码。该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行。该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE
DataStreamSource<String> lines = env.readTextFile(path);
Kafka Source(生产常用)
在实际生产环境中,为了保证flink可以高效地读取数据源中的数据,通常是跟一些分布式消息中件结合使用,例如Apache Kafka。Kafka的特点是分布式、多副本、高可用、高吞吐、可以记录偏移量等。Flink和Kafka整合可以高效的读取数据,并且可以保证Exactly Once(精确一次性语义)。
首先在maven项目的pom.xml文件中导入Flink跟Kafka整合的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.3.0-1.20</version>
</dependency>
参考代码:
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
//指定kafka集群的地址
.setBootstrapServers("node1.itcast.cn:9092")
//设置订阅的主题
.setTopics("test01")
//制定消费者组id
.setGroupId("itcast001")
//指定起始消费位移
//OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST):消费起始位移选择之前所递交的偏移量(如果没有,则重置为Latest)
//OffsetsInitializer.earliest():消费起始位移直接选择为最早
//OffsetsInitializer.latest():消费起始位移直接选择为最新
//OffsetsInitializer.offsets(Map<TopicPartition, Long>):消费起始位移选择为:方法所传入的每个分区对应的起始偏移量
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
//设置value数据的反序列化起(使用默认的字符串解析器)
.setValueOnlyDeserializer(new SimpleStringSchema())
//设置动态分区检查(10秒钟检查一次新的分区)
.setProperty("partition.discovery.interval.ms", "10000")
//设置开启kafka底层消费者的自动位移递交机制
// TODO 注释:他会把最新的消费位移递交到kafka的consumer_offset中
// TODO 注释:建议在于flink整合的时候不要设置true,由flink决定是否递交偏移量可以保证业务的端对端的一次性语义
.setProperty("auto.offset.commit", "true")
// TODO 注释:将这个source段子设置为Bounded属性(有界流)
// 将来source去读取数据的时候,读取到指定的位置,停止读取并退出
// 经常用来进行补数或者重跑一段历史的数据
// .setBounded(OffsetsInitializer.committedOffsets())
// TODO 注释:将这个source段子设置为UnBounded属性(无界流)
// 并不会一直读取数据,而是达到了指定位置就停止读取,但是程序不会退出
// 主要应用场景式:需要从kafka中读取某一段固定长度数据,然后拿着数据去跟另外一个真正的无界流联合处理
// .setUnbounded(OffsetsInitializer.latest())
.build();
// 将实例化成功的对象作为参数传入到fromSource方法中
DataStreamSource<String> kafkaStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
自定义Source
Flink的DataStream API可以让开发者根据实际需要,灵活的自定义Source,本质上就是定义一个类,实现SourceFunction或继承RichParallelSourceFunction,实现run方法和cancel方法。
准备工作:
定义一个JavaBean对象
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
class EventLog{
//唯一标识符
private long guid;
//会话ID
private String sessionId;
//事件id
private String eventId;
//时间戳
private long timeStamp;
//事件信息
private Map<String,String> eventInfo;
}
自定义Source,实现自定义&并行度为1的source
自定义source,实现SourceFunction接口,实现一个没有并行度的案例
功能:每隔 1s 进行生成一个****EventLog****
实现的方法:run(),作为数据源,所有数据的产生都在 run() 方法中实现
public class CustomSourceFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
dataStreamSource.map(JSON::toJSONString).print();
env.execute();
}
}
class MySourceFunction implements SourceFunction<EventLog> {
volatile boolean flag = true;
@Override
public void run(SourceContext<EventLog> ctx) throws Exception {
EventLog eventLog = new EventLog();
String[] events = {"hadoop","spark","flink","hbase","kafka","hdfs","mapreduce"};
HashMap<String, String> eventInfoMap = new HashMap<>();
while(flag) {
eventLog.setGuid(RandomUtils.nextLong(1, 1000));
//该方法随机生成一个包含大小写字母的字符串,一个参数表示该字符串包含的字母的个数
eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
eventLog.setTimeStamp(System.currentTimeMillis());
eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
//该方法随机生成一个包含大小写字母的字符串,但是该字符串的长度是随机的生成的,随机的范围就是该方法的两个参数,第一个表示随机的最小值,第二个表示随机的最大值
eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
eventLog.setEventInfo(eventInfoMap);
ctx.collect(eventLog);
eventInfoMap.clear();
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
自定义Source,实现一个支持并行度的source
实现ParallelSourceFunction接口
该接口只是个标记接口,用于标识继承该接口的Source都是并行执行的。其直接实现类是RichParallelSourceFunction,它是一个抽象类并继承自 AbstractRichFunction(从名称可以看出,它应该兼具 rich 和 parallel 两个特性,这里的rich体现在它定义了 open 和 close 这两个方法)。
public class CustomParallelSourceFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyParallelSourceFunction()).setParallelism(2);
dataStreamSource.map(JSON::toJSONString).print();
env.execute();
}
}
class MyParallelSourceFunction implements ParallelSourceFunction<EventLog> {
volatile boolean flag = true;
@Override
public void run(SourceContext<EventLog> ctx) throws Exception {
EventLog eventLog = new EventLog();
String[] events = {"hadoop","spark","flink","hbase","kafka","hdfs","mapreduce"};
HashMap<String, String> eventInfoMap = new HashMap<>();
while(flag) {
eventLog.setGuid(RandomUtils.nextLong(1, 1000));
//该方法随机生成一个包含大小写字母的字符串,一个参数表示该字符串包含的字母的个数
eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
eventLog.setTimeStamp(System.currentTimeMillis());
eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
//该方法随机生成一个包含大小写字母的字符串,但是该字符串的长度是随机的生成的,随机的范围就是该方法的两个参数,第一个表示随机的最小值,第二个表示随机的最大值
eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
eventLog.setEventInfo(eventInfoMap);
ctx.collect(eventLog);
eventInfoMap.clear();
Thread.sleep(1000);
}
}
自定义Source,实现一个支持并行度的富类source
RichParallelSourceFunction 中的rich体现在额外提供open和close方法
针对source中如果需要获取其他链接资源,那么可以在open方法中获取资源链接,在close中关闭资源链接。
public class CustomSourceFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
//DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyParallelSourceFunction()).setParallelism(2);
DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyRichSourceFunction());
// DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyRichParallelSourceFunction()).setParallelism(2);
dataStreamSource.map(JSON::toJSONString).print();
env.execute();
}
}
class MyRichSourceFunction extends RichSourceFunction<EventLog> {
volatile boolean flag = true;
/**
* source组件初始化
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
RuntimeContext runtimeContext = getRuntimeContext();
// 可以从运行时上下文中,取到本算子所属的 task 的task名
String taskName = runtimeContext.getTaskName();
// 可以从运行时上下文中,取到本算子所属的 subTask 的subTaskId
int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
// 如果需要获取其他链接资源,那么可以在open方法中获取资源链接
System.out.println("资源链接.. ");
}
/**
* source组件生成数据的过程(核心工作逻辑)
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<EventLog> ctx) throws Exception {
EventLog eventLog = new EventLog();
String[] events = {"hadoop", "spark", "flink", "hbase", "kafka", "hdfs", "mapreduce"};
HashMap<String, String> eventInfoMap = new HashMap<>();
while (flag) {
eventLog.setGuid(RandomUtils.nextLong(1, 1000));
eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
eventLog.setTimeStamp(System.currentTimeMillis());
eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
eventLog.setEventInfo(eventInfoMap);
ctx.collect(eventLog);
eventInfoMap.clear();
Thread.sleep(RandomUtils.nextInt(500, 1500));
}
}
/**
* job取消调用的方法
*/
@Override
public void cancel() {
flag = false;
}
/**
* 组件关闭调用的方法
* @throws Exception
*/
@Override
public void close() throws Exception {
// 在close中关闭资源链接
System.out.println("资源关闭.. ");
}
}
自定义Source,实现消费MySQL中的数据
这个更加接近实际的案例,上面我们已经使用了自定义数据源和Flink自带的Kafka source,那么接下来就模仿着写一个从 MySQL 中读取数据的 Source。
- mysql建表语句
create database if not exists flinkdemo;
use flinkdemo;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');
SET FOREIGN_KEY_CHECKS = 1;
- 创建自定义Source类,继承 RichSourceFunction
public class CustomSourceFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<UserInfo> dataStreamSource = env.addSource(new MyRichParallelSourceFunction()).setParallelism(2);
dataStreamSource.map(JSON::toJSONString).print();
env.execute();
}
}
class MyRichParallelSourceFunction extends RichParallelSourceFunction<UserInfo> {
private Connection connection = null; // 定义数据库连接对象
private PreparedStatement ps = null; // 定义PreparedStatement对象
/*
使用open方法, 这个方法在实例化类的时候会执行一次, 比较适合用来做数据库连接
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 加载数据库驱动
Class.forName("com.mysql.jdbc.Driver");
// 创建数据库连接
String url = "jdbc:mysql://node1:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";
this.connection = DriverManager.getConnection(url, "root", "123456");
// 准备PreparedStatement对象
this.ps = connection.prepareStatement("SELECT id, username, password, name FROM user");
System.out.println("资源链接.. ");
}
/*
使用close方法, 这个方法在销毁实例的时候会执行一次, 比较适合用来关闭连接
*/
@Override
public void close() throws Exception {
super.close();
// 关闭资源
if (this.ps != null) this.ps.close();
if (this.connection != null) this.connection.close();
System.out.println("资源关闭.. ");
}
@Override
public void run(SourceContext<UserInfo> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
int id = resultSet.getInt("id");
String username = resultSet.getString("username");
String password = resultSet.getString("password");
String name = resultSet.getString("name");
ctx.collect(new UserInfo(id, username, password, name));
}
}
@Override
public void cancel() {
System.out.println("任务被取消......");
}
}
/**
数据定义类, POJO
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class UserInfo {
int id;
String username;
String password;
String name;
}
数据的转换操作(Transformation算子)
映射算子
map映射(DataStream → DataStream)
map(new MapFunction )
MapFunction: (x)-> y [1条变1条]
public class MapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//调用env的fromElements创建一个非并行的DataStreamSource
DataStreamSource<String> words = env.fromElements(
"hadoop","spark","flink","hbase","flink","spark"
);
//在map方法中传入MapFunction实现类实例,重写map方法
DataStream<String> upperWords = words.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
//将每一个单词转成大写
return value.toUpperCase();
}
});
//调用Sink将数据打印在控制台
upperWords.print();
env.execute("MapDemo");
}
}
flatMap扁平化映射(DataStream → DataStream)
flatMap( new FlatMapFcuntion)
FlatMapFunction: x-> x1, x2,x3,x4 [1条变多条,并展平]
DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
//将一行字符串按空格切分成一个字符串数组
String[] arr = line.split(" ");
for (String word : arr) {
//将单词转成小写放入到Collector中
out.collect(Tuple2.of(word.toLowerCase(), 1));
}
}
}
);
如果是调用flatMap方法时传入Lambda表达式,需要在调用flatMap方法后,在调用returns方法指定返回的数据的类型。不然Flink无法自动推断出返回的数据类型,会出现异常。
DataStream<Tuple2<String, Integer>> wAndOne = lines.flatMap(
(String line, Collector<Tuple2<String, Integer>> out) -> {
Arrays.asList(line.split("\\W+")).forEach(word -> {
out.collect(Tuple2.of(word.toLowerCase(), 1));
});
}
).returns(Types.TUPLE(Types.STRING, Types.INT)); //使用returns指定返回数据的类型
过滤算子
filter过滤(DataStream → DataStream)
filter(new FilterFunction)
FilterFunction : x -> true/false
DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//过滤掉奇数,保留偶数
DataStream<Integer> even = numbers.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0; //过滤掉返回false的数组
}
});
分组算子
keyBy按key分组(DataStream → KeyedStream)
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//按照Bean中的属性名word进行分组
KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy("word");
聚合算子
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。
此处所说的聚合算子,是多个聚合算子的统称,有sum、min、minBy、max、maxBy;
这些算子的底层逻辑都是维护一个聚合值,并使用每条流入的数据对聚合值进行滚动更新;
这些算子都只能在KeyedStream上调用(就是必须keyby后调用);
sum
该算子实现实时滚动相加的功能,即新输入的数据和历史数据进行相加。
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);//将分组后的数据进行sum运算
DataStream<Tuple2<String, Integer>> result = keyed.sum(1);
//如果是自定义的POJO类型数据,可以传入一个要聚合的字段名称。
//按照Bean中的属性名word进行分组
KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy("word");
//按照Bean中的属性名count进行sum聚合
DataStream<CountBean> result = keyed.sum("count");
min、minBy
这两个算子都是求最小值;min和minBy的区别在于:
min的返回值,最小值字段以外,其他字段是第一条输入数据的值;
minBy返回值,就是最小值字段所在的那条数据;
底层原理:滚动更新时是更新一个字段,还是更新整条数据的区别;
KeyedStream<Tuple3<String, String, Integer>, Tuple> keyedStream = wordAndCount.keyBy(1);
//将分组后的数据进行调用min、minBy
DataStream<Tuple3<String, String, Integer>> min1 = keyedStream.min(2);
DataStream<Tuple3<String, String, Integer>> min2 = keyedStream.minBy(2);
DataStream<Tuple3<String, String, Integer>> min3 = keyedStream.minBy(2, false);
//调用print sink打印结果
min1.print("min");
min2.print("minBy");
min3.print("minBy last");
max、maxBy
这两个算子都是求最大值,用法和min、minBy相同。
reduce归约
它的滚动聚合逻辑没有写死,而是由用户通过ReduceFunction来传入。
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//将分组后的数据进行reduce
DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
t1.f1 += t2.f1; //将元组对应的次数进行累加
return t1; //返回累加后的元组
}
}
);
基本sink算子
sink算子是将计算结果最终输出的算子
不同的sink算子可以将数据输出到不同的目标,如写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。
打印到控制台:print
打印是最简单的一个Sink,通常是用来做实验和测试时使用。
DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.print();
打印到文件:StreamFileSink
该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。
streamFileSink中输出的文件,其生命周期会经历3中状态:
- in-progress Files
- Pending Files
- Finished Files
Bucket:FileSink可向由Flink FileSystem抽象支持的文件系统写入分区文件(因为是流式写入,数据被视为无界)。该分区行为可配,默认按时间,具体来说每小时写入一个Bucket,该Bucket包括若干文件,内容是这一小时间隔内流中收到的所有record。
PartFile:每个Bukcket内部分为多个PartFile来存储输出数据,该Bucket生命周期内接收到数据的sink的每个子任务至少有一个PartFile。
而额外文件滚动由可配的滚动策略决定,默认策略是根据文件大小和打开超时(文件可以被打开的最大持续时间)以及文件最大不活动超时等决定是否滚动。
Bucket和SubTask、PartFile关系如图所示:

FileSink 支持****行编码(Row-encoded)*和*批量编码(Bulk-encoded,比如 Parquet)格式****。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:
行编码:FileSink.forRowFormat(basePath,rowEncoder)
批量编码:FileSink.forBulkFormat(basePath,bulkWriterFactory)
在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。
需要添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet-avro}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
Row格式文件输出代码示例
需求:编写Flink程序,每隔1s生成字符串数据,然后将接收到的数据流式方式存储到hdfs。
自定义数据源,每秒钟生成一条数据
public class MySourceFunction implements SourceFunction<EventLog>{ volatile boolean flag = true; @Override public void run(SourceContext<EventLog> ctx) throws Exception { EventLog eventLog = new EventLog(); String[] events = { "hadoop", "spark", "flink", "hbase", "kafka", "hdfs", "mapreduce"}; HashMap<String, String> eventInfoMap = new HashMap<>(); while (flag) { eventLog.setGuid(RandomUtils.nextLong(1, 1000)); eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase()); eventLog.setTimeStamp(System.currentTimeMillis()); eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]); eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2)); eventLog.setEventInfo(eventInfoMap); ctx.collect(eventLog); eventInfoMap.clear(); Thread.sleep(RandomUtils.nextInt(200, 1500)); } } @Override public void cancel() { flag = false; } }
示例代码
public class FileSinkRowFormat_Demo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt"); env.setParallelism(2); DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction()); /** * 应用 FileSink 算子,来将数据输出到 文件系统 */ /** * 1. 输出为 行格式 */ // 构造一个FileSink对象 FileSink<String> rowSink = FileSink .forRowFormat(new Path("d:/filesink/rowformat"), new SimpleStringEncoder<String>("utf-8")) // 文件的滚动策略 (间隔时长10s,或文件大小达到 5M,就进行文件切换 .withRollingPolicy( DefaultRollingPolicy.builder() //至少包含多少时间的数据 .withRolloverInterval(Duration.ofSeconds(10)) //多少时间没有新的数据 .withInactivityInterval(Duration.ofSeconds(10)) //数据达到多大1G .withMaxPartSize(MemorySize.ofMebiBytes(1)).build()) // 分桶的策略(划分子文件夹的策略) .withBucketAssigner(new DateTimeBucketAssigner<String>()) // 输出文件的文件名相关配置 .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".txt").build()) .build(); // 然后添加到流,进行输出 streamSource.map(JSON::toJSONString) //.addSink() /* SinkFunction实现类对象,用addSink() 来添加*/ .sinkTo(rowSink).uid("fileSink"); /*Sink 的实现类对象,用 sinkTo()来添加 */ env.execute(); } }
Bulk列式存储文件输出代码示例1
需求:手动构建Avro的Schema对象,得到ParquetWriterFactory的方式实现如上需求
public class FileSinkBulkFormat_Demo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
env.setParallelism(1);
/**
* 方式一:
* 核心逻辑:
* - 构造一个schema
* - 利用schema构造一个parquetWriterFactory
* - 利用parquetWriterFactory构造一个FileSink算子
* - 将原始数据转成GenericRecord流,输出到FileSink算子
*/
// 1. 先定义GenericRecord的数据模式
Schema schema = SchemaBuilder.builder()
.record("DataRecord")
.namespace("cn.itcast.flink.avro.schema")
.fields()
.requiredInt("gid")
.requiredLong("ts")
.requiredString("eventId")
.requiredString("sessionId")
.name("eventInfo")
.type()
.map()
.values()
.type("string")
.noDefault()
.endRecord();
// 构造好一个数据流
DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
// 2. 通过定义好的schema模式,来得到一个parquetWriter
ParquetWriterFactory<GenericRecord> writerFactory = AvroParquetWriters.forGenericRecord(schema);
// 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子
FileSink<GenericRecord> sink1 = FileSink.forBulkFormat(new Path("d:/filesink/bulkformat"), writerFactory)
.withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>("yyyy-MM-dd--HH"))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".parquet").build())
.build();
// 4. 将自定义javabean的流,转成 上述sink算子中parquetWriter所需要的 GenericRecord流
SingleOutputStreamOperator<GenericRecord> recordStream = streamSource
.map((MapFunction<EventLog, GenericRecord>) eventLog -> {
// 构造一个Record对象
GenericData.Record record = new GenericData.Record(schema);
// 将数据填入record
record.put("gid", (int) eventLog.getGuid());
record.put("eventId", eventLog.getEventId());
record.put("ts", eventLog.getTimeStamp());
record.put("sessionId", eventLog.getSessionId());
record.put("eventInfo", eventLog.getEventInfo());
return record;
}).returns(new GenericRecordAvroTypeInfo(schema)); // 由于avro的相关类、对象需要用avro的序列化器,所以需要显式指定AvroTypeInfo来提供AvroSerializer
// 5. 输出数据
recordStream.sinkTo(sink1).uid("fileSink");
env.execute();
}
}
Bulk列式存储文件输出代码示例2
需求:编写avsc 配置文件,并利用插件生成“特定JavaBean”,得到ParquetWriterFactory的方式实现如上需求。
开发步骤:
编写一个avsc文本文件(json),来描述数据模式
{"namespace": "cn.itcast.chapter5.avro.schema",
"type": "record",
"name": "AvroEventLog",
"fields": [
{"name": "guid", "type": "long"},
{"name": "sessionId", "type": "string"},
{"name": "eventId", "type": "string"},
{"name": "timeStamp", "type": "long"},
{"name": "eventInfo", "type": { "type":"map","values": "string"} }
]
}
添加 maven代码生成器插件,来针对上述的avsc生成avro特定格式的JavaBean类
利用代码生成器生成的 JavaBean,来构造一个 parquetWriterFactory
利用parquetWriterFactory构造一个FileSink算子
将原始数据流 转成 特定格式JavaBean流,输出到 FileSink算子
示例代码:
public class FileSinkBulkFormat_Demo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
// 构造好一个数据流
DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
/**
* 方式二:
* 核心逻辑:
* - 编写一个avsc文本文件(json),来描述数据模式
* - 添加 maven代码生成器插件,来针对上述的avsc生成avro特定格式的JavaBean类
* - 利用代码生成器生成的 JavaBean,来构造一个 parquetWriterFactory
* - 利用parquetWriterFactory构造一个FileSink算子
* - 将原始数据流 转成 特定格式JavaBean流,输出到 FileSink算子
*/
// 1. 先定义avsc文件放在resources文件夹中,并用maven的插件,来编译一下,生成特定格式的JavaBean : AvroEventLog
// 这种根据avsc生成的JavaBean类,自身就已经带有了Schema对象
// AvroEventLog avroEventLog = new AvroEventLog();
// Schema schema = avroEventLog.getSchema();
// 2. 通过自动生成 AvroEventLog类,来得到一个parquetWriter
ParquetWriterFactory<AvroEventLog> parquetWriterFactory = AvroParquetWriters.forSpecificRecord(AvroEventLog.class);
// 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子
FileSink<AvroEventLog> bulkSink = FileSink.forBulkFormat(new Path("d:/filesink/bulkformat2"), parquetWriterFactory)
.withBucketAssigner(new DateTimeBucketAssigner<AvroEventLog>("yyyy-MM-dd--HH"))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".parquet").build())
.build();
// 4. 将自定义javabean的 EventLog 流,转成 上述sink算子中parquetWriter所需要的 AvroEventLog 流
SingleOutputStreamOperator<AvroEventLog> avroEventLogStream = streamSource.map(new MapFunction<EventLog, AvroEventLog>() {
@Override
public AvroEventLog map(EventLog eventLog) throws Exception {
HashMap<CharSequence, CharSequence> eventInfo1 = new HashMap<>();
// 进行hashmap<charsequenct,charsequence>类型的数据转移
Map<String, String> eventInfo2 = eventLog.getEventInfo();
Set<Map.Entry<String, String>> entries = eventInfo2.entrySet();
for (Map.Entry<String, String> entry : entries) {
eventInfo1.put(entry.getKey(), entry.getValue());
}
return new AvroEventLog(eventLog.getGuid(), eventLog.getSessionId(), eventLog.getEventId(), eventLog.getTimeStamp(), eventInfo1);
}
});
// 5. 输出数据
avroEventLogStream.sinkTo(bulkSink);
env.execute();
}
}
Bulk列式存储文件输出代码示例3
需求:直接利用普通JavaBean,利用工具自身的反射机制,得到ParquetWriterFactory的方式实现如上需求;
public class FileSinkBulkFormat_Demo3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
// 构造好一个数据流
DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
// 将上面的数据流输出到文件系统(假装成一个经过了各种复杂计算后的结果数据流)
/**
* 方式三:
* 核心逻辑:
* - 利用自己的JavaBean类,来构造一个 parquetWriterFactory
* - 利用parquetWriterFactory构造一个FileSink算子
* - 将原始数据流,输出到 FileSink算子
*/
// 2. 通过自己的JavaBean类,来得到一个parquetWriter
ParquetWriterFactory<EventLog> parquetWriterFactory = AvroParquetWriters.forReflectRecord(EventLog.class);
// 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子
FileSink<EventLog> bulkSink = FileSink.forBulkFormat(new Path("d:/filesink/bulkformat3"), parquetWriterFactory)
.withBucketAssigner(new DateTimeBucketAssigner<EventLog>("yyyy-MM-dd--HH"))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".parquet").build())
.build();
// 5. 输出数据
streamSource.sinkTo(bulkSink);
env.execute();
}
}
输出到Kafka
Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink经常放到一起使用,作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,*Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证*。
import cn.itcast.chapter6.beans.EventLog;
import cn.itcast.chapter6.utils.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author : www.itcast.cn
* @date : 22.10.22 13:21
* @Desc:
* 利用KafkaSink将数据流写入kafka
* 测试准备,创建目标topic:
* kafka-topics.sh --create --topic event-log --partitions 3 --replication-factor 2 --zookeeper node1.itcast.cn:2181
**/
public class KafkaSink_Demo {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 开启checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
// 构造好一个数据流
DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
// 把数据写入kafka
// 1. 构造一个kafka的sink算子
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("node1.itcast.cn:9092,node2.itcast.cn:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
.setTopic("event-log")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// 2. 把数据流输出到构造好的sink算子
streamSource
.map(JSON::toJSONString).disableChaining()
.sinkTo(kafkaSink);
env.execute();
}
}
注意:如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String),如:
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix(“itcast-” + RandomUtils.nextInt(1, 100))
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG , “36000”)
输出到 MySQL(JDBC)
不保证Exactly-Once的JdbcSink
创建表结构
CREATE TABLE flinkdemo.t_eventlog ( guid BIGINT NOT NULL, sessionId varchar(100) NULL, eventId varchar(100) NULL, `timeStamp` BIGINT NULL, eventInfo varchar(500) NULL, CONSTRAINT t_eventlog_PK PRIMARY KEY (guid) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci;
示例代码
public class JdbcSink_Demo { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port", 8081); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); // 开启checkpoint env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt"); // 构造好一个数据流 DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction()); /** * 一、 不保证 EOS语义的方式 */ SinkFunction<EventLog> jdbcSink = JdbcSink.sink( "insert into t_eventlog values (?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=? ", new JdbcStatementBuilder<EventLog>() { @Override public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException { preparedStatement.setLong(1, eventLog.getGuid()); preparedStatement.setString(2, eventLog.getSessionId()); preparedStatement.setString(3, eventLog.getEventId()); preparedStatement.setLong(4, eventLog.getTimeStamp()); preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo())); preparedStatement.setString(6, eventLog.getSessionId()); preparedStatement.setString(7, eventLog.getEventId()); preparedStatement.setLong(8, eventLog.getTimeStamp()); preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo())); } }, JdbcExecutionOptions.builder() .withMaxRetries(3) .withBatchSize(1) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://node1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8") .withUsername("root") .withPassword("123456") .build() ); // 输出数据 streamSource.addSink(jdbcSink); env.execute(); } }
保证Exactly-Once的JdbcSink
- 示例代码
public class JdbcSink_Demo {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 开启checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
// 构造好一个数据流
DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
/**
* 二、可以提供 EOS 语义保证的 sink
*/
SinkFunction<EventLog> exactlyOnceSink = JdbcSink.exactlyOnceSink(
"insert into t_eventlog values (?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=? ",
new JdbcStatementBuilder<EventLog>() {
@Override
public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
preparedStatement.setLong(1, eventLog.getGuid());
preparedStatement.setString(2, eventLog.getSessionId());
preparedStatement.setString(3, eventLog.getEventId());
preparedStatement.setLong(4, eventLog.getTimeStamp());
preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
preparedStatement.setString(6, eventLog.getSessionId());
preparedStatement.setString(7, eventLog.getEventId());
preparedStatement.setLong(8, eventLog.getTimeStamp());
preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(0)
.withBatchSize(1)
.build(),
JdbcExactlyOnceOptions.builder()
// mysql不支持同一个连接上存在并行的多个事务,必须把该参数设置为true
.withTransactionPerConnection(true)
.build(),
new SerializableSupplier<XADataSource>() {
@Override
public XADataSource get() {
// XADataSource就是jdbc连接,不过它是支持分布式事务的连接
// 而且它的构造方法,不同的数据库构造方法不同
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://node1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8");
xaDataSource.setUser("root");
xaDataSource.setPassword("123456");
return xaDataSource;
}
}
);
// 输出数据
streamSource.addSink(exactlyOnceSink);
env.execute();
}
}
今日总结
本章节依次讲解了执行环境的创建、数据源的读取、数据流的转换操作,和最终结果数据的输出,对各种常见的转换操作 API 和外部系统的连接都做了详细介绍,这些api都是最基本的api,除此之外还有更复杂的api将会在后面的章节进行详细的介绍,通过这个章节主要让大家掌握 DataStream API 的基本用法以及熟悉 Flink 的编程习惯。