摘要:为了解决过多依赖 Hive 的问题, SparkSQL 应用了一个新的 SQL 优化器代替 Hive 中的优化器, 这个优化器就是 Catalyst。
本文分享自华为云社区《Spark 开源新个性:Catalyst 优化流程裁剪》,作者:hzjturbo 。
1. 问题背景
上图是典型的Spark Catalyst优化器的布局,一条由用户输出的SQL,到实在可调度执行的RDD DAG工作,须要经验以下五个阶段:
- Parser: 将SQL解析成相应的形象语法树(AST),spark也称为 Unresolved Logical Plan;
- Analyzer: 通过查找Metadata的Catalog信息,将 Unresolved Logical Plan 变为 Resolved Logical Plan,这个过程会做表、列、数据类型等做校验;
- Optimizer: 逻辑优化流程,通过一些优化规定对匹配上的Plan做转换,失去优化后的逻辑Plan
- Planner:依据Optimized Logical Plan的统计信息等转换成相应的Physical Plan
- Query Execution: 次要是执行前的一些preparations优化,比方AQE, Exchange Reuse, CodeGen stages合并等
上述的五个阶段中,除了Parser (由Antlr实现),其余的每个阶段都是由一个个规定(Rule)形成,总共大概有200+个,对于不同的规定,还可能须要跑屡次,所以对于绝对比较复杂的查问,可能失去一个executed Plan都须要消耗数秒。
Databricks外部基准测试表明,对于TPC-DS查问,每个查问均匀调用树转换函数约280k次,这远远超出了必要的范畴。因而,咱们摸索在每个树节点中嵌入BitSet,以传递本身及其子树的信息,并利用打算不变性来修剪不必要的遍历。通过原型实现验证:在TPC-DS基准测试中,咱们看到优化的速度约为50%,剖析的速度约为30%,整个查问编译的速度约为34%(包含Hive元存储RPC和文件列表)[1]。
2. 设计实现
2.1 Tree Pattern Bits and Rule Id Bits
Tree pattern bits
在TreeNode 减少nodePatterns属性,所有继承该类的节点能够通过复写该属性值来标识本人的属性。
/** * @return a sequence of tree pattern enums in a TreeNode T. It does not include propagated * patterns in the subtree of T. */protected val nodePatterns: Seq[TreePattern] = Seq()
TreePattern 是一个枚举类型, 对于每个节点/表达式都能够为其设置一个TreePattern不便标识,具体可见 TreePatterns.scala 。
例如对于Join节点的nodePatterns:
override val nodePatterns : Seq[TreePattern] = { var patterns = Seq(JOIN) joinType match { case _: InnerLike => patterns = patterns :+ INNER_LIKE_JOIN case LeftOuter | FullOuter | RightOuter => patterns = patterns :+ OUTER_JOIN case LeftSemiOrAnti(_) => patterns = patterns :+ LEFT_SEMI_OR_ANTI_JOIN case NaturalJoin(_) | UsingJoin(_, _) => patterns = patterns :+ NATURAL_LIKE_JOIN case _ => } patterns}
Rule ID bits
将规定ID的缓存BitSet嵌入到每个树/表达式节点T中,这样咱们就能够跟踪规定R对于根植于T的子树是无效还是有效。这样,如果R在T上被调用,并且已知R有效,如果R再次利用于T(例如,R位于定点规定批处理中),咱们能够跳过它。这个想法最后被用于Cascades optimizer,以放慢探索性布局。
Rule:
abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging { // The integer id of a rule, for pruning unnecessary tree traversals. protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName)
TreeNode:
/** * A BitSet of rule ids to record ineffective rules for this TreeNode and its subtree. * If a rule R (which does not read a varying, external state for each invocation) is * ineffective in one apply call for this TreeNode and its subtree, R will still be * ineffective for subsequent apply calls on this tree because query plan structures are * immutable. */private val ineffectiveRules: BitSet = new BitSet(RuleIdCollection.NumRules)
2.2 Changes to The Transform Function Family
革新后的transform 办法相比之前的多了两个判断,如下所示
def transformDownWithPruning( cond: TreePatternBits => Boolean, // 判断是否存在可优化的节点,由规定设计者所提供 ruleId: RuleId = UnknownRuleId // 不会失效的规定ID,自动更新 )(rule: PartialFunction[BaseType, BaseType]): BaseType = { // 如果上述两个条件存在一个不满足,间接跳过本次规定 if (!cond.apply(this) || isRuleIneffective(ruleId)) { return this } // 执行rule的逻辑 val afterRule = CurrentOrigin.withOrigin(origin) { rule.applyOrElse(this, identity[BaseType]) } // Check if unchanged and then possibly return old copy to avoid gc churn. if (this fastEquals afterRule) { val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule)) // 如果没失效,把规定ID退出到不失效的BitSet里 if (this eq rewritten_plan) { markRuleAsIneffective(ruleId) this } else { rewritten_plan } } else { // If the transform function replaces this node with a new one, carry over the tags. afterRule.copyTagsFrom(this) afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule)) }}
2.3 Changes to An Individual Rule
规定的例子:
object OptimizeIn extends Rule[LogicalPlan] with SQLConfHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform ({ case q: LogicalPlan => q transformExpressionsDown ({ case In(v, list) if list.isEmpty => ... case expr @ In(v, list) if expr.inSetConvertible => ... }, _.containsPattern(IN), ruleId) // 必须蕴含IN }, _.containsPattern(IN), ruleId) // 必须蕴含IN}
3. 测试后果
在Delta中应用TPC-DS SF10对TPC-DS查问编译工夫进行了基准测试。后果如下:
- 图1显示了查问编译速度;
- 表1显示了几个要害树遍历函数的调用计数和CPU缩小的细分。
我简略运行了开版本的TPCDSQuerySuite,该测试会把TPCDS的语句解析优化,并且查看下生成的代码(CodeGen),均匀耗时的工夫为三次运行失去的最优值, 失去的后果如下:
- 合入PR前[2], 蕴含156个Tpcds查问,均匀总耗时~56s
- 最新Spark开源代码,蕴含150个Tpcds查问,均匀总耗时~19s
之所以最新的Tpcds查问比合入PR前的条数少6条,是因为后续有个缩小反复TPCDS的PR。总时长优化前是优化后的两倍多。
参考援用
[1]. [SPARK-34916] Tree Traversal Pruning for Catalyst Transform/Resolve Function Families. SISP
[2]. SPARK-35544 Add tree pattern pruning to Analyzer rules.
[3]. Building a SIMD Supported Vectorized Native Engine for Spark SQL. link
点击关注,第一工夫理解华为云陈腐技术~