sql性能优化

发布于:2025-09-01 ⋅ 阅读:(20) ⋅ 点赞:(0)

性能优化的三个方面

性能优化从三个层面优化:

  • 数据源头方面:数据如何存储和组织,比如文件格式、压缩方式、数据分区和分桶

  • SQL语句方面:SQL查询,分析JOIN类型,WHERE过滤条件,GROUP BY使用等

  • 计算引擎方面:MapReduce和Spark内部运行机制,比如shuffle的原理、内存管理、数据倾斜等

数据源头层面

数据源头方面主要关注文件格式、数据压缩、分区和分桶三个方面,通常是三者结合使用。

文件格式

我们可以把文件格式想象成存放数据的“容器”。有的容器(比如 TXT 或 CSV)就像一个大杂烩的袋子,把每一行所有的数据都装在一起。而另一些容器(比如 Parquet 或 ORC)则更像一个有许多抽屉的柜子,把每一列数据都单独放在一个抽屉里。

这两种存储方式——行式存储(如 CSV, JSON)和列式存储(如 Parquet, ORC)——是我们在大数据领域最常遇到的。它们在处理 SQL 查询时,效率天差地别。

列式存储核心优势在于I/O的效率高。

  • 列式存储 (Parquet/ORC): 只需要直接找到 所需要的字段,读取里面的数据就行了。这被称为 “谓词下推” (Predicate Pushdown) 的一种体现,引擎只读取它绝对需要的数据。I/O 开销非常小。

  • 行式存储 (CSV/JSON): 引擎必须把每一行的数据(全部 100 个字段)都从磁盘加载到内存里,然后再从内存中提取出我们需要的字段。当数据量达到 TB 甚至 PB 级别时,这种无效 I/O 会浪费掉海量的时间和资源。

Apache Paimon(以前称为 Flink Table Store)正在获得很大的关注,Paimon 并不像 Parquet 或 ORC 那样是一种全新的_文件格式_ 。相反,将其视为存储抽象层或位于 Parquet 等文件格式_之上的表格式
区别:

  • Parquet/ORC 就像保存 Word 文档。保存后,该文件是静态且不可更改的(不可变)。要更改它,您必须创建一个全新的文件版本
  • Paimon就像使用 Google 文档一样。它允许多人实时对文档进行微小的更改(更新、删除、插入),并跟踪所有历史记录。您始终会看到最新版本,但基础更改会得到有效管理。

如何实现地像Google文档一样

  1. 基本数据文件(列格式): 大部分历史数据存储在高效、压缩的列式文件(如 Parquet)中
  2. 更改日志: 所有新更改(插入、更新、删除)都写入更小的单独日志文件。这个过程非常快

在 Paimon 表上运行 SQL 查询时,查询引擎足够智能,可以读取大型基本文件,然后应用日志文件中最近的更改,从而提供完美的最新结果。Paimon会定期将这些小日志文件合并到后台较大的基础文件中,以保持效率。这种架构被称为对数结构合并树(LSM 树)

Paimon 本质上为数据湖带来了类似数据库的更新/删除功能。鉴于这种处理实时变化的能力,其非常适合实时数据同步、流失更新分析、用户画像/特征存储。传统的数仓技术(比如 Hive on Parquet)处理“追加”新数据(INSERT)非常高效,但一旦涉及到修改(UPDATE)或删除(DELETE)某几条记录,就变得非常笨拙和昂贵,Paimon则赋予了数据湖“实时更新”的能力,让 Spark 和 Flink 可以在湖上进行更复杂的、交互性更强的 SQL 操作。

数据压缩

