基于 Spark 实现 COS 海量数据处理

发布于:2025-06-24 ⋅ 阅读:(18) ⋅ 点赞:(0)

上周在组内分享了一下这个主题, 我觉得还是摘出一部分当文章输出出来
分享主要包括三个方面: 1. 项目背景 2.Spark 原理 3. Spark 实战
项目背景
主要是将海量日志进行多维度处理;
项目难点
1、数据量大(压缩包数量 6TB,60 亿条数据);
2、在 cos 上的目录不固定;
3、计算方式复杂,各种过滤、聚合、汇总逻辑;
4、处理时间有限,需在 4h 内完成;
基于上述的项目背景和难点, 最终决定采用 Spark,首先数据量大及计算方式复杂, 如果使用传统的服务方式, 需要大量的服务器资源, 而目录不固定, 使数据读取变的复杂, 且普通服务不太可能在 4h 内处理完毕; 综合考虑决定使用 Spark。
那么就要讲讲Spark 是什么, 以及在这些挑战中的优势了
Apache Spark
快速、通用、可扩展的大数据引擎;
优势:
1、快速: Spark 可以中集群中并行处理数据, 重复利用多台机器的计算能力,显著提高处理速度, 对于我们的大数据量场景尤为重要;
2、易于使用的 API, 支持 Java、Python、Scala API; Spark 原生只支持 Scala 和 Java,仅中外围包装通过 PySpark 中间件实现对 Python 语言的支持;
3、通用:提供多种计算模型,如: 批处理、交互式查询(Spark SQL)、实时流处理、机器学习、图计算等,可以灵活应对复杂的计算需求;
4、灵活: 支持多种数据源,如 HDFS、COS、Kafka、HBase 像我们的数据存储在 COS 上, 是可以直接读取 COS 目录, 且对于不确定路径, 可以直接使用* 代替,Hadoop-COS实现了以腾讯云 COS 作为底层文件系统运行上层计算任务的功能,支持使用Hadoop、Spark以及Tez等处理存储在腾讯云COS对象存储系统上的数据,地址如下:
https://github.com/tencentyun/hadoop-cos
Spark 架构
在这里插入图片描述
前面了解了 Spark 是什么, 这里讲一下 Spark 的架构
1、Driver是spark应用程序入口,是spark核心,负责spark集群的链接和资源管理。
2、ClusterManager负责所有Executor的资源管理和调度,Spark可以与多种集群管理器配合使用, 比如yarn k8s。

3、Executor 负责具体作业计算任务。

当我们提交一个任务开始执行时,是如何作业的:

1、启动 Driver 程序,会解析编写的程序,并序列化字节级别代码, 通过 SparkSession 的一个成员变量: SparkContext 向Cluster Manager 发出命令,Cluster Manager 会将当前的资源情况分配合适的资源给 Driver。

2、 Drvier 的字节级别代码会分发至将要执行的 Executor 上, 这些计算过程实际上是在每个节点本地计算并完成,每个spark会在集群中有一个或多个Executor,Executor 之间也可能会有数据的传输,比如一些聚合函数执行。

3、一旦整个执行过程完成,Driver 收集所有 Executor 返回的结果, 结束整个作业,同时像 ClusterManager 释放资源。

4、在整个过程中,Cluster Manager 扮演了资源管理和任务调度的关键角色。它确保了 Spark 作业能够高效地利用集群资源,调度任务到合适的 Executor 上执行,从而实现分布式计算的优势。

通过这种方式, Spark 可以高效利用集群资源, 实现大规模数据的分布式处理

Spark 核心组件

在这里插入图片描述
1、Spark Core是Spark基础,提供内存计算能力, 是分布式处理大数据的基础,它将分布式数据抽象为弹性分布式数据集(RDD),并为运行在其上的上层组件提供 API。所有 Spark 的上层组件都建立在 Spark Core 的基础之上。

2、Spark Streaming 是一个用于处理动态数据流的 Spark 组件。它能够开发出强大的交互和数据查询程序。在处理动态数据流时,流数据会被分割成微小的批处理,这些微小批处理将会在 Spark Core 上按时间顺序快速执行。

3、Spark SQL 是一个用于处理结构化数据的 Spark 组件。它允许使用 SQL 语句查询数据。Spark 支持多种数据源,包括 Hive 表、Parquet 和 JSON 等。

Spark 核心数据结构

在这里插入图片描述
前面我们带过 rdd, rdd 全称为弹性分布式数据集, 是 spark 的核心数据结构,一个不可变的分布式对象集合,

虽然名字带了分布式,但是在使用的时候,是感受不到分布式,就跟操作本地数据集一样操作在分布式存储中的数据。

RDD 特性有三种:

1、弹性

容错分两部分: 1> 机器层面的容错, 节点出错自动重试, 2>RDD层面的容错;也就是血统, rdd 的依赖关系, 有宽依赖和窄依赖, 可以通过血统信息重新计算丢失的分区,而不需要重新计算整个RDD;但是当计算逻辑复杂时,就会引起依赖链过长,重算代价就很高,可以适当使用rdd检查点;

2、分布式

数据分布在集群的多个节点上,RDD的分区(partition)是指将数据集划分成多个部分,以便在集群中的不同节点上并行处理。分区数与集群中的节点数无关,很可能集群中的单个节点有几个数据分区。

