Spark Cache 的用武之地

发布于:2024-07-04 ⋅ 阅读:(17) ⋅ 点赞:(0)

在什么情况下适合使用 Cache

我建议你在做决策的时候遵循以下 2 条基本原则:

  • 如果 RDD/DataFrame/Dataset 在应用中的引用次数为 1,就坚决不使用 Cache
  • 如果引用次数大于 1,且运行成本占比超过 30%,应当考虑启用

Cache第一条很好理解,我们详细说说第二条。这里咱们定义了一个新概念:运行成本占比。它指的是计算某个分布式数据集所消耗的总时间与作业执行时间的比值。我们来举个例子,假设我们有个数据分析的应用,端到端的执行时间为 1 小时。应用中有个 DataFrame 被引用了 2 次,从读取数据源,经过一系列计算,到生成这个 DataFrame 需要花费 12 分钟,那么这个 DataFrame 的运行成本占比应该算作:12 * 2 / 60 = 40%。
你可能会说:“作业执行时间好算,直接查看 Spark UI 就好了,DataFrame 的运行时间怎么算呢?”这里涉及一个小技巧,我们可以从现有应用中 把 DataFrame 的计算逻辑单拎出来,然后利用 Spark 3.0 提供的 Noop 来精确地得到 DataFrame 的运行时间。假设 df 是那个被引用 2 次的 DataFrame,我们就可以把 df 依赖的所有代码拷贝成一个新的作业,然后在 df 上调用 Noop 去触发计算。
Noop 的作用很巧妙,它只触发计算,而不涉及落盘与数据存储,因此,新作业的执行时间刚好就是 DataFrame 的运行时间。

//利用noop精确计算DataFrame运行时间
df.write.format("noop").save()

你可能会觉得每次计算占比会很麻烦,但只要你对数据源足够了解、对计算 DataFrame 的中间过程心中有数了之后,其实不必每次都去精确地计算运行成本占比,尝试几次,你就能对分布式数据集的运行成本占比估摸得八九不离十了。

##Cache 的注意事项
弄清楚了应该什么时候使用 Cache 之后,我们再来说说 Cache 的注意事项。首先,我们都知道,.cache是惰性操作,因此在调用.cache之后,需要先用 Action 算子触发缓存的物化过程。
但是,我发现很多同学在选择 Action 算子的时候很随意,first、take、show、count 中哪个顺手就用哪个。这肯定是不对的,这 4 个算子中只有 count 才会触发缓存的完全物化,而 first、take 和 show 这 3 个算子只会把涉及的数据物化。举个例子,show 默认只产生 20 条结果,如果我们在.cache 之后调用 show 算子,它只会缓存数据集中这 20 条记录。
选择好了算子之后,我们再来讨论一下怎么 Cache 这个问题。你可能会说:“这还用说吗?在 RDD、DataFrame 后面调用.cache不就得了”。还真没这么简单,我出一道选择题来考考你,如果给定包含数十列的 DataFrame df 和后续的数据分析,你应该采用下表中的哪种 Cache 方式?

val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)
 
//Cache方式一
val cachedDF = df.cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)
 
//Cache方式二
df.select(col1, col2).filter(col2 > 0).cache
//数据分析
df.filter(col2 > 0).select(col1, col2)
df.select(col1, col2).filter(col2 > 100)
 
//Cache方式三
val cachedDF = df.select(col1, col2).cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)

我们都知道,由于 Storage Memory 内存空间受限,因此 Cache 应该遵循最小公共子集原则,也就是说,开发者应该仅仅缓存后续操作必需的那些数据列。按照这个原则,实现方式 1 应当排除在外,毕竟 df 是一张包含数十列的宽表。
我们再来看第二种 Cache 方式,方式 2 缓存的数据列是col1和col2,且col2数值大于 0。第一条分析语句只是把filter和select调换了顺序;第二条语句filter条件限制col2数值要大于 100,那么,这个语句的结果就是缓存数据的子集。因此,乍看上去,两条数据分析语句在逻辑上刚好都能利用缓存的数据内容。但遗憾的是,这两条分析语句都会跳过缓存数据,分别去磁盘上读取 Parquet 源文件,然后从头计算投影和过滤的逻辑。这是为什么呢?究其缘由是,Cache Manager 要求两个查询的 Analyzed Logical Plan 必须完全一致,才能对 DataFrame 的缓存进行复用。
Analyzed Logical Plan 是比较初级的逻辑计划,主要负责 AST 查询语法树的语义检查,确保查询中引用的表、列等元信息的有效性。像谓词下推、列剪枝这些比较智能的推理,要等到制定 Optimized Logical Plan 才会生效。因此,即使是同一个查询语句,仅仅是调换了select和filter的顺序,在 Analyzed Logical Plan 阶段也会被判定为不同的逻辑计划。因此,为了避免因为 Analyzed Logical Plan 不一致造成的 Cache miss,我们应该采用第三种实现方式,把我们想要缓存的数据赋值给一个变量,凡是在这个变量之上的分析操作,都会完全复用缓存数据。
你看,缓存的使用可不仅仅是调用.cache那么简单。除此之外,我们也应当及时清理用过的 Cache,尽早腾出内存空间供其他数据集消费,从而尽量避免 Eviction 的发生。一般来说,我们会用.unpersist 来清理弃用的缓存数据,它是.cache 的逆操作。unpersist 操作支持同步、异步两种模式:

异步模式:调用 unpersist() 或是 unpersist(False)
同步模式:调用 unpersist(True)

在异步模式下,Driver 把清理缓存的请求发送给各个 Executors 之后,会立即返回,并且继续执行用户代码,比如后续的任务调度、广播变量创建等等。在同步模式下,Driver 发送完请求之后,会一直等待所有 Executors 给出明确的结果(缓存清除成功还是失败)。各个 Executors 清除缓存的效率、进度各不相同,Driver 要等到最后一个 Executor 返回结果,才会继续执行 Driver 侧的代码。
显然,同步模式会影响 Driver 的工作效率。因此,通常来说,在需要主动清除 Cache 的时候,我们往往采用异步的调用方式,也就是调用 unpersist() 或是unpersist(False)。

总结

我们要掌握使用 Cache 的一般性原则和注意事项,我把它们总结为 3 条:

  • 如果 RDD/DataFrame/Dataset 在应用中的引用次数为 1,我们就坚决不使用 Cache
  • 如果引用次数大于 1,且运行成本占比超过 30%,我们就考虑启用 Cache(其中,运行成本占比的计算,可以利用 Spark 3.0 推出的 noop 功能)
  • Action 算子要选择 count 才能完全物化缓存数据,以及在调用 Cache 的时候,我们要把待缓存数据赋值给一个变量。这样一来,只要是在这个变量之上的分析操作都会完全复用缓存数据。