乐趣区

关于数据:大数据SQL优化之数据倾斜解决案例全集

1 什么是数据歪斜

数据歪斜即指在大数据计算工作中某个解决工作的过程(通常是一个 JVM 过程)被调配到的任务量过多,导致工作运行工夫超长甚至最终失败,进而导致整个大工作超长工夫运行或者失败。内部体现的话,在 HiveSQL 工作里看到 map 或者 reduce 的进度始终是 99% 继续数小时没有变动;在 SparkSQL 里则是某个 stage 里,正在运行的工作数量长时间是 1 或者 2 不变。总之如果工作进度信息始终在输入,但内容长时间没有任何变动的时候,大概率是呈现数据歪斜了。有个特例须要留神,有时候大家会看到 SparkSQL 的工作信息也显示有 1 到 2 个工作在运行中,但进度信息不再刷新而体现为假死很久的时候,这通常是在进行最初阶段的文件操作,并不是数据歪斜(尽管这通常意味着小文件问题重大)。

再细分一下,歪斜能够分为以下四类:

  1. 读歪斜。即某个 map(HiveSQL)或者 task(SparkSQL)在读取数据阶段长期无奈实现。这通常是因为文件分块过大或者此分块数据有异样。这种场景呈现频率较小。
  2. 算歪斜。即在某个须要排序(如开窗函数或者非播送关联时)或者聚合操作的时候,同一个 key(通常是一个或者多个字段或者表达式的组合)的解决耗时过长。这通常是最多的状况,状况也较为简单。
  3. 写歪斜。即某个操作须要输入大量的数据,比方超过几亿甚至几十亿行。次要呈现在关联后数据收缩及某些只能由一个 task 来操作(如 limit)的状况。
  4. 文件操作歪斜。即数据生成在长期文件夹后,因为数量微小,重命名和挪动的操作十分耗时。这通常产生在动静分区导致小文件的状况。目前在国内和印度区域曾经因为咱们默认进行小文件合并而不再存在这个状况,新加坡还有(咱们在推动解决)。

2 为什么会有数据歪斜

大数据计算依赖多种分布式系统,须要将所有的计算工作和数据通过肯定的规定散发到集群中各个可用的机器和节点下来执行,最初可能还须要进行汇总到多数节点进行最初的聚合操作,以及数据写到 HDFS/S3 等分布式存储系统里以永贮存。这个过程被设计来应答大多数状况,并不能应答所有的状况。它具备以下几个特点:

  1. 业务数据分布法则无奈预知。比方零碎无奈不通过计算而提前晓得某个表的某个字段的取值散布是否大抵平均。
  2. 计算结果数量无奈预知。比方两表关联的后果对于某些 key(关联的一个字段或者多个字段组合)的输入行数无奈不通过计算而预知进而针对性解决;又比方对某个字段的值进行 split 操作或者 explode 等操作后产生的后果数量无奈预知而进行针对性的应答。
  3. 某些操作只能由繁多节点进行。所有须要保护一个全局状态的大多数操作,如排序,Limit,count distinct,全局聚合等,个别会安顿到一个节点来执行。

上述三个次要特点导致单节点解决的数据量有概率呈现巨量,造成了所谓的歪斜问题。当然,这些艰难并不是不可解决的。随着工夫的推移,越来越多的针对性的优化措施已逐步呈现,兴许在不久的未来业务同学不会再被歪斜问题懊恼。

3 解决案例

因为将来在 OPPO 主推 SparkSQL,因而以下案例将次要以 SparkSQL 的角度来展现。

3.1 事实表关联事实表数据收缩

最近有两个业务同学提出一个比拟麻烦的问题,就是事实表关联事实表,其中有若干个 key 的输入达数十亿行,数据收缩重大,造成数据计算和输入的歪斜。

比方以下场景:

咱们统计了两个表的歪斜 KEY 值散布:

a 表:

b 表:

大家能够看出,

只看 option_id= 7 的关联后果最初是 46839*130836=6128227404,即 61 亿行;
option_id= 2 的关联后果是 71080*125541=8923454280,即 89 亿行。
属于重大歪斜的状况。

