简介
OpenMLDB是针对AI场景优化的开源数据库我的项目,实现了数据与计算一致性的离线MPP场景和在线OLTP场景计算引擎。MPP引擎可基于Spark实现,并通过拓展Spark源码实现数倍性能晋升。本文次要解释OpenMLDB如何基于Spark来解决窗口数据的歪斜问题。
背景
数据歪斜是在大数据处理场景下常见的一种景象,它由某一分区数据量过大造成。数据歪斜会导致歪斜分区与其余分区的运算工夫产生微小差距,换句话说就是歪斜数据分区的计算工作与其cpu资源重大不匹配。最终会造成多等一的状况——多个小数据量的分区计算结束后期待歪斜的大数据量分区,只有歪斜分区计算结束能力输入后果。这对效率来说是微小的劫难。
在机器学习的特色计算中,波及到很多的窗口计算。在窗口计算下,如果呈现繁多key数据量过大,也会导致某一分区数据过多,从而产生数据歪斜问题。而传统数据歪斜中分区优化的计划,如:数据加前缀再分区,是不适宜窗口计算场景的。它会导致窗口计算场景下最终计算结果谬误。因而OpenMLDB提出了一种基于Spark的窗口数据歪斜分区优化计划——在裁减窗口数据后,再依据分区键以及工夫片对歪斜数据进行再分区。
数据歪斜介绍
SELECT sum(Amount) OVER w AS sum,FROM inputWINDOW w as (PARTITION By Gender Order By Time ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
在上图的数据中,因为主键“Gender”只有两个值,离线计算最好状况下只能将数据划分到两个partition,即并行度只有2。此时同样的分区资源,计算工作的数据量差距却很大。在后续的计算中,“male”所在的分区计算的工夫必然比“female”所在分区计算的工夫大。当歪斜分区数据质变大的时候,这个工夫差距还会被一直拉大。且因为spark的底层执行里每个partition只有一个thread,这使得整个stage周期里只有两个thread在工作,还有很多其余的thread始终处于闲暇状态,这也会导致重大的性能节约。
传统数据歪斜解决方案
对于歪斜数据的优化,解决基本问题的办法就是对歪斜数据进行再分区,把本来一个歪斜分区内宏大的数据块,扩散成多个小的数据分区。以此来达到对大数据进行拆分从而进步计算效率的目标。
在常见的数据再分区策略中,有通过分区键加上不同前缀从而进行再分区的策略,也有通过多加几列作为分区键进行再分区的策略。然而这些简略的再分区计划,在窗口计算中,都会造成计算错误。
如果采纳数据加前缀再分区的简略分区优化计划,本来同一个partition下的数据会被拆分到不同的partition。而窗口计算波及到数据之间滑动取值的状况,因而如果只是简略的将分区内的数据再拆分,窗口计算将无奈取到本来相邻的数据,这会导致最终计算结果的谬误。
OpenMLDB窗口歪斜优化计划
整体思路
咱们的计划总体思路是在上述歪斜数据再分区的根底上,进一步保障各个再分区的数据块在窗口计算时后果正确。计划里采纳的形式是在每个再分区的数据块中,依据窗口须要滑动的数据条数,进行肯定的窗口数据裁减。
在优化中,总体上采纳的就是再分区+窗口补充的repartition策略来对数据进行分区。思路是采纳空间换工夫的策略,长处是计算工夫短性能高,毛病是补充的窗口数据会造成肯定的数据冗余,导致占用更多内存。
上面具体介绍本计划的技术细节,歪斜优化计划具体的实现次要分为五步,以上面SQL为例。
SELECT SUM(Amount) OVER W1 AS sumFROM InputTableWINDOW W1 AS (PARTITION BY Gender ORDER BY Time ROWS PRECEDING 2 AND CURRENT ROW)
第一步:数据评估——统计窗口分区键的数据分布
这一步须要对总体的数据做一个评估,统计出一些相干的指标,比方数据划分的分界线,以及partition内数据的条数等。参数介绍如下。
参数名 | 解释 |
---|---|
Quantile | 对于数据的拆分是通过传入的“Quantile”参数来确定的,并且咱们采纳的是n等分的机制,Quantile = 4代表了四等分(不肯定能保障严格四等分)。依据“Quantile”参数,咱们就能够划分进去不同值的分界线“percentile_i”,依据数据绝对于分界线的值能够划分出不同的数据块。 |
PRECENTILE | 依据(“Time“)列(SQL中窗口里Order By的值)划分数据块的分界线,PERCENTILE_i为第i条分界线,(”Time“)列合乎(PERCENTILE_i,PERCENTILE_i+1] 的数据为第i个数据块。非凡状况:第一个数据块为(0,PERCENTILE_1],最初一块为(PERCENTILE_n,无穷大) |
总体来说,第一步的数据评估是对数据各项指标进行统计和计算,并在统计后,对数据进行判断以及解决,但因为波及到全量数据的遍历,会比拟耗时。对此咱们也有一个额定的优化,咱们反对通过读取提前预处理好的distribution表来跳过第一步中统计的局部。这样就能够在凌晨或者不须要解决业务时,执行统计工作,将数据后果统计实现,来防止用户须要执行解决逻辑时,在第一步等待时间太久。
// Use skew configval distributionDf = ctx.getSparkSession.read.parquet(ctx.getConf.windowSkewOptConfig)logger.info("Load distribution dataframe")
第二步:数据标记——标记从新分区的编号
这一步依据Distribution Table中对数据的统计后果,来对数据进行划分,并对划分后的数据打上(“PART_ID”)和(“EXPANDED_ROW"),作为不同数据块重分区后的分区标号以及是否为裁减数据的标记。
在最开始的Join中,咱们采纳了Broadcast Join,来晋升Join时的效率。Broadcast Join是Spark中一种能够防止shuffle的Join,个别一张大表和一张小表进行Join时能够应用Broadcast Join,它是通过将小表的数据播送到每个Executor计算节点上,再通过map聚合的形式,来防止了数据的shuffle。在咱们的表中,Distribution Table比Input Table小很多,因而刚好能够采纳Broadcast Join。
在Join之后,能够失去数据分界线,且当PERCENTILE_i为第i条分界线时,合乎(PERCENTILE_i,PERCENTILE_i+1] 的数据就为第i个数据块,采纳固定策略划分完后果之后。就能够依据划分后果,生成新的分区标号——“PART_ID”。表数据介绍如下。
列名 | 解释 |
---|---|
PART_ID | 代表了再分区的ID,在AddColumnTable中,“PART_ID”+分区键雷同的行,就同属于一个新的partition,如Id = 1和Id = 3这两行同属于一个分区。 |
EXPANDED_ROW | 代表了以后行是否是裁减的窗口数据,默认值为false。在下述步 |
第三步:数据裁减——对不同分块的数据进行窗口数据的裁减
对窗口数据进行裁减是OpenMLDB对于窗口歪斜优化中,比拟外围的局部。因为数据较多,为了便于了解,上面只展现“male”局部数据。
具体实现时,咱们对每个须要裁减的数据块进行整体窗口数据的裁减,即通过遍历,对每个须要裁减数据的重分区数据块都裁减到第一条数据。过程图解如下,深色代表以后遍历的分区,浅色代表以后分区须要补充的窗口数据。
1.过滤出须要裁减的数据
对于Time为1和3,“PART_ID" = 1的第一个重分区数据块,因为是工夫最先的数据块,下面曾经没有数据能够给他们补充了,因而会跳过。
对于Time为5,“PART_ID" = 2的第二个重分区数据块,会将所有工夫比以后数据块前的数据都取出来,也就是“PART_ID" = 1的数据块。
对于Time为7,“PART_ID" = 3的第三个重分区数据块同理,将所有工夫比以后数据块前的数据都取出来,也就是取第一个和第二个数据块作为裁减的窗口数据。
后续第四个重分区数据块也同上,将所有须要的数据取出,因而不再赘述。
2.更改过滤数据的ID并进行Union
将数据取出来之后,咱们还须要将(“EXPANDED_ROW")改成true,代表是裁减的窗口数据。改完(“EXPANDED_ROW")之后,只须要一直的和原来的AddColumn Table进行Union,咱们就实现了一个数据块的窗口数据裁减。以第二块数据块为例子,下图Union Table中,不同色彩代表不同的重分区数据块,能够看到通过filter和union,第二块数据块曾经裁减好了数据。
对于其余数据块窗口裁减的形式和第二块数据块形式的思路一样,在过滤以及裁减完后,再和之前的Union表进行Unoin即可。
上面展现最终第四块数据块裁减完窗口上数据后失去的最终Union Table。
第四步:数据分区——依据再分区键进行从新分区
尽管之前咱们通过不同色块来标记不同的再分区数据,但实际上,到了第四步,咱们才真正的对数据进行了重分区,底层咱们依赖了Spark中的repartition函数进行数据重分区。在第三步后,咱们能够失去最终的Union Table,此时只须要依据分区键(“Gender”)和(“PART_ID")进行repartition,就能够将数据拆分到不同的executor上。
第五步:数据计算——对分区后的数据进行计算
在第三步中,咱们晓得那些"EXPANDED_ROW" = false的数据列是新补充进来的窗口数据,而且在理论计算中,他们是不须要参加计算的。因而只须要对"EXPANDED_ROW" = true的数据进行窗口计算,最终便可失去计算结果。
值得特地阐明的是,因为OpenMLDB底层解决引擎是自主研发设计的,因而窗口计算的外部逻辑也是由OpenMLDB实现的。上面贴出相干代码进行解说。
repartitionDf.rdd.mapPartitionsWithIndex { case (partitionIndex, iter) => val computer = WindowAggPlanUtil.createComputer(partitionIndex, hadoopConf, sparkFeConfig, windowAggConfig) windowAggIter(computer, iter, sparkFeConfig, windowAggConfig)}
对于第四步生成的repartitionDf,咱们在外层调用了Spark的mapPartitionsWithIndex办法。之后对于每个分区,OpenMLDB都构建一个computer计算单元,用来解决接下来的窗口计算。之后则是正式进行窗口计算,调用windowAggIter办法。
InputIter.flatMap(row => { if (lastRow != null) { computer.checkPartition(row, lastRow) } lastRow = row val orderKey = computer.extractKey(row) val expandedFlag = row.getBoolean(config.expandedFlagIdx) if (!isValidOrder(orderKey)) { None } else if (!expandedFlag) { Some(computer.compute(row, orderKey, config.keepIndexColumn, config.unionFlagIdx)) } else { computer.bufferRowOnly(row, orderKey) None }})
在windowAggIter办法里,咱们对传进来的迭代器InputIter进行了flatMap操作,之后再查看是否分区内数据有没有分错,如果有分错的row,则会对window进行从新设置。接下来查看orderKey没有问题后,会对expandedFlag也就是上图中的(“EXPANDED_ROW")作判断,如果为true,则证实以后row是裁减的数据,因而computer计算单元只进行buffeRowOnly操作,缓存裁减的窗口数据进内存,为之后实在须要计算的数据应用。如果为false,此时expandedFlag也为false,computer计算单元就进行真正的计算compute,在compute办法里会读取之前缓存的数据并进行计算,之后会返回解决实现的row。compute办法外部是由c实现的,有趣味的同学能够去查看OpenMLDB里相干源码。
性能比照测试
Benchmark性能测试应用Kaggle公开数据集,也就是New York City Taxi Trip Duration比赛的数据集,应用测试的SQL语句如下:
SELECT sum(vendor_id) over w as w_sum_vendor_id, max(vendor_id) over w as w_max_vendor_id, min(vendor_id) over w as w_min_vendor_id, avg(vendor_id) over w as w_avg_vendor_id, sum(pickup_longitude) over w as w_sum_pickup_longitude, max(pickup_longitude) over w as w_max_pickup_longitude, min(pickup_longitude) over w as w_min_pickup_longitude, avg(pickup_longitude) over w as w_avg_pickup_longitudeFROM taxi_skew_allWINDOW w as (partition by vendor_id order by pickup_datetime ROWS BETWEEN 10000 PRECEDING AND CURRENT ROW)
比照开源版本SparkSQL以及开源版本OpenMLDB进行测试,测试后果如下。
计算引擎 | 计算耗时 |
---|---|
SparkSQL(Spark 3.0.0) | 950.98s |
OpenMLDB,未开启歪斜优化 | 224.76s |
OpenMLDB,开启歪斜优化,歪斜分区数2 | 140.74s |
OpenMLDB,开启歪斜优化,歪斜分区数4 | 94.44s |
能够看到,OpenMLDB引擎即便在不开启歪斜优化的状况下,在不同的歪斜比例中,绝对于Spark引擎,依然有4倍以上的性能晋升,这种性能晋升次要是通过OpenMLDB底层高效的引擎实现来保障的。而OpenMLDB在开启了窗口歪斜优化之后,通过调整不同的再分区数,相比OpenMLDB不开启歪斜优化也还能晋升大概60%~140%的性能。
总结
OpenMLDB通过裁减窗口数据加上数据再分区的策略,实现了窗口计算下数据歪斜的优化。策略总体上采纳了空间换工夫的思维,行将本来集中在一个分区中的歪斜数据,在存储空间上进行窗口数据的裁减,之后再将数据扩散至多个分区并行计算,从而减少计算的并行度,来换取更短的计算工夫,并在最终实现了效率的大幅晋升。此外在数据测试中,咱们发现越在极其的歪斜散布下,OpenMLDB越有更好的体现。总的来说,对于窗口计算下的数据歪斜场景,OpenMLDB实现的数据歪斜优化有着不错的成果。
本文介绍了常见的滑动窗口数据歪斜问题,并且分析了OpenMLDB解决数据歪斜的实现计划以及展现最终的性能优化后果。如果你对Spark优化、大规模特色计算、OpenMLDB数据库等感兴趣,咱们会分享更多相似的技术文章,欢送大家持续关注 OpenMLDB专栏 。