简介
OpenMLDB 是针对 AI 场景优化的开源数据库我的项目,实现了数据与计算一致性的离线 MPP 场景和在线 OLTP 场景计算引擎。MPP 引擎可基于 Spark 实现,并通过拓展 Spark 源码实现数倍性能晋升。本文次要解释 OpenMLDB 如何基于 Spark 来解决窗口数据的歪斜问题。
背景
数据歪斜是在大数据处理场景下常见的一种景象,它由某一分区数据量过大造成。数据歪斜会导致歪斜分区与其余分区的运算工夫产生微小差距,换句话说就是歪斜数据分区的计算工作与其 cpu 资源重大不匹配。最终会造成多等一的状况——多个小数据量的分区计算结束后期待歪斜的大数据量分区,只有歪斜分区计算结束能力输入后果。这对效率来说是微小的劫难。
在机器学习的特色计算中,波及到很多的窗口计算。在窗口计算下,如果呈现繁多 key 数据量过大,也会导致某一分区数据过多,从而产生数据歪斜问题。而传统数据歪斜中分区优化的计划,如:数据加前缀再分区,是不适宜窗口计算场景的。它会导致窗口计算场景下最终计算结果谬误。因而 OpenMLDB 提出了一种基于 Spark 的窗口数据歪斜分区优化计划——在裁减窗口数据后,再依据分区键以及工夫片对歪斜数据进行再分区。
数据歪斜介绍
SELECT sum(Amount) OVER w AS sum,
FROM input
WINDOW w as (PARTITION By Gender Order By Time ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
在上图的数据中,因为主键“Gender”只有两个值,离线计算最好状况下只能将数据划分到两个 partition,即并行度只有 2。此时同样的分区资源,计算工作的数据量差距却很大。在后续的计算中,“male”所在的分区计算的工夫必然比“female”所在分区计算的工夫大。当歪斜分区数据质变大的时候,这个工夫差距还会被一直拉大。且因为 spark 的底层执行里每个 partition 只有一个 thread,这使得整个 stage 周期里只有两个 thread 在工作,还有很多其余的 thread 始终处于闲暇状态,这也会导致重大的性能节约。
传统数据歪斜解决方案
对于歪斜数据的优化,解决基本问题的办法就是对歪斜数据进行再分区,把本来一个歪斜分区内宏大的数据块,扩散成多个小的数据分区。以此来达到对大数据进行拆分从而进步计算效率的目标。
在常见的数据再分区策略中,有通过分区键加上不同前缀从而进行再分区的策略,也有通过多加几列作为分区键进行再分区的策略。然而这些简略的再分区计划,在窗口计算中,都会造成计算错误。
如果采纳数据加前缀再分区的简略分区优化计划,本来同一个 partition 下的数据会被拆分到不同的 partition。而窗口计算波及到数据之间滑动取值的状况,因而如果只是简略的将分区内的数据再拆分,窗口计算将无奈取到本来相邻的数据,这会导致最终计算结果的谬误。
OpenMLDB 窗口歪斜优化计划
整体思路
咱们的计划总体思路是在上述歪斜数据再分区的根底上,进一步保障各个再分区的数据块在窗口计算时后果正确。计划里采纳的形式是在每个再分区的数据块中,依据窗口须要滑动的数据条数,进行肯定的窗口数据裁减。
在优化中,总体上采纳的就是再分区 + 窗口补充的 repartition 策略来对数据进行分区。思路是采纳空间换工夫的策略,长处是计算工夫短性能高,毛病是补充的窗口数据会造成肯定的数据冗余,导致占用更多内存。
上面具体介绍本计划的技术细节,歪斜优化计划具体的实现次要分为五步,以上面 SQL 为例。
SELECT SUM(Amount) OVER W1 AS sum
FROM InputTable
WINDOW W1 AS (PARTITION BY Gender ORDER BY Time ROWS PRECEDING 2 AND CURRENT ROW)
第一步:数据评估——统计窗口分区键的数据分布
这一步须要对总体的数据做一个评估,统计出一些相干的指标,比方数据划分的分界线,以及 partition 内数据的条数等。参数介绍如下。
参数名 | 解释 |
---|---|
Quantile | 对于数据的拆分是通过传入的“Quantile”参数来确定的,并且咱们采纳的是 n 等分的机制,Quantile = 4 代表了四等分(不肯定能保障严格四等分)。依据“Quantile”参数,咱们就能够划分进去不同值的分界线“percentile_i”,依据数据绝对于分界线的值能够划分出不同的数据块。 |
PRECENTILE | 依据(“Time“)列(SQL 中窗口里 Order By 的值)划分数据块的分界线,PERCENTILE_i 为第 i 条分界线,(”Time“)列合乎 (PERCENTILE_i,PERCENTILE_i+1] 的数据为第 i 个数据块。非凡状况:第一个数据块为(0,PERCENTILE_1],最初一块为(PERCENTILE_n,无穷大) |
总体来说,第一步的数据评估是对数据各项指标进行统计和计算,并在统计后,对数据进行判断以及解决,但因为波及到全量数据的遍历,会比拟耗时。对此咱们也有一个额定的优化,咱们反对通过读取提前预处理好的 distribution 表来跳过第一步中统计的局部。这样就能够在凌晨或者不须要解决业务时,执行统计工作,将数据后果统计实现,来防止用户须要执行解决逻辑时,在第一步等待时间太久。
// Use skew config
val distributionDf = ctx.getSparkSession.read.parquet(ctx.getConf.windowSkewOptConfig)
logger.info("Load distribution dataframe")
第二步:数据标记——标记从新分区的编号
这一步依据 Distribution Table 中对数据的统计后果,来对数据进行划分,并对划分后的数据打上(“PART_ID”)和(“EXPANDED_ROW”),作为不同数据块重分区后的分区标号以及是否为裁减数据的标记。
在最开始的 Join 中,咱们采纳了 Broadcast Join,来晋升 Join 时的效率。Broadcast Join 是 Spark 中一种能够防止 shuffle 的 Join,个别一张大表和一张小表进行 Join 时能够应用 Broadcast Join,它是通过将小表的数据播送到每个 Executor 计算节点上,再通过 map 聚合的形式,来防止了数据的 shuffle。在咱们的表中,Distribution Table 比 Input Table 小很多,因而刚好能够采纳 Broadcast Join。
在 Join 之后,能够失去数据分界线,且当 PERCENTILE_i 为第 i 条分界线时,合乎 (PERCENTILE_i,PERCENTILE_i+1] 的数据就为第 i 个数据块,采纳固定策略划分完后果之后。就能够依据划分后果,生成新的分区标号——“PART_ID”。表数据介绍如下。
列名 | 解释 |
---|---|
PART_ID | 代表了再分区的 ID,在 AddColumnTable 中,“PART_ID”+ 分区键雷同的行,就同属于一个新的 partition,如 Id = 1 和 Id = 3 这两行同属于一个分区。 |
EXPANDED_ROW | 代表了以后行是否是裁减的窗口数据,默认值为 false。在下述步 |
第三步:数据裁减——对不同分块的数据进行窗口数据的裁减
对窗口数据进行裁减是 OpenMLDB 对于窗口歪斜优化中,比拟外围的局部。因为数据较多,为了便于了解,上面只展现“male”局部数据。
具体实现时,咱们对每个须要裁减的数据块进行整体窗口数据的裁减,即通过遍历,对每个须要裁减数据的重分区数据块都裁减到第一条数据。过程图解如下,深色代表以后遍历的分区,浅色代表以后分区须要补充的窗口数据。
1. 过滤出须要裁减的数据
对于 Time 为 1 和 3,“PART_ID” = 1 的第一个重分区数据块,因为是工夫最先的数据块,下面曾经没有数据能够给他们补充了,因而会跳过。
对于 Time 为 5,“PART_ID” = 2 的第二个重分区数据块,会将所有工夫比以后数据块前的数据都取出来,也就是“PART_ID” = 1 的数据块。
对于 Time 为 7,“PART_ID” = 3 的第三个重分区数据块同理,将所有工夫比以后数据块前的数据都取出来,也就是取第一个和第二个数据块作为裁减的窗口数据。
后续第四个重分区数据块也同上,将所有须要的数据取出,因而不再赘述。
2. 更改过滤数据的 ID 并进行 Union
将数据取出来之后,咱们还须要将(“EXPANDED_ROW”)改成 true,代表是裁减的窗口数据。改完(“EXPANDED_ROW”)之后,只须要一直的和原来的 AddColumn Table 进行 Union,咱们就实现了一个数据块的窗口数据裁减。以第二块数据块为例子,下图 Union Table 中,不同色彩代表不同的重分区数据块,能够看到通过 filter 和 union,第二块数据块曾经裁减好了数据。
对于其余数据块窗口裁减的形式和第二块数据块形式的思路一样,在过滤以及裁减完后,再和之前的 Union 表进行 Unoin 即可。
上面展现最终第四块数据块裁减完窗口上数据后失去的最终 Union Table。
第四步:数据分区——依据再分区键进行从新分区
尽管之前咱们通过不同色块来标记不同的再分区数据,但实际上,到了第四步,咱们才真正的对数据进行了重分区,底层咱们依赖了 Spark 中的 repartition 函数进行数据重分区。在第三步后,咱们能够失去最终的 Union Table,此时只须要依据分区键(“Gender”)和(“PART_ID”)进行 repartition,就能够将数据拆分到不同的 executor 上。
第五步:数据计算——对分区后的数据进行计算
在第三步中,咱们晓得那些 ”EXPANDED_ROW” = false 的数据列是新补充进来的窗口数据,而且在理论计算中,他们是不须要参加计算的。因而只须要对 ”EXPANDED_ROW” = true 的数据进行窗口计算,最终便可失去计算结果。
值得特地阐明的是,因为 OpenMLDB 底层解决引擎是自主研发设计的,因而窗口计算的外部逻辑也是由 OpenMLDB 实现的。上面贴出相干代码进行解说。
repartitionDf.rdd.mapPartitionsWithIndex {case (partitionIndex, iter) =>
val computer = WindowAggPlanUtil.createComputer(partitionIndex, hadoopConf, sparkFeConfig, windowAggConfig)
windowAggIter(computer, iter, sparkFeConfig, windowAggConfig)
}
对于第四步生成的 repartitionDf,咱们在外层调用了 Spark 的 mapPartitionsWithIndex 办法。之后对于每个分区,OpenMLDB 都构建一个 computer 计算单元,用来解决接下来的窗口计算。之后则是正式进行窗口计算,调用 windowAggIter 办法。
InputIter.flatMap(row => {if (lastRow != null) {computer.checkPartition(row, lastRow)
}
lastRow = row
val orderKey = computer.extractKey(row)
val expandedFlag = row.getBoolean(config.expandedFlagIdx)
if (!isValidOrder(orderKey)) {None} else if (!expandedFlag) {Some(computer.compute(row, orderKey, config.keepIndexColumn, config.unionFlagIdx))
} else {computer.bufferRowOnly(row, orderKey)
None
}
})
在 windowAggIter 办法里,咱们对传进来的迭代器 InputIter 进行了 flatMap 操作,之后再查看是否分区内数据有没有分错,如果有分错的 row,则会对 window 进行从新设置。接下来查看 orderKey 没有问题后,会对 expandedFlag 也就是上图中的(“EXPANDED_ROW”)作判断,如果为 true,则证实以后 row 是裁减的数据,因而 computer 计算单元只进行 buffeRowOnly 操作,缓存裁减的窗口数据进内存,为之后实在须要计算的数据应用。如果为 false,此时 expandedFlag 也为 false,computer 计算单元就进行真正的计算 compute,在 compute 办法里会读取之前缓存的数据并进行计算,之后会返回解决实现的 row。compute 办法外部是由 c 实现的,有趣味的同学能够去查看 OpenMLDB 里相干源码。
性能比照测试
Benchmark 性能测试应用 Kaggle 公开数据集,也就是 New York City Taxi Trip Duration 比赛的数据集,应用测试的 SQL 语句如下:
SELECT
sum(vendor_id) over w as w_sum_vendor_id,
max(vendor_id) over w as w_max_vendor_id,
min(vendor_id) over w as w_min_vendor_id,
avg(vendor_id) over w as w_avg_vendor_id,
sum(pickup_longitude) over w as w_sum_pickup_longitude,
max(pickup_longitude) over w as w_max_pickup_longitude,
min(pickup_longitude) over w as w_min_pickup_longitude,
avg(pickup_longitude) over w as w_avg_pickup_longitude
FROM taxi_skew_all
WINDOW w as (partition by vendor_id order by pickup_datetime ROWS BETWEEN 10000 PRECEDING AND CURRENT ROW)
比照开源版本 SparkSQL 以及开源版本 OpenMLDB 进行测试,测试后果如下。
计算引擎 | 计算耗时 |
---|---|
SparkSQL(Spark 3.0.0) | 950.98s |
OpenMLDB,未开启歪斜优化 | 224.76s |
OpenMLDB,开启歪斜优化,歪斜分区数 2 | 140.74s |
OpenMLDB,开启歪斜优化,歪斜分区数 4 | 94.44s |
能够看到,OpenMLDB 引擎即便在不开启歪斜优化的状况下,在不同的歪斜比例中,绝对于 Spark 引擎,依然有 4 倍以上的性能晋升,这种性能晋升次要是通过 OpenMLDB 底层高效的引擎实现来保障的。而 OpenMLDB 在开启了窗口歪斜优化之后,通过调整不同的再分区数,相比 OpenMLDB 不开启歪斜优化也还能晋升大概 60%~140% 的性能。
总结
OpenMLDB 通过裁减窗口数据加上数据再分区的策略,实现了窗口计算下数据歪斜的优化。策略总体上采纳了空间换工夫的思维,行将本来集中在一个分区中的歪斜数据,在存储空间上进行窗口数据的裁减,之后再将数据扩散至多个分区并行计算,从而减少计算的并行度,来换取更短的计算工夫,并在最终实现了效率的大幅晋升。此外在数据测试中,咱们发现越在极其的歪斜散布下,OpenMLDB 越有更好的体现。总的来说,对于窗口计算下的数据歪斜场景,OpenMLDB 实现的数据歪斜优化有着不错的成果。
本文介绍了常见的滑动窗口数据歪斜问题,并且分析了 OpenMLDB 解决数据歪斜的实现计划以及展现最终的性能优化后果。如果你对 Spark 优化、大规模特色计算、OpenMLDB 数据库等感兴趣,咱们会分享更多相似的技术文章,欢送大家持续关注 OpenMLDB 专栏。