这种事实表关联事实表的状况在非报表类的计算工作偶然会遇到。平时咱们解决数据歪斜次要是计算结果的过程波及太多数据要解决导致慢,但通常输入的行数可能并不多,不存在写的艰难,所以相似过滤异样数据或者播送关联等办法都不起作用。

这个问题的实质是一个 task 最多由一个过程来执行,而雷同的 key 也必须在同一个 task 中解决,因而在无奈扭转这个机制的前提下,咱们只有想方法缩小一个 task 输入的行数。

那如何在不影响最终后果的前提下,缩小单个 task 所须要解决数据行数呢?

其实网上也有许多倡议,都是独自解决歪斜的 key,通过加前缀后缀等形式打散 key,再最初合并解决,但这样做法太麻烦了,不够优雅。咱们要谋求对业务同学更敌对,代码更优雅的形式。

最初我寻遍所有可用的零碎函数,发现了 collect_set/collect_list 这个聚合函数,能够在保证数据关系不失落的前提下将数据收拢缩小行数。比方以下两行:

能够收拢成一行:

最初咱们通过 explode+lateral view 的形式,能够实现一行开展为多行,从而还原成用户最初冀望的明细后果形式。

上述方法的外围是将原来歪斜的操作(同一个 key 关联),批改为不再相互依赖的操作(一行变多行)。

最终代码如下:

留神以上代码里值得注意的中央:

  • 代码里的 hint(repartition(1000))的作用是思考到通过 collect_list 聚合后的数据单行携带的数据通过一行变多行的开展操作后会收缩很多倍,因而单个工作解决的数据量必须很小,能力保障处理速度够快。这个 hint 的作用是通知零碎将上一阶段关联后的后果分成 1000 份,交给上游解决;
  • group by 语句里的 ceil(rand()*N)作用是将一个 key 分成最多 N 行,这样能够限度最初按 key 关联后生成的行数的下限;
  • 通过 spark.sql.files.maxPartitionBytes 参数管制单个工作解决的数据量,进一步拆分单个工作须要解决的数据。事实上如果第 1 点里文件足够小,这个参数能够省略。

通过验证,20 分钟工作就实现了,生成了近 800 亿行的数据,其中包含了 19 个超十亿行的 key。

3.2 防止排序

有一些算法根底的同学都晓得排序操作在软件畛域是开销十分大的操作,目前大规模利用的几大排序算法的工夫复杂度中最好的也是 O(nlogn),即随着数据量的增长而非线性的增长。这就是说,大规模数据量的排序往往意味着微小的工夫耗费。然而这在大数据 SQL 中却是常见的状况,从而引发歪斜。一旦有了排序的需要,什么优化参数都不好使了,一般来说只有进行改写代码。侥幸的是,在绝大多数大数据场景下,排序是不必要的,很多时候只是业务同学不解排序在大数据场景下的开销很大而信手写下了排序代码。上面介绍 2 个改写代码从而防止排序的案例。

1)用 max 函数替换排序。

最近收到一个共事的业务需要,须要对某个业务的埋点数据做一次样本展现,要在约 1200 亿行数据中,捞出约 1 万条数据。很简略的一个 SQL 如下:

略微解释一下 SQL 的意思:心愿取出上报数据里针对某个维度组合的一条内容较为丰盛的样本数据,因而以某字段的 size 作为降序排序并取后果的第一条。

这个 SQL 当然跑失败了。我对 partition by 的字段汇合(后续简称 key)进行了统计,最大的 key 有 137 亿行,另外还有至多 10 个 key 的数据量超过 20 亿行。这样 executor 的内存加得再大都无奈跑胜利了。

这个问题的实质还是对大数据做了不必要的排序(大数据架构里对排序暂无十分高效的解决方法)。因而优化的思路还是想方法缩小这种不必要排序。

既然用户只须要排序后的最大的一条,实质上不就是取某个 key 的最大值嘛。取出这个最大值,最初再跟源表进行关联,就能够取出最大值对应的那一条数据。

