文章目录
前言
引言
数据流API
基于POJO的数据流
一般来说flink
中的源数据我们都会以简单java对象即pojo(Plain Ordinary Java Object )
的形式进行传输或游走,只要满足以下条件,flink就会识别这些数据类型:
- 类中所有非静态、非transient修饰的字段,要么以public且非final修饰或者对外提供get和set方法
- 该类不存在非静态的内部类
- 提供无参构造函数
对应的我们给出日常比较常用的POJO
示例,即私有成员但是提供get、set符合上述的要求:
public class Person {
private String name;
private Integer age;
//提供无参构造函数
public Person() {
}
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
//......
//get set方法
}
以上述POJO
作为源数据,可以看到笔者通过StreamExecutionEnvironment
构建流的执行环境,并通过fromData
进行关联:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使用 fromData 关联源数据
DataStreamSource<Person> source = env.fromData(new Person("Alice", 18),
new Person("Bob", 28),
new Person("Charlie", 32));
基于上述的源数据利用DataStream api
尝试过滤出18岁以上的person数据并将过滤结果打印输出:
//基于 filter过滤出大于18岁的person
SingleOutputStreamOperator<Person> filterRes = source.filter(person -> person.getAge() > 18);
//输出打印
filterRes.print();
flink中的流操作和lambda类似需要有一个终端操作才能启动运行,所以我们再完成上述的执行环境设置之后,需确保通过 env.execute();
将当前job
提交到JobManager
,JobManager
切割为无数个子并行任务分发到指定的Task Managers
的slot槽中等待运行:
//执行execute后,上述任务提交到JobManager中的taskmanager某个slot中等待执行,若没提交这个则不会execute执行,这一点和java lambda的终端流操作思想一致
env.execute();
需要补充的是,flink
的fromData
方法提供了多种的重载,上面的示例我们也可以通过List
的方式将源数据传入:
List<Person> list = Arrays.asList(new Person("Alice", 18),
new Person("Bob", 28),
new Person("Charlie", 32));
//使用 fromData 关联源数据
DataStreamSource<Person> source = env.fromData(list);
基本源流配置示例
上述的示例我们基于DataStream
的fromData
方法来构建一些简单源流,实际上flink支持在配置直接直接指明文件流或者socket
流,因为socket
流相对于物理文件流更常用,所以我们给出一个采集本地9999
端口的socket
流示例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
因为我们本案例发送的数据格式为hello,序列化的person对象的json字符串
,所以收到数据流之后需要对数据进行提取转换,所以我们还是通过map和filter完成映射转换和过滤:
dataStream.map(s -> {
String jsonStr = s.substring(s.indexOf(",")+1);
Person person = JSONUtil.toBean(jsonStr, Person.class);
return person;
})
.filter(p -> p.getAge() > 18).print();
env.execute();
为了方便测试,笔者这里给出个人服务端socket
代码使用示例,当然读者也可以在自己的系统上使用nc示例完成:
public static void main(String[] args) {
try {
// 1. 创建ServerSocket,监听9999端口
ServerSocket serverSocket = new ServerSocket(9999);
System.out.println("服务器启动,等待客户端连接...");
// 2. 接受客户端连接
Socket clientSocket = serverSocket.accept();
System.out.println("客户端已连接: " + clientSocket.getInetAddress());
// 3. 获取输出流,用于向客户端发送数据
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
// 4. 每隔1秒发送"hello"
while (true) {
Person person = new Person(RandomUtil.randomString(3), RandomUtil.randomInt(35));
out.println("hello," + JSONUtil.toJsonStr(person));
System.out.println("服务器发送: hello " + JSONUtil.toJsonStr(person));
Thread.sleep(1000); // 暂停1秒
}
// 注意:这里为了简化代码,没有关闭资源,实际应用应该添加try-catch-finally
} catch (Exception e) {
e.printStackTrace();
}
}
可以看到转换和实际收到的数据流结果如下:
2> {
"name":"Qex","age":27}
7> {
"name":"nH7","age":25}
14> {
"name":"zmN","age":34}
在实际的应用中这种配置方式常用于那些高吞吐、低延迟的数据源,例如Kafka这样的消息中间件,这一点flink也提供和上述一样方便的操作API。
基本流接收器
上文过滤出成年person的例子中我们在完成filter过滤后调用print方法进行打印输出,实际上其原理本质上就是为这个源流添加一个以打印输出的sink,这一点我们可以查看DataStream的print方法源码知晓:
@PublicEvolving
public DataStreamSink<T> print() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
return addSink(printFunction).name("Print to Std. Out");
}
同时我们也需要说明在输出结果前面类似于14>
、7>
代表当前输出是由哪个并行流线程(子任务)执行。
当然关于接收器我们也可以基于源数据类型进行自定义,例如下面这段代码,笔者指明源数据为person希望按照我们预期的方式打印,可通过创建一个SinkFunction
指明person
泛型重写invoke实现自定义输出逻辑:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<Person> list = Arrays.asList(new Person("Alice", 18),
new Person("Bob", 28),
new Person("Charlie", 32));
//使用 fromData 关联源数据
DataStreamSource<Person> source = env.fromData(list);
//添加一个person的s