关于大数据:极光笔记丨Spark-SQL-在极光的建设实践


极光高级工程师——蔡祖光

前言

Spark在2018开始在极光大数据平台部署应用,历经多个版本的迭代,逐渐成为离线计算的外围引擎。以后在极光大数据平台每天运行的Spark工作有20000+,执行的Spark SQL均匀每天42000条,本文次要介绍极光数据平台在应用Spark SQL的过程中总结的局部实践经验,包含以下方面内容:

Spark Extension的利用实际
Spark Bucket Table的革新优化
从Hive迁徙到Spark SQL的实际计划

一、Spark Extension利用实际

Spark Extension作为Spark Catalyst扩大点在SPARK-18127中被引入,Spark用户能够在SQL解决的各个阶段扩大自定义实现,十分弱小高效

1.1 血缘关系解析

在极光咱们有自建的元数据管理平台,相干元数据由各数据组件进行信息收集,其中对Spark SQL的血缘关系解析和收集就是通过自定义的Spark Extension实现的。

Spark Catalyst的SQL解决分成parser,analyzer,optimizer以及planner等多个步骤,其中analyzer,optimizer等步骤外部也分为多个阶段,为了获取最无效的血缘关系信息,咱们抉择最终的planner阶段作为切入点,为此咱们专门实现了一个planner strategy进行Spark SQL物理执行打算的解析,并提取出读写表等元数据信息并存储到元数据管理平台

1.2 权限校验

在数据安全方面,极光抉择用Ranger作为权限治理等组件,但在理论应用的过程中咱们发现目前社区版本的Ranger次要提供的还是HDFS、HBase、Hive、Yarn的相干接入插件,在Spark方面须要本人去实现相干性能,对于以上问题咱们同样抉择用Spark Extension去帮忙咱们进行权限方面的二次开发,在实现的过程中咱们借助了Ranger Hive-Plugin的实现原理,对Spark SQL拜访Hive进行了权限校验性能的实现。

1.3 参数管制

随着数据平台应用Spark SQL的业务同学越来越多,咱们发现每个业务同学对于Spark的相熟水平都有所不同,对Spark配置参数的了解也有好有坏,为了保障集群整体运行的稳定性,咱们对业务同学提交的Spark工作的进行了拦挡解决,提取工作设置的配置参数,对其中配置不合理的参数进行屏蔽,并给出危险提醒,无效的疏导业务同学进行正当的线上操作。

二、Spark Bucket Table的革新优化

在Spark的实际过程中,咱们也积极关注业内其它公司优良计划,在2020年咱们参考字节跳动对于Spark Bucket Table的优化思路,在此基础上咱们对极光应用的Spark进行了二次革新,实现如下优化项:

Spark Bucket Table和Hive Bucket Table的相互兼容
Spark反对Bucket Num是整数倍的Bucket Join
Spark反对Join字段和Bucket字段是蕴含关系的Bucket Join

上述三点的优化,丰盛了Bucket Join的应用场景,能够让更多Join、Aggregate操作防止产生Shuffle,无效的进步了Spark SQL的运行效率.在实现相干优化当前,如何更好的进行业务革新推广,成为了咱们关怀的问题。

通过对数据平台过往SQL执行记录的剖析,咱们发现用户ID和设施ID的关联查问是非常高频的一项操作,在此基础上,咱们通过之前SQL血缘关系解析收集到的元数据信息,对每张表进行Join、Aggregate操作的高频字段进行了剖析整顿,统计出最为适合的Bucket Cloumn,并在这些元数据的撑持下辅助咱们进行Bucket Table的推广革新。

三、Hive迁徙Spark

随着公司业务的高速倒退,在数据平台上提交的SQL工作继续一直增长,对工作的执行工夫和计算资源的耗费都提出了新的挑战,出于上述起因,咱们提出了Hive工作迁徙到Spark SQL的工作指标,由此咱们总结出了如下问题需要:

如何更好的定位哪些Hive工作能够迁徙,哪些不能够
如何让业务部门无感知的从Hive迁徙到Spark SQL
如何进行比照剖析,确认工作迁徙前后的运行成果

3.1 Hive迁徙分析程序的实现

在迁徙业务job时,咱们须要晓得这个部门有哪些人,因为Azkaban在执行具体job时会有执行人信息,所以咱们能够依据执行人来揣测有哪些job。分析程序应用了元数据系统的某些表数据和azkaban相干的一些库表信息,用来帮忙咱们收集迁徙的部门下有多少hive job,以及该hive job有多少sql,sql语法通过率是多少,当然在迁徙时还须要查看Azkaban的具体执行耗时等信息,用于帮忙咱们在精细化调参的时候大抵判断耗费的资源是多少。

