关于spark:Spark-开源新特性Catalyst-优化流程裁剪

8次阅读

共计 4042 个字符,预计需要花费 11 分钟才能阅读完成。

摘要:为了解决过多依赖 Hive 的问题, SparkSQL 应用了一个新的 SQL 优化器代替 Hive 中的优化器, 这个优化器就是 Catalyst。

本文分享自华为云社区《Spark 开源新个性:Catalyst 优化流程裁剪》,作者:hzjturbo。

1. 问题背景

上图是典型的 Spark Catalyst 优化器的布局,一条由用户输出的 SQL,到实在可调度执行的 RDD DAG 工作,须要经验以下五个阶段:

  • Parser: 将 SQL 解析成相应的形象语法树(AST),spark 也称为 Unresolved Logical Plan;
  • Analyzer: 通过查找 Metadata 的 Catalog 信息,将 Unresolved Logical Plan 变为 Resolved Logical Plan,这个过程会做表、列、数据类型等做校验;
  • Optimizer: 逻辑优化流程,通过一些优化规定对匹配上的 Plan 做转换,失去优化后的逻辑 Plan
  • Planner:依据 Optimized Logical Plan 的统计信息等转换成相应的 Physical Plan
  • Query Execution: 次要是执行前的一些 preparations 优化,比方 AQE, Exchange Reuse, CodeGen stages 合并等

上述的五个阶段中,除了 Parser (由 Antlr 实现),其余的每个阶段都是由一个个规定 (Rule) 形成,总共大概有 200+ 个,对于不同的规定,还可能须要跑屡次,所以对于绝对比较复杂的查问,可能失去一个 executed Plan 都须要消耗数秒。

Databricks 外部基准测试表明,对于 TPC-DS 查问,每个查问均匀调用树转换函数约 280k 次,这远远超出了必要的范畴。因而,咱们摸索在每个树节点中嵌入 BitSet,以传递本身及其子树的信息,并利用打算不变性来修剪不必要的遍历。通过原型实现验证:在 TPC-DS 基准测试中,咱们看到优化的速度约为 50%,剖析的速度约为 30%,整个查问编译的速度约为 34%(包含 Hive 元存储 RPC 和文件列表)[1]。

2. 设计实现

2.1 Tree Pattern Bits and Rule Id Bits

  • Tree pattern bits

在 TreeNode 减少 nodePatterns 属性,所有继承该类的节点能够通过复写该属性值来标识本人的属性。

/**
 * @return a sequence of tree pattern enums in a TreeNode T. It does not include propagated
 *         patterns in the subtree of T.
 */
protected val nodePatterns: Seq[TreePattern] = Seq()

TreePattern 是一个枚举类型,对于每个节点 / 表达式都能够为其设置一个 TreePattern 不便标识,具体可见 TreePatterns.scala。

例如对于 Join 节点的 nodePatterns:

override val nodePatterns : Seq[TreePattern] = {var patterns = Seq(JOIN)
  joinType match {
    case _: InnerLike => patterns = patterns :+ INNER_LIKE_JOIN
    case LeftOuter | FullOuter | RightOuter => patterns = patterns :+ OUTER_JOIN
    case LeftSemiOrAnti(_) => patterns = patterns :+ LEFT_SEMI_OR_ANTI_JOIN
    case NaturalJoin(_) | UsingJoin(_, _) => patterns = patterns :+ NATURAL_LIKE_JOIN
    case _ =>
  }
  patterns
}
  • Rule ID bits

将规定 ID 的缓存 BitSet 嵌入到每个树 / 表达式节点 T 中,这样咱们就能够跟踪规定 R 对于根植于 T 的子树是无效还是有效。这样,如果 R 在 T 上被调用,并且已知 R 有效,如果 R 再次利用于 T(例如,R 位于定点规定批处理中),咱们能够跳过它。这个想法最后被用于 Cascades optimizer,以放慢探索性布局。

Rule:

abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging {

  // The integer id of a rule, for pruning unnecessary tree traversals.
  protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName)

TreeNode:

/**
 * A BitSet of rule ids to record ineffective rules for this TreeNode and its subtree.
 * If a rule R (which does not read a varying, external state for each invocation) is
 * ineffective in one apply call for this TreeNode and its subtree, R will still be
 * ineffective for subsequent apply calls on this tree because query plan structures are
 * immutable.
 */
private val ineffectiveRules: BitSet = new BitSet(RuleIdCollection.NumRules)

2.2 Changes to The Transform Function Family

革新后的 transform 办法相比之前的多了两个判断,如下所示

def transformDownWithPruning(
  cond: TreePatternBits => Boolean, // 判断是否存在可优化的节点,由规定设计者所提供
  ruleId: RuleId = UnknownRuleId // 不会失效的规定 ID,自动更新
    )(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  // 如果上述两个条件存在一个不满足,间接跳过本次规定
  if (!cond.apply(this) || isRuleIneffective(ruleId)) {return this}
  // 执行 rule 的逻辑
  val afterRule = CurrentOrigin.withOrigin(origin) {rule.applyOrElse(this, identity[BaseType])
  }

  // Check if unchanged and then possibly return old copy to avoid gc churn.
  if (this fastEquals afterRule) {val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
    // 如果没失效,把规定 ID 退出到不失效的 BitSet 里
    if (this eq rewritten_plan) {markRuleAsIneffective(ruleId)
      this
    } else {rewritten_plan}
  } else {
    // If the transform function replaces this node with a new one, carry over the tags.
    afterRule.copyTagsFrom(this)
    afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
  }
}

2.3 Changes to An Individual Rule

规定的例子:

object OptimizeIn extends Rule[LogicalPlan] with SQLConfHelper {def apply(plan: LogicalPlan): LogicalPlan = plan transform ({
   case q: LogicalPlan => q transformExpressionsDown ({case In(v, list) if list.isEmpty => ...
     case expr @ In(v, list) if expr.inSetConvertible => ...
   }, _.containsPattern(IN), ruleId) // 必须蕴含 IN
 }, _.containsPattern(IN), ruleId) // 必须蕴含 IN
}

3. 测试后果

在 Delta 中应用 TPC-DS SF10 对 TPC-DS 查问编译工夫进行了基准测试。后果如下:

  • 图 1 显示了查问编译速度;
  • 表 1 显示了几个要害树遍历函数的调用计数和 CPU 缩小的细分。

我简略运行了开版本的 TPCDSQuerySuite,该测试会把 TPCDS 的语句解析优化,并且查看下生成的代码(CodeGen),均匀耗时的工夫为三次运行失去的最优值,失去的后果如下:

  • 合入 PR 前[2],蕴含 156 个 Tpcds 查问,均匀总耗时~56s
  • 最新 Spark 开源代码,蕴含 150 个 Tpcds 查问,均匀总耗时~19s

之所以最新的 Tpcds 查问比合入 PR 前的条数少 6 条,是因为后续有个缩小反复 TPCDS 的 PR。总时长优化前是优化后的两倍多。

参考援用

[1]. [SPARK-34916] Tree Traversal Pruning for Catalyst Transform/Resolve Function Families. SISP

[2]. SPARK-35544 Add tree pattern pruning to Analyzer rules.

[3]. Building a SIMD Supported Vectorized Native Engine for Spark SQL. link

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0