01 背景介绍
近几年随着B站业务高速倒退,数据量一直减少,离线计算集群规模从最后的两百台倒退到目前近万台,从单机房倒退到多机房架构。在离线计算引擎上目前咱们次要应用Spark、Presto、Hive。架构图如下所示,咱们的BI、ADHOC以及DQC服务都是通过自研的Dispatcher路由服务来实现对立SQL调度,Dispatcher会联合查问SQL语法特色、读HDFS量以及以后引擎的负载状况,动静地抉择以后最佳计算引擎执行工作。如果用户SQL失败了会做引擎主动降级,升高用户应用门槛;其中对于Spark查问晚期咱们都是走STS,然而STS自身有很多性能和可用性上的问题,因而咱们引入了Kyuubi,通过Kyuubi提供的多租户、多引擎代理以及齐全兼容Hive Thrift协定能力,实现各个部门Adhoc工作的资源隔离和权限验证。
Query查问状况
目前在Adhoc查问场景下,SparkSQL占比靠近一半,依赖Kyuubi对于Scala语法的反对,目前曾经有局部高级用户应用scala语法提交语句执行,并且能够在SQL和Scala模式做自在切换,这大大丰盛了adhoc的应用场景。
02 Kyuubi利用
Kyuubi 是网易数帆大数据团队奉献给 Apache 社区的开源我的项目。Kyuubi 次要利用在大数据畛域场景,包含大数据离线计算、adhoc、BI等方向。Kyuubi 是一个分布式、反对多用户、兼容 JDBC 或 ODBC 的大数据处理服务。
为目前热门的计算引擎(例如Spark、Presto或Flink等)提供SQL等查问服务。
咱们抉择Kyuubi的起因:
1. 齐全兼容Hive thrift 协定,合乎B站已有的技术选型。
2. 高可用和资源隔离,对于大规模的生产环境必不可少。
3. 灵便可扩大,基于kyuubi能够做更多适配性开发。
4. 反对多引擎代理,为将来对立计算入口打下基础。
5. 社区高质量实现以及社区沉闷。
Kyuubi 的架构能够分成三个局部:
1.客户端: 用户应用jdbc或者restful协定来提交作业获取后果。
2.kyuubi server: 接管、治理和调度与客户端建设的Kyuubi Session,Kyuubi Session最终被路由到理论的引擎执行。
3.kyuubi engine: 承受解决 kyuubi server发送过去的工作,不同engine有着不同的实现形式。
03 基于Kyuubi的改良
Kyuubi曾经在B站生产环境稳固运行一年以上,目前所有的Adhoc查问都通过kyuubi来接入大数据计算引擎。 在这一年中咱们经验了两次大版本的演进过程,从最后kyuubi 1.3到kyuubi 1.4版本,再从kyuubi 1.4降级kyuubi 1.6版本。与之前的STS相比,kyuubi在稳定性和查问性能方面有着更好的体现。在此演进过程中,咱们联合B站业务以及kyuubi性能特点,对kyuubi进行局部革新。
3.1 减少QUEUE模式
Kyuubi Engine原生提供了CONNECTION、USER、GROUP和SERVER多种隔离级别。在B站大数据计算资源容量依照部门划分,不同部门在Yarn上对应不同的队列。咱们基于GROUP模式进行了革新,实现Queue级别的资源隔离和权限管制。
用户信息和队列的映射由下层工具平台对立配置和治理,Kyuubi只需关怀上游Dispatcher提交过去user和queue信息,进行调度并散发到对应队列的spark engine上进行计算。目前咱们有20+个adhoc队列,每个队列都对应一个或者多个Engine实例(Engine pool)。
3.2 在QUEUE模式下反对多租户
kyuubi server端由超级用户Hive启动,在spark场景下driver和executor共享同一个的用户名。不同的用户提交不同的sql, driver端和executor端无奈辨别以后的工作是由谁提交的,在数据安全、资源申请和权限访问控制方面都存在着问题。
针对该问题,咱们对以下几个方面进行了革新。
3.2.1 kyuubi server端
1. kyuubi server以hive principal身份启动。
2. Dispatcher以username proxyUser身份提交SQL。
3.2.2 spark engine端
1. Driver和Executor以hive身份启动。
2. Driver以username proxyUser身份提交SQL。
3. Executor启动Task线程须要以username proxyUser身份执行Task。
4. 同时须要保障所有的公共线程池,绑定的UGI信息正确。如ORC Split线程池上,当Orc文件达到肯定数量会启用线程池进行split计算,线程池是全局共享,永恒绑定的是第一次触发调用的用户UGI信息,会导致用户UGI信息错乱。
3.3 kyuubi engine UI 展现性能
在日常应用中咱们发现 kyuubi 1.3 Engine UI页面展现不够敌对。不同的用户执行不同的SQL无奈辨别的开,session 、job、stage、task无奈关联的起来。
导致排查定位用户问题比拟艰难,咱们借鉴STS拓展了kyuubi Engine UI页面。咱们对以下几个方面进行了革新。
1. 自定义kyuubi Listener监听Spark Job、Stage、Task相干事件以及SparkSQL相干事件:SessionCreate、SessionClose、executionStart、executionRunning、executionEnd等
2. Engine执行SQL相干操作时,绑定并发送相干SQL Event,结构SQL相干状态事件,将采集的Event进行状态剖析、汇总以及存储。
3. 自定义Kyuubi Page进行Session以及SQL相干状态实时展现。
Session Statistics信息展现
SQL Statistics信息展现
3.4 kyuubi反对配置核心加载Engine参数
为了解决队列之间计算资源需要的差异性,如任务量大的队列须要更多计算资源(Memory、Cores),任务量小的队列须要大量资源,每个队列需要的差别,咱们将所有队列的Engine相干资源参数对立到配置核心治理。每个队列第一次启动Engine前,将查问本人所属队列的参数并追加到启动命令中,进行参数的笼罩。
3.5 Engine执行工作的进度显示与耗费资源上报性能
工作在执行过程中,用户最关怀的就是本人工作的进度以及健康状况,平台比较关心的是工作所耗费的计算资源老本。咱们在Engine端,基于事件采集user、session、job、stage信息并进行存储,启动定时工作将收集的user、session、job、stage信息进行关联并进行资源耗费成本计算,并将后果注入对应operation log中, 回传给前端日志展现。
工作进度信息展现
查问耗费资源上报展现
04 Kyuubi稳定性建设
4.1 大后果集溢写到磁盘
在adhoc 场景中用户通常会拉取大量后果到 driver 中,同一时间大量的用户同时拉取后果集,会造成大量的内存耗费,导致spark engine内存缓和,driver性能降落问题,间接影响着用户的查问体验,为此专门优化了driver fetch result 的过程,在获取后果时会实时监测driver内存应用状况,当driver内存使用量超过阈值后会先将拉取到的后果间接写出到本地磁盘文件中,在用户申请后果时再从文件中分批读出返回,减少driver的稳定性。
4.2 单个 SQL 的task并发数、执行工夫和 task 数量的限度
在生产过程中,咱们经常性的遇到单个大作业间接占用了整个Engine的全副计算资源,导致短作业长时间得不到计算资源,始终 pending的状况,为了解决这种问题咱们对以下几个方面进行优化。
- Task并发数方面:默认状况下Task调度时只有有资源就会全副调度调配进来,后续SQL过去就面临着齐全无资源可用的状况,咱们对单个SQL参加调度的task数进行了限度,具体的限度数随着可用资源大小进行动静调整。
- 单个 SQL 执行工夫方面:下层Dispatcher和上层Engine都做了超时限度,规定adhoc工作超过1小时,就会将该工作kill掉。
- 单个Stage task数量方面:同时咱们也对单个stage的task数进行限度,一个stage最大容许30W个task。
4.3 单次 table scan 的文件数和大小的限度
为保障kyuubi的稳定性,咱们对查问数据量过大的SQL进行限度。通过自定义内部optimization rule(TableScanLimit)来达到目标。TableScanLimit匹配LocalLimit,收集子节点project、filter。匹配叶子结点HiveTableRelation和HadoopFsRelation。即匹配Hive表和DataSource表的Logical relation,针对不同的表采取不同的计算形式。
1. HiveTableRelation:
- 非分区表, 通过table meta 拿到表的totalSize、numFiles、numRows值。
- 分区表,判断是否有下推下来的分区。若有,则拿对应分区的数据 totalSize、numFiles、numRows。若没有,则拿全表的数据。
2. HadoopFsRelation:判断partitionFilter是否存在动静filter
- 不存在,则通过partitionFilter失去须要扫描的分区
- 存在,则对partitionFilter扫描进去的分区进一步过滤失去最终须要扫描的分区
获取到SQL查问的dataSize、numFiles、numRows后, 还须要依据表存储类型、不同字段的类型、是否存在limit、在依据下推来的project、filter 得出最终须要扫描的列,估算出须要table scan size,如果table scan size超过制订阈值则回绝查问并告知起因。
4.4 危险join condition发现&Join膨胀率的限度
4.4.1 危险join condition发现
为保障kyuubi的稳定性,咱们也对影响Engine性能的SQL进行限度。用户在写sql时可能并不理解spark对于join的底层实现,可能会导致程序运行的十分慢甚至OOM,这个时候如果能够为用户提供哪些join condition可能是导致engine运行慢的起因,并揭示用户改良和不便定位问题,甚至能够回绝这些危险的query提交。
在抉择 join 形式的时候如果是等值 join 则依照 BHJ,SHJ,SMJ 的程序抉择,如果还没有抉择join type则断定为 Cartesian Join,如果 join 类型是InnerType的就应用 Cartesian Join,Cartesian Join会产生笛卡尔积比较慢,如果不是 InnerType,则应用 BNLJ,在判断 BHJ 时,表的大小就超过了broadcast 阈值,因而将表broadcast进来可能会对driver内存造成压力,性能比拟差甚至可能会 OOM,因而将这两种 join 类型定义为危险 join。
如果是非等值 join 则只能应用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 时选不出 build side 阐明两个表的大小都超过了broadcast阈值,则应用Cartesian Join,如果Join Type不是 InnerType 则只能应用 BNLJ,因而Join策略中抉择Cartesian Join和第二次抉择 BNLJ 时为危险 join。
4.4.2 Join膨胀率的限度
在shareState 中的 statusScheduler 用于收集 Execution 的状态和指标,这其中的指标就是依照 nodes 汇总了各个 task 汇报上来的 metrics,咱们启动了一个 join检测的线程定时的监控 Join 节点的 "number of output rows"及Join 的2个父节点的 "number of output rows" 算出该 Join 节点的膨胀率。
Join 节点的收缩检测:
05 kyuubi 新利用场景
5.1 大查问connection&scala模式的应用
5.1.1 connection模式的应用
adhoc大工作和简单的SQL会导致kyuubi engine在肯定工夫内性能降落,重大影响了其余失常的adhoc工作的执行效率。咱们在adhoc前端凋谢了大查问模式,让这些简单、查问量大的工作走kyuubi connection模式。在kyuubi connection模式下一个用户工作独自享有本人申请的资源,独立的Driver,工作的大小快慢都由本身的SQL特色决定,不会影响到其余用户的SQL工作,同时咱们也会适当放开后面一些限度条件。
connection 模式在B站的应用场景:
- table scan断定该adhoc工作为大工作,执行工夫超过1个小时。
- 简单的SQL工作, 该工作存在笛卡尔积或Join收缩超过阈值。
- 单个SQL单个stage的task数超过30W。
- 用户自行抉择connection模式。
5.1.2 scala模式的应用
SQL模式能够解决大数据80%的业务问题,SQL模式加上Scala模式编程能够解决99%的业务问题;SQL是一种十分用户敌对的语言,用户不必理解Spark外部的原理,就能够应用SQL进行简单的数据处理,然而它也有肯定的局限性。
SQL模式不够灵便,无奈以dataset以及rdd两种形式进行数据处理操作。无奈解决更加简单的业务,特地是非数据处理相干的需要。另一方面,用户执行scala code我的项目时必须打包我的项目并提交到计算集群,如果code出错了就需来回打包上传,十分的耗时。
Scala模式能够间接提交code,相似Spark交互式Shell,简化流程。针对这些问题, 咱们将SQL模式、Scala模式的长处联合起来,两者进行混合编程,这样基本上能够解决数据分析场景下大部分的case。
5.2 Presto on spark
Presto为了保障集群的稳定性,每个Query的最大内存进行了限度,超过配置内存的Query会被Presto oom kill掉。局部ETL工作会呈现随着业务增长,数据量增大,占用内存也会增多,当超过阈值后,流程就会呈现失败。
为了解决这个问题,prestodb社区开发了一个presto on spark的我的项目,通过将query提交到Spark来解决query的内存占用过大导致的扩张性问题,然而社区计划对于曾经存在的查问并不是很敌对,用户的提交形式有presto-cli、pyhive等形式,而要应用Presto on spark我的项目,则必须通过spark-submit形式将query提交到yarn。
为了让用户无感知的执行presto on spark查问,咱们在presto gateway上做了一些革新,同时借助kyuubi restulful的接口,和service + engine的调度能力,在kyuubi内开发了Presto-Spark Engine,该engine可能比拟敌对的来提交查问到Yarn。
次要实现细节如下:
1. presto gateway将query的执行历史进行保留,包含query的资源应用状况、报错信息等。
2. presto gateway申请HBO服务,判断以后query是否须要通过presto on spark提交查问。
3. presto gateway通过zk获取可用的kyuubi server列表,随机抉择一台,通过http向kyuubi open一个session。
4. presto gateway依据获取到的sessionHandle信息,再提交语句。
5. kyuubi server接管到query后,会启动一个独立的Presto-Spark Engine,构建启动命令,执行命令提交spark-submit 到yarn。
6. Presto gateway依据返回的OperatorHandle信息, 通过http一直获取operation status。
7. 作业胜利,则通过fetch result申请将后果获取并返回给客户端。
06 kyuubi部署形式
6.1 Kyuubi server接入K8S
整合 Engine on yarn label的实际
生产实践中遇到的问题:
1.目前kyuubi server/engine部署在混部集群上,环境简单,各组件环境相互依赖、公布过程中难免会存在环境不统一、误操作等问题,从而导致服务运行出错。
2.资源管理问题。最后engine应用的是client模式,不同的队列的engine driver应用的都是大内存50g-100g不等 ,同时AM、NM 、DN、kyuubi server都共享着同一台物理机器上的资源,当AM启动过多, 占满整个机器的资源,导致机器内存不足,engine无奈启动。
针对于该问题,咱们研发了一套基于Queue模式资源分配调度实现:每个kyuubi server 和 spark engine在znode上都记录着以后资源应用状况。每个kyuubi server znode信息:以后kyuubi注册SparkEngine数量、以后kyuubi server注册SparkEngine实例、kyuubi server内存总大小以及以后kyuubi server残余内存总大小等。
每个engine znode信息:所属kyuubi server IP/端口、以后SparkEngine内存、以后SparkEngine所属队列等 。每次Spark engine的启动/退出,都会获取该队列的目录锁,而后对其所属的kyuubi server进行资源更新操作。kyuubi server如果宕机,在启动时,遍历获取所有engine在znode的信息,进行资源和状态的疾速复原。
3.针对资源管理性能也存在着一些问题: 资源碎片化问题、新性能的拓展不敌对以及保护老本大。Engine应用的是client模式,过多大内存的AM会占用客户端的过多计算资源,导致engine程度拓展受限。
针对以上提出的问题,咱们做了对应的解决方案:
1. kyuubi server接入k8s
咱们指定了一批机器作为kyuubi server在k8s上调度资源池,实现kyuubi server环境、资源的隔离。实现了kyuubi server疾速部署、进步kyuubi server程度扩大能力,升高了运维老本。
2. Engine on yarn label
咱们将kyuubi engine资源管理交给yarn,由yarn负责engine的调配和调度。咱们采纳了cluster模式以防engine在程度拓展时受到资源限度。采纳cluster模式后,咱们遇到了新的问题:在queue模式下engine driver应用的都是50g-100g不等的大内存,然而因为yarn集群的配置限度,可能申请的最大Container资源量为<28G, 10vCore>。为了在cluster模式的状况下保障Driver可能获取到足够的资源,咱们革新了yarn以适应此类场景。咱们将需要拆分为以下三项:
- 将kyuubi Driver搁置于独立的Node Label中,该Node Label中的服务器由kyuubi driver独立应用;
- kyuubi Executor依然搁置在Default Label的各对应队列的adhoc叶子队列内,承接adhoc工作解决工作;
- Driver申请的资源须要大于MaxAllocation,即上文所述的<28G, 10vCore>。心愿可能依据Node Label动静设置Queue级别的MaxAllocation,使得kyuubi Driver可能取得较大资源量。
首先,咱们在yarn上建设了kyuubi\_label,并在label内与Default Label映射建设kyuubi队列,以供所有的Driver对立提交在kyuubi队列上。并通过“spark.yarn.am.nodeLabelExpression=kyuubi\_label”指定Driver提交至kyuubi_label,通过“spark.yarn.executor.nodeLabelExpression= ”指定Executor提交至default label,实现如下的成果:
其次,咱们将yarn的资源最大值由原先的“集群”级别管控下放至“队列+Label”级别管控,通过调整"queue name + kyuubi_label"的Conf,咱们可能将Driver的Container资源量最大值进步至<200G, 72vCore>,且保障其余Container的最大值仍为<28G, 10vCore>。同样申请50G的Driver,在default集群中会呈现失败提醒:
而在kyuubi_lable的同队列下则可能胜利运行, 这样咱们既借助了yarn的资源管控能力,又保障了kyuubi driver取得的资源量。
07 将来布局
1. 小的ETL工作接入kyuubi,缩小ETL工作资源申请工夫
2. Kyuubi Engine(Spark和Flink)云原生,接入K8S对立调度
3. Spark jar 工作也对立接入Kyuubi
以上是明天的分享内容,如果你有什么想法或疑难,欢送大家在留言区与咱们互动,如果喜爱本期内容的话,请给咱们点个赞吧!
本文转载自:Apache Kyuubi 在 B 站大数据场景下的利用实际