3、不可变性

Rdd只能基于在稳定物理存储中的数据集和其他已有的rdd上执行确定性操作来创建。

RDD 支持操作
在这里插入图片描述
rdd支持两种

1、转换,从现有数据集创建一个新的数据集:如map将数据集每个元素传递给函数,返回一个新的分布式数据集表示结果。

RDD的所有转换都是惰性的, 也就是说并不会直接计算,他们只是记住这些应用到基础数据集(比如一个文件)的转换动作, 只有当发生一个要求返回结果给Driver的动作, 才会真正执行; 比如map创建一个新数据集,并在reduce中使用,最终只返回reduce的结果给Driver,而不是整个大的新数据集。

这样Spark就可以了解所有执行的算子, 从而设定并优化执行计划。

2、动作: 如reduce 将所有元素叠加起来,将最终结果返回给Driver

转换算子返回的还是RDD,但是行动算子返回的是非RDD类型值,比如整数,或者无返回值

RDD 依赖关系

1、窄依赖

每一个parent RDD的Partition最多被子RDD的一个Partition使用

2、宽依赖(也称Shuffle依赖)

多个子RDD的Partition会依赖同一个parent RDD的Partition

Shuffle 是指在分布式计算过程中,数据在不同的分区之间重新分配的过程。Shuffle 通常发生在需要跨分区进行数据交换的操作中,例如 groupByKey、reduceByKey、join 等。这些操作需要将数据从一个分区移动到另一个分区,以便进行合并或聚合

在书上截的一个图, 还是很清晰的:
在这里插入图片描述
RDD 逻辑计算图
在这里插入图片描述
这里结合我们的项目背景: 万象图片请求数据, 对海量日志进行多维度处理、计算、分析,我们来了解一下 rdd 的逻辑计算

我们的日志都是以压缩包的方式,json 的格式存储在 cos 上; 首先从 cos读取出来的数据也就是第一步创建 RDD,其中解析 json,确定 key 以及 filter 过滤逻辑, 是 RDD 的转换操作;

转换完成后,进行按桶粒度聚合或者统计,是action动作,生成运算结果, 转换和执行在Executor上操作的;

每个Executor处理其中的一部分RDD,最终将执行结果又写回 COS 上;

RDD 缓存

在这里插入图片描述
Spark 速度非常快的原因之一就是 RDD 缓存。

我们看右侧的这个图, 以场景来说明:

RDD0 过滤生成RDD1, 在RDD1基础上, 进行不同的聚合计算, 常规情况下, 要做两次filter;

首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升

所以在不同操作中在内存中持久化(或缓存)一个RDD后,每个节点就将计算的分片结果保存在内存中,对次数据集进行的其他操作中重用。

缓存有可能丢失, 或者基于内存的数据由于内存不足被删除, RDD的缓存机制,保证了即使缓存丢失也能保证计算的正确执行。

Spark 配置及调优
在这里插入图片描述
先讲资源配置:

Executor.memory: 设置过大, 部分任务分配到资源等待, 设置过小,频繁gc,影响性能;

Executor-cores: 每个Executor可以使用的cpu核心数每个Executor可以并行执行多个任务,核心数越多,Executor的并行处理能力越强。
在这里插入图片描述
在代码中的一些使用: 数据处理优化;

数据分区设置: 分区数决定了数据集被划分成多少个部分,影响到并行度和任务调度。过多: 上下文频繁切换;过少,并行度不足,任务处理数据量大,影响作业完成时间;

那么如何合理设置分区数:

分区数应根据数据量和集群的计算资源来设置。一个常见的经验法则是每个分区的数据量在 128MB 到 256MB 之间。在实际运行中,监控作业的执行情况,观察任务的执行时间、资源利用率等指标,根据实际情况进行调整。

数据倾斜:数据倾斜会导致某些任务处理数据量过大, 以reduceByKey 和groupByKey 为例:

1> 内存使用

groupByKey 会将所有具有相同键的值聚集到一个列表中,这可能会导致大量的数据在内存中存储,尤其是当某个键的值非常多时。这可能会导致内存溢出或性能下降。

reduceByKey 在每个分区内先进行局部聚合(即在每个分区内对相同键的值进行合并),然后再将结果发送到 reducer。这种方式减少了需要传输的数据量,从而降低了内存使用和网络传输的开销。

2> 网络传输

groupByKey 会将所有相同键的值发送到同一个节点,这可能会导致大量的数据在网络中传输。

reduceByKey 通过在每个分区内进行局部合并,减少了需要在网络中传输的数据量,从而提高了性能。

缓存: 将常用数据进行缓存,缓存有几种形式, 比如都放内存中, 可以选择节省空间的级别,序列化对象等多种级别。
在这里插入图片描述
1> 比如在使用filter算子后,通常数据会被打碎成很多个小分区,这会影响后面的执行操作,可以先对后面的数据用coalesce算子进行一次合并。

2>像在实际处理cos 文件, 文件只有几十 k,但是十几万的数据, 光遍历读 COS 就需要 1h+, 处理加工只需要 30min;

到这里, 就结束了!

附录 COS 使用 demo:

https://cloud.tencent.com/document/product/436/79146


网站公告

今日签到

点亮在社区的每一天
去签到