乐趣区

关于大数据:极光笔记丨Spark-SQL-在极光的建设实践


极光高级工程师——蔡祖光

前言

Spark 在 2018 开始在极光大数据平台部署应用, 历经多个版本的迭代, 逐渐成为离线计算的外围引擎。以后在极光大数据平台每天运行的 Spark 工作有 20000+, 执行的 Spark SQL 均匀每天 42000 条, 本文次要介绍极光数据平台在应用 Spark SQL 的过程中总结的局部实践经验, 包含以下方面内容:

Spark Extension 的利用实际
Spark Bucket Table 的革新优化
从 Hive 迁徙到 Spark SQL 的实际计划

一、Spark Extension 利用实际

Spark Extension 作为 Spark Catalyst 扩大点在 SPARK-18127 中被引入,Spark 用户能够在 SQL 解决的各个阶段扩大自定义实现, 十分弱小高效

1.1 血缘关系解析

在极光咱们有自建的元数据管理平台, 相干元数据由各数据组件进行信息收集, 其中对 Spark SQL 的血缘关系解析和收集就是通过自定义的 Spark Extension 实现的。

Spark Catalyst 的 SQL 解决分成 parser,analyzer,optimizer 以及 planner 等多个步骤,其中 analyzer,optimizer 等步骤外部也分为多个阶段, 为了获取最无效的血缘关系信息, 咱们抉择最终的 planner 阶段作为切入点, 为此咱们专门实现了一个 planner strategy 进行 Spark SQL 物理执行打算的解析, 并提取出读写表等元数据信息并存储到元数据管理平台

1.2 权限校验

在数据安全方面, 极光抉择用 Ranger 作为权限治理等组件, 但在理论应用的过程中咱们发现目前社区版本的 Ranger 次要提供的还是 HDFS、HBase、Hive、Yarn 的相干接入插件, 在 Spark 方面须要本人去实现相干性能, 对于以上问题咱们同样抉择用 Spark Extension 去帮忙咱们进行权限方面的二次开发, 在实现的过程中咱们借助了 Ranger Hive-Plugin 的实现原理, 对 Spark SQL 拜访 Hive 进行了权限校验性能的实现。

1.3 参数管制

随着数据平台应用 Spark SQL 的业务同学越来越多, 咱们发现每个业务同学对于 Spark 的相熟水平都有所不同, 对 Spark 配置参数的了解也有好有坏, 为了保障集群整体运行的稳定性, 咱们对业务同学提交的 Spark 工作的进行了拦挡解决, 提取工作设置的配置参数, 对其中配置不合理的参数进行屏蔽, 并给出危险提醒, 无效的疏导业务同学进行正当的线上操作。

二、Spark Bucket Table 的革新优化

在 Spark 的实际过程中, 咱们也积极关注业内其它公司优良计划, 在 2020 年咱们参考字节跳动对于 Spark Bucket Table 的优化思路, 在此基础上咱们对极光应用的 Spark 进行了二次革新, 实现如下优化项:

Spark Bucket Table 和 Hive Bucket Table 的相互兼容
Spark 反对 Bucket Num 是整数倍的 Bucket Join
Spark 反对 Join 字段和 Bucket 字段是蕴含关系的 Bucket Join

上述三点的优化, 丰盛了 Bucket Join 的应用场景, 能够让更多 Join、Aggregate 操作防止产生 Shuffle, 无效的进步了 Spark SQL 的运行效率. 在实现相干优化当前, 如何更好的进行业务革新推广, 成为了咱们关怀的问题。

通过对数据平台过往 SQL 执行记录的剖析, 咱们发现用户 ID 和设施 ID 的关联查问是非常高频的一项操作, 在此基础上, 咱们通过之前 SQL 血缘关系解析收集到的元数据信息, 对每张表进行 Join、Aggregate 操作的高频字段进行了剖析整顿, 统计出最为适合的 Bucket Cloumn, 并在这些元数据的撑持下辅助咱们进行 Bucket Table 的推广革新。

三、Hive 迁徙 Spark

随着公司业务的高速倒退, 在数据平台上提交的 SQL 工作继续一直增长, 对工作的执行工夫和计算资源的耗费都提出了新的挑战, 出于上述起因, 咱们提出了 Hive 工作迁徙到 Spark SQL 的工作指标, 由此咱们总结出了如下问题需要:

如何更好的定位哪些 Hive 工作能够迁徙, 哪些不能够
如何让业务部门无感知的从 Hive 迁徙到 Spark SQL
如何进行比照剖析, 确认工作迁徙前后的运行成果

3.1 Hive 迁徙分析程序的实现

在迁徙业务 job 时,咱们须要晓得这个部门有哪些人,因为 Azkaban 在执行具体 job 时会有执行人信息,所以咱们能够依据执行人来揣测有哪些 job。分析程序应用了元数据系统的某些表数据和 azkaban 相干的一些库表信息,用来帮忙咱们收集迁徙的部门下有多少 hive job,以及该 hive job 有多少 sql,sql 语法通过率是多少,当然在迁徙时还须要查看 Azkaban 的具体执行耗时等信息,用于帮忙咱们在精细化调参的时候大抵判断耗费的资源是多少。

