前言
Spark是目前支流的大数据计算引擎,性能涵盖了大数据畛域的离线批处理、SQL类解决、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,利用范畴与前景十分宽泛。作为一种内存计算框架,Spark运算速度快,并可能满足UDF、大小表Join、多路输入等多样化的数据计算和解决需要。
作为国内业余的数据智能服务商,个推从晚期的1.3版本便引入Spark,并基于Spark建设数仓,进行大规模数据的离线和实时计算。因为Spark 在2.x版本之前的优化重心在计算引擎方面,而在元数据管理方面并未做重大改良和降级。因而个推依然应用Hive进行元数据管理,采纳Hive元数据管理+ Spark计算引擎的大数据架构,以撑持本身大数据业务倒退。个推还将Spark广泛应用到报表剖析、机器学习等场景中,为行业客户和政府部门提供实时人口洞察、群体画像构建等服务。
▲个推在理论业务场景中,别离应用SparkSQL 和 HiveSQL对一份3T数据进行了计算,上图展现了跑数速度。数据显示:在锁死队列(120G内存,<50core)前提下, SparkSQL2.3 的计算速度是Hive1.2 的5-10倍。
对企业来讲,效率和老本始终是其进行海量数据处理和计算时所必须关注的问题。如何充分发挥Spark的劣势,在进行大数据作业时真正实现降本增效呢?个推将多年积攒的Spark性能调优妙招进行了总结,与大家分享。
Spark性能调优-根底篇
家喻户晓,正确的参数配置对晋升Spark的应用效率具备极大助力。因而,针对 不理解底层原理的Spark使用者,咱们提供了能够间接抄作业的参数配置模板,帮忙相干数据开发、剖析人员更高效地应用Spark进行离线批处理和SQL报表剖析等作业。
举荐参数配置模板如下:
Spark-submit 提交形式脚本
/xxx/spark23/xxx/spark-submit --master yarn-cluster \--name ${mainClassName} \--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \--conf spark.yarn.maxAppAttempts=2 \--conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC \--driver-memory 2g \--conf spark.sql.shuffle.partitions=1000 \--conf hive.metastore.schema.verification=false \--conf spark.sql.catalogImplementation=hive \--conf spark.sql.warehouse.dir=${warehouse} \--conf spark.sql.hive.manageFilesourcePartitions=false \--conf hive.metastore.try.direct.sql=true \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--executor-cores 2 \--executor-memory 4g \--num-executors 50 \--class 启动类 \${jarPath} \-M ${mainClassName}
spark-sql 提交形式脚本
option=/xxx/spark23/xxx/spark-sqlexport SPARK_MAJOR_VERSION=2${option} --master yarn-client \--driver-memory 1G \--executor-memory 4G \--executor-cores 2 \--num-executors 50 \--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \--conf spark.sql.auto.repartition=true \--conf spark.sql.autoBroadcastJoinThreshold=104857600 \--conf "spark.sql.hive.metastore.try.direct.sql=true" \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.dynamicAllocation.maxExecutors=200 \--conf spark.dynamicAllocation.executorIdleTimeout=10m \--conf spark.port.maxRetries=300 \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--conf spark.sql.shuffle.partitions=10000 \--conf spark.sql.adaptive.enabled=true \--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728 \--conf spark.sql.parquet.compression.codec=gzip \--conf spark.sql.orc.compression.codec=zlib \--conf spark.ui.showConsoleProgress=true-f pro.sqlpro.sql 为业务逻辑脚本
Spark性能调优-进阶篇
针对有志愿理解Spark底层原理的读者,本文梳理了standalone、Yarn-client、Yarn-cluster等3种常见工作提交形式的交互图,以帮忙相干使用者更直观地了解Spark的核心技术原理、为浏览接下来的进阶篇内容打好根底。
standalone
1) spark-submit 提交,通过反射的形式结构出1个DriverActor 过程;
2) Driver过程执行编写的application,结构sparkConf,结构sparkContext;
3) SparkContext在初始化时,结构DAGScheduler、TaskScheduler,jetty启动webui;
4) TaskScheduler 有sparkdeployschedulebackend 过程,去和Master通信,申请注册Application;
5) Master 承受通信后,注册Application,应用资源调度算法,告诉Worker,让worker启动Executor;
6) worker会为该application 启动executor,executor 启动后,会反向注册到TaskScheduler;
7) 所有Executor 反向注册到TaskScheduler 后,Driver 完结sparkContext 的初始化;
8) Driver持续往下执行编写的application,每执行到1个action,就会创立1个job;
9) job 会被提交给DAGScheduler,DAGScheduler 会对job 划分为多个stage(stage划分算法),每个stage创立1个taskSet;
10) taskScheduler会把taskSet里每1个task 都提交到executor 上执行(task 调配算法);
11) Executor 每承受到1个task,都会用taskRunner来封装task,之后从executor 的线程池中取出1个线程,来执行这个taskRunner。(task runner:把编写的代码/算子/函数拷贝,反序列化,而后执行task)。
Yarn-client
1) 发送申请到ResourceManager(RM),申请启动ApplicationMaster(AM);
2) RM 调配container 在某个NodeManager(NM)上,启动AM,理论是个ExecutorLauncher;
3) AM向RM 申请container;
4) RM给AM 调配container;
5) AM 申请NM 来启动相应的Executor;
6) executor 启动后,反向注册到Driver过程;
7) 后序划分stage,提交taskset 和standalone 模式相似。
Yarn-cluster
1) 发送申请到ResourceManager(RM),申请启动ApplicationMaster(AM);
2) RM 调配container 在某个NodeManager(NM)上,启动AM;
3) AM向RM 申请container;
4) RM给AM 调配container;
5) AM 申请NM 来启动相应的Executor;
6) executor 启动后,反向注册到AM;
7) 后序划分stage,提交taskset 和standalone 模式相似。
了解了以上3种常见工作的底层交互后,接下来本文从存储格局、数据歪斜、参数配置等3个方面来开展,为大家分享个推进行Spark性能调优的进阶姿态。
存储格局(文件格式、压缩算法)
家喻户晓,不同的SQL引擎在不同的存储格局上,其优化形式也不同,比方Hive更偏向于orc,Spark则更偏向于parquet。同时,在进行大数据作业时,点查、宽表查问、大表join操作绝对频繁,这就要求文件格式最好采纳列式存储,并且可宰割。因而咱们举荐以parquet、orc 为主的列式存储文件格式和以gzip、snappy、zlib 为主的压缩算法。在组合形式上,咱们倡议应用parquet+gzip、orc+zlib的组合形式,这样的组合形式兼顾了列式存储与可宰割的状况,相比txt+gz 这种行式存储且不可分割的组合形式更可能适应以上大数据场景的需要。
个推以线上500G左右的数据为例,在不同的集群环境与SQL引擎下,对不同的存储文件格式和算法组合进行了性能测试。测试数据表明:雷同资源条件下,parquet+gz 存储格局较text+gz存储格局在多值查问、多表join上提速至多在60%以上。
联合测试后果,咱们对不同的集群环境与SQL引擎下所举荐应用的存储格局进行了梳理,如下表:
同时,咱们也对parquet+gz、orc+zlib的内存耗费进行了测试。以某表的单个历史分区数据为例,parquet+gz、orc+zlib比txt+gz 别离节俭26%和49%的存储空间。
残缺测试后果如下表:
可见,parquet+gz、orc+zlib的确在降本提效方面效果显著。那么,如何应用这两种存储格局呢?步骤如下:
➤hive 与 spark 开启指定文件格式的压缩算法
spark:set spark.sql.parquet.compression.codec=gzip;set spark.sql.orc.compression.codec=zlib;hive:set hive.exec.compress.output=true;set mapreduce.output.fileoutputformat.compress=true;set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
➤建表时指定文件格式
parquet 文件格式(序列化,输入输出类)CREATE EXTERNAL TABLE `test`(rand_num double)PARTITIONED BY (`day` int)ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat';orc 文件格式(序列化,输入输出类) ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.orc.OrcSerde'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
➤线上表调
ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');
➤ctas 建表
create table tablename stored as parquet as select ……;create table tablename stored as orc TBLPROPERTIES ('orc.compress'='ZLIB') as select ……;
数据歪斜
数据歪斜分为map歪斜和reduce歪斜两种状况。本文着重介绍reduce 歪斜,如SQL 中常见的group by、join 等都可能是其重灾区。数据歪斜产生时,个别体现为:局部task 显著慢于同批task,task 数据量显著大于其余task,局部taskOOM、spark shuffle 文件失落等。如下图示例,在duration 列和shuffleReadSize/Records列,咱们能显著发现局部task 解决数据量显著升高,耗时变长,造成了数据歪斜:
如何解决数据歪斜?
咱们总结了7种数据歪斜解决方案,可能帮忙大家解决常见的数据歪斜问题:
解决方案一:应用 Hive ETL预处理数据
即在数据血缘关系中,把歪斜问题前移解决,从而使上游应用方无需再思考数据歪斜问题。
⁕该计划实用于上游交互性强的业务,如秒级/分钟级别提数查问。
解决方案二:过滤多数导致歪斜的key
即剔除歪斜的大key,该计划个别联合百分位点应用,如99.99%的id 记录数为100条以内,那么100条以外的id 就可思考予以剔除。
⁕该计划在统计型场景下较为实用,而在明细场景下,须要看过滤的大key 是否为业务所偏重和关注。
解决方案三:进步shuffle操作的并行度
即对spark.sql.shuffle.partitions参数进行动静调整,通过减少shuffle write task写出的partition数量,来达到key的平均调配。SparkSQL2.3 在默认状况下,该值为200。开发人员能够在启动脚本减少如下参数,对该值进行动静调整:
conf spark.sql.shuffle.partitions=10000conf spark.sql.adaptive.enabled=true conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728
⁕该计划非常简单,然而对于key的平均调配却能起到较好的优化作用。比方,本来10个key,每个50条记录,只有1个partition,那么后续的task须要解决500条记录。通过减少partition 数量,能够使每个task 都解决50条记录,10个task 并行跑数,耗时只须要原来1个task 的1/10。然而该计划对于大key较难优化,比方,某个大key记录数有百万条,那么大key 还是会被调配到1个task 中去。
解决方案四:将reducejoin转为mapjoin
指的是在map端join,不走shuffle过程。以Spark为例,能够通过播送变量的模式,将小RDD的数据下发到各个Worker节点上(Yarn 模式下是NM),在各个Worker节点上进行join。
⁕该计划实用于小表join大表场景(百G以上的数据体量)。此处的小表默认阈值为10M,低于此阈值的小表,可散发到worker节点。具体可调整的下限须要小于container调配的内存。
解决方案五:采样歪斜key并分拆join操作
如下图示例:A表 join B表,A表有大key、B表无大key,其中大key的id为1,有3条记录。
如何进行分拆join操作呢?
首先将A表、B表中id1独自拆分进去,剔除大key的A' 和 B' 先join,达到非歪斜的速度;
针对A表大key增加随机前缀,B表扩容N倍,独自join;join后剔除随机前缀即可;
再对以上2局部union。
⁕该计划的实质还是缩小单个task 解决过多数据时所引发的数据歪斜危险,实用于大key较少的状况。
解决方案六:应用随机前缀和扩容RDD进行join
比方,A 表 join B表,以A表有大key、B表无大key为例:
对A表每条记录打上[1,n] 的随机前缀,B表扩容N倍,join。
join实现后剔除随机前缀。
⁕该计划实用于大key较多的状况,但也会减少资源耗费。
解决方案七:combiner
即在map端做combiner操作,缩小shuffle 拉取的数据量。
⁕该计划适宜累加求和等场景。
在理论场景中,倡议相干开发人员具体情况具体分析,针对简单问题也可将以上办法进行组合应用。
Spark 参数配置
针对无数据歪斜的状况,咱们梳理总结了参数配置参照表帮忙大家进行Spark性能调优,这些参数的设置实用于2T左右数据的洞察与利用,根本满足大多数场景下的调优需要。
总结
目前,Spark曾经倒退到了Spark3.x,最新版本为Spark 3.1.2 released (Jun 01, 2021)。Spark3.x的许多新个性,如动静分区修剪、Pandas API的重大改良、加强嵌套列的裁剪和下推等亮点性能,为进一步实现降本增效提供了好思路。将来,个推也将持续放弃对Spark演进的关注,并继续开展实际和分享。