这里有个前提条件,要想在第二步关联回源表数据的时候干掉排序,咱们只有走一条路:播送关联(如果走 sort-meger 关联,还是会防止不了 sort 步骤)。这就要求咱们的小表(key- 最大值)要足够小。通常这个条件都会满足的,因为如果不满足的话,阐明 key 值十分多,十分稠密,也不会产生歪斜的窘境了。如开始就阐明了,最初 Key 的去重数据量不到 1 万条,齐全能够走播送关联。

最初的代码如下:

留神上述 SQL 有两点阐明:

  • 咱们应用了 semi join,这在日常代码中比拟少见。它的意思是,左表去匹配右表,如果一旦发现左表的某条数据的关联 key 在右表,便保留此条左表的数据,不再持续在右表里查找了。这样做有两个后果:1)速度更快;2)不会把右表的数据放到后果里)。它等价于 select * from left_table where key in (select key from right_table)。但大数据倒退过程中一度不反对 in 的用法(当初局部反对了),因而有这种语法,从效率上看,个别认为这样更高效。
  • 因为能匹配到最大值的数据可能有许多条,所以对最初后果再做一次 row_number 的开窗并取其中一条即可。这个时候因为 size(xxxx)的值都是一样的,因而任意取一条均合乎业务需要。

在个别状况下,上述 SQL 能较好的运行。但咱们这次状况出了点意外:通过上述操作后,咱们失去的数据还有 800 多亿行。因为 max(size(xxxx) = size(xxxx)的数据占了绝大多数,导致咱们匹配回去无奈无效的筛选出大量后果。咱们必须找到一个能无效辨别各行数据的字段,这个字段的值必须很涣散。最初我发现比拟好的是 userid。因而将 max(size(xxxx))替换成了 max(userid),工作很快就跑完了。因为不影响咱们讲述优化的原理,所以不再形容这部分细节。

2)用分位函数替换排序。

在一个画像工作相干跑得很慢时,业务同学求助于咱们,发现慢的代码如下:

问题点:下面的代码是想做一个全局排序,而后应用其序号所在位置来进行分类打标。上述代码在排序数据小于 5 亿 5 千万行的状况下勉强能运行出后果。但在某一天数据量到了 5 亿 5 千万行后就跑不进去,加了 reducer 的内存到 10G 也不行。

新思路:尽管可能还有一些参数能调整,但我认为这不是正确的方向,于是进行了钻研,把方向转为干掉全局排序。在和一位前辈沟通的时候,忽然意识到,既然业务是想做一个分档,实质上就并不需要具体的排序号,所以实践上齐全的排序是能够省掉的。于是天然想到了分位数函数,立马想到了新计划。分位函数计算出数据必须大于或者等于某个值能力处于整个数据排序的某个地位。详情请大家自行搜寻。

改之后代码如下:

留神上述代码有个小技巧,即与只有一行的子查问后果进行笛卡尔积关联,从而变相的实现了引入 p2 到 p8 等 4 个变量的成果,还算实用。

成果:比照了新旧算法的后果,差别极小,也在预期范畴内。

再比照了工作执行工夫,约有 87% 的降幅:

这个案例的实质在于辨认出了费尽资源计算的全局序号是齐全不必要的。相似的状况在咱们的业务代码里还存在很多,只是目前尚在业务可承受的范畴内,存在十分大的优化空间。心愿将来能发展专项,以节俭计算工夫和资源。

3)通过播送关联彻底防止排序。

SparkSQL 目前解决关联 (join) 的办法次要有两种:

a) 播送关联。小表(通过参数 spark.sql.autoBroadcastJoinThreshold 管制,目前咱们的默认值是 20M)的话会采纳播送关联,行将小表的全副数据传输到各节点的内存中,通过间接的内存操作疾速实现关联。这种形式最大的益处是防止了对主表的数据进行 shuffle,但会减少工作应用的内存量。另外特地阐明 3 点:

  • 目前咱们的 sparksql 优化器尚不能十分精确地判断一个子查问后果(也被当成一张小表)是否适宜进行播送,因而还在跟进解决中;
  • 左表无论大小都不能被播送;
  • 某些状况下会有相似:Kryo serialization failed: Buffer overflow 这样的 OOM 呈现,并“To avoid this, increase spark.kryoserializer.buffer.max value”。但其实这样设置会有效。本质起因是:尽管某张表小于 32M,但因为高度压缩后,解压后果的行数达到了数千万,造成了节点的 OOM。这个时候,只能手动禁掉播送关联。

