1. DataStream API
原理
DataStream
是 Flink 流处理的核心抽象,代表一个无界的数据流。数据流中的元素可以是任何类型(需可序列化)。通过 Source
读取数据,经过转换操作(如 map
、filter
、keyBy
),最终由 Sink
输出。
建议:
优先使用 Flink 内置算子(如
keyBy
、reduce
)而非自定义函数避免在算子中维护可变状态,使用 Flink 的托管状态(
State
)设置合理的并行度
代码示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从Socket读取数据源
DataStream<String> textStream = env.socketTextStream("localhost", 9999);
// 转换操作:拆分单词并计数
DataStream<Tuple2<String, Integer>> wordCounts = textStream
.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
for (String word : line.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value -> value.f0) // 按单词分组
.sum(1); // 求和
wordCounts.print(); // 输出到控制台
env.execute("WordCount");
2. Watermark
原理
Watermark 是事件时间处理的机制,用于解决乱序事件问题。它是一个特殊的时间戳,表示“早于该时间戳的数据已全部到达”。当 Watermark 超过窗口结束时间时,触发窗口计算。
建议:
根据数据乱序程度设置
allowedLateness
使用
BoundedOutOfOrderness
或Punctuated
策略监控 Watermark 生成延迟
代码示例:
DataStream<Event> events = ... // 输入数据流
DataStream<Event> withTimestamps = events
.assignTimestampsAndWatermarks(
// 允许最大乱序时间 = 5秒
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
3. TimeWindow
原理
基于时间的窗口,分为两种:
滚动窗口(Tumbling):固定大小、不重叠
滑动窗口(Sliding):固定大小、可重叠
建议:
事件时间窗口需搭配 Watermark
处理时间窗口适用于低延迟需求
避免过大窗口导致状态膨胀
代码示例(滚动事件时间窗口):
DataStream<Tuple2<String, Integer>> result = withTimestamps
.keyBy(event -> event.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(30))) // 30秒滚动窗口
.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1)); // 聚合
4. SessionWindow
原理
根据活动间隙划分窗口。当相邻事件的时间差超过 gap
时,关闭当前窗口并开启新窗口。
建议:
动态窗口大小,适合用户行为分析
调整
gap
参数平衡延迟和准确性结合
GlobalWindow
+ 自定义触发器实现复杂会话
代码示例:
DataStream<Tuple2<String, Integer>> sessionResult = withTimestamps
.keyBy(event -> event.getKey())
.window(EventTimeSessionWindows.withGap(Time.minutes(10))) // 10分钟不活动则关闭
.sum(1);
5. CountWindow
原理
基于元素数量的窗口:
滚动计数窗口:每 N 个元素触发
滑动计数窗口:每 S 个元素滑动,计算最近 N 个元素
建议:
适用于物理设备信号处理等计数场景
注意:不基于时间,可能长时间不触发
避免在数据稀疏时使用
代码示例(滚动计数窗口):
DataStream<Tuple2<String, Double>> sensorData = ...
DataStream<Tuple2<String, Double>> avgTemp = sensorData
.keyBy(sensor -> sensor.f0)
.countWindow(100) // 每100个元素一个窗口
.aggregate(new AverageAggregate()); // 自定义聚合函数
public static class AverageAggregate
implements AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Integer>, Double> {
// 实现聚合逻辑
}