关于数据库:数仓血缘关系数据的存储与读写

1次阅读

共计 7892 个字符,预计需要花费 20 分钟才能阅读完成。

本文首发于 Nebula Graph Community 公众号

一、抉择 Nebula 的起因

性能优越

  • 查问速度极快
  • 架构拆散,易扩大(目前的机器配置低,后续可能扩大)
  • 高可用(因为是分布式,所以从应用到当初没有呈现过宕机状况)

上手容易

  • 介绍全(相熟架构和性能)
  • 部署快(通过手册的洗礼,疾速部署简略的集群)
  • 应用简便(遇到须要的数据,查问手册获取对应的 GNQL,针对性查问)
  • 答疑优良(遇到问题,能够先翻论坛,如果没有,那就公布帖子,开发人员的帮忙很及时)

开源,且技术稳固

  • 因为实际企业多,用起来释怀。

二、业务需要背景介绍

为不便数据治理、元数据管理及数据品质监控,将调度系统生成的数仓血统保存起来。

血统数据流程

从采集、存储到平台展现的数据全流程:

在查问平台的局部数据查问展现

三、我的具体实际

1、版本抉择

这里咱们采纳了 Nebula v3.0.0、Nebula Java Client v3.0.0,这里提下 Nebula Graph 和 Java 客户端须要相兼容,版本号要对齐。

2、集群部署

机器配置

四台实体机,同配置:
10C 2 / 16G 8 / 600G

3、装置形式

这里咱们采纳了 RPM 装置。
a. 通过 wget 拉取安装包后装置。
b. 更改配置文件,次要更改参数:

  • Meta 服务的所有机器 —— meta_server_addrs=ip1:9559, ip2:9559, ip3:9559
    以后机器 ip(如果是 meta / graph / storage,填对应 meta / graph / storage 机器的 ip)—— local_ip

c. 启动后通过 Console 简略测试
add hosts ip:port 减少本人的机器 ip 后(内核版本低于 v3.0.0 的 Nebula 用户可疏忽该步骤),show hosts,如果是 online,即可开始测试相干 nGQL。

4、数据导入

目前分两种状况更新数据。

a. 实时监控调度平台

监控每个工作实例,通过依赖节点获取上下游的关系,将关系实时打入到 MySQL 和 Nebula 中,更新 Nebula Graph 数据通过 Spark Connector 实现。(MySQL 做备份,因为 Nebula 不反对事务,可能存在数据偏差)

b. 定时调度改正数据

通过 MySQL 中的血缘关系,通过 Spark 工作定时校对 Nebula 数据,更新数据同样通过 Spark Connector 实现。

Spark Connector 的应用:NebulaConnectionConfig 初始化配置,而后通过连贯信息、插入的点与边的相干参数及实体 Tag、Edge 创立 WriteNebulaVertexConfig 和 WriteNebulaEdgeConfig 对象,以备写入点和边的数据。

5、数据平台查问

数据平台查问血统的利用:

a. 获取 Nebula 数据实现过程

通过初始化连接池 Nebula pool,实现单例工具类,不便在整个我的项目中调用并应用 Session。

这里肯定要留神,连接池只能够有一个,而 Session 能够通过 MaxConnectionNum 设置连接数,依据理论业务来判断具体参数(平台查问越频繁,连接数就要设置的越多一些)。而每次 Session 应用结束也是要开释的。

b. 查问数据,转换为 ECharts 须要的 JSON

① 通过 getSubGraph 获取以后表或字段的所有上下游相干点,这一点通过获取子图的办法,很不便。
② 须要通过后果,解析出其中两个方向数据的点,而后递归解析,最初转为一个递归调用本人的 Bean 类对象。
③ 写一个满足前端须要的 JSON 串的 toString 办法,失去后果后即可。

工具类和外围逻辑代码

这里分享下我用到的工具类和外围逻辑代码

工具类

object NebulaUtil {private val log: Logger = LoggerFactory.getLogger(NebulaUtil.getClass)

  private val pool: NebulaPool = new NebulaPool

  private var success: Boolean = false

  {
    // 首先初始化连接池

    val nebulaPoolConfig = new NebulaPoolConfig
    nebulaPoolConfig.setMaxConnSize(100)


    // 初始化 ip 和端口
    val addresses = util.Arrays.asList(new HostAddress("10.88.100.88", 9669))
    success = pool.init(addresses, nebulaPoolConfig)

  }

  def getPool(): NebulaPool = {pool}

  def isSuccess(): Boolean = {success}

