关于scala:数据结构与算法的实际应用根据表关系构建SQL语句

53次阅读

共计 12872 个字符,预计需要花费 33 分钟才能阅读完成。

背景需要

最近在我的项目中有一个场景,依据前端可视化模式传入的参数构建一组 SQL 语句,利用在 Spark Streaming 利用的数据同步中。这其实是一个已有的性能,然而发现原先的代码实现发现有较重大的问题,导致该性能在有关联查问时不可用,我通过调研之后决定从新实现。

这些 SQL 由一般的 Lookup SQL 和 Spark SQL 组成,Lookup SQL 用于查问关联数据,SparkSQL 则用于输入后果,外围问题在于如何正当组织这些表的关联关系。

PS:实现代码为 Scala 语言。

参数

其中前端传入的参数为

case class UpdateTask(@BeanProperty id: Option[Long],
                      @BeanProperty taskName: Option[String],
                      @BeanProperty taskDesc: Option[String],
                      @BeanProperty sourceInstance: Option[String],
                      @BeanProperty targetInstance: Option[Long],
                      @BeanProperty eventInstance: Option[Long],
                      @BeanProperty sourceTree: Option[Seq[Long]],
                      @BeanProperty selectSourceTree: Option[Seq[Long]],
                      @BeanProperty targetTree: Option[Long],
                      @BeanProperty eventTable: Option[Long],
                      @BeanProperty tableRelation: Option[Seq[TableRelation]],
                      @BeanProperty filterCondition: Option[String],
                      @BeanProperty targetCalculateTableName: Option[String],
                      @BeanProperty targetCalculate: Option[Seq[TargetCalculate]],
                      @BeanProperty sourceTableField: Option[Seq[TableColumnInfo]],
                      @BeanProperty sqlType: Option[Int],
                      @BeanProperty classicSql: Option[String],
                      @BeanProperty sinkConfig: Option[String],
                      @BeanProperty targetPrimaryKey: Option[Seq[String]]
                     ) extends SimpleBaseEntity

所须要用的参数为

  • eventTable : 触发表
  • tableRelation : 表关联关系列表,其中TableRelation 的构造为

    case class TableRelation(@BeanProperty leftTableSelect: Long,
                             @BeanProperty rightTableSelect: Long,
                             @BeanProperty leftColumnSelect: String,
                             @BeanProperty rightColumnSelect: String)
  • targetCalculate : 输入后果的计算表达式,其中 TargetCalculate 的构造为

    case class TargetCalculate(@BeanProperty columnName: String,
                               @BeanProperty config: String)
  • selectSourceTree : 所用到的源表

解决方案

当没有关联关系的时候,比较简单,不在此探讨。当有多个关联关系时,应该先查问出被关联的表数据,再查问下一级的表,以此类推,理论场景下可能个别只有一两个表关联,然而毕竟还是须要思考极其状况,原先的实现只思考了简略的关联,简单一点的关联则无奈解决,通过一段时间思考后,决定基于树这种数据结构去实现此性能。

假如传入了如下一些表关系,并且 A 表为源表(触发表):

A <-> B
A <-> C
A <-> D
B <-> E
B <-> F
E <-> G
C <-> H
C <-> I

则通过解决后,能够生成如下一个树

             --> E <--> G
    --> B <--|
    |        --> F
    |
A <----> D
    |
    |        --> H
    --> C <--|
             --> I

在此须要阐明,不须要思考左右程序问题,例如 A <-> B 等价于 B <-> A,在前面对此问题会有阐明。

当传入了多个雷同的表关联关系时,须要做一个聚合,因为前端的参数中,每一个关联关系只蕴含一组关联字段,所以当有多个关联字段时,就传入了多个雷同的关联关系,然而关联字段不同。

失去这个树形关系后,也同时失去了表之间的依赖关系,然而还有一个前提,每个表只能依赖一个表,假如如下关系:

             --> E <--> G
    --> B <--|
    |        --> F
    |
A <----> D
    |
    |        --> H
    --> G <--|
             --> I

