关于spark:Apache-Kyuubi-在B站大数据场景下的应用实践

0次阅读

共计 8710 个字符,预计需要花费 22 分钟才能阅读完成。

01 背景介绍

近几年随着 B 站业务高速倒退,数据量一直减少,离线计算集群规模从最后的两百台倒退到目前近万台,从单机房倒退到多机房架构。在离线计算引擎上目前咱们次要应用 Spark、Presto、Hive。架构图如下所示,咱们的 BI、ADHOC 以及 DQC 服务都是通过自研的 Dispatcher 路由服务来实现对立 SQL 调度,Dispatcher 会联合查问 SQL 语法特色、读 HDFS 量以及以后引擎的负载状况,动静地抉择以后最佳计算引擎执行工作。如果用户 SQL 失败了会做引擎主动降级,升高用户应用门槛;其中对于 Spark 查问晚期咱们都是走 STS,然而 STS 自身有很多性能和可用性上的问题,因而咱们引入了 Kyuubi,通过 Kyuubi 提供的多租户、多引擎代理以及齐全兼容 Hive Thrift 协定能力,实现各个部门 Adhoc 工作的资源隔离和权限验证。

Query 查问状况

目前在 Adhoc 查问场景下,SparkSQL 占比靠近一半,依赖 Kyuubi 对于 Scala 语法的反对,目前曾经有局部高级用户应用 scala 语法提交语句执行,并且能够在 SQL 和 Scala 模式做自在切换,这大大丰盛了 adhoc 的应用场景。

02 Kyuubi 利用

Kyuubi 是网易数帆大数据团队奉献给 Apache 社区的开源我的项目。Kyuubi 次要利用在大数据畛域场景,包含大数据离线计算、adhoc、BI 等方向。Kyuubi 是一个分布式、反对多用户、兼容 JDBC 或 ODBC 的大数据处理服务。

为目前热门的计算引擎(例如 Spark、Presto 或 Flink 等)提供 SQL 等查问服务。

咱们抉择 Kyuubi 的起因:

1. 齐全兼容 Hive thrift 协定,合乎 B 站已有的技术选型。

2. 高可用和资源隔离,对于大规模的生产环境必不可少。

3. 灵便可扩大,基于 kyuubi 能够做更多适配性开发。

4. 反对多引擎代理,为将来对立计算入口打下基础。

5. 社区高质量实现以及社区沉闷。

Kyuubi 的架构能够分成三个局部:

1. 客户端: 用户应用 jdbc 或者 restful 协定来提交作业获取后果。

2.kyuubi server: 接管、治理和调度与客户端建设的 Kyuubi Session,Kyuubi Session 最终被路由到理论的引擎执行。

3.kyuubi engine: 承受解决 kyuubi server 发送过去的工作,不同 engine 有着不同的实现形式。

03 基于 Kyuubi 的改良

Kyuubi 曾经在 B 站生产环境稳固运行一年以上,目前所有的 Adhoc 查问都通过 kyuubi 来接入大数据计算引擎。在这一年中咱们经验了两次大版本的演进过程,从最后 kyuubi 1.3 到 kyuubi 1.4 版本,再从 kyuubi 1.4 降级 kyuubi 1.6 版本。与之前的 STS 相比,kyuubi 在稳定性和查问性能方面有着更好的体现。在此演进过程中,咱们联合 B 站业务以及 kyuubi 性能特点,对 kyuubi 进行局部革新。

3.1 减少 QUEUE 模式

Kyuubi Engine 原生提供了 CONNECTION、USER、GROUP 和 SERVER 多种隔离级别。在 B 站大数据计算资源容量依照部门划分,不同部门在 Yarn 上对应不同的队列。咱们基于 GROUP 模式进行了革新,实现 Queue 级别的资源隔离和权限管制。

用户信息和队列的映射由下层工具平台对立配置和治理,Kyuubi 只需关怀上游 Dispatcher 提交过去 user 和 queue 信息,进行调度并散发到对应队列的 spark engine 上进行计算。目前咱们有 20+ 个 adhoc 队列,每个队列都对应一个或者多个 Engine 实例 (Engine pool)。