  //TODO query:创立空间、进入空间、创立新的点和边的类型、插入点、插入边、执行查问
  def executeResultSet(query: String, session: Session): ResultSet = {val resp: ResultSet = session.execute(query)
    if (!resp.isSucceeded){log.error(String.format("Execute: `%s', failed: %s", query, resp.getErrorMessage))
      System.exit(1)
    }

    resp
  }

  def executeJSON(queryForJson: String, session: Session): String = {val resp: String = session.executeJson(queryForJson)
    val errors: JSONObject = JSON.parseObject(resp).getJSONArray("errors").getJSONObject(0)
    if (errors.getInteger("code") != 0){log.error(String.format("Execute: `%s', failed: %s", queryForJson, errors.getString("message")))
      System.exit(1)
    }

    resp
  }


  def executeNGqlWithParameter(query: String, paramMap: util.Map[String, Object], session: Session): Unit = {val resp: ResultSet = session.executeWithParameter(query, paramMap)
    if (!resp.isSucceeded){log.error(String.format("Execute: `%s', failed: %s", query, resp.getErrorMessage))
      System.exit(1)
    }

  }



  // 获取 ResultSet 中的各个列名及数据
  //_1 列名组成的列表
  //_2 多 row 组成的列表嵌套    单个 row 的列表 蕴含本行每一列的数据
  def getInfoForResult(resultSet: ResultSet): (util.List[String], util.List[util.List[Object]]) = {

    // 拿到列名
    val colNames: util.List[String] = resultSet.keys

    // 拿数据
    val data: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]]

    // 循环获取每行数据
    for (i <- 0 until resultSet.rowsSize) {val curData = new util.ArrayList[Object]
      // 拿到第 i 行数据的容器
      val record = resultSet.rowValues(i)
      import scala.collection.JavaConversions._

      // 获取容器中数据
      for (value <- record.values) {if (value.isString) curData.add(value.asString)
        else if (value.isLong) curData.add(value.asLong.toString)
        else if (value.isBoolean) curData.add(value.asBoolean.toString)
        else if (value.isDouble) curData.add(value.asDouble.toString)
        else if (value.isTime) curData.add(value.asTime.toString)
        else if (value.isDate) curData.add(value.asDate.toString)
        else if (value.isDateTime) curData.add(value.asDateTime.toString)
        else if (value.isVertex) curData.add(value.asNode.toString)
        else if (value.isEdge) curData.add(value.asRelationship.toString)
        else if (value.isPath) curData.add(value.asPath.toString)
        else if (value.isList) curData.add(value.asList.toString)
        else if (value.isSet) curData.add(value.asSet.toString)
        else if (value.isMap) curData.add(value.asMap.toString)
      }
      // 合并数据
      data.add(curData)
    }

    (colNames, data)
  }

  def close(): Unit = {pool.close()
  }

}