因为线上间接检测某条 sql 是否合乎 spark 语义须要具备相干的读写权限,间接凋谢权限给分析程序不平安。所以实现的思路是通过应用元数据系统存储的库表构造信息,以及 azkaban 上有采集业务 job 执行的 sql 信息。只有领有某条 sql 所须要的全副库表信息,咱们就能在本地通过重建库表构造剖析该条 sql 是否合乎 spark 语义(当然线上环境和本地是有不同的,比方函数问题,但大多状况下是没有问题的)。

图 3 -1-1

以下为某数据部通过分析程序失去的 SQL 通过率

3.2 SQL 执行引擎的无感知切换

目前业务方应用 Hive 的次要形式是通过 beeline 去连贯 hiveserver2,因为 livy 也提供了 thriftserver 模块,所以 beeline 也能够间接连贯 livy。迁徙的策略就是先把合乎 Spark 语法的 SQL 发往 livy 执行,如果执行失败再切换到 Hive 进行兜底执行。

beeline 可获取用户 SQL,启动 beeline 时通过 thrift 接口创立 livy session,获取用户 sql 发送给 livy 执行,期间执行进度等信息能够查问 livy 取得,同时一个 job 对应一个 session,以及每启动一次 beeline 对应一个 session,当 job 执行结束或者 beeline 被敞开时,敞开 livy session。(如果 spark 不能胜利执行则走之前 hive 的逻辑)


图 3 -2-1

有了以上切换思路当前, 咱们开始着手 beeline 程序的批改设计

beeline 重要类图如图 3 -2- 2 所示, Beeline 类是启动类,获取用户命令行输出并调用 Commands 类去 执行,Commands 负责调用 JDBC 接口去执行和获取后果, 单向调用流程如图 3 -2- 3 所示。


图 3 -2-2

图 3 -2-3

由图 3 -2- 2 和图 3 -2- 3 可知,所有的操作都是通过 DatabaseConnection 这个对象去实现的,持有这个 对象的是 DatabaseConnections 这个对象,所以多计算引擎切换,通过策略适配

DatabaseConnections 对象,这样就能在不批改其余代码的状况下切换执行引擎(即获取不同的 connection)


图 3 -2-4

3.3 工作迁徙黑名单

前文有说到, 当一个 Hive 工作用 SQL 分析程序走通, 并且在迁徙程序用 livy 进行 Spark 工作提交当前, 还是会有可能执行失败, 这个时候咱们会用 Hive 进行兜底执行保障工作稳定性。然而失败的 SQL 会有多种起因, 有的 SQL 的确用 Hive 执行稳定性更好, 如果每次都先用 Spark SQL 执行失败当前再用 Hive 执行会影响工作效率, 基于以上目标, 咱们对迁徙程序开发了黑名单性能, 用来保障每个 SQL 能够找到它真正适宜的执行引擎, 思考到 beeline 是轻量级客户端,辨认的性能应该放在 livy-server 侧来做,开发一个相似 HBO 的性能来将这样的异样 SQL 退出黑名单,节俭迁徙工作执行工夫。

指标: 基于 HBE(History-Based Executing)的异样 SQL 辨认

有了上述指标当前咱们次要通过如下形式进行了 SQL 黑名单的辨认切换

SQL 辨认限定在雷同 appName 中 (放大辨认范畴防止辨认谬误)
失去 SQL 形象语法树的后续遍历内容后生成 md5 值作为该 sql 的唯一性标识
把执行失败超过 N 次的 SQL 信息写入黑名单
下次执行时依据赋值规定比拟两条 SQL 的构造树特色
对于在黑名单中的 SQL 不进行 Spark SQL 切换

3.4 迁徙成绩

往年通过迁徙程序的迁徙革新,HSQL 最大降幅为 50%+(后随往年业务增长有所回升)


四、Spark3.0 的利用

以后极光应用的 Spark 默认版本曾经从 2.X 版本升级到了 3.X 版本,Spark3.X 的 AQE 个性也辅助咱们更好的应用 Spark

实际配置优化:

spark3.0.0 参数

动静合并 shuffle partitions

spark.sql.adaptive.coalescePartitions.enabled true

spark.sql.adaptive.coalescePartitions.minPartitionNum 1

spark.sql.adaptive.coalescePartitions.initialPartitionNum 500

spark.sql.adaptive.advisoryPartitionSizeInBytes 128MB

动静优化数据歪斜, 通过理论的数据个性思考,skewedPartitionFactor 咱们设置成了 1

spark.sql.adaptive.skewJoin.enabled true

spark.sql.adaptive.skewJoin.skewedPartitionFactor 1

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 512MB

五、后续布局

目前针对线上运行的 Spark 工作, 咱们正在开发一套 Spark 全链路监控平台, 作为咱们大数据运维平台的一部分, 该平台会承当对线上 Spark 工作运行状态的采集监控工作, 咱们心愿能够通过该平台及时定位发现资源应用节约、写入大量小文件、存在 slow task 等问题的 Spark 工作, 并以此进行有针对性的优化, 让数据平台能够更高效的运行。

退出移动版