关于数据仓库:数仓建模OneID

9次阅读

共计 6542 个字符,预计需要花费 17 分钟才能阅读完成。

明天是我在上海租房的小区被封的第三天,因为我的粗心,没有屯吃的,外卖明天齐全点不到了,中午的时候我找到了一包快过期的肉松饼,才补充了 1000 焦耳的能量。然而中午去做核酸的时候,我感觉走路有点不稳,我看到大白的棉签深刻我的嘴里,我居然认为是吃的,差点咬住了,还好我有仅存的一点意识。下午我收到女朋友给我点的外卖——面包(我不晓得她是怎么点到的外卖,我很打动),很粗劣的面包,搁平时我根本不喜爱吃面包,然而曾经到了这个份上,我大口吃起来,居然感觉这是世界上最好吃的食物了。今天晚上 5:50 的闹钟,去叮咚和美团买菜,看能不能抢几桶泡面吧。愿神保佑,我暗暗下着信心并祷告着,胸前画着十字。。。

数据仓库系列文章(继续更新)

  1. 数仓架构发展史
  2. 数仓建模方法论
  3. 数仓建模分层实践
  4. 数仓建模—宽表的设计
  5. 数仓建模—指标体系
  6. 数据仓库之拉链表
  7. 数仓—数据集成
  8. 数仓—数据集市
  9. 数仓—商业智能零碎
  10. 数仓—埋点设计与治理
  11. 数仓—ID Mapping
  12. 数仓—OneID
  13. 数仓—AARRR 海盗模型
  14. 数仓—总线矩阵
  15. 数仓—数据安全
  16. 数仓—数据品质
  17. 数仓—数仓建模和业务建模

关注公众号:大数据技术派 ,回复: 材料 ,支付1024G 材料。

OneID

后面咱们学习了ID Mapping,包含ID Mapping 的背景介绍和业务场景,以及如何应用Spark 实现ID Mapping,这个过程中波及到了很多货色,当然咱们都通过文章的模式介绍给大家了,所以你再学习明天这一节之前,能够先看一下后面的文章

  1. Spark 实战—GraphX 编程指南
  2. 数仓建模—ID Mapping

在上一节咱们介绍 ID Mapping 的时候咱们就说过 ID Mapping 是为了买通用户各个维度的数据,从而打消数据孤岛、防止数据歧义,从而更好的刻画用户,所以说 ID Mapping 是伎俩不是目标,目标是为了买通数据体系,ID Mapping 最终的产出就是咱们明天的配角 OneID,也就是说数据收集过去之后通过 ID Mapping 买通,从而产生 OneID,这一步之后咱们的整个数据体系就将应用 OneID 作为用户的 ID,这样咱们整个数据体系就得以买通

OneData

开始之前咱们先看一下阿里的 OneData 数据体系,从而更好认识一下 OneID,后面咱们说过 ID Mapping 只是伎俩不是目标,目标是为了买通数据体系,ID Mapping 最终的产出就是 OneID

其实 OneID 在咱们整个数据服务体系中,也只是终点不是起点或者说是伎俩,咱们最终的目标是为了建设对立的数据资产体系。

没有建设对立的数据资产体系之前,咱们的数据体系建设存在上面诸多问题

  1. 数据孤岛:各产品、业务的数据互相隔离,难以通过共性 ID 买通
  2. 反复建设:反复的开发、计算、存储,带来昂扬的数据老本
  3. 数据歧义:指标定义口径不统一,造成计算偏差,利用艰难

在阿里巴巴 OneData 体系中,OneID 指对立数据萃取,是一套解决数据孤岛问题的思维和办法。数据孤岛是企业倒退到肯定阶段后广泛遇到的问题。各个部门、业务、产品,各自定义和存储其数据,使得这些数据间难以关联,变成孤岛个别的存在。

OneID 的做法是通过对立的实体辨认和连贯,突破数据孤岛,实现数据通融。简略来说,用户、设施等业务实体,在对应的业务数据中,会被映射为惟一辨认(UID)上,其各个维度的数据通过这个 UID 进行关联。

各个部门、业务、产品对业务实体的 UID 的定义和实现不一样,使得数据间无奈间接关联,成为了数据孤岛。基于手机号、身份证、邮箱、设施 ID 等信息,联合业务规定、机器学习、图算法等算法,进行 ID-Mapping,将各种 UID 都映射到对立 ID 上。通过这个对立 ID,便可关联起各个数据孤岛的数据,实现数据通融,以确保业务剖析、用户画像等数据利用的精确和全面。

OneModel 对立数据构建和治理

将指标定位细化为:

1. 原子指标
2. 工夫周期
3. 修饰词(统计粒度、业务限定, etc)

通过这些定义,设计出各类派生指标 基于数据分层,设计出维度表、明细事实表、汇总事实表,其实咱们看到 OneModel 其实没有什么新的内容,其实就是咱们数仓建模的那一套货色

