乐趣区

关于大数据:Apache-Kyuubi-助力-CDH-解锁-Spark-SQL

Apache Kyuubi(Incubating)(下文简称 Kyuubi)是⼀个构建在 Spark SQL 之上的企业级 JDBC 网关,兼容 HiveServer2 通信协议,提供高可用、多租户能力。Kyuubi 具备可扩大的架构设计,社区正在致力使其可能反对更多通信协议(如 RESTful、MySQL)和计算引擎(如 Flink)。

Kyuubi 的愿景是让大数据平民化。一个的典型应用场景是替换 HiveServer2,帮忙企业把 HiveQL 迁徙到 Spark SQL,轻松取得 10~100 倍性能晋升(具体晋升幅度与 SQL 和数据无关)。另外最近比拟火的两个技术,LakeHouse 和数据湖,都与 Spark 联合得比拟严密,如果咱们能把计算引擎迁徙到 Spark 上,那咱们离这两个技术就很近了。

Kyuubi 最早起源于网易,这个我的项目自诞生起就是开源的。在 Kyuubi 倒退的前两年,它的应用场景次要在网易外部。自从 2020 年底进行了一次架构大降级、公布了 Kyuubi 1.0 之后,整个 Kyuubi 社区开始沉闷起来了,我的项目被越来越多的企业采纳,而后在往年 6 月进入了 Apache 基金会孵化器,并在往年 9 月公布了进入孵化器后的第一个版本 1.3.0-incubating。

Kyuubi vs SparkThriftServer vs HiveServer2

咱们通过 Kyuubi 和其余 SQL on Spark 计划的比照,看看用 Kyuubi 替换 HiveServer2 能带来什么样的晋升。图中对 Hive Server2 标记了 Hive on Spark,这是 Hive2 的一个性能,最早的 Hive 会把 SQL 翻译成 MapReduce 来执行,Hive on Spark 计划其实就是把 SQL 翻译成 Spark 算子来执行,然而这仅仅是物理算子的替换,因为复用了 Hive 的 SQL 解析逻辑,所以 SQL 方言还是 HiveQL,包含后续 SQL 的改写、优化走的都是 Hive 的优化器。Spark2 放弃了 Hive on Spark 计划,抉择从头开始做 SQL 解析、优化,发明了 Spark SQL 和 Catalyst。所以说,Spark Thrift Server 只是兼容了 HiveServer2 的 Thrift 通信协议,它整个 SQL 的解析、优化都被重写了。

Kyuubi 也是用 Spark 来解析、优化和执行 SQL,所以对于用户来说,客户端与服务端的通信协议是截然不同的,因为大家兼容的都是 HiveServer2 的通信协议。然而在 SQL 方言上,Kyuubi 和 Spark Thrift Server 是 Spark SQL,HiveServer2 是 HiveQL。SQL 的编译和优化过程,HiveServer2 在本过程上进行,Spark 是在 Driver 端进行。对于 STS,Thrift Server 和 Driver 在同一个过程内;对于 Kyuubi,Thrift Server 和 Spark Driver 是拆散的,它们能够跑在同一台机器上的不同过程(如 YARN Client 模式),也能够跑在不同的机器上(如 YARN Cluster 模式)。

对于执行阶段,当一条 SQL 提交后,HiveServer2 会将其翻译成了一个 Spark Application,每一次 SQL 提交就会生成一个全新的 Spark 的利用,都会经验 Driver 的创立、Executor 的创立过程,SQL 执行完结后再将其销毁掉。Spark Thrift Server 则是齐全相同的形式,一个 Spark Thrift Server 只持有一个 Driver,Driver 是常驻的,所有的 SQL 都会由这一个 Driver 来编译、执行。Kyuubi 不仅对这两种形式都反对,还反对了更灵便的 Driver 共享策略,会在后续具体介绍。

Kyuubi on Spark 与 CDH 集成

CDH 是应用最宽泛的 Apache Hadoop 发行版之一,其自身集成了 Spark,然而禁用了 Spark Thrift Server 性能和 spark-sql 命令,使得用户只能通过 spark-shell、spark-submit 应用 Spark,故而在 CDH 上应用 Spark SQL 具备肯定的门槛。在 CDH 上 SQL 计划用得更多的往往是 Hive,比如说咱们能够通过 Beeline、HUE 连贯 HiveServer2,来进行 SQL 批工作提交或交互式查问,同时也能够通过 Apache Superset 这类 BI 工具连贯到 HiveServer2 上做数据可视化展现,背地最终的计算引擎能够是 MapReduce,也能够是 Spark。

