SparkSQL 是 Spark 生态系统中十分重要的组件。面向企业级服务时,SparkSQL 存在易用性较差的问题,导致难满足日常的业务开发需要。本文将具体解读,如何通过构建 SparkSQL 服务器实现应用效率晋升和应用门槛升高。
前言
Spark 组件因为其较好的容错与故障复原机制,在企业的长时作业中应用的十分宽泛,而 SparkSQL 又是应用 Spark 组件中最为罕用的一种形式。
相比间接应用编程式的形式操作 Spark 的 RDD 或者 DataFrame 的 API,SparkSQL 可间接输出 SQL 对数据进行 ETL 等工作的解决,极大晋升了易用度。然而相比 Hive 等引擎来说,因为 SparkSQL 不足一个相似 Hive Server2 的 SQL 服务器,导致 SparkSQL 在易用性上比不上 Hive。
很多时候,SparkSQL 只能将本身 SQL 作业打包成一个 Jar,进行 spark-submit 命令提交,因此大大降低 Spark 的易用性。除此之外,还可应用周边工具,如 Livy,但 Livy 更像一个 Spark 服务器,而不是 SparkSQL 服务器,因而无奈反对相似 BI 工具或者 JDBC 这样的标准接口进行拜访。
尽管 Spark 提供 Spark Thrift Server,然而 Spark Thrift Server 的局限十分多,简直很难满足日常的业务开发需要,具体的剖析请查看:观点|SparkSQL 在企业级数仓建设的劣势
规范的 JDBC 接口
Java.sql 包下定义了应用 Java 拜访存储介质的所有接口,然而并没有具体的实现,也就是说 JavaEE 外面仅仅定义了应用 Java 拜访存储介质的规范流程,具体的实现须要依附周边的第三方服务实现。
例如,拜访 MySQL 的 mysql-connector-java 启动包,即基于 java.sql 包下定义的接口,实现了如何去连贯 MySQL 的流程,在代码中只须要通过如下的代码形式:
Class.forName("com.mysql.cj.jdbc.Driver");
Connection connection= DriverManager.getConnection(DB_URL,USER,PASS);
// 操作
connection.close();
第一,初始化驱动、创立连贯,第二,基于连贯进行对数据的操作,例如增删改查。能够看到在 Java 定义的标准接口拜访中,先创立一个 connection 实现存储介质,而后实现 connection 后续操作。
性能问题导致单次申请实时创立 connection 的性能较差。因而咱们往往通过保护一个存有多个 connection 的连接池,将 connection 的创立与应用离开以晋升性能,因此也衍生出很多数据库连接池,例如 C3P0,DBCP 等。
Hive 的 JDBC 实现
构建 SparkSQL 服务器最好的形式是用如上 Java 接口,且大数据生态下行业已有标杆例子,即 Hive Server2。Hive Server2 在遵循 Java JDBC 接口标准上,通过对数据操作的形式,实现了拜访 Hive 服务。除此之外,Hive Server2 在实现上,与 MySQL 等关系型数据稍有不同。
首先,Hive Server2 自身是提供了一系列 RPC 接口,具体的接口定义在 org.apache.hive.service.rpc.thrift 包下的 TCLIService.Iface 中,局部接口如下:
public TOpenSessionResp OpenSession(TOpenSessionReq req) throws org.apache.thrift.TException;
public TCloseSessionResp CloseSession(TCloseSessionReq req) throws org.apache.thrift.TException;
public TGetInfoResp GetInfo(TGetInfoReq req) throws org.apache.thrift.TException;
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException;
public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws org.apache.thrift.TException;
public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws org.apache.thrift.TException;
public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws org.apache.thrift.TException;
public TGetTablesResp GetTables(TGetTablesReq req) throws org.apache.thrift.TException;
public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws org.apache.thrift.TException;
public TGetColumnsResp GetColumns(TGetColumnsReq req) throws org.apache.thrift.TException;
也就是说,Hive Server2 的每一个申请都是独立的,并且是通过参数的形式将操作和认证信息传递。Hive 提供了一个 JDBC 的驱动实现,通过如下的依赖便可引入:
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>version/version>
</dependency>
在 HiveConnection 类中实现了将 Java 中定义的 SQL 拜访接口转化为调用 Hive Server2 的 RPC 接口的实现,并且裁减了一部分 Java 定义中不足的能力,例如实时的日志获取。然而应用该能力时,须要将对应的实现类转换为 Hive 的实现类,例如:
HiveStatement hiveStatement = (HiveStatement) connection.createStatement();
List<String> logs = hiveStatement.getQueryLog();
Log 获取也需调用 FetchResult 接口,通过不同的参数来辨别获取 Log 信息还是获取内容信息,因而,Hive JDBC 封装的调用 Hive Server2 RPC 接口流程是:
如果该流程触发获取 MetaData、获取 Functions 等操作,则会调用其余接口,其中身份信息即 token,是用 THandleIdentifier 类进行封装。在 OpenSession 时,由 Hive Server2 生成并且返回,后续所有接口都会附带传递这个信息,此信息是一次 Connection 连贯的惟一标记。
然而,Hive Server2 在 FetchResults 办法中存在 bug。因为 Hive Server2 没有很好解决 hasMoreRows 字段,导致 Hive JDBC 客户端并未通过 hasMoreRows 字段去判断是否还有下一页,而是通过返回的 List 是否为空来判断。因而,相比 Mysql Driver 等驱动,Hive 会多发动一次申请,直到返回 List 为空则进行获取下一页,对应的客户端的 JDBC 代码是:
ResultSet rs = hiveStatement.executeQuery(sql);
while (rs.next()) {//}
即 Hive JDBC 实现 next 办法是通过返回的 List 是否为空来退出 while 循环。
构建 SparkSQL 服务器
介绍了 JDBC 接口常识与 Hive 的 JDBC 常识后,如果要构建一个 SparkSQL 服务器,那么这个服务器须要有以下几个特点:
- 反对 JDBC 接口,即通过 Java 的 JDBC 规范进行拜访,能够较好与周边生态进行集成且升高应用门槛。
- 兼容 Hive 协定,如果要反对 JDBC 接口,那么须要提供 SparkSQL 的 JDBC Driver。目前,大数据畛域 Hive Server2 提供的 Hive-JDBC-Driver 曾经被宽泛应用,从迁徙老本来说最好的形式就是放弃 Hive 的应用形式不变,只须要换个端口就行,也就是能够通过 Hive 的 JDBC Driver 间接拜访 SparkSQL 服务器。
- 反对多租户,以及相似用户名 + 明码和 Kerberos 等常见的用户认证能力。
- 反对跨队列提交,同时反对在 JDBC 的参数外面配置 Spark 的相干作业参数,例如 Driver Memory,Execute Number 等。
这里还有一个问题须要思考,即用户通过 SparkSQL 服务器提交的是一段 SQL 代码,而 SparkSQL 在执行时须要向 Yarn 提交 Jar。那么,如何实现 SQL 到 Jar 提交转换?
一个最简略的形式是,用户每提交一个 SQL 就执行一次 spark-submit 命令,将后果保留再缓存,提供给客户端。还有更好形式,即提交一个常驻的 Spark 作业,这个作业是一个常驻工作,作业会开启一个端口,用来接管用户的 SQL 进行执行,并且保留。
然而为了解决相似 Spark Thrift Server 的问题,作业须要和用户进行绑定,而不是随着 Spark 的组件启动进行绑定,即作业的提交以及接管哪个用户的申请,均来自于用户的行为触发。
有了这样几个大的方向后,便能够开始开发 SparkSQL 服务器。首先须要实现 TCLIService.Iface 下的所有接口,上面用代码 + 正文的形式来讲述这些 Thrift 接口的含意,以及如果实现一个 SparkSQL 服务器,须要在这些接口做什么内容:
public class SparkSQLThriftServer implements TCLIService.Iface {
@Override
public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
//Hive JDBC Driver 在执行创立 Connection 的时候会调用此接口,在这里保护一个用户与 Spark 作业的对应关系。// 来判断是须要复用一个曾经存在的 Spark 作业,还是全新执行一次 spark-submt。// 用户与是否须要 spark-submit 的关联关系均在这里实现。// 同时须要生成 THandleIdentifier 对象,并且和用户身份进行关联,后续其余办法调用均须要应用这个对象关联出用户的信息。return null;
}
@Override
public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {// 客户端调用 connection.close()办法后会进入到这里,在这里进行用户状态的革除,同时须要基于用户的状况判断是否须要进行用来执行该用户 SQL 的 Spark 作业引擎。return null;
}
@Override
public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
// 获取服务器的元数据信息,例如应用 BI 工具,在命令会列出所连贯的服务的版本号等信息,均由此办法提供。return null;
}
@Override
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
// 执行 SQL 工作,这里传递过去的是用户在客户端提交的 SQL 作业,接管到用户 SQL 后,将该 SQL 发送给常驻的 Spark 作业,这个常驻的作业在 OpenSession 的时候曾经确定。return null;
}
@Override
public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
// 获取数据库反对的类型信息,应用 BI 工具,例如 beeline 的时候会调用到这里。return null;
}
@Override
public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
// 获取 Catalog,应用 BI 工具,例如 beeline 的时候会调用到这里。return null;
}
@Override
public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
// 返回查问后果,基于 THandleIdentifier 对象查问到用户的 SQL 执行的状况,将申请转发至常驻的 Spark 实例,获取后果。// 参数中通过 TFetchResultsReq 的 getFetchType 来辨别是获取日志数据还是查问后果数据,getFetchType == 1 为获取 Log,为 0 是查问数据查问后果。return null;
}
}
咱们采纳复用以后生态的形式,来实现兼容 Hive JDBC Driver 的服务器。有了下面的 Thrift 接口实现后,则须要启动一个 Thrift 服务,例如:
TThreadPoolServer.Args thriftArgs = new TThreadPoolServer.Args(serverTransport)
.processorFactory(new TProcessorFactory(this))
.transportFactory(new TSaslServerTransport.Factory())
.protocolFactory(new TBinaryProtocol.Factory())
.inputProtocolFactory(
new TBinaryProtocol.Factory(
true,
true,
10000,
10000
)
)
.requestTimeout(1000L)
.requestTimeoutUnit(TimeUnit.MILLISECONDS)
.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executorService);
thriftArgs
.executorService(
new ThreadPoolExecutor(config.getMinWorkerThreads(),
config.getMaxWorkerThreads(),
config.getKeepAliveTime(),
TimeUnit.SECONDS, new SynchronousQueue<>()));
TThreadPoolServer server = new TThreadPoolServer(thriftArgs);
server.serve();
至此便开发了一个反对 Hive JDBC Driver 拜访的服务器,并且在这个服务器的办法中,实现了对 Spark 作业的治理。后续,还须要开发预设 Spark Jar,Jar 同样实现了如上接口,只是该作业的实现是理论执行用户的 SQL。
通过后面的流程,曾经实现一个能够工作 SparkSQL 服务器开发,领有接管用户申请,执行 SQL,并且返回后果的能力。但如何做的更加粗疏?例如,如何实现跨队列的提交、如何实现用户细粒度的资源管理、如何保护多个 Spark 作业的连接池,咱们接下来会讲到。
因为对于 Spark 作业在 Yarn 上的提交,运行,进行均由 SparkSQL 服务器治理,对用户是不可见的,用户只须要编写规范的 JDBC 代码即可,因而能够基于用户的参数信息来匹配适合的引擎去执行,同时还能够限度一个 Spark 常驻作业的工作个数,实现更加灵便的 SparkSQL 作业的治理,同时也能够实现相似 C3P0 连接池的思维,保护一个用户信息到 Spark 常驻作业的关联池。
SparkSQL 服务器的 HA
Hive Server2 在启动的时候会将本人的服务器信息写入 Zookeeper 中,构造体如下所示:
[zk: localhost:2181(CONNECTED) 1] ls /hiveserver2\
[serverUri=127.0.01:10000;version=3.1.2;sequence=0000000000]
当连贯 HA 模式下的服务器的时候,Hive JDBC Driver 的 URL 须要切换成 zookeeper 的地址,Hive JDBC Driver 会从多个地址中随机抉择一个,作为该 Connection 的地址,在整个 Connection 中均会应用该地址。
因而对于咱们实现的 SparkSQL 服务器,只须要在服务器启动的时候,放弃与 Hive 统一的数据格式,将本人的服务器的地址信息写入到 Zookeeper 中即可,便可通过规范的 zk 地址进行拜访,例如:
./bin/beeline -u "jdbc:hive2://127.0.01/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace= 自定义的节点;auth=LDAP" -n 用户名 - p 明码
因为服务器的抉择基于 Connection 级别的,也就是在 Connection 被生成新的之前,整个服务器的地址是不会发生变化的,在产生谬误的时候服务端能够进行重试,进行地址的切换,因而 HA 的力度是在 Connection 级别而非申请级别。
对接生态工具
实现以上开发之后,即可实现在大部分的场景下,应用规范的 Hive 驱动只须要切换一个端口号。特地提到 Hue 工具,因为和 Hive 的集成并未应用规范的 JDBC 接口,而是间接离开调用的 Hive Server2 的 Thrift 接口,也就是 Hue 自行保护来如何拜访 Thrift 的接口的程序问题。
能够发现在这样的状况会有一个问题就是对于 Hue 来说,并没有 Connection 的概念,失常的 SparkSQL 在 JDBC 的交互方式下解决流程是:
因为 Hue 没有 Connection 概念,因而 Hue 的申请并不会先到 OpenSession,而是间接发动 ExecuteStatement。因为没有上下文信息,失常流程下 ExecuteStatement 处接管到 Hue 的申请会发现该申请为非法,所以 OpenSession 不能作为连贯的终点,而是须要在每一个 Thrift 接口处实现 OpenSession 的能力,以此作为上下文初始化。
序幕
SparkSQL 在企业中的应用比重越来越大,而有一个更好用的 SQL 服务器,则会大大晋升应用效率和升高应用门槛。目前,SparkSQL 在服务器这方面的能力显然不如 Hive Server2 提供的更加规范,所以各个企业均可基于本身状况,抉择是否须要开发一个适合于本身的 SparkSQL 服务器。
本文所提到的相干能力已通过火山引擎 EMR 产品向内部企业凋谢。联合字节跳动外部以及内部客户的需要状况,火山引擎 EMR 产品的 Ksana for SparkSQL 提供一个生产可用的 SparkSQL 服务器,并且在 Spark 性能方面也做了较大的优化,本文次要围绕技术实现的角度来论述如何实现一个 SparkSQL 服务,后续会有更多文章讲述其余相干的优化。
产品介绍
火山引擎 E-MapReduce
反对构建开源 Hadoop 生态的企业级大数据分析系统,齐全兼容开源,提供 Hadoop、Spark、Hive、Flink 集成和治理,帮忙用户轻松实现企业大数据平台的构建,升高运维门槛,疾速造成大数据分析能力。
更多技术交换、求职机会、试用福利,欢送 关注字节跳动数据平台微信公众号,回复【1】进入官网交换群