OneService 对立数据服务

OneService 基于复用而不是复制数据的思维,指得是咱们的对立的数据服务,因为咱们始终再提倡复用,包含咱们数仓的建设,然而咱们的数据服务这一块却是空白,所以 OneService 外围是服务的复用,能力包含:

  • 利用主题逻辑表屏蔽简单物理表的主题式数据服务
  • 个别查问 + OLAP 剖析 + 在线服务的对立且多样化数据服务
  • 屏蔽多种异构数据源的跨源数据服务

OneID 对立数据萃取

基于对立的实体辨认、连贯和标签生产,实现数据通融,包含:

  • ID 自动化辨认与连贯
  • 行为元素和行为规定
  • 标签生产

OneID 基于超强 ID 辨认技术链接数据,高效生产标签;业务驱动技术价值化,打消数据孤岛,晋升数据品质,晋升数据价值。

而 ID 的买通,必须有 ID-ID 之间的两两映射买通关系 通过 ID 映射关系表,能力将多种 ID 之间的关联买通,齐全孤立的两种 ID 是无奈买通的

买通整个 ID 体系,看似简略,实则计算简单,计算量十分大。如果某种对象有数亿个个体,每个个体又有数十种不同的 ID 标识,任意两种 ID 之间都有可能买通关系,想要实现这类对象的所有个体 ID 买通须要数亿次计算,个别的机器甚至大数据集群都无奈实现。

大数据畛域中的 ID-Mapping 技术就是用机器学习算法类来取代横蛮计算,解决对象数据买通的问题。基于输出的 ID 关系对,利用机器学习算法做稳定性和收敛性计算,输入关系稳固的 ID 关系对,并生成一个 UID 作为惟一辨认该对象的标识码。

OneID 实现过程中存在的问题

后面咱们晓得咱们的 ID Mapping 是通过图计算实现,外围就是连通图,其实实现 OneID 咱们在买通 ID 之后,咱们就能够为一个个连通图生成一个 ID, 因为一个连通图 就代表一个用户,这样咱们生成的 ID 就是用户的 OneID,这里的用户指的是自然人,而不是某一个平台上的用户。

OneID 的生成问题

首先咱们须要一个 ID 生成算法,因为咱们须要为大量用户生成 ID, 咱们的 ID 要求是惟一的,所以在算法设计的时候就须要思考到这一点,咱们并不举荐应用 UUID, 起因是 UUID 了可能会呈现反复,而且 UUID 没有含意,所以咱们不举荐应用 UUID, 咱们这里应用的是 MD5 算法,所以咱们的 MD5 算法的参数是咱们的图的标示 ID。

OneID 的更新问题

这里的更新问题次要就是咱们的数据每天都在更新,也就是说咱们的图关系在更新,也就是说咱们要不要给这个自然人从新生成 OneID , 因为他的图关系可能产生了变动。

其实这里咱们不能为该自然人生成新的 OneID,否则咱们数仓里的历史数据可能无奈关联应用,所以咱们的策略就是如果该自然人曾经有 OneID 了,则不须要从新生成,其实这里咱们就是判断该图中的所有的顶点是否存在 OneID,咱们前面在代码中体现着一点。

OneID 的抉择问题

这个和下面的更新问题有点像,下面更新问题咱们能够保障一个自然人的 OneID 不发生变化,然而抉择问题会导致发生变化,然而这个问题是图计算中无奈防止的,咱们举个例子,假如咱们有用户的两个 ID(A_ID,C_ID),然而这两个 ID 在以后是没有方法买通的,所以咱们就会为这个两个 ID 生成两个 OneID, 也就是(A_OneID,B_OneID),所以这个时候咱们晓得因为 ID Mapping 不上,所以咱们认为这两个 ID 是两个人。

前面咱们有了另外一个 ID(B_ID), 这个 ID 能够别离和其余的两个 ID 买通,也就是 B_ID<——>A_ID , B_ID<——>C_ID 这样咱们就买通这个三个 ID,这个时候咱们晓得

这个用户存在三个 ID, 并且这个时候曾经存在了两个 OneID,所以这个时候咱们须要在这两个 OneID 中抉择一个作为用户的 OneID,简略粗犷点就能够抉择最小的或者是最大的。

咱们抉择了之后,要将另外一个 OneID 对应的数据,对应到抉择的 OneID 下,否则没有被抉择的 OneID 的历史数据就无奈追溯了

OneID 代码实现

这个代码相比 ID Mapping 次要是多了 OneID 的生成逻辑和更新逻辑,须要留神的是对于顶点汇合的结构咱们不是间接应用字符串的 hashcode,这是因为 hashcode 很容易反复