列式存储还更有利于数据压缩,两种典型的压缩技术就是字典编码 (Dictionary Encoding)行程长度编码 (Run-Length Encoding, RLE)

  • 字典编码 (Dictionary Encoding):可以把它想象成一个翻译的过程:
    1. 创建字典 📖:系统首先会扫描 city 这一整列的数据,找出所有不重复的城市名(比如 ‘北京’, ‘上海’, ‘广州’),然后给它们各自一个独一无二的、更短的编号(比如 ‘北京’ -> 0, ‘上海’ -> 1, ‘广州’ -> 2)。这个对应的关系就是“字典”。

    2. 翻译原文 ✍️:接下来,系统会用这些简短的数字编号去替换原始的长字符串。所以,原来的一列数据 ['北京', '上海', '北京', '广州', '上海'] 就变成了 [0, 1, 0, 2, 1]

  • 行程长度编码 (Run-Length Encoding, RLE):在字典编码的基础上,对其进行排序,原有的顺序变为[0, 0, 1, 1, 2],再进行压缩,即RLE,压缩成(0, 2), (1, 2), (2, 1)。

文件格式的应用场景

  • 分析型场景:在 MapReduce 和 Spark SQL 中,当我们主要做数据分析(SELECT 几列、GROUP BYWHERE 过滤)时,ParquetORC 这样的列式存储是绝对的首选。

  • 事务型/整行读取场景:如果业务需要频繁地读取或写入一整行所有字段,Avro 这样的行式存储会更合适。

分区和分桶

分区

分区是在我们未经过查询内容即可过滤掉海量无关数据。

想象一下你家里的文件柜 🗄️。如果你把所有的文件——账单、合同、信件——都胡乱塞进同一个抽屉,每次找一份特定日期的账单都会把整个抽屉翻个底朝天。

一个更聪明的方法是,你用标签把抽屉分开,比如 “2023年账单”、“2024年账单”、“保险合同”等等。这样,当你想找2024年的账单时,你会直接拉开标有“2024年账单”的抽屉,完全无视其他抽屉。

数据分区就是完全一样的道理。我们不是把一张表的所有数据都存在一个大文件夹里,而是根据某一列(或几列)的值,把数据文件存放到不同的子文件夹里。最常见的分区键就是日期

例如,一个日志表 logs 可能会被 log_date 这一列分区。在底层存储上,它会形成这样的文件夹结构:

/warehouse/logs/ 
	├── log_date=2024-01-01/ 
	│ └── part-00001.parquet 
	├── log_date=2024-01-02/ 
	│ └── part-00002.parquet 
	├── log_date=2024-01-03/ 
	│ └── part-00003.parquet ...

执行SQL语句

SELECT *
FROM logs
WHERE log_date = '2024-01-02';

会直接去2024-01-02文件夹内去查询,有一个专门的术语,叫做 “分区裁剪” (Partition Pruning)“分区消除” (Partition Elimination)。实现了 零 I/O。查询性能的提升是指数级的,尤其是在数据量巨大的情况下。

分区也可以进行二级分区,比如2024年账单中,有省份,有月份,那就可以将省份作为一级分区,月份作为二级分区,这样的话分区裁剪后,缩小了数据范围。

当然不是说分区越细越好,过度就会造成“小文件问题”(Small Files Problem),小文件问题主要带来两个核心负担:

  1. 对元数据管理(NameNode)的压力 🧠:在 HDFS 中,每一个文件,无论大小,都需要在 NameNode 的内存里保存一份元数据(文件名、位置、权限等)。如果有一亿个小文件,NameNode 就需要存储一亿条元数据记录,这会极大地消耗其内存,甚至可能导致整个集群的“大脑”崩溃。

  2. 对计算引擎(MapReduce/Spark)的压力 ⏱️:每处理一个小文件,计算引擎通常需要启动一个独立的计算任务(Task)。启动一个任务本身是有开销的(比如申请资源、启动进程等)。如果文件只有几 KB,可能启动任务花的时间比真正处理数据的时间还要长。这就好比派一辆大卡车去送一封信,运输工具的成本远高于货物本身的价值。

所以,过度分区导致的小文件问题,会让系统变得“虚胖”,看起来有很多任务在跑,但大部分时间都浪费在了任务调度和元数据管理上,而不是真正的数据计算。

