关于spark:Spark-SQL-字段血缘在-vivo-互联网的实践

66次阅读

共计 6345 个字符,预计需要花费 16 分钟才能阅读完成。

作者:vivo 互联网服务器团队 -Hao Guangshi

一、背景

字段血统是在表处理的过程中将字段的处理过程保留下来。为什么会须要字段血统呢?

有了字段间的血缘关系,便能够晓得数据的起源去处,以及字段之间的转换关系,这样对数据的品质,治理有很大的帮忙。

Spark SQL 绝对于 Hive 来说通常状况下效率会比拟高,对于运行工夫、资源的应用下面等都会有较大的收益。

平台打算将 Hive 工作迁徙到 Spark SQL 上,同时也须要实现字段血统的性能。

二、后期调研

开发前咱们做了很多相干调研,从中得悉 Spark 是反对扩大的:容许用户对 Spark SQL 的 SQL 解析、逻辑打算的剖析和查看、逻辑打算的优化、物理打算的造成等进行扩大。

该计划可行,且对 Spark 的源码没有改变,代价也比拟小,确定应用该计划。

三、Spark SQL 扩大

3.1 Spark 可扩大的内容

SparkSessionExtensions 是比拟重要的一个类,其中定义了注入规定的办法,当初反对以下内容:

  • 【Analyzer Rules】逻辑打算剖析规定
  • 【Check Analysis Rules】逻辑打算查看规定
  • 【Optimizer Rules.】逻辑打算优化规定
  • 【Planning Strategies】造成物理打算的策略
  • 【Customized Parser】自定义的 sql 解析器
  • 【(External) Catalog listeners catalog】监听器

在以上六种能够用户自定义的中央,咱们抉择了【Check Analysis Rules】。因为该查看规定在办法调用的时候是不须要有返回值的,也就意味着不须要对以后遍历的逻辑打算树进行批改,这正是咱们须要的。

而【Analyzer Rules】、【Optimizer Rules】则须要对以后的逻辑打算进行批改,使得咱们难以迭代整个树,难以失去咱们想要的后果。

3.2 实现本人的扩大

class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {override def apply(spark: SparkSessionExtensions): Unit = {

    // 字段血统
    spark.injectCheckRule(FieldLineageCheckRuleV3)

    //sql 解析器
    spark.injectParser {case (_, parser) => new ExtraSparkParser(parser) }

  }
}

下面依照这种形式实现扩大,并在 apply 办法中把本人须要的规定注入到 SparkSessionExtensions 即可,除了以上四种能够注入的以外还有其余的规定。要让 ExtralSparkExtension 起到作用的话咱们须要在 spark-default.conf 下配置 spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension 在启动 Spark 工作的时候即可失效。

留神到咱们也实现了一个自定义的 SQL 解析器,其实该解析器并没有做太多的事件。只是在判断如果该语句蕴含 insert 的时候就将 SQLText(SQL 语句)设置到一个为 FIELD\_LINE\_AGE_SQL,之所以将 SQLText 放到 FIELD\_LINE\_AGE_SQL 外面。因为在 DheckRule 外面是拿不到 SparkPlan 的咱们须要对 SQL 再次解析拿到 SprkPlan,而 FieldLineageCheckRuleV3 的实现也特地简略,重要的在另一个线程实现外面。

这里咱们只关注了 insert 语句,因为插入语句外面有从某些个表外面输出而后写入到某个表。

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{override def parsePlan(sqlText: String): LogicalPlan = {
    val lineAgeEnabled = SparkSession.getActiveSession
      .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
    logDebug(s"SqlText: $sqlText")
    if(sqlText.toLowerCase().contains("insert")){if(lineAgeEnabled){if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
          // 线程本地变量在这里
          FIELD_LINE_AGE_SQL.set(sqlText)
        }
        FIELD_LINE_AGE_SQL_COULD_SET.remove()}
    }
    delegate.parsePlan(sqlText)
  }
  // 调用原始的 sqlparser
  override def parseExpression(sqlText: String): Expression = {delegate.parseExpression(sqlText)
  }
  // 调用原始的 sqlparser
  override def parseTableIdentifier(sqlText: String): TableIdentifier = {delegate.parseTableIdentifier(sqlText)
  }
  // 调用原始的 sqlparser
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {delegate.parseFunctionIdentifier(sqlText)
  }
  // 调用原始的 sqlparser
  override def parseTableSchema(sqlText: String): StructType = {delegate.parseTableSchema(sqlText)
  }
  // 调用原始的 sqlparser
  override def parseDataType(sqlText: String): DataType = {delegate.parseDataType(sqlText)
  }
}

