明天是我在上海租房的小区被封的第三天,因为我的粗心,没有屯吃的,外卖明天齐全点不到了,中午的时候我找到了一包快过期的肉松饼,才补充了1000
焦耳的能量。然而中午去做核酸的时候,我感觉走路有点不稳,我看到大白的棉签深刻我的嘴里,我居然认为是吃的,差点咬住了,还好我有仅存的一点意识。下午我收到女朋友给我点的外卖——面包(我不晓得她是怎么点到的外卖,我很打动),很粗劣的面包,搁平时我根本不喜爱吃面包,然而曾经到了这个份上,我大口吃起来,居然感觉这是世界上最好吃的食物了。今天晚上5:50的闹钟,去叮咚和美团买菜,看能不能抢几桶泡面吧。愿神保佑,我暗暗下着信心并祷告着,胸前画着十字。。。
数据仓库系列文章(继续更新)
- 数仓架构发展史
- 数仓建模方法论
- 数仓建模分层实践
- 数仓建模—宽表的设计
- 数仓建模—指标体系
- 数据仓库之拉链表
- 数仓—数据集成
- 数仓—数据集市
- 数仓—商业智能零碎
- 数仓—埋点设计与治理
- 数仓—ID Mapping
- 数仓—OneID
- 数仓—AARRR海盗模型
- 数仓—总线矩阵
- 数仓—数据安全
- 数仓—数据品质
- 数仓—数仓建模和业务建模
关注公众号:大数据技术派
,回复:材料
,支付1024G
材料。
OneID
后面咱们学习了ID Mapping
,包含ID Mapping
的背景介绍和业务场景,以及如何应用Spark
实现ID Mapping
,这个过程中波及到了很多货色,当然咱们都通过文章的模式介绍给大家了,所以你再学习明天这一节之前,能够先看一下后面的文章
- Spark实战—GraphX编程指南
- 数仓建模—ID Mapping
在上一节咱们介绍ID Mapping 的时候咱们就说过ID Mapping 是为了买通用户各个维度的数据,从而打消数据孤岛、防止数据歧义,从而更好的刻画用户,所以说ID Mapping是伎俩不是目标,目标是为了买通数据体系,ID Mapping最终的产出就是咱们明天的配角OneID,也就是说数据收集过去之后通过ID Mapping 买通,从而产生OneID,这一步之后咱们的整个数据体系就将应用OneID作为用户的ID,这样咱们整个数据体系就得以买通
OneData
开始之前咱们先看一下阿里的OneData 数据体系,从而更好认识一下OneID,后面咱们说过ID Mapping 只是伎俩不是目标,目标是为了买通数据体系,ID Mapping最终的产出就是OneID
其实OneID在咱们整个数据服务体系中,也只是终点不是起点或者说是伎俩,咱们最终的目标是为了建设对立的数据资产体系。
没有建设对立的数据资产体系之前,咱们的数据体系建设存在上面诸多问题
- 数据孤岛:各产品、业务的数据互相隔离,难以通过共性ID买通
- 反复建设:反复的开发、计算、存储,带来昂扬的数据老本
- 数据歧义:指标定义口径不统一,造成计算偏差,利用艰难
在阿里巴巴 OneData 体系中,OneID 指对立数据萃取,是一套解决数据孤岛问题的思维和办法。数据孤岛是企业倒退到肯定阶段后广泛遇到的问题。各个部门、业务、产品,各自定义和存储其数据,使得这些数据间难以关联,变成孤岛个别的存在。
OneID的做法是通过对立的实体辨认和连贯,突破数据孤岛,实现数据通融。简略来说,用户、设施等业务实体,在对应的业务数据中,会被映射为惟一辨认(UID)上,其各个维度的数据通过这个UID进行关联。
各个部门、业务、产品对业务实体的UID的定义和实现不一样,使得数据间无奈间接关联,成为了数据孤岛。基于手机号、身份证、邮箱、设施ID等信息,联合业务规定、机器学习、图算法等算法,进行 ID-Mapping,将各种 UID 都映射到对立ID上。通过这个对立ID,便可关联起各个数据孤岛的数据,实现数据通融,以确保业务剖析、用户画像等数据利用的精确和全面。
OneModel 对立数据构建和治理
将指标定位细化为:
1. 原子指标2. 工夫周期3. 修饰词(统计粒度、业务限定, etc)
通过这些定义,设计出各类派生指标 基于数据分层,设计出维度表、明细事实表、汇总事实表,其实咱们看到OneModel 其实没有什么新的内容,其实就是咱们数仓建模的那一套货色
OneService 对立数据服务
OneService 基于复用而不是复制数据的思维,指得是咱们的对立的数据服务,因为咱们始终再提倡复用,包含咱们数仓的建设,然而咱们的数据服务这一块却是空白,所以OneService外围是服务的复用,能力包含:
- 利用主题逻辑表屏蔽简单物理表的主题式数据服务
- 个别查问+ OLAP 剖析+在线服务的对立且多样化数据服务
- 屏蔽多种异构数据源的跨源数据服务
OneID 对立数据萃取
基于对立的实体辨认、连贯和标签生产,实现数据通融,包含:
- ID自动化辨认与连贯
- 行为元素和行为规定
- 标签生产
OneID基于超强ID辨认技术链接数据,高效生产标签;业务驱动技术价值化,打消数据孤岛,晋升数据品质,晋升数据价值。
而ID的买通,必须有ID-ID之间的两两映射买通关系,通过ID映射关系表,能力将多种ID之间的关联买通,齐全孤立的两种ID是无奈买通的。
买通整个ID体系,看似简略,实则计算简单,计算量十分大。如果某种对象有数亿个个体,每个个体又有数十种不同的ID标识,任意两种ID之间都有可能买通关系,想要实现这类对象的所有个体ID买通须要数亿次计算,个别的机器甚至大数据集群都无奈实现。
大数据畛域中的ID-Mapping技术就是用机器学习算法类来取代横蛮计算,解决对象数据买通的问题。基于输出的ID关系对,利用机器学习算法做稳定性和收敛性计算,输入关系稳固的ID关系对,并生成一个UID作为惟一辨认该对象的标识码。
OneID实现过程中存在的问题
后面咱们晓得咱们的ID Mapping 是通过图计算实现,外围就是连通图,其实实现OneID咱们在买通ID 之后,咱们就能够为一个个连通图生成一个ID, 因为一个连通图 就代表一个用户,这样咱们生成的ID就是用户的OneID,这里的用户指的是自然人,而不是某一个平台上的用户。
OneID 的生成问题
首先咱们须要一个ID 生成算法,因为咱们须要为大量用户生成ID,咱们的ID 要求是惟一的,所以在算法设计的时候就须要思考到这一点,咱们并不举荐应用UUID,起因是UUID了可能会呈现反复,而且UUID 没有含意,所以咱们不举荐应用UUID,咱们这里应用的是MD5 算法,所以咱们的MD5 算法的参数是咱们的图的标示ID。
OneID 的更新问题
这里的更新问题次要就是咱们的数据每天都在更新,也就是说咱们的图关系在更新,也就是说咱们要不要给这个自然人从新生成OneID ,因为他的图关系可能产生了变动。
其实这里咱们不能为该自然人生成新的OneID ,否则咱们数仓里的历史数据可能无奈关联应用,所以咱们的策略就是如果该自然人曾经有OneID了,则不须要从新生成,其实这里咱们就是判断该图中的所有的顶点是否存在OneID,咱们前面在代码中体现着一点。
OneID 的抉择问题
这个和下面的更新问题有点像,下面更新问题咱们能够保障一个自然人的OneID不发生变化,然而抉择问题会导致发生变化,然而这个问题是图计算中无奈防止的,咱们举个例子,假如咱们有用户的两个ID(A_ID,C_ID),然而这两个ID 在以后是没有方法买通的,所以咱们就会为这个两个ID 生成两个OneID,也就是(A_OneID,B_OneID),所以这个时候咱们晓得因为ID Mapping 不上,所以咱们认为这两个ID 是两个人。
前面咱们有了另外一个ID(B_ID),这个ID能够别离和其余的两个ID 买通,也就是B_ID<——>A_ID , B_ID<——>C_ID 这样咱们就买通这个三个ID,这个时候咱们晓得
这个用户存在三个ID,并且这个时候曾经存在了两个OneID,所以这个时候咱们须要在这两个OneID中抉择一个作为用户的OneID,简略粗犷点就能够抉择最小的或者是最大的。
咱们抉择了之后,要将另外一个OneID对应的数据,对应到抉择的OneID 下,否则没有被抉择的OneID的历史数据就无奈追溯了
OneID 代码实现
这个代码相比ID Mapping次要是多了OneID 的生成逻辑和更新逻辑 ,须要留神的是对于顶点汇合的结构咱们不是间接应用字符串的hashcode ,这是因为hashcode 很容易反复
object OneID { val spark = SparkSession .builder() .appName("OneID") .getOrCreate() val sc = spark.sparkContext def main(args: Array[String]): Unit = { val bizdate=args(0) val c = Calendar.getInstance val format = new SimpleDateFormat("yyyyMMdd") c.setTime(format.parse(bizdate)) c.add(Calendar.DATE, -1) val bizlastdate = format.format(c.getTime) println(s" 工夫参数 ${bizdate} ${bizlastdate}") // dwd_patient_identity_info_df 就是咱们用户的各个ID ,也就是咱们的数据源 // 获取字段,这样咱们就能够扩大新的ID 字段,然而不必更新代码 val columns = spark.sql( s""" |select | * |from | lezk_dw.dwd_patient_identity_info_df |where | ds='${bizdate}' |limit | 1 |""".stripMargin) .schema.fields.map(f => f.name).filterNot(e=>e.equals("ds")).toList // 获取数据 val dataFrame = spark.sql( s""" |select | ${columns.mkString(",")} |from | lezk_dw.dwd_patient_identity_info_df |where | ds='${bizdate}' |""".stripMargin ) // 数据筹备 val data = dataFrame.rdd.map(row => { val list = new ListBuffer[String]() for (column <- columns) { val value = row.getAs[String](column) list.append(value) } list.toList }) import spark.implicits._ // 顶点汇合 val veritx= data.flatMap(list => { for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)))) yield (new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue, list(i)) }).distinct val veritxDF=veritx.toDF("id_hashcode","id") veritxDF.createOrReplaceTempView("veritx") // 生成边的汇合 val edges = data.flatMap(list => { for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))) ; j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j)))) yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue,new BigInteger(DigestUtils.md5Hex(list(j)),16).longValue, "") }).distinct // 开始应用点汇合与边汇合进行图计算训练 val graph = Graph(veritx, edges) val connectedGraph=graph.connectedComponents() // 连通节点 val vertices = connectedGraph.vertices.toDF("id_hashcode","guid_hashcode") vertices.createOrReplaceTempView("to_graph") // 加载昨日的oneid 数据 (oneid,id,id_hashcode) val ye_oneid = spark.sql( s""" |select | oneid,id,id_hashcode |from | lezk_dw.dwd_patient_oneid_info_df |where | ds='${bizlastdate}' |""".stripMargin ) ye_oneid.createOrReplaceTempView("ye_oneid") // 关联获取 曾经存在的 oneid,这里的min 函数就是咱们说的oneid 的抉择问题 val exists_oneid=spark.sql( """ |select | a.guid_hashcode,min(b.oneid) as oneid |from | to_graph a |inner join | ye_oneid b |on | a.id_hashcode=b.id_hashcode |group by | a.guid_hashcode |""".stripMargin ) exists_oneid.createOrReplaceTempView("exists_oneid") // 不存在则生成 存在则取已有的 这里nvl 就是oneid 的更新逻辑,存在则获取 不存在则生成 val today_oneid=spark.sql( s""" |insert overwrite table dwd_patient_oneid_info_df partition(ds='${bizdate}') |select | nvl(b.oneid,md5(cast(a.guid_hashcode as string))) as oneid,c.id,a.id_hashcode,d.id as guid,a.guid_hashcode |from | to_graph a |left join | exists_oneid b |on | a.guid_hashcode=b.guid_hashcode |left join | veritx c |on | a.id_hashcode=c.id_hashcode |left join | veritx d |on | a.guid_hashcode=d.id_hashcode |""".stripMargin ) sc.stop }}
这个代码中咱们应用了SparkSQL,其实你如果更加善于RDD的API,也能够应用RDD 优化,须要留神的是网上的很多代码中应用了播送变量,将vertices
变量播送了进来,其实这个时候存在一个危险那就是如果你的vertices 变量十分大,你播送的时候存在OOM 的危险,然而如果你应用了SparkSQL的话,Spark 就会依据理论的状况,帮你主动优化。
优化点 增量优化
咱们看到咱们每次都是全量的图,其实咱们能够将咱们的OneID 表加载进来,而后将咱们的增量数据和已有的图数据进行合并,而后再去生成图
val veritx = ye_veritx.union(to_veritx)val edges = ye_edges.union(to_edges)val graph = Graph(veritx, edges)
总结
ID Mapping
是OneID
的提前,OneID
是ID Mapping
的后果,所以要想做OneID
必须先做ID Mapping
;OneID
是为了买通整个数据体系的数据,所以OneID
须要以服务的形式对外提供服务,在数仓外面就是作为根底表应用,对外的话咱们就须要提供接口对外提供服务