共计 2535 个字符,预计需要花费 7 分钟才能阅读完成。
背景
Join 是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描 /shuffle/Join 等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络 /IO/CPU 等),在同样的资源的情况下可以支撑更多的查询。
目前在 SparkSQL 中有 Filter 下推优化,包括两个维度:
生成 Filter
SparkSQL 会从用户的 SQL 语句中获取到 Filter
-
直接显示获取
select * from A where a=1
生成 Filter(a=1) on A
-
隐式推断
select * from A, B where A.a = B.b and A.a=1
推断出 Filter(b=1) on B
Filter 优化
利用生成的 Filter 算子可以优化,比如:
- 将 Filter 尽量下推到靠近 DataSource 端
- 如果 Filter 中的列是分区列,可以提前对 DataSource 进行分区裁剪,只扫描需要的分区数据
Runtime Filter
是针对 Equi-Join 场景提出的一种新的生成 Filter 的方式,通过动态获取 Filter 内容来做相关优化。
Runtime Filter 原理
优化对象
Equi Join, 形如
select x,y from A join B on A.a = B.b
其中 A 是一个小表 (如维表),B 是一个大表(如事实表)
备注: A/ B 也可以是一个简单的子查询
优化思路
如上述小表 A 和大表 B 进行 Join,Join 条件为 A.a=B.b,实际 Join 过程中需要对大表进行全表扫描才能完成 Join 操作,极端情况下如 A.a 仅仅只有一条记录,也需要对 B 表全表扫描,影响性能。
如果在 B 表扫描之前,能获取 A 表的 a 的相关信息(如所有的 a 值,或者 a 的 min/max/Bloomfilter 等统计信息),并在实际执行 Join 之前将这些信息对 B 表的数据进行过滤,而不是全表扫描,可以大大提高性能。
两种场景
根据大表 B 参与 join 的 key(b)的属性,可以分别采集小表 A 参与 join 的 key(a)的信息:
b 是分区列
如上 b 为大表 B 的一个分区列,则可以提前收集A.a 列的所有值
,然后利用 A.a 的值对 B 表的 b 列进行分区裁剪
b 不是分区列
不能做分区裁剪,只能在实际数据扫描的过程中进行过滤。可以提前收集A.a 列的 min/max/Bloomfilter 的统计信息
,然后利用这些统计信息对 B 表进行数据过滤,这个过滤又可以分成两种粒度:
- 可下推到存储层,减少数据扫描
如底层文件格式是 Parquet/ORC, 可以将相关过滤谓词 (min/max 等) 下推到存储层面,从而减少实际扫描的数据。 - 扫描后数据过滤
不能下推到存储层的,可以在数据被扫描后做条件过滤,减少后续参与计算的数据量(如 shuffle/join 等)
Runtime Filter 实现
Runtime Filter 的实现主要在 Catalyst 中,分为 4 个步骤:
谓词合成
在用户 SQL 生成的逻辑执行计划树 (logical plan) 中,寻找满足条件的 Equi-Join 节点,然后根据上面的思路,在 Join 的大表 B 侧插入一个新的 Filter 节点,如Filter(In(b, Seq(DynamicValue(a, A))), B)
谓词下推
上面生成的新的 Filter 会经过 PushDownPredicate 的 Rule,尽量下推靠近 DataSource 附近
物理执行计划生成
该阶段会将上面下推的 Filter(In(b, Seq(DynamicValue(a, A))), B)
转换成物理节点(FilterExec),根据上面两种场景会生成两种不同的 FilterExec
-
b 是分区列
b 是分区列,采集的是 a 列的所有值, 如:case class DynamicPartitionPruneFilterExec(child: SparkPlan, collectors: Seq[(Expression, SparkPlan)]) extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
其中 colletors 就是用于采集信息的 SparkPlan,因为要跑一个 SQL 来采集 a 列的所有值 (select a from A group by a
);
因为有可能会有多个分区列,所以这个地方是一个 Seq.
-
b 是非分区列
b 是非分区列,采集的是 a 列的 min/max/bloomfilter 统计信息,如case class DynamicMinMaxFilterExec(child: SparkPlan, collectors: Seq[(Expression, SparkPlan)]) extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
同理上面 collectors 也是用户采集信息的 SparkPlan,如select min(a),max(a) from A
执行
在物理执行计划实际执行的过程中,会在 DynamicPartitionPruneFilterExec/DynamicMinMaxFilterExec 物理算子内先执行 collectors 获取到 a 列的相关信息,然后对底层 B 的执行计划进行改写,比如利用采集到的信息做分区裁剪 / 数据过滤等。
Runtime Filter 性能测试
以 TPC-DS 10TB 的 Query54 为例:
Runtime Filter 关闭
Runtime Filter 打开
经过 DynamicPartitionPruneFilter 对 catalog_sales 的分区进行了裁剪,实际对表的扫描从 14,327,953,968 减少到 136,107,053,然后经过 min/max 的过滤继续减少到 135,564,763; 另外 Runtime Filter 减少了大表的扫描,shuffle 的数据量以及参加 Join 的数据量,所以对整个集群 IO/ 网络 /CPU 有比较大的节省
总结
针对 Equi-Join 的场景,可以额外的采集小表侧的信息,然后在 Join 之前对大表进行分区裁剪或者扫描后过滤,从而提高查询性能,减少资源消耗。
本文作者:寒沙牧
阅读原文
本文为云栖社区原创内容,未经允许不得转载。