明天是我在上海租房的小区被封的第三天,因为我的粗心,没有屯吃的,外卖明天齐全点不到了,中午的时候我找到了一包快过期的肉松饼,才补充了1000焦耳的能量。然而中午去做核酸的时候,我感觉走路有点不稳,我看到大白的棉签深刻我的嘴里,我居然认为是吃的,差点咬住了,还好我有仅存的一点意识。下午我收到女朋友给我点的外卖——面包(我不晓得她是怎么点到的外卖,我很打动),很粗劣的面包,搁平时我根本不喜爱吃面包,然而曾经到了这个份上,我大口吃起来,居然感觉这是世界上最好吃的食物了。今天晚上5:50的闹钟,去叮咚和美团买菜,看能不能抢几桶泡面吧。愿神保佑,我暗暗下着信心并祷告着,胸前画着十字。。。

数据仓库系列文章(继续更新)

  1. 数仓架构发展史
  2. 数仓建模方法论
  3. 数仓建模分层实践
  4. 数仓建模—宽表的设计
  5. 数仓建模—指标体系
  6. 数据仓库之拉链表
  7. 数仓—数据集成
  8. 数仓—数据集市
  9. 数仓—商业智能零碎
  10. 数仓—埋点设计与治理
  11. 数仓—ID Mapping
  12. 数仓—OneID
  13. 数仓—AARRR海盗模型
  14. 数仓—总线矩阵
  15. 数仓—数据安全
  16. 数仓—数据品质
  17. 数仓—数仓建模和业务建模
关注公众号:大数据技术派,回复: 材料,支付1024G材料。

OneID

后面咱们学习了ID Mapping,包含ID Mapping 的背景介绍和业务场景,以及如何应用Spark 实现ID Mapping,这个过程中波及到了很多货色,当然咱们都通过文章的模式介绍给大家了,所以你再学习明天这一节之前,能够先看一下后面的文章

  1. Spark实战—GraphX编程指南
  2. 数仓建模—ID Mapping

在上一节咱们介绍ID Mapping 的时候咱们就说过ID Mapping 是为了买通用户各个维度的数据,从而打消数据孤岛、防止数据歧义,从而更好的刻画用户,所以说ID Mapping是伎俩不是目标,目标是为了买通数据体系,ID Mapping最终的产出就是咱们明天的配角OneID,也就是说数据收集过去之后通过ID Mapping 买通,从而产生OneID,这一步之后咱们的整个数据体系就将应用OneID作为用户的ID,这样咱们整个数据体系就得以买通

OneData

开始之前咱们先看一下阿里的OneData 数据体系,从而更好认识一下OneID,后面咱们说过ID Mapping 只是伎俩不是目标,目标是为了买通数据体系,ID Mapping最终的产出就是OneID

其实OneID在咱们整个数据服务体系中,也只是终点不是起点或者说是伎俩,咱们最终的目标是为了建设对立的数据资产体系。

没有建设对立的数据资产体系之前,咱们的数据体系建设存在上面诸多问题

  1. 数据孤岛:各产品、业务的数据互相隔离,难以通过共性ID买通
  2. 反复建设:反复的开发、计算、存储,带来昂扬的数据老本
  3. 数据歧义:指标定义口径不统一,造成计算偏差,利用艰难

在阿里巴巴 OneData 体系中,OneID 指对立数据萃取,是一套解决数据孤岛问题的思维和办法。数据孤岛是企业倒退到肯定阶段后广泛遇到的问题。各个部门、业务、产品,各自定义和存储其数据,使得这些数据间难以关联,变成孤岛个别的存在。

OneID的做法是通过对立的实体辨认和连贯,突破数据孤岛,实现数据通融。简略来说,用户、设施等业务实体,在对应的业务数据中,会被映射为惟一辨认(UID)上,其各个维度的数据通过这个UID进行关联。

各个部门、业务、产品对业务实体的UID的定义和实现不一样,使得数据间无奈间接关联,成为了数据孤岛。基于手机号、身份证、邮箱、设施ID等信息,联合业务规定、机器学习、图算法等算法,进行 ID-Mapping,将各种 UID 都映射到对立ID上。通过这个对立ID,便可关联起各个数据孤岛的数据,实现数据通融,以确保业务剖析、用户画像等数据利用的精确和全面。

OneModel 对立数据构建和治理

将指标定位细化为:

1. 原子指标2. 工夫周期3. 修饰词(统计粒度、业务限定, etc)

通过这些定义,设计出各类派生指标 基于数据分层,设计出维度表、明细事实表、汇总事实表,其实咱们看到OneModel 其实没有什么新的内容,其实就是咱们数仓建模的那一套货色