3.2 在 QUEUE 模式下反对多租户

kyuubi server 端由超级用户 Hive 启动,在 spark 场景下 driver 和 executor 共享同一个的用户名。不同的用户提交不同的 sql,driver 端和 executor 端无奈辨别以后的工作是由谁提交的,在数据安全、资源申请和权限访问控制方面都存在着问题。

针对该问题,咱们对以下几个方面进行了革新。

3.2.1 kyuubi server 端

1. kyuubi server 以 hive principal 身份启动。

2. Dispatcher 以 username proxyUser 身份提交 SQL。

3.2.2 spark engine 端

1. Driver 和 Executor 以 hive 身份启动。

2. Driver 以 username proxyUser 身份提交 SQL。

3. Executor 启动 Task 线程须要以 username proxyUser 身份执行 Task。

4. 同时须要保障所有的公共线程池,绑定的 UGI 信息正确。如 ORC Split 线程池上,当 Orc 文件达到肯定数量会启用线程池进行 split 计算,线程池是全局共享,永恒绑定的是第一次触发调用的用户 UGI 信息,会导致用户 UGI 信息错乱。

3.3 kyuubi engine UI 展现性能

在日常应用中咱们发现 kyuubi 1.3 Engine UI 页面展现不够敌对。不同的用户执行不同的 SQL 无奈辨别的开,session、job、stage、task 无奈关联的起来。

导致排查定位用户问题比拟艰难,咱们借鉴 STS 拓展了 kyuubi Engine UI 页面。咱们对以下几个方面进行了革新。

1. 自定义 kyuubi Listener 监听 Spark Job、Stage、Task 相干事件以及 SparkSQL 相干事件:SessionCreate、SessionClose、executionStart、executionRunning、executionEnd 等

2. Engine 执行 SQL 相干操作时,绑定并发送相干 SQL Event,结构 SQL 相干状态事件,将采集的 Event 进行状态剖析、汇总以及存储。

3. 自定义 Kyuubi Page 进行 Session 以及 SQL 相干状态实时展现。

Session Statistics 信息展现

SQL Statistics 信息展现

3.4 kyuubi 反对配置核心加载 Engine 参数

为了解决队列之间计算资源需要的差异性,如任务量大的队列须要更多计算资源 (Memory、Cores),任务量小的队列须要大量资源,每个队列需要的差别,咱们将所有队列的 Engine 相干资源参数对立到配置核心治理。每个队列第一次启动 Engine 前,将查问本人所属队列的参数并追加到启动命令中,进行参数的笼罩。

3.5 Engine 执行工作的进度显示与耗费资源上报性能

工作在执行过程中,用户最关怀的就是本人工作的进度以及健康状况,平台比较关心的是工作所耗费的计算资源老本。咱们在 Engine 端,基于事件采集 user、session、job、stage 信息并进行存储,启动定时工作将收集的 user、session、job、stage 信息进行关联并进行资源耗费成本计算,并将后果注入对应 operation log 中, 回传给前端日志展现。

工作进度信息展现

查问耗费资源上报展现

04 Kyuubi 稳定性建设

4.1 大后果集溢写到磁盘

在 adhoc 场景中用户通常会拉取大量后果到 driver 中,同一时间大量的用户同时拉取后果集,会造成大量的内存耗费,导致 spark engine 内存缓和,driver 性能降落问题,间接影响着用户的查问体验,为此专门优化了 driver fetch result 的过程,在获取后果时会实时监测 driver 内存应用状况,当 driver 内存使用量超过阈值后会先将拉取到的后果间接写出到本地磁盘文件中,在用户申请后果时再从文件中分批读出返回,减少 driver 的稳定性。

4.2 单个 SQL 的 task 并发数、执行工夫和 task 数量的限度

