flink超时未揽收单量统计

发布于:2025-05-12 ⋅ 阅读:(15) ⋅ 点赞:(0)

应用场景: 双十一大屏统计 - - 订单超时汇总

项目指标概况:

应用背景:晚点超时指标,例如:出库超6小时未揽收订单量

难点:flink消息触发式计算,没有消息到达则无法计算,而这类指标恰好是要求在指定的超时时刻计算出有多少“未到达”的消息,可以预警出订单积压等异常现象

方案1:flink往db里面高TPS写,产品前端高RPS查询OLAP数据库明细,大促数据洪峰场景因查询暴增会使得数据库压力打满,明细查询的方式势必不能支持日后大促暴涨的单量

方案2:metaQ定时消息,订单消息写入metaQ,利用metaQ的定时消息功能,根据用户写入的消息和时间,在指定时刻下发,flink接收两个数据源(kafka订单流,kafka出库流,metaQ延时消息流)判断订单是否超时揽收,这种方式除了需要维护flink程序,同时还要保障额外的消息中间层维护

方案3:flinkcep,使用起来确实比较简便,但是实际在统计上和真实结果有一定出入,原因是出库时间会被回传多次,开始回传的是9点,后面发现回传错了,又改成了8点,而cep的watermark是全局向前走的,对于这种场景,无法很好的适配

方案4:flink的processfunction,是一个low-level流处理操作,通过改写其中的Processelement方法,可以告诉flinkstate里面存什么,以及如何更新state。通过改写ontimer方法,可以告诉state何时下发超时消息

具体操作:

1.首先,根据业务主键物流订单code将消息做keyby处理,不同主键值的消息分流到不同的partition里面,生成keyedstream,因为在后续processfuntion中操作的state是valuestate类型的,即每一个key值对应一个state,更新是以key粒度(一个物流订单)进行的

2.每一条消息在processfuntion中处理时,为每个key的消息计算出timeoutmemont,并将该时刻注册到timeservice的定时器中,同时存储该消息至state,当同一个key值有多条消息到来时,可根据消息状态对state进行更新

3.当机器时间来到timeoutmemont时,timeserivcr中的定时器会自动回调ontimer函数,我们事先已经在ontimer函数中定义好操作:获取state,并判断标志位进行下发

如此一来,便做到了:制造出超时消息,并将其暂存在flink state中

该方案优势:

1.部署,运维成本比较低,不需要引入额外的消息中间件;

2.性能优良:

source rps:avg 3k/s,max:4.5k/s

sink rps:avg 1.5k/s,max:2.4k/s

延迟:avg:2.5/s,max:3.7s

3.通用化,复用性高,可以复用到各类业务场景,只需要修改下配置(超时时间)

1. 超时检测逻辑(改写ProcessFunction)

public class TimeoutDetector extends KeyedProcessFunction<String, Row, Tuple2<String, Long>> {

    private ValueState<Long> createTimeState;
    private ValueState<Boolean> scanFlagState;

    @Override
    public void open(Configuration parameters) {
        // 初始化出库时间状态
        createTimeState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("createTime", Long.class));
        // 初始化揽收标记状态
        scanFlagState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("scanFlag", Boolean.class));
    }

    @Override
    public void processElement(Row row, Context ctx, Collector<Tuple2<String, Long>> out) {
        String eventType = row.getFieldAs("event_type");
        Long eventTs = ((Timestamp)row.getFieldAs("ts")).getTime();

        if ("create".equals(eventType)) {
            // 记录出库时间并注册定时器
            createTimeState.update(eventTs);
            ctx.timerService().registerEventTimeTimer(eventTs + 6 * 3600 * 1000);
        } else if ("scan".equals(eventType)) {
            // 标记已揽收
            scanFlagState.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) {
        if (scanFlagState.value() == null || !scanFlagState.value()) {
            // 输出超时订单ID与出库时间
            out.collect(Tuple2.of(ctx.getCurrentKey(), createTimeState.value()));
        }
        // 清理状态
        createTimeState.clear();
        scanFlagState.clear();
    }
}

