SparkSQL是Spark生态系统中十分重要的组件。面向企业级服务时,SparkSQL存在易用性较差的问题,导致难满足日常的业务开发需要。本文将具体解读,如何通过构建SparkSQL服务器实现应用效率晋升和应用门槛升高。
前言
Spark 组件因为其较好的容错与故障复原机制,在企业的长时作业中应用的十分宽泛,而SparkSQL又是应用Spark组件中最为罕用的一种形式。
相比间接应用编程式的形式操作Spark的RDD或者DataFrame的API,SparkSQL可间接输出SQL对数据进行ETL等工作的解决,极大晋升了易用度。然而相比Hive等引擎来说,因为SparkSQL不足一个相似Hive Server2的SQL服务器,导致SparkSQL在易用性上比不上Hive。
很多时候,SparkSQL只能将本身SQL作业打包成一个Jar,进行spark-submit命令提交,因此大大降低Spark的易用性。除此之外,还可应用周边工具,如Livy,但Livy更像一个Spark 服务器,而不是SparkSQL服务器,因而无奈反对相似BI工具或者JDBC这样的标准接口进行拜访。
尽管Spark 提供Spark Thrift Server,然而Spark Thrift Server的局限十分多,简直很难满足日常的业务开发需要,具体的剖析请查看:观点|SparkSQL在企业级数仓建设的劣势
规范的JDBC接口
Java.sql包下定义了应用Java拜访存储介质的所有接口,然而并没有具体的实现,也就是说JavaEE外面仅仅定义了应用Java拜访存储介质的规范流程,具体的实现须要依附周边的第三方服务实现。
例如,拜访MySQL的mysql-connector-java启动包,即基于java.sql包下定义的接口,实现了如何去连贯MySQL的流程,在代码中只须要通过如下的代码形式:
Class.forName("com.mysql.cj.jdbc.Driver");Connection connection= DriverManager.getConnection(DB_URL,USER,PASS);//操作connection.close();
第一,初始化驱动、创立连贯,第二,基于连贯进行对数据的操作,例如增删改查。能够看到在Java定义的标准接口拜访中,先创立一个connection实现存储介质,而后实现connection后续操作。
性能问题导致单次申请实时创立connection的性能较差。因而咱们往往通过保护一个存有多个connection的连接池,将connection的创立与应用离开以晋升性能,因此也衍生出很多数据库连接池,例如C3P0,DBCP等。
Hive 的JDBC实现
构建SparkSQL服务器最好的形式是用如上Java接口,且大数据生态下行业已有标杆例子,即Hive Server2。Hive Server2在遵循Java JDBC接口标准上,通过对数据操作的形式,实现了拜访Hive服务。除此之外,Hive Server2在实现上,与MySQL等关系型数据稍有不同。
首先,Hive Server2自身是提供了一系列RPC接口,具体的接口定义在org.apache.hive.service.rpc.thrift包下的TCLIService.Iface中,局部接口如下:
public TOpenSessionResp OpenSession(TOpenSessionReq req) throws org.apache.thrift.TException;public TCloseSessionResp CloseSession(TCloseSessionReq req) throws org.apache.thrift.TException;public TGetInfoResp GetInfo(TGetInfoReq req) throws org.apache.thrift.TException;public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException;public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws org.apache.thrift.TException;public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws org.apache.thrift.TException;public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws org.apache.thrift.TException;public TGetTablesResp GetTables(TGetTablesReq req) throws org.apache.thrift.TException;public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws org.apache.thrift.TException;public TGetColumnsResp GetColumns(TGetColumnsReq req) throws org.apache.thrift.TException;
也就是说,Hive Server2的每一个申请都是独立的,并且是通过参数的形式将操作和认证信息传递。Hive 提供了一个JDBC的驱动实现,通过如下的依赖便可引入:
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>version/version></dependency>
在HiveConnection类中实现了将Java中定义的SQL拜访接口转化为调用Hive Server2的RPC接口的实现,并且裁减了一部分Java定义中不足的能力,例如实时的日志获取。然而应用该能力时,须要将对应的实现类转换为Hive的实现类,例如:
HiveStatement hiveStatement = (HiveStatement) connection.createStatement();List<String> logs = hiveStatement.getQueryLog();
Log获取也需调用FetchResult接口,通过不同的参数来辨别获取Log信息还是获取内容信息,因而,Hive JDBC封装的调用Hive Server2 RPC接口流程是:
如果该流程触发获取MetaData、获取Functions等操作,则会调用其余接口,其中身份信息即token,是用THandleIdentifier类进行封装。在OpenSession时,由Hive Server2生成并且返回,后续所有接口都会附带传递这个信息,此信息是一次Connection连贯的惟一标记。
然而,Hive Server2在FetchResults办法中存在bug。因为Hive Server2没有很好解决hasMoreRows字段,导致Hive JDBC 客户端并未通过hasMoreRows字段去判断是否还有下一页,而是通过返回的List是否为空来判断。因而,相比Mysql Driver等驱动,Hive会多发动一次申请,直到返回List为空则进行获取下一页,对应的客户端的JDBC代码是:
ResultSet rs = hiveStatement.executeQuery(sql);while (rs.next()) { // }
即Hive JDBC实现next办法是通过返回的List是否为空来退出while循环。
构建SparkSQL服务器
介绍了JDBC接口常识与Hive的JDBC常识后,如果要构建一个SparkSQL服务器,那么这个服务器须要有以下几个特点:
- 反对JDBC接口,即通过Java 的JDBC规范进行拜访,能够较好与周边生态进行集成且升高应用门槛。
- 兼容Hive协定,如果要反对JDBC接口,那么须要提供SparkSQL的JDBC Driver。目前,大数据畛域Hive Server2提供的Hive-JDBC-Driver曾经被宽泛应用,从迁徙老本来说最好的形式就是放弃Hive的应用形式不变,只须要换个端口就行,也就是能够通过Hive的JDBC Driver间接拜访SparkSQL服务器。
- 反对多租户,以及相似用户名+明码和Kerberos等常见的用户认证能力。
- 反对跨队列提交,同时反对在JDBC的参数外面配置Spark的相干作业参数,例如Driver Memory,Execute Number等。
这里还有一个问题须要思考,即用户通过SparkSQL服务器提交的是一段SQL代码,而SparkSQL在执行时须要向Yarn提交Jar。那么,如何实现SQL到Jar提交转换?
一个最简略的形式是,用户每提交一个SQL就执行一次spark-submit命令,将后果保留再缓存,提供给客户端。还有更好形式,即提交一个常驻的Spark 作业,这个作业是一个常驻工作,作业会开启一个端口,用来接管用户的SQL进行执行,并且保留。
然而为了解决相似Spark Thrift Server的问题,作业须要和用户进行绑定,而不是随着Spark的组件启动进行绑定,即作业的提交以及接管哪个用户的申请,均来自于用户的行为触发。
有了这样几个大的方向后,便能够开始开发SparkSQL服务器。首先须要实现TCLIService.Iface下的所有接口,上面用代码+正文的形式来讲述这些Thrift接口的含意,以及如果实现一个SparkSQL服务器,须要在这些接口做什么内容:
public class SparkSQLThriftServer implements TCLIService.Iface { @Override public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { //Hive JDBC Driver在执行创立Connection的时候会调用此接口,在这里保护一个用户与Spark 作业的对应关系。 //来判断是须要复用一个曾经存在的Spark作业,还是全新执行一次spark-submt。 //用户与是否须要spark-submit的关联关系均在这里实现。 //同时须要生成THandleIdentifier对象,并且和用户身份进行关联,后续其余办法调用均须要应用这个对象关联出用户的信息。 return null; } @Override public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { //客户端调用connection.close()办法后会进入到这里,在这里进行用户状态的革除,同时须要基于用户的状况判断是否须要进行用来执行该用户SQL的Spark 作业引擎。 return null; } @Override public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { //获取服务器的元数据信息,例如应用BI工具,在命令会列出所连贯的服务的版本号等信息,均由此办法提供。 return null; } @Override public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException { //执行SQL工作,这里传递过去的是用户在客户端提交的SQL作业,接管到用户SQL后,将该SQL发送给常驻的Spark作业,这个常驻的作业在OpenSession的时候曾经确定。 return null; } @Override public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { //获取数据库反对的类型信息,应用BI工具,例如beeline的时候会调用到这里。 return null; } @Override public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { //获取Catalog,应用BI工具,例如beeline的时候会调用到这里。 return null; } @Override public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { //返回查问后果,基于THandleIdentifier对象查问到用户的SQL执行的状况,将申请转发至常驻的Spark 实例,获取后果。 //参数中通过TFetchResultsReq的getFetchType来辨别是获取日志数据还是查问后果数据,getFetchType == 1为获取Log,为0是查问数据查问后果。 return null; }}
咱们采纳复用以后生态的形式,来实现兼容Hive JDBC Driver的服务器。有了下面的Thrift接口实现后,则须要启动一个Thrift服务,例如:
TThreadPoolServer.Args thriftArgs = new TThreadPoolServer.Args(serverTransport) .processorFactory(new TProcessorFactory(this)) .transportFactory(new TSaslServerTransport.Factory()) .protocolFactory(new TBinaryProtocol.Factory()) .inputProtocolFactory( new TBinaryProtocol.Factory( true, true, 10000, 10000 ) ) .requestTimeout(1000L) .requestTimeoutUnit(TimeUnit.MILLISECONDS) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) .executorService(executorService);thriftArgs .executorService( new ThreadPoolExecutor( config.getMinWorkerThreads(), config.getMaxWorkerThreads(), config.getKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<>()));TThreadPoolServer server = new TThreadPoolServer(thriftArgs);server.serve();
至此便开发了一个反对Hive JDBC Driver拜访的服务器,并且在这个服务器的办法中,实现了对Spark 作业的治理。后续,还须要开发预设Spark Jar,Jar同样实现了如上接口,只是该作业的实现是理论执行用户的SQL。
通过后面的流程,曾经实现一个能够工作SparkSQL服务器开发,领有接管用户申请,执行SQL,并且返回后果的能力。但如何做的更加粗疏?例如,如何实现跨队列的提交、如何实现用户细粒度的资源管理、如何保护多个Spark 作业的连接池,咱们接下来会讲到。
因为对于Spark作业在Yarn上的提交,运行,进行均由SparkSQL服务器治理,对用户是不可见的,用户只须要编写规范的JDBC代码即可,因而能够基于用户的参数信息来匹配适合的引擎去执行,同时还能够限度一个Spark 常驻作业的工作个数,实现更加灵便的SparkSQL作业的治理,同时也能够实现相似C3P0连接池的思维,保护一个用户信息到Spark常驻作业的关联池。
SparkSQL服务器的HA
Hive Server2在启动的时候会将本人的服务器信息写入Zookeeper中,构造体如下所示:
[zk: localhost:2181(CONNECTED) 1] ls /hiveserver2\[serverUri=127.0.01:10000;version=3.1.2;sequence=0000000000]
当连贯HA模式下的服务器的时候,Hive JDBC Driver的URL须要切换成zookeeper的地址,Hive JDBC Driver会从多个地址中随机抉择一个,作为该Connection的地址,在整个Connection中均会应用该地址。
因而对于咱们实现的SparkSQL服务器,只须要在服务器启动的时候,放弃与Hive统一的数据格式,将本人的服务器的地址信息写入到Zookeeper中即可,便可通过规范的zk地址进行拜访,例如:
./bin/beeline -u "jdbc:hive2://127.0.01/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=自定义的节点;auth=LDAP" -n 用户名 -p明码
因为服务器的抉择基于Connection级别的,也就是在Connection被生成新的之前,整个服务器的地址是不会发生变化的,在产生谬误的时候服务端能够进行重试,进行地址的切换,因而HA的力度是在Connection级别而非申请级别。
对接生态工具
实现以上开发之后,即可实现在大部分的场景下,应用规范的Hive驱动只须要切换一个端口号。特地提到Hue工具,因为和Hive的集成并未应用规范的JDBC接口,而是间接离开调用的Hive Server2的Thrift接口,也就是Hue自行保护来如何拜访Thrift的接口的程序问题。
能够发现在这样的状况会有一个问题就是对于Hue来说,并没有Connection的概念,失常的SparkSQL在JDBC的交互方式下解决流程是:
因为Hue没有Connection概念,因而Hue的申请并不会先到OpenSession,而是间接发动ExecuteStatement。因为没有上下文信息,失常流程下ExecuteStatement处接管到Hue的申请会发现该申请为非法,所以OpenSession不能作为连贯的终点,而是须要在每一个Thrift接口处实现OpenSession的能力,以此作为上下文初始化。
序幕
SparkSQL在企业中的应用比重越来越大,而有一个更好用的SQL服务器,则会大大晋升应用效率和升高应用门槛。目前,SparkSQL在服务器这方面的能力显然不如Hive Server2提供的更加规范,所以各个企业均可基于本身状况,抉择是否须要开发一个适合于本身的SparkSQL服务器。
本文所提到的相干能力已通过火山引擎EMR产品向内部企业凋谢。联合字节跳动外部以及内部客户的需要状况,火山引擎EMR产品的Ksana for SparkSQL提供一个生产可用的SparkSQL服务器,并且在Spark 性能方面也做了较大的优化,本文次要围绕技术实现的角度来论述如何实现一个SparkSQL服务,后续会有更多文章讲述其余相干的优化。
产品介绍
火山引擎 E-MapReduce
反对构建开源Hadoop生态的企业级大数据分析系统,齐全兼容开源,提供 Hadoop、Spark、Hive、Flink集成和治理,帮忙用户轻松实现企业大数据平台的构建,升高运维门槛,疾速造成大数据分析能力。
更多技术交换、求职机会、试用福利,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群