在生产过程中,咱们经常性的遇到单个大作业间接占用了整个 Engine 的全副计算资源,导致短作业长时间得不到计算资源,始终 pending 的状况,为了解决这种问题咱们对以下几个方面进行优化。

  • Task 并发数方面:默认状况下 Task 调度时只有有资源就会全副调度调配进来,后续 SQL 过去就面临着齐全无资源可用的状况,咱们对单个 SQL 参加调度的 task 数进行了限度,具体的限度数随着可用资源大小进行动静调整。
  • 单个 SQL 执行工夫方面:下层 Dispatcher 和上层 Engine 都做了超时限度,规定 adhoc 工作超过 1 小时,就会将该工作 kill 掉。
  • 单个 Stage task 数量方面:同时咱们也对单个 stage 的 task 数进行限度,一个 stage 最大容许 30W 个 task。

4.3 单次 table scan 的文件数和大小的限度

为保障 kyuubi 的稳定性,咱们对查问数据量过大的 SQL 进行限度。通过自定义内部 optimization rule(TableScanLimit) 来达到目标。TableScanLimit 匹配 LocalLimit,收集子节点 project、filter。匹配叶子结点 HiveTableRelation 和 HadoopFsRelation。即匹配 Hive 表和 DataSource 表的 Logical relation,针对不同的表采取不同的计算形式。

1. HiveTableRelation:

  • 非分区表, 通过 table meta 拿到表的 totalSize、numFiles、numRows 值。
  • 分区表,判断是否有下推下来的分区。若有,则拿对应分区的数据 totalSize、numFiles、numRows。若没有,则拿全表的数据。

2. HadoopFsRelation:判断 partitionFilter 是否存在动静 filter

  • 不存在,则通过 partitionFilter 失去须要扫描的分区
  • 存在,则对 partitionFilter 扫描进去的分区进一步过滤失去最终须要扫描的分区

获取到 SQL 查问的 dataSize、numFiles、numRows 后,还须要依据表存储类型、不同字段的类型、是否存在 limit、在依据下推来的 project、filter 得出最终须要扫描的列,估算出须要 table scan size,如果 table scan size 超过制订阈值则回绝查问并告知起因。

4.4 危险 join condition 发现 &Join 膨胀率的限度

4.4.1 危险 join condition 发现

为保障 kyuubi 的稳定性,咱们也对影响 Engine 性能的 SQL 进行限度。用户在写 sql 时可能并不理解 spark 对于 join 的底层实现,可能会导致程序运行的十分慢甚至 OOM,这个时候如果能够为用户提供哪些 join condition 可能是导致 engine 运行慢的起因,并揭示用户改良和不便定位问题,甚至能够回绝这些危险的 query 提交。

在抉择 join 形式的时候如果是等值 join 则依照 BHJ,SHJ,SMJ 的程序抉择,如果还没有抉择 join type 则断定为 Cartesian Join,如果 join 类型是 InnerType 的就应用 Cartesian Join,Cartesian Join 会产生笛卡尔积比较慢,如果不是 InnerType,则应用 BNLJ,在判断 BHJ 时,表的大小就超过了 broadcast 阈值,因而将表 broadcast 进来可能会对 driver 内存造成压力,性能比拟差甚至可能会 OOM,因而将这两种 join 类型定义为危险 join。

如果是非等值 join 则只能应用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 时选不出 build side 阐明两个表的大小都超过了 broadcast 阈值,则应用 Cartesian Join,如果 Join Type 不是 InnerType 则只能应用 BNLJ,因而 Join 策略中抉择 Cartesian Join 和第二次抉择 BNLJ 时为危险 join。

4.4.2 Join 膨胀率的限度

在 shareState 中的 statusScheduler 用于收集 Execution 的状态和指标,这其中的指标就是依照 nodes 汇总了各个 task 汇报上来的 metrics,咱们启动了一个 join 检测的线程定时的监控 Join 节点的 “number of output rows” 及 Join 的 2 个父节点的 “number of output rows” 算出该 Join 节点的膨胀率。

Join 节点的收缩检测:

05 kyuubi 新利用场景

5.1 大查问 connection&scala 模式的应用

5.1.1 connection 模式的应用

adhoc 大工作和简单的 SQL 会导致 kyuubi engine 在肯定工夫内性能降落,重大影响了其余失常的 adhoc 工作的执行效率。咱们在 adhoc 前端凋谢了大查问模式,让这些简单、查问量大的工作走 kyuubi connection 模式。在 kyuubi connection 模式下一个用户工作独自享有本人申请的资源,独立的 Driver,工作的大小快慢都由本身的 SQL 特色决定,不会影响到其余用户的 SQL 工作,同时咱们也会适当放开后面一些限度条件。

