作者:vivo互联网服务器团队-Hao Guangshi

一、背景

字段血统是在表处理的过程中将字段的处理过程保留下来。为什么会须要字段血统呢?

有了字段间的血缘关系,便能够晓得数据的起源去处,以及字段之间的转换关系,这样对数据的品质,治理有很大的帮忙。

Spark SQL 绝对于 Hive 来说通常状况下效率会比拟高,对于运行工夫、资源的应用下面等都会有较大的收益。

平台打算将 Hive 工作迁徙到 Spark SQL 上,同时也须要实现字段血统的性能。

二、后期调研

开发前咱们做了很多相干调研,从中得悉 Spark 是反对扩大的:容许用户对 Spark SQL 的 SQL 解析、逻辑打算的剖析和查看、逻辑打算的优化、物理打算的造成等进行扩大。

该计划可行,且对 Spark 的源码没有改变,代价也比拟小,确定应用该计划。

三、Spark SQL 扩大

3.1 Spark 可扩大的内容

SparkSessionExtensions 是比拟重要的一个类,其中定义了注入规定的办法,当初反对以下内容:

  • 【Analyzer Rules】逻辑打算剖析规定
  • 【Check Analysis Rules】逻辑打算查看规定
  • 【Optimizer Rules.】 逻辑打算优化规定
  • 【Planning Strategies】造成物理打算的策略
  • 【Customized Parser】自定义的sql解析器
  • 【(External) Catalog listeners catalog】监听器

在以上六种能够用户自定义的中央,咱们抉择了【Check Analysis Rules】。因为该查看规定在办法调用的时候是不须要有返回值的,也就意味着不须要对以后遍历的逻辑打算树进行批改,这正是咱们须要的。

而【Analyzer Rules】、【Optimizer Rules】则须要对以后的逻辑打算进行批改,使得咱们难以迭代整个树,难以失去咱们想要的后果。

3.2 实现本人的扩大