object OneID  {
    val spark = SparkSession
      .builder()
      .appName("OneID")
      .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {val bizdate=args(0)
    val c = Calendar.getInstance
    val format = new SimpleDateFormat("yyyyMMdd")
    c.setTime(format.parse(bizdate))

    c.add(Calendar.DATE, -1)
    val bizlastdate = format.format(c.getTime)

    println(s"工夫参数  ${bizdate}    ${bizlastdate}")
    // dwd_patient_identity_info_df 就是咱们用户的各个 ID,也就是咱们的数据源
    // 获取字段,这样咱们就能够扩大新的 ID 字段,然而不必更新代码
    val columns = spark.sql(
      s"""
         |select
         |   *
         |from
         |   lezk_dw.dwd_patient_identity_info_df
         |where
         |   ds='${bizdate}'
         |limit
         |   1
         |""".stripMargin)
      .schema.fields.map(f => f.name).filterNot(e=>e.equals("ds")).toList

    // 获取数据
    val dataFrame = spark.sql(
      s"""
        |select
        |   ${columns.mkString(",")}
        |from
        |   lezk_dw.dwd_patient_identity_info_df
        |where
        |   ds='${bizdate}'
        |""".stripMargin
    )

    // 数据筹备
    val data = dataFrame.rdd.map(row => {val list = new ListBuffer[String]()
      for (column <- columns) {val value = row.getAs[String](column)
        list.append(value)
      }
      list.toList
    })
    import spark.implicits._
    // 顶点汇合
    val veritx= data.flatMap(list => {for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))))
        yield (new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue, list(i))

    }).distinct

    val veritxDF=veritx.toDF("id_hashcode","id")
    veritxDF.createOrReplaceTempView("veritx")

    // 生成边的汇合
    val edges = data.flatMap(list => {for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)))
           ; j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j))))
      yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue,new BigInteger(DigestUtils.md5Hex(list(j)),16).longValue, "")
    }).distinct


    // 开始应用点汇合与边汇合进行图计算训练
    val graph = Graph(veritx, edges)
    val connectedGraph=graph.connectedComponents()

    // 连通节点
    val  vertices = connectedGraph.vertices.toDF("id_hashcode","guid_hashcode")
    vertices.createOrReplaceTempView("to_graph")

    // 加载昨日的 oneid 数据 (oneid,id,id_hashcode) 
    val ye_oneid = spark.sql(
      s"""
        |select
        |   oneid,id,id_hashcode
        |from
        |   lezk_dw.dwd_patient_oneid_info_df
        |where
        |   ds='${bizlastdate}'
        |""".stripMargin
    )
    ye_oneid.createOrReplaceTempView("ye_oneid")

    // 关联获取 曾经存在的 oneid,这里的 min 函数就是咱们说的 oneid 的抉择问题
    val exists_oneid=spark.sql(
      """
        |select
        |   a.guid_hashcode,min(b.oneid) as oneid
        |from
        |   to_graph a
        |inner join
        |   ye_oneid b
        |on
        |   a.id_hashcode=b.id_hashcode
        |group by
        |   a.guid_hashcode
        |""".stripMargin
    )
    exists_oneid.createOrReplaceTempView("exists_oneid")
    // 不存在则生成 存在则取已有的 这里 nvl 就是 oneid  的更新逻辑,存在则获取 不存在则生成
    val today_oneid=spark.sql(s"""|insert overwrite table dwd_patient_oneid_info_df partition(ds='${bizdate}')
        |select
        |   nvl(b.oneid,md5(cast(a.guid_hashcode as string))) as oneid,c.id,a.id_hashcode,d.id as guid,a.guid_hashcode
        |from
        |   to_graph a
        |left join
        |   exists_oneid b
        |on
        |   a.guid_hashcode=b.guid_hashcode
        |left join
        |   veritx c
        |on
        |   a.id_hashcode=c.id_hashcode
        |left join
        |   veritx d
        |on
        |   a.guid_hashcode=d.id_hashcode
        |""".stripMargin
    )
    sc.stop
  }

}

这个代码中咱们应用了 SparkSQL, 其实你如果更加善于 RDD 的 API,也能够应用 RDD 优化,须要留神的是网上的很多代码中应用了播送变量,将vertices 变量播送了进来,其实这个时候存在一个危险那就是如果你的 vertices 变量十分大,你播送的时候存在 OOM 的危险,然而如果你应用了 SparkSQL 的话,Spark 就会依据理论的状况,帮你主动优化。

优化点 增量优化

咱们看到咱们每次都是全量的图,其实咱们能够将咱们的 OneID 表加载进来,而后将咱们的增量数据和已有的图数据进行合并,而后再去生成图

val veritx = ye_veritx.union(to_veritx)
val edges = ye_edges.union(to_edges)

val graph = Graph(veritx, edges)

总结

  1. ID MappingOneID 的提前,OneIDID Mapping 的后果,所以要想做OneID 必须先做ID Mapping;
  2. OneID 是为了买通整个数据体系的数据,所以OneID 须要以服务的形式对外提供服务,在数仓外面就是作为根底表应用,对外的话咱们就须要提供接口对外提供服务
正文完
 0