connection 模式在 B 站的应用场景:

  1. table scan 断定该 adhoc 工作为大工作,执行工夫超过 1 个小时。
  2. 简单的 SQL 工作, 该工作存在笛卡尔积或 Join 收缩超过阈值。
  3. 单个 SQL 单个 stage 的 task 数超过 30W。
  4. 用户自行抉择 connection 模式。

5.1.2 scala 模式的应用

SQL 模式能够解决大数据 80% 的业务问题,SQL 模式加上 Scala 模式编程能够解决 99% 的业务问题;SQL 是一种十分用户敌对的语言,用户不必理解 Spark 外部的原理,就能够应用 SQL 进行简单的数据处理,然而它也有肯定的局限性。

SQL 模式不够灵便,无奈以 dataset 以及 rdd 两种形式进行数据处理操作。无奈解决更加简单的业务,特地是非数据处理相干的需要。另一方面,用户执行 scala code 我的项目时必须打包我的项目并提交到计算集群,如果 code 出错了就需来回打包上传,十分的耗时。

Scala 模式能够间接提交 code,相似 Spark 交互式 Shell,简化流程。针对这些问题,咱们将 SQL 模式、Scala 模式的长处联合起来,两者进行混合编程,这样基本上能够解决数据分析场景下大部分的 case。

5.2 Presto on spark

Presto 为了保障集群的稳定性,每个 Query 的最大内存进行了限度,超过配置内存的 Query 会被 Presto oom kill 掉。局部 ETL 工作会呈现随着业务增长,数据量增大,占用内存也会增多,当超过阈值后,流程就会呈现失败。

为了解决这个问题,prestodb 社区开发了一个 presto on spark 的我的项目,通过将 query 提交到 Spark 来解决 query 的内存占用过大导致的扩张性问题,然而社区计划对于曾经存在的查问并不是很敌对,用户的提交形式有 presto-cli、pyhive 等形式,而要应用 Presto on spark 我的项目,则必须通过 spark-submit 形式将 query 提交到 yarn。

为了让用户无感知的执行 presto on spark 查问,咱们在 presto gateway 上做了一些革新,同时借助 kyuubi restulful 的接口,和 service + engine 的调度能力,在 kyuubi 内开发了 Presto-Spark Engine,该 engine 可能比拟敌对的来提交查问到 Yarn。

次要实现细节如下:

1. presto gateway 将 query 的执行历史进行保留,包含 query 的资源应用状况、报错信息等。

2. presto gateway 申请 HBO 服务,判断以后 query 是否须要通过 presto on spark 提交查问。

3. presto gateway 通过 zk 获取可用的 kyuubi server 列表,随机抉择一台,通过 http 向 kyuubi open 一个 session。

4. presto gateway 依据获取到的 sessionHandle 信息,再提交语句。

5. kyuubi server 接管到 query 后,会启动一个独立的 Presto-Spark Engine,构建启动命令,执行命令提交 spark-submit 到 yarn。

6. Presto gateway 依据返回的 OperatorHandle 信息, 通过 http 一直获取 operation status。

7. 作业胜利,则通过 fetch result 申请将后果获取并返回给客户端。

06 kyuubi 部署形式

6.1 Kyuubi server 接入 K8S

整合 Engine on yarn label 的实际

生产实践中遇到的问题:

1. 目前 kyuubi server/engine 部署在混部集群上,环境简单,各组件环境相互依赖、公布过程中难免会存在环境不统一、误操作等问题,从而导致服务运行出错。

2. 资源管理问题。最后 engine 应用的是 client 模式,不同的队列的 engine driver 应用的都是大内存 50g-100g 不等,同时 AM、NM、DN、kyuubi server 都共享着同一台物理机器上的资源,当 AM 启动过多,占满整个机器的资源,导致机器内存不足,engine 无奈启动。

