共计 7200 个字符,预计需要花费 18 分钟才能阅读完成。
为什么须要窗口函数?
Window 是一个罕用且重要的性能,PolarDB- X 作为一款分布式数据库,天然也反对了窗口函数。对于业务开发来讲,其能够大大简化业务 SQL 的设计,比方分组排序功能,如果反对窗口函数,则只需应用排序函数即可,例子如下。例:我当初有一张表,蕴含学生姓名,学生班级,学生问题,当初请你帮我写一条 SQL,实现对每个班级内的同学进行排名的需要?有窗口函数时:
SELECT
student_name,
class_name,
score,
DENSE_RANK() OVER (PARTITION BY class_name ORDER BY score DESC) AS rank
FROM student_scores
ORDER BY class_name, rank ASC;
无窗口函数时:就须要写比较复杂的 SQL,感兴趣的同学能够自行尝试或者网上搜寻一下如何写这样的 SQL(或者这也可能在面试中被问到:))。
窗口函数是什么?
实质上 window 是一种 aggregation,然而不同于 agg 的是,agg 要进行聚合的是该分组内的所有记录,每个分组也只会输入一行记录,而 window 则能够管制对于每一行来讲,想要聚合的记录到底是哪些,当然这种管制也是通过规定进行束缚的,输入的记录行数等于输出的记录行数,上面贴了一张图,应该还比较清楚:)。
上图中的 partitioin by 与大家常写的 SQL 中的 group by 根本等价,比拟容易了解,不再赘述。咱们来开展介绍一下 frame 是个什么样的货色,如前所述,frame 管制的是进行 partition 分区后,在该 partition 分区内,该行应该抉择哪些行进行聚合运算。具体来讲,咱们是通过 between 和 and 来指定咱们心愿框定向前和向后的哪些行进行聚合运算的,而指定形式也无非是行数,以后行以及不做限度,据此咱们能够将 frame 分为四类,如下图所示。
实际上,将 frame 划分为不同的类型,能够领导咱们依据不同的 frame 类型进行不同的优化,其次要目标是为了防止反复计算。比方对于 unbounded preceding and unbounded following 类型,显然咱们只需对该分组进行一次计算即可,该分组的后续记录可间接应用该后果。而对于 sliding frame 则不能这样解决,其解决要简单一些,对于每一行,咱们都须要找出该行所对应的框定行,而后对这些行进行聚合运算。进一步的,frame 能够分为两类,row 模式和 range 模式,row 模式寻找边界的根据是行数,而 range 模式的根据则是值。咱们拿一个例子进去,看下 row 模式和 range 模式的区别吧。如下图所示,其 frame 定义均为 between current row and current row,然而在 row 模式和 range 模式下,其选中的行并不相同。
在上述的介绍中,咱们没有介绍每个分区内的 order by 字段,其实一个残缺的 window 的定义蕴含 partition by, order by 和 frame specification。但 order by 了解起来也比较简单,顾名思义,order by 即指定对于每个分区内的行,该当依照什么程序进行排序,frame 中的向前向后多少行也是基于该排序后的汇合。Q:抛一个问题,有趣味的敌人能够思考一下,向前向后的行肯定是间断的,这是为什么呢?比方 range 模式下如何保障这一点?
窗口函数的设计与实现
窗口函数可能不是那么容易了解,所以咱们在后面进行了比拟多的介绍,当初咱们终于来到了设计与实现局部。
如何执行窗口函数?
咱们以一条 SQL 为例吧,如下所示,partition 字段为 c1,排序字段为 c2,frame 定义为 rows between 1 preceding and 1 following。
select
c1,
c2,
avg(c2) over (
partition by c1
order by c2
rows between 1 preceding and 1 following
)
from t;
首先,关键点 1,c1 字段雷同的记录该当被搁置在一起(shuffle);其次,关键点 2,当咱们对 c1 + c2 进行排序时,即可辨认每行属于哪个分区以及该行的相干行是哪些。据此咱们来开展介绍一下优化器和执行器的设计。
优化器
咱们能够把优化器中的相干规定分为生成规定与优化规定,所谓生成规定,即用来确保 window 可能被正确的辨认和转换,而优化规定则是为了优化某些场景下带有窗口函数的 SQL。
生成规定
生成规定次要蕴含三条,project 如何生成 logical window (ProjectToLogicalProjectAndWindowRule),logical window 如何转换为执行时所需的 sort window (LogicalWindowToSortWindowRule),如何让 sort window 并行起来 (MppSortWindowConvertRule)。在 project 如何生成 logical window 中,并没有太多要聊的货色,如何辨认和转换能够间接查看相干源码,咱们次要来聊一个有意思的问题,还是拿个 SQL 来举例子吧,如下。
select
c1,
c2,
avg(c2) over (
partition by c1
order by c2
rows between 1 preceding and 1 following
),
sum(c3) over (
partition by c1
order by c2
rows between 1 preceding and 1 following
)
from t;
Q:上述 SQL 应该生成几个 window?A:生成两个是最简略的,但其 window 的定义是雷同的,所以现实状况下应该只须要生成一个 window,window 外面蕴含 avg 和 sum 函数就好了。代码如下所示,如果这两个 window 定义雷同时,会被压到一个 window 中。
final List<Pair<RexWindow, Set<Integer>>> windowToIndices = new ArrayList<>();
for (int i = 0; i < exprs.size(); ++i) {final RexNode expr = exprs.get(i);
if (expr instanceof RexOver) {final RexOver over = (RexOver) expr;
// If we can found an existing cohort which satisfies the two conditions,
// we will add this RexOver into that cohort
boolean isFound = false;
for (Pair<RexWindow, Set<Integer>> pair : windowToIndices) {if (pair.left.equals(over.getWindow())) {pair.right.add(i);
isFound = true;
break;
}
}
// This RexOver cannot be added into any existing cohort
if (!isFound) {final Set<Integer> newSet = Sets.newHashSet(i);
windowToIndices.add(Pair.of(over.getWindow(), newSet));
}
}
}
Q:其实这外面有可供进一步优化的场景,实质上优化器和执行器须要更严密的配合,感兴趣的敌人能够 debug 一下。接下来咱们来看一下 logical window 如何转换为执行时所需的 sort window。外围在于咱们须要将 sort 的属性退出到 window 中,因为 logical window 自身是没有排序属性的,这里咱们须要 window 的输出是依照 partition column + sort column 有序的,同时 sort window 也领有此程序。
RelCollation relCollation = RelCollations.EMPTY;
if (groupSets.cardinality() + orderKeys.size() > 0) {relCollation = CBOUtil.createRelCollation(sortFields, orderKeys);
}
// change trait set of input
RelNode newInput = convert(input, input.getTraitSet().replace(DrdsConvention.INSTANCE).replace(relCollation));
// change trait set of window
SortWindow newWindow =
SortWindow.create(window.getTraitSet().replace(DrdsConvention.INSTANCE).replace(relCollation),
newInput,
window.getConstants(),
window.groups,
window.getRowType(),
window.getFixedCost());
仔细的敌人会发现,咱们在批改排序属性之外,还将 window 的 convention 批改为了 DrdsConvention,感兴趣的敌人能够思考一下为什么?此外,咱们退出了排序属性之后,如果其 input 自身并不满足排序属性的要求时,是在哪里插入排序的算子的呢,答案是 DrdsConvetion.enforce 办法,相干代码如下。
RelCollation toCollation = required.getTrait(RelCollationTraitDef.INSTANCE);
RelDistribution toDistribution = required.getTrait(RelDistributionTraitDef.INSTANCE);
if (!RuleUtils.satisfyCollation(toCollation, input)) {RelTraitSet emptyTraitSet = input.getCluster().getPlanner().emptyTraitSet();
MemSort memSort = MemSort.create(emptyTraitSet.replace(DrdsConvention.INSTANCE).replace(toCollation).replace(toDistribution),
input,
toCollation);
return memSort;
} else {return input;}
最初,咱们来看一下如何将 sort window 并行起来,外围是咱们当初要加上 distribution 的属性了,以便可能充沛的并行,代码如下。
boolean noPartition = keys.size() == 0;
// for exchange(shuffle)
RelDistribution relDistribution =
noPartition ? RelDistributions.SINGLETON : RelDistributions.hash(groupSet);
RelCollation relCollation =
sortWindow.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
input = convert(input, input.getTraitSet()
.replace(MppConvention.INSTANCE)
.replace(relDistribution)
.replace(relCollation));
SortWindow newSortWindow =
sortWindow.copy(sortWindow.getTraitSet()
.replace(MppConvention.INSTANCE)
.replace(relDistribution),
Arrays.asList(input)
);
优化规定
所有的优化规定起码要答复三个问题,为什么可能优化,在哪些场景中可能应用,在可能应用的场景中这种优化是否总是正向的。相干的规定次要包含,ProjectWindowTransposeRule,FilterWindowTransposeRule 和 CBOJoinWindowTransposeRule。ProjectWindowTransposeRule 用于将 project 尽可能下压到 window 上面,以便尽早过滤不须要的列,须要留神的是 project 中下层窗口函数计算结果的列显然不能推下去。
FilterWindowTransposeRule 用于将 filter 尽可能下压到 window 上面,以便尽早过滤不须要的记录。这外面的外围有两个,首先,咱们须要将 filter 中的 condition 合成为应用 and 连贯的 condition 列表,比方 c1 = 1 and (c2 > 1 or c2 > 2) -> List{c1=1, c2 > 1 or c3 > 2}。其次,对上述 list 中的 condition 进行循环判断,当 window 中用于 partition 的列须要蕴含该 condition 中的所有列时,该 condition 能够被推到 window 上面,否则不行,代码如下。
// decompose condition by AND
final List<RexNode> conditions =
RelOptUtil.conjunctions(filterRel.getCondition());
for (RexNode condition : conditions) {ImmutableBitSet rCols = RelOptUtil.InputFinder.bits(condition);
if (window.keys.contains(rCols)) {
pushedConditions.add(condition.accept(new RelOptUtil.RexInputConverter(rexBuilder,
origFields,
window.getInput(0).getRowType().getFieldList(),
adjustments)));
} else {remainingConditions.add(condition);
}
}
CBOJoinWindowTransposeRule 用于判断是否须要将 join 和 window 进行调换,筹备来讲,其匹配的模式是 join 的右侧为 filter,同时 filter 的输出是 window,如下所示。
public static final CBOJoinWindowTransposeRule INSTANCE =
new CBOJoinWindowTransposeRule(
operand(LogicalJoin.class,
operand(RelNode.class, any()),
operand(LogicalFilter.class, operand(LogicalWindow.class, any()))
),
RelFactories.LOGICAL_BUILDER,
"INSTANCE");
转换前后的子树结构如下图所示,并非对于所有 SQL,左边的子树都更优,这外面的外围在于 join 的过滤性与 filter 的过滤性。如果 join 的过滤性十分好,则左边可能会更优,因为通过 join 后,输出到 window 中的记录数被大大削减。不过利用该规定须要比拟小心,join 的右边列必须要能保障全局惟一,否则通过 join 后,输出到 window 中的相干的右表的记录数会被放大,这就不满足原始的语义了,同时 join 应该为等值 join 并且 join key 与 window 的 partition key 雷同。
执行器
window 算子接管到的输出是依照 partition key + sort column 排好序的,所以咱们要做的就是,找出每行记录对应的分区,而后依据目前缓存的记录和 frame 的定义,计算可能计算的所有行,如果须要输入,则向下层吐数据,否则持续承受下一批输出。当然,因为状况还比拟多,所以有一些细节须要思考,具体可参考 OverWindowFramesExec,也可对照下图进行了解。
其次是如何接入异步执行框架,因为这并不是一个通用的需要,并且咱们还没有进行异步执行框架的源码解读,开展介绍不容易讲清楚,意义也不太大。最初,咱们来聊两个细节的优化吧。第一个优化的出发点是针对非凡场景进行优化,即是否在所有场景下,咱们都须要缓存数据,实际上只有窗口不会蕴含以后行的后续行,就不须要缓存数据,详情可见下图,这部分的解决在 NonFrameOverWindowExec 算子中。
第二个优化的出发点是尽量避免当窗口滑动时,窗口函数须要全副从新计算。如下图所示,当咱们从右边的窗口滑动至左边的窗口时,咱们只须要把新增的记录放入窗口函数中计算即可,不用全副从新计算。
咱们再来看一个略微更简单一点的滑动窗口,此时并非只有新增的记录,也有移除的记录,做增量的计算就变得更加简单了。如下图所示,窗口中新增了 y,然而移除了 x,在这种状况下是否须要全量的从新计算取决于聚合函数的类型,比方 sum 是能够的,然而 bit_and 或者 max 之类的就是不行的,或者说没有那么容易,而且在这种状况下这种优化的成果是否有比拟好的成果取决于窗口的大小。更加简单一点的优化,感兴趣的敌人能够搜寻线段树。
总结
在本文中,咱们首先介绍了 window 是什么,后续通过举例的形式,别离从优化器和执行器方面对窗口函数的设计要点进行了介绍。
作者:越寒
原文链接
本文为阿里云原创内容,未经容许不得转载。