如何解决小文件问题?一般是有两种思路:

  1. 合并小文件 (Compaction):集合成一个大的分区。我们可以定期运行一个专门的程序(比如一个 Spark 作业),把每个分区内的所有小文件读出来,然后重新写入成一两个大的、理想大小的文件(比如 256MB 或 1GB)。这个过程就像整理房间,把散落一地的小纸条都誊写到几张大纸上,既整洁又高效。

  2. 更换/调整分区策略:从源头的设计层面解决问题。在选择分区字段时,我们需要评估它的 “基数” (Cardinality),也就是这一列有多少个不同的值。

    • 高基数 字段(如 user_id,有上百万个值)绝对不能用作分区键,否则会产生灾难性的百万级小文件。
    • 中等基数 字段(如 province,有30多个值)就需要权衡。如果每天每个省的数据都很大,那没问题;如果数据量小,就可能产生小文件。
    • 低基数 字段(如 region - ‘华东’, ‘华北’, '华南’等,只有几个值)通常是更安全的选择。

分桶

分桶可以看作是在分区这个“大抽屉”里,再放入一些“小格子”,它对优化 JOIN 操作有奇效。

JOIN 操作时,Shuffle阶段是最耗时、耗资源的。
我们可以把它想象成一个大型的“配对游戏”。假设你有两堆海量的卡片(代表两个表),每张卡片上都有一个 user_id。为了找到 user_id 相同的卡片对,你不得不:

  1. 把第一堆的所有卡片,根据 user_id 的某个规则(比如哈希值)分发到 100 个不同的篮子里。

  2. 再把第二堆的所有卡片,也用完全相同的规则,分发到另外 100 个篮子里。

  3. 现在,你知道第一个篮子里的卡片,只可能和另一个牌堆的第一个篮子里的卡片配对成功。

这个把所有卡片都重新分发到对应篮子的过程,就是 Shuffle。在分布式计算中,这意味着海量的数据需要在网络中移动,这会极大地消耗网络 I/O 和磁盘 I/O。

分桶 (Bucketing),就是为了避免这个昂贵的“现场配对”过程而发明的。

它做的事情很简单:在数据写入表的时候,就提前把这个“配对游戏”玩一遍。也就是说,我们提前就规定好,user_id 相同的两张表的数据,必须被写入到编号相同的“桶”(文件)里。

这样,当 Spark/Hive 执行 JOIN 时,它会发现:“哦!这两个表都已经按 user_id 分好桶了!我只需要把 table_A 的 1 号桶和 table_B 的 1 号桶拿来JOIN,2 号桶和 2 号桶 JOIN … 就行了,根本不需要再做一次全局的 Shuffle!”

这个优化叫做 “桶剪枝” (Bucket Pruning),它可以将昂贵的 Sort-Merge Join 优化为更高效的 Bucket-Map Join,甚至完全避免 Shuffle。

哈希分桶工作原理

  1. 选择一个或多个列作为“分桶键”(比如 user_id)。
  2. 当一条数据(比如一个用户的订单信息)需要被写入时,系统会对 user_id 这个值进行一次 哈希运算,得到一个哈希值(一个数字)。
  3. 最后,用这个哈希值对你设定的桶的数量取模(比如 hash(user_id) % 4),得出的结果是 0, 1, 2, 或 3。这个结果就决定了这条数据应该被放进哪个桶(文件)里。

因为哈希函数对于同一个输入总能得到同一个输出,所以这个过程保证了同一个 user_id 的所有数据,在任何时候、任何表中,只要分桶规则相同,就必定会进入到同一个编号的桶里。这就是它能奇迹般地避免 JOIN 时 Shuffle 的根本原因。

SQL语句优化层面

SQL语句优化主要关注几个重点区域:

  1. 过滤逻辑(WHERE子句)**:如何尽早、尽快地过滤掉不需要的数据。
  2. 连接逻辑(JOIN子句)**:这是最常见的性能瓶颈,我们会看看如何选择正确的 JOIN 策略。
  3. 聚合操作(GROUP BY子句)**:如何高效地进行分组和计算,尤其是在数据不均匀时。

过滤逻辑

核心原则可以总结为一句话:“尽早过滤,越多越好”
把数据处理想象成一个巨大的漏斗。我们希望在漏斗的最顶端就筛掉最大块的、最无关的“石头”(数据),这样流到下面的数据就越少,后续处理起来就越轻松。
表web_logs中,log_date是天分区字段

SELECT
    user_id,
    request_url
FROM
    web_logs
WHERE
    log_date = '2024-08-25'
    AND request_method = 'GET'
    AND contains(request_url, 'special_promo');

下面我们来分析下引擎执行情况。引擎将始终优先考虑 log_date 过滤器,因为它可以使用分区修剪 ,根本不需要读取任何数据即可丢弃所有其他日期。这通过显著减少甚至不需要考虑的初始数据量,提供了最大的性能提升。
再锁定到指定分区数据后,来看看剩余两个条件,提一个问题,你认为request_method = 'GET'
contains(request_url, 'special_promo') 哪个条件执行起来更容易?

如果把取出的数据看成是excel的表。

  • request_method = 'GET':执行时,是查看“单元格”中的值并检查它是否完全匹配。速度非常快
  • contains(request_url, 'special_promo') :执行时,必须打开“单元格”,读取其中的整个字符串,然后执行搜索算法来查看子字符串“special_promo”是否存在。这需要每一行更多的 CPU 工作。
    在数据库术语中, SARGable(搜索 ARGument-able),意味着它们可以有效地使用索引和其他快捷方式。将函数应用于 WHERE 子句中的列时,它通常会变得不可 SARGable,从而迫使引擎对每一行进行全面扫描和计算。

通过上面例子我们可以知道理想过滤顺序是

  1. 分区修剪(影响最大,避免 I/O)
  2. 索引列或低基数列上的简单、可 SARGable 过滤器
  3. 需要 CPU 密集型函数的复杂、不可 SAG 的过滤器

在现代的Spark 和 Hive 都有了非常成熟切复杂的 基于成本的优化器 (Cost-Based Optimizer, CBO),在写条件的时候我们不需要考虑写的顺序,会自动给我们做优化,但从最佳实践代码可读性的角度出发,我们仍然强烈建议:总是把最强的、最高效的过滤条件写在最前面。

这样做有两个好处:

  • 逻辑清晰:读代码的人能立刻明白这条 SQL 的核心过滤逻辑。

  • 双重保险:万一在某些极端情况下,优化器的统计信息过时或者判断失误,你写的清晰逻辑也能帮它做出正确的选择。

连接逻辑

小表关联大表

join操作进行多表连接,经常遇到小表关联大表的场景:你需要将一张非常大的事实表(比如 100GB 的 sales 销售记录表)和一张非常小的维度表(比如 1MB 的 stores 门店信息表)进行 JOIN
常见的操作就是,将小表去关联大表,在spark中可以对小表广播到excuter上,在mapreduce中可以使用mapjoin将小表缓存起来,我认为从优化角度上看本质上是一样的,即为了减少网络I/O,提升速度,避免昂贵的shuffle阶段和reduce阶段。

在spark中,广播哈希连接” (Broadcast Hash Join),通常也被称为 Map-Side Join。
工作流程:

  1. 判断:Spark 的优化器看到一个大表和一个小表在 JOIN,它会判断小表的大小是否低于一个设定的阈值(比如,默认是 10MB)。
  2. 广播 📡:如果判断通过,Driver 节点会把这张小表的所有数据从 Executor 拉到自己的内存里,然后像广播一样,把它分发给集群中每一个参与计算的 Executor。
  3. 本地执行:现在,每一个 Executor 手里都有一份完整的小表“地图册”。当它处理大表的某个数据分片(Partition)时,它只需要在自己的内存里查找这张“地图册”来完成 JOIN,完全不需要把大表的数据发送到网络上。

在MapReduce 里的 MapJoin 和Spark Broadcast Join,本质上是完全相同的思想。 MapReduce 的实现细节更“手动”一些。过程是这样的:

  1. 分发小表:在 MapReduce 中,我们利用一个叫做 DistributedCache (分布式缓存) 的功能。在作业开始前,我们手动把小表文件上传到这个缓存里。Hadoop 框架会自动确保,每一个将要运行 Map 任务的节点,都会提前把这个小表文件下载到本地。
  2. Mapper 内存加载:当每一个 Map 任务启动时,它的第一件事就是从本地磁盘读取这个小表文件,然后把所有数据加载到自己的内存里,通常会存成一个哈希表(HashMap),这样查找起来非常快。
  3. 本地 Join:接下来,这个 Map 任务开始处理它被分配到的大表数据切片。每读到大表的一行数据,它就用这行数据的 JOIN key,去内存里的哈希表里查找匹配项。如果找到了,就直接把合并后的结果输出。

整个 JOIN 的过程,完全在 Map 阶段就独立完成了,根本不需要把大表的数据进行 Shuffle,也不需要 Reducer 来进行最终的合并。所以它也被称为 “Map 端连接” (Map-Side Join)

Spark 的广播连接,可以看作是这个过程的更智能、更自动化的版本。

大表关联大表

在大表关联大表的场景中,广播策略就不适用了,接上面的例子,原因有两个:

  1. 驱动程序内存过载 :驱动程序节点会尝试在广播之前将所有 100GB 数据拉入自己的内存中,这几乎肯定会导致它崩溃并显示 OutOfMemoryError
  2. 网络饱和 :即使驱动程序有足够的内存,向每个执行器推送 100GB 的数据也会淹没网络,造成巨大的瓶颈。

因此,当 Spark 发现两个表都太大而无法广播时,它会回退到其默认的、更健壮的策略: 排序-合并连接

这个过程有点像合并两副排序的扑克牌:

  1. shuffle:Spark 首先洗牌两张大桌子。它根据联接键在集群中重新分配数据,确保两个表中具有相同键的行最终位于同一台工作器机器上
  2. 排序 :在每台工作器计算机上,每个分区中的数据按联接键排序。
  3. 合并 :然后引擎同时迭代两个排序的数据集。由于它们是排序的,因此它可以非常有效地找到所有匹配的键并连接相应的行。

这种排序-合并连接是大数据连接的主力。然而,最初的shuffle阶段仍然非常昂贵。

面对这种问题,我们可以从数据源层面解决,就是使用分桶。当两张大表进行 JOIN 时,如果它们已经基于 JOIN key 进行了分桶 (Bucketing),那么计算引擎就可以执行更高效的 Bucket-Map Join,完全避免 Shuffle 阶段,因为引擎知道相同 key 的数据必定在对应的桶文件里。

在大表关联大表的过程中,经常遇到数据倾斜(Data Skew)的问题,即我们发现有几个特殊的 user_id (比如代表“未登录游客”的 user_id = -1,或者某个超级大卖家的 ID),它们的数据量占了整个表的 30%。这意味着,在 JOIN 时,处理这几个特殊 user_id 的那几个计算任务 (Task) 会接收到海量的数据,运行得极其缓慢,而其他 99% 的任务早就执行完了,整个集群都在“等”那几个慢任务。这就是典型的“长尾效应”。

那我们从SQL优化角度,如何解决此问题,即有没有办法把这些“异常”的 key 和“正常”的 key 分开处理呢?

假设我们的罪魁祸首是 user_id = -1。在 JOIN 的时候,我们希望 sales 表里所有 user_id = -1 的数据,不要再只和 user_profile 表里 user_id = -1 的那一行去匹配了。

我们可以在 JOIN 之前,对 sales 表里 user_id = -1 的数据做一点“手脚”。比如,我们可以把这些数据的 JOIN key 随机地变成 -1_0, -1_1, -1_2, …, -1_99 这样的新 key。当然,在另一张表user_profile也要进行同样的改造,在 sales 表(大表)这边,我们把 user_id = -1 随机地变成了 -1_0-1_99。这是一个 “一对多” 的打散。那么,在 user_profile 表(小表)这边,我们只有一行 user_id = -1 的数据。为了确保 sales 表里任意一个被打散的 key(比如 -1_42)都能找到匹配项,我们应该对 user_profile 表里的这一行数据做什么样的操作呢?

我们采用的核心策略是 “随机化与复制” (Randomize and Replicate)

  1. **随机化大表 (Randomize the Large Table) **

    • 目标: 把 sales 表中所有 user_id = -1 的数据,均匀地分发给多个不同的计算任务 (Task) 去处理。
    • 方法: 我们在 JOIN 之前,通过一个 CASE WHEN 或者 IF 语句,对 sales 表的 JOIN key 进行改造。
      • 如果 user_id 不是 -1,key 保持不变。
      • 如果 user_id -1,我们给它拼接上一个随机数,比如 0 到 99 之间的一个整数。这样,原来的 user_id = -1 就变成了 -1_0, -1_1, -1_2-1_99 等100个不同的 key。
  2. **复制小表 (Replicate the Small Table)

    • 目标: 确保 sales 表中每一个被随机化了的 key (如 -1_42),都能在 user_profile 表中找到它唯一的匹配对象。
    • 方法: 我们需要将 user_profile 表里 user_id = -1那一行数据复制 100 份。每一份都对应一个我们刚才随机化时可能产生的新 key。
      • 所以 user_profile 表会多出 100 行数据,它们的 JOIN key 分别是 -1_0, -1_1, …, -1_99,而其他的画像信息(如用户名、年龄等)都是完全一样的。

现在,JOIN 操作本身就变得均衡了。当 Spark 执行 JOIN 时:

  • 处理 -1_42 这个 key 的任务,只会收到 sales 表中大约 1/100 的倾斜数据,以及 user_profile 表中那唯一的一行 -1_42 的数据。
  • 每一个处理倾斜 key 的任务,负载都变得大致相同。
    这个手动“加盐” (Salting) 打散 key 的方法,是解决数据倾斜最有效也是最通用的手段之一。

其实,像 Spark 3.0 及之后的版本,已经内置了一些自动处理数据倾斜的功能。Spark维护着一套更详细的统计信息 (Statistics)。当我们在 Hive/Spark 中运行 ANALYZE TABLE ... COMPUTE STATISTICS 这样的命令时,就是在收集这些信息。
主要包括:

  1. 直方图 (Histograms): 它不只记录每个值的数量,而是将数据的值域分成许多个“桶”,然后统计落入每个桶里的数据量。通过直方图,优化器能非常精确地了解数据的分布情况,一眼就能看出哪个值是“超级热点”,从而识别出倾斜。
  2. 基数 (Cardinality): 也就是一列中不重复值的数量。
  3. 空值数量 (Null Counts): 这一列有多少个 NULL。
  4. 最大/平均长度: 对于字符串类型的列,它们的平均和最大长度是多少。

当 Spark 的查询优化器(特别是其 自适应查询执行 AQE 模块)拿到这些情报后,它在执行 JOIN 之前就能做出预判:“根据直方图,user_id = -1 这个 key 的数据量会大到产生倾斜!”

一旦做出这个预判,它就可以自动地、在运行时动态地应用我们刚才手动做的那套“加盐”策略,把倾斜的 key 打散,将任务均匀分配。

从手动优化聊到了自动优化,底层的原理都是相通的,就是识别倾斜 -> 打散倾斜

聚合操作

在group by聚合时也会发生数据倾斜的典型场景。根本原因和 JOIN 倾斜一模一样:少数几个 key 承担了绝大部分的数据处理压力。计算引擎在 Shuffle 阶段会把同一个 user_id 的所有数据都发送到同一个计算任务 (Reducer/Task) 上,导致那个处理 user_id = 1 的任务不堪重负。

既然我们已经知道问题所在,那么解决方案的思路其实也和 JOIN 倾斜非常相似。我们同样需要把那个巨大的 key “打散”。

这个技术通常被称为 “两阶段聚合” (Two-Stage Aggregation)“局部聚合+全局聚合”

我们可以把它想象成“地方选举计票”:

  1. 第一阶段(局部聚合): 不把所有选票都寄到首都去点。而是先在每个市、每个县进行一次初步计票,得到各地的分项统计。为了防止某个“超级大市”的选票过多,我们甚至可以把这个市再分成几个区来分别计票。

  2. 第二阶段(全局聚合): 各个地方只把它们的分项统计结果汇报到首都。首都只需要把这些汇总好的数字加起来,就能得到最终结果。

在 Spark/MapReduce 中,这个过程是:

  1. Map 端聚合 (带随机盐): 在 Shuffle 之前,先对每个分区的数据进行一次 GROUP BY。对于倾斜的 key (如 user_id = 1),我们给它加上一个随机数前缀或后缀(加盐),GROUP BY user_id, salt。这样,原来由一个任务处理的 user_id = 1 被打散成了多个小任务。

  2. Reduce 端聚合: 对已经局部聚合、数据量大大减少的结果进行第二次 GROUP BY,这次只按原始的 user_id 分组,将第一阶段的各个分项结果合并,得出最终的聚合值。

这种两阶段聚合的方法,极大地缓解了 Shuffle 的压力和单个 Reducer 的计算压力。

计算引擎优化

  • 内存管理:Spark 如何分配和使用 Executor 的内存,以及我们如何通过 cachepersist 来加速重复计算
  • 并行度调优:如何设置合理的任务并发数,避免资源浪费或任务阻塞。这直接关系到 spark.sql.shuffle.partitions 这个核心参数
  • 自适应查询执行:Spark 比较新的“黑科技”,引擎可以在运行时动态地调整自己的执行计划,比如自动处理数据倾斜。

内存管理

可以把 Spark 的 Executor 想象成一个独立的计算工厂。这个工厂的总空间(内存)是固定的。工厂内部被划分成了几个主要的车间:

  1. 执行内存 (Execution Memory):这是用来进行“数据加工”的地方,比如执行 Shuffle、Sort、Join、Aggregation 等操作时,存放中间数据的地方。

  2. 存储内存 (Storage Memory):这是一个“临时仓库”,用来存放我们手动通过 cache()persist() 命令缓存起来的数据(RDDs, DataFrames)。

  3. 统一内存管理 (Unified Memory Management):现代 Spark 版本(1.6+)采用了一种聪明的动态机制。执行内存和存储内存共享一个大的区域。如果执行车间需要更多空间,它可以“借用”仓库里没用到的地方;反之,如果仓库需要缓存更多数据,它也可以占用加工车间空闲的位置。

面对一个非常复杂的 SQL 查询,它需要被反复执行多次(比如在一个机器学习的迭代训练中),为了让第二次及后续的执行速度大大加快,我们就需要缓存和持久化,当我们知道一个数据集(无论是 RDD、DataFrame 还是 DataSet)会被多次使用时,就应该用持久化机制把它缓存在内存里,避免从磁盘重复读取和计算。

Spark中缓存 (Cache)检查点 (Checkpoint),这是两种最主要的持久化方式。

  • 缓存 (cache/persist): 这是我们最常用的。它将数据保存在 Executor 的内存或磁盘上。它的优点是速度快,但缺点是它与创建它的 Spark 作业“血缘关系”还在。如果某个 Executor 节点宕机,导致缓存数据丢失,Spark 会根据血缘关系自动重新计算这部分数据。

  • 检查点 (checkpoint): 这是一种更“硬核”的持久化。它会切断 RDD 的血缘关系,并将数据可靠地写入一个外部存储系统(比如 HDFS)。它的优点是极其可靠,即使整个 Spark 应用挂了,数据还在。缺点是写入外部系统的 I/O 开销比缓存要大得多。

并行度调优

并行度调优主要目的是减少资源空闲率,将资源充分利用起来。
这是一个非常有意思的trade-off的艺术。我们可以把它想象成在超市开收银台:

  • 并行度太低:只开 2 个收银台,但有 100 个顾客在排队。结果就是队伍特别长,顾客(数据)等待时间很久,而超市里其他空闲的员工(CPU 核心)无事可做,造成资源浪费。
  • 并行度太高:为了 100 个顾客,开了 100 个收银台。每个收银台只处理一个顾客,看起来很快。但实际上,为每个收银台启动和关闭收银机(任务启动和关闭)本身就需要时间,管理这么多收银台也让经理(Driver)很累。这导致了大量的“管理开销”。

我们的目标,就是找到那个“刚刚好”的收银台数量,让数据能够被快速、高效地处理,同时最大化地利用集群资源。

在 Spark SQL 中,控制 Shuffle 后并行度的最核心、最直接的参数就是 spark.sql.shuffle.partitions。这个参数的默认值通常是 200。

现在,我们来思考一个具体场景: 你的 Spark 作业运行在一个拥有 100 个 CPU 核心的集群上。默认的 spark.sql.shuffle.partitions 是 200。

这意味着,在 JOINGROUP BY 之后,Spark 会生成 200 个分区,也就是 200 个计算任务 (Task)。你觉得这 200 个任务和 100 个 CPU 核心之间,会产生什么样的执行情况?这里面有没有潜在的低效之处?

这个过程就会产生:

  • 第一波执行: 100 个 CPU 核心同时拿起 100 个任务开始处理。
  • 等待与第二波: 当一个 CPU 完成了它的第一个任务后,它会立刻去拿第二个任务来做。但是,整个 JOINGROUP BY 操作(在 Spark 中被称为一个 Stage)必须等到所有 200 个任务都完成才能结束。
    如果任务的大小(即每个分区的数据量)不均匀,或者某些节点的计算能力稍弱,就会导致:
  • 快的 CPU 可能 1 秒钟就完成了自己的 2 个任务,然后就进入了“空闲”状态。
  • 慢的 CPU 可能花了 10 秒钟才完成它的 2 个任务。
    结果就是,整个集群的这 100 个核心,有 9 秒钟的时间里,利用率都不是 100%。
    为每个 CPU 核心分配 2 到 3 个任务,通常是一个比较理想的起点spark.sql.shuffle.partitions = 集群总CPU核心数 * 2 (或 * 3)

这样做的好处是,用少量的额外任务调度开销,来换取更高的 CPU 整体利用率,从而“填满”那些因为任务执行时间不均而产生的空闲时间片。

当然你可能注意到了,这还是数据倾斜问题,如果数据源没有存储好,还是调不到一个更优的性能。所以说性能调优是一个环环相扣的综合过程。

总结

第一层:数据源头层 (稳固的地基)

  • 目标: 从物理上最大限度地减少数据读取量 (I/O)。
  • 核心手段:
    • 文件格式: 优选 Parquet/ORC 等列式存储,实现高效 I/O 和压缩。
    • 数据分区: 利用“分区裁剪”技术,让查询引擎只访问必要的“文件夹”。
    • 数据分桶: 预先“分拣”数据,为 JOIN 消除最昂贵的 Shuffle 环节。
  • 关键问题: 我的数据组织是否能让引擎在执行前就过滤掉 99% 的无关数据?

第二层:SQL 语句层 (精妙的蓝图)

  • 目标: 告诉引擎“做什么”的同时,也告诉它“怎么做”最高效。
  • 核心手段:
    • WHERE 过滤: 尽早、尽快过滤,充分利用分区和简单谓词。
    • JOIN 策略: 根据表的大小和数据分布,选择广播 JOIN 或排序合并 JOIN,并手动或自动处理数据倾斜
    • GROUP BY 优化: 采用“两阶段聚合”等方式,打散聚合过程中的数据倾斜。
  • 关键问题: 我的 SQL 是否避免了不必要的全表扫描、全量 Shuffle 和数据倾斜?

第三层:计算引擎层 (强大的引擎室)

  • 目标: 为执行任务提供最优的资源配置和运行环境。
  • 核心手段:
    • 内存管理: 精细化控制 On-Heap/Off-Heap 内存,通过 persist 缓存关键数据,避免 OOM 和 GC 瓶颈。
    • 并行度调优: 设置合理的并行任务数,平衡 CPU 利用率、内存压力和任务调度开销,避免数据溢写和小任务泛滥。
  • 关键问题: 我的资源配置是否与我的数据量和计算逻辑相匹配?

网站公告

今日签到

点亮在社区的每一天
去签到