共计 4358 个字符,预计需要花费 11 分钟才能阅读完成。
前言
讲数据品质监控重要性和实践的文章,去网上能够搜到很多,这篇文章次要讲怎么设计和开发数据品质监控平台。
第一局部:介绍
数据品质监控平台(DQC)是反对多数据源的依据用户配置的品质监控规定及时发现问题,并通过邮件告诉告警的一站式平台。
目前,数据品质监控性能为用户提供 10 余种预设的数据品质检测模板,反对:PSI 计算、缺失分区和及时性查看、磁盘稳定率查看、表行数稳定率查看、饱和度查看、列异样数据查看、主键反复值检查和计算统计指标等性能。
想理解咱们公司的数据处理流程和数据监控内容,能够参考墨竹的文章,但请留神截图为老版本 DQC,新版本 UI 产生了较多的变动。平台新版本的截图如下。极客星球 | 数据品质保障之路的摸索与实际
DQC,在架构上分为 2 层,java 开发负责的业务层,大数据开发负责的数据处理层。
业务层是一个 web 服务,实现了对用户和检测对象的治理,对检测对象的计算调度,对返回监控后果的展现。
数据处理层实现了具体的数据品质监控工作。DQC 的架构图如下:
因为自己是大数据平台开发,所以设计和实现次要集中在数据品质监控的数据处理层。业务层的检测对象配置和调度因为和数据处理层分割比拟亲密,所以也参加了设计。
第二局部:设计
数据品质的外围性能实现,大抵就是依据须要事后定义的监控指标设计出计算模版 (sql 和 api)。
而后依据具体须要监控的对象填充对应的模版就能够进行监控。
上面将展现我设计和实现 DQC 的过程。
一、基本思路
规定的显示大抵能够分为 2 类:
1.sql 能够解决的,例如表行数稳定率,数据异样率等。
2. 其余类型,通过脚本或者接口来实现,例如缺失分区查看,磁盘容量稳定率等。
先给一个稳定率的公式:
1.1 SQL 模版具体的例子(表行数稳定率):
1 CREATE TEMPORARY VIEW a as
2 SELECT COUNT(*)
3 FROM ${database}.${table}
4 WHERE day = ‘today – period’;
5
6 CREATE TEMPORARY VIEW b as
7 SELECT COUNT(*)
8 FROM ${database}.${table}
9 WHERE day = ‘today’;
10
11 SELECT (b.cnt – a.cnt) / b.cnt as rate
12 FROM a JOIN b
在这个表行数稳定率 sql 模版中须要填充的变量是库表名和工夫周期(日,2 周,1 月等)。
在这个 sql 模版在理论中会有很多有余,例如日期字段不是 day 而是 date,用户还须要额定的可配置过滤条件等等。这些都须要在联合生产进行进一步的设计和实现。
1.2 脚本的例子(磁盘容量稳定率):
以下的代码并不能运行,须要再附加一些额定解决。
rate = (b – a) / b
从这 2 个例子中,能够看出这里一共由 2 个子步骤组成。首先是就是获取规定中所须要的统计数量(表行数,磁盘容量等);而后是依据具体的规定指标(稳定率)进行计算。
通过计算失去规定理论后果,须要跟预期值进行比拟。例如心愿表行数稳定在 [-0.2, 0.2] 之间,如果是 0.5 就告警,告诉对应负责人进行解决。
二、模块化设计
依据基本思路中三步骤,来具体开展。我在设计零碎的时候,参考了 ebay 开源的 Apache Griffin。
2.1 统计数量的计算获取
统计数量的计算获取能够分为 2 个局部,一个是数据源,一个统计数量。
数据源能够是 hive,hbase,mysql,mongodb 等等。
统计数量也就是计算或者是获取到对应的数据,在基本思路中能够看到,这里的计算和获取动作是会反复的。所以在这一层是能够做一些形象的。能够细化出一些小的办法,而后进行组合,来失去理论的成果。
这一层形象出了数据源和数据计算子步骤。
2.2 规定指标的计算
规定指标的计算就是最重要的一部分,这部分会影响到下面的统计数量具体的实现。
下面是咱们在理论生产教训中提取进去须要监控的规定指标。
一共 14 个,除了第一个缺失分区查看能够间接失去后果。前面都含有计算逻辑在其中。统计指标我间接应用 spark dataframe 的 summary 算子失去的,这里也能够应用 udf 来实现。其余的 12 个大抵由以下操作 (操作蕴含了 2.1 中统计数量) 失去。
1. 检查数据是否匹配 Accuracy
2. 查看数据完整性 Completeness
3. 数据去重计数和区间计数 Distinctness
4. 环比 Ratio
5.PSI
所以在这里,我依据须要计算的指标模版进行了具体的操作拆分,造成 4 个计算步骤组合,其子步骤局部,不同步骤组之间能够共享,也能够独自实现子步骤组。察看下面能够看到有 sample 这个步骤,做数据质量检查的时候,全量有时候太慢,就失去了及时性,同时也会对集群资源造成压力。所以须要提供数据采样性能。
这里附一张咱们零碎实现的步骤组。多进去的 2 个,MultiRatio 是做分组稳定计算的,算是 ratio 的扩大。profiling 能够算是子步骤的一种,用户理论解决数据中,会有更高的要求,profiling 这个步骤组能够实现 spark sql 计算的所有性能(用户并不会传残缺的 sql,所以须要这样一个步骤组来解决)。自己对非凡指标,例如缺失分区查看,就独自实现了,并不会用到步骤组,前面实现局部能够看到另一层形象。
所以在这个局部,咱们实现对整个数据计算的步骤形象。
2.3 后果的查看与输入
通过以上 2 步,咱们就到了指标后果。咱们须要将指标后果落库,同时对指标后果进行查看,不合乎用户设定的阈值就发送邮件告警,让用户来解决。
后果输入局部,后果是以约定的 json 模式存储,采纳 json 存储,看中其可伸缩性。起因如下:在计算指标的时候,一些两头后果会在查错提供很大帮忙。例如:是咱们计算异样占比的时候,同时会计算出异样值的数量和总量,当后果不合乎预期的时候,这些内容是很重要的查错信息。咱们能够抉择 mongodb,mysql 等存储最终后果。
后果查看局部,我用了 scala 的解析器组合子库来实现,具体见实现。在后果输入局部提到了,后果可能输入多个,那同样的,用户也就能够同时查看多个后果值,提供了 dsl 来实现这项工作。
在这一层,咱们实现了后果输入的形象和后果查看的性能实现。
三、实现
整个数据处理层会应用 spark 来作为计算引擎。依据业务层传来的 Json 参数来进行具体的工作生成。下图是工作参数框架。
- 数据源
因为采纳 spark 作为计算引擎,所以从数据源中获取对应的数据用 spark 的算子即可。spark 自身实现对很多数据源的数据提取,所以整个数据源局部的实现绝对简略,能够依据本人理论的需要来编写数据源类。对每种数据源会有本人须要定制化的内容,咱们在 config 参数中进行即可。
我会拿 Hive 数据源解说一下,实现了哪些内容,数据库类的数据源绝对容易实现。针对 Hive 数据源,首先是须要一些过滤条件,这部分以 key value 对的形式来配置。
在数据质量检查中最重要的一个变量就是工夫。对于工夫,次要有 2 个须要解决的点。
1. 因为具体的工夫格局不同,有的是 yyyyMM, yyyy-MM-dd, 或者具体到工夫。咱们采纳的计划是业务层对立传 yyyyMMdd,其为变量 $day,能够在分区字段中应用即可。还提供了 ${latest_partition}变量来获取最新分区。以及对这 2 个变量能够应用 spark udf 来获取指定的工夫格局。
2. 另外一些指标,例如稳定率,是同一个数据 2 个工夫点的比拟。所以还会带入工夫周期的概念。例如每天,每周二,每月 16 日。
- 构建计算步骤
这里许多概念和 Apache Griffin 统一,但具体实现有很大差别。
业务层首先会依据触发条件和库表聚合调度,使得同一个数据源同一日期的检测对象一起调度。
检测对象下文都称为 rule。
第一步 rule 的解决
构建的第一步就是对 rule 进行解决,例如 psi 这类的 rule,能够先拆分成多个 rule,例如分布区间的 psi,能够拆分为分布区间和 psi 2 个 rule,这里就存在了依赖关系。构建了一个 rule 的 DAG。
第二步 rule 转换成执行打算
一个 rule 转换成一个执行打算。执行打算的性能就是生成具体的工作步骤。
第三步 生成计算步骤
这一步有很多的优化空间,具体内容见四、优化
计算步骤分为 4 大类:
1. 不须要走 spark 工作计算,例如磁盘容量,间接用 hadoop api 查问即可。
2.spark sql 步骤组就是 2.2 规定指标的计算中提到的,由 4 组成。
3.spark 算子,例如查问 hbase 和数据库,summary 算子之类的操作。
4.spark sql 计算步骤,写好的 spark sql,预留了变量填充。
每一个执行打算会对应一个或者多个计算步骤。如下图,展现了表行数稳定率和异样数据占比率的转换过程。
3 . 计算步骤执行
通过上述的转换,咱们能够失去计算步骤的 DAG,之后对这个 DAG 进行并发执行就能够了。
这样的并发执行能够更充沛的应用 sprak executor 的资源。比方串行执行,200cores 的资源,一个 job 因为数据歪斜等起因,最初 20 个 task 执行较慢。这时候前面的 job 并不会启动,而且可能因为等待时间过长,局部 executor 被开释了。但如果是并发执行,闲暇的 180cores 就会执行其余未实现的 job。
- 后果告警
后果查看局部,我用了 scala 的解析器组合子库来实现,BNF 和规定如下图。失去后果后就会发给邮件指定的负责人,这里也能够反对短信和电话。
- 后果输入
这里是后果落库的步骤,依据不同库的写入形式落库即可。
四、优化
这里的优化都是针对构建计算步骤的。整体的优化方向就是合并 DAG 和复用曾经存在的后果。
- 过程复用
计算步骤 DAG 能够依据节点是否计算逻辑雷同进行合并重组,造成优化的计算步骤 DAG。
如图所示,16 个计算步骤被优化成 11 个计算步骤。被复用的节点会用 df.cache() 算子来进行缓存,减速计算。
这里次要实现有节点的身份信息(其计算逻辑和数据信息),根据节点的身份信息来进行比拟合并。
同时须要根据更新的节点信息来合并多个子 DAG。
2 . 计算步骤 DAG 节点转换
常量替换
计算异样数据稳定率,公式为异样数据量 / 总数据量。如果用户应用该规定,采样了 10w 数据,那总数据量其实并不需要触发计算,能够间接转换成常量。公式就会转换成异样数据量 / 10w。
过往后果复用
表行数稳定率(每日),公式为(今⽇数据量—昨⽇数据量)/ 昨⽇数据量。
7 月 2 日的时候,公式为
7 月 3 日的时候,公式为
能够从中看到 7 月 3 日计算表行数稳定率时,7 月 2 日的 Cnt 能够读取前一天的后果,从而防止计算。但须要留神的是,复用这个后果之前,须要查看前一天后果计算结束之后,分区数据和检测对象配置有没有产生扭转。没有产生扭转才能够复用,否则,必须从新计算。
这种将计算步骤转换成常量读取,来躲避触发计算,也能带来显著的性能晋升。