当咱们引入 Kyuubi 后,图中左侧的这些 Client 都是不须要更改的,只须要部署 Spark3 和 Kyuubi Server(以后 Kyuubi 仅反对 Spark3),再把 Client 连贯地址改一下,即可实现从 HiveQL 到 Spark SQL 的迁徙。当然其间可能会碰到 HiveQL 方言跟 Spark SQL 方言的差异性问题,能够联合老本抉择批改 Spark 或者批改 SQL 来解决。

Kyuubi 中有一个重要的概念叫作引擎共享级别(Engine Share Level),Kyuubi 通过该个性提供了更高级的 Spark Driver 共享策略。Spark Engine 与 Spark Driver 概念是等价的,都对应一个 Spark Application。Kyuubi 目前提供了三种 Engine 共享级别,SERVER、USER 和 CONNECTION,缺省是 USER 模式。

SERVER 模式相似 Spark Thrift Server,Kyuubi Server 只会持有一个 Spark Engine,即所有的查问都会提交到一个 Engine 上来跑。USER 模式即每个用户会应用独立的 Engine,能做到用户级别的隔离,这也是更具普适性的一种形式,一方面不心愿太浪费资源,每个 SQL 都起一个 Engine,另一方面也心愿放弃肯定的隔离性。CONNECTION 模式是指 Client 每建设一个连贯,就创立一个 Engine,这种模式和 Hive Server2 较为相似,但不完全一致。CONNECTION 模式比拟适宜跑批计算,比方 ETL 工作,往往须要数十分钟甚至几个小时,用户不心愿这些工作相互烦扰,同时也心愿不同的 SQL 有不同的配置,例如为 Driver 调配 2G 内存还是 8G 内存。总的来说,CONNECTION 模式比拟适宜跑批工作或者大工作,USER 模式比拟适宜 HUE 交互式查问的场景。

大家可能会放心,咱们是不是对 Spark 做了很厚的一层包装,限度了很多性能?其实不是的,所有的 Spark 的 configuration 在 Kyuubi 中都是可用的。当一个申请发送给 Kyuubi 时,Kyuubi 会去找一个适合的 Engine 跑这个工作,如果找不到它就会通过拼接 spark-submit 命令来创立一个 Engine,所以所有的 Spark 反对的 configuration 都能够用。如图中列举了 YARN Client 和 YARN Cluster 两种模式,怎么写配置,Spark Driver 就跑在什么中央,改成 K8s,Driver 就跑在 Kubernetes 外面,所以 Kyuubi 对 Kubernetes 的反对也是瓜熟蒂落的。

还有一个用户常问的问题,是不是 USER 模式或者 CONNECTION 模式,每一个场景都须要独自部署一套 Kyuubi Server?不须要。咱们能够把罕用的配置固化在 kyuubi-defaults.conf 外面,当 Client 连贯 Kyuubi Server 时,能够通过在 URL 外面写一些配置参数来笼罩默认配置项,比方缺省应用的是 USER 模式,提交批工作时抉择 CONNECTION 模式。

Kyuubi 引擎隔离级别的实现

Kyuubi 多种灵便的隔离级别是怎么实现的?下图绿色的组件即 Service Discovery Layer,目前通过 ZooKeeper 实现,但它实质上是一个服务注册 / 发现的组件。咱们曾经发现有开发者把 Kyuubi 集成到 Kubernetes 上,用 API Server 来实现 Service Discovery Layer,该性能目前还没有提交给社区。

上图中的 User Side 即 Client 侧,社区目前策略是齐全兼容 Hive 的 Client,以更好的复用 Hive 生态。Hive 自身通过 ZooKeeper 实现了 Client 侧的 HA 模式,Server 启动当前,会将本身依照肯定的规定注册到 ZooKeeper 外面,Beeline 或者其余 Client 连贯的时候,将连贯地址设置成 ZooKeeper 集群的地址,Client 即能够发现指定的门路下所有的 Server,随机抉择其中一个来连贯。这是 HiveServer2 协定的一部分,Kyuubi 是齐全兼容的。