此时,G 表既能够由 A 失去,又能够由 E 失去,假如从 A 表失去 G 表,那么从 G 表又能够失去 E 表 …… 产生了歧义,并由此产生一个了有环图。然而咱们需要中目前没有这种关联关系(因为前端配置页面中,没有标识关联的方向性,即目前可视化模式传入的关联关系都是双向,对于一组关系,既能够从 A 失去 B,也能够从 B 失去 A,也就是后面的:A <-> B 等价于 B <-> A),所以不思考这种状况,呈现时给予报错,提醒依赖关系产生了环。如果有方向性的话,咱们生成树的算法会更简略一些,间接 DFS 即可,然而对于反复呈现的表,须要做额定解决,例如给反复表起别名,保障后果集不会呈现重名字段,否则 Spark 在处理过程中会产生异样。

在失去这个依赖关系后,前面的事件就好办了,咱们从根节点开始层序遍历(也即为 BFS 广度优先遍历),逐层构建 SQL 语句,也能够采纳树的先序遍历(DFS 深度优先),只有保障子节点在父节点前面遍历即可,保障前面的 SQL 语句用到的关联参数在后面的 SQL 中曾经查问到。

在生成 SQL 的过程中,为了防止不同库表有雷同的表名或字段名,除了最初一句输入后果的 Spark SQL,后面的 SQL 查问字段均须要起一个别名,在此沿用之前旧代码的计划:应用 {字段名} AS {库名}__{表名}__{字段名} 的模式保障字段名不会反复

代码实现

数据结构类定义

有了思路之后,便开始着手实现此性能,首先定义一个树节点的 case 类:

case class TableRelationTreeNode(value: Long, // 以后节点的表 id
                                 parentRelation: LinkRelation, // 和父节点的关联关系
                                 childs: ListBuffer[TableRelationTreeNode]  // 子节点
                                 )

LinkRelation 形容了两个表之间的关联关系,是对前端传入的 TableRelation 聚合后的后果:

case class LinkRelation(leftTable: Long, // 左表 id
                        rightTable: Long, // 右表 id
                        linkFields: Seq[(String, String)] // 关联字段, 元组的两个参数别离为左表字段、右表字段
                        )

关联关系树的构建

/**
 * @param parentNode      父节点
 * @param remainRelations 残余关联关系
 */
def buildRelationTree(parentNode: TableRelationTreeNode, remainRelations: ListBuffer[LinkRelation]): Any = {if (remainRelations.isEmpty) return
  val parentTableId = parentNode.value;
  // 找出关联关系中蕴含父节点的表 id
  val childRelation = remainRelations.filter(e => e.leftTable == parentTableId || e.rightTable == parentTableId)
  if (childRelation.isEmpty) return
  // 将关联关系中父节点的关联信息置于左侧,不便后续操作
  childRelation
    .map(e => if (e.leftTable == parentTableId) e else LinkRelation(e.rightTable, e.leftTable, e.linkFields.map(e => (e._2, e._1))))
    .foreach{e => parentNode.childs += TableRelationTreeNode(e.rightTable, e, new ListBuffer())}
    // 移除曾经应用过的关联关系
  remainRelations --= childRelation
  parentNode.childs foreach {buildRelationTree(_, remainRelations)}
}

SQL 语句生成的外围代码

