性能优化的三个方面
性能优化从三个层面优化:
数据源头方面:数据如何存储和组织,比如文件格式、压缩方式、数据分区和分桶
SQL语句方面:SQL查询,分析JOIN类型,WHERE过滤条件,GROUP BY使用等
计算引擎方面:MapReduce和Spark内部运行机制,比如shuffle的原理、内存管理、数据倾斜等
数据源头层面
数据源头方面主要关注文件格式、数据压缩、分区和分桶三个方面,通常是三者结合使用。
文件格式
我们可以把文件格式想象成存放数据的“容器”。有的容器(比如 TXT 或 CSV)就像一个大杂烩的袋子,把每一行所有的数据都装在一起。而另一些容器(比如 Parquet 或 ORC)则更像一个有许多抽屉的柜子,把每一列数据都单独放在一个抽屉里。
这两种存储方式——行式存储(如 CSV, JSON)和列式存储(如 Parquet, ORC)——是我们在大数据领域最常遇到的。它们在处理 SQL 查询时,效率天差地别。
列式存储核心优势在于I/O的效率高。
列式存储 (Parquet/ORC): 只需要直接找到 所需要的字段,读取里面的数据就行了。这被称为 “谓词下推” (Predicate Pushdown) 的一种体现,引擎只读取它绝对需要的数据。I/O 开销非常小。
行式存储 (CSV/JSON): 引擎必须把每一行的数据(全部 100 个字段)都从磁盘加载到内存里,然后再从内存中提取出我们需要的字段。当数据量达到 TB 甚至 PB 级别时,这种无效 I/O 会浪费掉海量的时间和资源。
Apache Paimon(以前称为 Flink Table Store)正在获得很大的关注,Paimon 并不像 Parquet 或 ORC 那样是一种全新的_文件格式_ 。相反,将其视为存储抽象层或位于 Parquet 等文件格式_之上的表格式
区别:
- Parquet/ORC 就像保存 Word 文档。保存后,该文件是静态且不可更改的(不可变)。要更改它,您必须创建一个全新的文件版本
- Paimon就像使用 Google 文档一样。它允许多人实时对文档进行微小的更改(更新、删除、插入),并跟踪所有历史记录。您始终会看到最新版本,但基础更改会得到有效管理。
如何实现地像Google文档一样
- 基本数据文件(列格式): 大部分历史数据存储在高效、压缩的列式文件(如 Parquet)中
- 更改日志: 所有新更改(插入、更新、删除)都写入更小的单独日志文件。这个过程非常快
在 Paimon 表上运行 SQL 查询时,查询引擎足够智能,可以读取大型基本文件,然后应用日志文件中最近的更改,从而提供完美的最新结果。Paimon会定期将这些小日志文件合并到后台较大的基础文件中,以保持效率。这种架构被称为对数结构合并树(LSM 树)。
Paimon 本质上为数据湖带来了类似数据库的更新/删除功能。鉴于这种处理实时变化的能力,其非常适合实时数据同步、流失更新分析、用户画像/特征存储。传统的数仓技术(比如 Hive on Parquet)处理“追加”新数据(
INSERT
)非常高效,但一旦涉及到修改(UPDATE
)或删除(DELETE
)某几条记录,就变得非常笨拙和昂贵,Paimon则赋予了数据湖“实时更新”的能力,让 Spark 和 Flink 可以在湖上进行更复杂的、交互性更强的 SQL 操作。
数据压缩
列式存储还更有利于数据压缩,两种典型的压缩技术就是字典编码 (Dictionary Encoding) 和行程长度编码 (Run-Length Encoding, RLE)。
- 字典编码 (Dictionary Encoding):可以把它想象成一个翻译的过程:
创建字典 📖:系统首先会扫描
city
这一整列的数据,找出所有不重复的城市名(比如 ‘北京’, ‘上海’, ‘广州’),然后给它们各自一个独一无二的、更短的编号(比如 ‘北京’ -> 0, ‘上海’ -> 1, ‘广州’ -> 2)。这个对应的关系就是“字典”。翻译原文 ✍️:接下来,系统会用这些简短的数字编号去替换原始的长字符串。所以,原来的一列数据
['北京', '上海', '北京', '广州', '上海']
就变成了[0, 1, 0, 2, 1]
- 行程长度编码 (Run-Length Encoding, RLE):在字典编码的基础上,对其进行排序,原有的顺序变为
[0, 0, 1, 1, 2]
,再进行压缩,即RLE,压缩成(0, 2), (1, 2), (2, 1)。
文件格式的应用场景
分析型场景:在 MapReduce 和 Spark SQL 中,当我们主要做数据分析(
SELECT
几列、GROUP BY
、WHERE
过滤)时,Parquet 和 ORC 这样的列式存储是绝对的首选。事务型/整行读取场景:如果业务需要频繁地读取或写入一整行所有字段,Avro 这样的行式存储会更合适。
分区和分桶
分区
分区是在我们未经过查询内容即可过滤掉海量无关数据。
想象一下你家里的文件柜 🗄️。如果你把所有的文件——账单、合同、信件——都胡乱塞进同一个抽屉,每次找一份特定日期的账单都会把整个抽屉翻个底朝天。
一个更聪明的方法是,你用标签把抽屉分开,比如 “2023年账单”、“2024年账单”、“保险合同”等等。这样,当你想找2024年的账单时,你会直接拉开标有“2024年账单”的抽屉,完全无视其他抽屉。
数据分区就是完全一样的道理。我们不是把一张表的所有数据都存在一个大文件夹里,而是根据某一列(或几列)的值,把数据文件存放到不同的子文件夹里。最常见的分区键就是日期。
例如,一个日志表 logs
可能会被 log_date
这一列分区。在底层存储上,它会形成这样的文件夹结构:
/warehouse/logs/
├── log_date=2024-01-01/
│ └── part-00001.parquet
├── log_date=2024-01-02/
│ └── part-00002.parquet
├── log_date=2024-01-03/
│ └── part-00003.parquet ...
执行SQL语句
SELECT *
FROM logs
WHERE log_date = '2024-01-02';
会直接去2024-01-02文件夹内去查询,有一个专门的术语,叫做 “分区裁剪” (Partition Pruning) 或 “分区消除” (Partition Elimination)。实现了 零 I/O。查询性能的提升是指数级的,尤其是在数据量巨大的情况下。
分区也可以进行二级分区,比如2024年账单中,有省份,有月份,那就可以将省份作为一级分区,月份作为二级分区,这样的话分区裁剪后,缩小了数据范围。
当然不是说分区越细越好,过度就会造成“小文件问题”(Small Files Problem),小文件问题主要带来两个核心负担:
对元数据管理(NameNode)的压力 🧠:在 HDFS 中,每一个文件,无论大小,都需要在 NameNode 的内存里保存一份元数据(文件名、位置、权限等)。如果有一亿个小文件,NameNode 就需要存储一亿条元数据记录,这会极大地消耗其内存,甚至可能导致整个集群的“大脑”崩溃。
对计算引擎(MapReduce/Spark)的压力 ⏱️:每处理一个小文件,计算引擎通常需要启动一个独立的计算任务(Task)。启动一个任务本身是有开销的(比如申请资源、启动进程等)。如果文件只有几 KB,可能启动任务花的时间比真正处理数据的时间还要长。这就好比派一辆大卡车去送一封信,运输工具的成本远高于货物本身的价值。
所以,过度分区导致的小文件问题,会让系统变得“虚胖”,看起来有很多任务在跑,但大部分时间都浪费在了任务调度和元数据管理上,而不是真正的数据计算。
如何解决小文件问题?一般是有两种思路:
合并小文件 (Compaction):集合成一个大的分区。我们可以定期运行一个专门的程序(比如一个 Spark 作业),把每个分区内的所有小文件读出来,然后重新写入成一两个大的、理想大小的文件(比如 256MB 或 1GB)。这个过程就像整理房间,把散落一地的小纸条都誊写到几张大纸上,既整洁又高效。
更换/调整分区策略:从源头的设计层面解决问题。在选择分区字段时,我们需要评估它的 “基数” (Cardinality),也就是这一列有多少个不同的值。
- 高基数 字段(如
user_id
,有上百万个值)绝对不能用作分区键,否则会产生灾难性的百万级小文件。 - 中等基数 字段(如
province
,有30多个值)就需要权衡。如果每天每个省的数据都很大,那没问题;如果数据量小,就可能产生小文件。 - 低基数 字段(如
region
- ‘华东’, ‘华北’, '华南’等,只有几个值)通常是更安全的选择。
- 高基数 字段(如
分桶
分桶可以看作是在分区这个“大抽屉”里,再放入一些“小格子”,它对优化 JOIN
操作有奇效。
在JOIN
操作时,Shuffle阶段是最耗时、耗资源的。
我们可以把它想象成一个大型的“配对游戏”。假设你有两堆海量的卡片(代表两个表),每张卡片上都有一个 user_id
。为了找到 user_id
相同的卡片对,你不得不:
把第一堆的所有卡片,根据
user_id
的某个规则(比如哈希值)分发到 100 个不同的篮子里。再把第二堆的所有卡片,也用完全相同的规则,分发到另外 100 个篮子里。
现在,你知道第一个篮子里的卡片,只可能和另一个牌堆的第一个篮子里的卡片配对成功。
这个把所有卡片都重新分发到对应篮子的过程,就是 Shuffle。在分布式计算中,这意味着海量的数据需要在网络中移动,这会极大地消耗网络 I/O 和磁盘 I/O。
分桶 (Bucketing),就是为了避免这个昂贵的“现场配对”过程而发明的。
它做的事情很简单:在数据写入表的时候,就提前把这个“配对游戏”玩一遍。也就是说,我们提前就规定好,user_id
相同的两张表的数据,必须被写入到编号相同的“桶”(文件)里。
这样,当 Spark/Hive 执行 JOIN
时,它会发现:“哦!这两个表都已经按 user_id
分好桶了!我只需要把 table_A
的 1 号桶和 table_B
的 1 号桶拿来JOIN,2 号桶和 2 号桶 JOIN … 就行了,根本不需要再做一次全局的 Shuffle!”
这个优化叫做 “桶剪枝” (Bucket Pruning),它可以将昂贵的 Sort-Merge Join 优化为更高效的 Bucket-Map Join,甚至完全避免 Shuffle。
哈希分桶工作原理
- 选择一个或多个列作为“分桶键”(比如
user_id
)。- 当一条数据(比如一个用户的订单信息)需要被写入时,系统会对
user_id
这个值进行一次 哈希运算,得到一个哈希值(一个数字)。- 最后,用这个哈希值对你设定的桶的数量取模(比如
hash(user_id) % 4
),得出的结果是 0, 1, 2, 或 3。这个结果就决定了这条数据应该被放进哪个桶(文件)里。
因为哈希函数对于同一个输入总能得到同一个输出,所以这个过程保证了同一个 user_id
的所有数据,在任何时候、任何表中,只要分桶规则相同,就必定会进入到同一个编号的桶里。这就是它能奇迹般地避免 JOIN
时 Shuffle 的根本原因。
SQL语句优化层面
SQL语句优化主要关注几个重点区域:
- 过滤逻辑(
WHERE
子句)**:如何尽早、尽快地过滤掉不需要的数据。 - 连接逻辑(
JOIN
子句)**:这是最常见的性能瓶颈,我们会看看如何选择正确的 JOIN 策略。 - 聚合操作(
GROUP BY
子句)**:如何高效地进行分组和计算,尤其是在数据不均匀时。
过滤逻辑
核心原则可以总结为一句话:“尽早过滤,越多越好”。
把数据处理想象成一个巨大的漏斗。我们希望在漏斗的最顶端就筛掉最大块的、最无关的“石头”(数据),这样流到下面的数据就越少,后续处理起来就越轻松。
表web_logs中,log_date是天分区字段
SELECT
user_id,
request_url
FROM
web_logs
WHERE
log_date = '2024-08-25'
AND request_method = 'GET'
AND contains(request_url, 'special_promo');
下面我们来分析下引擎执行情况。引擎将始终优先考虑 log_date
过滤器,因为它可以使用分区修剪 ,根本不需要读取任何数据即可丢弃所有其他日期。这通过显著减少甚至不需要考虑的初始数据量,提供了最大的性能提升。
再锁定到指定分区数据后,来看看剩余两个条件,提一个问题,你认为request_method = 'GET'
和 contains(request_url, 'special_promo')
哪个条件执行起来更容易?
如果把取出的数据看成是excel的表。
request_method = 'GET'
:执行时,是查看“单元格”中的值并检查它是否完全匹配。速度非常快contains(request_url, 'special_promo')
:执行时,必须打开“单元格”,读取其中的整个字符串,然后执行搜索算法来查看子字符串“special_promo”是否存在。这需要每一行更多的 CPU 工作。
在数据库术语中, SARGable(搜索 ARGument-able),意味着它们可以有效地使用索引和其他快捷方式。将函数应用于WHERE
子句中的列时,它通常会变得不可 SARGable,从而迫使引擎对每一行进行全面扫描和计算。
通过上面例子我们可以知道理想过滤顺序是
- 分区修剪(影响最大,避免 I/O)
- 索引列或低基数列上的简单、可 SARGable 过滤器
- 需要 CPU 密集型函数的复杂、不可 SAG 的过滤器
在现代的Spark 和 Hive 都有了非常成熟切复杂的 基于成本的优化器 (Cost-Based Optimizer, CBO),在写条件的时候我们不需要考虑写的顺序,会自动给我们做优化,但从最佳实践和代码可读性的角度出发,我们仍然强烈建议:总是把最强的、最高效的过滤条件写在最前面。
这样做有两个好处:
逻辑清晰:读代码的人能立刻明白这条 SQL 的核心过滤逻辑。
双重保险:万一在某些极端情况下,优化器的统计信息过时或者判断失误,你写的清晰逻辑也能帮它做出正确的选择。
连接逻辑
小表关联大表
join
操作进行多表连接,经常遇到小表关联大表的场景:你需要将一张非常大的事实表(比如 100GB 的 sales
销售记录表)和一张非常小的维度表(比如 1MB 的 stores
门店信息表)进行 JOIN
。
常见的操作就是,将小表去关联大表,在spark中可以对小表广播到excuter上,在mapreduce中可以使用mapjoin将小表缓存起来,我认为从优化角度上看本质上是一样的,即为了减少网络I/O,提升速度,避免昂贵的shuffle阶段和reduce阶段。
在spark中,广播哈希连接” (Broadcast Hash Join),通常也被称为 Map-Side Join。
工作流程:
- 判断:Spark 的优化器看到一个大表和一个小表在
JOIN
,它会判断小表的大小是否低于一个设定的阈值(比如,默认是 10MB)。 - 广播 📡:如果判断通过,Driver 节点会把这张小表的所有数据从 Executor 拉到自己的内存里,然后像广播一样,把它分发给集群中每一个参与计算的 Executor。
- 本地执行:现在,每一个 Executor 手里都有一份完整的小表“地图册”。当它处理大表的某个数据分片(Partition)时,它只需要在自己的内存里查找这张“地图册”来完成
JOIN
,完全不需要把大表的数据发送到网络上。
在MapReduce 里的 MapJoin 和Spark Broadcast Join,本质上是完全相同的思想。 MapReduce 的实现细节更“手动”一些。过程是这样的:
- 分发小表:在 MapReduce 中,我们利用一个叫做
DistributedCache
(分布式缓存) 的功能。在作业开始前,我们手动把小表文件上传到这个缓存里。Hadoop 框架会自动确保,每一个将要运行 Map 任务的节点,都会提前把这个小表文件下载到本地。 - Mapper 内存加载:当每一个 Map 任务启动时,它的第一件事就是从本地磁盘读取这个小表文件,然后把所有数据加载到自己的内存里,通常会存成一个哈希表(HashMap),这样查找起来非常快。
- 本地 Join:接下来,这个 Map 任务开始处理它被分配到的大表数据切片。每读到大表的一行数据,它就用这行数据的
JOIN key
,去内存里的哈希表里查找匹配项。如果找到了,就直接把合并后的结果输出。
整个 JOIN
的过程,完全在 Map
阶段就独立完成了,根本不需要把大表的数据进行 Shuffle,也不需要 Reducer
来进行最终的合并。所以它也被称为 “Map 端连接” (Map-Side Join)。
Spark 的广播连接,可以看作是这个过程的更智能、更自动化的版本。
大表关联大表
在大表关联大表的场景中,广播策略就不适用了,接上面的例子,原因有两个:
- 驱动程序内存过载 :驱动程序节点会尝试在广播之前将所有 100GB 数据拉入自己的内存中,这几乎肯定会导致它崩溃并显示 OutOfMemoryError
- 网络饱和 :即使驱动程序有足够的内存,向每个执行器推送 100GB 的数据也会淹没网络,造成巨大的瓶颈。
因此,当 Spark 发现两个表都太大而无法广播时,它会回退到其默认的、更健壮的策略: 排序-合并连接
这个过程有点像合并两副排序的扑克牌:
- shuffle:Spark 首先洗牌两张大桌子。它根据联接键在集群中重新分配数据,确保两个表中具有相同键的行最终位于同一台工作器机器上
- 排序 :在每台工作器计算机上,每个分区中的数据按联接键排序。
- 合并 :然后引擎同时迭代两个排序的数据集。由于它们是排序的,因此它可以非常有效地找到所有匹配的键并连接相应的行。
这种排序-合并连接是大数据连接的主力。然而,最初的shuffle阶段仍然非常昂贵。
面对这种问题,我们可以从数据源层面解决,就是使用分桶。当两张大表进行 JOIN
时,如果它们已经基于 JOIN key
进行了分桶 (Bucketing),那么计算引擎就可以执行更高效的 Bucket-Map Join,完全避免 Shuffle 阶段,因为引擎知道相同 key 的数据必定在对应的桶文件里。
在大表关联大表的过程中,经常遇到数据倾斜(Data Skew)的问题,即我们发现有几个特殊的 user_id
(比如代表“未登录游客”的 user_id = -1
,或者某个超级大卖家的 ID),它们的数据量占了整个表的 30%。这意味着,在 JOIN
时,处理这几个特殊 user_id
的那几个计算任务 (Task) 会接收到海量的数据,运行得极其缓慢,而其他 99% 的任务早就执行完了,整个集群都在“等”那几个慢任务。这就是典型的“长尾效应”。
那我们从SQL优化角度,如何解决此问题,即有没有办法把这些“异常”的 key 和“正常”的 key 分开处理呢?
假设我们的罪魁祸首是 user_id = -1
。在 JOIN
的时候,我们希望 sales
表里所有 user_id = -1
的数据,不要再只和 user_profile
表里 user_id = -1
的那一行去匹配了。
我们可以在 JOIN
之前,对 sales
表里 user_id = -1
的数据做一点“手脚”。比如,我们可以把这些数据的 JOIN key
随机地变成 -1_0
, -1_1
, -1_2
, …, -1_99
这样的新 key。当然,在另一张表user_profile
也要进行同样的改造,在 sales
表(大表)这边,我们把 user_id = -1
随机地变成了 -1_0
到 -1_99
。这是一个 “一对多” 的打散。那么,在 user_profile
表(小表)这边,我们只有一行 user_id = -1
的数据。为了确保 sales
表里任意一个被打散的 key(比如 -1_42
)都能找到匹配项,我们应该对 user_profile
表里的这一行数据做什么样的操作呢?
我们采用的核心策略是 “随机化与复制” (Randomize and Replicate)。
**随机化大表 (Randomize the Large Table) **
- 目标: 把
sales
表中所有user_id = -1
的数据,均匀地分发给多个不同的计算任务 (Task) 去处理。 - 方法: 我们在
JOIN
之前,通过一个CASE WHEN
或者IF
语句,对sales
表的JOIN key
进行改造。- 如果
user_id
不是-1
,key 保持不变。 - 如果
user_id
是-1
,我们给它拼接上一个随机数,比如 0 到 99 之间的一个整数。这样,原来的user_id = -1
就变成了-1_0
,-1_1
,-1_2
…-1_99
等100个不同的 key。
- 如果
- 目标: 把
**复制小表 (Replicate the Small Table)
- 目标: 确保
sales
表中每一个被随机化了的 key (如-1_42
),都能在user_profile
表中找到它唯一的匹配对象。 - 方法: 我们需要将
user_profile
表里user_id = -1
的那一行数据复制 100 份。每一份都对应一个我们刚才随机化时可能产生的新 key。- 所以
user_profile
表会多出 100 行数据,它们的JOIN key
分别是-1_0
,-1_1
, …,-1_99
,而其他的画像信息(如用户名、年龄等)都是完全一样的。
- 所以
- 目标: 确保
现在,JOIN
操作本身就变得均衡了。当 Spark 执行 JOIN
时:
- 处理
-1_42
这个 key 的任务,只会收到sales
表中大约 1/100 的倾斜数据,以及user_profile
表中那唯一的一行-1_42
的数据。 - 每一个处理倾斜 key 的任务,负载都变得大致相同。
这个手动“加盐” (Salting) 打散 key 的方法,是解决数据倾斜最有效也是最通用的手段之一。
其实,像 Spark 3.0 及之后的版本,已经内置了一些自动处理数据倾斜的功能。Spark维护着一套更详细的统计信息 (Statistics)。当我们在 Hive/Spark 中运行 ANALYZE TABLE ... COMPUTE STATISTICS
这样的命令时,就是在收集这些信息。
主要包括:
- 直方图 (Histograms): 它不只记录每个值的数量,而是将数据的值域分成许多个“桶”,然后统计落入每个桶里的数据量。通过直方图,优化器能非常精确地了解数据的分布情况,一眼就能看出哪个值是“超级热点”,从而识别出倾斜。
- 基数 (Cardinality): 也就是一列中不重复值的数量。
- 空值数量 (Null Counts): 这一列有多少个 NULL。
- 最大/平均长度: 对于字符串类型的列,它们的平均和最大长度是多少。
当 Spark 的查询优化器(特别是其 自适应查询执行 AQE 模块)拿到这些情报后,它在执行 JOIN
之前就能做出预判:“根据直方图,user_id = -1
这个 key 的数据量会大到产生倾斜!”
一旦做出这个预判,它就可以自动地、在运行时动态地应用我们刚才手动做的那套“加盐”策略,把倾斜的 key 打散,将任务均匀分配。
从手动优化聊到了自动优化,底层的原理都是相通的,就是识别倾斜 -> 打散倾斜
聚合操作
在group by聚合时也会发生数据倾斜的典型场景。根本原因和 JOIN
倾斜一模一样:少数几个 key 承担了绝大部分的数据处理压力。计算引擎在 Shuffle 阶段会把同一个 user_id
的所有数据都发送到同一个计算任务 (Reducer/Task) 上,导致那个处理 user_id = 1
的任务不堪重负。
既然我们已经知道问题所在,那么解决方案的思路其实也和 JOIN
倾斜非常相似。我们同样需要把那个巨大的 key “打散”。
这个技术通常被称为 “两阶段聚合” (Two-Stage Aggregation) 或 “局部聚合+全局聚合”。
我们可以把它想象成“地方选举计票”:
第一阶段(局部聚合): 不把所有选票都寄到首都去点。而是先在每个市、每个县进行一次初步计票,得到各地的分项统计。为了防止某个“超级大市”的选票过多,我们甚至可以把这个市再分成几个区来分别计票。
第二阶段(全局聚合): 各个地方只把它们的分项统计结果汇报到首都。首都只需要把这些汇总好的数字加起来,就能得到最终结果。
在 Spark/MapReduce 中,这个过程是:
Map 端聚合 (带随机盐): 在 Shuffle 之前,先对每个分区的数据进行一次
GROUP BY
。对于倾斜的 key (如user_id = 1
),我们给它加上一个随机数前缀或后缀(加盐),GROUP BY user_id, salt
。这样,原来由一个任务处理的user_id = 1
被打散成了多个小任务。Reduce 端聚合: 对已经局部聚合、数据量大大减少的结果进行第二次
GROUP BY
,这次只按原始的user_id
分组,将第一阶段的各个分项结果合并,得出最终的聚合值。
这种两阶段聚合的方法,极大地缓解了 Shuffle 的压力和单个 Reducer 的计算压力。
计算引擎优化
- 内存管理:Spark 如何分配和使用 Executor 的内存,以及我们如何通过
cache
或persist
来加速重复计算 - 并行度调优:如何设置合理的任务并发数,避免资源浪费或任务阻塞。这直接关系到
spark.sql.shuffle.partitions
这个核心参数 - 自适应查询执行:Spark 比较新的“黑科技”,引擎可以在运行时动态地调整自己的执行计划,比如自动处理数据倾斜。
内存管理
可以把 Spark 的 Executor 想象成一个独立的计算工厂。这个工厂的总空间(内存)是固定的。工厂内部被划分成了几个主要的车间:
执行内存 (Execution Memory):这是用来进行“数据加工”的地方,比如执行 Shuffle、Sort、Join、Aggregation 等操作时,存放中间数据的地方。
存储内存 (Storage Memory):这是一个“临时仓库”,用来存放我们手动通过
cache()
或persist()
命令缓存起来的数据(RDDs, DataFrames)。统一内存管理 (Unified Memory Management):现代 Spark 版本(1.6+)采用了一种聪明的动态机制。执行内存和存储内存共享一个大的区域。如果执行车间需要更多空间,它可以“借用”仓库里没用到的地方;反之,如果仓库需要缓存更多数据,它也可以占用加工车间空闲的位置。
面对一个非常复杂的 SQL 查询,它需要被反复执行多次(比如在一个机器学习的迭代训练中),为了让第二次及后续的执行速度大大加快,我们就需要缓存和持久化,当我们知道一个数据集(无论是 RDD、DataFrame 还是 DataSet)会被多次使用时,就应该用持久化机制把它缓存在内存里,避免从磁盘重复读取和计算。
Spark中缓存 (Cache) 和检查点 (Checkpoint),这是两种最主要的持久化方式。
缓存 (
cache
/persist
): 这是我们最常用的。它将数据保存在 Executor 的内存或磁盘上。它的优点是速度快,但缺点是它与创建它的 Spark 作业“血缘关系”还在。如果某个 Executor 节点宕机,导致缓存数据丢失,Spark 会根据血缘关系自动重新计算这部分数据。检查点 (
checkpoint
): 这是一种更“硬核”的持久化。它会切断 RDD 的血缘关系,并将数据可靠地写入一个外部存储系统(比如 HDFS)。它的优点是极其可靠,即使整个 Spark 应用挂了,数据还在。缺点是写入外部系统的 I/O 开销比缓存要大得多。
并行度调优
并行度调优主要目的是减少资源空闲率,将资源充分利用起来。
这是一个非常有意思的trade-off的艺术。我们可以把它想象成在超市开收银台:
- 并行度太低:只开 2 个收银台,但有 100 个顾客在排队。结果就是队伍特别长,顾客(数据)等待时间很久,而超市里其他空闲的员工(CPU 核心)无事可做,造成资源浪费。
- 并行度太高:为了 100 个顾客,开了 100 个收银台。每个收银台只处理一个顾客,看起来很快。但实际上,为每个收银台启动和关闭收银机(任务启动和关闭)本身就需要时间,管理这么多收银台也让经理(Driver)很累。这导致了大量的“管理开销”。
我们的目标,就是找到那个“刚刚好”的收银台数量,让数据能够被快速、高效地处理,同时最大化地利用集群资源。
在 Spark SQL 中,控制 Shuffle 后并行度的最核心、最直接的参数就是 spark.sql.shuffle.partitions
。这个参数的默认值通常是 200。
现在,我们来思考一个具体场景: 你的 Spark 作业运行在一个拥有 100 个 CPU 核心的集群上。默认的 spark.sql.shuffle.partitions
是 200。
这意味着,在 JOIN
或 GROUP BY
之后,Spark 会生成 200 个分区,也就是 200 个计算任务 (Task)。你觉得这 200 个任务和 100 个 CPU 核心之间,会产生什么样的执行情况?这里面有没有潜在的低效之处?
这个过程就会产生:
- 第一波执行: 100 个 CPU 核心同时拿起 100 个任务开始处理。
- 等待与第二波: 当一个 CPU 完成了它的第一个任务后,它会立刻去拿第二个任务来做。但是,整个
JOIN
或GROUP BY
操作(在 Spark 中被称为一个 Stage)必须等到所有 200 个任务都完成才能结束。
如果任务的大小(即每个分区的数据量)不均匀,或者某些节点的计算能力稍弱,就会导致: - 快的 CPU 可能 1 秒钟就完成了自己的 2 个任务,然后就进入了“空闲”状态。
- 慢的 CPU 可能花了 10 秒钟才完成它的 2 个任务。
结果就是,整个集群的这 100 个核心,有 9 秒钟的时间里,利用率都不是 100%。
为每个 CPU 核心分配 2 到 3 个任务,通常是一个比较理想的起点spark.sql.shuffle.partitions
=集群总CPU核心数 * 2
(或* 3
)
这样做的好处是,用少量的额外任务调度开销,来换取更高的 CPU 整体利用率,从而“填满”那些因为任务执行时间不均而产生的空闲时间片。
当然你可能注意到了,这还是数据倾斜问题,如果数据源没有存储好,还是调不到一个更优的性能。所以说性能调优是一个环环相扣的综合过程。
总结
第一层:数据源头层 (稳固的地基)
- 目标: 从物理上最大限度地减少数据读取量 (I/O)。
- 核心手段:
- 文件格式: 优选 Parquet/ORC 等列式存储,实现高效 I/O 和压缩。
- 数据分区: 利用“分区裁剪”技术,让查询引擎只访问必要的“文件夹”。
- 数据分桶: 预先“分拣”数据,为
JOIN
消除最昂贵的 Shuffle 环节。
- 关键问题: 我的数据组织是否能让引擎在执行前就过滤掉 99% 的无关数据?
第二层:SQL 语句层 (精妙的蓝图)
- 目标: 告诉引擎“做什么”的同时,也告诉它“怎么做”最高效。
- 核心手段:
WHERE
过滤: 尽早、尽快过滤,充分利用分区和简单谓词。JOIN
策略: 根据表的大小和数据分布,选择广播JOIN
或排序合并JOIN
,并手动或自动处理数据倾斜。GROUP BY
优化: 采用“两阶段聚合”等方式,打散聚合过程中的数据倾斜。
- 关键问题: 我的 SQL 是否避免了不必要的全表扫描、全量 Shuffle 和数据倾斜?
第三层:计算引擎层 (强大的引擎室)
- 目标: 为执行任务提供最优的资源配置和运行环境。
- 核心手段:
- 内存管理: 精细化控制 On-Heap/Off-Heap 内存,通过
persist
缓存关键数据,避免 OOM 和 GC 瓶颈。 - 并行度调优: 设置合理的并行任务数,平衡 CPU 利用率、内存压力和任务调度开销,避免数据溢写和小任务泛滥。
- 内存管理: 精细化控制 On-Heap/Off-Heap 内存,通过
- 关键问题: 我的资源配置是否与我的数据量和计算逻辑相匹配?