大数据SQL优化之数据倾斜解决案例全集

2022年01月14日 阅读数:2
这篇文章主要向大家介绍大数据SQL优化之数据倾斜解决案例全集,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

1 什么是数据倾斜算法

数据倾斜即指在大数据计算任务中某个处理任务的进程(一般是一个JVM进程)被分配到的任务量过多,致使任务运行时间超长甚至最终失败,进而致使整个大任务超长时间运行或者失败。外部表现的话,在HiveSQL任务里看到map或者reduce的进度一直是99%持续数小时没有变化;在SparkSQL里则是某个stage里,正在运行的任务数量长时间是1或者2不变。总之若是任务进度信息一直在输出,但内容长时间没有任何变化的时候,大几率是出现数据倾斜了。有个特例须要注意,有时候你们会看到SparkSQL的任务信息也显示有1到2个任务在运行中,但进度信息再也不刷新而表现为假死好久的时候,这一般是在进行最后阶段的文件操做,并非数据倾斜(虽然这一般意味着小文件问题严重)。sql

再细分一下,倾斜能够分为如下四类:性能优化

1)读倾斜。即某个map(HiveSQL)或者task(SparkSQL)在读取数据阶段长期没法完成。这一般是由于文件分块过大或者此分块数据有异常。这种场景出现频率较小。微信

2)算倾斜。即在某个须要排序(如开窗函数或者非广播关联时)或者聚合操做的时候,同一个key(一般是一个或者多个字段或者表达式的组合)的处理耗时过长。这一般是最多的状况,状况也较为复杂。架构

3)写倾斜。即某个操做须要输出大量的数据,好比超过几亿甚至几十亿行。主要出如今关联后数据膨胀及某些只能由一个task来操做(如limit)的状况。分布式

4)文件操做倾斜。即数据生成在临时文件夹后,因为数量巨大,重命名和移动的操做很是耗时。这一般发生在动态分区致使小文件的状况。目前在国内和印度区域已经由于咱们默认进行小文件合并而再也不存在这个状况,新加坡还有(咱们在推进解决)。svg

2 为何会有数据倾斜函数

大数据计算依赖多种分布式系统,须要将全部的计算任务和数据通过必定的规则分发到集群中各个可用的机器和节点上去执行,最后可能还须要进行汇总到少数节点进行最后的聚合操做,以及数据写到HDFS/S3等分布式存储系统里以永储存。这个过程被设计来应对大多数状况,并不能应对全部的状况。它具备如下几个特色:工具

1)业务数据分布规律没法预知。好比系统没法不通过计算而提早知道某个表的某个字段的取值分布是否大体均匀。性能

2)计算结果数量没法预知。好比两表关联的结果对于某些key(关联的一个字段或者多个字段组合)的输出行数没法不通过计算而预知进而针对性处理;又好比对某个字段的值进行split操做或者explode等操做后产生的结果数量没法预知而进行针对性的应对。

3)某些操做只能由单一节点进行。一切须要维护一个全局状态的大多数操做,如排序,Limit,count distinct,全局聚合等,通常会安排到一个节点来执行。

上述三个主要特色致使单节点处理的数据量有几率出现巨量,形成了所谓的倾斜问题。固然,这些困难并非不可解决的。随着时间的推移,愈来愈多的针对性的优化措施已逐渐出现,也许在不久的未来业务同窗不会再被倾斜问题烦恼。

3 解决案例

因为将来在OPPO主推SparkSQL,所以如下案例将主要以SparkSQL的角度来展现。

3.1 事实表关联事实表数据膨胀

最近有两个业务同窗提出一个比较麻烦的问题,就是事实表关联事实表,其中有若干个key的输出达数十亿行,数据膨胀严重,形成数据计算和输出的倾斜。

好比如下场景:

咱们统计了两个表的倾斜KEY值分布:

a表:

b表:

你们能够看出,

只看option_id=7的关联结果最后是

46839*130836=6128227404,即61亿行;

option_id=2的关联结果是

71080*125541=8923454280,即89亿行。