Kyuubi Server 与 Engine 之间也应用了相似的服务注册与发现机制,Engine 在 Zookeeper 上的注册门路会恪守肯定的规定,例如在 USER 隔离级别下,门路规定是 /kyuubi_USER/{username}/{engine_node}。Kyuubi Server 接到连贯申请,会依照规定从特定门路下查找可用的 Engine 节点。若找到,间接连贯该 Engine;若找不到,创立一个 Engine 并期待其启动实现,Engine 启动实现后,会依照雷同的规定在指定门路下创立一个 engine 节点,并把本人的(蕴含连贯地址、版本等)填写到节点里。在 YARN Cluster 模式下,Engine 会被调配到 YARN 集群任意节点启动,正是通过这种机制,Server 能力找到并连贯 Engine。

对于 CONNECTION 模式,每一个 Engine 只会被连贯一次,为了实现这个成果,Kyuubi 设计了如下的门路规定,/kyuubi_CONNECTION/{username}/{uuid}/{engine_node},通 UUID 的唯一性来实现隔离。所以 Kyuubi 实现引擎隔离级别的形式是一个非常灵活的机制,咱们目前反对了 SERVER、USER 和 CONNECTION,但通过简略的扩大,它就能够反对更多更灵便的模式。在代码主线分支上,社区目前曾经实现了额定的两个共享级别,一个是 GROUP,能够让一组用户来共享一个 Engine;另外一个是 Engine Pool,能够让一个用户来应用多个 Engine 以进步并发能力,一个实用的场景是 BI 图表展现,例如为 Superset 配置一个 Service Account,对应多个 Engine,当几百个图表一起刷新时,能够将计算压力摊派到不同 Engine 上。

Kyuubi 实际 | 编译 Spark3.1 以适配 CDH5 并集成 Kyuubi

https://mp.weixin.qq.com/s/lgbmM1qNetuPB0-j-TAzkQ

Apache Kyuubi on Spark 在 CDH 上的深度实际

https://my.oschina.net/u/4565392/blog/5264848

咱们在 Kyuubi 的官网公众号提供了两篇文章,内容蕴含了将 Kyuubi 集成到 CDH 平台上的具体操作过程,第一篇形容了与 CDH5(Hadoop2,启用 Kerberos) 的集成,第二篇形容了 CDH6(Hadoop3,未启用 Kerberos) 的集成。对于集成其余非 CDH 平台的 Hadoop 发行版,也具备肯定的参考价值。

Spark 3 个性以及 Kyuubi 带来的加强

动静资源分配

首先是动静资源分配,Spark 自身曾经提供了 Executor 的动静伸缩能力。能够看到,这几个参数配置在语义上是十分明确的,形容了 Executor 起码能有多少个,最多能有多少个,最大闲置时长,以此管制 Executor 的动态创建和开释。

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=30
spark.dynamicAllocation.executorIdleTimeout=120

引入了 Kyuubi 后,联合方才提到的 Share Level 和 Engine 的创立机制,咱们能够实现 Driver 的动态创建,而后咱们还引入了一个参数,engine.idle.timeout,约定 Driver 闲置了多长时间当前也开释,这样就实现 Spark Driver 的动态创建与开释。

kyuubi.engine.share.level=CONNECTION|USER|SERVER
kyuubi.session.engine.idle.timeout=PT1H

这里要留神,因为 CONNECTION 场景比拟非凡,Driver 是不会被复用的,所以对于 CONNECTION 模式,engine.idle.timeout 是没有意义的,只有连贯断开 Driver 就会立即退出。

Adaptive Query Execution

Adaptive Query Execution(AQE)是 Spark 3 带来的一个重要个性,简而言之就是容许 SQL 边执行边优化。就拿 Join 作为例子来说,等值的 Inner Join,大表对大表做 Sort Merge Join,大表对小表做 Broadcast Join,但大小表的判断产生在 SQL 编译优化阶段,也就是在 SQL 执行之前。