OneService 对立数据服务

OneService 基于复用而不是复制数据的思维,指得是咱们的对立的数据服务,因为咱们始终再提倡复用,包含咱们数仓的建设,然而咱们的数据服务这一块却是空白,所以OneService外围是服务的复用,能力包含:

  • 利用主题逻辑表屏蔽简单物理表的主题式数据服务
  • 个别查问+ OLAP 剖析+在线服务的对立且多样化数据服务
  • 屏蔽多种异构数据源的跨源数据服务

OneID 对立数据萃取

基于对立的实体辨认、连贯和标签生产,实现数据通融,包含:

  • ID自动化辨认与连贯
  • 行为元素和行为规定
  • 标签生产

OneID基于超强ID辨认技术链接数据,高效生产标签;业务驱动技术价值化,打消数据孤岛,晋升数据品质,晋升数据价值。

而ID的买通,必须有ID-ID之间的两两映射买通关系通过ID映射关系表,能力将多种ID之间的关联买通,齐全孤立的两种ID是无奈买通的

买通整个ID体系,看似简略,实则计算简单,计算量十分大。如果某种对象有数亿个个体,每个个体又有数十种不同的ID标识,任意两种ID之间都有可能买通关系,想要实现这类对象的所有个体ID买通须要数亿次计算,个别的机器甚至大数据集群都无奈实现。

大数据畛域中的ID-Mapping技术就是用机器学习算法类来取代横蛮计算,解决对象数据买通的问题。基于输出的ID关系对,利用机器学习算法做稳定性和收敛性计算,输入关系稳固的ID关系对,并生成一个UID作为惟一辨认该对象的标识码。

OneID实现过程中存在的问题

后面咱们晓得咱们的ID Mapping 是通过图计算实现,外围就是连通图,其实实现OneID咱们在买通ID 之后,咱们就能够为一个个连通图生成一个ID, 因为一个连通图 就代表一个用户,这样咱们生成的ID就是用户的OneID,这里的用户指的是自然人,而不是某一个平台上的用户。

OneID 的生成问题

首先咱们须要一个ID 生成算法,因为咱们须要为大量用户生成ID,咱们的ID 要求是惟一的,所以在算法设计的时候就须要思考到这一点,咱们并不举荐应用UUID,起因是UUID了可能会呈现反复,而且UUID 没有含意,所以咱们不举荐应用UUID,咱们这里应用的是MD5 算法,所以咱们的MD5 算法的参数是咱们的图的标示ID。

OneID 的更新问题

这里的更新问题次要就是咱们的数据每天都在更新,也就是说咱们的图关系在更新,也就是说咱们要不要给这个自然人从新生成OneID ,因为他的图关系可能产生了变动。

其实这里咱们不能为该自然人生成新的OneID ,否则咱们数仓里的历史数据可能无奈关联应用,所以咱们的策略就是如果该自然人曾经有OneID了,则不须要从新生成,其实这里咱们就是判断该图中的所有的顶点是否存在OneID,咱们前面在代码中体现着一点。

OneID 的抉择问题

这个和下面的更新问题有点像,下面更新问题咱们能够保障一个自然人的OneID不发生变化,然而抉择问题会导致发生变化,然而这个问题是图计算中无奈防止的,咱们举个例子,假如咱们有用户的两个ID(A_ID,C_ID),然而这两个ID 在以后是没有方法买通的,所以咱们就会为这个两个ID 生成两个OneID,也就是(A_OneID,B_OneID),所以这个时候咱们晓得因为ID Mapping 不上,所以咱们认为这两个ID 是两个人。

前面咱们有了另外一个ID(B_ID),这个ID能够别离和其余的两个ID 买通,也就是B_ID<——>A_ID , B_ID<——>C_ID 这样咱们就买通这个三个ID,这个时候咱们晓得

这个用户存在三个ID,并且这个时候曾经存在了两个OneID,所以这个时候咱们须要在这两个OneID中抉择一个作为用户的OneID,简略粗犷点就能够抉择最小的或者是最大的。

咱们抉择了之后,要将另外一个OneID对应的数据,对应到抉择的OneID 下,否则没有被抉择的OneID的历史数据就无奈追溯了

OneID 代码实现

这个代码相比ID Mapping次要是多了OneID 的生成逻辑和更新逻辑 ,须要留神的是对于顶点汇合的结构咱们不是间接应用字符串的hashcode ,这是因为hashcode 很容易反复