def buildTransSQL(task: UpdateTask): Seq[String] = {
        // 存储所有用到的表(namespace 为表的信息)val namespacesRef = mutable.HashMap[Long, Namespace]()
    task.selectSourceTree.get.foreach(i => namespacesRef += (kv = (i, Await.result(namespaceDal.findById(i), minTimeOut).get)))
    val targetTableId = task.targetTree.get
        // 指标表
    val targetNamespace = Await.result(namespaceDal.findById(targetTableId), minTimeOut).head
    namespacesRef.put(targetTableId, targetNamespace)
    val eventTableId = task.eventTable.get
        // 事件表(源 / 触发表)val eventNamespace = namespacesRef(eventTableId)
        // 没有计算逻辑,当做镜像同步,间接 SELECT * ...
    if (task.targetCalculate.isEmpty)
      return Seq.newBuilder.+=(s"spark_sql= select * from ${eventNamespace.nsTable};").result()
        
    val transSqlList = new ListBuffer[String]

    // 先将触发表的所有字段查问进去
    transSqlList += s"spark_sql= select ${sourceDataDal.getSourceDataTableField(eventTableId).filter(_ != "ums_active_").map(e => {s"$e AS ${eventNamespace.nsDatabase}__${eventNamespace.nsTable}__$e"
      }).mkString(",")
    } from ${eventNamespace.nsTable}"

    if (task.getTableRelation.nonEmpty) {val remainLinks = new ListBuffer[LinkRelation]()
      // 聚合反复的表关联关系
      task.getTableRelation.getOrElse(Seq.empty)
        .map(e => {if (e.leftTableSelect > e.rightTableSelect) {
            TableRelation(
              leftTableSelect = e.rightTableSelect,
              rightTableSelect = e.leftTableSelect,
              leftColumnSelect = e.rightColumnSelect,
              rightColumnSelect = e.leftColumnSelect
            )
          } else e
        })
        .groupBy(e => s"${e.leftTableSelect}-${e.rightTableSelect}")
        .map(e => {
          LinkRelation(
            leftTable = e._2.head.leftTableSelect,
            rightTable = e._2.head.rightTableSelect,
            linkFields = e._2.map(e => (e.leftColumnSelect, e.rightColumnSelect))
          )
        }) foreach {remainLinks += _}
            
            // 根结点
      val rootTreeNode = TableRelationTreeNode(
        eventTableId,
        null,
        new ListBuffer[TableRelationTreeNode]
      )
      // 构建关系树
      buildRelationTree(rootTreeNode, remainLinks)

      // 如果有残余的关系未被应用,则阐明有无奈连贯到根节点的关系,抛出异样
      if (remainLinks.nonEmpty) {
        throw new IllegalArgumentException(s" 游离的关联关系:${
          remainLinks.map(e => {val leftNs = namespacesRef(e.leftTable)
            val rightNs = namespacesRef(e.rightTable)
            s"${leftNs.nsDatabase}.${leftNs.nsTable} <-> ${rightNs.nsDatabase}.${rightNs.nsTable}"
          }).toString
        }\n 无奈与根节点 (${eventNamespace.nsDatabase}.${eventNamespace.nsTable}) 建设关系 ")
      }

      val queue = new mutable.Queue[TableRelationTreeNode]
      queue.enqueue(rootTreeNode)
      // 广度优先遍历,逐层构建 SQL 语句,保障依赖程序
      while (queue.nonEmpty) {
        val len = queue.size
        for (i <- 0 until len) {
          val node = queue.dequeue
          if (node.value != eventTableId) {
            val relation = node.parentRelation
            // 以后节点表
            val curNs = namespacesRef(node.value)
            // 父节点表
            val parNs = namespacesRef(relation.leftTable)
            val curTableName = s"${curNs.nsDatabase}.${curNs.nsTable}"
            val fields = sourceDataDal.getSourceDataTableField(node.value)
            val fieldAliasPrefix = s"${curNs.nsDatabase}__${curNs.nsTable}__"
            // 构建 lookup SQL
            transSqlList += s"pushdown_sql left join with ${curNs.nsSys}.${curNs.nsInstance}.${curNs.nsDatabase}=select ${fields.map(f => s"$f as $fieldAliasPrefix$f").mkString(",")
            } from $curTableName where (${relation.linkFields.map(_._2.replaceAll(".*\\.", "")).mkString(",")
            }) in (${relation.linkFields.map(_._1.replace(".","__")).map(e => "${" + e + "}").mkString(",")})";
          }
          node.childs foreach {queue.enqueue(_) }
        }
      }
    }
    // 输入最终后果集的 SparkSQL
    transSqlList += s"spark_sql= select ${
      task.targetCalculate.get.map { e =>
        s"${e.config.replaceAll("(\\w+)\\.(\\w+)\\.(\\w+)","$1__$2__$3")} as ${e.columnName}"
      }.mkString(",")
    } from ${eventNamespace.nsTable} where ${if (task.filterCondition.getOrElse("") =="") "1=1" else task.filterCondition.get}"

    transSqlList.toSeq
  }

测试

我新建了几张测试表,并应用小程序向库中随机生成了一些数据,而后又新建了一个指标表,以此来测试该性能,过程如下

