flink学习(10)——allowedLateness/侧道输出

发布于:2024-11-28 ⋅ 阅读:(12) ⋅ 点赞:(0)

allowedLateness(lateness: Time)

水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)
水印+allowedLateness : 短期延迟+ 等待长期延迟效果
1、达到水印条件后,会触发窗口计算,但是不关闭窗口
2、此后每有一条属于该窗口的迟到数据进入,就会再触发一次该窗口的计算
3、达到allowedLateness 条件后(即水印 >= 窗口结束时间 + allowedLateness),窗口才真正关闭;此后到达的该窗口数据会被丢弃(如果配置了 sideOutputLateData,则进入侧输出流)
4、水印时间间隔需要小于allowedLateness 的时间间隔
package com.bigdata.day05;

/**
 * Demo job combining watermarks, {@code allowedLateness} and a late-data side output.
 *
 * <p>Orders produced by {@code MySource} are keyed by user id and summed in 5-second tumbling
 * event-time windows. Watermarks tolerate 2 seconds of out-of-orderness; each window is kept
 * for a further 10 seconds of allowed lateness, and elements that arrive after that are routed
 * to a side output instead of being dropped.
 */
public class _03_CartInfo案例_waterMark_allowed_sideoutput {

    /** Pattern used for every timestamp rendered into the output records. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Use the public factory on StreamExecutionEnvironment rather than reaching it through
        // the internal StreamContextEnvironment subclass.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<OrderInfo> orderInfoDataStreamSource = env.addSource(new MySource());

        SingleOutputStreamOperator<OrderInfo> watermarkSource = orderInfoDataStreamSource
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                                    @Override
                                    public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                                        // Timestamps and watermarks are milliseconds since the
                                        // Java epoch 1970-01-01T00:00:00Z.
                                        return orderInfo.getTimeStamp();
                                    }
                                }));

        // The trailing {} creates an anonymous subclass of OutputTag. An alternative is to pass
        // the type explicitly:
        //   new OutputTag<>("SideOutput", TypeInformation.of(OrderInfo.class))
        OutputTag<OrderInfo> orderInfoOutputTag = new OutputTag<OrderInfo>("SideOutput") {};

        SingleOutputStreamOperator<String> result = watermarkSource
                .keyBy(new KeySelector<OrderInfo, Integer>() {
                    @Override
                    public Integer getKey(OrderInfo value) throws Exception {
                        return value.getUId();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(10))
                .sideOutputLateData(orderInfoOutputTag)
                .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
                    /**
                     * Emits one record per order in the window, each carrying the running total
                     * of the order amounts seen so far in this invocation.
                     */
                    @Override
                    public void apply(Integer uId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
                        double sumMoney = 0;
                        String startStr = DateFormatUtils.format(window.getStart(), TIME_PATTERN);
                        String endStr = DateFormatUtils.format(window.getEnd(), TIME_PATTERN);

                        for (OrderInfo orderInfo : input) {
                            sumMoney += orderInfo.getMoney();
                            String eventStr = DateFormatUtils.format(orderInfo.getTimeStamp(), TIME_PATTERN);
                            String processingStr = DateFormatUtils.format(System.currentTimeMillis(), TIME_PATTERN);
                            // Every field is comma-separated; previously eventTime and the
                            // processing time were glued onto the preceding value.
                            out.collect("uid: " + uId
                                    + "," + "sumMoney: " + sumMoney
                                    + "," + "startTime: " + startStr
                                    + "," + "endTime: " + endStr
                                    + "," + "eventTime: " + eventStr
                                    + "," + "处理时间: " + processingStr);
                        }
                    }
                });

        // Main output: on-time elements plus late ones still within the allowed lateness.
        result.print("正常和迟到的订单");
        // Side output: elements that arrived after the allowed lateness had expired.
        result.getSideOutput(orderInfoOutputTag).print("严重迟到的订单");

        env.execute();
    }
}

结果展示:

侧输出-SideOutput 

此时窗口已经关闭,但是还会有迟到的数据到来,放到另一个数据流中操作即可。
package com.bigdata.day05;

/**
 * Demo job showing how late data is captured through a side output.
 *
 * <p>Orders produced by {@code MySource} are keyed by user id and summed in 5-second tumbling
 * event-time windows. Watermarks tolerate 2 seconds of out-of-orderness; each window is kept
 * for a further 10 seconds of allowed lateness, and elements that arrive after that are routed
 * to a side output stream instead of being dropped.
 */
public class _03_CartInfo案例_waterMark_allowed_sideoutput {

    /** Pattern used for every timestamp rendered into the output records. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Use the public factory on StreamExecutionEnvironment rather than reaching it through
        // the internal StreamContextEnvironment subclass.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<OrderInfo> orderInfoDataStreamSource = env.addSource(new MySource());

        SingleOutputStreamOperator<OrderInfo> watermarkSource = orderInfoDataStreamSource
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                                    @Override
                                    public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                                        // Timestamps and watermarks are milliseconds since the
                                        // Java epoch 1970-01-01T00:00:00Z.
                                        return orderInfo.getTimeStamp();
                                    }
                                }));

        // Two ways to create the tag: the anonymous subclass used here (note the trailing {}),
        // or passing the type explicitly:
        //   new OutputTag<>("SideOutput", TypeInformation.of(OrderInfo.class))
        OutputTag<OrderInfo> orderInfoOutputTag = new OutputTag<OrderInfo>("SideOutput") {};

        SingleOutputStreamOperator<String> result = watermarkSource
                .keyBy(new KeySelector<OrderInfo, Integer>() {
                    @Override
                    public Integer getKey(OrderInfo value) throws Exception {
                        return value.getUId();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(10))
                .sideOutputLateData(orderInfoOutputTag)
                .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
                    /**
                     * Emits one record per order in the window, each carrying the running total
                     * of the order amounts seen so far in this invocation.
                     */
                    @Override
                    public void apply(Integer uId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
                        double sumMoney = 0;
                        String startStr = DateFormatUtils.format(window.getStart(), TIME_PATTERN);
                        String endStr = DateFormatUtils.format(window.getEnd(), TIME_PATTERN);

                        for (OrderInfo orderInfo : input) {
                            sumMoney += orderInfo.getMoney();
                            String eventStr = DateFormatUtils.format(orderInfo.getTimeStamp(), TIME_PATTERN);
                            String processingStr = DateFormatUtils.format(System.currentTimeMillis(), TIME_PATTERN);
                            // Every field is comma-separated; previously eventTime and the
                            // processing time were glued onto the preceding value.
                            out.collect("uid: " + uId
                                    + "," + "sumMoney: " + sumMoney
                                    + "," + "startTime: " + startStr
                                    + "," + "endTime: " + endStr
                                    + "," + "eventTime: " + eventStr
                                    + "," + "处理时间: " + processingStr);
                        }
                    }
                });

        // Main output: on-time elements plus late ones still within the allowed lateness.
        result.print("正常和迟到的订单");
        // Side output: elements that arrived after the allowed lateness had expired.
        result.getSideOutput(orderInfoOutputTag).print("严重迟到的订单");

        env.execute();
    }
}