咱们思考这样一个场景,两个大表和大表 Join,加了一个过滤条件,而后咱们发现跑完过滤条件之后,它就变成了一个大表和一个小表 Join 了,能够满足 Broadcast Join 的条件,但因为执行打算是在没跑 SQL 之前生成的,所以它还是一个 Sort Merge Join,这就会引入一个不必要的 Shuffle。这就是 AQE 优化的切入点,能够让 SQL 先跑一部分,而后回头再跑优化器,看看能不能满足一些优化规定来让 SQL 执行得更加高效。

另一个典型的场景是数据歪斜。咱们讲大数据不怕数据量大,然而怕数据歪斜,因为在现实状况下,性能是能够通过硬件的程度扩大来实现线性的晋升,然而一旦有了数据歪斜,可能会导致劫难。还是以 Join 为例,咱们假设是一个等值的 Inner Join,有个别的 partition 特地大,这种场景咱们会有一些须要批改 SQL 的解决方案,比方把这些大的挑出来做独自解决,最初把后果 Union 在一起;或者针对特定的 Join Key 加盐,比方加一个数字后缀,将其打散,然而同时还要放弃语义不变,就是说右表对应的数据要做拷贝来实现等价的 Join 语义,最初再把增加的数字后缀去掉。

能够发现,这样的一个手工解决计划也是有肯定法则可循的,在 Spark 中将该过程自动化了,所以在 Spark3 外面开启了 AQE 后,就能主动帮忙解决这类 Join 的数据歪斜问题。如果咱们的 ETL 工作外面有很多大表 Join,同时数据品质比拟差,有重大的数据歪斜,也没有精力去做逐条 SQL 的优化,这样状况从 HiveQL 迁到 Spark SQL 下面能够带来十分显著的性能晋升,10 到 100 倍一点也不夸大!

Extension

Spark 通过 Extension API 提供了扩大能力,Kyuubi 提供了 KyuubiSparkSQLExtension,利用 Extension API 在原有的优化器上做了一些加强。这里列举了其中一部分加强规定,其中有 Z -order 性能,通过自定义优化器规定反对了数据写入时 Z -order 优化排序的的性能,并且通过扩大 SQL 语法实现了 Z -order 语法的反对;也有一些规定丰盛了监控统计信息;还有一些规定限度了查问分区扫描数量,和后果返回数量等。

以 RepartitionBeforeWriteHive 为例做下简略介绍,这条规定用于解决 Hive 的小文件写入问题。对于 Hive 动静分区写入场景,如果执行打算最初一个 stage,在写入 Hive 表之前,DataFrame 的 Partition 散布与 Hive 表的 Partition 散布不统一,在数据写入时,每一个 task 就会将持有的数据写到很多 Hive 表分区外面,就会生成大量的小文件。当咱们开启 RepartitionBeforeWriteHive 规定当前,它会在写入 Hive 表之前按照 Hive 表的分区插入一个 Repartition 算子,保障雷同 Hive 表分区的数据被一个 task 持有,防止写入产生大量的小文件。

Kyuubi 为所有规定都提供了开关,如果你只心愿启用其中局部规定,参考配置文档将其关上即可。KyuubiSparkSQLExtension 提供一个 Jar,应用时,能够把 Jar 拷贝到 ${SPAKR_HOME}/jars 上面,或命令参数 –jar 增加 Jar,而后开启 KyuubiSparkSQLExtension,并依据配置项来抉择开启特定性能。

Spark on ClickHouse

Data Source V2 API 也是 Spark3 引入的一个重要个性。Data Source V2 最早在 Spark 2.3 提出,在 Spark 3.0 被从新设计。下图用多种色彩标记不同的 Spark 版本提供的 Data Source V2 API,咱们能够看到,每个版本都加了大量的 API。能够看出,DataSourceV2 API 性能非常丰盛,但咱们更看重的是它有一个十分良好的扩展性,使得 API 能够始终进化。

Apache Iceberg 在现阶段对 Data Source V2 API 提供了一个比拟残缺的适配,因为 Iceberg 的社区成员是 Data Source V2 API 次要设计者和推动者,这也为咱们提供了一个十分好的 Demo。

ClickHouse 目前在 OLAP 畛域,尤其在单表查问畛域能够说是一骑绝尘,如果咱们能联合 Spark 和 ClickHouse 这两个大数据组件,让 Spark 读写 ClickHouse 像拜访 Hive 表一样简略,就能简化很多工作,解决很多问题。

