乐趣区

关于数据库:极客星球-数据质量监控的设计与实现

前言

讲数据品质监控重要性和实践的文章,去网上能够搜到很多,这篇文章次要讲怎么设计和开发数据品质监控平台。

第一局部:介绍

数据品质监控平台(DQC)是反对多数据源的依据用户配置的品质监控规定及时发现问题,并通过邮件告诉告警的一站式平台。

目前,数据品质监控性能为用户提供 10 余种预设的数据品质检测模板,反对:PSI 计算、缺失分区和及时性查看、磁盘稳定率查看、表行数稳定率查看、饱和度查看、列异样数据查看、主键反复值检查和计算统计指标等性能。

想理解咱们公司的数据处理流程和数据监控内容,能够参考墨竹的文章,但请留神截图为老版本 DQC,新版本 UI 产生了较多的变动。平台新版本的截图如下。极客星球 | 数据品质保障之路的摸索与实际

DQC,在架构上分为 2 层,java 开发负责的业务层,大数据开发负责的数据处理层。

业务层是一个 web 服务,实现了对用户和检测对象的治理,对检测对象的计算调度,对返回监控后果的展现。

数据处理层实现了具体的数据品质监控工作。DQC 的架构图如下:

因为自己是大数据平台开发,所以设计和实现次要集中在数据品质监控的数据处理层。业务层的检测对象配置和调度因为和数据处理层分割比拟亲密,所以也参加了设计。

第二局部:设计

数据品质的外围性能实现,大抵就是依据须要事后定义的监控指标设计出计算模版 (sql 和 api)。
而后依据具体须要监控的对象填充对应的模版就能够进行监控。
上面将展现我设计和实现 DQC 的过程。

一、基本思路

规定的显示大抵能够分为 2 类:

1.sql 能够解决的,例如表行数稳定率,数据异样率等。
2. 其余类型,通过脚本或者接口来实现,例如缺失分区查看,磁盘容量稳定率等。

先给一个稳定率的公式:

1.1 SQL 模版具体的例子(表行数稳定率):
1 CREATE TEMPORARY VIEW a as

2 SELECT COUNT(*)

3 FROM ${database}.${table}

4 WHERE day = ‘today – period’;

5

6 CREATE TEMPORARY VIEW b as

7 SELECT COUNT(*)

8 FROM ${database}.${table}

9 WHERE day = ‘today’;

10

11 SELECT (b.cnt – a.cnt) / b.cnt as rate

12 FROM a JOIN b

在这个表行数稳定率 sql 模版中须要填充的变量是库表名和工夫周期(日,2 周,1 月等)。

在这个 sql 模版在理论中会有很多有余,例如日期字段不是 day 而是 date,用户还须要额定的可配置过滤条件等等。这些都须要在联合生产进行进一步的设计和实现。

1.2 脚本的例子(磁盘容量稳定率):

以下的代码并不能运行,须要再附加一些额定解决。

rate = (b – a) / b

从这 2 个例子中,能够看出这里一共由 2 个子步骤组成。首先是就是获取规定中所须要的统计数量(表行数,磁盘容量等);而后是依据具体的规定指标(稳定率)进行计算。

通过计算失去规定理论后果,须要跟预期值进行比拟。例如心愿表行数稳定在 [-0.2, 0.2] 之间,如果是 0.5 就告警,告诉对应负责人进行解决。

二、模块化设计

依据基本思路中三步骤,来具体开展。我在设计零碎的时候,参考了 ebay 开源的 Apache Griffin。

2.1 统计数量的计算获取

统计数量的计算获取能够分为 2 个局部,一个是数据源,一个统计数量。

数据源能够是 hive,hbase,mysql,mongodb 等等。

统计数量也就是计算或者是获取到对应的数据,在基本思路中能够看到,这里的计算和获取动作是会反复的。所以在这一层是能够做一些形象的。能够细化出一些小的办法,而后进行组合,来失去理论的成果。

