共计 5556 个字符,预计需要花费 14 分钟才能阅读完成。
作者介绍:黄建博,云计算畛域技术开发工程师;金灵,Shopee 软件研发工程师。
他们的队伍 huang-b 在性能比赛中斩获一等奖,本文将介绍 Shuffle 优化 TiDB 算子我的项目的设计与实际过程。
在咱们平常的印象中,分与合是一对矛盾的概念,然而这次较量留给咱们队伍一个很粗浅的印象是,分与合是一对互相促进的矛盾,只有干净利落地合成,能力高效地合并。这种印象一方面来自于咱们的参赛思路,咱们抉择的方向是应用 Shuffle 操作将算子的数据源拆分为多个独立的分区,而后通过并行计算来晋升整体的吞吐,优化的过程就是寻找更适合的合成形式,谋求更好的扩展性和计算性能,相干技术细节在注释中有具体介绍。另一方面,则是来自于咱们的参赛体验。
咱们的队伍有两位成员一位参谋,扩散在国内三座不同的城市,较量从开始到完结,咱们没有机会线下交换过,全都是以 slack 和文档的模式进行单干,咱们开玩笑说,在做一个分布式的较量的同时,咱们也是一支分布式的队伍。在这样一种受限的条件下,有两个因素对队伍的高效单干起了关键作用:其一是咱们的参谋对工作做了干净利落的拆分,同样的思路利用到两个不同类型的算子上,使得咱们能够花开两朵,各表一枝;其二就是 TiDB 整体高内聚低耦合的设计,从纵向的角度讲,就是 TiDB 清晰的分层设计使得咱们的优化能够只关注解析器和执行器层面,而不用深刻更底层的 TiKV 存储,从横向的角度讲,就是 TiDB 对分治和多态的充沛实际使得咱们能够只关注被优化的算子,而不用放心对其它算子产生副作用。
技术背景
咱们的优化思路是应用 Shuffle 算子来实现 MergeJoin 算子和 StreamAggregation 算子的并行化。Shuffle 算子最先在 PR https://github.com/pingcap/ti… 中引入,用于并行化 Window 算子。图 1 展现的就是 Window 算子的并行化过程。
图中左侧是串行的 Window 算子,因为 Window 算子要求输出数据有序,所以在数据源和 Window 之间通常有一个 Sort 算子。图中右侧展现的是对应的 Shuffle 算子,为了实现并行计算,Window 算子和 Sort 算子都被复制了多份,每一份与一 ShuffleWorker 绝对应,从数据源流入的数据由 Splitter 按哈希值拆分为独立的数据分区,发往不同的 ShuffleWorker,最终各个 Window 算子的后果汇总后输入。图中所画箭头就是数据流动的方向,其中数据的散发和后果的汇总是通过 go channel 来实现的,其它数据流动都是父节点通过调用子节点的 Next 办法来获取的。图中虚线示意启动的协程,每个 ShuffleWorker 都会启动一个协程来实现本身的运算,同时 Splitter 也会启动一个协程来实现数据的散发。
题目链接:
1. ShuffleMergeJoin:https://github.com/pingcap/ti…
2. ShuffleStreamAgg:https://github.com/pingcap/ti…
图 1 Window 算子并行化
ShuffleMergeJoin
扩大 Shuffle 算子
对 MergeJoin 做并行优化,是不是简略套用 ShuffleWindow 的框架就能够了呢?不是的,MergeJoin 算子与上文的 Window 算子不同,MergeJoin 须要两个数据源。那当初的 Shuffle 实现能不能让每个并行算子对应两个 ShuffleWorker,进而对应两个数据源呢?答案也是不能够,因为前文提到的 Shuffle 实现把数据分区和计算并行这两个性能适度耦合在一起了,这种适度耦合使得它无奈反对两个数据源。上面咱们对这个问题作具体阐明。
适度耦合指的是 ShuffleWorker 充当的角色太多,它既是数据流动的一环,同时也是计算并行的根本单元,于是带来了这两个问题:
1. 因为 ShuffleWorker 时数据分区,所以并行后的每个 MergeJoin 须要两个 ShuffleWorker 来接管来自两个数据源的数据,然而 ShuffleWorker 同时又是计算并行的根本单元,于是有 n 个 MergeJoin 算子就会呈现 2n 个协程,同一个 MergeJoin 算子的两个协程还会呈现数据竞争。
2. 管制逻辑简单,ShuffleWorker 作为数据的一个分区,它必须作为 Sort 算子的子节点,而它作为计算并行的根本单元,又必须在协程中调用 Window 算子的 Next 办法来实现计算,所以在原来的实现往 ShuffleWorker 里放了个指向 Window 算子的指针,这样的设计一方面存在毁坏执行树有向无环个性的隐患,另一方面也升高了代码的可读性。
当然,第一个问题咱们能够通过在 ShuffleWorker 中减少一个布尔变量来解决:同一个 MergeJoin 对应的两个 ShuffleWorker 一个为 true,一个为 false,只有为 true 的那个才会启动协程。可是这个办法无疑会使下面提到的简单的逻辑更加简单。
咱们提出的解决方案是把数据分区和计算并行解耦。 如图 2 所示:计算并行还是由 ShuffleWorker 负责,然而它不再是数据流动过程中的一环,它原来在数据流动过程中的地位由 ShuffleReceiver 来代替。MergeJoin 是 ShuffleWorker 的一个成员,每个 ShuffleWorker 对应一个协程,在协程中调用 MergeJoin 的 Next 办法,并将后果发送给汇总算子,这样上文中提到的两个问题都失去了解决。
图 2 拓展后的 Shuffle 算子
相干 PR:https://github.com/pingcap/ti…
实现与成果
在实现中咱们思考两个场景:其一是数据源自身无序的状况,这种状况下数据进入 MergeJoin 之前要先通过 Sort 节点;另一是数据源自身有序的状况,这种状况下数据进入 MergeJoin 之前无需排序。
图 3 数据源无序状况下的 ShuffleMergeJoin
图 3 展现的就是数据源无序状况下 MergeJoin 的并行化过程,这种状况 MergeJoin 和 Sort 算子的计算开销都能够摊派到多个协程上。启动 2 个 worker 的优化成果如表 1 所示,咱们在不同规模的数据源上都做了测试,表中前两列是两个数据源的行数,表中的后两列是串行和并行版本的运行性能,单位是 ns/op,越小性能越高。从表中能够看出,Shuffle 是能够显著减速 MergeJoin 运算的,并且数据量越大的状况下减速成果越好(因为并行化是会引入管道、协程等额定开销的,比拟大的数据量能力保障并行化的收益大于开销)。在咱们的几个测试案例中,成果最好的状况下 2 个 worker 的运算工夫仅为串行版本的 56.5%。
表 1 ShuffleMergeJoin 优化成果
图 4 展现的是数据源有序状况下 MergeJoin 的并行化过程,区别就是数据不再通过 Sort 算子。这种状况下计算的负载自身比拟轻量,相比之下依据哈希值来散发数据的 Splitter 就成为了零碎的性能瓶颈,并行化当前性能晋升并不显著。
图 4 数据源有序状况下的 ShuffleMergeJoin
相干 PR:
1. ShuffleMergeJoin 实现:https://github.com/pingcap/ti…
2. 控制参数:https://github.com/pingcap/ti…
3. 单元测试与性能测试:https://github.com/pingcap/ti…
ShuffleStreamAggregation
聚合运算是 SQL 语句必不可少的一部分,无论是 OLTP 还是 OLAP 场景,聚合都是常常被应用到的算子。
从零碎实现层面来看,聚合的实现个别有两种,第一种是基于 Hash 的办法,该办法通过构建 Hash table,保护每一个被聚合元素的值,计算失去最初的后果值。另外一种,则是基于有序数据流的办法,该办法要求输出数据源必须是有序的,而后通过遍历有序的数据流,并在同时保护相应的聚合值,即可失去最初的计算结果。
一般来说,基于 Hash 的办法具备更快的计算速度,然而它须要保护一个 Hash table,内存空间应用老本较高,当被聚合 key 的可能取值个数十分大的时候,那么相应 Hash table 中的元素个数也会十分多,对内存是个不小的考验,存在爆内存的危险,这反而导致计算不能失常地被实现。而基于有序数据流办法的聚合运算实现形式,无需随时都在内存中保护所有的被聚合 key 的值,因而内存使用量绝对较小,然而它的运行速度相对而言更慢一点,且更为严格地要求输出数据必须是有序的。如果能够晋升基于有序数据流办法的聚合算子的运行速度,那么该办法将会更加实用于大数据量的状况。因而咱们抉择对基于有序数据流办法的聚合运算实现办法,即 Stream Aggregation 进行并行减速,以晋升该算子的整体运行速度。
实现与成果
在具体的实现过程中,咱们利用了之前由其余社区贡献者提供的 Shuffle 算子,在 StreamAggregation 算子外围,将输出数据宰割成多个有序的输出数据流,别离输出到多个 StreamAggregation 算子当中,而后通过简略的整合,失去最初的计算结果。简略地说,初始输出数据源 DataSource, 首先会通过 Shuffle 算子,被宰割成多个 Partition,且每个 Partition 都是其外部有序的,而后每个 Partition 别离被作为一 StreamAggregation 算子的输出,生成局部后果,最初通过对雷同 key 的元素进行整合,即可失去最初的整体计算结果。
此处须要思考 DataSource 是否有序的状况,如果 DataSource 在被聚合 key 上是无序的,比方一般的 PhysicalTableReader 算子,或其余算子的输入,那么须要在被宰割之前,使得其有序,因而须要在其上增加一个 Sort 算子(如图 5 所示)。
图 5 数据源无序状况下的 ShuffleStreamAggregation
针对这种场景,咱们的办法最初获得了非常明显的性能晋升(如表 2 所示)。剖析认为,非并行的状况下,Sort 作用在整个 DataSource 之上,而并行化的版本是作用在每个不同的 Partition 之上,输出绝对较小,且并行执行,因而性能晋升较大。
表 2 ShuffleStreamAggregation 优化成果
另外,还须要思考 DataSource 在被聚合 key 上是有序的状况,比方上面的 SQL 语句,被聚合 key 为 b,且输出数据源 t 上刚好有由 b 创立的索引,因而在具体的计算过程中,DataSource 是基于 b 的 PhysicalIndexTableReader,那么咱们就无序引入 Sort 算子,间接将输出宰割成多个 Partition,而后通过图 6 所示的计算过程即可失去后果。
create table t(a int, b int, key b(b));
select /*+ stream_agg() */ count(a) from t group by b;
图 6 数据源有序状况下的 ShuffleStreamAggregation
通过 Benchmark 的结果表明,在该状况下,目前的基于 Shuffle 的实现,运行速度并没有失去晋升,反而有所降落,咱们浅显地认为,以后的 Shuffle 实现形式是瓶颈点,是后续须要被解决的重点。
相干 PR:
- https://github.com/pingcap/ti…
- https://github.com/pingcap/ti…
RangeSplitter
下面说了,Shuffle 算子会把数据输出宰割成多个 Partition,最开始的时候只有基于 Hash 办法的 Splitter 被实现,该实现对输出数据是否有序并不做要求。对于数据源有序的状况,只管该办法仍旧实用,然而应用基于 Range 的办法对数据源进行宰割,是一个更为天然的形式,因为被聚合 key 雷同的多行数据,必然是紧挨在以前的,如果能够间接找到这一块数据的起始点和完结点,整体一次性宰割,则无需构建 HashTable,也不必调用开销更为显著的 HashFunction,使得 Partiton 过程的开销更小。基于这一思路,咱们实现了 PartitionRangeSplitter,该办法的计算原理是,将紧挨一起的雷同聚合 key 的多行数据,批量地散发到一个 worker 之上。相较于基于 Hash 办法的 Partitioner 而言,基于 Range 办法的实现形式的开销更小,在同时解决有序输出数据源的状况下,应用 RangePartitioner 能比应用 HashPartitioner 快上一倍的速度(详见表 3),由此证明了该算子更加实用于数据源有序的状况。
表 3 RangeSplitter 与 HashSplitter 性能比拟
相干 PR:
1. RangeSplitter 实现:https://github.com/pingcap/ti…
2. 相干性能测试:https://github.com/pingcap/ti…
总结
在本次性能挑战大赛中,咱们应用 Shuffle 算子对 MergeJoin 算子和 Stream Aggregation 算子进行了减速,在数据源无序的场景下,获得了显著的性能晋升。在优化 MergeJoin 的过程中,为了适配多个数据源的算子,咱们对现有的 Shuffle 实现做了扩大,进步了可读性和可扩展性。在优化 StreamAggregation 的过程中,思考到数据源有序的状况,提出了一个简略的基于 Range 办法的 Splitter 实现,也证实了其有效性。咱们在后续将会思考如何对现有的 Shuffle 算子进行革新,打消其中存在的性能瓶颈,以期进一步晋升基于 Shuffle 的一系列并行算子的性能。