前端配置

关联关系

计算逻辑

形象出的关联关系应为:

                                         ------> customer_transaction
                                         |
customer <---> customer_account_info <----
                                         |
                                         ------> customer_seller_relation  <-----> seller_info

后盾生成的 SQL:

spark_sql =
select
  address AS adp_mock_spr_mirror__customer__address,
  company AS adp_mock_spr_mirror__customer__company,
  gender AS adp_mock_spr_mirror__customer__gender,
  id AS adp_mock_spr_mirror__customer__id,
  id_card AS adp_mock_spr_mirror__customer__id_card,
  mobile AS adp_mock_spr_mirror__customer__mobile,
  real_name AS adp_mock_spr_mirror__customer__real_name,
  ums_id_ AS adp_mock_spr_mirror__customer__ums_id_,
  ums_op_ AS adp_mock_spr_mirror__customer__ums_op_,
  ums_ts_ AS adp_mock_spr_mirror__customer__ums_ts_
from
  customer;

pushdown_sql
  left join with tidb.spr_ods_department.adp_mock_spr_mirror =
select
  account_bank as adp_mock_spr_mirror__customer_account_info__account_bank,
  account_level as adp_mock_spr_mirror__customer_account_info__account_level,
  account_no as adp_mock_spr_mirror__customer_account_info__account_no,
  customer_id as adp_mock_spr_mirror__customer_account_info__customer_id,
  entry_time as adp_mock_spr_mirror__customer_account_info__entry_time,
  id as adp_mock_spr_mirror__customer_account_info__id,
  loc_seller as adp_mock_spr_mirror__customer_account_info__loc_seller,
  risk_level as adp_mock_spr_mirror__customer_account_info__risk_level,
  risk_test_date as adp_mock_spr_mirror__customer_account_info__risk_test_date,
  ums_active_ as adp_mock_spr_mirror__customer_account_info__ums_active_,
  ums_id_ as adp_mock_spr_mirror__customer_account_info__ums_id_,
  ums_op_ as adp_mock_spr_mirror__customer_account_info__ums_op_,
  ums_ts_ as adp_mock_spr_mirror__customer_account_info__ums_ts_
from
  adp_mock_spr_mirror.customer_account_info
where
  (id) in ($ { adp_mock_spr_mirror__customer__id});

pushdown_sql
  left join with tidb.spr_ods_department.adp_mock_spr_mirror =
select
  customer_id as adp_mock_spr_mirror__customer_seller_relation__customer_id,
  id as adp_mock_spr_mirror__customer_seller_relation__id,
  relation_type as adp_mock_spr_mirror__customer_seller_relation__relation_type,
  seller_id as adp_mock_spr_mirror__customer_seller_relation__seller_id,
  ums_active_ as adp_mock_spr_mirror__customer_seller_relation__ums_active_,
  ums_id_ as adp_mock_spr_mirror__customer_seller_relation__ums_id_,
  ums_op_ as adp_mock_spr_mirror__customer_seller_relation__ums_op_,
  ums_ts_ as adp_mock_spr_mirror__customer_seller_relation__ums_ts_,
  wechat_relation as adp_mock_spr_mirror__customer_seller_relation__wechat_relation
from
  adp_mock_spr_mirror.customer_seller_relation
where
  (customer_id) in ($ { adp_mock_spr_mirror__customer_account_info__id}
  );
  
pushdown_sql
  left join with tidb.spr_ods_department.adp_mock_spr_mirror =
