Dynamic Partition Pruning (DPP) in Spark 3.3.x: Implementation and Application Analysis

Published: 2024-10-17

What Dynamic Partition Pruning (DPP) Does

A general description: DPP filters data at the partition level, but note that it is distinct from a Partition Filter. DPP takes effect during the execute phase, further filtering the InputPartitions (the data type Spark uses internally when computing) loaded from the data source and reducing the amount of data passed to downstream operators; a Partition Filter, by contrast, can take effect as early as the planning phase, filtering the Catalog Partitions to be loaded. The two kinds of filter therefore apply in order: the Partition Filter first loads the visible partitions, and DPP then filters the loaded partitions.
Hopefully this article helps you walk through the complete DPP processing flow by hand.

Whether the objects DPP filters are InputPartitions or some similar data structure depends on the concrete implementation; what follows describes one common processing flow.

Note the distinction between the following four concepts (see the example after this list):

Partition Filter: a predicate referencing only partition columns, whose right-hand value is already known in the planning phase.
Data Filter: a predicate referencing only non-partition columns, whose right-hand value is not fixed.
Runtime Filter: a predicate that may reference both partition and non-partition columns, whose right-hand value can only be determined in the execute phase (e.g. a bloom filter).
Source Filter: a predicate on non-partition columns whose right-hand value is a literal; for ODPS tables it is passed down to the ODPS service.
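
As a rough illustration (assuming a hypothetical table sales partitioned by dt, with a non-partition column amount, joined against a dimension table dims), the predicates below would typically be classified as follows:

SELECT *
FROM   sales s
JOIN   dims d
ON     s.dt = d.dt           -- equi-join key: at runtime it can drive DPP or a Runtime Filter
WHERE  s.dt = '2024-10-17'   -- Partition Filter: partition column, literal RHS, applied during planning
AND    s.amount > 100        -- Data Filter; with a literal RHS it can also be pushed to the source as a Source Filter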

Runtime Filters deserve a deeper discussion of their own, because Spark can also optimize a JOIN plan with a sub-plan composed of a Subquery plus an Aggregation (similar in spirit to an indexed join in MySQL): based on the equi-join condition, it builds a BloomFilter data structure and inserts it as a Filter on the application side of the JOIN (for a LEFT JOIN, that is the left side).
More on Runtime Filters in a later chapter.
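
For reference, this bloom-filter-based runtime filtering is governed in Spark 3.3 by the InjectRuntimeFilter rule and is disabled by default; a minimal way to experiment with it (configuration names as in Spark 3.3, both defaulting to false there):

-- enable bloom-filter runtime filters
SET spark.sql.optimizer.runtime.bloomFilter.enabled=true;
-- or inject a semi-join reduction instead of a bloom filter
SET spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled=true;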

Key Points for DPP to Take Effect

  1. Correlated subqueries / IN expressions / EXISTS expressions are converted into LEFT SEMI JOIN subqueries and wrapped in a DynamicPruningExpression.

  2. The DynamicPruningExpression is pushed down to the pruning plan; for example in a LEFT JOIN b, a is the pruning plan and b is the filtering plan.

  3. Under the default configuration, the filtering plan is converted into a broadcastable subquery whose output column set is the JOIN keys.

  4. By default DPP only takes effect by reusing an existing broadcast stage:
    Set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=false to allow cost-model-based DPP.

  5. The filtering plan's sub-plan must carry a filter condition:
    Bad: WHERE a.id IN (SELECT id FROM b WHERE a.id = b.id)
    Good: WHERE a.id IN (SELECT id FROM b WHERE a.id = b.id AND b.id IS NOT NULL)

  6. When not restricted to broadcast-only, the following parameters can be tuned to optimize DPP (see the combined settings sketch after this list):
    spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio=0.5 by default; during cost estimation it approximates how much data the pruning side (the side being filtered) saves when DPP takes effect, and DPP is applied only when the saving exceeds the amount of data read on the filtering side;
    spark.sql.optimizer.dynamicPartitionPruning.useStats=true by default; it makes the estimate of DPP's filtering effect more accurate and avoids performance regressions, but for queries over ODPS tables it additionally depends on the following two special parameters:
    spark.sql.odps.prunedPartitions.statistic.enable=true, i.e. statistics collection is allowed by default
    spark.sql.odps.prunedPartitions.statistic.countThreshold=512, i.e. by default statistics are collected only when the partition count is below this value

  7. Avoid non-equi correlated filters where possible.
    For example: WHERE a.id IN (SELECT b.id FROM b WHERE a.id > b.id)
    Consider rewriting it as WHERE a.id IN (SELECT b.id FROM b WHERE b.id > 0) combined with a JOIN.
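
Putting the knobs from points 4 and 6 together, a session that opts into cost-based DPP might be configured as below (the spark.sql.odps.* parameters apply only to the ODPS connector mentioned above):

SET spark.sql.optimizer.dynamicPartitionPruning.enabled=true;              -- on by default
SET spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=false;  -- allow cost-model-based DPP
SET spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio=0.5;   -- fallback selectivity estimate
SET spark.sql.optimizer.dynamicPartitionPruning.useStats=true;             -- use column stats when available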

A Simple SQL Example Where DPP Takes Effect

Assume tables a and b share the same column definitions, with id as the partition column.
The following SQL then selects from table a the rows whose id value appears in the subquery's result set.

-- id is a partition column in both table a and table b
SELECT  *
FROM    a
WHERE a.id IN (SELECT id FROM b WHERE id = 1)

In the SQL above, the join between table a and table b carries the filter a.id IN (SELECT id FROM b WHERE id = 1), so the DPP optimizer rule can be attempted: the filter is pushed down to the scan of table a, pruning partitions on the partition column id.
With DPP enabled, the query executes by reading only those partitions of table a whose id satisfies SELECT id FROM b WHERE id = 1 and then joining with table b; with DPP disabled, it reads table a in full before joining with table b.

After DPP optimization, the example above is ultimately rewritten into the equivalent LEFT SEMI JOIN form:

SELECT  *
FROM    a
LEFT SEMI JOIN (SELECT id FROM b WHERE id = 1)
ON a.id = b.id
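
To confirm that DPP actually kicked in, inspect the plan: with DPP applied, the scan of table a should show a dynamicpruningexpression among its partition/runtime filters (the rendering below is illustrative and varies by version and data source):

EXPLAIN FORMATTED
SELECT * FROM a WHERE a.id IN (SELECT id FROM b WHERE id = 1);
-- look for something like:
--   PartitionFilters: [isnotnull(id), dynamicpruningexpression(id IN dynamicpruning#xx)]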

A Plan Walkthrough of a DPP-Eligible Query

SELECT  *
FROM    a
WHERE a.id IN (SELECT  id
                FROM    b
                WHERE   b.id = a.id AND b.id > 0)

After parsing, the initial logical plan tree (shown in simplified form below) is produced. Because evaluating the IN condition depends on a column of the outer table, namely a.id, the plan cannot be materialized and executed directly; correlated/dependent subqueries of this kind must first be rewritten.

Project [*]
  Filter [a.id] In (ListQuery []:
      Project [b.id]
        Filter [b.id = a.id, b.id > 0]
          Relation [b.id])
    Relation [a.*]

The logical plan carrying DPP information:

Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningSubquery(Project [b.id]
            Filter [b.id > 0]
            Relation [b.id])]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

The final physical plan:

-- Physical Plan
-- When DataSourceV2Strategy materializes the logical plan tree, it pushes DynamicPruning-typed filter expressions down to BatchScanExec.
-- Here that is the DynamicPruningExpression; since the FilterExec held only this single expression, the filter is eliminated entirely.
ProjectExec [a.*]
  BroadcastJoinExec [a.*] [b.id = a.id, b.id = a.id]
      BatchScanExec [a.*] [runtimeFilters = DynamicPruningExpression(
              InSubqueryExec(a.id, 
                 SubqueryBroadcastExec(Project [b.id]
                                         Filter [b.id > 0]
                                           BatchScanExec [b.id])))]
    BroadcastExchangeExec
      ProjectExec [b.id]
        FilterExec [b.id > 0]
          BatchScanExec [b.id]

Decorrelate the Correlated Subquery

Rule Name: PullupCorrelatedPredicates

For the SQL form in the example, the following logical plan is generated (the predicate b.id = a.id is extracted separately for later processing); the ListQuery's sub-plan, now free of outer references, can be executed independently.

Project [*]
  Filter [a.id] In (ListQuery [b.id = a.id]:
      Project [b.id]
        Filter [b.id > 0]
          Relation [b.id])
    Relation [a.*]

The implementation of PullupCorrelatedPredicates, with commentary:

object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Returns the correlated predicates and a updated plan that removes the outer references.
    */
  private def pullOutCorrelatedPredicates(
      sub: LogicalPlan,
      outer: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
    // Maps each logical subtree to the correlated predicates pulled out of it. Those predicates
    // cannot be evaluated by the plan they were found in, so they are extracted here so they can
    // later be pulled up into the enclosing join node as new join conditions.
    val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]

    /** Determine which correlated predicate references are missing from this plan. */
    def missingReferences(p: LogicalPlan): AttributeSet = {
      val localPredicateReferences = p.collect(predicateMap)
        .flatten
        .map(_.references)
        .reduceOption(_ ++ _)
        .getOrElse(AttributeSet.empty)
      localPredicateReferences -- p.outputSet
    }

    // Simplify the predicates before pulling them out.
    // Simplify the expressions first, then walk bottom-up, pulling out the correlated predicates
    // and appending, where necessary, the extra attributes a subtree must additionally output.
    val transformed = BooleanSimplification(sub) transformUp {
      case f @ Filter(cond, child) =>
        // Split into correlated and non-correlated (local) predicates, e.g.
        //  SELECT * FROM t1 a
        //  WHERE
        //  NOT EXISTS (SELECT * FROM t1 b WHERE a.i = b.i AND b.i > 0)
        // processing the EXISTS subquery yields: (Seq(a.i = b.i), Seq(b.i > 0))
        val (correlated, local) =
          splitConjunctivePredicates(cond).partition(containsOuter)

        // Rewrite the filter without the correlated predicates if any.
        correlated match {
          case Nil => f
          case xs if local.nonEmpty =>
            // If the subquery also carries non-correlated predicates, combine them into a new
            // Filter node that replaces the original child plan
            val newFilter = Filter(local.reduce(And), child)
            predicateMap += newFilter -> xs
            newFilter
          case xs =>
            // Only correlated predicates exist, so keep the original child plan
            predicateMap += child -> xs
            child
        }
      case p @ Project(expressions, child) =>
        // If the current sub plan contains a Project, pulling correlated predicates out of its
        // child may leave some attributes referenced by those filters missing from the Project's
        // output; the missing attributes are appended here so the pulled-up predicates can still
        // read the columns they need.
        val referencesToAdd = missingReferences(p)
        if (referencesToAdd.nonEmpty) {
          Project(expressions ++ referencesToAdd, child)
        } else {
          p
        }
      case a @ Aggregate(grouping, expressions, child) =>
        // Same idea as the Project case, except grouping expressions are involved as well: the
        // attributes that do not participate in aggregation are appended to both the grouping and
        // the output, so the pulled-up correlated predicates can still be evaluated.
        val referencesToAdd = missingReferences(a)
        if (referencesToAdd.nonEmpty) {
          Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
        } else {
          a
        }
      case p =>
        p
    }

    // Make sure the inner and the outer query attributes do not collide.
    // In case of a collision, change the subquery plan's output to use
    // different attribute by creating alias(s).
    val baseConditions = predicateMap.values.flatten.toSeq
    val outerPlanInputAttrs = outer.inputSet
    val (newPlan, newCond) = if (outerPlanInputAttrs.nonEmpty) {
      // The subquery's output attributes and the outer query's input attributes may collide, so
      // the correlated predicates to be pulled up may contain duplicated attributes.
      // Without deduplication, once a correlated predicate is pulled up into the JOIN, a condition
      // like `a = a` can arise, its left attribute coming from the sub plan and its right one from
      // the outer plan; as a join condition it would be optimized to true, breaking the intended
      // JOIN structure. To handle this, attributes from the sub plan are renamed via aliases,
      // guaranteeing that the two sides of the new join condition come from 'logically different
      // tables'.
      val (plan, deDuplicatedConditions) =
        DecorrelateInnerQuery.deduplicate(transformed, baseConditions, outerPlanInputAttrs)
      // Return the decorrelated subquery together with the new JOIN conditions
      (plan, stripOuterReferences(deDuplicatedConditions))
    } else {
      // No good example of an empty outerPlanInputAttrs comes to mind, but one possibility is an
      // outer plan that is a LocalRelation() instance, which has no output and denotes an empty set.
      (transformed, stripOuterReferences(baseConditions))
    }
    (newPlan, newCond)
  }

  private def rewriteSubQueries(plan: LogicalPlan): LogicalPlan = {
    /**
     * This function is used as a aid to enforce idempotency of pullUpCorrelatedPredicate rule.
     * In the first call to rewriteSubqueries, all the outer references from the subplan are
     * pulled up and join predicates are recorded as children of the enclosing subquery expression.
     * The subsequent call to rewriteSubqueries would simply re-records the `children` which would
     * contains the pulled up correlated predicates (from the previous call) in the enclosing
     * subquery expression.
     */
    def getJoinCondition(newCond: Seq[Expression], oldCond: Seq[Expression]): Seq[Expression] = {
      if (newCond.isEmpty) oldCond else newCond
    }

    def decorrelate(
        sub: LogicalPlan,
        outer: LogicalPlan,
        handleCountBug: Boolean = false): (LogicalPlan, Seq[Expression]) = {
      if (SQLConf.get.decorrelateInnerQueryEnabled) {
        DecorrelateInnerQuery(sub, outer, handleCountBug)
      } else {
        pullOutCorrelatedPredicates(sub, outer)
      }
    }

    plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
      case ScalarSubquery(sub, children, exprId, conditions) if children.nonEmpty =>
        val (newPlan, newCond) = decorrelate(sub, plan)
        ScalarSubquery(newPlan, children, exprId, getJoinCondition(newCond, conditions))
      case Exists(sub, children, exprId, conditions) if children.nonEmpty =>
        val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, plan)
        Exists(newPlan, children, exprId, getJoinCondition(newCond, conditions))
      case ListQuery(sub, children, exprId, childOutputs, conditions) if children.nonEmpty =>
        // Corresponds to the example's SQL form: WHERE a IN (subquery)
        val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, plan)
        ListQuery(newPlan, children, exprId, childOutputs, getJoinCondition(newCond, conditions))
      case LateralSubquery(sub, children, exprId, conditions) if children.nonEmpty =>
        val (newPlan, newCond) = decorrelate(sub, plan, handleCountBug = true)
        LateralSubquery(newPlan, children, exprId, getJoinCondition(newCond, conditions))
    }
  }

  /**
   * Pull up the correlated predicates and rewrite all subqueries in an operator tree..
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
    _.containsPattern(PLAN_EXPRESSION)) {
    case j: LateralJoin =>
      val newPlan = rewriteSubQueries(j)
      // Since a lateral join's output depends on its left child output and its lateral subquery's
      // plan output, we need to trim the domain attributes added to the subquery's plan output
      // to preserve the original output of the join.
      if (!j.sameOutput(newPlan)) {
        Project(j.output, newPlan)
      } else {
        newPlan
      }
    // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
    case q: UnaryNode =>
      rewriteSubQueries(q)
    case s: SupportsSubquery =>
      rewriteSubQueries(s)
  }
}
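
To make the correlated/local split concrete, here is a toy, self-contained model (deliberately not Spark's actual classes; all names are illustrative) that mimics how splitConjunctivePredicates plus a containsOuter check partitions a conjunction:

// Toy model of the correlated vs. local predicate split performed above.
object PredicateSplitDemo extends App {
  sealed trait Expr
  case class Attr(name: String, outer: Boolean = false) extends Expr // outer=true ~ OuterReference
  case class Lit(value: Int) extends Expr
  case class Cmp(op: String, l: Expr, r: Expr) extends Expr
  case class And(l: Expr, r: Expr) extends Expr

  // Mirrors PredicateHelper.splitConjunctivePredicates: flatten an AND tree into conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Mirrors containsOuter: does the expression reference any outer attribute?
  def containsOuter(e: Expr): Boolean = e match {
    case Attr(_, outer) => outer
    case Lit(_)         => false
    case Cmp(_, l, r)   => containsOuter(l) || containsOuter(r)
    case And(l, r)      => containsOuter(l) || containsOuter(r)
  }

  // WHERE b.id = a.id AND b.id > 0, where a.id comes from the outer query:
  val cond = And(
    Cmp("=", Attr("b.id"), Attr("a.id", outer = true)),
    Cmp(">", Attr("b.id"), Lit(0)))

  val (correlated, local) = splitConjuncts(cond).partition(containsOuter)
  println(s"correlated = $correlated") // Seq(b.id = a.id) -> future join condition
  println(s"local      = $local")      // Seq(b.id > 0)    -> stays in the subquery's Filter
}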

Rewrite Predicates as Join

Rule Name: RewritePredicateSubquery
After the PullupCorrelatedPredicates rule has run, the original correlated predicates have been extracted from the subquery into a new ListQuery node. The subsequent step applies the RewritePredicateSubquery rule to rewrite the plan once more and produce a proper JOIN node.

-- Before
Project [a.*]
  Filter [a.id] In (ListQuery [b.id = a.id]:
      Project [b.id]
        Filter [b.id > 0]
          Relation [b.id])
    Relation [a.*]
    
-- After
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

The implementation of RewritePredicateSubquery, with commentary:

object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {

  private def buildJoin(
      outerPlan: LogicalPlan,
      subplan: LogicalPlan,
      joinType: JoinType,
      condition: Option[Expression]): Join = {
    // Deduplicate conflicting attributes if any.
    val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, condition)
    // Build a new JOIN plan, where dedupSubplan is the sub-plan behind the ListQuery node
    // and condition is the extracted correlated predicate set
    Join(outerPlan, dedupSubplan, joinType, condition, JoinHint.NONE)
  }

  /**
   * Deduplicates subqueries that self-join with the outer query, of the form:
   * SELECT * FROM t1 a
   * WHERE EXISTS (SELECT i FROM t1 b WHERE a.i = b.i)
   * which is logically rewritten into the SQL:
   * SELECT a.* FROM t1 a
   * LEFT SEMI JOIN (SELECT i FROM t1) b
   * ON a.i = b.i
   * Clearly a and b refer to the same underlying table, so a.i = b.i could get resolved
   * into a true literal, causing resolution problems.
   * Judging from tests on Spark 3.3.x, though, the SPARK-21835 / SPARK-26078 examples never
   * produce identical attributes; presumably the issue only occurred in some historical version.
   **/
  private def dedupSubqueryOnSelfJoin(
      outerPlan: LogicalPlan,
      subplan: LogicalPlan,
      valuesOpt: Option[Seq[Expression]],
      condition: Option[Expression] = None): LogicalPlan = {
    // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
    // the produced join then becomes unresolved and break structural integrity. We should
    // de-duplicate conflicting attributes.
    // SPARK-26078: it may also happen that the subquery has conflicting attributes with the outer
    // values. In this case, the resulting join would contain trivially true conditions (e.g.
    // id#3 = id#3) which cannot be de-duplicated after. In this method, if there are conflicting
    // attributes in the join condition, the subquery's conflicting attributes are changed using
    // a projection which aliases them and resolves the problem.
    val outerReferences = valuesOpt.map(values =>
    AttributeSet.fromAttributeSets(values.map(_.references))).getOrElse(AttributeSet.empty)
    val outerRefs = outerPlan.outputSet ++ outerReferences
    val duplicates = outerRefs.intersect(subplan.outputSet)
    if (duplicates.nonEmpty) {
      condition.foreach { e =>
          val conflictingAttrs = e.references.intersect(duplicates)
          if (conflictingAttrs.nonEmpty) {
            throw QueryCompilationErrors.conflictingAttributesInJoinConditionError(
              conflictingAttrs, outerPlan, subplan)
          }
      }
      val rewrites = AttributeMap(duplicates.map { dup =>
        dup -> Alias(dup, dup.toString)()
      }.toSeq)
      val aliasedExpressions = subplan.output.map { ref =>
        rewrites.getOrElse(ref, ref)
      }
      Project(aliasedExpressions, subplan)
    } else {
      subplan
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
    _.containsAnyPattern(EXISTS_SUBQUERY, LIST_SUBQUERY)) {
     // Matched SQL form, e.g.: WHERE a.id IN (SELECT id FROM b WHERE id = 1)
    case Filter(condition, child)
      if SubqueryExpression.hasInOrCorrelatedExistsSubquery(condition) =>
      val (withSubquery, withoutSubquery) =
        splitConjunctivePredicates(condition)
          .partition(SubqueryExpression.hasInOrCorrelatedExistsSubquery)

      // Build a new filter expression from the predicates without an exists/in-(subquery) pattern
      // Construct the pruned filter condition.
      val newFilter: LogicalPlan = withoutSubquery match {
        case Nil => child
        case conditions => Filter(conditions.reduce(And), child)
      }

      // Filter the plan by applying left semi and left anti joins.
      withSubquery.foldLeft(newFilter) {
        case (p, Exists(sub, _, _, conditions)) =>
          val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
          buildJoin(outerPlan, sub, LeftSemi, joinCond)
        case (p, InSubquery(values, ListQuery(sub, _, _, _, conditions))) =>
          // Deduplicate conflicting attributes if any.
          // At this point conditions already includes the correlated predicates that originally
          // lived inside the sub tree; the self-join case is attempted again here, but in actual
          // testing the relevant unit-test SQL never hits the self-join problem, so newSub always
          // equals sub.
          val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
          // Generate a new equality condition for every JOIN key, LHS from the outer plan and RHS
          // from sub. If conditions already contains an equality for some key, it is generated
          // again, introducing some redundancy, which is optimized away later.
          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
          // Recursively rewrite the join conditions, replacing exists/in expressions with an ExistenceJoin;
          // e.g. the rewrite below (where a.id = (b.id IN (SELECT id FROM t2)) is a predicate carrying a subquery):
          // Filter(a.id = (b.id IN (SELECT id FROM t2)), Relation(b))
          // ==>
          // Filter(
          //   Join(Relation(b),
          //     Subquery(SELECT id FROM t2),
          //     ExistenceJoin,
          //     b.id = t2.id,
          //     ExistenceJoin)
          //   ))
          // 
          val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
          // Generate a new JOIN to replace the original, e.g. a plan of the form:
          //Filter i#254 IN (list#253 [i#254 && (i#254 = i#257)])
          //:  +- Project [i#257]
          //:     +- Relation default.t1[i#257,j#258] parquet
          //+- Relation default.t1[i#254,j#255] parquet
          // is rewritten into
          //Join LeftSemi, ((i#254 = i#257) AND (i#254 = i#257))
          //:- Relation default.t1[i#254,j#255] parquet
          //+- Project [i#257]
          //   +- Relation default.t1[i#257,j#258] parquet
          Join(outerPlan, newSub, LeftSemi, joinCond, JoinHint.NONE)
        case other => other // the handling of other match patterns is elided here; the original covers more than the two cases above
      }
    // Matched SQL form, e.g.: SELECT a.id IN (SELECT id FROM b WHERE id = 1)
    case u: UnaryNode if u.expressions.exists(
        SubqueryExpression.hasInOrCorrelatedExistsSubquery) =>
      var newChild = u.child
      u.mapExpressions(expr => {
        val (newExpr, p) = rewriteExistentialExpr(Seq(expr), newChild)
        newChild = p
        // The newExpr can not be None
        newExpr.get
      }).withNewChildren(Seq(newChild))
  }

  /**
   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
   * into an existential join. It returns the rewritten expression together with the updated plan.
   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
   * the Analyzer.
   */
  private def rewriteExistentialExpr(
      exprs: Seq[Expression],
      plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
    var newPlan = plan
    val newExprs = exprs.map { e =>
      e.transformDownWithPruning(_.containsAnyPattern(EXISTS_SUBQUERY, IN_SUBQUERY)) {
        case Exists(sub, _, _, conditions) =>
          val exists = AttributeReference("exists", BooleanType, nullable = false)()
          newPlan =
            buildJoin(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
          exists
        case Not(InSubquery(values, ListQuery(sub, _, _, _, conditions))) =>
          val exists = AttributeReference("exists", BooleanType, nullable = false)()
          // Deduplicate conflicting attributes if any.
          val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
          val inConditions = values.zip(sub.output).map(EqualTo.tupled)
          // To handle a null-aware predicate not-in-subquery in nested conditions
          // (e.g., `v > 0 OR t1.id NOT IN (SELECT id FROM t2)`), we transform
          // `inCondition` (t1.id=t2.id) into `(inCondition) OR ISNULL(inCondition)`.
          //
          // For example, `SELECT * FROM t1 WHERE v > 0 OR t1.id NOT IN (SELECT id FROM t2)`
          // is transformed into a plan below;
          // == Optimized Logical Plan ==
          // Project [id#78, v#79]
          // +- Filter ((v#79 > 0) OR NOT exists#83)
          //   +- Join ExistenceJoin(exists#83), ((id#78 = id#80) OR isnull((id#78 = id#80)))
          //     :- Relation[id#78,v#79] parquet
          //     +- Relation[id#80] parquet
          val nullAwareJoinConds = inConditions.map(c => Or(c, IsNull(c)))
          val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), JoinHint.NONE)
          Not(exists)
        case InSubquery(values, ListQuery(sub, _, _, _, conditions)) =>
          val exists = AttributeReference("exists", BooleanType, nullable = false)()
          // Deduplicate conflicting attributes if any.
          val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
          val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint.NONE)
          exists
      }
    }
    (newExprs.reduceOption(And), newPlan)
  }
}
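
The Or(c, IsNull(c)) wrapper above exists because NOT IN is null-aware: if the subquery returns any NULL, the predicate is UNKNOWN for every outer row. A quick, runnable way to observe this in a Spark SQL shell (illustrative literals):

-- t2 contains a NULL, so `id <> NULL` is UNKNOWN and NOT IN filters out every row:
SELECT *
FROM   VALUES (1), (2) AS t1(id)
WHERE  id NOT IN (SELECT id FROM VALUES (1), (CAST(NULL AS INT)) AS t2(id));
-- returns an empty result set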

Rewrite Join With Dynamic Subquery

Rule Name: PartitionPruning

-- Before
-- id is a partition column
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]
    
-- After
-- Cond1: Left Semi Join, so the left-side table can be dynamically filtered
-- Cond2: id is a partition column, so the filter can be pushed down to the scan
-- Cond3: the plan on the JOIN's right side contains a Filter condition, b.id > 0
-- Cond4:
--  The JOIN key / pruning key is a.id / b.id; the DPP filtering ratio filterRatio is estimated from column stats:
--   when distinct(a.id) > distinct(b.id): filterRatio = 1 - distinct_b_id / distinct_a_id
--   otherwise spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio = 0.5 is used
--   pruning pays off when filterRatio * stats_size_a > stats_size_b, so the id column data set of table b can be broadcast.
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningSubquery(Project [b.id]
            Filter [b.id > 0]
            Relation [b.id])]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

The implementation logic of PartitionPruning, with commentary:

object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
  /**
   * Insert a dynamic partition pruning predicate on one side of the join using the filter on the
   * other side of the join.
   *  - to be able to identify this filter during query planning, we use a custom
   *    DynamicPruning expression that wraps a regular In expression
   *  - we also insert a flag that indicates if the subquery duplication is worthwhile and it
   *  should run regardless of the join strategy, or is too expensive and it should be run only if
   *  we can reuse the results of a broadcast
   */
  private def insertPredicate(
      pruningKey: Expression,
      pruningPlan: LogicalPlan,
      filteringKey: Expression,
      filteringPlan: LogicalPlan,
      joinKeys: Seq[Expression],
      partScan: LogicalPlan): LogicalPlan = {
    val reuseEnabled = conf.exchangeReuseEnabled
    val index = joinKeys.indexOf(filteringKey)

    // Inserting DPP pays off only when the data pruned away from the pruning plan exceeds the size of the other (filtering) side
    lazy val hasBenefit = pruningHasBenefit(pruningKey, partScan, filteringKey, filteringPlan)
    if (reuseEnabled || hasBenefit) {
      // The DPP filter is inserted only if stage reuse is enabled (which in practice can only reuse a broadcast stage) or the pruning is estimated to pay off
      // insert a DynamicPruning wrapper to identify the subquery during query planning
      Filter(
        DynamicPruningSubquery(
          pruningKey,
          filteringPlan,
          joinKeys,
          index,
          conf.dynamicPartitionPruningReuseBroadcastOnly || !hasBenefit),
        pruningPlan)
    } else {
      // abort dynamic partition pruning
      pruningPlan
    }
  }

  /**
   * Given an estimated filtering ratio we assume the partition pruning has benefit if
   * the size in bytes of the partitioned plan after filtering is greater than the size
   * in bytes of the plan on the other side of the join. We estimate the filtering ratio
   * using column statistics if they are available, otherwise we use the config value of
   * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
   */
  private def pruningHasBenefit(
      partExpr: Expression,
      partPlan: LogicalPlan,
      otherExpr: Expression,
      otherPlan: LogicalPlan): Boolean = {

    // get the distinct counts of an attribute for a given table
    def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
      plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
    }

    // the default filtering ratio when CBO stats are missing, but there is a
    // predicate that is likely to be selective
    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
    // the filtering ratio based on the type of the join condition and on the column statistics
    val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
      // filter out expressions with more than one attribute on any side of the operator
      case (leftAttr :: Nil, rightAttr :: Nil)
        if conf.dynamicPartitionPruningUseStats =>
          // get the CBO stats for each attribute in the join condition
          val partDistinctCount = distinctCounts(leftAttr, partPlan)
          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
          val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
            otherDistinctCount.isDefined
          if (!availableStats) {
            fallbackRatio
          } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
            // there is likely an estimation error, so we fallback
            fallbackRatio
          } else {
            1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
          }
      case _ => fallbackRatio
    }

    val estimatePruningSideSize = filterRatio * partPlan.stats.sizeInBytes.toFloat
    val overhead = calculatePlanOverhead(otherPlan)
    estimatePruningSideSize > overhead
  }
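
  // Worked example (illustrative numbers, not from any real workload): suppose column stats
  // report distinct(a.id) = 1000 and distinct(b.id) = 10 with useStats enabled, so
  //   filterRatio = 1 - 10.0 / 1000.0 = 0.99.
  // With sizeInBytes(a) = 10 GB, estimatePruningSideSize = 0.99 * 10 GB ≈ 9.9 GB saved.
  // If scanning b costs roughly 100 MB, then 9.9 GB > 100 MB and pruning is judged beneficial.
  // Without usable stats, filterRatio falls back to
  // spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio (0.5 by default).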

  /**
   * Calculates a heuristic overhead of a logical plan. Normally it returns the total
   * size in bytes of all scan relations. We don't count in-memory relation which uses
   * only memory.
   */
  private def calculatePlanOverhead(plan: LogicalPlan): Float = {
    val (cached, notCached) = plan.collectLeaves().partition(p => p match {
      case _: InMemoryRelation => true
      case _ => false
    })
    val scanOverhead = notCached.map(_.stats.sizeInBytes).sum.toFloat
    val cachedOverhead = cached.map {
      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk &&
          !m.cacheBuilder.storageLevel.useMemory =>
        m.stats.sizeInBytes.toFloat
      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk =>
        m.stats.sizeInBytes.toFloat * 0.2
      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useMemory =>
        0.0
    }.sum.toFloat
    scanOverhead + cachedOverhead
  }


  /**
   * Search a filtering predicate in a given logical plan
   */
  private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
    plan.exists {
      case f: Filter => isLikelySelective(f.condition)
      case _ => false
    }
  }

  /**
   * To be able to prune partitions on a join key, the filtering side needs to
   * meet the following requirements:
   *   (1) it can not be a stream
   *   (2) it needs to contain a selective predicate used for filtering
   */
  private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = {
    !plan.isStreaming && hasSelectivePredicate(plan)
  }

  private def prune(plan: LogicalPlan): LogicalPlan = {
    plan transformUp {
      // skip this rule if there's already a DPP subquery on the LHS of a join
      case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
      case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
      case j @ Join(left, right, joinType, Some(condition), hint) =>
        // Only takes effect on JOIN nodes
        var newLeft = left
        var newRight = right

        // extract the left and right keys of the join condition
        val (leftKeys, rightKeys) = j match {
          case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _, _) => (lkeys, rkeys)
          case _ => (Nil, Nil)
        }

        // checks if two expressions are on opposite sides of the join
        def fromDifferentSides(x: Expression, y: Expression): Boolean = {
          def fromLeftRight(x: Expression, y: Expression) =
            !x.references.isEmpty && x.references.subsetOf(left.outputSet) &&
              !y.references.isEmpty && y.references.subsetOf(right.outputSet)
          fromLeftRight(x, y) || fromLeftRight(y, x)
        }

        splitConjunctivePredicates(condition).foreach {
          case EqualTo(a: Expression, b: Expression)
              if fromDifferentSides(a, b) =>
            val (l, r) = if (a.references.subsetOf(left.outputSet) &&
              b.references.subsetOf(right.outputSet)) {
              a -> b
            } else {
              b -> a
            }

            // there should be a partitioned table and a filter on the dimension table,
            // otherwise the pruning will not trigger
            var filterableScan = getFilterableTableScan(l, left)
            if (filterableScan.isDefined && canPruneLeft(joinType) &&
                hasPartitionPruningFilter(right)) {
              // The left table is the pruning plan and the right table is the filtering plan;
              // a DPP predicate is inserted on the left table only when the right side has a filter condition
              newLeft = insertPredicate(l, newLeft, r, right, rightKeys, filterableScan.get)
            } else {
              filterableScan = getFilterableTableScan(r, right)
              if (filterableScan.isDefined && canPruneRight(joinType) &&
                  hasPartitionPruningFilter(left) ) {
                newRight = insertPredicate(r, newRight, l, left, leftKeys, filterableScan.get)
              }
            }
          case _ =>
        }
        // Return a new plan node
        Join(newLeft, newRight, joinType, Some(condition), hint)
    }
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // Do not rewrite subqueries.
    case s: Subquery if s.correlated => plan
    case _ if !conf.dynamicPartitionPruningEnabled => plan
    case _ => prune(plan)
  }
}

Rewrite Dynamic Subquery as Dynamic Expression

Rule Name: PlanDynamicPruningFilters

-- Before
-- id is a partition column
-- Cond1: Left Semi Join, so the left-side table can be dynamically filtered
-- Cond2: id is a partition column, so the filter can be pushed down to the scan
-- Cond3: the plan on the JOIN's right side contains a Filter condition, b.id > 0
-- Cond4: the JOIN key / pruning key is a.id / b.id; the DPP filtering ratio filterRatio is estimated from column stats:
--   when distinct(a.id) > distinct(b.id): filterRatio = 1 - distinct_b_id / distinct_a_id
--   otherwise spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio = 0.5
--   pruning pays off: filterRatio * stats_size_a > stats_size_b, so b can be broadcast
--   assume benefits = true
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningSubquery(Project [b.id]
            Filter [b.id > 0]
            Relation [b.id]]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

-- After
-- case 1: exchange reuse is enabled, and the plan tree contains a broadcast sub-plan identical to the filtering plan:
--        DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
-- case 2: the filtering plan could only be broadcast (no reusable broadcast exists):
--        DynamicPruningExpression(Literal.TrueLiteral)
-- case 3: broadcast is unavailable, but pruning still pays off:
--        DynamicPruningExpression(expressions.InSubquery(
--            Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningExpression(
              InSubqueryExec(a.id, 
                 SubqueryBroadcastExec(Project [b.id]
                                         Filter [b.id > 0]
                                           Relation [b.id]))
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

The implementation logic of the PlanDynamicPruningFilters rule, with commentary:

case class PlanDynamicPruningFilters(sparkSession: SparkSession)
    extends Rule[SparkPlan] with PredicateHelper {

  /**
   * Identify the shape in which keys of a given plan are broadcasted.
   */
  private def broadcastMode(keys: Seq[Expression], output: AttributeSeq): BroadcastMode = {
    val packedKeys = BindReferences.bindReferences(HashJoin.rewriteKeyExpr(keys), output)
    HashedRelationBroadcastMode(packedKeys)
  }

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.dynamicPartitionPruningEnabled) {
      return plan
    }

    plan.transformAllExpressionsWithPruning(_.containsPattern(DYNAMIC_PRUNING_SUBQUERY)) {
      case DynamicPruningSubquery(
          value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
        val sparkPlan = QueryExecution.createSparkPlan(
          sparkSession, sparkSession.sessionState.planner, buildPlan)
        // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
        // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
          plan.exists {
            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
              left.sameResult(sparkPlan)
            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
              right.sameResult(sparkPlan)
            case _ => false
          }

        if (canReuseExchange) {
          // DPP here is applied by reusing a broadcast stage: the filtering plan's result set is
          // obtained through the broadcast mechanism and used at runtime to further prune the
          // partitions described by the pruning plan
          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
          val mode = broadcastMode(buildKeys, executedPlan.output)
          // plan a broadcast exchange of the build side of the join
          val exchange = BroadcastExchangeExec(mode, executedPlan)
          val name = s"dynamicpruning#${exprId.id}"
          // place the broadcast adaptor for reusing the broadcast results on the probe side
          val broadcastValues =
            SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
        } else if (onlyInBroadcast) {
          // it is not worthwhile to execute the query, so we fall-back to a true literal
          // If it was explicitly specified that DPP may only rely on broadcast, and no broadcast
          // stage in the plan tree matches the filtering plan, return the literal true, i.e. DPP is disabled.
          DynamicPruningExpression(Literal.TrueLiteral)
        } else {
          // If DPP is not forced to rely on the broadcast mechanism and the pruning pays off,
          // rewrite the SQL to build a subquery that collects the filtering plan's distinct values
          // of the join key, used at runtime to prune the pruning plan
          // we need to apply an aggregate on the buildPlan in order to be column pruned
          val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
          val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
          DynamicPruningExpression(expressions.InSubquery(
            Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
        }
    }
  }
}
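
Case 3 effectively turns the DPP subquery into a plain IN-subquery over the distinct build keys. In SQL terms, the pruning filter on table a then behaves roughly like the rewrite below (an illustrative equivalent, not literal optimizer output):

SELECT *
FROM   a
WHERE  a.id IN (SELECT DISTINCT b.id FROM b WHERE b.id > 0);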

Push Down Dynamic Expression And Materialization

Strategy Name: DataSourceV2Strategy
While the logical plan tree is converted into the physical plan tree, the DPP filter condition is pushed down to the BatchScanExec operator so that it can be applied when the RDD is produced (the execution phase) to filter partitions.

-- Before
-- case 1: exchange reuse is enabled, and the plan tree contains a broadcast sub-plan identical to the filtering plan:
--        DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
-- case 2: the filtering plan could only be broadcast (no reusable broadcast exists):
--        DynamicPruningExpression(Literal.TrueLiteral)
-- case 3: broadcast is unavailable, but pruning still pays off:
--        DynamicPruningExpression(expressions.InSubquery(
--            Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningExpression(
              InSubqueryExec(a.id, 
                 SubqueryBroadcastExec(Project [b.id]
                                         Filter [b.id > 0]
                                           Relation [b.id])))]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]
        
-- Physical Plan
-- When DataSourceV2Strategy materializes the logical plan tree, it pushes DynamicPruning-typed filter expressions down to BatchScanExec.
-- Here that is the DynamicPruningExpression; since the FilterExec held only this single expression, the filter is eliminated entirely
ProjectExec [a.*]
  BroadcastJoinExec [a.*] [b.id = a.id, b.id = a.id]
      -- the Filter operator has been eliminated
      BatchScanExec [a.*] [runtimeFilters = DynamicPruningExpression(
              InSubqueryExec(a.id, 
                 SubqueryBroadcastExec(Project [b.id]
                                         Filter [b.id > 0]
                                           Relation [b.id])))]
    BroadcastExchangeExec
      ProjectExec [b.id]
        FilterExec [b.id > 0]
          BatchScanExec [b.id]

Pruning Partitions at Runtime

DPP filtering is applied only when BatchScanExec lazily builds its input RDD, i.e. at execution time.

/**
 * Physical plan node for scanning a batch of data from a data source v2.
 */
case class BatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression],
    keyGroupedPartitioning: Option[Seq[Expression]] = None) extends DataSourceV2ScanExecBase {

  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()

  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
    // Translate the DPP expressions into Spark's unified expression form, i.e. Source Filters
    val dataSourceFilters = runtimeFilters.flatMap {
      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
      case _ => None
    }

    if (dataSourceFilters.nonEmpty) {
      val originalPartitioning = outputPartitioning

      // the cast is safe as runtime filters are only assigned if the scan can be filtered
      // Here, if the Scan instance really does support runtime filtering, the DynamicPruningExpression is pushed down to the data source at runtime
      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
      // The Scan::filter interface provides an entry point for users to convert the Source Filters
      // further to their own needs, e.g. into Parquet Filters
      filterableScan.filter(dataSourceFilters.toArray)

      // call toBatch again to get filtered partitions
      // Produce the final set of `InputPartition`s, i.e. those that survived the dataSourceFilters.
      val newPartitions = scan.toBatch.planInputPartitions()

      originalPartitioning match {
        case p: KeyGroupedPartitioning =>
          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
            throw new SparkException("Data source must have preserved the original partitioning " +
                "during runtime filtering: not all partitions implement HasPartitionKey after " +
                "filtering")
          }

          val newRows = new InternalRowSet(p.expressions.map(_.dataType))
          newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
          val oldRows = p.partitionValuesOpt.get

          if (oldRows.size != newRows.size) {
            throw new SparkException("Data source must have preserved the original partitioning " +
                "during runtime filtering: the number of unique partition values obtained " +
                s"through HasPartitionKey changed: before ${oldRows.size}, after ${newRows.size}")
          }

          if (!oldRows.forall(newRows.contains)) {
            throw new SparkException("Data source must have preserved the original partitioning " +
                "during runtime filtering: the number of unique partition values obtained " +
                s"through HasPartitionKey remain the same but do not exactly match")
          }

          groupPartitions(newPartitions).get.map(_._2)

        case _ =>
          // no validation is needed as the data source did not report any specific partitioning
        newPartitions.map(Seq(_))
      }

    } else {
      partitions
    }
  }

  override lazy val inputRDD: RDD[InternalRow] = {
    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
      // return an empty RDD with 1 partition if dynamic filtering removed the only split
      sparkContext.parallelize(Array.empty[InternalRow], 1)
    } else {
      new DataSourceRDD(
        sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics)
    }
  }

  override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    inputRDD.map { r =>
      numOutputRows += 1
      r
    }
  }
}
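
For a custom DataSource V2 connector to benefit from this path, its Scan must implement the SupportsRuntimeFiltering mix-in that the cast above relies on. Below is a minimal sketch; the SupportsRuntimeFiltering/Scan/Batch contracts are Spark's DSv2 API, while MyPartitionedScan, MyPartition, and the id partition column are hypothetical:

import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType

// Hypothetical partition descriptor carrying its partition-key value.
case class MyPartition(idValue: Any) extends InputPartition

// Hypothetical scan over a source partitioned by the `id` column.
class MyPartitionedScan(schema: StructType, allPartitions: Seq[MyPartition])
    extends Scan with SupportsRuntimeFiltering {

  // Partitions that survive runtime filtering; starts as the full set.
  private var partitions: Seq[MyPartition] = allPartitions

  override def readSchema(): StructType = schema

  // Declare the columns on which runtime filters may arrive (here: the partition column).
  override def filterAttributes(): Array[NamedReference] =
    Array(Expressions.column("id"))

  // Invoked at runtime with the translated DPP filters, e.g. In("id", values).
  override def filter(filters: Array[Filter]): Unit = filters.foreach {
    case In("id", values) =>
      val keep = values.toSet
      partitions = partitions.filter(p => keep.contains(p.idValue))
    case _ => // ignore filters this source cannot apply
  }

  // BatchScanExec calls toBatch.planInputPartitions() again after filter(),
  // so only the surviving partitions are returned here.
  override def toBatch: Batch = new Batch {
    override def planInputPartitions(): Array[InputPartition] = partitions.toArray
    override def createReaderFactory(): PartitionReaderFactory = ??? // source-specific
  }
}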

Extended Topics

The Design and Implementation of DPP

Class structure

The class diagram (not reproduced here) shows the DPP-related class definitions in Spark. DynamicPruning is an interface marking whether a logical plan node is DPP-related.
To implement DPP, Spark provides two concrete expression (Expression) classes: DynamicPruningSubquery and DynamicPruningExpression.

DynamicPruningSubquery

DynamicPruningSubquery holds the details of a filter condition eligible for DPP, such as the JOIN filter a.id IN (SELECT id FROM b WHERE id = 1) from the SQL example in the previous section; it therefore wraps a subquery.

case class DynamicPruningSubquery(
    pruningKey: Expression,   // the column on the pruned side of the JOIN, e.g. a.id in the earlier SQL example
    buildQuery: LogicalPlan,  // the subquery to be broadcast
    buildKeys: Seq[Expression],  // all JOIN keys of the broadcast subquery, e.g. b.id in the earlier SQL example
    broadcastKeyIndex: Int,   // index into the broadcast subquery's output columns; for the JOIN condition a.id = b.id, a.id is the pruningKey and b.id the broadcast key
    onlyInBroadcast: Boolean, // marks whether the filtering subquery's result may only be broadcast to the other (pruned) side of the JOIN
    exprId: ExprId = NamedExpression.newExprId,
    hint: Option[HintInfo] = None)
  extends SubqueryExpression(buildQuery, Seq(pruningKey), exprId, Seq.empty, hint)
  with DynamicPruning
  with Unevaluable
  with UnaryLike[Expression]

DynamicPruningExpression

DynamicPruningExpression wraps and supersedes DynamicPruningSubquery; what it maintains is, logically, the subquery's result set, so the two classes stand in a before/after relationship.

// the child member corresponds to the result set returned by the DynamicPruningSubquery
case class DynamicPruningExpression(child: Expression)
  extends UnaryExpression
  with DynamicPruning

In short, during the planning phase Spark first collects the information needed for DPP and generates a DynamicPruningSubquery node; it then analyzes the DynamicPruningSubquery and, following certain rules, rewrites the logical plan into one that can apply DPP.

Generating the DynamicPruningSubquery

The DynamicPruningSubquery node is inserted into the logical plan tree by the PartitionPruning optimizer rule, which is registered in SparkOptimizer's defaultBatches, so every query attempts to apply it.

object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
  private def prune(plan: LogicalPlan): LogicalPlan = {
    plan transformUp {
      // skip this rule if there's already a DPP subquery on the LHS of a join
      case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
      case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
      case j @ Join(left, right, joinType, Some(condition), hint) =>
        var newLeft = left
        var newRight = right

        // extract the left and right keys of the join condition
        val (leftKeys, rightKeys) = j match {
          case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _, _) => (lkeys, rkeys)
          case _ => (Nil, Nil)
        }

        // checks if two expressions are on opposite sides of the join
        def fromDifferentSides(x: Expression, y: Expression): Boolean = {
          def fromLeftRight(x: Expression, y: Expression) =
            !x.references.isEmpty && x.references.subsetOf(left.outputSet) &&
              !y.references.isEmpty && y.references.subsetOf(right.outputSet)
          fromLeftRight(x, y) || fromLeftRight(y, x)
        }

        splitConjunctivePredicates(condition).foreach {
          case EqualTo(a: Expression, b: Expression)
              if fromDifferentSides(a, b) =>
            val (l, r) = if (a.references.subsetOf(left.outputSet) &&
              b.references.subsetOf(right.outputSet)) {
              a -> b
            } else {
              b -> a
            }

            // there should be a partitioned table and a filter on the dimension table,
            // otherwise the pruning will not trigger
            var filterableScan = getFilterableTableScan(l, left)
            if (filterableScan.isDefined && canPruneLeft(joinType) &&
                hasPartitionPruningFilter(right)) {
              newLeft = insertPredicate(l, newLeft, r, right, rightKeys, filterableScan.get)
            } else {
              filterableScan = getFilterableTableScan(r, right)
              if (filterableScan.isDefined && canPruneRight(joinType) &&
                  hasPartitionPruningFilter(left) ) {
                newRight = insertPredicate(r, newRight, l, left, leftKeys, filterableScan.get)
              }
            }
          case _ =>
        }
        Join(newLeft, newRight, joinType, Some(condition), hint)
    }
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // Do not rewrite subqueries.
    case s: Subquery if s.correlated => plan
    case _ if !conf.dynamicPartitionPruningEnabled => plan
    case _ => prune(plan)
  }
}

Definition and Classification of Subqueries

Correlated subqueries

Because the subquery over table b references a column of the outer query (here, table a), Spark does not apply the dynamic pruning rule to this class of subqueries.
Here a.id is an attribute of type OuterReference, meaning it has already been resolved in the outer query scope, while b.id is an attribute of type AttributeReference; queries with such inner-outer dependencies are called correlated (dependent) subqueries.

SELECT  *
FROM    a
WHERE   EXISTS (SELECT  *
                FROM    b
                WHERE   b.id > a.id)

Uncorrelated subqueries

The inner query (over table b) has no correlation with the outer query (over table a), so Spark can rewrite the plan and apply the dynamic pruning rule.

SELECT  *
FROM    a
WHERE   EXISTS (SELECT  *
                FROM    b
                WHERE   b.id > 10)

Common kinds of subqueries

Lateral
SELECT * FROM t, LATERAL (SELECT * FROM u) uu
Exists
SELECT  *
FROM    a
WHERE   EXISTS (SELECT  *
                FROM    b
                WHERE   b.id > 10)
IN
SELECT  *
FROM    a
WHERE   a.id IN (SELECT  id
                 FROM    b)
Scalar
SELECT (SELECT CURRENT_DATE())
Table Valued Function
SELECT * FROM my_tvf(TABLE (v1), TABLE (SELECT 1))