这一层形象出了数据源和数据计算子步骤。

2.2 规定指标的计算

规定指标的计算就是最重要的一部分,这部分会影响到下面的统计数量具体的实现。

下面是咱们在理论生产教训中提取进去须要监控的规定指标。

一共 14 个,除了第一个缺失分区查看能够间接失去后果。前面都含有计算逻辑在其中。统计指标我间接应用 spark dataframe 的 summary 算子失去的,这里也能够应用 udf 来实现。其余的 12 个大抵由以下操作 (操作蕴含了 2.1 中统计数量) 失去。

1. 检查数据是否匹配 Accuracy

2. 查看数据完整性 Completeness

3. 数据去重计数和区间计数 Distinctness

4. 环比 Ratio

5.PSI

所以在这里,我依据须要计算的指标模版进行了具体的操作拆分,造成 4 个计算步骤组合,其子步骤局部,不同步骤组之间能够共享,也能够独自实现子步骤组。察看下面能够看到有 sample 这个步骤,做数据质量检查的时候,全量有时候太慢,就失去了及时性,同时也会对集群资源造成压力。所以须要提供数据采样性能。

这里附一张咱们零碎实现的步骤组。多进去的 2 个,MultiRatio 是做分组稳定计算的,算是 ratio 的扩大。profiling 能够算是子步骤的一种,用户理论解决数据中,会有更高的要求,profiling 这个步骤组能够实现 spark sql 计算的所有性能(用户并不会传残缺的 sql,所以须要这样一个步骤组来解决)。自己对非凡指标,例如缺失分区查看,就独自实现了,并不会用到步骤组,前面实现局部能够看到另一层形象。

所以在这个局部,咱们实现对整个数据计算的步骤形象。

2.3 后果的查看与输入

通过以上 2 步,咱们就到了指标后果。咱们须要将指标后果落库,同时对指标后果进行查看,不合乎用户设定的阈值就发送邮件告警,让用户来解决。

后果输入局部,后果是以约定的 json 模式存储,采纳 json 存储,看中其可伸缩性。起因如下:在计算指标的时候,一些两头后果会在查错提供很大帮忙。例如:是咱们计算异样占比的时候,同时会计算出异样值的数量和总量,当后果不合乎预期的时候,这些内容是很重要的查错信息。咱们能够抉择 mongodb,mysql 等存储最终后果。

后果查看局部,我用了 scala 的解析器组合子库来实现,具体见实现。在后果输入局部提到了,后果可能输入多个,那同样的,用户也就能够同时查看多个后果值,提供了 dsl 来实现这项工作。

在这一层,咱们实现了后果输入的形象和后果查看的性能实现。

三、实现

整个数据处理层会应用 spark 来作为计算引擎。依据业务层传来的 Json 参数来进行具体的工作生成。下图是工作参数框架。

  1. 数据源

因为采纳 spark 作为计算引擎,所以从数据源中获取对应的数据用 spark 的算子即可。spark 自身实现对很多数据源的数据提取,所以整个数据源局部的实现绝对简略,能够依据本人理论的需要来编写数据源类。对每种数据源会有本人须要定制化的内容,咱们在 config 参数中进行即可。

我会拿 Hive 数据源解说一下,实现了哪些内容,数据库类的数据源绝对容易实现。针对 Hive 数据源,首先是须要一些过滤条件,这部分以 key value 对的形式来配置。

在数据质量检查中最重要的一个变量就是工夫。对于工夫,次要有 2 个须要解决的点。

1. 因为具体的工夫格局不同,有的是 yyyyMM, yyyy-MM-dd, 或者具体到工夫。咱们采纳的计划是业务层对立传 yyyyMMdd,其为变量 $day,能够在分区字段中应用即可。还提供了 ${latest_partition}变量来获取最新分区。以及对这 2 个变量能够应用 spark udf 来获取指定的工夫格局。

