前言
Spark 是目前支流的大数据计算引擎,性能涵盖了大数据畛域的离线批处理、SQL 类解决、流式 / 实时计算、机器学习、图计算等各种不同类型的计算操作,利用范畴与前景十分宽泛。作为一种内存计算框架,Spark 运算速度快,并可能满足 UDF、大小表 Join、多路输入等多样化的数据计算和解决需要。
作为国内业余的数据智能服务商,个推从晚期的 1.3 版本便引入 Spark,并基于 Spark 建设数仓,进行大规模数据的离线和实时计算。因为 Spark 在 2.x 版本之前的优化重心在计算引擎方面,而在元数据管理方面并未做重大改良和降级。因而个推依然应用 Hive 进行元数据管理,采纳 Hive 元数据管理 + Spark 计算引擎的大数据架构,以撑持本身大数据业务倒退。个推还将 Spark 广泛应用到报表剖析、机器学习等场景中,为行业客户和政府部门提供实时人口洞察、群体画像构建等服务。
▲个推在理论业务场景中,别离应用 SparkSQL 和 HiveSQL 对一份 3T 数据进行了计算,上图展现了跑数速度。数据显示:在锁死队列(120G 内存,<50core)前提下,SparkSQL2.3 的计算速度是 Hive1.2 的 5 -10 倍。
对企业来讲,效率和老本始终是其进行海量数据处理和计算时所必须关注的问题。如何充分发挥 Spark 的劣势,在进行大数据作业时真正实现降本增效呢?个推将多年积攒的 Spark 性能调优妙招进行了总结,与大家分享。
Spark 性能调优 - 根底篇
家喻户晓,正确的参数配置对晋升 Spark 的应用效率具备极大助力。因而,针对 不理解底层原理的 Spark 使用者,咱们提供了能够间接抄作业的参数配置模板,帮忙相干数据开发、剖析人员更高效地应用 Spark 进行离线批处理和 SQL 报表剖析等作业。
举荐参数配置模板如下:
Spark-submit 提交形式脚本
/xxx/spark23/xxx/spark-submit --master yarn-cluster \
--name ${mainClassName} \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC \
--driver-memory 2g \
--conf spark.sql.shuffle.partitions=1000 \
--conf hive.metastore.schema.verification=false \
--conf spark.sql.catalogImplementation=hive \
--conf spark.sql.warehouse.dir=${warehouse} \
--conf spark.sql.hive.manageFilesourcePartitions=false \
--conf hive.metastore.try.direct.sql=true \
--conf spark.executor.memoryOverhead=512M \
--conf spark.yarn.executor.memoryOverhead=512 \
--executor-cores 2 \
--executor-memory 4g \
--num-executors 50 \
--class 启动类 \
${jarPath} \
-M ${mainClassName}
spark-sql 提交形式脚本
option=/xxx/spark23/xxx/spark-sql
export SPARK_MAJOR_VERSION=2
${option} --master yarn-client \
--driver-memory 1G \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 50 \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \
--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \
--conf spark.sql.auto.repartition=true \
--conf spark.sql.autoBroadcastJoinThreshold=104857600 \
--conf "spark.sql.hive.metastore.try.direct.sql=true" \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=200 \
--conf spark.dynamicAllocation.executorIdleTimeout=10m \
--conf spark.port.maxRetries=300 \
--conf spark.executor.memoryOverhead=512M \
--conf spark.yarn.executor.memoryOverhead=512 \
--conf spark.sql.shuffle.partitions=10000 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728 \
--conf spark.sql.parquet.compression.codec=gzip \
--conf spark.sql.orc.compression.codec=zlib \
--conf spark.ui.showConsoleProgress=true
-f pro.sql
pro.sql 为业务逻辑脚本
Spark 性能调优 - 进阶篇
针对有志愿理解 Spark 底层原理的读者,本文梳理了 standalone、Yarn-client、Yarn-cluster 等 3 种常见工作提交形式的交互图,以帮忙相干使用者更直观地了解 Spark 的核心技术原理、为浏览接下来的进阶篇内容打好根底。
standalone
1) spark-submit 提交,通过反射的形式结构出 1 个 DriverActor 过程;
2) Driver 过程执行编写的 application,结构 sparkConf,结构 sparkContext;
3) SparkContext 在初始化时,结构 DAGScheduler、TaskScheduler,jetty 启动 webui;
4) TaskScheduler 有 sparkdeployschedulebackend 过程,去和 Master 通信,申请注册 Application;
5) Master 承受通信后,注册 Application,应用资源调度算法,告诉 Worker,让 worker 启动 Executor;
6) worker 会为该 application 启动 executor,executor 启动后,会反向注册到 TaskScheduler;
7) 所有 Executor 反向注册到 TaskScheduler 后,Driver 完结 sparkContext 的初始化;
8) Driver 持续往下执行编写的 application,每执行到 1 个 action,就会创立 1 个 job;
9) job 会被提交给 DAGScheduler,DAGScheduler 会对 job 划分为多个 stage(stage 划分算法),每个 stage 创立 1 个 taskSet;
10) taskScheduler 会把 taskSet 里每 1 个 task 都提交到 executor 上执行(task 调配算法);
11) Executor 每承受到 1 个 task,都会用 taskRunner 来封装 task,之后从 executor 的线程池中取出 1 个线程,来执行这个 taskRunner。(task runner:把编写的代码 / 算子 / 函数拷贝,反序列化,而后执行 task)。
Yarn-client
1) 发送申请到 ResourceManager(RM),申请启动 ApplicationMaster(AM);
2) RM 调配 container 在某个 NodeManager(NM)上,启动 AM,理论是个 ExecutorLauncher;
3) AM 向 RM 申请 container;
4) RM 给 AM 调配 container;
5) AM 申请 NM 来启动相应的 Executor;
6) executor 启动后,反向注册到 Driver 过程;
7) 后序划分 stage,提交 taskset 和 standalone 模式相似。
Yarn-cluster
1) 发送申请到 ResourceManager(RM),申请启动 ApplicationMaster(AM);
2) RM 调配 container 在某个 NodeManager(NM)上,启动 AM;
3) AM 向 RM 申请 container;
4) RM 给 AM 调配 container;
5) AM 申请 NM 来启动相应的 Executor;
6) executor 启动后,反向注册到 AM;
7) 后序划分 stage,提交 taskset 和 standalone 模式相似。
了解了以上 3 种常见工作的底层交互后,接下来本文从存储格局、数据歪斜、参数配置等 3 个方面来开展,为大家分享个推进行 Spark 性能调优的进阶姿态。
存储格局(文件格式、压缩算法)
家喻户晓,不同的 SQL 引擎在不同的存储格局上,其优化形式也不同,比方 Hive 更偏向于 orc,Spark 则更偏向于 parquet。同时,在进行大数据作业时,点查、宽表查问、大表 join 操作绝对频繁,这就要求文件格式最好采纳列式存储,并且可宰割。因而咱们举荐以 parquet、orc 为主的列式存储文件格式和以 gzip、snappy、zlib 为主的压缩算法。在组合形式上,咱们倡议应用 parquet+gzip、orc+zlib 的组合形式,这样的组合形式兼顾了列式存储与可宰割的状况,相比 txt+gz 这种行式存储且不可分割的组合形式更可能适应以上大数据场景的需要。
个推以线上 500G 左右的数据为例,在不同的集群环境与 SQL 引擎下,对不同的存储文件格式和算法组合进行了性能测试。测试数据表明:雷同资源条件下,parquet+gz 存储格局较 text+gz 存储格局在多值查问、多表 join 上提速至多在 60% 以上。
联合测试后果,咱们对不同的集群环境与 SQL 引擎下所举荐应用的存储格局进行了梳理,如下表:
同时,咱们也对 parquet+gz、orc+zlib 的内存耗费进行了测试。以某表的单个历史分区数据为例,parquet+gz、orc+zlib 比 txt+gz 别离节俭 26% 和 49% 的存储空间。
残缺测试后果如下表:
可见,parquet+gz、orc+zlib 的确在降本提效方面效果显著。那么,如何应用这两种存储格局呢?步骤如下:
➤hive 与 spark 开启指定文件格式的压缩算法
spark:set spark.sql.parquet.compression.codec=gzip;
set spark.sql.orc.compression.codec=zlib;
hive:set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
➤建表时指定文件格式
parquet 文件格式(序列化, 输入输出类)
CREATE EXTERNAL TABLE `test`(rand_num double)
PARTITIONED BY (`day` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
;
orc 文件格式(序列化, 输入输出类)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
;
➤线上表调
ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');
ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');
➤ctas 建表
create table tablename stored as parquet as select ……;
create table tablename stored as orc TBLPROPERTIES ('orc.compress'='ZLIB') as select ……;
数据歪斜
数据歪斜分为 map 歪斜和 reduce 歪斜两种状况。本文着重介绍 reduce 歪斜,如 SQL 中常见的 group by、join 等都可能是其重灾区。数据歪斜产生时,个别体现为:局部 task 显著慢于同批 task,task 数据量显著大于其余 task,局部 taskOOM、spark shuffle 文件失落等。如下图示例,在 duration 列和 shuffleReadSize/Records 列,咱们能显著发现局部 task 解决数据量显著升高,耗时变长,造成了数据歪斜:
如何解决数据歪斜?
咱们总结了 7 种数据歪斜解决方案,可能帮忙大家解决常见的数据歪斜问题:
解决方案一:应用 Hive ETL 预处理数据
即在数据血缘关系中,把歪斜问题前移解决,从而使上游应用方无需再思考数据歪斜问题。
⁕该计划实用于上游交互性强的业务,如秒级 / 分钟级别提数查问。
解决方案二:过滤多数导致歪斜的 key
即剔除歪斜的大 key,该计划个别联合百分位点应用,如 99.99% 的 id 记录数为 100 条以内,那么 100 条以外的 id 就可思考予以剔除。
⁕该计划在统计型场景下较为实用,而在明细场景下,须要看过滤的大 key 是否为业务所偏重和关注。
解决方案三:进步 shuffle 操作的并行度
即对 spark.sql.shuffle.partitions 参数进行动静调整,通过减少 shuffle write task 写出的 partition 数量,来达到 key 的平均调配。SparkSQL2.3 在默认状况下,该值为 200。开发人员能够在启动脚本减少如下参数,对该值进行动静调整:
conf spark.sql.shuffle.partitions=10000
conf spark.sql.adaptive.enabled=true
conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728
⁕该计划非常简单,然而对于 key 的平均调配却能起到较好的优化作用。比方,本来 10 个 key,每个 50 条记录,只有 1 个 partition,那么后续的 task 须要解决 500 条记录。通过减少 partition 数量,能够使每个 task 都解决 50 条记录,10 个 task 并行跑数,耗时只须要原来 1 个 task 的 1 /10。然而该计划对于大 key 较难优化,比方,某个大 key 记录数有百万条,那么大 key 还是会被调配到 1 个 task 中去。
解决方案四:将 reducejoin 转为 mapjoin
指的是在 map 端 join,不走 shuffle 过程。以 Spark 为例,能够通过播送变量的模式,将小 RDD 的数据下发到各个 Worker 节点上(Yarn 模式下是 NM),在各个 Worker 节点上进行 join。
⁕该计划实用于小表 join 大表场景(百 G 以上的数据体量)。此处的小表默认阈值为 10M,低于此阈值的小表,可散发到 worker 节点。具体可调整的下限须要小于 container 调配的内存。
解决方案五:采样歪斜 key 并分拆 join 操作
如下图示例:A 表 join B 表,A 表有大 key、B 表无大 key,其中大 key 的 id 为 1,有 3 条记录。
如何进行分拆 join 操作呢?
首先将 A 表、B 表中 id1 独自拆分进去,剔除大 key 的 A ’ 和 B’ 先 join,达到非歪斜的速度;
针对 A 表大 key 增加随机前缀,B 表扩容 N 倍,独自 join;join 后剔除随机前缀即可;
再对以上 2 局部 union。
⁕该计划的实质还是缩小单个 task 解决过多数据时所引发的数据歪斜危险,实用于大 key 较少的状况。
解决方案六:应用随机前缀和扩容 RDD 进行 join
比方,A 表 join B 表,以 A 表有大 key、B 表无大 key 为例:
对 A 表每条记录打上[1,n] 的随机前缀,B 表扩容 N 倍,join。
join 实现后剔除随机前缀。
⁕该计划实用于大 key 较多的状况,但也会减少资源耗费。
解决方案七:combiner
即在 map 端做 combiner 操作,缩小 shuffle 拉取的数据量。
⁕该计划适宜累加求和等场景。
在理论场景中,倡议相干开发人员具体情况具体分析,针对简单问题也可将以上办法进行组合应用。
Spark 参数配置
针对无数据歪斜的状况,咱们梳理总结了参数配置参照表帮忙大家进行 Spark 性能调优,这些参数的设置实用于 2T 左右数据的洞察与利用,根本满足大多数场景下的调优需要。
总结
目前,Spark 曾经倒退到了 Spark3.x,最新版本为 Spark 3.1.2 released (Jun 01, 2021)。Spark3.x 的许多新个性,如动静分区修剪、Pandas API 的重大改良、加强嵌套列的裁剪和下推等亮点性能,为进一步实现降本增效提供了好思路。将来,个推也将持续放弃对 Spark 演进的关注,并继续开展实际和分享。