流式计算中的window机制

发布于:2023-02-07 ⋅ 阅读:(691) ⋅ 点赞:(0)

一、摘要

  1. 概述流式计算跟批计算,以及实时数仓和离线数仓的区别;引出流式计算中的window计算定义以及挑战
  2. 介绍实时计算中的Watermark概念,以及如何产生、传递,还有一些典型的生产实践中遇到的问题
  3. 介绍三种最基本的window类型,以及他们的实现原理;同时会结合业务场景介绍一些高级优化的功能和原理
  4. 结合两个真实业务场景的需求,讲解window是如何解决实际生产问题的(字节Flink使用量峰值90亿QPS)
QPS:全名 Queries Per Second,意思是“每秒查询率”,是一台服务器每秒能够响应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准

二、概述流式计算机制

简述流式计算的基本概念,与批式计算相比的难点和挑战

    数据价值:实时性越高,数据价值越高

2.1批处理

批处理模型典型的数仓架构为T+1架构,即数据计算时天级别的,当天只能看到前一天的计算结果
通常使用的计算引擎为Hive或者Spark等。计算的时候,数据是完全ready的,输入和输出都是确定性的

2.2实时处理

实时计算:处理时间窗口
数据实时流动,实时计算,窗口结束直接发送结果,不需要周期调度任务

处理时间(Processing time):数据在流式计算系统中真正处理时所在机器的当前时间。
处理时间这是在系统中观察事件的时间
事件时间(Event time):数据产生的时间,比如客户端、传感器、后端代码等上报数据时的时间。
事件时间这是事件实际发生的时间

实时计算:事件时间窗口(传入,运算,到输出时间总和)
数据实时进入到真实事件发生的窗口中进行计算,可以有效的处理数据延迟和乱序

窗口真实结束时间(延迟与乱序)衡量:watermark
在数据中插入—些watermark,来表示当前的真实时间。
在数据存在乱序的时候,watermark就比较重要了,它可以用来在乱序容忍和实时性之间做一个平衡。


三、watermark(水印)

Watermark的含义、生成方法、传递机制,以及—些典型场景的问题和优化

    概念:表示系统认为的当前真实的事件时间

3.1产生watermark

可以通过Watermark Generator 来生成
一般是从数据的事件时间来产生,产生策略可以灵活多样,最常见的包括使用当前事件时间的时间减去一个固定的delay(下列-5s,-20s,即延迟),来表示可以可以容忍多长时间的乱序。
SQL:

CREATE TABLE Orders(
    user BIGINT,  
    product STRING, 
    order_time TIMESTAMP(3), 
    WATERMARK FOR order_time AS order_time-INTERVAL '5' SECOND
) WITH(...);

C++/DataStream:

WatermarkStrategy
.<Tuple2<Long,String>>forBoundedoutofOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event,timestamp)->event.f0);

3.2传递watermark

取上游传递watermark最小值,且依次递减(类Checkpoint快照传递)

3.3观察一个任务中的watermark量

  • 一般通过Flink Web UI上的信息来观察当前任务的watermark情况
  • (没有输出),是生产实践中最容易遇到的问题,开发事件时间的窗口任务的时候,经常会忘记了设置watermark,或者数据太少,watermark没有及时的更新,导致窗口一直不能触发

找watermark时间戳,对照是否触发,是否配置错误

3.4 典型问题

Per-partition(分区) Vs per-subtask(子任务) watermark生成

Per-subtask watermark生成(子任务)
早期版本都是这种机制。典型的问题是如果一个source subtask(源任务)消费多个partition(下游的区),那么多个partition之间的数据读取可能会加剧乱序程度。

Per-partition watermark生成(分区)
新版本引入了基于每个 partition单独的watermark生成机制,这种机制可以有效避免上面的问题。

部分partition/subtask断流

根据上面提到的watermark传递机制,下游subtask 会将上游所有subtask的watermark值的最小值作为自身的watermark值。如果上游有一个subtask的 watermark不更新了,则下游的watermark都不更新。