在flinksql中调用udf

CREATE TEMPORARY FUNCTION CheckTimeout AS 'com.example.TimeoutDetector';

SELECT 
    order_id,
    ship_time,
    CheckTimeout(order_id, ship_time) AS is_timeout
FROM (
    SELECT 
        order_id,
        event_time AS ship_time
    FROM order_events
    WHERE event_type = 'SHIP'
) 
WHERE is_timeout = true;

doris sink 

INSERT INTO doris_sink_table
SELECT 
    TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start,
    COUNT(order_id) AS timeout_count
FROM timeout_orders
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

方案3

flinkcep处理:

-- 步骤1:定义数据源表(Kafka输入)
CREATE TABLE order_events (
  order_id         STRING,
  event_type       STRING,   -- 'outbound'=出库, 'collection'=揽收
  event_time       TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND  -- 允许5秒乱序
) WITH (
  'connector' = 'kafka',
  'topic'     = 'order_events',
  'scan.startup.mode' = 'latest-offset',
  'format'    = 'json'
);

-- 步骤2:使用 MATCH_RECOGNIZE 进行超时模式匹配
CREATE TABLE timeout_orders (
  order_id         STRING,
  outbound_time    TIMESTAMP(3),
  timeout_reason   STRING
) WITH (
  'connector' = 'kafka',
  'topic'     = 'timeout_alerts',
  'format'    = 'json'
);

INSERT INTO timeout_orders
SELECT 
  order_id, 
  outbound_time, 
  '未在6小时内揽收' AS timeout_reason
FROM order_events
MATCH_RECOGNIZE (
  PARTITION BY order_id
  ORDER BY event_time
  MEASURES
    A.event_time AS outbound_time,
    LAST(B.event_time) AS collection_time
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO LAST B
  PATTERN (A NOT? B*) WITHIN INTERVAL '6' HOUR  -- 超时窗口定义
  DEFINE
    A AS event_type = 'outbound',
    B AS event_type = 'collection' AND 
      B.event_time <= A.event_time + INTERVAL '6' HOUR
)
WHERE collection_time IS NULL;  -- 未匹配到揽收事件

关键设计解析

  1. 时间语义处理

    • WATERMARK 定义处理5秒内的乱序事件,与物流场景常见的网络延迟匹配 
    • WITHIN INTERVAL '6' HOUR 精确控制超时窗口,符合出库后6小时未揽收的业务规则 
  2. 模式匹配逻辑

    • PATTERN (A NOT? B*) 表示匹配出库事件后未出现揽收事件(NOT操作符)
    • DEFINE B 中增加时间约束,确保揽收事件在出库后6小时内发生
  3. 结果处理优化

    • ONE ROW PER MATCH 减少重复告警,每个订单仅触发一次超时事件
    • AFTER MATCH SKIP TO LAST B 跳过已处理事件,提升处理效率 

优化方式:

1.rocksdb statebackend RocksDB 将状态存储在磁盘而非内存,适合 TB 级状态;增量检查点仅保存差异数据,减少 IO 压力。

2.大字段/无关字段去除

3.statettl=13h(12h+1h缓冲)自动清理已完成窗口的过期状态,避免状态无限膨胀

4.检查点间隔与异步快照,增大checkpoint时间间隔

SET 'execution.checkpointing.interval' = '10min';  -- 增大间隔降低IO压力
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.unaligned' = 'true';  -- 启用非对齐检查点

三、技术优化策略

  1. 状态管理优化

    • 启用RocksDB状态后端:state.backend: rocksdb
    • 设置TTL自动清理过期订单状态:table.exec.state.ttl = 2h
  2. 性能调优

    • 调整并行度:SET 'parallelism.default' = 8;
    • 启用MiniBatch聚合:table.exec.mini-batch.enabled = true
  3. 容错机制


 


网站公告

今日签到

点亮在社区的每一天
去签到