// Inside main():
// Kafka source that reads raw string records from "topic_name", starting from the earliest offset.
// NOTE(review): FlinkKafkaConsumer is deprecated in recent Flink releases in favour of KafkaSource;
// consider migrating.
val myConsumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), props)
  .setStartFromEarliest()

// NOTE(review): the consumer above produces String records, but the watermark strategy below is
// typed on UserData and reads `element.time`. A `.map(...)` that parses each String into a
// UserData is required between `addSource` and `assignTimestampsAndWatermarks`; the parsing
// logic is not visible here, so it must be supplied by the caller's code.
val stream = env.addSource(myConsumer)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forGenerator[UserData](new WatermarkGeneratorSupplier[UserData] {
        // A fresh generator per source subtask; the type argument is given explicitly
        // rather than relying on inference from the expected type.
        override def createWatermarkGenerator(
            context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[UserData] =
          new PeriodicWatermarkGenerator[UserData]
      })
      .withTimestampAssigner(new SerializableTimestampAssigner[UserData] {
        // `time` is multiplied by 1000; presumably it is in epoch seconds and this converts
        // it to the milliseconds Flink expects — TODO confirm the unit of UserData.time.
        override def extractTimestamp(element: UserData, recordTimestamp: Long): Long =
          element.time * 1000L
      }))
/**
 * Periodic, bounded-out-of-orderness watermark generator that keeps event time moving
 * while the source is idle.
 *
 * Under normal traffic the emitted watermark trails the largest event timestamp seen by
 * `maxOutOfOrderness` milliseconds (minus one, per Flink's watermark convention).
 * If no event has arrived for more than `idleTimeout` milliseconds of wall-clock time,
 * the tracked maximum is advanced by the elapsed idle wall-clock time, so downstream
 * event-time windows can still fire without new data. Records that arrive after such
 * an advancement with older timestamps may be treated as late.
 *
 * How often a watermark is emitted is NOT decided here: Flink calls [[onPeriodicEmit]]
 * at the interval configured with `ExecutionConfig.setAutoWatermarkInterval`.
 *
 * @param maxOutOfOrderness allowed out-of-orderness in milliseconds (default 10 s); must be >= 0
 * @param idleTimeout       wall-clock milliseconds without events before event time is
 *                          advanced artificially (default 10 s); must be >= 0
 * @tparam T stream element type
 */
private class PeriodicWatermarkGenerator[T](
    maxOutOfOrderness: Long = 10000L,
    idleTimeout: Long = 10000L
) extends WatermarkGenerator[T] {

  require(maxOutOfOrderness >= 0, s"maxOutOfOrderness must be >= 0, got $maxOutOfOrderness")
  require(idleTimeout >= 0, s"idleTimeout must be >= 0, got $idleTimeout")

  // Largest event timestamp seen so far (possibly pushed forward during idleness).
  // The initial value makes the first emitted watermark exactly Long.MinValue with no underflow.
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrderness + 1

  // Idle advancement is only meaningful once there is a real event-time baseline.
  private var hasSeenEvent: Boolean = false

  // Wall-clock time (ms) of the most recent event or the most recent idle advancement.
  private var lastProgressWallClock: Long = System.currentTimeMillis()

  override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = {
    currentMaxTimestamp = math.max(eventTimestamp, currentMaxTimestamp)
    hasSeenEvent = true
    lastProgressWallClock = System.currentTimeMillis()
  }

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    val now = System.currentTimeMillis()
    val idleFor = now - lastProgressWallClock
    if (hasSeenEvent && idleFor > idleTimeout) {
      // Source has been quiet longer than the timeout: advance event time by the idle
      // wall-clock duration (instead of a fixed jump on every call, which would run far
      // ahead of real time because this method is called every auto-watermark interval).
      currentMaxTimestamp += idleFor
      lastProgressWallClock = now
    }
    // A watermark t asserts no further elements with timestamp <= t, hence the extra -1
    // (same convention as Flink's bounded-out-of-orderness example generator).
    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1))
  }
}