3.3 扩大的规定类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit) {

  val executor: ThreadPoolExecutor =
    ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)

  override def apply(plan: LogicalPlan): Unit = {
    val sql = FIELD_LINE_AGE_SQL.get
    FIELD_LINE_AGE_SQL.remove()
    if(sql != null){
      // 这里咱们拿到 sql 而后启动一个线程做残余的解析工作
      val task = new FieldLineageRunnableV3(sparkSession,sql)
      executor.execute(task)
    }

  }
}

很简略,咱们只是拿到了 SQL 而后便启动了一个线程去失去 SparkPlan,理论逻辑在 FieldLineageRunnableV3。

3.4 具体的实现办法

3.4.1 失去 SparkPlan

咱们在 run 办法中失去 SparkPlan:

override def run(): Unit = {
  val parser = sparkSession.sessionState.sqlParser
  val analyzer = sparkSession.sessionState.analyzer
  val optimizer = sparkSession.sessionState.optimizer
  val planner = sparkSession.sessionState.planner
      ............
  val newPlan = parser.parsePlan(sql)
  PASS_TABLE_AUTH.set(true)
  val analyzedPlan = analyzer.executeAndCheck(newPlan)

  val optimizerPlan = optimizer.execute(analyzedPlan)
  // 失去 sparkPlan
  val sparkPlan = planner.plan(optimizerPlan).next()
  ...............
if(targetTable != null){val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
  val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
  //projection
  projectionLineAge(levelProject, sparkPlan.child)
  //predication
  predicationLineAge(predicates, sparkPlan.child)
  ...............

为什么要应用 SparkPlan 呢?当初咱们思考的时候,物理打算拿取字段关系的时候是比拟准的,且链路比拟短也更间接。

在这里补充一下 Spark SQL 解析的过程如下:

通过 SqlParser 后会失去逻辑打算,此时表名、函数等都没有解析,还不能执行;通过 Analyzer 会剖析一些绑定信息,例如表验证、字段信息、函数信息;通过 Optimizer 后逻辑打算会依据既定规定被优化,这里的规定是 RBO,当然 Spark 还反对 CBO 的优化;通过 SparkPlanner 后就成了可执行的物理打算。

咱们看一个逻辑打算与物理打算比照的例子:

一个 SQL 语句:

select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

逻辑打算是这样的:

物理打算是这样的:

显然简化了很多。

失去 SparkPlan 后,咱们就能够依据不同的 SparkPlan 节点做迭代解决。

咱们将字段血统分为两种类型:projection(select 查问字段)、predication(wehre 查问条件)。

这两种是一种点对点的关系, 即从原始表的字段生成指标表的字段的对应关系。

设想一个查问是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:

那么咱们迭代查问的后果应该为

id ->tab1.id ,

name->tab1.name,tabb2.name,

age→tabb2.age。

留神到有该变量 val levelProject = new ArrayBuffer ArrayBuffer[NameExpressionHolder],通过 projecti-onLineAge 迭代后 levelProject 存储了顶层 id,name,age 对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是简略的递归迭代,还须要思考非凡状况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec 等都须要非凡思考。

例子及成果:

SQL:

with A as (select id,name,age from tab1 where id > 100) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
   select C.id,concat(C.name,B.name) as name, B.age from
     B,C where C.id = B.id

成果:

{
  "edges": [
    {
      "sources": [3],
      "targets": [0],
      "expression": "id",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        4,
        7
      ],
      "targets": [1],
      "expression": "name",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [5],
      "targets": [2],
      "expression": "age",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        6,
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "INNER",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [
        6,
        5
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [3],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
      "edgeType": "PREDICATE"
    }
  ],
  "vertices": [
    {
      "id": 0,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.id"
    },
    {
      "id": 1,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.name"
    },
    {
      "id": 2,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.age"
    },
    {
      "id": 3,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.id"
    },
    {
      "id": 4,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.name"
    },
    {
      "id": 5,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.age"
    },
    {
      "id": 6,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.id"
    },
    {
      "id": 7,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.name"
    }
  ]
}

四、总结

在 Spark SQL 的字段血统实现中,咱们通过其自扩大,首先拿到了 insert 语句,在咱们本人的查看规定中拿到 SQL 语句,通过 SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终失去了物理打算。

咱们通过迭代物理打算,依据不同执行打算做对应的转换,而后就失去了字段之间的对应关系。以后的实现是比较简单的,字段之间是直线的对应关系,两头过程被疏忽,如果想实现字段的转换的整个过程也是没有问题的。

正文完
 0