b) Sort-Merge 关联。即先将两表按连贯字段进行排序,而后在些根底上进行匹配关联。因为数据是排序过的,只须要一次性的匹配即可实现最终的关联,速度较快。但这种办法的弊病是要进行对关联 key 的排序,并且每个雷同的 Key 和对应的数据必须调配到一个 executor 里,引发大量的 shuffle 操作;另一方面如果一个 executor 须要解决一个巨量的 key,通常会破费大量的工夫以及大量的磁盘 IO。

通过上述原理形容能够看出如果采纳播送关联,引擎齐全不必做任何排序,天然也不会有排序带来的歪斜了,这是效率微小的晋升,当然代价就是会减少内存占用。一般来说这种内存应用的减少被认为是划算的。

如果引擎没有辨认进去,咱们能够通过被动批示的方法影响执行打算。比方以下:

要让执行打算改成播送 s 子查问后果,加 hint mapjoin(也能够是 broadcast)就能够了。

从理论的后果看,播送关联的提速都有翻倍以上的成果。

3.3 写歪斜的防止

这部分简要形容一下。在动静分区场景下,咱们经常很难意料最初每个分区将要输入的数据量会是多少,但调配的 task 数量对于每个最终分区都是固定的。以国家分区条件为例,印尼这个分区如果是输入 10 亿行,而新加坡只输入 100 万行,这个时候如果咱们只调配 2 个工作去写数据,印尼这个分区单个工作会接受 1 亿行的工作,会十分慢。而如果设置为 100 个工作来写数据,对印尼这个分区来说是比拟适合的,但新加坡这个分辨别产生 100 个小文件,对后续的文件操作和将来上游工作的读取都有消极的影响。最初通过实际后,找到一个比拟好的方法。即找出歪斜的分区 key,通过 distribute by + case when 表达式,让引擎对不同的分区做不同数量的数据散发。具体代码(以 region 为动静分区字段):

目前这种状况在海内工作上还须要利用,将来随着咱们推动 AWS 解决小文件主动合并问题,应该不必再操心了。

3.4 非法值过滤

这应该是网上讲得比拟多的方法,我也简略说下。

在优化策略生态部门的工作 dwd_ocloud_dau_info_d 工作的时候,咱们发现工作的运行工夫始终在增长,一度达到 7 个小时,直到 8 月 1 号便再也跑不胜利,总是 OOM(内存不够),即便将 executor 的内存调高到 10G 仍然解决不了问题。通过认真诊断,发现工作慢在一个开窗函数阶段,代码如下:

在对 guid 这个 key 进行初步统计后,发现为空值的数量居然有数亿行,并一直增长:

这也就解释了运行时长一直增长,排序的内存开销和时长都一直增长。通过和业务同学的沟通,确认空值无意义,进行排除:

而后在默认的参数下进行了重跑,30 分钟内就跑完了。耗时降落约 90%,成果显著。

这个例子里,歪斜值恰好是有效的能够间接过滤,比拟侥幸。那同学们会问,如果歪斜值是有价值的怎么办?通常来说是须要将这类歪斜值独自拎进去以另外一套针对性的逻辑来计算,而后将后果 union all 回到其余非歪斜的数据计算结果里。

4 结语

数据歪斜解决的状况基本上局限在上述案例分类里,置信大家稍加学习都能把握。将来咱们有打算开发诊断和优化的工具,重点帮大家找出歪斜的节点和提出代码级别的优化倡议。敬请期待!

作者简介

Luckyfish OPPO 大数据服务质量负责人

次要负责大数据平台反对保护及服务质量保障工作,曾供职于京东科技,有较丰盛的大数据工作开发和性能优化教训,同时对产品体验和老本优化有较多趣味和教训。

获取更多精彩内容,请扫码关注 [OPPO 数智技术] 公众号

退出移动版