2. 另外一些指标,例如稳定率,是同一个数据 2 个工夫点的比拟。所以还会带入工夫周期的概念。例如每天,每周二,每月 16 日。

  1. 构建计算步骤

这里许多概念和 Apache Griffin 统一,但具体实现有很大差别。

业务层首先会依据触发条件和库表聚合调度,使得同一个数据源同一日期的检测对象一起调度。

检测对象下文都称为 rule。

第一步 rule 的解决
构建的第一步就是对 rule 进行解决,例如 psi 这类的 rule,能够先拆分成多个 rule,例如分布区间的 psi,能够拆分为分布区间和 psi 2 个 rule,这里就存在了依赖关系。构建了一个 rule 的 DAG。

第二步 rule 转换成执行打算
一个 rule 转换成一个执行打算。执行打算的性能就是生成具体的工作步骤。

第三步 生成计算步骤
这一步有很多的优化空间,具体内容见四、优化

计算步骤分为 4 大类:

1. 不须要走 spark 工作计算,例如磁盘容量,间接用 hadoop api 查问即可。

2.spark sql 步骤组就是 2.2 规定指标的计算中提到的,由 4 组成。

3.spark 算子,例如查问 hbase 和数据库,summary 算子之类的操作。

4.spark sql 计算步骤,写好的 spark sql,预留了变量填充。

每一个执行打算会对应一个或者多个计算步骤。如下图,展现了表行数稳定率和异样数据占比率的转换过程。

3 . 计算步骤执行
通过上述的转换,咱们能够失去计算步骤的 DAG,之后对这个 DAG 进行并发执行就能够了。

这样的并发执行能够更充沛的应用 sprak executor 的资源。比方串行执行,200cores 的资源,一个 job 因为数据歪斜等起因,最初 20 个 task 执行较慢。这时候前面的 job 并不会启动,而且可能因为等待时间过长,局部 executor 被开释了。但如果是并发执行,闲暇的 180cores 就会执行其余未实现的 job。

  1. 后果告警
    后果查看局部,我用了 scala 的解析器组合子库来实现,BNF 和规定如下图。失去后果后就会发给邮件指定的负责人,这里也能够反对短信和电话。

  1. 后果输入
    这里是后果落库的步骤,依据不同库的写入形式落库即可。

四、优化
这里的优化都是针对构建计算步骤的。整体的优化方向就是合并 DAG 和复用曾经存在的后果。

  1. 过程复用
    计算步骤 DAG 能够依据节点是否计算逻辑雷同进行合并重组,造成优化的计算步骤 DAG。

如图所示,16 个计算步骤被优化成 11 个计算步骤。被复用的节点会用 df.cache() 算子来进行缓存,减速计算。

这里次要实现有节点的身份信息(其计算逻辑和数据信息),根据节点的身份信息来进行比拟合并。

同时须要根据更新的节点信息来合并多个子 DAG。

2 . 计算步骤 DAG 节点转换

常量替换
计算异样数据稳定率,公式为异样数据量 / 总数据量。如果用户应用该规定,采样了 10w 数据,那总数据量其实并不需要触发计算,能够间接转换成常量。公式就会转换成异样数据量 / 10w。

过往后果复用
表行数稳定率(每日),公式为(今⽇数据量—昨⽇数据量)/ 昨⽇数据量。

7 月 2 日的时候,公式为

7 月 3 日的时候,公式为

能够从中看到 7 月 3 日计算表行数稳定率时,7 月 2 日的 Cnt 能够读取前一天的后果,从而防止计算。但须要留神的是,复用这个后果之前,须要查看前一天后果计算结束之后,分区数据和检测对象配置有没有产生扭转。没有产生扭转才能够复用,否则,必须从新计算。

这种将计算步骤转换成常量读取,来躲避触发计算,也能带来显著的性能晋升。

退出移动版