解决方案:ldle source(空闲)
当某个subtask断流超过配置的idle超时时间时,将当前subtask置为idle,并下发一个idle的状态给下游。下游在计算自身watermark的时候,可以忽略掉当前是idle的那些subtask(子任务)。

迟到数据处理

因为watermark表示当前事件发生的真实时间,那晚于watermark的数据到来时,系统会认为这种数据是迟到的数据。

算子自身来决定如何处理迟到数据:
Window聚合,默认会丢弃迟到数
双流join,如果是outer join,则可以认为它不能join到任何数据
CEP,默认丢弃


四、window(窗口)

Window基本功能和高级优化

4.1.基本功能

window分类

  • 典型的Window:

  1. Tumble Window(滚动窗口)

  2. Sliding Window(滑动窗口)

  3. Session Window(会话窗口)

  • 其它Window:

  1. 全局Window
  2. Count Window
  3. 累计窗口 等

window使用
API应用程序编程接口,抽象分层
抽象程度越高,用户使用成本低,表达能力低(有限)

4.1.1滚动窗口

这是最常见的窗口类型,就是根据数据的时间(可以是处理时间,也可以是事件时间)划分到它所属的窗口中windowStart = timestamp - timestamp % windowSize,这条数据所属的window就是[windowStart, windowStart + windowSize)

窗口划分:
1.每个key单独划分
2.每条数据只会属于一个窗口

窗口触发:
Window结束时间到达的时候一次性触发

4.1.2滑动窗口

窗口划分:
1.每个key单独划分
2.每条数据可能会属于多个窗口

窗口触发:
Window结束时间到达的时候一次性触发

4.1.3会话窗口

窗口划分:
1.每个key单独划分
2.每条数据会单独划分为一个窗口,如果window之间有交集,则会对窗口进行merge(合并)

窗口触发:
Window结束时间到达的时候一次性触发

4.1.4 迟到数据的处理

迟到数据定义
一条数据到来后,会用WindowAssigner 给它划分一个 window(start,end),一般时间窗口是一个时间区间,比如[10:00,11:00),如果划分出来的 window end 比当前的 watermark 值还小,说明这个窗口已经触发了计算了,这条数据会被认为是迟到数据。

迟到数据产生:
只有事件时间,实际时间下才会有迟到的数据。
处理时间(系统观测,理论时间)不产生

迟到数据默认处理:
丢弃
(此外处理)

  1. Allow lateness
    这种方式需要设置一个允许迟到的时间。设置之后,窗口正常计算结束后,不会马上清理状态,而是会多保留 allowLateness 这么长时间,在这段时间内如果还有数据到来,则继续之前(先retreat回滚,然后在缓存区继续,参考快照)的状态进行计算。
    适用于:DataStream、SQL
     
  2. SideOutput(侧输出流)
    这种方式需要对迟到数据打一个 tag,然后在 DataStream 上根据这个 tag 获取到迟到数据流,然后业务层面自行选择进行处理。
    适用于:DataStream

4.1.5 增量VS全量计算

增量计算(更优,平缓,保存少,只保存中间结果):

  • 每条数据到来,直接进行计算,window只存储计算结果。 比如计算sum,状态中只需要存储sum的结果,不需要保存每条数据。
  • 典型的reduce、aggregate等函数都是增量计算
  • SQL中的聚合只有增量计算
    全量计算(保存全体数据,大量缓存):
  • 每条数据到来,会存储到window的state中。等到window触发计算的时候,将所有数据(缓存,数据量,数据类型)拿出来一起计算。
  • 典型的process函数就是全量计算

4.1.6 EMIT 触发

  • 什么叫EMIT?
    通常来讲,Window 都是在结束的时候才能输出结果,比如 1h 的 tumble window,只有在 1 个小时结束的时候才能统一输出结果。
    如果窗口比较大,比如 1h 或者 1 天,甚至于更大的话,那计算结果输出的延迟就比较高,失去了实时计算的意义。
    EMIT 输出指的是,在 window 没有结束的时候,提前把 window 计算的部分结果输出出来。
  • 怎么实现?
    在DataStream里面可以通过自定义Trigger来实现,Trigger的结果可以是:
    (1) CONTINUE
    (2)FIRE(触发计算,但是不清理窗口状态)
    (3)PURGE
    (4)FIRE_AND_PURGE

