Author: Zhu Liangtang, compute engine development engineer at Guandata (观远数据). He graduated from Hunan University in 2013 and has more than seven years of big data development experience. He previously held a data development expert role at a well-known internet company, building the data platform that served the company's internal business lines, and is one of the main contributors to the open-source project byzer-lang (a Spark-based open-source mid-platform compute engine).
Background
During testing in an internal environment, QA found a serious problem: the ETL output did not match expectations. Filtering on the Rawdata Date field with the value 2022-07-12 returned data from 2022-07-11, while filtering directly on 2022-07-11 returned nothing.
The troubleshooting process
1. Simplify the logic
The ETL pipeline was too large, so the first step was to spend some time simplifying it, reproducing the problem with as small a case as possible to make the investigation easier.
Ways to simplify:
- If the ETL takes a long time to run, first shrink the dataset: apply a limit, save the result as a new dataset, and test against that (a sketch follows this list).
- Trim the logic from both ends toward the middle until the problem no longer reproduces. When trimming from the head, materialize a temporary dataset and run the remaining computation on it; when trimming from the tail, simply preview the result of the previous step.
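A minimal sketch of both tricks, assuming a hypothetical source table fact_rawdata and a Spark session named spark (the names and the limit size are placeholders, not taken from the original ETL):

// Shrink the input: keep a small sample, persist it, and run the simplified ETL against the sample.
val sample = spark.table("fact_rawdata").limit(10000)
sample.write.mode("overwrite").saveAsTable("fact_rawdata_sample")

// Trim from the head: materialize an intermediate step as a temporary view and compute from there.
spark.sql("select to_date(a) as a from fact_rawdata_sample group by to_date(a)")
  .createOrReplaceTempView("step1_tmp")

// Trim from the tail: just preview the previous step's result instead of running the remaining steps.
spark.table("step1_tmp").show()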
The simplified logic involves two datasets: a table A with a date-typed field a, and a table B with date-typed fields a and b. Expressed in SQL:
select to_date(a) as a, to_date(b) as b from
(select to_date(a) as a, to_date(a) as b from
(select to_date(a) as a from A group by to_date(a)) t1
union all
select to_date(a) as a, to_date(b) as b from B group by to_date(a), to_date(b)) t2
group by to_date(a), to_date(b)
Of course, what the job engine actually receives is a script file; it is written as SQL here only to make the logic easier to read. Inspecting the script confirmed that the logic generated by the BI layer was correct, so the problem had to be in the engine layer.
2. Reproduce and investigate locally with Spark
Local reproduction
Convert the simplified script into Spark operators or Spark SQL. The test environment runs Spark 3.2.1, and the problem reproduced on Spark 3.2.1 locally; once it reproduces locally, it becomes easy to attach a debugger.
test("test1") {
val sqlText =
"""
|select to_date(a) a, to_date(b) b from
|(select to_date(a) a, to_date(a) as b from
|(select to_date(a) a from
| values ('2020-02-01') as t1(a)
| group by to_date(a)) t3
|union all
|select to_date(a) a, to_date(b) b from
|(select to_date(a) a, to_date(b) b from
|values ('2020-01-01','2020-01-02') as t1(a, b)
| group by to_date(a), to_date(b)) t4) t5
|group by to_date(a), to_date(b)
|""".stripMargin
spark.sql(sqlText).show()}
Expected result: two distinct rows, (2020-02-01, 2020-02-01) from the first branch and (2020-01-01, 2020-01-02) from the second.
Returned result: column b took the value of column a in every row, so the second row came back as (2020-01-01, 2020-01-01) instead.
The next step is to examine the Spark execution plan.
The execution plan
First, a brief introduction to the Spark Catalyst optimizer.
Turning a SQL statement into a program that the execution engine can run involves three major phases: parsing (Parser), optimization (Optimizer), and execution (Execution). When Catalyst generates and optimizes the execution plan, it relies on five internal components, listed below:
- Parser: performs lexical and syntactic analysis of the SQL statement using Antlr4;
- Analyzer: resolves the Unresolved Logical Plan into an Analyzed Logical Plan, mainly using Catalog information;
- Optimizer: applies rules to rewrite the Analyzed Logical Plan into an Optimized Logical Plan. There are many rules, such as predicate pushdown (PushDownPredicate), column pruning (ColumnPruning), constant propagation (ConstantPropagation), and constant folding (ConstantFolding);
- Planner: the logical plan above cannot be executed by Spark as-is; this step converts the logical plan into multiple physical plans and then uses a cost model to choose the best physical plan;
- Code Generation: this step compiles the SQL query into Java bytecode.
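These stages can also be inspected programmatically. A minimal sketch, reusing the sqlText from the test above (the accessors below are standard QueryExecution fields, worth double-checking against the Spark version in use):

val qe = spark.sql(sqlText).queryExecution
qe.logical        // parsed, unresolved logical plan
qe.analyzed       // analyzed logical plan
qe.optimizedPlan  // optimized logical plan
qe.executedPlan   // physical plan chosen by the planner, ready for execution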
The execution plan for each stage can be obtained via the explain(true) method, as follows:
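In the local test this amounts to replacing show() with an extended explain:

spark.sql(sqlText).explain(true)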
== Parsed Logical Plan ==
'Aggregate ['to_date('a),'to_date('b)], ['to_date('a) AS a#7,'to_date('b) AS b#8]
+- 'SubqueryAlias t5
+- 'Union false, false
:- 'Project ['to_date('a) AS a#1,'to_date('a) AS b#2]
: +- 'SubqueryAlias t3
: +- 'Aggregate ['to_date('a)], ['to_date('a) AS a#0]
: +- 'SubqueryAlias t1
: +- 'UnresolvedInlineTable [a], [[2020-02-01]]
+- 'Project ['to_date('a) AS a#5,'to_date('b) AS b#6]
+- 'SubqueryAlias t4
+- 'Aggregate ['to_date('a),'to_date('b)], ['to_date('a) AS a#3,'to_date('b) AS b#4]
+- 'SubqueryAlias t1
+- 'UnresolvedInlineTable [a, b], [[2020-01-01, 2020-01-02]]
== Analyzed Logical Plan ==
a: date, b: date
Aggregate [to_date(a#1, None), to_date(b#2, None)], [to_date(a#1, None) AS a#7, to_date(b#2, None) AS b#8]
+- SubqueryAlias t5
+- Union false, false
:- Project [to_date(a#0, None) AS a#1, to_date(a#0, None) AS b#2]
: +- SubqueryAlias t3
: +- Aggregate [to_date(a#9, None)], [to_date(a#9, None) AS a#0]
: +- SubqueryAlias t1
: +- LocalRelation [a#9]
+- Project [to_date(a#3, None) AS a#5, to_date(b#4, None) AS b#6]
+- SubqueryAlias t4
+- Aggregate [to_date(a#10, None), to_date(b#11, None)], [to_date(a#10, None) AS a#3, to_date(b#11, None) AS b#4]
+- SubqueryAlias t1
+- LocalRelation [a#10, b#11]
== Optimized Logical Plan ==
Aggregate [_groupingexpression#16, _groupingexpression#16], [_groupingexpression#16 AS a#7, _groupingexpression#16 AS b#8]
+- Union false, false
:- Aggregate [_groupingexpression#16], [_groupingexpression#16, _groupingexpression#16]
: +- LocalRelation [_groupingexpression#16]
+- Aggregate [_groupingexpression#17, _groupingexpression#18], [_groupingexpression#17, _groupingexpression#18]
+- LocalRelation [_groupingexpression#17, _groupingexpression#18]
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[_groupingexpression#16, _groupingexpression#16], functions=[], output=[a#7, b#8])
+- Exchange hashpartitioning(_groupingexpression#16, _groupingexpression#16, 200), ENSURE_REQUIREMENTS, [id=#40]
+- HashAggregate(keys=[_groupingexpression#16, _groupingexpression#16], functions=[], output=[_groupingexpression#16, _groupingexpression#16])
+- Union
:- HashAggregate(keys=[_groupingexpression#16], functions=[], output=[_groupingexpression#16, _groupingexpression#16])
: +- Exchange hashpartitioning(_groupingexpression#16, 200), ENSURE_REQUIREMENTS, [id=#33]
: +- HashAggregate(keys=[_groupingexpression#16], functions=[], output=[_groupingexpression#16])
: +- LocalTableScan [_groupingexpression#16]
+- HashAggregate(keys=[_groupingexpression#17, _groupingexpression#18], functions=[], output=[_groupingexpression#17, _groupingexpression#18])
+- Exchange hashpartitioning(_groupingexpression#17, _groupingexpression#18, 200), ENSURE_REQUIREMENTS, [id=#35]
+- HashAggregate(keys=[_groupingexpression#17, _groupingexpression#18], functions=[], output=[_groupingexpression#17, _groupingexpression#18])
+- LocalTableScan [_groupingexpression#17, _groupingexpression#18]
Looking closely at the plans, the Analyzed Logical Plan is normal, but the Optimized Logical Plan is clearly wrong:
Aggregate [_groupingexpression#16, _groupingexpression#16], [_groupingexpression#16 AS a#7, _groupingexpression#16 AS b#8]
In the outermost aggregate, the output fields a and b have become the same expression, _groupingexpression#16, so the problem lies in the optimizer's expression rewriting.
3. Try to find a shortcut
The optimizer involves many dozens of rules, and going through them one by one would be too slow, so first try to find a shortcut.
I tested on Spark 3.0.1, Spark 3.2.2, and Spark master. Only Spark 3.0.1 produced the correct result; every other version produced the wrong result, which indicates that this is a bug introduced after Spark 3.0.1 and not yet fixed.
I then searched Spark's open issues with keywords such as union, aggregate, attribute, and alias, but found no matching bug. The shortcut failed, so I filed an issue with the community and continued the analysis.
Issue link: issues.apache.org/jira/browse…
4. Analyze the optimizer rules
Within the optimizer, process of elimination showed that the bug is triggered by the combination of these four rules: PushProjectionThroughUnion, CollapseProject, SimplifyCasts, and RemoveRedundantAliases. Debugging locally, we can see the new logical plan tree produced after each of these rules runs; the plans are shown below, rule by rule.
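As an alternative to stepping through with a debugger, Catalyst's plan-change logging can print the plan before and after each rule fires. A hedged sketch: the config keys below are the Spark 3.1+ spellings (older 3.0 releases used the spark.sql.optimizer.planChangeLog.* prefix), so verify them against the version in use:

// Log every rewrite made by the four suspect rules at WARN level so it shows up without extra log config.
spark.conf.set("spark.sql.planChangeLog.level", "WARN")
spark.conf.set("spark.sql.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion," +
  "org.apache.spark.sql.catalyst.optimizer.CollapseProject," +
  "org.apache.spark.sql.catalyst.optimizer.SimplifyCasts," +
  "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases")
spark.sql(sqlText).show()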
Aggregate [_groupingexpression#14, _groupingexpression#15], [_groupingexpression#14 AS a#7, _groupingexpression#15 AS b#8]
+- Project [a#1, b#2, cast(a#1 as date) AS _groupingexpression#14, cast(b#2 as date) AS _groupingexpression#15]
+- Union false, false
:- Project [cast(a#0 as date) AS a#1, cast(a#0 as date) AS b#2]
: +- Aggregate [_groupingexpression#16], [_groupingexpression#16 AS a#0]
: +- Project [a#9, cast(a#9 as date) AS _groupingexpression#16]
: +- LocalRelation [a#9]
+- Project [cast(a#3 as date) AS a#5, cast(b#4 as date) AS b#6]
+- Aggregate [_groupingexpression#17, _groupingexpression#18], [_groupingexpression#17 AS a#3, _groupingexpression#18 AS b#4]
+- Project [a#10, b#11, cast(a#10 as date) AS _groupingexpression#17, cast(b#11 as date) AS _groupingexpression#18]
+- LocalRelation [a#10, b#11]
The plan above is the state before PushProjectionThroughUnion. After PushProjectionThroughUnion it becomes:
Aggregate [_groupingexpression#14, _groupingexpression#15], [_groupingexpression#14 AS a#7, _groupingexpression#15 AS b#8]
+- Union false, false
:- Project [a#1, b#2, cast(a#1 as date) AS _groupingexpression#14, cast(b#2 as date) AS _groupingexpression#15]
: +- Project [cast(a#0 as date) AS a#1, cast(a#0 as date) AS b#2]
: +- Aggregate [_groupingexpression#16], [_groupingexpression#16 AS a#0]
: +- Project [a#9, cast(a#9 as date) AS _groupingexpression#16]
: +- LocalRelation [a#9]
+- Project [a#5, b#6, cast(a#5 as date) AS _groupingexpression#19, cast(b#6 as date) AS _groupingexpression#20]
+- Project [cast(a#3 as date) AS a#5, cast(b#4 as date) AS b#6]
+- Aggregate [_groupingexpression#17, _groupingexpression#18], [_groupingexpression#17 AS a#3, _groupingexpression#18 AS b#4]
+- Project [a#10, b#11, cast(a#10 as date) AS _groupingexpression#17, cast(b#11 as date) AS _groupingexpression#18]
+- LocalRelation [a#10, b#11]
After CollapseProject:
Aggregate [_groupingexpression#14, _groupingexpression#15], [_groupingexpression#14 AS a#7, _groupingexpression#15 AS b#8]
+- Union false, false
:- Aggregate [_groupingexpression#16], [cast(_groupingexpression#16 as date) AS a#1, cast(_groupingexpression#16 as date) AS b#2, cast(cast(_groupingexpression#16 as date) as date) AS _groupingexpression#14, cast(cast(_groupingexpression#16 as date) as date) AS _groupingexpression#15]
: +- Project [a#9, cast(a#9 as date) AS _groupingexpression#16]
: +- LocalRelation [a#9]
+- Aggregate [_groupingexpression#17, _groupingexpression#18], [cast(_groupingexpression#17 as date) AS a#5, cast(_groupingexpression#18 as date) AS b#6, cast(cast(_groupingexpression#17 as date) as date) AS _groupingexpression#19, cast(cast(_groupingexpression#18 as date) as date) AS _groupingexpression#20]
+- Project [a#10, b#11, cast(a#10 as date) AS _groupingexpression#17, cast(b#11 as date) AS _groupingexpression#18]
+- LocalRelation [a#10, b#11]
After SimplifyCasts:
Aggregate [_groupingexpression#14, _groupingexpression#15], [_groupingexpression#14 AS a#7, _groupingexpression#15 AS b#8]
+- Union false, false
:- Aggregate [_groupingexpression#16], [_groupingexpression#16 AS a#1, _groupingexpression#16 AS b#2, cast(_groupingexpression#16 as date) AS _groupingexpression#14, cast(_groupingexpression#16 as date) AS _groupingexpression#15]
: +- Project [a#9, cast(a#9 as date) AS _groupingexpression#16]
: +- LocalRelation [a#9]
+- Aggregate [_groupingexpression#17, _groupingexpression#18], [_groupingexpression#17 AS a#5, _groupingexpression#18 AS b#6, cast(_groupingexpression#17 as date) AS _groupingexpression#19, cast(_groupingexpression#18 as date) AS _groupingexpression#20]
+- Project [a#10, b#11, cast(a#10 as date) AS _groupingexpression#17, cast(b#11 as date) AS _groupingexpression#18]
+- LocalRelation [a#10, b#11]
After RemoveRedundantAliases:
Aggregate [_groupingexpression#14, _groupingexpression#15], [_groupingexpression#14 AS a#7, _groupingexpression#15 AS b#8]
+- Union false, false
:- Aggregate [_groupingexpression#16], [_groupingexpression#16 AS a#1, _groupingexpression#16 AS b#2, cast(_groupingexpression#16 as date) AS _groupingexpression#14, cast(_groupingexpression#16 as date) AS _groupingexpression#15]
: +- Project [a#9, cast(a#9 as date) AS _groupingexpression#16]
: +- LocalRelation [a#9]
+- Aggregate [_groupingexpression#17, _groupingexpression#18], [_groupingexpression#17 AS a#5, _groupingexpression#18 AS b#6, cast(_groupingexpression#17 as date) AS _groupingexpression#19, cast(_groupingexpression#18 as date) AS _groupingexpression#20]
+- Project [a#10, b#11, cast(a#10 as date) AS _groupingexpression#17, cast(b#11 as date) AS _groupingexpression#18]
+- LocalRelation [a#10, b#11]
Reading the source code, here is what each rule does:
- PushProjectionThroughUnion: pushes a Project operator down to both sides of a Union All operator;
- CollapseProject: combines two Project operators into one and performs alias substitution, merging the expressions into a single expression in the following cases: 1. when the two Project operators are adjacent; 2. when there is a LocalLimit/Sample/Repartition operator between the two Projects and the upper Project consists of the same number of columns, which are equal or aliased;
- SimplifyCasts: removes casts that are unnecessary because the input is already of the correct type;
- RemoveRedundantAliases: removes redundant aliases from the query plan. A redundant alias is one that does not change the name or metadata of a column.
The first three rules simplify the execution plan and set things up for other optimizations, including RemoveRedundantAliases. The crux is the interaction between RemoveRedundantAliases and Union.output: Union.output reuses the output attributes of its first child, yet the Union and its first child clearly produce different values. RemoveRedundantAliases strips aliases from the Union's children, including the shape Union(Project(a#1, a#1 as a#2, …), …); once the alias a#2 is removed, the first child outputs the same attribute twice, so the Union's second output column loses its own identity and every downstream reference to it ends up bound to the first column, which is exactly the error we observed.
5. Fix the bug
Knowing the cause, the fix follows. My solution is to leave the Union's first child out of the alias removal (by adding its output attributes to the excluded set), which fixes the problem:
case _: Union =>
  var first = true
  plan.mapChildren { child =>
    if (first) {
      first = false
      // Keep the aliases in the first child: Union.output reuses its ExprIds.
      removeRedundantAliases(child, excluded ++ child.outputSet)
    } else {
      removeRedundantAliases(child, excluded)
    }
  }
However, when the Union's first child contains no conflicting attributes, this also suppresses the RemoveRedundantAliases optimization on that child, which costs a little performance but does not affect correctness. The Spark community is working on a proper fix for this problem: https://github.com/apache/spa…
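For completeness, a possible configuration-level stopgap until an upstream fix lands (this is not the patch above, and it assumes RemoveRedundantAliases is not on the non-excludable rule list of the Spark version in use): since the bug arises from the interplay of the four rules analyzed in section 4, excluding the offending rule should avoid it, at the cost of losing that optimization.

// Exclude RemoveRedundantAliases from the optimizer for this session.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases")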
Summary
This article walked through the full process of investigating and fixing a bug triggered by the Spark optimizer rule RemoveRedundantAliases. The main steps were: first, simplify the logic to make the investigation easier; second, reproduce the problem locally to confirm it is a bug in Spark itself and roughly locate the module involved; third, try to find a shortcut; and if step three fails, step four: analyze in depth and fix the bug.