Scala Flink 实现自定义水位线,定时生成水位线

发布于:2025-03-02 ⋅ 阅读:(115) ⋅ 点赞:(0)

main:

// Kafka source that reads "topic_name" as plain strings, using the connection settings in `props`.
val myConsumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), props)
// Begin reading from the earliest available offset.
myConsumer.setStartFromEarliest()

// Watermark strategy: watermarks come from PeriodicWatermarkGenerator, event time from `UserData.time`.
val watermarkStrategy = WatermarkStrategy
  .forGenerator[UserData](new WatermarkGeneratorSupplier[UserData] {
    override def createWatermarkGenerator(
        context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[UserData] =
      new PeriodicWatermarkGenerator[UserData]
  })
  .withTimestampAssigner(new SerializableTimestampAssigner[UserData] {
    // Presumably `time` is in epoch seconds and is scaled to milliseconds here — TODO confirm.
    override def extractTimestamp(element: UserData, recordTimestamp: Long): Long =
      element.time * 1000L
  })

// NOTE(review): `myConsumer` produces String records while the strategy is typed for UserData;
// a String => UserData conversion step appears to be missing between the two calls — confirm.
val stream = env
  .addSource(myConsumer)
  .assignTimestampsAndWatermarks(watermarkStrategy)

/**
 * Bounded-out-of-orderness watermark generator that keeps the watermark moving
 * while the input is idle.
 *
 * While events are arriving, the watermark trails the highest event timestamp
 * seen so far by `maxOutOfOrderness`. When a periodic emit finds that no event
 * arrived since the previous emit, the tracked maximum is advanced by the
 * wall-clock time that elapsed in between, so the watermark keeps progressing
 * without new data.
 *
 * How often `onPeriodicEmit` is invoked is not decided here; it is driven by the
 * job's auto-watermark interval (`ExecutionConfig#setAutoWatermarkInterval`).
 * The idle advance is therefore derived from elapsed time rather than from a
 * fixed step per call, and it is skipped while events are flowing so that the
 * out-of-orderness allowance is not eaten up.
 *
 * @tparam T type of the stream elements; the elements themselves are not inspected
 */
private class PeriodicWatermarkGenerator[T] extends WatermarkGenerator[T] {
  private val maxOutOfOrderness = 10000L // tolerate events up to 10 seconds out of order
  private val nanosPerMilli = 1000000L

  // Offset from Long.MinValue so that subtracting maxOutOfOrderness cannot underflow.
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrderness

  // True once at least one event has been observed; before that there is no
  // meaningful event time to extrapolate from.
  private var anyEventSeen = false
  // True when an event arrived after the previous periodic emit.
  private var eventSinceLastEmit = false
  // Monotonic reference point from which idle time is measured.
  private var idleReferenceNanos: Long = System.nanoTime()

  /** Records the highest event timestamp seen so far and marks the input as active. */
  override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = {
    currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
    anyEventSeen = true
    eventSinceLastEmit = true
  }

  /**
   * Emits `currentMaxTimestamp - maxOutOfOrderness` as the watermark. If the
   * input has been idle since the previous emit, the tracked maximum is first
   * advanced by the elapsed wall-clock milliseconds.
   */
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    val nowNanos = System.nanoTime()
    if (anyEventSeen && !eventSinceLastEmit) {
      val idleMillis = Math.max(0L, (nowNanos - idleReferenceNanos) / nanosPerMilli)
      // Saturate instead of wrapping around if the tracked maximum is already near Long.MaxValue.
      currentMaxTimestamp =
        if (currentMaxTimestamp > Long.MaxValue - idleMillis) Long.MaxValue
        else currentMaxTimestamp + idleMillis
      // Carry the sub-millisecond remainder forward so repeated emits do not drift.
      idleReferenceNanos += idleMillis * nanosPerMilli
    } else {
      idleReferenceNanos = nowNanos
    }
    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
    eventSinceLastEmit = false
  }
}


网站公告

今日签到

点亮在社区的每一天
去签到