针对于该问题,咱们研发了一套基于 Queue 模式资源分配调度实现:每个 kyuubi server 和 spark engine 在 znode 上都记录着以后资源应用状况。每个 kyuubi server znode 信息:以后 kyuubi 注册 SparkEngine 数量、以后 kyuubi server 注册 SparkEngine 实例、kyuubi server 内存总大小以及以后 kyuubi server 残余内存总大小等。

每个 engine znode 信息:所属 kyuubi server IP/ 端口、以后 SparkEngine 内存、以后 SparkEngine 所属队列等。每次 Spark engine 的启动 / 退出,都会获取该队列的目录锁,而后对其所属的 kyuubi server 进行资源更新操作。kyuubi server 如果宕机,在启动时,遍历获取所有 engine 在 znode 的信息,进行资源和状态的疾速复原。

3. 针对资源管理性能也存在着一些问题: 资源碎片化问题、新性能的拓展不敌对以及保护老本大。Engine 应用的是 client 模式,过多大内存的 AM 会占用客户端的过多计算资源,导致 engine 程度拓展受限。

针对以上提出的问题,咱们做了对应的解决方案:

1. kyuubi server 接入 k8s

咱们指定了一批机器作为 kyuubi server 在 k8s 上调度资源池,实现 kyuubi server 环境、资源的隔离。实现了 kyuubi server 疾速部署、进步 kyuubi server 程度扩大能力,升高了运维老本。

2. Engine on yarn label

咱们将 kyuubi engine 资源管理交给 yarn,由 yarn 负责 engine 的调配和调度。咱们采纳了 cluster 模式以防 engine 在程度拓展时受到资源限度。采纳 cluster 模式后,咱们遇到了新的问题:在 queue 模式下 engine driver 应用的都是 50g-100g 不等的大内存,然而因为 yarn 集群的配置限度,可能申请的最大 Container 资源量为 <28G, 10vCore>。为了在 cluster 模式的状况下保障 Driver 可能获取到足够的资源,咱们革新了 yarn 以适应此类场景。咱们将需要拆分为以下三项:

  • 将 kyuubi Driver 搁置于独立的 Node Label 中,该 Node Label 中的服务器由 kyuubi driver 独立应用;
  • kyuubi Executor 依然搁置在 Default Label 的各对应队列的 adhoc 叶子队列内,承接 adhoc 工作解决工作;
  • Driver 申请的资源须要大于 MaxAllocation,即上文所述的 <28G, 10vCore>。心愿可能依据 Node Label 动静设置 Queue 级别的 MaxAllocation,使得 kyuubi Driver 可能取得较大资源量。

首先,咱们在 yarn 上建设了 kyuubi\_label,并在 label 内与 Default Label 映射建设 kyuubi 队列,以供所有的 Driver 对立提交在 kyuubi 队列上。并通过“spark.yarn.am.nodeLabelExpression=kyuubi\_label”指定 Driver 提交至 kyuubi_label,通过“spark.yarn.executor.nodeLabelExpression=”指定 Executor 提交至 default label,实现如下的成果:

其次,咱们将 yarn 的资源最大值由原先的“集群”级别管控下放至“队列 +Label”级别管控,通过调整 ”queue name + kyuubi_label” 的 Conf,咱们可能将 Driver 的 Container 资源量最大值进步至 <200G, 72vCore>,且保障其余 Container 的最大值仍为 <28G, 10vCore>。同样申请 50G 的 Driver,在 default 集群中会呈现失败提醒:

而在 kyuubi_lable 的同队列下则可能胜利运行, 这样咱们既借助了 yarn 的资源管控能力,又保障了 kyuubi driver 取得的资源量。

07 将来布局

1. 小的 ETL 工作接入 kyuubi,缩小 ETL 工作资源申请工夫

2. Kyuubi Engine(Spark 和 Flink)云原生,接入 K8S 对立调度

3. Spark jar 工作也对立接入 Kyuubi

以上是明天的分享内容,如果你有什么想法或疑难,欢送大家在留言区与咱们互动,如果喜爱本期内容的话,请给咱们点个赞吧!

本文转载自:Apache Kyuubi 在 B 站大数据场景下的利用实际

正文完
 0