select
  balance as adp_mock_spr_mirror__customer_transaction__balance,
  borrow_loan as adp_mock_spr_mirror__customer_transaction__borrow_loan,
  comment as adp_mock_spr_mirror__customer_transaction__comment,
  customer_account_id as adp_mock_spr_mirror__customer_transaction__customer_account_id,
  customer_id as adp_mock_spr_mirror__customer_transaction__customer_id,
  deal_abstract_code as adp_mock_spr_mirror__customer_transaction__deal_abstract_code,
  deal_account_type_code as adp_mock_spr_mirror__customer_transaction__deal_account_type_code,
  deal_code as adp_mock_spr_mirror__customer_transaction__deal_code,
  deal_partner_account as adp_mock_spr_mirror__customer_transaction__deal_partner_account,
  deal_partner_name as adp_mock_spr_mirror__customer_transaction__deal_partner_name,
  deal_partner_ogr_name as adp_mock_spr_mirror__customer_transaction__deal_partner_ogr_name,
  deal_partner_org_num as adp_mock_spr_mirror__customer_transaction__deal_partner_org_num,
  id as adp_mock_spr_mirror__customer_transaction__id,
  subject as adp_mock_spr_mirror__customer_transaction__subject,
  transaction_amount as adp_mock_spr_mirror__customer_transaction__transaction_amount,
  transaction_time as adp_mock_spr_mirror__customer_transaction__transaction_time,
  ums_active_ as adp_mock_spr_mirror__customer_transaction__ums_active_,
  ums_id_ as adp_mock_spr_mirror__customer_transaction__ums_id_,
  ums_op_ as adp_mock_spr_mirror__customer_transaction__ums_op_,
  ums_ts_ as adp_mock_spr_mirror__customer_transaction__ums_ts_
from
  adp_mock_spr_mirror.customer_transaction
where
  (customer_id, customer_account_id) in ($ { adp_mock_spr_mirror__customer_account_info__id},
    $ {adp_mock_spr_mirror__customer_account_info__account_no}
  );

pushdown_sql
  left join with tidb.spr_ods_department.adp_mock_spr_mirror =
select
  current_bank as adp_mock_spr_mirror__seller_info__current_bank,
  department_id as adp_mock_spr_mirror__seller_info__department_id,
  email as adp_mock_spr_mirror__seller_info__email,
  entry_time as adp_mock_spr_mirror__seller_info__entry_time,
  id as adp_mock_spr_mirror__seller_info__id,
  id_card as adp_mock_spr_mirror__seller_info__id_card,
  leader_id as adp_mock_spr_mirror__seller_info__leader_id,
  mobile as adp_mock_spr_mirror__seller_info__mobile,
  name as adp_mock_spr_mirror__seller_info__name,
  position as adp_mock_spr_mirror__seller_info__position,
  tenant_id as adp_mock_spr_mirror__seller_info__tenant_id,
  ums_active_ as adp_mock_spr_mirror__seller_info__ums_active_,
  ums_id_ as adp_mock_spr_mirror__seller_info__ums_id_,
  ums_op_ as adp_mock_spr_mirror__seller_info__ums_op_,
  ums_ts_ as adp_mock_spr_mirror__seller_info__ums_ts_
from
  adp_mock_spr_mirror.seller_info
where
  (id) in ($ { adp_mock_spr_mirror__customer_seller_relation__seller_id}
  );

spark_sql =
select
  adp_mock_spr_mirror__customer_account_info__id as id,
  adp_mock_spr_mirror__customer__real_name as name,
  IF(adp_mock_spr_mirror__customer__gender = 0, "0", "1") as sex,
  adp_mock_spr_mirror__seller_info__department_id as age,
  adp_mock_spr_mirror__customer__mobile as phone,
  adp_mock_spr_mirror__seller_info__entry_time as born,
  adp_mock_spr_mirror__customer__address as address,
  IF(
    adp_mock_spr_mirror__customer_transaction__borrow_loan = 1,
    "1",
    "0"
  ) as married,
  NOW() as create_time,
  NOW() as update_time,
  'P' as zodiac
from
  customer
where
  1 = 1;

同步后果

从 Spark 后盾日志中能够看到,数据曾经失常插入指标表。

结语

以上是树和 BFS 在理论开发场景中的一个利用,代码实现其实较为简单,重点是实现的思路,当然解决问题的办法并不是惟一的,在此问题中,也能够在构建树的过程中间接构建 SQL 语句,省去后续的 BFS 过程,然而我思考到后续可能减少的需要,还是将此处拆成了两步,不便后续在扩大,依据理论场景抉择计划即可。另外,计算逻辑中短少字段强校验,当用户输出谬误字段时在运行期间能力察觉到,思考前期再减少此性能。

有不对的中央欢送斧正,心愿本文对大家有所帮忙。

正文完
 0