因为线上间接检测某条sql是否合乎spark语义须要具备相干的读写权限,间接凋谢权限给分析程序不平安。所以实现的思路是通过应用元数据系统存储的库表构造信息,以及azkaban上有采集业务job执行的sql信息。只有领有某条sql所须要的全副库表信息,咱们就能在本地通过重建库表构造剖析该条sql是否合乎spark语义(当然线上环境和本地是有不同的,比方函数问题,但大多状况下是没有问题的)。

图3-1-1

以下为某数据部通过分析程序失去的SQL通过率

3.2 SQL执行引擎的无感知切换

目前业务方应用Hive的次要形式是通过beeline去连贯hiveserver2,因为livy也提供了thriftserver模块,所以beeline也能够间接连贯livy。迁徙的策略就是先把合乎Spark语法的SQL发往livy执行,如果执行失败再切换到Hive进行兜底执行。

beeline可获取用户SQL,启动beeline时通过thrift接口创立livy session,获取用户sql发送给livy 执行,期间执行进度等信息能够查问livy取得,同时一个job对应一个session,以及每启动一次 beeline对应一个session,当job执行结束或者beeline被敞开时,敞开livy session。(如果spark不能胜利执行则走之前hive的逻辑)


图3-2-1

有了以上切换思路当前,咱们开始着手beeline程序的批改设计

beeline重要类图如图3-2-2所示, Beeline类是启动类,获取用户命令行输出并调用Commands类去 执行,Commands负责调用JDBC接口去执行和获取后果, 单向调用流程如图3-2-3所示。


图3-2-2

图3-2-3

由图3-2-2和图3-2-3可知,所有的操作都是通过DatabaseConnection这个对象去实现的,持有这个 对象的是DatabaseConnections这个对象,所以多计算引擎切换,通过策略适配

DatabaseConnections对象,这样就能在不批改其余代码的状况下切换执行引擎(即获取不同的 connection)


图3-2-4

3.3 工作迁徙黑名单

前文有说到,当一个Hive工作用SQL分析程序走通,并且在迁徙程序用livy进行Spark工作提交当前,还是会有可能执行失败,这个时候咱们会用Hive进行兜底执行保障工作稳定性。然而失败的SQL会有多种起因,有的SQL的确用Hive执行稳定性更好,如果每次都先用Spark SQL执行失败当前再用Hive执行会影响工作效率,基于以上目标,咱们对迁徙程序开发了黑名单性能,用来保障每个SQL能够找到它真正适宜的执行引擎,思考到beeline是轻量级客户端,辨认的性能应该放在livy-server侧来做,开发一个相似HBO的性能来将这样的异样SQL退出黑名单,节俭迁徙工作执行工夫。

指标: 基于HBE(History-Based Executing)的异样SQL辨认

有了上述指标当前咱们次要通过如下形式进行了SQL黑名单的辨认切换

SQL辨认限定在雷同appName中(放大辨认范畴防止辨认谬误)
失去SQL形象语法树的后续遍历内容后生成md5值作为该sql的唯一性标识
把执行失败超过N次的SQL信息写入黑名单
下次执行时依据赋值规定比拟两条SQL的构造树特色
对于在黑名单中的SQL不进行Spark SQL切换

3.4 迁徙成绩

往年通过迁徙程序的迁徙革新,HSQL最大降幅为50%+(后随往年业务增长有所回升)


四、Spark3.0的利用

以后极光应用的Spark默认版本曾经从2.X版本升级到了3.X版本,Spark3.X的AQE个性也辅助咱们更好的应用Spark

实际配置优化:

spark3.0.0参数

动静合并shuffle partitions

spark.sql.adaptive.coalescePartitions.enabled true

spark.sql.adaptive.coalescePartitions.minPartitionNum 1

spark.sql.adaptive.coalescePartitions.initialPartitionNum 500

spark.sql.adaptive.advisoryPartitionSizeInBytes 128MB

动静优化数据歪斜,通过理论的数据个性思考,skewedPartitionFactor咱们设置成了1

spark.sql.adaptive.skewJoin.enabled true

spark.sql.adaptive.skewJoin.skewedPartitionFactor 1

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 512MB

五、后续布局

目前针对线上运行的Spark工作,咱们正在开发一套Spark全链路监控平台,作为咱们大数据运维平台的一部分,该平台会承当对线上Spark工作运行状态的采集监控工作,咱们心愿能够通过该平台及时定位发现资源应用节约、写入大量小文件、存在slow task等问题的Spark工作,并以此进行有针对性的优化,让数据平台能够更高效的运行。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理