基于 Data Source V2 API,咱们实现了并开源了 Spark on ClickHouse,除了适配 API 晋升 Spark 操作 ClickHouse 的易用性,也非常重视性能,其中包含了反对本地表的通明读写。ClickHouse 基于分布式表和本地表实现了一套比较简单的分布式计划。如果间接写分布式表,开销是比拟大的,一个变通的计划是批改调整逻辑,手动计算分片,间接写分布式表背地的本地表,该计划繁琐且有概率出错。当应用 Spark ClickHouse Connector 写 ClickHouse 分布式表时,只需应用 SQL 或者 DataFrame API,框架会自动识别分布式表,并尝试将对其分布式表的写入转化成对本地表的写入,实现主动透写本地表,带来很大的性能晋升。

我的项目地址:
援用
https://github.com/housepower…

下图描述了通过革新之后的新一代数据平台,革新带来了非常显著的收益。在硬件资源不变的状况下,首先,Daily Batch ETL 从 8 个小时降落到了 2 个小时;其次,通过引入 Iceberg 和增量同步,数据的时效性是从天级降至十分钟级;第三,膨胀计算引擎,原有平台须要搭配 Hive、Elasticsearch、Presto、MongoDB、Druid、Spark、Kylin 等多种计算引擎满足不同的业务场景,在新平台中,Spark 和 ClickHouse 能够满足大部分场景,大大减少了计算引擎的保护老本;最初,缩短了数据链路,在以前,受限于 RDMS 的计算能力,咱们往往在数据展现前须要进行一遍一遍的加工,最终把聚合后果放到 MySQL 外面,然而当引入 ClickHouse 当前,通常只有把数据加工成主题大宽表,报表展现借助 ClickHouse 强悍的单表计算能力,现场计算就能够了,所以数据链路会短很多。

Kyuubi 社区瞻望

最初瞻望一下 Kyuubi 社区将来的倒退。方才提到了 Kyuubi 架构是可扩大的,目前 Kyuubi 兼容 HiveServer2,是因为仅实现了 Thrift Binary 协定;Kyuubi 目前仅反对 Spark 引擎,Flink 引擎正在开发中,这项工作由 T3 出行的社区搭档在做。RESTful Frontend 社区也正在做,这样咱们也能够提供 RESTful API。咱们还打算提供 MySQL 的 API,这样用户就能够间接应用 MySQL Client 或 MySQL JDBC Driver 来连贯。前面附了社区外面相干的 issue 或者 PR。

如下是行将呈现在 1.4 版本中的一些个性,包含方才曾经提到过的 Engine Pool、Z-order、Kerberos 相干的解决方案,以及新公布的 Spark 3.2 的适配工作。

[KYUUBI #913] Support long running Kerberos enabled SQL engine
[KYUUBI #939] Add Z-Order extensions to support optimize SQL
[KYUUBI #962] Support engine pool feature
[KYUUBI #981] Add more detail stats to kyuubi engine page
[KYUUBI #1059] Add Plan Only Operations
[KYUUBI #1085] Add forcedMaxOutputRows rule for limitation
[KYUUBI #1131] Rework kyuubi-hive-jdbc module
[KYUUBI #1206] Support GROUP for engine.share.level
[KYUUBI #1224] Support Spark 3.2
. . .

最初展现一下已知应用 Kyuubi 的企业,如果你也应用 Kyuubi,或者在调研企业级的 Spark SQL Gateway 计划,有任何相干的问题,欢送能够到咱们社区外面分享、探讨。

GitHub 分享页面:
援用
https://github.com/apache/inc…

结语

本文依据网易数帆大数据平台专家、Apache Kyuubi(Incubating) PPMC 成员潘成在 Apache Hadoop Meetup 2021 北京站的分享内容整顿,重点结合实际落地的案例讲述了如何实现 Apache Kyuubi(Incubating) on Spark 和 CDH 集成,以及该计划为企业数据平台建设带来的收益,并瞻望了 Apache Kyuubi(Incubating) 社区将来的倒退。

Apache Kyuubi(Incubating) 我的项目地址:

https://github.com/apache/inc…

视频回放:

https://www.bilibili.com/vide…

案例分享:

Apache Kyuubi 在 T3 出行的深度实际

退出移动版