属于严重倾斜的状况。

这种事实表关联事实表的状况在非报表类的计算任务偶尔会遇到。平时咱们解决数据倾斜主要是计算结果的过程涉及太多数据要处理致使慢,但一般输出的行数可能并很少,不存在写的困难,因此相似过滤异常数据或者广播关联等方法都不起做用。

这个问题的本质是一个task最多由一个进程来执行,而相同的key也必须在同一个task中处理,所以在没法改变这个机制的前提下,咱们只有想办法减小一个task输出的行数。

那如何在不影响最终结果的前提下,减小单个task所须要处理数据行数呢?

其实网上也有许多建议,都是单独处理倾斜的key,经过加前缀后缀等方式打散key,再最后合并处理,但这样作法太麻烦了,不够优雅。咱们要追求对业务同窗更友好,代码更优雅的方式。

最后我寻遍全部可用的系统函数,发现了collect_set/collect_list这个聚合函数,能够在保证数据关系不丢失的前提下将数据收拢减小行数。好比如下两行:

能够收拢成一行:

最后咱们经过explode+lateral view的方式,能够实现一行展开为多行,从而还原成用户最后指望的明细结果方式。

上述办法的核心是将原来倾斜的操做(同一个key关联),修改成再也不相互依赖的操做(一行变多行)。

最终代码以下:

注意以上代码里值得注意的地方:

•代码里的hint(repartition(1000))的做用是考虑到通过collect_list聚合后的数据单行携带的数据通过一行变多行的展开操做后会膨胀不少倍,所以单个任务处理的数据量必须很小,才能保证处理速度够快。这个hint的做用是告诉系统将上一阶段关联后的结果分红1000份,交给下游处理;

•group by语句里的ceil(rand()*N)做用是将一个key分红最多N行,这样能够限制最后按key关联后生成的行数的上限;

•经过spark.sql.files.maxPartitionBytes参数控制单个任务处理的数据量,进一步拆分单个任务须要处理的数据。事实上若是第1点里文件足够小,这个参数能够省略。

通过验证,20分钟任务就完成了,生成了近800亿行的数据,其中包括了19个超十亿行的key。

3.2 避免排序

有一些算法基础的同窗都知道排序操做在软件领域是开销很是大的操做,目前大规模应用的几大排序算法的时间复杂度中最好的也是O(nlogn),即随着数据量的增加而非线性的增加。这就是说,大规模数据量的排序每每意味着巨大的时间消耗。然而这在大数据SQL中倒是常见的状况,从而引起倾斜。一旦有了排序的需求,什么优化参数都很差使了, 通常来讲只有进行改写代码。幸运的是,在绝大多数大数据场景下,排序是没必要要的,不少时候只是业务同窗不解排序在大数据场景下的开销很大而信手写下了排序代码。下面介绍2个改写代码从而避免排序的案例。

1)用max函数替换排序。

最近收到一个同事的业务需求,须要对某个业务的埋点数据作一次样本展现,要在约1200亿行数据中,捞出约1万条数据。很简单的一个SQL以下:

稍微解释一下SQL的意思:但愿取出上报数据里针对某个维度组合的一条内容较为丰富的样本数据,所以以某字段的size做为降序排序并取结果的第一条。

这个SQL固然跑失败了。我对partition by的字段集合(后续简称key)进行了统计,最大的key有137亿行,另外还有至少10个key的数据量超过20亿行。这样executor的内存加得再大都没法跑成功了。

这个问题的本质仍是对大数据作了没必要要的排序(大数据架构里对排序暂无很是高效的处理办法)。所以优化的思路仍是想办法减小这种没必要要排序。

既然用户只须要排序后的最大的一条,本质上不就是取某个key的最大值嘛。取出这个最大值,最后再跟源表进行关联,就能够取出最大值对应的那一条数据。

