在 Spark 中,缓存是一种将计算结果存储在内存中的方式,目的是加速后续操作。当你执行迭代算法或查询时,如果多次重复使用相同的数据集,缓存可以避免每次都重新计算相同的转换操作。通过缓存,Spark 可以将数据存储在内存中,这样在后续的处理阶段就能更快地访问。
1. Spark 缓存的关键点:
缓存基本概念:
- 通过调用
.cache()
对 DataFrame 或 RDD 进行缓存。 - 默认情况下,数据会存储在内存中(RAM),并且当内存不足或任务完成时,Spark 会重新计算数据。
示例:
df.cache()
- 通过调用
持久化与存储级别:
- 除了
.cache()
之外,Spark 还提供了persist()
方法,允许你使用不同的存储级别来控制数据的存储方式。 - 常见的存储级别:
- MEMORY_ONLY:仅存储在内存中(
.cache()
默认使用此级别)。 - MEMORY_AND_DISK:优先存储在内存中,如果内存不足则溢写到磁盘。
- DISK_ONLY:仅存储在磁盘中。
- MEMORY_ONLY_SER:以序列化格式存储在内存中,节省空间,但需要更多的 CPU 来进行反序列化。
- MEMORY_AND_DISK_SER:以序列化格式存储,内存不足时溢写到磁盘。
- MEMORY_ONLY:仅存储在内存中(
示例:
from pyspark import StorageLevel df.persist(StorageLevel.MEMORY_AND_DISK)
- 除了
何时缓存:
- 重复计算:如果同一个数据集在计算过程中多次使用(比如多次进行连接或过滤操作),缓存能够显著提高性能。
- 迭代算法:像 PageRank、K-means 等迭代算法,都会多次使用同一个数据集,这时缓存非常有效。
何时不缓存:
- 如果数据集很小,能够直接存放在内存中,或者数据集不被多次使用,缓存可能不会带来显著的性能提升。
- 对于非常大的数据集,如果无法完全存放在内存中,缓存会导致数据溢写到磁盘,这可能反而会降低性能。
释放缓存:
- 当完成对缓存数据的操作后,需要调用
unpersist()
来释放缓存,释放内存。
示例:
df.unpersist()
- 当完成对缓存数据的操作后,需要调用
存储效率:
- 缓存适用于可以完全存放在内存中的数据集。如果数据集非常大并且无法完全缓存到内存,Spark 会将数据溢写到磁盘,虽然可以提高可靠性,但性能可能会下降。你可以通过 Spark UI 来监控内存使用情况,并根据需要调整缓存策略。
2. 何时应该使用 Spark 缓存(cache/persist)
在 Spark 计算中,缓存(cache()
)和持久化(persist()
)可以提高性能,但如果使用不当,可能会导致内存压力,甚至影响任务执行。因此,合理使用缓存至关重要。以下是几个适合使用缓存的场景:
1. 数据被重复使用
如果一个 DataFrame 或 RDD 在多个操作中被重复使用,缓存可以避免重复计算,提高性能。
df = spark.read.parquet("hdfs://path/to/data.parquet")
# 多次使用 df
df.cache()
df_filtered = df.filter(df["age"] > 30)
df_grouped = df_filtered.groupBy("city").count()
df_grouped.show()
df_another_filtered = df.filter(df["income"] > 50000)
df_another_filtered.show()
缓存好处:
- 避免
df
每次都从磁盘加载,减少 I/O 开销。 - 避免
df
重新执行所有 transformations(转换操作),加快计算速度。
2. 迭代计算(如机器学习或图计算)
如果算法需要对同一个数据集进行多次迭代计算,例如 KMeans、PageRank 等,缓存可以提高性能。
from pyspark.ml.clustering import KMeans
df = spark.read.parquet("hdfs://path/to/data.parquet")
df.cache() # 缓存数据
kmeans = KMeans(k=5, seed=42)
model = kmeans.fit(df) # KMeans 迭代计算
缓存好处:
- 机器学习算法通常会多次遍历数据(如梯度下降、KMeans 迭代),如果不缓存,每次都要重新读取数据。
3. 数据较大但能装入内存
如果数据较大,但可以完整存入内存,那么缓存可以加速计算。
df = spark.read.parquet("hdfs://path/to/large_data.parquet")
df.persist(StorageLevel.MEMORY_AND_DISK) # 允许部分数据溢写到磁盘
缓存好处:
- 由于数据能放入内存,避免了重复 I/O 操作,提高性能。
4. 读取外部数据源但不希望重复 I/O
如果数据来自外部存储(如 HDFS、S3、数据库),每次查询都需要网络或磁盘 I/O,则缓存可以减少读取开销。
df = spark.read.jdbc(url="jdbc:mysql://localhost:3306/db", table="users", properties={"user": "root", "password": "1234"})
df.cache()
缓存好处:
- 避免每次查询都从 MySQL 读取数据,减少数据库负载。
5. 需要频繁进行跨 Stage 操作
在 Spark 任务中,每个 Stage 之间的数据会进行 shuffle 操作。如果一个中间数据集在多个 Stage 中被使用,缓存可以减少 shuffle 计算开销。
df = spark.read.parquet("hdfs://path/to/data.parquet")
# 计算某个中间数据
df_filtered = df.filter(df["age"] > 30)
df_filtered.cache() # 缓存中间数据
# df_filtered 被用于多个不同的操作
result1 = df_filtered.groupBy("city").count()
result2 = df_filtered.groupBy("country").avg("income")
缓存好处:
- 避免
df_filtered
需要在多个 Stage 之间重复计算。
6. 复杂查询的优化
如果查询中包含多个 expensive(昂贵)的转换,如 groupBy()
、join()
、window functions
,缓存可以减少这些计算的重复执行。
df = spark.read.parquet("hdfs://path/to/data.parquet")
# 复杂计算
df_transformed = df.withColumn("new_col", df["price"] * 1.1).groupBy("category").agg({"price": "sum"})
df_transformed.cache()
# 复用 df_transformed 进行多个操作
df_transformed.show()
df_transformed.write.parquet("hdfs://path/to/output.parquet")
缓存好处:
- 避免
df_transformed
每次都要从df
重新计算,提高性能。
3. 何时不应该使用缓存
数据只使用一次
- 如果数据只使用一次,缓存会浪费内存,并且可能不会带来明显的性能提升。
df = spark.read.parquet("hdfs://path/to/data.parquet") df.groupBy("category").count().show() # 只使用一次,不需要缓存
数据太大,无法装入内存
- 如果数据集远超集群的可用内存,缓存会导致内存溢出,并可能引发 GC(垃圾回收)问题,甚至 OOM(内存溢出)。
- 解决方案:
- 使用
persist(StorageLevel.MEMORY_AND_DISK)
让 Spark 将数据部分存入磁盘。
- 使用
任务本身很轻量,不需要缓存
- 如果计算任务很轻量(例如简单的
select()
查询),缓存不会带来明显的性能提升,反而会增加额外的存储开销。
- 如果计算任务很轻量(例如简单的