SQL也可以使用,通过配置:
(1)table.exec.emit.early-fire.enabled=true
(2)table.exec.emit.early-fire.delay=(time)

4.2.高级优化

以下说的所有的高级优化,都只限于在SQL中的window中才有。在DataStream中,用户需要自己通过代码来实现类似的能力。

4.2.1 Mini-batch 优化

Mini-batch优化解决频繁访问(rocksdb statebackend每次的状态访问就都需要做一次序列化和反序列化)状态的问题
(多数据批处理)

即赞一小批数据再进行计算,这批数据每个key的state访问只有一次,这样在单个key的数据比较集中的情况下,对于状态访问可以有效的降低频率,最终提升性能。

4.2.2倾斜优化-local-global

local-global优化是分布式系统中典型的优化,主要是可以降低数据shuffle的量,同时也可以缓解数据的倾斜。
将原本的聚合划分成两阶段,第一阶段先做一个local的聚合,这个阶段不需要数据shuffle,是直接跟在上游算子之后进行处理的;第二个阶段是要对第一个阶段的结果做一个merge

4.2.3 Distinct 计算状态复用

Distinct 状态复用降低状态量
批:引擎都是通过把它优化成aggregate的方式来处理
流:对于count distinct这种情况,保存所有数据是否出现过这样子的一个映射(byte统计)

4.2.4 Pane 优化

Pane 优化降低滑动窗口的状态存储量

将窗口的状态划分成更小粒度的pane,最小单位,可组合为窗口
将这个窗口对应的pane的结果merge(合并,归并)起来就可以了


五、案例分析

抖音 DAU 实时曲线计算
大数据任务资源使用实时统计分析

5.1需求一:使用Flink SQL 计算抖音的日活曲线 DAU

SELECT 
    COUNT(DISTINCT uid) as dau 
    TUMBLE_START(event_time, INTERVAL '1' DAY) as wstart, 
    LOCALTIMESTAMP AS current_ts 
FROM user_activity 
GROUP BY 
    TUMBLE(event_time, INTERVAL '1' DAY)

table. exec. emit. early-fire. enabled=true
table. exec. emit. early-fire. delay=5min

  • 问题:所有数据都需计算,无法并行。
  • 解决方案:通过两阶段聚合来把数据打散,完成第一轮聚合,第二轮聚合只需要对各个分桶的结果求和即可。
SELECT 
    SUM(partial_cnt)as dau 
    TUMBLE_START(event_time,INTERVAL'1'DAY)as wstart,
    LOCALTIMESTAMP as current_ts 
FROM(
    SELECT 
        COUNT(DISTINCT uid)as partial_cnt,
        TUMBLE_ROWTIME(event_time,INTERVAL'1'DAY)as event_time 
    FROM user_activity 
    GROUP BY 
        TUMBLE(event_time,INTERVAL,'1'DAY),
        MOD(uid,10000)--根据uid分为10000个桶
)
GROUP BY TUMBLE(event_time,INTERVAL'1'DAY)

table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=5min
table.exec.window.allow-retract-input=true

5.2 需求二:使用Flink SQL 计算大数据任务的资源使用

  • 问题描述:
    大数据任务(特指离线任务)运行时通常会有多个 container 启动并运行,每个 container 在运行结束的时候,YARN 会负责将它的资源使用(CPU、内存)情况上报。一般大数据任务运行时间从几分钟到几小时不等。
  • 需求:
    根据 YARN 上报的各个 container 的信息,在任务结束的时候,尽快的计算出一个任务运行所消耗的总的资源。 假设前后两个 container 结束时间差不超过 10min
SELECT 
    application_id 
    SUM(cpu_usage)as cpu_total 
    SUM(memory_usage)as memory_total, 
FROM resource_usage 
GROUP BY 
    application_id, 
    SESSION(event_time,INTERVAL '10'MINUTE)
  • 典型的可以通过会话窗口来将数据划分到一个window中,然后再将结果求和即可。

本文含有隐藏内容,请 开通VIP 后查看