这里有个前提条件,要想在第二步关联回源表数据的时候干掉排序,咱们只有走一条路:广播关联(若是走sort-meger关联,仍是会避免不了sort步骤)。这就要求咱们的小表(key-最大值)要足够小。一般这个条件都会知足的,由于若是不知足的话,说明key值很是多,很是稀疏,也不会产生倾斜的困境了。如开始就说明了,最后Key的去重数据量不到1万条,彻底能够走广播关联。

最后的代码以下:

注意上述SQL有两点说明:

•咱们使用了semi join,这在平常代码中比较少见。它的意思是,左表去匹配右表,若是一旦发现左表的某条数据的关联key在右表,便保留此条左表的数据,再也不继续在右表里查找了。这样作有两个结果:1)速度更快;2)不会把右表的数据放到结果里)。它等价于 select * from left_table where key in (select key from right_table)。但大数据发展过程当中一度不支持in的用法(如今部分支持了),所以有这种语法,从效率上看,通常认为这样更高效。

•由于能匹配到最大值的数据可能有许多条,因此对最后结果再作一次row_number的开窗并取其中一条便可。这个时候因为size(xxxx)的值都是同样的,所以任意取一条均符合业务需求。

在通常状况下,上述SQL能较好的运行。但咱们此次状况出了点意外:通过上述操做后,咱们获得的数据还有800多亿行。由于max(size(xxxx) = size(xxxx)的数据占了绝大多数,致使咱们匹配回去没法有效的筛选出少许结果。咱们必须找到一个能有效区分各行数据的字段,这个字段的值必须很松散。最后我发现比较好的是userid。所以将 max(size(xxxx))替换成了 max(userid),任务很快就跑完了。由于不影响咱们讲述优化的原理,因此再也不描述这部分细节。

2)用分位函数替换排序。

在一个画像任务相关跑得很慢时,业务同窗求助于咱们,发现慢的代码以下:

问题点:上面的代码是想作一个全局排序,而后使用其序号所在位置来进行分类打标。上述代码在排序数据小于5亿5千万行的状况下勉强能运行出结果。但在某一天数据量到了5亿5千万行后就跑不出来,加了reducer的内存到10G也不行。

新思路:虽然可能还有一些参数能调整,但我认为这不是正确的方向,因而中止了研究,把方向转为干掉全局排序。在和一位前辈沟通的时候,忽然意识到,既然业务是想作一个分档,本质上就并不须要具体的排序号,因此理论上彻底的排序是能够省掉的。因而天然想到了分位数函数,立马想到了新方案。分位函数计算出数据必须大于或者等于某个值才能处于整个数据排序的某个位置。详情请你们自行搜索。

改以后代码以下:

注意上述代码有个小技巧,即与只有一行的子查询结果进行笛卡尔积关联,从而变相的实现了引入p2到p8等4个变量的效果,还算实用。

效果:对比了新旧算法的结果,差别极小,也在预期范围内。

再对比了任务执行时间,约有87%的降幅:

这个案例的本质在于识别出了费尽资源计算的全局序号是彻底没必要要的。相似的状况在咱们的业务代码里还存在不少,只是目前尚在业务可接受的范围内,存在很是大的优化空间。但愿将来能开展专项,以节省计算时间和资源。

3)经过广播关联完全避免排序。

SparkSQL目前处理关联(join)的方法主要有两种:

a) 广播关联。小表(经过参数spark.sql.au

toBroadcastJoinThreshold控制,目前咱们的默认值是20M)的话会采用广播关联,即将小表的所有数据传输到各节点的内存中,经过直接的内存操做快速完成关联。这种方式最大的好处是避免了对主表的数据进行shuffle,但会增长任务使用的内存量。另外特别说明3点:

•目前咱们的sparksql优化器尚不能很是准确地判断一个子查询结果(也被当成一张小表)是否适合进行广播,所以还在跟进解决中;

•左表不管大小都不能被广播;

•某些状况下会有相似:Kryo serialization failed: Buffer overflow 这样的OOM出现,并 “To avoid this, increase spark.kryose

rializer.buffer.max value”。但其实这样设置会无效。实质缘由是:虽然某张表小于32M,但因为高度压缩后,解压结果的行数达到了数千万,形成了节点的OOM。这个时候,只能手动禁掉广播关联。

