共计 6345 个字符,预计需要花费 16 分钟才能阅读完成。
作者:vivo 互联网服务器团队 -Hao Guangshi
一、背景
字段血统是在表处理的过程中将字段的处理过程保留下来。为什么会须要字段血统呢?
有了字段间的血缘关系,便能够晓得数据的起源去处,以及字段之间的转换关系,这样对数据的品质,治理有很大的帮忙。
Spark SQL 绝对于 Hive 来说通常状况下效率会比拟高,对于运行工夫、资源的应用下面等都会有较大的收益。
平台打算将 Hive 工作迁徙到 Spark SQL 上,同时也须要实现字段血统的性能。
二、后期调研
开发前咱们做了很多相干调研,从中得悉 Spark 是反对扩大的:容许用户对 Spark SQL 的 SQL 解析、逻辑打算的剖析和查看、逻辑打算的优化、物理打算的造成等进行扩大。
该计划可行,且对 Spark 的源码没有改变,代价也比拟小,确定应用该计划。
三、Spark SQL 扩大
3.1 Spark 可扩大的内容
SparkSessionExtensions 是比拟重要的一个类,其中定义了注入规定的办法,当初反对以下内容:
- 【Analyzer Rules】逻辑打算剖析规定
- 【Check Analysis Rules】逻辑打算查看规定
- 【Optimizer Rules.】逻辑打算优化规定
- 【Planning Strategies】造成物理打算的策略
- 【Customized Parser】自定义的 sql 解析器
- 【(External) Catalog listeners catalog】监听器
在以上六种能够用户自定义的中央,咱们抉择了【Check Analysis Rules】。因为该查看规定在办法调用的时候是不须要有返回值的,也就意味着不须要对以后遍历的逻辑打算树进行批改,这正是咱们须要的。
而【Analyzer Rules】、【Optimizer Rules】则须要对以后的逻辑打算进行批改,使得咱们难以迭代整个树,难以失去咱们想要的后果。
3.2 实现本人的扩大
class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {override def apply(spark: SparkSessionExtensions): Unit = {
// 字段血统
spark.injectCheckRule(FieldLineageCheckRuleV3)
//sql 解析器
spark.injectParser {case (_, parser) => new ExtraSparkParser(parser) }
}
}
下面依照这种形式实现扩大,并在 apply 办法中把本人须要的规定注入到 SparkSessionExtensions 即可,除了以上四种能够注入的以外还有其余的规定。要让 ExtralSparkExtension 起到作用的话咱们须要在 spark-default.conf 下配置 spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension 在启动 Spark 工作的时候即可失效。
留神到咱们也实现了一个自定义的 SQL 解析器,其实该解析器并没有做太多的事件。只是在判断如果该语句蕴含 insert 的时候就将 SQLText(SQL 语句)设置到一个为 FIELD\_LINE\_AGE_SQL,之所以将 SQLText 放到 FIELD\_LINE\_AGE_SQL 外面。因为在 DheckRule 外面是拿不到 SparkPlan 的咱们须要对 SQL 再次解析拿到 SprkPlan,而 FieldLineageCheckRuleV3 的实现也特地简略,重要的在另一个线程实现外面。
这里咱们只关注了 insert 语句,因为插入语句外面有从某些个表外面输出而后写入到某个表。
class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{override def parsePlan(sqlText: String): LogicalPlan = {
val lineAgeEnabled = SparkSession.getActiveSession
.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
logDebug(s"SqlText: $sqlText")
if(sqlText.toLowerCase().contains("insert")){if(lineAgeEnabled){if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
// 线程本地变量在这里
FIELD_LINE_AGE_SQL.set(sqlText)
}
FIELD_LINE_AGE_SQL_COULD_SET.remove()}
}
delegate.parsePlan(sqlText)
}
// 调用原始的 sqlparser
override def parseExpression(sqlText: String): Expression = {delegate.parseExpression(sqlText)
}
// 调用原始的 sqlparser
override def parseTableIdentifier(sqlText: String): TableIdentifier = {delegate.parseTableIdentifier(sqlText)
}
// 调用原始的 sqlparser
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {delegate.parseFunctionIdentifier(sqlText)
}
// 调用原始的 sqlparser
override def parseTableSchema(sqlText: String): StructType = {delegate.parseTableSchema(sqlText)
}
// 调用原始的 sqlparser
override def parseDataType(sqlText: String): DataType = {delegate.parseDataType(sqlText)
}
}
3.3 扩大的规定类
case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit) {
val executor: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
override def apply(plan: LogicalPlan): Unit = {
val sql = FIELD_LINE_AGE_SQL.get
FIELD_LINE_AGE_SQL.remove()
if(sql != null){
// 这里咱们拿到 sql 而后启动一个线程做残余的解析工作
val task = new FieldLineageRunnableV3(sparkSession,sql)
executor.execute(task)
}
}
}
很简略,咱们只是拿到了 SQL 而后便启动了一个线程去失去 SparkPlan,理论逻辑在 FieldLineageRunnableV3。
3.4 具体的实现办法
3.4.1 失去 SparkPlan
咱们在 run 办法中失去 SparkPlan:
override def run(): Unit = {
val parser = sparkSession.sessionState.sqlParser
val analyzer = sparkSession.sessionState.analyzer
val optimizer = sparkSession.sessionState.optimizer
val planner = sparkSession.sessionState.planner
............
val newPlan = parser.parsePlan(sql)
PASS_TABLE_AUTH.set(true)
val analyzedPlan = analyzer.executeAndCheck(newPlan)
val optimizerPlan = optimizer.execute(analyzedPlan)
// 失去 sparkPlan
val sparkPlan = planner.plan(optimizerPlan).next()
...............
if(targetTable != null){val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
//projection
projectionLineAge(levelProject, sparkPlan.child)
//predication
predicationLineAge(predicates, sparkPlan.child)
...............
为什么要应用 SparkPlan 呢?当初咱们思考的时候,物理打算拿取字段关系的时候是比拟准的,且链路比拟短也更间接。
在这里补充一下 Spark SQL 解析的过程如下:
通过 SqlParser 后会失去逻辑打算,此时表名、函数等都没有解析,还不能执行;通过 Analyzer 会剖析一些绑定信息,例如表验证、字段信息、函数信息;通过 Optimizer 后逻辑打算会依据既定规定被优化,这里的规定是 RBO,当然 Spark 还反对 CBO 的优化;通过 SparkPlanner 后就成了可执行的物理打算。
咱们看一个逻辑打算与物理打算比照的例子:
一个 SQL 语句:
select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3
逻辑打算是这样的:
物理打算是这样的:
显然简化了很多。
失去 SparkPlan 后,咱们就能够依据不同的 SparkPlan 节点做迭代解决。
咱们将字段血统分为两种类型:projection(select 查问字段)、predication(wehre 查问条件)。
这两种是一种点对点的关系, 即从原始表的字段生成指标表的字段的对应关系。
设想一个查问是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:
那么咱们迭代查问的后果应该为
id ->tab1.id ,
name->tab1.name,tabb2.name,
age→tabb2.age。
留神到有该变量 val levelProject = new ArrayBuffer ArrayBuffer[NameExpressionHolder],通过 projecti-onLineAge 迭代后 levelProject 存储了顶层 id,name,age 对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。
当然也不是简略的递归迭代,还须要思考非凡状况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec 等都须要非凡思考。
例子及成果:
SQL:
with A as (select id,name,age from tab1 where id > 100) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
select C.id,concat(C.name,B.name) as name, B.age from
B,C where C.id = B.id
成果:
{
"edges": [
{
"sources": [3],
"targets": [0],
"expression": "id",
"edgeType": "PROJECTION"
},
{
"sources": [
4,
7
],
"targets": [1],
"expression": "name",
"edgeType": "PROJECTION"
},
{
"sources": [5],
"targets": [2],
"expression": "age",
"edgeType": "PROJECTION"
},
{
"sources": [
6,
3
],
"targets": [
0,
1,
2
],
"expression": "INNER",
"edgeType": "PREDICATE"
},
{
"sources": [
6,
5
],
"targets": [
0,
1,
2
],
"expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
"edgeType": "PREDICATE"
},
{
"sources": [3],
"targets": [
0,
1,
2
],
"expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
"edgeType": "PREDICATE"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "default.tab3.id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "default.tab3.name"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "default.tab3.age"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "default.tab1.id"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "default.tab1.name"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.age"
},
{
"id": 6,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.id"
},
{
"id": 7,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.name"
}
]
}
四、总结
在 Spark SQL 的字段血统实现中,咱们通过其自扩大,首先拿到了 insert 语句,在咱们本人的查看规定中拿到 SQL 语句,通过 SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终失去了物理打算。
咱们通过迭代物理打算,依据不同执行打算做对应的转换,而后就失去了字段之间的对应关系。以后的实现是比较简单的,字段之间是直线的对应关系,两头过程被疏忽,如果想实现字段的转换的整个过程也是没有问题的。