外围代码

  //bean next 指针为可变数组
  // 获取子图
  //field_name 起始节点, direct 子图方向 (true 上游, false 上游)
  def getSubgraph(field_name: String, direct: Boolean, nebulaSession: Session): FieldRely = {

    // field_name 所在节点
    val relyResult = new FieldRely(field_name, new mutable.ArrayBuffer[FieldRely])

    // out 为上游, in 为上游
    var downOrUp = "out"
    // 获取以后查问的方向
    if (direct){downOrUp = "out"} else {downOrUp = "in"}

    //1 查问语句 查问上游所有子图
    val query =
      s"""| get subgraph 100 steps from"$field_name" $downOrUp field_rely yield edges as field_rely;
         |""".stripMargin

    val resultSet = NebulaUtil.executeResultSet(query, nebulaSession)

    //[[:field_rely "dws.dws_order+ds_code"->"dws.dws_order_day+ds_code" @0 {}], [:field_rely "dws.dws_order+ds_code"->"tujia_qlibra.dws_order+p_ds_code" @0 {}], [:field_rely "dws.dws_order+ds_code"->"tujia_tmp.dws_order_execution+ds_code" @0 {}]]
    // 非空则获取数据
    if (!resultSet.isEmpty) {
      // 非空,则拿数据,解析数据
      val data = NebulaUtil.getInfoForResult(resultSet)
      val curData: util.List[util.List[Object]] = data._2

      // 正则匹配引号中数据
      val pattern = Pattern.compile("\"([^\"]*)\"")

      // 上一步长的所有节点数组
      // 判断节点的父节点, 不便存储
      var parentNode = new mutable.ArrayBuffer[FieldRely]()


      //2 首先获取步长为 1 的边
      curData.get(0).get(0).toString.split(",").foreach(curEdge =>{
        // 拿到边的起始和目标点
        val matcher = pattern.matcher(curEdge)
        var startPoint = ""var endPoint =""

        // 将两点赋值
        while (matcher.find()){val curValue = matcher.group().replaceAll("\"", "")
          // 上下游的指向是不同的 所以须要依据上下游切换 开始节点和完结节点的信息获取
          // out 为上游, 数据结构是 startPoint -> endPoint
          if(direct){if ("".equals(startPoint)){startPoint = curValue}else{endPoint = curValue}
          }else {
            // in 为上游, 数据结构是 endPoint -> startPoint
            if ("".equals(endPoint)){endPoint = curValue}else{startPoint = curValue}
          }

        }
        // 合并到终点 bean 中
        relyResult.children.append(new FieldRely(endPoint, new ArrayBuffer[FieldRely]()))
      })

      //3 并初始化父节点数组
      parentNode = relyResult.children




      //4 失去其余所有边
      for (i <- 1 until curData.size - 1){
        // 贮存下个步长的父节点汇合
        val nextParentNode = new mutable.ArrayBuffer[FieldRely]()
        val curEdges = curData.get(i).get(0).toString

        //3 多个边循环解析, 拿到目标点
        curEdges.split(",").foreach(curEdge => {

          // 拿到边的起始和目标点
          val matcher = pattern.matcher(curEdge)
          var startPoint = ""val endNode = new FieldRely("")

          // 将两点赋值
          while (matcher.find()){val curValue = matcher.group().replaceAll("\"", "")
//            logger.info(s"not 1 curValue: $curValue")
            if(direct) {if ("".equals(startPoint)){startPoint = curValue}else{
                endNode.name = curValue
                endNode.children = new mutable.ArrayBuffer[FieldRely]()
                nextParentNode.append(endNode)
              }
            }else {if ("".equals(endNode.name)){
                endNode.name = curValue
                endNode.children = new mutable.ArrayBuffer[FieldRely]()
                nextParentNode.append(endNode)
              }else{startPoint = curValue}
            }

          }

          // 通过 startPoint 找到父节点, 将 endPoint 退出到本父节点的 children 中
          var flag = true
          // 至此, 一条边插入胜利
          for (curFieldRely <- parentNode if flag){if (curFieldRely.name.equals(startPoint)){curFieldRely.children.append(endNode)
              flag = false
            }
          }

        })

        // 更新父节点
        parentNode = nextParentNode
      }

    }
//    logger.info(s"relyResult.toString: ${relyResult.toString}")
    relyResult
  }

Bean toString

class FieldRely {

  @BeanProperty
  var name: String = _  // 以后节点字段名

  @BeanProperty
  var children: mutable.ArrayBuffer[FieldRely] = _  // 以后节点对应的所有上游或下游子字段名

  def this(name: String, children: mutable.ArrayBuffer[FieldRely]) = {this()
    this.name = name
    this.children = children
  }

  def this(name: String) = {this()
    this.name = name
  }


  override def toString(): String = {
    var resultString = ""
    // 引号变量
    val quote = "\""

    // 空的话间接将 child 置为空数组的 json
    if (children.isEmpty){resultString += s"{${quote}name${quote}: ${quote}$name${quote}, ${quote}children${quote}: []}"
    }else {
      //child 有数据, 增加索引并循环获取
      var childrenStr = ""
//      var index = 0

      for (curRely <- children){
        val curRelyStr = curRely.toString
        childrenStr += curRelyStr + ","
//        index += 1
      }

      // 去掉多余的  ','
      if (childrenStr.length > 2){childrenStr = childrenStr.substring(0, childrenStr.length - 2)
      }

      resultString += s"{${quote}name${quote}: ${quote}$name${quote}, ${quote}children${quote}: [$childrenStr]}"
    }
    resultString
  }
}

后果

在查问子图步长靠近 20 的状况下,基本上接口返回数据能够管制在 200ms 内(蕴含后端简单解决逻辑)。

本文正在加入 首届 Nebula 征文活动,如果你感觉本文对你有所帮忙能够来给我投个票,以示激励~
谢谢 (#^.^#)

我是数据开发的实习生,在这个岗位上工作四个月左右的工夫了,期间负责开发数据平台的性能。
因为其中一些数据的读写性能较低,所以在调研后,抉择部署一个 Nebula 集群,它的技术体系也是比拟成熟的,社区也比较完善,对刚刚接触的它的人十分敌对。所以很快就开始投入使用了。在应用过程中,有一些本人的见解,和遇到的一些问题及解决办法,在这里向大家分享一下本人的应用教训。


交换图数据库技术?退出 Nebula 交换群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~

正文完
 0