b) Sort-Merge关联。即先将两表按链接字段进行排序,而后在些基础上进行匹配关联。因为数据是排序过的,只须要一次性的匹配便可完成最终的关联,速度较快。但这种方法的弊端是要进行对关联key的排序,而且每一个相同的Key和对应的数据必须分配到一个executor里,引起大量的shuffle操做;另外一方面若是一个executor须要处理一个巨量的key,一般会花费大量的时间以及大量的磁盘IO。

经过上述原理描述能够看出若是采用广播关联,引擎彻底不用作任何排序,天然也不会有排序带来的倾斜了,这是效率巨大的提高,固然代价就是会增长内存占用。通常来讲这种内存使用的增长被认为是划算的。

若是引擎没有识别出来,咱们能够经过主动指示的办法影响执行计划。好比如下:

要让执行计划改为广播s子查询结果,加hint mapjoin (也能够是 broadcast)就能够了。

从实际的结果看,广播关联的提速都有翻倍以上的效果。

3.3 写倾斜的避免

这部分简要描述一下。在动态分区场景下,咱们经常很难预料最后每一个分区将要输出的数据量会是多少,但分配的task数量对于每一个最终分区都是固定的。以国家分区条件为例,印尼这个分区若是是输出10亿行,而新加坡只输出100万行,这个时候若是咱们只分配2个任务去写数据,印尼这个分区单个任务会承受1亿行的任务,会很是慢。而若是设置为100个任务来写数据,对印尼这个分区来讲是比较合适的,但新加坡这个分区分产生100个小文件,对后续的文件操做和将来下游任务的读取都有消极的影响。最后通过实践后,找到一个比较好的办法。即找出倾斜的分区key,经过distribute by + case when表达式,让引擎对不一样的分区作不一样数量的数据分发。具体代码(以region为动态分区字段):

目前这种状况在海外任务上还须要应用,将来随着咱们推进AWS解决小文件自动合并问题,应该不用再操心了。

3.4 非法值过滤

这应该是网上讲得比较多的办法,我也简略说下。

在优化战略生态部门的任务dwd_ocloud_da

u_info_d任务的时候,咱们发现任务的运行时间一直在增加,一度达到7个小时,直到8月1号便再也跑不成功,老是OOM(内存不够),即便将executor的内存调高到10G依然解决不了问题。通过仔细诊断,发现任务慢在一个开窗函数阶段,代码以下:

在对guid这个key进行初步统计后,发现为空值的数量居然有数亿行,并不断增加:

这也就解释了运行时长不断增加,排序的内存开销和时长都不断增加。通过和业务同窗的沟通,确认空值无心义,进行排除:

而后在默认的参数下进行了重跑,30分钟内就跑完了。耗时降低约90%,效果明显。

这个例子里,倾斜值刚好是无效的能够直接过滤,比较幸运。那同窗们会问,若是倾斜值是有价值的怎么办?一般来讲是须要将这类倾斜值单独拎出来以另一套针对性的逻辑来计算,而后将结果union all回到其余非倾斜的数据计算结果里。

4 结语

数据倾斜处理的状况基本上局限在上述案例分类里,相信你们稍加学习都能掌握。将来咱们有计划开发诊断和优化的工具,重点帮你们找出倾斜的节点和提出代码级别的优化建议。敬请期待!


做者简介

Luckyfish  OPPO大数据服务质量负责人

主要负责大数据平台支持维护及服务质量保证工做,曾供职于京东科技,有较丰富的大数据任务开发和性能优化经验,同时对产品体验和成本优化有较多兴趣和经验。

推荐阅读
|QCon-OPPO数据平台Cloud Lake 降本增效实践
|OPPO大数据离线任务调度系统OFLOW
|OPPO数据湖统一存储技术实践
本文版权归OPPO公司全部,如需转载请在后台留言联系。

本文分享自微信公众号 - OPPO数智技术(OPPO_tech)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。