class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {  override def apply(spark: SparkSessionExtensions): Unit = {    //字段血统    spark.injectCheckRule(FieldLineageCheckRuleV3)    //sql解析器    spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }  }}

下面依照这种形式实现扩大,并在 apply 办法中把本人须要的规定注入到 SparkSessionExtensions 即可,除了以上四种能够注入的以外还有其余的规定。要让 ExtralSparkExtension 起到作用的话咱们须要在spark-default.conf 下配置 spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension 在启动 Spark 工作的时候即可失效。

留神到咱们也实现了一个自定义的SQL解析器,其实该解析器并没有做太多的事件。只是在判断如果该语句蕴含insert的时候就将 SQLText(SQL语句)设置到一个为 FIELD\_LINE\_AGE_SQL,之所以将SQLText放到 FIELD\_LINE\_AGE_SQL 外面。因为在 DheckRule 外面是拿不到SparkPlan的咱们须要对SQL再次解析拿到 SprkPlan,而FieldLineageCheckRuleV3的实现也特地简略,重要的在另一个线程实现外面。

这里咱们只关注了insert语句,因为插入语句外面有从某些个表外面输出而后写入到某个表。

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{  override def parsePlan(sqlText: String): LogicalPlan = {    val lineAgeEnabled = SparkSession.getActiveSession      .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean    logDebug(s"SqlText: $sqlText")    if(sqlText.toLowerCase().contains("insert")){      if(lineAgeEnabled){        if(FIELD_LINE_AGE_SQL_COULD_SET.get()){          //线程本地变量在这里          FIELD_LINE_AGE_SQL.set(sqlText)        }        FIELD_LINE_AGE_SQL_COULD_SET.remove()      }    }    delegate.parsePlan(sqlText)  }  //调用原始的sqlparser  override def parseExpression(sqlText: String): Expression = {    delegate.parseExpression(sqlText)  }  //调用原始的sqlparser  override def parseTableIdentifier(sqlText: String): TableIdentifier = {    delegate.parseTableIdentifier(sqlText)  }  //调用原始的sqlparser  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {    delegate.parseFunctionIdentifier(sqlText)  }  //调用原始的sqlparser  override def parseTableSchema(sqlText: String): StructType = {    delegate.parseTableSchema(sqlText)  }  //调用原始的sqlparser  override def parseDataType(sqlText: String): DataType = {    delegate.parseDataType(sqlText)  }}

3.3 扩大的规定类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {  val executor: ThreadPoolExecutor =    ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)  override def apply(plan: LogicalPlan): Unit = {    val sql = FIELD_LINE_AGE_SQL.get    FIELD_LINE_AGE_SQL.remove()    if(sql != null){      //这里咱们拿到sql而后启动一个线程做残余的解析工作      val task = new FieldLineageRunnableV3(sparkSession,sql)      executor.execute(task)    }  }}

很简略,咱们只是拿到了 SQL 而后便启动了一个线程去失去 SparkPlan,理论逻辑在FieldLineageRunnableV3。

3.4 具体的实现办法

3.4.1 失去 SparkPlan

咱们在 run 办法中失去 SparkPlan:

override def run(): Unit = {  val parser = sparkSession.sessionState.sqlParser  val analyzer = sparkSession.sessionState.analyzer  val optimizer = sparkSession.sessionState.optimizer  val planner = sparkSession.sessionState.planner      ............  val newPlan = parser.parsePlan(sql)  PASS_TABLE_AUTH.set(true)  val analyzedPlan = analyzer.executeAndCheck(newPlan)  val optimizerPlan = optimizer.execute(analyzedPlan)  //失去sparkPlan  val sparkPlan = planner.plan(optimizerPlan).next()  ...............if(targetTable != null){  val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()  val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()  //projection  projectionLineAge(levelProject, sparkPlan.child)  //predication  predicationLineAge(predicates, sparkPlan.child)  ...............

为什么要应用 SparkPlan 呢?当初咱们思考的时候,物理打算拿取字段关系的时候是比拟准的,且链路比拟短也更间接。

在这里补充一下 Spark SQL 解析的过程如下:

通过SqlParser后会失去逻辑打算,此时表名、函数等都没有解析,还不能执行;通过Analyzer会剖析一些绑定信息,例如表验证、字段信息、函数信息;通过Optimizer 后逻辑打算会依据既定规定被优化,这里的规定是RBO,当然 Spark 还反对CBO的优化;通过SparkPlanner后就成了可执行的物理打算。

咱们看一个逻辑打算与物理打算比照的例子:

一个 SQL 语句:

select item_id,TYPE,v_value,imei from t1union allselect item_id,TYPE,v_value,imei from t2union allselect item_id,TYPE,v_value,imei from t3

逻辑打算是这样的:

物理打算是这样的:

显然简化了很多。

失去 SparkPlan 后,咱们就能够依据不同的SparkPlan节点做迭代解决。

咱们将字段血统分为两种类型:projection(select查问字段)、predication(wehre查问条件)。

这两种是一种点对点的关系,即从原始表的字段生成指标表的字段的对应关系。

设想一个查问是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:

那么咱们迭代查问的后果应该为

id ->tab1.id ,

name->tab1.name,tabb2.name,

age→tabb2.age。

留神到有该变量 val levelProject = new ArrayBuffer ArrayBuffer[NameExpressionHolder],通过projecti-onLineAge 迭代后 levelProject 存储了顶层id,name,age对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是简略的递归迭代,还须要思考非凡状况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都须要非凡思考。

例子及成果:

SQL:

with A as (select id,name,age from tab1 where id > 100 ) ,C as (select id,name,max(age) from A group by A.id,A.name) ,B as (select id,name,age from tabb2 where age > 28)insert into tab3   select C.id,concat(C.name,B.name) as name, B.age from     B,C where C.id = B.id

成果:

{  "edges": [    {      "sources": [        3      ],      "targets": [        0      ],      "expression": "id",      "edgeType": "PROJECTION"    },    {      "sources": [        4,        7      ],      "targets": [        1      ],      "expression": "name",      "edgeType": "PROJECTION"    },    {      "sources": [        5      ],      "targets": [        2      ],      "expression": "age",      "edgeType": "PROJECTION"    },    {      "sources": [        6,        3      ],      "targets": [        0,        1,        2      ],      "expression": "INNER",      "edgeType": "PREDICATE"    },    {      "sources": [        6,        5      ],      "targets": [        0,        1,        2      ],      "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",      "edgeType": "PREDICATE"    },    {      "sources": [        3      ],      "targets": [        0,        1,        2      ],      "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",      "edgeType": "PREDICATE"    }  ],  "vertices": [    {      "id": 0,      "vertexType": "COLUMN",      "vertexId": "default.tab3.id"    },    {      "id": 1,      "vertexType": "COLUMN",      "vertexId": "default.tab3.name"    },    {      "id": 2,      "vertexType": "COLUMN",      "vertexId": "default.tab3.age"    },    {      "id": 3,      "vertexType": "COLUMN",      "vertexId": "default.tab1.id"    },    {      "id": 4,      "vertexType": "COLUMN",      "vertexId": "default.tab1.name"    },    {      "id": 5,      "vertexType": "COLUMN",      "vertexId": "default.tabb2.age"    },    {      "id": 6,      "vertexType": "COLUMN",      "vertexId": "default.tabb2.id"    },    {      "id": 7,      "vertexType": "COLUMN",      "vertexId": "default.tabb2.name"    }  ]}

四、总结

在 Spark SQL 的字段血统实现中,咱们通过其自扩大,首先拿到了 insert 语句,在咱们本人的查看规定中拿到 SQL 语句,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终失去了物理打算。

咱们通过迭代物理打算,依据不同执行打算做对应的转换,而后就失去了字段之间的对应关系。以后的实现是比较简单的,字段之间是直线的对应关系,两头过程被疏忽,如果想实现字段的转换的整个过程也是没有问题的。