**摘要:**本文整理自硕橙科技大数据工程师程兴源老师在 Flink Forward Asia 2024 Data+AI 专场中的分享。内容主要为以下几个部分:
一、Why Flink
二、算法工作流的设计
三、性能优化的探索之路
四、上下游链路协作的思考
五、未来展望
我们团队是一家专注于 To B 的公司,致力于工业互联网领域,与传统面向消费者的企业有着不同的业务模式。To C 端企业通常在早晨流量较为平稳,而到了晚上流量会急剧增加。相比之下,我们团队处理的数据,包括温度、振动、电流以及其他各种物理量,需要实时上报,这是我们与 To C 端企业的一个显著区别。此外,我们所面向的业务场景与一些大型企业也有所不同。大型企业可能专注于从 1 到 100 的规模化扩展,而作为一家初创公司,我们更多地专注于从零到一的创新和开拓。
接下来将从以下五个模块来介绍团队的工作。首先,探讨我们为何选择使用 Flink。接着,将分析算法工作流的设计理念及其背后的原因。围绕这一设计理念,我们进行了哪些性能优化?第四点,将深入了解我们如何在业务系统的摄像头链路中进行协作,并分享相关思考。最后,将展望未来,讨论我们为何采用 Flink。
一、Why Flink?
公司的业务现状以及如何与Flink 相结合
我们团队是一家专注于智能硬件的公司,致力于为不同厂商提供全场景、全链路的服务。通过利用团队特有的特征库和 AI 算法,我们能够进行工业故障的行为分析,从而提升设备的运行效率和可靠性。
团队的工作流主要服务于特定的行业场景,包括风电、石油化工、矿山矿业、兵器工业、船舶、汽车、港口货运以及能源行业。结合团队自主研发的采集站和多维数据处理能力,我们致力于为客户提供 AI 预测性维护解决方案。团队的算法工作流设计旨在支持不同类型的客户,从部署自研硬件和第三方传感器获取传感数据开始,直至为被监测设备的每个部件提供专业的诊断监测报告和数据分析报告。
研发的关键在于构建了一个可视化画布,用户可以定义各种数据源,并通过拖拽的方式组成一个算子链。这些算子可以是分析函数,也可以是类似于神经网络的复杂算法。团队的目标是多方面的:首先,使开发人员能够更便捷地编写插件;其次,使客户能够在控制台中更灵活地配置算子网络工作流;最后,使客户能够迅速获得数据分析报告。
首先,简要介绍团队的整体架构图。团队的工作流程始于数据采集站,数据首先被上传至边缘网关,然后存储到历史数据仓库中。接下来,团队从边缘网关获取数据,并将其与算法控制台的配置相关联。算法控制台的配置会与边缘端的 Kafka 系统同步。当数据被推送到算法依赖框架时,该框架利用评估和识别算法生成不同的算法结果,这些结果最终也会存储到历史数据仓库中。此外,还有一条分支路径,通过算法数据缓存生成两种不同的特征分析报告:一种是特征分析报告,另一种是评估分析报告。
二、算法工作流点设计
公司的业务现状以及简介
团队工作控制台负责配置算法控制流和数据源,数据源可以是 Kafka 或历史数据仓库。在算法一中,可能采用分类评估算法。例如,团队可能设计了一个从 2K 到 4K、6K 到 18K 的分类识别算法,用于将数据分为不同的区间。接下来,根据特定的频段进行噪声识别。例如,如果工况大于某个特定的 K 值,数据会被归入区间一;如果工况大于另一个 K 值,则归入区间二。最后,团队应用事件算法,将各种算法的输出匹配到不同类型的事件中。为了让大家更清楚地了解,以下将展示团队算法输出结果的具体形式。
实际上,这一过程是对设备健康状况的检测,以确定其处于危险、中等还是安全状态。团队会根据设备的故障类型、原因和工作条件进行检测,并在大屏幕上展示检测结果。这个全自动数据是中间数据,是团队通过上游迭代计算得出的成果,而配用端的原始数据则是团队算法流程设计的关键所在。
首先,团队需要确保在同一个座位内可以同时运行多个点位。这些点位可以包括电流传感器、温度传感器或声音传感器。为了保证多点位在同一个座位内顺利运行,团队会进行计算,并将算法结果输出给用户。团队拥有算法实力的概念,意味着我们可以为不同厂商(如厂商 ABCDE)配置不同的工作流。创建完一个算法实例后,团队的算法实力可以将上游算法实例的结果传递给下游算法实例,形成一个逻辑上的工作流。
其次,团队的数据是从边缘端上传并输出到算法控制台的。算法框架计算完成后,会将算法结果回推给边缘端的分发端。分发端再次下发数据,进行告警和进一步的算法处理。
三、性能优化的探索之路
寻找可能落地的方案
在这个过程中,团队会面临性能优化的问题,这正是团队数据样本构建方案的关键所在。
团队采用了 Intel 的 i5-10400 CPU,配备四核和 8GB 内存,使用的 PyFlink 版本为 1.16.1。测试方法是在一个点位上每天每秒记录一条数据,总共记录 86400 条数据。团队进行了不同数据量的测试,分别评估了在各种情况下三个和五个算子的性能表现。在测试中,团队比较了输出结果与上一个算子的输出 Schema,以及在不输出 Schema 的情况下性能的变化。
测试的关键点在于 PyFlink,它在实际应用中需要指定输出显示的 Schema 类型。具体而言,这些数据包括高分震动、高分均值、高分采样等,它们实际上是团队边缘端上传的原始数据。这些数据在一个算子输出时,由用户直接定义,用于测试三个算子的性能和性能损耗。
上图是分别是三个、五个以及十个算子的测试结果, 在没有输出耗时的情况下,随着数据量的增加和时间的推移,性能表现可能会变慢。
团队一旦发现问题,就需要定位问题所在。PyFlink 是一个典型的跨进程通信场景。在 PyFlink 中,每写一个 Python 作业,虽然看起来像是配置算子,但实际上是在 JVM 里面运行的。在配置算子时,JVM 会将输入数据递送到配置的一个 Runner,其他执行器会去序列化数据,通过递送序列化数据,经过一个叫做 Bundle Processor 的地方。
Bundle Processor 处理数据有两种方式:一种是通过 Beam 的方式,即市面上流通的高效的双跑框架;另一种是通过架构,也就是阿里开源的跨源通信框架,团队使用的是第二种。团队目前正致力于解决一个关键问题:从反序列化数据到经过 PVM 处理,最终返回到过程中,可能会出现基于 Python 本身的性能损耗。
探索之路
接下来,团队将探索一条更加优化的道路。起初,团队考虑了批量化处理,发现传统的序列化格式并不适用,因此探讨了是否可以采用二进制格式进行序列化,例如 Protocol Buffers (PB) 或 Avro。同时,团队还考虑通过增加并行度来加速作业的执行。经过一系列调优过程,团队逐步优化了数据处理的效率。
在第一个阶段,团队尝试了使用 Avro 进行序列化类型推导。由于团队的整个培训作业都是使用 Data Stream 进行的,需要满足上层用户通过拖拉拽拼接来映射到相应的 Python 分析函数。因此,团队必须定义一个可解释的算法工作流,其中每个节点都会调用算子(包括评估、识别以及阻塞)进行分析。
如果团队有一个包含 A、B、C 三个算子的工作流,而从 A 到 B 算子的输出 Schema 不一致,团队就需要进行类型推导。这是一个复杂的过程,团队发现它并不能完全解决问题。选择放弃 Avro 的原因在于其在 PyFlink 中的当前序列化实现仅支持从 source 到 map 再到 sink 的简单链路。如果处理链路较长,涉及到多个算子,Avro 格式可能会出现问题,比如类型冲突异常等。因此,团队在尝试后决定放弃这条路线。
接下来,团队考虑是否可以基于其他数据格式进行高效处理的调优。首先,团队会在算法和数据链路中进行优化。正如之前提到的,当上传数据时,系统会提取一系列特征值,如高分均值、高分震动、噪声区间等特征值。团队首先会执行行转列的操作,以确保能够进行多维向量的压缩。压缩完成后,根据点位进行分流。在处理过程中,每个点位会经过一系列插件处理。
这些插件处理包括使用分类、噪声评估等算法。这些算法作为插件,会将输出进行合并。在这个过程中,团队需要根据点位信息进行分区。例如,假设团队有三个点位,分别处理噪声、电流、震动,虽然都使用相同的工作流,但点位 A 使用的插件不同。这时,团队只能通过 KeyBy 进行分区。在 KeyBy 的过程中,团队遇到了三种方式:一种是基于时间窗口的多流分发计算,第二种是直接基于 value 的分发,即根据输入值的区间直接分发到相应的插件执行。第三种则是关于识别算法的输出与事件评估算法之间的数据流处理。在识别算法得出结果后,评估算法会引入一条数据流,与另外两条流合并,通过一个 merge key 操作进行数据整合。团队将批量数据验证算法以表值函数的形式运行,以实现数据的并行处理和均衡化。这样做是为了优化数据处理效率。
团队将所有字段作为 UDF 的参数,利用已有的插件化方法,基于 View 编写函数,并将其转换为 UDF。之后,基于 Schema 进行计算。根据 Partition By 产生的点位进行分区自增 ID,这个概念可能有些难以理解,因为在传统的 Data Stream 处理方式中,需要将中间流拉取过来,与其他流进行 Merge Key 操作。这会导致一个问题,即如何将 Data Stream 中的 Key 等价于 Circle 中的语法。目前,除了阿里云的商业版,这种行为并不通用。解决方法类似于 Spark 中的 random() over Partition By,通过为每个分区内的元素打标签,区分每个分区产生的数据。然后,在前端采用 Mini Batch 和两阶段提交的方式进行优化。
优化结果
如上图所示,采用批模式加 Pandas UDF 的方式可以清晰地展示出,前面的反压是偶尔形成的,并不会经常出现。而中间的反压是横跨 KeyBy 的,这种方式会导致一部分性能损耗在 KeyBy 上。这是团队接下来可能要讨论的关键问题——KeyBy 均衡化。
KeyBy 均衡化的收益在于优化数据流的分布,使得每个算子能够更均衡地处理数据,减少性能损耗。在 Flink 的批模式中,系统会首先调度所有算子,无论数据是否流到该算子,都会进行资源调度。资源闲置时,它会按照上下游算子的关系分成多个 Stage,按顺序调度。下游算子的调度是从 KeyBy 开始,经过三次流合并后到达下游。分流指的是中间的三个分流,因为这种方式没有流到下游算子,所以算子一直处于阻塞等待状态。其优势在于从 Source 到第一个算子的速度相对较快。
那么将如何在团队中运用 keyby 来实现设计目标呢?
简单来说,就是通过 KeyBy 来计算当前 key 应该分配到哪个 Subtask 执行。这个过程分为两步:首先,你需要计算这个 key 属于哪个 key group。假设并行度为八,那么将这个并行度乘以 1.5 后向上取整到二的 n 次幂。第二步是与 128(即二的七次方)以及二的 15 次方进行比较,取最大值和最小值。团队计算出这个 key 属于哪个 key group 后,下一步就是确定这个 key group 中哪一个 Subtask 能够执行这个 key。这需要从它的 Long run time 中取出当前并行度要计算的 key group 进行取模运算,上图最下面是一部分打印结果。
下面是代码,感兴趣的同学可在网址自取。
Pyflink 性能优化
团队从内存方向进行优化,同时也在数据均衡化方面进行了改进。
内存优化
Flink 的内存模型主要有两种方式:基于 Managed Memory 管理内存和基于堆外内存。在处理大量基于 UDF 的计算时,Flink 会调度较多的 BeamPythonRunner 线程。每个线程在执行 UDF 计算时,会累加所有需要的内存,得出最终 Python 解释器需要申请的内存。
通常,Flink 的 Managed Memory 不仅用于 Python 解释器的内存占用,还用于算子进行计算时的一些本地计算(如 map、flatmap),以及中间状态的生成(例如 UDAF 这种聚合操作)。由于这些操作可能会占用较大的内存,不建议在 PyFlink 中将这些操作的内存分配到一起。因此,建议将 Python 解释器的内存分配给堆外内存管理。
优化步骤分为三步,分别是是管理内存优化,以及 Python VM 和 JVM 的通信数据量控制,以及 JVM 作业拓扑的上下游网络通信。
首先,管理流程优化。在一次性申请较大 Python Beam Runner 内存时,如何降低其内存占用是个问题。Flink 提供了一个叫做消费者权重的功能,它分为三个部分:Operator、StateBackend 和 Python。Operator 是本地计算和中心状态生成的总管理层所占的比例,而 Python 是指示器启动时占的内存总大小。
在团队的生产环境中,官方提供的样例值为 77/30,团队调整为 66/40。这意味着在 Flink on Kubernetes 单 JM 单 TM 的计算逻辑中,UDF 部分的计算内存归 Python 解释器所有,而非 UDF 部分的计算内存归 JVM 所有。目前,团队是按照 8GB 为基准去计算内存分配的比例。
这种内存管理策略帮助团队优化资源使用,确保在执行复杂计算时的稳定性和性能。通过调整消费者权重,团队能够更好地控制 Python 和 JVM 的内存使用,提升系统效率。这个方法适用于 Flink 1.13 前后的版本,是一个经过实践验证的内存计算经验值。
在讨论第二部分时,团队已经提到了 Python VM 和 JVM 之间通信的数据量控制问题。UDF(用户定义函数)由 Python VM 管理,而非 UDF 的部分则由 JVM 管理。这种分工有助于优化内存使用和计算效率。
现在,我将简要说明 Python 解释器的内存总和。假设上游启动了一个 UDF,并设置了 8 个并行度,那么内存总和将是 8 个 UDF Python Worker 的内存乘以并行度,再加上 Python 解释器的内存,这构成了 Python 部分的内存总和。Python UDF Worker 的总和是如何计算的呢?它是基于 Beam 从 JVM 传输到 Python VM 的过程中,传输序列化所占用的内存总和。这包括传输过程中需要的序列化和反序列化内存,以及 Python Worker 执行时的内存需求。团队分享了一些调优配置供参考。这些配置基于单 JM 单 TM 的设置,使用 8GB 为基准计算出的经验值。
数据均衡化
关于数据均衡化,团队在流模式下采用了 Window TVF 的方式来解决反压问题,特别是在 KeyBy 上的反压情况。虽然 KeyBy 分散策略在某些情况下是可行的,但需要根据具体场景来判断其适用性。如果 Key 是随机且不固定的,确实可以通过 Key 均衡化来处理。但如果需要满足某些业务的严格限制,则需谨慎考虑。
以工作流中的两个关键点为例:点位一是电流,点位二是震动。在这种情况下,系统需要调用不同的插件,因此必须根据 Id 进行分区。如果按照之前提到的 KeyBy 方法,虽然 Key 的分布可能变得均匀,但可能会破坏业务的语义,因此这种条件并不成立。团队采用了 Window TVF,并惊喜地发现每个 subtask 的数据量大约在 25 到 27 万之间,分布相对均衡。团队在使用时,将原先的表达式写成 Group By 的形式,一个表示函数,使得数据更加均衡。
接下来,团队将讨论 JVM 作业拓扑图的上下游网络通信。在发生网络波动和数据堆积的情况下,作业偶尔会阻塞,这是因为 Flink 算子通信时,上游缓冲区的内存大小决定的。Netty 框架由服务端的缓冲区和客户端的缓冲区组成,只有在数据被刷出去的那一刻,消息才算是真正发送出去。同样地,Flink 的算子内部也有缓冲区。
团队假设在设置为八倍并行度的情况下,通过监控发现输出算子的缓冲池存在周期性的间隔频率。团队怀疑这可能是网络拥塞的迹象。那么,团队该如何调整呢?根据官方文档提供的公式:Buffer Size = Channel Capacity * Segment Size + Floating Memory Size。简单来说,团队可以根据数据量推算出进入算子时的总大小,并据此反推出需要设置的经验值。
在流模式中结合负载均衡的情况下,虽然 Keyby 已经实现了负载均衡,数据被均匀地分配到了各个分区,这种做法适用于 Key 分布不确定的情况。例如,假设存在三个 Key:k1、k2、k3,分别对应 300、400、500 个数据项。通过 Key 的并行度,数据可以分散到不同的处理单元。然而,由于团队中每个节点调用的算法插件不同,为了符合业务逻辑,必须按照点位的 Id 进行 KeyBy 操作。
流模式是一种数据处理方式,它类似于 Window TVF(Time-based Value Function),其总耗时可以通过简单的工作流说明。Sc Flink 相当于编写了一个固定的工作流,将所有算子嵌入其中,形成一个非常基础的工作流。因此,流模式本质上与 Workflow 等价。
通过实施流模式和 Window TVF,团队在处理 156 万数据时,性能损耗几乎降低了一半,这是团队目前研究阶段的成果。然而,团队认为这并不是最佳解决方案,因为流模式虽然有效减少了性能损耗,但可能无法满足所有业务需求和场景。因此,团队在后续的研究中考虑了其他解决方案,如 Apache Fury。Apache Fury 采用 Python 的 Pickle 编码和缓冲区的方式,基于外存进行读写优化。
四、上下游链路协作的思考
接下来详细阐述上下游链路的思考,以及算法依赖框架在其中扮演的角色。
数据链路中的流转方式
在处理边缘端数据上传时的数据补全过程中,需特别注意工业场景下设备可能出现的断流现象。这种断流通常是由于设备关机或损坏,导致无法上报特征值,从而影响 Flink 的水位线推进并导致反压问题。如果多个点位运行同一作业,解决方案需考虑现场网络环境的复杂性。团队研究了一种数据补全方法,首先利用 Flink 的依赖机制提供的时间序列差值方法。该方法能够在指定时间序列差值后,根据时间段剔除异常数据。具体来说,团队首先开启一个滑动窗口,先将这批数据暂存起来,然后使用差值方法处理时间序列数据,并在后续算子中用自定义的 UDF(用户定义函数)替换原有数据。
第二种方法是结合 UDAF(用户定义聚合函数)进行数据聚合,UDTF(用户定义表生成函数)将单行数据转换为多行,再配合滑动窗口的 Window TF 进行处理。以上就是相关代码的概述。简单概述一下工作内容:首先,团队需要在内存中进行数据补全。例如,通过开启一分钟的窗口来暂存数据,待数据修改完成后,将其放入一个 Python 迭代器中。接着,利用 Udtf(用户定义的表生成函数)将数据从单行转换为多行,逐条通过 Python迭代器输出。虽然这种方法较为原始,但为了确保业务的正常运行,团队不得不采取这种方式。此外,团队还会结合 Udtf 将数据分散到每个平衡度中。
下一种方法是团队会自定义一个补数连接器,目前仍在开发中,未来可能会开源。这个连接器基于 Flink-Faker-Connector,并在 Flink 的框架层上进行操作。团队会在 Flink 算子执行前,从上下文中补全数据。简而言之,团队会在框架层面缓存计算状态,并通过双指针和滑动窗口的方式判断上一条数据是否已到达主键。如果组件一致且时间戳更新,团队会集成 Upsert Kafka 来实现一致性保障。
在上下文链的设计上,团队首先会配置一些工况告警规则, 然后将告警信息发送到数据分析报告中。
这是团队告警下游的标签库建立过程。最上层是团队从边缘端收集的各种数据,如震动、噪声、温度和电流等,这些数据上传后会经过分发端,最终下发到 Flink 实时计算引擎中。可以理解为,团队会为各种规则进行计算,例如转速大于五且电流超过特定毫安数,以及特征值等于某个值等复杂表达式的定义。团队定义完毕后,匹配到的数据会被分为两个库:一个是用户标签库,一个是行为明细库标签库,即团队会将每次点名的信息与你所属厂房的报警系统关联,记录异动特征值,并以小时、天、周为周期进行统计异动的百分比。
例如 FFT(快速傅里叶变换),团队可能会设定一些类似偏度分布的检验标准进行评判。这些信息需要保存在画像标签库中。行为明细库是什么意思。团队异行或异常上报的告警数据都会有一个健康度评估,团队会从算法数据中识别评估,并输出一个健康度等级——是危险、中等还是低等。这个健康度等级和告警的时间、执行度、相关预设期间都需要被保留下来。
简要介绍一下上图,团队首先会定义一些行为规则,比如技术型规则还是次数性规则。告警中包含三个概念:恶化通知、容忍度、重置次数,这些都是可能需要维护历史峰值告警的行为指标。
这个图简单来说,比如团队在一条数据流中,数据恰好命中了某个规则,但规则在下一秒发布了中间这段时间差的数据,你该如何处理。目前团队的实验方式类似于第二个甬道实现的行为次序匹配,团队会先到 Flink 的状态存储中查询这个值是否存在于缓存时间区间内,并设置一个定时器来缓存数据。如果不存在,团队会进入相关系统中查询数据是否存在。
这是团队具体的告警条件。随着团队告警系统的建设,团队算法告警和数据源报警关联的指标有所不同。以工况转速和负载参数等于一为例,团队转速相关的一个派生指标是团队规则流。在设置时,可能需要根据算法测定的指标进行动态规则生成。
第二点,团队的每个子任务需要根据不同的算法实例进行多点位的联合告警。具体来说,尽管团队点位一和点位二属于同类型,但团队需要从这些点位中获取一些训练数据,类似于样本拼接,以获取作业中多个点位的信息。用户可以自行动态设置 Key By,以判断哪些点位需要输出告警数据。
第三点,由于团队同一工作流中运行的都是同类型的点位数据,如果数据来自多个算法数据的 Topic,团队可能需要进行多流合并,这是团队告警行为次序匹配的一部分。团队的容忍度是指团队会维护一个告警基线,超过这个基线的次数达到一定阈值后,会触发一个报警。报警间隔是指超过这个报警间隔的连续次数达到一定阈值后,将不再进行告警。
团队会设计一个行为次序匹配的规则,类似于 Flink State 的二级缓存方式,先在 State 中尝试,若未命中,则在 Redis中查找相关缓存结果。
五、未来展望
工业场景的归因目标
在探讨未来与展望时,团队计划执行三项主要任务。首先,当前的 CEP模式可能并不完全符合团队的系统定义。鉴于团队的目标是从复杂的内部波栏式结构中解析出告警规则,并进行计算后推送到下游,
团队将基于 Apache Flink 进行编解码的强化。Flink 本身是基于序列化和反序列化构建的,它将定义一个真实的缓冲区,并利用外部内存实现零拷贝,以此提升框架性能。目前,我已经提出了一个 PR (Pull Request),它CI流程中。
这是一个关于告警类型的提案,虽然目前还不完善,但大家可以初步了解一下。
团队的告警链路分为两层:数据标定链路和临时告警链路。在规则匹配和数据标定完成后,团队可能还需要处理一些临时告警的需求。为此,团队可能需要基于 Flink 进行操作,例如过滤出错误率大于零的数据。团队可能需要使用一些 NFA (Non-deterministic Finite Automaton) 算子来实现这一点。
接下来,简单介绍一下 Pattern 告警的实现。实际上,它只需要几行代码就可以完成。
这个工具是团队自行开发的,它的用途是尽管定义了很多代码,但随着上层告警规则变得越来越复杂,你肯定不希望每次生成规则时都编译出大量代码。那么,解决方案是什么呢?我将基于 Anltr4 定义一个表达式引擎,并将其桥接到 CEP 的 Operator 中。这就是我目前正在进行的工作。
对工业数据指标的建立感兴趣的朋友可以了解一下。刚刚提到,将建立用户画像标签库和用户行为库。既然建立了仓库,势必要为数据部门的归因提供一定的定位。归因目标是确定告警发生在哪个区域、哪个传感器、告警了多少次,是早班、中班还是晚班。团队的数据资产首先分为厂房点位,其次是设备部件,如闸机、电机、齿轮等。第三级是面向每个客户的场景,我会查看驾驶舱内从一个产线到闸机的结构是如何生成的。团队的分析类型,以苹果分析为例,包括 FFA 的浮列变换,它涉及两个域:时域和频域。时域指的是音频信号会分裂分解成若干正弦波,设置不同的频率信号增强其相应的频段。频率则是将一组声音的不同频段转换为赫兹。团队归根的目标是什么呢?是按小时、周集上报的频段以及异常的波动趋势,以及相关的健康度等。
目前能不能与 RIG 相结合。其实不算是一个 RAG,在 RAG 之前,大多很多都是通过 NLP 去解的。团队会通过一些文献、联网资料去解析出它的现象故障信息。比如,团队工业设备为什么故障?它是在工控机遇到了风化还是应急有离的情况,会有这种故障的情况。团队会提取故障现象,以结发分值的方式去停用词正则去符号,形成一个完整的词句。可能会通过 CNN 分类,分类成故障现象 K3 句继承类型 K1 句,去判断出他当中有没有一个故障部位词和告警词,转换成应的子句之后,会将它建立一个知识图。