object OneID  {    val spark = SparkSession      .builder()      .appName("OneID")      .getOrCreate()  val sc = spark.sparkContext  def main(args: Array[String]): Unit = {    val bizdate=args(0)    val c = Calendar.getInstance    val format = new SimpleDateFormat("yyyyMMdd")    c.setTime(format.parse(bizdate))    c.add(Calendar.DATE, -1)    val bizlastdate = format.format(c.getTime)    println(s" 工夫参数  ${bizdate}    ${bizlastdate}")    // dwd_patient_identity_info_df 就是咱们用户的各个ID ,也就是咱们的数据源    // 获取字段,这样咱们就能够扩大新的ID 字段,然而不必更新代码    val columns = spark.sql(      s"""         |select         |   *         |from         |   lezk_dw.dwd_patient_identity_info_df         |where         |   ds='${bizdate}'         |limit         |   1         |""".stripMargin)      .schema.fields.map(f => f.name).filterNot(e=>e.equals("ds")).toList    // 获取数据    val dataFrame = spark.sql(      s"""        |select        |   ${columns.mkString(",")}        |from        |   lezk_dw.dwd_patient_identity_info_df        |where        |   ds='${bizdate}'        |""".stripMargin    )    // 数据筹备    val data = dataFrame.rdd.map(row => {      val list = new ListBuffer[String]()      for (column <- columns) {        val value = row.getAs[String](column)        list.append(value)      }      list.toList    })    import spark.implicits._    // 顶点汇合    val veritx= data.flatMap(list => {      for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))))        yield (new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue, list(i))    }).distinct    val veritxDF=veritx.toDF("id_hashcode","id")    veritxDF.createOrReplaceTempView("veritx")    // 生成边的汇合    val edges = data.flatMap(list => {      for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)))           ; j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j))))      yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue,new BigInteger(DigestUtils.md5Hex(list(j)),16).longValue, "")    }).distinct    // 开始应用点汇合与边汇合进行图计算训练    val graph = Graph(veritx, edges)    val connectedGraph=graph.connectedComponents()    // 连通节点    val  vertices = connectedGraph.vertices.toDF("id_hashcode","guid_hashcode")    vertices.createOrReplaceTempView("to_graph")    // 加载昨日的oneid 数据 (oneid,id,id_hashcode)     val ye_oneid = spark.sql(      s"""        |select        |   oneid,id,id_hashcode        |from        |   lezk_dw.dwd_patient_oneid_info_df        |where        |   ds='${bizlastdate}'        |""".stripMargin    )    ye_oneid.createOrReplaceTempView("ye_oneid")    // 关联获取 曾经存在的 oneid,这里的min 函数就是咱们说的oneid 的抉择问题    val exists_oneid=spark.sql(      """        |select        |   a.guid_hashcode,min(b.oneid) as oneid        |from        |   to_graph a        |inner join        |   ye_oneid b        |on        |   a.id_hashcode=b.id_hashcode        |group by        |   a.guid_hashcode        |""".stripMargin    )    exists_oneid.createOrReplaceTempView("exists_oneid")    // 不存在则生成 存在则取已有的 这里nvl 就是oneid  的更新逻辑,存在则获取 不存在则生成    val today_oneid=spark.sql(      s"""        |insert overwrite table dwd_patient_oneid_info_df partition(ds='${bizdate}')        |select        |   nvl(b.oneid,md5(cast(a.guid_hashcode as string))) as oneid,c.id,a.id_hashcode,d.id as guid,a.guid_hashcode        |from        |   to_graph a        |left join        |   exists_oneid b        |on        |   a.guid_hashcode=b.guid_hashcode        |left join        |   veritx c        |on        |   a.id_hashcode=c.id_hashcode        |left join        |   veritx d        |on        |   a.guid_hashcode=d.id_hashcode        |""".stripMargin    )    sc.stop  }}

这个代码中咱们应用了SparkSQL,其实你如果更加善于RDD的API,也能够应用RDD 优化,须要留神的是网上的很多代码中应用了播送变量,将vertices 变量播送了进来,其实这个时候存在一个危险那就是如果你的vertices 变量十分大,你播送的时候存在OOM 的危险,然而如果你应用了SparkSQL的话,Spark 就会依据理论的状况,帮你主动优化。

优化点 增量优化

咱们看到咱们每次都是全量的图,其实咱们能够将咱们的OneID 表加载进来,而后将咱们的增量数据和已有的图数据进行合并,而后再去生成图

val veritx = ye_veritx.union(to_veritx)val edges = ye_edges.union(to_edges)val graph = Graph(veritx, edges)

总结

  1. ID MappingOneID 的提前,OneIDID Mapping 的后果,所以要想做OneID必须先做ID Mapping;
  2. OneID 是为了买通整个数据体系的数据,所以OneID 须要以服务的形式对外提供服务,在数仓外面就是作为根底表应用,对外的话咱们就须要提供接口对外提供服务