关于数据仓库:50000字数仓建设保姆级教程离线和实时一网打尽理论实战-下

48次阅读

共计 38171 个字符,预计需要花费 96 分钟才能阅读完成。

本文纲要:

因内容较多,本文将间接从第五章开始,完整版文档请点击下方链接

本文数仓建设保姆级教程残缺 PDF 版

前四章内容在上方链接获取

五、实时数仓建设外围

1. 实时计算初期

尽管实时计算在最近几年才火起来,然而在晚期也有局部公司有实时计算的需要,然而数据量比拟少,所以在实时方面造成不了残缺的体系,根本所有的开发都是具体问题具体分析,来一个需要做一个,根本不思考它们之间的关系,开发模式如下:

如上图所示,拿到数据源后,会通过数据荡涤,扩维,通过 Flink 进行业务逻辑解决,最初间接进行业务输入。把这个环节拆开来看,数据源端会反复援用雷同的数据源,前面进行荡涤、过滤、扩维等操作,都要反复做一遍,惟一不同的是业务的代码逻辑是不一样的。

随着产品和业务人员对实时数据需要的一直增多,这种开发模式呈现的问题越来越多:

  1. 数据指标越来越多,“烟囱式”的开发导致代码耦合问题重大。
  2. 需要越来越多,有的须要明细数据,有的须要 OLAP 剖析。繁多的开发模式难以应酬多种需要。
  3. 每个需要都要申请资源,导致资源老本急速收缩,资源不能粗放无效利用。
  4. 短少欠缺的监控零碎,无奈在对业务产生影响之前发现并修复问题。

大家看实时数仓的倒退和呈现的问题,和离线数仓十分相似,前期数据量大了之后产生了各种问题,离线数仓过后是怎么解决的?离线数仓通过分层架构使数据解耦,多个业务能够共用数据,实时数仓是否也能够用分层架构呢?当然是能够的,然而细节上和离线的分层还是有一些不同,稍后会讲到。

2. 实时数仓建设

从方法论来讲,实时和离线是十分类似的,离线数仓晚期的时候也是具体问题具体分析,当数据规模涨到一定量的时候才会思考如何治理。分层是一种十分无效的数据治理形式,所以在实时数仓如何进行治理的问题上,首先思考的也是分层的解决逻辑。

实时数仓的架构如下图:

从上图中咱们具体分析下每层的作用:

  • 数据源:在数据源的层面,离线和实时在数据源是统一的,次要分为日志类和业务类,日志类又包含用户日志,埋点日志以及服务器日志等。
  • 实时明细层:在明细层,为了解决反复建设的问题,要进行对立构建,利用离线数仓的模式,建设对立的根底明细数据层,依照主题进行治理,明细层的目标是给上游提供间接可用的数据,因而要对根底层进行对立的加工,比方荡涤、过滤、扩维等。
  • 汇总层:汇总层通过 Flink 的简洁算子间接能够算出后果,并且造成汇总指标池,所有的指标都对立在汇总层加工,所有人依照对立的标准治理建设,造成可复用的汇总后果。

咱们能够看出,实时数仓和离线数仓的分层十分相似,比方 数据源层,明细层,汇总层,乃至应用层,他们命名的模式可能都是一样的。但认真比拟不难发现,两者有很多区别:

  • 与离线数仓相比,实时数仓的档次更少一些:

    • 从目前建设离线数仓的教训来看,数仓的数据明细层内容会十分丰盛,解决明细数据外个别还会蕴含轻度汇总层的概念,另外离线数仓中应用层数据在数仓外部,但实时数仓中,app 应用层数据曾经落入利用零碎的存储介质中,能够把该层与数仓的表拆散
    • 应用层少建设的益处:实时处理数据的时候,每建一个档次,数据必然会产生肯定的提早
    • 汇总层少建的益处:在汇总统计的时候,往往为了容忍一部分数据的提早,可能会人为的制作一些提早来保证数据的精确。举例,在统计跨天相干的订单事件中的数据时,可能会等到 00:00:05 或者 00:00:10 再统计,确保 00:00 前的数据曾经全副承受到位了,再进行统计。所以,汇总层的档次太多的话,就会更大的减轻人为造成的数据提早。
  • 与离线数仓相比,实时数仓的数据源存储不同:

    • 在建设离线数仓的时候,根本整个离线数仓都是建设在 Hive 表之上 。然而,在建设实时数仓的时候,同一份表,会应用不同的形式进行存储。比方常见的状况下, 明细数据或者汇总数据都会存在 Kafka 外面,然而像城市、渠道等维度信息须要借助 Hbase,MySQL 或者其余 KV 存储等数据库来进行存储

3. Lambda 架构的实时数仓

Lambda 和 Kappa 架构的概念已在前文中解释,不理解的小伙伴可点击链接:一文读懂大数据实时计算

下图是基于 Flink 和 Kafka 的 Lambda 架构的具体实际,下层是实时计算,上层是离线计算,横向是按计算引擎来分,纵向是按实时数仓来辨别:

Lambda 架构是比拟经典的架构,以前实时的场景不是很多,以离线为主,当附加了实时场景后,因为离线和实时的时效性不同,导致技术生态是不一样的。Lambda 架构相当于附加了一条实时生产链路,在利用层面进行一个整合,双路生产,各自独立。这在业务利用中也是牵强附会采纳的一种形式。

双路生产会存在一些问题,比方加工逻辑 double,开发运维也会 double,资源同样会变成两个资源链路。因为存在以上问题,所以又演进了一个 Kappa 架构。

4. Kappa 架构的实时数仓

Kappa 架构相当于去掉了离线计算局部的 Lambda 架构,具体如下图所示:

Kappa 架构从架构设计来讲比较简单,生产对立,一套逻辑同时生产离线和实时。然而在理论利用场景有比拟大的局限性,因为实时数据的同一份表,会应用不同的形式进行存储,这就导致关联时须要跨数据源,操作数据有很大局限性,所以在业内间接用 Kappa 架构生产落地的案例不多见,且场景比拟繁多。

对于 Kappa 架构,相熟实时数仓生产的同学,可能会有一个疑难。因为咱们常常会面临业务变更,所以很多业务逻辑是须要去迭代的。之前产出的一些数据,如果口径变更了,就须要重算,甚至重刷历史数据。对于实时数仓来说,怎么去解决数据重算问题?

Kappa 架构在这一块的思路是:首先要筹备好一个可能存储历史数据的音讯队列,比方 Kafka,并且这个音讯队列是能够反对你从某个历史的节点从新开始生产的。接着须要新起一个工作,从原来比拟早的一个工夫节点去生产 Kafka 上的数据,而后当这个新的工作运行的进度曾经可能和当初的正在跑的工作齐平的时候,你就能够把当初工作的上游切换到新的工作下面,旧的工作就能够停掉,并且原来产出的后果表也能够被删掉。

5. 流批联合的实时数仓

随着实时 OLAP 技术的倒退,目前开源的 OLAP 引擎在性能,易用等方面有了很大的晋升,如 Doris、Presto 等,加上数据湖技术的迅速倒退,使得流批联合的形式变得简略。

如下图是流批联合的实时数仓:

数据从日志对立采集到音讯队列,再到实时数仓,作为根底数据流的建设是对立的。之后对于日志类实时特色,实时大屏类利用走实时流计算。对于 Binlog 类业务剖析走实时 OLAP 批处理。

咱们看到流批联合的形式与下面几种架构的存储形式产生了变动,由 Kafka 换成了 Iceberg,Iceberg 是介于下层计算引擎和底层存储格局之间的一个中间层,咱们能够把它定义成一种“数据组织格局”,底层存储还是 HDFS,那么为什么加了中间层,就对流批联合解决的比拟好了呢?Iceberg 的 ACID 能力能够简化整个流水线的设计,升高整个流水线的提早,并且所具备的批改、删除能力可能无效地升高开销,晋升效率。Iceberg 能够无效反对批处理的高吞吐数据扫描和流计算按分区粒度并发实时处理。

六、基于 Flink SQL 从 0 到 1 构建一个实时数仓

注:本大节内容来自公众号大数据技术与数仓!

实时数仓次要解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的 OLAP 剖析,实时大屏展现,实时监控报警各个场景。尽管对于实时数仓架构及技术选型与传统的离线数仓会存在差别,然而对于数仓建设的根本方法论是统一的。接下来次要介绍 Flink SQL 从 0 到 1 搭建一个实时数仓的 demo,波及到数据采集、存储、计算、可视化整个流程。

1. 案例简介

本文以电商业务为例,展现实时数仓的数据处理流程。另外,本文旨在阐明实时数仓的构建流程,所以不会波及简单的数据计算。为了保障案例的可操作性和完整性,本文会给出具体的操作步骤。为了不便演示,本文的所有操作都是在 Flink SQL Cli 中实现。

2. 架构设计

具体的架构设计如图所示:首先通过 canal 解析 MySQL 的 binlog 日志,将数据存储在 Kafka 中。而后应用 Flink SQL 对原始数据进行荡涤关联,并将解决之后的明细宽表写入 Kafka 中。维表数据存储在 MySQL 中,通过 Flink SQL 对明细宽表与维表进行 join,将聚合后的数据写入 MySQL,最初通过 FineBI 进行可视化展现。

3. 业务数据筹备

1. 订单表(order_info)

CREATE TABLE `order_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户 id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款形式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方领取用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单形容(第三方领取用)',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  `operate_time` datetime DEFAULT NULL COMMENT '操作工夫',
  `expire_time` datetime DEFAULT NULL COMMENT '生效工夫',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片门路',
  `province_id` int(20) DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';

2. 订单详情表(order_detail)

CREATE TABLE `order_detail` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku 名称(冗余)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时 sku 价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';

3. 商品表(sku_info)

CREATE TABLE `sku_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0) DEFAULT NULL COMMENT '价格',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku 名称',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格形容',
  `weight` decimal(10,2) DEFAULT NULL COMMENT '分量',
  `tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类 id(冗余)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

4. 商品一级类目表(base_category1)

CREATE TABLE `base_category1` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(10) NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';

5. 商品二级类目表(base_category2)

CREATE TABLE `base_category2` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200) NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';

6. 商品三级类目表(base_category3)

CREATE TABLE `base_category3` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200) NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';

7. 省份表(区域表(base_region)base_province)

CREATE TABLE `base_province` (`id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT '省名称',
  `region_id` int(20) DEFAULT NULL COMMENT '大区 id',
  `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

8. 区域表(base_region)

CREATE TABLE `base_region` (`id` int(20) NOT NULL COMMENT '大区 id',
  `region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

4. 数据处理流程

1. ods 层数据同步

对于 ODS 层的数据同步这里就不具体开展。次要应用 canal 解析 MySQL 的 binlog 日志,而后将其写入到 Kafka 对应的 topic 中。因为篇幅限度,不会对具体的细节进行阐明。同步之后的后果如下图所示:

2. DIM 层数据筹备

本案例中将维表存储在了 MySQL 中,理论生产中会用 HBase 存储维表数据。咱们次要用到两张维表:区域维表和商品维表。处理过程如下:

  • 区域维表

首先将 mydw.base_provincemydw.base_region这个主题对应的数据抽取到 MySQL 中,次要应用 Flink SQL 的 Kafka 数据源对应的 canal-json 格局,留神:在执行装载之前,须要先在 MySQL 中创立对应的表,本文应用的 MySQL 数据库的名字为 dim,用于寄存维表数据。如下:

-- -------------------------
--   省份
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT ,
  `area_code`STRING
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_province',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 
 
-- -------------------------
--   省份
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT ,
    `area_code`STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province', -- MySQL 中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
 
-- -------------------------
--   省份
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_province
SELECT *
FROM ods_base_province;
 
-- -------------------------
--   区域
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 
 
-- -------------------------
--   区域
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region', -- MySQL 中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
 
-- -------------------------
--   区域
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_region
SELECT *
FROM ods_base_region;

通过下面的步骤,将创立维表所须要的原始数据曾经存储到了 MySQL 中,接下来就须要在 MySQL 中创立维表,咱们应用下面的两张表,创立一张视图:dim_province作为维表:

-- ---------------------------------
-- DIM 层, 区域维表,
-- 在 MySQL 中创立视图
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br 
     JOIN base_province bp ON br.id= bp.region_id;

这样咱们所须要的维表:dim_province就创立好了,只须要在维表 join 时,应用 Flink SQL 创立 JDBC 的数据源,就能够应用该维表了。同理,咱们应用雷同的办法创立商品维表,具体如下:

-- -------------------------
--  一级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
)WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.base_category1',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;
 
-- -------------------------
--  一级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1', -- MySQL 中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
 
-- -------------------------
--  一级类目表
--   MySQL Sink Load Data
-- ------------------------- 
 
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;
 
-- -------------------------
--  二级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category2',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;
 
-- -------------------------
--  二级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2', -- MySQL 中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
 
-- -------------------------
--  二级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;
 
-- -------------------------
-- 三级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category3',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 
 
-- -------------------------
--  三级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3', -- MySQL 中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
 
-- -------------------------
--  三级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;
 
-- -------------------------
--   商品表
--   Kafka Source
-- ------------------------- 
 
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.sku_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 
 
-- -------------------------
--   商品表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
   PRIMARY KEY (tm_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info', -- MySQL 中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
 
-- -------------------------
--   商品
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;

通过下面的步骤,咱们能够将创立商品维表的根底数据表同步到 MySQL 中,同样须要提前创立好对应的数据表。接下来咱们应用下面的根底表在 mySQL 的 dim 库中创立一张视图:dim_sku_info,用作后续应用的维表。

-- ---------------------------------
-- DIM 层, 商品维表,
-- 在 MySQL 中创立视图
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
(
  sku_info si 
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id =c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);

至此,咱们所须要的维表数据曾经筹备好了,接下来开始解决 DWD 层的数据。

3. DWD 层数据处理

通过下面的步骤,咱们曾经将所用的维表曾经筹备好了。接下来咱们将对 ODS 的原始数据进行解决,加工成 DWD 层的明细宽表。具体过程如下:

-- -------------------------
--   订单详情
--   Kafka Source
-- ------------------------- 
 
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.order_detail',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 
 
-- -------------------------
--   订单信息
--   Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0) ,
  `operate_time` TIMESTAMP(0) ,
  `expire_time` TIMESTAMP(0) ,
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.order_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 
 
-- ---------------------------------
-- DWD 层, 领取订单明细表 dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD 层, 已领取订单明细表
-- 向 dwd_paid_order_detail 装载数据
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info
    WHERE order_status = '2' -- 已领取
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od 
    ON oi.id = od.order_id;

4. ADS 层数据

通过下面的步骤,咱们创立了一张 dwd_paid_order_detail 明细宽表,并将该表存储在了 Kafka 中。接下来咱们将应用这张明细宽表与维表进行 JOIN,失去咱们 ADS 应用层数据。

  • ads_province_index

首先在 MySQL 中创立对应的 ADS 指标表:ads_province_index

CREATE TABLE ads.ads_province_index(province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt) 
) ;

向 MySQL 的 ADS 层指标装载数据:

-- Flink SQL Cli 操作
-- ---------------------------------
-- 应用 DDL 创立 MySQL 中的 ADS 层表
-- 指标:1. 每天每个省份的订单数
--      2. 每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail 已领取订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
 
-- ---------------------------------
-- tmp_province_index
-- 订单汇总长期表
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总长期表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- 应用该长期汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个解决工夫列) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
 
-- ---------------------------------
-- DIM 层, 区域维表,
-- 创立区域维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
 
-- ---------------------------------
-- 向 ads_province_index 装载数据
-- 维表 JOIN
-- ---------------------------------
 
INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp 
  ON dp.province_id = pc.province_id;

当提交工作之后:察看 Flink WEB UI

查看 ADS 层的 ads_province_index 表数据:

  • ads_sku_index

首先在 MySQL 中创立对应的 ADS 指标表:ads_sku_index

CREATE TABLE ads_sku_index
(sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);

向 MySQL 的 ADS 层指标装载数据:

-- ---------------------------------
-- 应用 DDL 创立 MySQL 中的 ADS 层表
-- 指标:1. 每天每个商品对应的订单个数
--      2. 每天每个商品对应的订单金额
--      3. 每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
 
-- ---------------------------------
-- dwd_paid_order_detail 已领取订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
 
-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
 order_sku_num BIGINT,
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
   sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
 
-- ---------------------------------
-- tmp_sku_index_source
-- 应用该长期汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个解决工夫列) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM 层, 商品维表,
-- 创立商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向 ads_sku_index 装载数据
-- 维表 JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id;

当提交工作之后:察看 Flink WEB UI

查看 ADS 层的 ads_sku_index 表数据

5. FineBI 展现

七、数据治理

数仓建设真正的难点不在于数仓设计,而在于后续业务倒退起来,业务线变的宏大之后的数据治理,包含资产治理、数据品质监控、数据指标体系的建设等。

其实数据治理的范畴很⼴,蕴含数据本⾝的治理、数据安全、数据品质、数据老本等。在 DAMA 数据管理常识体系指南 中,数据治理位于数据管理“车轮图”的正地方,是数据架构、数据建模、数据存储、数据安全、数据品质、元数据管理、主数据管理等 10 大数据管理畛域的总纲,为各项数据管理流动提供总体领导策略。

1. 数据治理之道是什么

1. 数据治理须要体系建设

为施展数据价值须要满足三个因素:正当的平台架构、欠缺的治理服务、体系化的经营伎俩

依据企业的规模、所属行业、数据量等状况抉择适合的平台架构;治理服务须要贯通数据全生命周期,保证数据在采集、加工、共享、存储、利用整个过程中的完整性、准确性、一致性和实效性;经营伎俩则该当包含标准的优化、组织的优化、平台的优化以及流程的优化等等方面。

2. 数据治理须要夯实根底

数据治理须要循序渐进,但在建设初期至多须要关注三个方面:数据标准、数据品质、数据安全。规范化的模型治理是保障数据能够被治理的前提条件,高质量的数据是数据可用的前提条件,数据的平安管控是数据能够共享替换的前提条件。

3. 数据治理须要 IT 赋能

数据治理不是一堆标准文档的堆砌,而是须要将治理过程中所产生的的标准、流程、规范落地到 IT 平台上,在数据生产过程中通过“以终为始”前向的形式进行数据治理,防止预先稽核带来各种被动和运维老本的减少。

4. 数据治理须要聚焦数据

数据治理的实质是治理数据,因而须要增强元数据管理和主数据管理,从源头治理数据,补齐数据的相干属性和信息,比方:元数据、品质、平安、业务逻辑、血统等,通过元数据驱动的形式治理数据生产、加工和应用。

5. 数据治理须要建管一体化

数据模型血统与任务调度的一致性是建管一体化的要害,有助于解决数据管理与数据生产口径不统一的问题,避免出现两张皮的低效管理模式。

2. 浅谈数据治理形式

如下面所说,数据治理的范畴十分广,其中最重要的是数据品质治理,而数据品质波及的范畴也很广,贯通数仓的整个生命周期,从 数据产生 -> 数据接入 -> 数据存储 -> 数据处理 -> 数据输入 -> 数据展现 ,每个阶段都须要品质治理,评估维度包含 完整性、规范性、一致性、准确性、唯一性、关联性 等。

在零碎建设的各个阶段都应该依据规范进行数据品质检测和标准,及时进行治理,防止预先的荡涤工作。

品质检测可参考以下维度:

维度 衡量标准
完整性 业务指定必须的数据是否缺失,不容许为空字符或者空值等。例如,数据源是否残缺、维度取值是否残缺、数据取值是否残缺等
时效性 当须要应用时,数据是否反映以后事实。即数据必须及时,可能满足系统对数据工夫的要求。例如解决(获取、整顿、荡涤、加载等)的及时性
唯一性 在指定的数据集中数据值是否惟一
参照完整性 数据项是否在父表中有定义
依赖一致性 数据项取值是否满足与其余数据项之间的依赖关系
正确性 数据内容和定义是否统一
精确性 数据精度是否达到业务规定要求的位数
技术有效性 数据项是否按已定义的格局规范组织
业务有效性 数据项是否合乎已定义的
可信度 依据客户考察或客户被动提供取得
可用性 数据可用的工夫和数据须要被拜访工夫的比例
可拜访性 数据是否便于自动化读取

上面是依据美团的技术文章总结的几点具体治理形式:

1. 标准治理

标准是数仓建设的保障。为了避免出现指标反复建设和数据品质差的状况,对立依照最具体、可落地的办法进行标准建设。

(1) 词根

词根是维度和指标治理的根底,划分为一般词根与专有词根,进步词根的易用性和关联性。

  • 一般词根:形容事物的最小单元体,如:交易 -trade。
  • 专有词根:具备约定成俗或行业专属的形容体,如:美元 -USD。

(2) 表命名标准

通用标准

  • 表名、字段名采纳一个下划线分隔词根(示例:clienttype->client_type)。
  • 每局部应用小写英文单词,属于通用字段的必须满足通用字段信息的定义。
  • 表名、字段名需以字母为结尾。
  • 表名、字段名最长不超过 64 个英文字符。
  • 优先应用词根中已有关键字(数仓标准配置中的词根治理),定期 Review 新增命名的不合理性。
  • 在表名自定义局部禁止采纳非标准的缩写。

表命名规定

  • 表名称 = 类型 + 业务主题 + 子主题 + 表含意 + 存储格局 + 更新频率 + 结尾,如下图所示:

(3) 指标命名标准

联合指标的个性以及词根治理标准,将指标进行结构化解决。

  1. 根底指标词根,即所有指标必须蕴含以下根底词根:

  1. 业务修饰词,用于形容业务场景的词汇,例如 trade- 交易。

3. 日期修饰词,用于润饰业务产生的工夫区间。

4. 聚合修饰词,对后果进行汇集操作。

5. 根底指标,繁多的业务修饰词 + 根底指标词根构建根底指标,例如:交易金额 -trade_amt。

6. 派生指标,多修饰词 + 根底指标词根构建派生指标。派生指标继承根底指标的个性,例如:装置门店数量 -install_poi_cnt。

7. 一般指标命名标准,与字段命名标准统一,由词汇转换即能够。

2. 架构治理

(1) 数据分层

优良牢靠的数仓体系,往往须要清晰的数据分层构造,即要保证数据层的稳固又要屏蔽对上游的影响,并且要防止链路过长,个别的分层架构如下:

(2) 数据流向

稳固业务依照规范的数据流向进行开发,即 ODS–>DWD–>DWA–>APP。非稳固业务或探索性需要,能够遵循 ODS->DWD->APP 或者 ODS->DWD->DWT->APP 两个模型数据流。在保障了数据链路的合理性之后,又在此基础上确认了模型分层援用准则:

  • 失常流向:ODS>DWD->DWT->DWA->APP,当呈现 ODS >DWD->DWA->APP 这种关系时,阐明主题域未笼罩全。应将 DWD 数据落到 DWT 中,对于应用频度非常低的表容许 DWD->DWA。
  • 尽量避免呈现 DWA 宽表中应用 DWD 又应用(该 DWD 所归属主题域)DWT 的表。
  • 同一主题域内对于 DWT 生成 DWT 的表,原则上要尽量避免,否则会影响 ETL 的效率。
  • DWT、DWA 和 APP 中禁止间接应用 ODS 的表,ODS 的表只能被 DWD 援用。
  • 禁止呈现反向依赖,例如 DWT 的表依赖 DWA 的表。
3. 元数据治理

元数据可分为技术元数据和业务元数据:

技术元数据 为开发和治理数据仓库的 IT 人员应用,它形容了与数据仓库开发、治理和保护相干的数据,包含数据源信息、数据转换形容、数据仓库模型、数据荡涤与更新规定、数据映射和拜访权限等。

常见的技术元数据有:

  • 存储元数据:如表、字段、分区等信息。
  • 运行元数据:如大数据平台上所有作业运行等信息:相似于 Hive Job 日志,包含作业类型、实例名称、输入输出、SQL、运行参数、执行工夫,执行引擎等。
  • 数据开发平台中数据同步、计算工作、任务调度等信息:包含数据同步的输入输出表和字段,以及同步工作自身的节点信息:计算工作次要有输入输出、工作自身的节点信息 任务调度次要有工作的依赖类型、依赖关系等,以及不同类型调度工作的运行日志等。
  • 数据品质和运维相干元数据:如工作监控、运维报警、数据品质、故障等信息,包含工作监控运行日志、告警配置及运行日志、故障信息等。

业务元数据 为管理层和业务剖析人员服务,从业务角度形容数据,包含商务术语、数据仓库中有什么数据、数据的地位和数据的可用性等,帮忙业务人员更好地了解数据仓库中哪些数据是可用的以及如何应用。

  • 常见的业务元数据有维度及属性(包含维度编码,字段类型,创建人,创立工夫,状态等)、业务过程、指标(蕴含指标名称, 指标编码,业务口径,指标类型,责任人,创立工夫,状态,sql 等),安全等级,计算逻辑等的规范化定义,用于更好地治理和应用数据。数据利用元数据,如数据报表、数据产品等的配置和运行元数据。

元数据不仅定义了数据仓库中数据的模式、起源、抽取和转换规则等,而且是整个数据仓库零碎运行的根底,元数据把数据仓库零碎中各个涣散的组件分割起来,组成了一个有机的整体

元数据治理次要解决三个问题

  1. 通过建设相应的组织、流程和工具,推动业务规范的落地施行,实现指标的标准定义,打消指标认知的歧义;
  2. 基于业务现状和将来的演进形式,对业务模型进行形象,制订清晰的主题、业务过程和剖析方向,构建齐备的技术元数据,对物理模型进行精确欠缺的形容,并买通技术元数据与业务元数据的关系,对物理模型进行齐备的刻画;
  3. 通过元数据建设,为应用数据提效,解决“找数、了解数、评估”难题以及“取数、数据可视化”等难题。
4. 平安治理

围绕数据安全规范,首先要有数据的分级、分类规范,确保数据在上线前有着精确的密级。第二,针对数据应用方,要有明确的角色受权规范,通过分级分类和角色受权,来保障重要数据拿不走。第三,针对敏感数据,要有隐衷治理规范,保障敏感数据的平安存储,即便未受权用户绕过权限治理拿到敏感数据,也要确保其看不懂。第四,通过制订审计规范,为后续的审计提供审计根据,确保数据走不脱。

5. 数据生命周期治理

任何事物都具备肯定的生命周期,数据也不例外。从数据的产生、加工、应用乃至沦亡都应该有一个迷信的治理方法,将极少或者不再应用的数据从零碎中剥离进去,并通过核实的存储设备进行保留,不仅可能进步零碎的运行效率,更好的服务客户,还能大幅度缩小因为数据长期保留带来的贮存老本。数据生命周期个别蕴含在线阶段、归档阶段(有时还会进一步划分为在线归档阶段和离线归档阶段)、销毁阶段三大阶段,治理内容包含建设正当的数据类别,针对不同类别的数据制订各个阶段的保留工夫、存储介质、清理规定和形式、注意事项等。

从上图数据生命周期中各参数间的关系中咱们能够理解到,数据生命周期治理能够使得高价值数据的查问效率大幅晋升,而且高价格的存储介质的洽购量也能够缩小很多;然而随着数据的应用水平的降落,数据被逐步归档,查问工夫也缓缓的变长;最初随着数据的应用频率和价值根本没有了之后,就能够逐步销毁了。


猜你喜爱:

  1. 美团数据平台及数仓建设实际,超十万字总结
  2. 上百本优质大数据书籍,附必读清单(大数据宝藏)
  3. 五万字 | 耗时一个月整顿出这份 Hadoop 吐血宝典

八、数据品质建设

数据治理的范畴十分广,蕴含数据本⾝的治理、数据安全、数据品质、数据老本等。在这么多治理内容中,大家想下最重要的治理是什么?当然是 数据品质治理,因为数据品质是数据分析论断有效性和准确性的根底,也是这所有的前提。所以如何保障数据品质,确保数据可用性是数据仓库建设中不容忽视的环节。

数据品质波及的范畴也很广,贯通数仓的整个生命周期,从 数据产生 -> 数据接入 -> 数据存储 -> 数据处理 -> 数据输入 -> 数据展现,每个阶段都须要品质治理。

在零碎建设的各个阶段都应该依据规范进行数据品质检测和标准,及时进行治理,防止预先的荡涤工作。

本文档首发于公众号【五分钟学大数据】,残缺的数据治理及数仓建设文章公众号上都有!

1. 为什么要进行数据品质评估

很多刚入门的数据人,拿到数据后会立即开始对数据进行各种探查、统计分析等,希图能立刻发现数据背地暗藏的信息和常识。然而忙活了一阵才颓然发现,并不能提炼出太多有价值的信息,白白浪费了大量的工夫和精力。比方和数据打交道的过程中,可能会呈现以下的场景:

场景一:作为数据分析人员,要统计一下近 7 天用户的购买状况,后果从数仓中统计完发现,很多数据产生了重复记录,甚至有些数据统计单位不对立。

场景二:业务看报表,发现某一天的成交 gmv 暴涨,通过排查发现,是当天的数据缺失。

造成这一状况的一个重要因素就是漠视了对数据品质的主观评估,没有制订正当的衡量标准,导致没有发现数据已呈现问题。所以,进行迷信、主观的数据品质衡量标准是十分必要且非常重要的。

2. 数据品质衡量标准

如何评估数据品质的好坏,业界有不同的规范,我总结了以下六个维度进行评估,包含 完整性、规范性、一致性、准确性、唯一性、及时性

  1. 数据完整性

完整性指的是数据信息是否存在缺失的情况,数据缺失的状况可能是整个数据记录缺失,也可能是数据中某个字段信息的记录缺失。

  1. 数据规范性

规范性指的是形容数据遵循预约的语法规定的水平,是否合乎其定义,比方数据的类型、格局、取值范畴等。

  1. 数据一致性

一致性是指数据是否遵循了对立的标准,数据汇合是否放弃了对立的格局。数据品质的一致性次要体现在数据记录的标准和数据是否合乎逻辑,一致性并不意味着数值上的相对雷同,而是数据收集、解决的办法和规范的统一。常见的一致性指标有:ID 重合度、属性统一、取值统一、采集办法统一、转化步骤统一。

  1. 数据准确性

准确性是指数据记录的信息是否存在异样或谬误。和一致性不一样,存在准确性问题的数据不仅仅只是规定上的不统一,更为常见的数据准确性谬误就如乱码,其次异样的大或者小的数据也是不符合条件的数据。常见的准确性指标有:缺失值占比、谬误值占比、异样值占比、抽样偏差、数据噪声。

  1. 数据唯一性

唯一性指的是数据库的数据不存在反复的情景。比方实在成交 1 万条,但数据表有 3000 条反复了,成了 1.3 万条成交记录,这种数据不合乎数据唯一性。

  1. 数据及时性

及时性是指数据从产生到能够查看的工夫距离,也叫数据的延时时长。比方一份数据是统计离线今日的,后果都是第二天甚至第三天能力统计完,这种数据不合乎数据及时性。

_还有一些其余的衡量标准,在此简略列出_:

维度 衡量标准
参照完整性 数据项是否在父表中有定义
依赖一致性 数据项取值是否满足与其余数据项之间的依赖关系
正确性 数据内容和定义是否统一
精确性 数据精度是否达到业务规定要求的位数
技术有效性 数据项是否按已定义的格局规范组织
业务有效性 数据项是否合乎已定义的
可信度 依据客户考察或客户被动提供取得
可用性 数据可用的工夫和数据须要被拜访工夫的比例
可拜访性 数据是否便于自动化读取

3. 数据品质治理流程

本节流程如下图所示:

1. 数据资产等级

1) 等级定义

依据 当数据品质不满足完整性、规范性、一致性、准确性、唯一性、及时性时,对业务的影响水平大小 来划分数据的资产等级。

  1. 毁灭性:数据一旦出错,会引起微小的资产损失,面临重大收益受损等。标记为 L1
  2. 全局性:数据用于团体业务、企业级成果评估和重要决策工作等。标记为 L2
  3. 局部性:数据用于某个业务线的日常经营、剖析报告等,如果呈现问题会给该业务线造成肯定的影响或影响其工作效率。标记为 L3
  4. 一般性:数据用于日常数据分析,呈现问题的带来的影响很小。标记为 L4
  5. 未知性质:无奈追溯数据的利用场景。标记为 Lx

重要水平:L1>L2>L3>L4>Lx。如果一份数据呈现在多个利用场景中,则依据其最重要水平进行标记。

2) 等级划分

定义数据资产等级后,咱们能够从数据流程链路开始进行数据资产等级标记,实现数据资产等级确认,给不同的数据定义不同的重要水平。

1. 剖析数据链路

数据是从业务零碎中产生的,通过同步工具进入数据仓库零碎中,在数据仓库中进行个别意义上的荡涤、加工、整合、算法、模型等一系列运算后,再通过同步工具输入到数据产品中进行生产。而从业务零碎到数据仓库再到数据产品都是以表的模式体现的,其流转过程如下图所示:

2. 标记数据资产等级

在所有数据链路上,整顿出生产各个表的利用业务。通过给这些利用业务划分数据资产等级,联合数据的上下游依赖关系,将整个链路打上某一类资产等级标签。

举例

假如公司有对立的订单服务中心。应用层的利用业务是依照业务线,商品类型和地区统计公司的订单数量和订单金额,命名为order_num_amount

假如该利用会影响到整个企业的重要业务决策,咱们能够把利用定级为 L2,从而 整个数据链路上的表的数据等级,都能够标记为L2-order_num_amount,始终标记到源数据业务零碎,如下图所示:

2. 数据加工过程卡点校验

1) 在线零碎数据校验

在线业务复杂多变,总是在一直地变更,每一次变更都会带来数据的变动,数据仓库须要适应这多变的业务倒退,及时做到数据的准确性。

基于此,在线业务的变更如何高效地告诉到离线数据仓库,同样也是须要思考的问题。为了保障在线数据和离线数据的一致性,咱们能够通过 工具 + 人员治理并行的形式 来尽可能的解决以上问题:既要在工具上主动捕获每一次业务的变动,同时也要求开发人员在意识上主动进行业务变更告诉。

1. 业务上线公布平台

监控业务上线公布平台上的重大业务变更,通过订阅这个公布过程,及时将变更内容告诉到数据部门。

因为业务零碎复杂多变,若日常公布变更频繁,那么每次都告诉数据部门,会造成不必要的资源节约。这时,咱们能够应用 之前曾经实现标记的数据资产等级标签,针对波及高等级数据利用的数据资产,整顿出哪些类型的业务变更会影响数据的加工或者影响数据统计口径的调整,则这些状况都必须及时告诉到数据部门。

如果公司没有本人的业务公布平台,那么就须要与业务部门约定好,_针对高等级的数据资产的业务变更,须要以邮件或者其余书面的阐明及时反馈到数据部门_。

2. 操作人员治理

工具只是辅助监管的一种伎俩,而应用工具的人员才是外围。数据资产等级的上下游买通过程须要告诉给在线业务零碎开发人员,使其晓得哪些是重要的外围数据资产,哪些临时还只是作为外部剖析数据应用,进步在线开发人员的数据风险意识。

能够通过培训的形式,_把数据品质治理的诉求,数据品质治理的整个数据加工过程,以及数据产品的利用形式及利用场景告知在线开发人员,使其理解数据的重要性、价值及危险_。确保在线开发人员在实现业务指标的同时,也要思考数据的指标,放弃业务端和数据段统一。

2) 离线零碎数据校验

数据从在线业务零碎到数据仓库再到数据产品的过程中,须要在数据仓库这一层实现数据的荡涤、加工。正是有了数据的加工,才有了数据仓库模型和数据仓库代码的建设。如何保障数据加过程中的品质,是离线数据仓库保障数据品质的一个重要环节。

在这些环节中,咱们能够采纳以下形式来保障数据品质:

  1. 代码提交核查

开发相干的规定引擎,辅助代码提交校验。规定分类大抵为:

  • _代码标准类规定_:如表命名标准、字段命名标准、生命周期设置、表正文等;
  • _代码品质类规定_:如分母为 0 揭示、NUll 值参加计算揭示等;
  • _代码性能类规定_:如大表揭示、反复计算监测、大小表 join 操作揭示等。
  1. 代码公布核查

增强测试环节,测试环境测试后再公布到生成环境,且生成环境测试通过后才算公布胜利。

  1. 工作变更或重跑数据

在进行数据更新操作前,须要告诉上游数据变更起因、变更逻辑、变更工夫等信息。上游没有异议后,再依照约定工夫执行变更公布操作。

3. 数据处理危险监控

危险点监控次要是针对数据在日常运行过程中容易呈现的危险进行监控并设置报警机制,次要包含 在线数据 离线数据 运行危险点监控。

1) 数据品质监控

在线业务零碎 的数据生产过程须要保证数据品质,次要依据业务规定对数据进行监控。

比方交易系统配置的一些监控规定,如订单拍下工夫、订单完结工夫、订单领取金额、订单状态流转等都配置了校验规定。订单拍下工夫必定不会大于当天工夫,也不会小于业务上线工夫,一旦出现异常的订单创立工夫,就会立即报警,同时报警给到多人。通过这种机制,能够及时发现并解决问题。

随着业务负责水平的晋升,会导致规定繁多、规定配置的运行老本增大,这时能够 依照咱们之前的数据资产等级有针对性的进行监控

离线数据 危险点监控次要包含对数据准确性和数据产出及时性的监控。对数据调度平台上所有数据处理调度进行监控。

咱们以阿里的 DataWorks 数据调度工具为例,DataWorks 是基于 MaxCompute 计算引擎的一站式开发工场,帮忙企业疾速实现数据集成、开发、治理、品质、平安等全套数据研发工作。

DataWorks 中的 DQC 通过配置数据品质校验规定,实现离线数据处理中的数据品质监控报警机制。

下图是 DQC 的工作流程图:

DQC 数据监控规定有强规定和弱规定:

  • 强规定:一旦触发报警就会阻断工作的执行(将工作置为失败状态,使上游工作不会被触发执行)。
  • 弱规定:只报警但不阻断工作的执行。

DQC 提供罕用的规定模板,包含 表行数较 N 天前稳定率、表空间大小较 N 天前稳定率、字段最大 / 最小 / 平均值相比 N 天前稳定率、字段空值 / 惟一个数 等。

DQC 查看其实也是运行 SQL 工作,只是这个工作是嵌套在主工作中的,一旦检查点太多天然就会影响整体的性能,因而还是 依赖数据产等级 来确定规定的配置状况。比方 L1、L2 类数据监控率要达到 90% 以上,规定类型须要三种及以上,而不重要的数据资产则不强制要求。

2) 数据及时性监控

在确保数据准确性的前提下,须要进一步让数据可能及时地提供服务,否则数据的价值将大幅度降低,甚至没有价值,所以 确保数据及时性也是保障数据品质重中之重的一环

  1. 工作优先级

对于 DataWorks 平台的调度工作,能够通过智能监控工具进行优先级设置。DataWorks 的调度是一个树形构造,当配置了叶子节点的优先级,这个优先级会传递到所有的上游节点,而叶子节点通常就是服务业务的生产节点。

因而,在优先级的设置上,要先确定业务的资产等级,等级越高的业务对应的生产节点优先级越高,优先调度并占用计算资源,确保高等级业务的准时产出。

总之,就是依照数据资产等级优先执行高等级数据资产的调度工作,优先保障高等级业务的数据需要。

  1. 工作报警

工作报警和优先级相似,通过 DataWorks 的智能监控工具进行配置,只须要配置叶子节点即可向上游传递报警配置。工作执行过程中,可能出错或提早,为了保障最重要数据(即资产等级高的数据)产出,须要立刻解决出错并染指解决提早。

  1. DataWorks 智能监控

DataWorks 进行离线任务调度时,提供智能监控工具,对调度工作进行监控告警。依据监控规定和工作运行状况,智能监控决策是否报警、何时报警、如何报警以及给谁报警。智能监控会主动抉择最正当的报警工夫、报警形式以及报警对象。

4. 最初

要想真正解决数据品质问题,就要 明确业务需要并从需要开始控制数据品质,并建设数据品质管理机制。从业务登程做问题定义,由工具主动、及时发现问题,明确问题责任人,通过邮件、短信等形式进行告诉,保障问题及时告诉到责任人。跟踪问题整改进度,保证数据品质问题全过程的治理。

九、数仓标准建设指南

1. 数仓公共开发标准

1. 档次调用标准

稳固业务 依照规范的数据流向进行开发,即 ODS –> DWD –> DWS –> APP。非稳固业务 或探索性需要,能够遵循 ODS -> DWD -> APP 或者 ODS -> DWD -> DWM ->APP 两个模型数据流。

在保障了数据链路的合理性之后,也必须保障模型分层援用准则:

  • 失常流向:ODS -> DWD -> DWM -> DWS -> APP,当呈现 ODS -> DWD -> DWS -> APP 这种关系时,阐明主题域未笼罩全。应将 DWD 数据落到 DWM 中,对于应用频度非常低的表容许 DWD -> DWS。
  • 尽量避免呈现 DWS 宽表中应用 DWD 又应用(该 DWD 所归属主题域)DWM 的表。
  • 同一主题域内对于 DWM 生成 DWM 的表,原则上要尽量避免,否则会影响 ETL 的效率。
  • DWM、DWS 和 APP 中禁止间接应用 ODS 的表,ODS 的表只能被 DWD 援用。
  • 禁止呈现反向依赖,例如 DWM 的表依赖 DWS 的表。

举例:

2. 数据类型标准

需对立规定不同的数据的数据类型,严格依照规定的数据类型执行:

  1. 金额 :double 或应用 decimal(11,2) 控制精度等, 明确单位是分还是元
  2. 字符串:string。
  3. id 类:bigint。
  4. 工夫:string。
  5. 状态:string

3. 数据冗余标准

宽表的冗余字段要确保:

  1. 冗余字段要应用高频,上游 3 个或以上应用
  2. 冗余字段引入 不应造成自身数据产生过多的延后
  3. 冗余字段 和已有字段的反复率不应过大,原则上不应超过 60%,如须要能够抉择 join 或原表拓展。

4. NULL 字段解决标准

  • 对于维度字段,需设置为 -1
  • 对于指标字段,需设置为 0

5. 指标口径标准

保障主题域内,指标口径统一,无歧义

通过数据分层,提供对立的数据进口,对立对外输入的数据口径,防止同一指标不同口径的状况产生。

1) 指标梳理

指标口径的不统一使得数据应用的老本极高,经常出现口径打架、重复核查数据的问题。在数据治理中,咱们将需要梳理到的所有指标进行进一步梳理,明确其口径,如果存在两个指标名称雷同,但口径不统一,先判断是否是进行合并,如须要同时存在,那么在命名上必须可能辨别开。

2) 指标治理

指标治理分为原子指标保护和派生指标保护。

原子指标:

  • 抉择原子指标的归属产线、业务板块、数据域、业务过程
  • 抉择原子指标的统计数据来源于该业务过程下的原始数据源
  • 录入原子指标的英文名称、中文名称、概述
  • 填写指标函数
  • 零碎依据指标函数主动生成原子指标的定义表达式
  • 零碎依据指标定义表达式以及数据源表生成原子指标 SQL

派生指标:

  • 在原子指标的根底之上抉择了一些维度或者润饰限定词。

6. 数据表解决标准

1) 增量表

新增数据,增量数据是上次导出之后的新数据。

  1. 记录每次减少的量,而不是总量;
  2. 增量表,只报变动量,无变动不必报;
  3. 每天一个分区。
2) 全量表

每天的所有的最新状态的数据。

  1. 全量表,有无变动,都要报;
  2. 每次上报的数据都是所有的数据(变动的 + 没有变动的);
  3. 只有一个分区。
3) 快照表

按日分区,记录截止数据日期的全量数据。

  1. 快照表,有无变动,都要报;
  2. 每次上报的数据都是所有的数据(变动的 + 没有变动的);
  3. 一天一个分区。
4) 拉链表

记录截止数据日期的全量数据。

  1. 记录一个事物从开始,始终到以后状态的所有变动的信息;
  2. 拉链表每次上报的都是历史记录的最终状态,是记录在以后时刻的历史总
    量;
  3. 以后记录存的是以后工夫之前的所有历史记录的最初变动量(总量);
  4. 只有一个分区。

7. 表的生命周期治理

这部分次要是要通过对历史数据的等级划分与对表类型的划分生成相应的生命周期治理矩阵。

1) 历史数据等级划分

次要将历史数据划分 P0、Pl、P2、P3 四个等级,其具体定义如下:

  • P0:十分重要的主题域数据和十分重要的利用数据,具备不可恢复性,如交易、日志、团体 KPI 数据、IPO 关联表。
  • Pl:重要的业务数据和重要的利用数据,具备不可恢复性,如重要的业务产品数据。
  • P2:重要的业务数据和重要的利用数据,具备可恢复性,如交易线 ETL 产生的两头过程数据。
  • P3:不重要的业务数据和不重要的利用数据,具备可恢复性,如某些 SNS 产品报表。
2) 表类型划分
  1. 事件型流水表(增量表)

事件型流水表(增量表)指数据无反复或者无主键数据,如日志。

  1. 事件型镜像表(增量表)

事件型镜像表(增量表)指业务过程性数据,有主键,然而对于同样主键的属性会产生迟缓变动,如交易、订单状态与工夫会依据业务产生变更。

  1. 维表

维表包含维度与维度属性数据,如用户表、商品表。

  1. Merge 全量表

Merge 全量表包含业务过程性数据或者维表数据。因为数据自身有新增的或者产生状态变更,对于同样主键的数据可能会保留多份,因而能够对这些数据依据主键进行 Merge 操作,主键对应的属性只会保留最新状态,历史状态保留在前一天分区 中。例如,用户表、交易表等都能够进行 Merge 操作。

  1. ETL 长期表

ETL 长期表是指 ETL 处理过程中产生的长期表数据,个别不倡议保留,最多 7 天。

  1. TT 长期数据

TT 拉取的数据和 DbSync 产生的长期数据最终会流转到 DS 层,ODS 层数据作为原始数据保留下来,从而使得 TT&DbSync 上游数据成为长期数据。这类数据不倡议保留很长时间,生命周期默认设置为 93 天,能够依据理论状况适当缩小保留天数。

7. 一般全量表

很多小业务数据或者产品数据,BI 个别是间接全量拉取,这种形式效率快,对存储压力也不是很大,而且表保留很长时间,能够依据历史数据等级确定保留策略。

通过上述历史数据等级划分与表类型划分,生成相应的生命周期治理矩阵,如下表所示:

2. 数仓各层开发标准

1. ODS 层设计规范

同步标准

  1. 一个零碎源表只容许同步一次;
  2. 全量初始化同步和增量同步解决逻辑要清晰;
  3. 以统计日期和工夫进行分区存储;
  4. 指标表字段在源表不存在时要主动填充解决。

表分类与生命周期

  1. ods 流水全量表
  • 不可再生的永恒保留;
  • 日志可按留存要求;
  • 按需设置保留非凡日期数据;
  • 按需设置保留非凡月份数据;
  1. ods 镜像型全量表
  • 举荐按天存储;
  • 对历史变动进行保留;
  • 最新数据存储在最大分区;
  • 历史数据按需保留;
  1. ods 增量数据
  • 举荐按天存储;
  • 有对应全量表的,倡议只保留 14 天数据;
  • 无对应全量表的,永恒保留;
  1. ods 的 etl 过程中的长期表
  • 举荐按需保留;
  • 最多保留 7 天;
  • 倡议用完即删,下次应用再生成;
  1. BDSync 非去重数据
  • 通过中间层保留,默认用完即删,不倡议保留。

数据品质

  1. 全量表必须配置唯一性字段标识;
  2. 对分区空数据进行监控;
  3. 对枚举类型字段,进行枚举值变动和散布监控;
  4. ods 表数据量级和记录数做环比监控;
  5. ods 全表都必须要有正文;

2. 公共维度层设计规范

1) 设计准则
  1. 一致性

共维度在不同的物理表中的字段名称、数据类型、数据内容必须保持一致(历史起因不统一,要做好版本控制)

  1. 维度的组合与拆分
  • 组合准则

将维度与关联性强的字段进行组合,一起查问,一起展现,两个维度必须具备人造的关系,如:商品的根本属性和所属品牌。

无相关性:如一些应用频率较小的杂项维度,能够构建一个汇合杂项维度的非凡属性。

行为维度:通过计算的度量,但上游当维度解决,例:点击量 0-1000,100-1000 等,能够做聚合分类。

  • 拆分与冗余

针对重要性,业务相关性、源、应用频率等可分为外围表、扩大表。

数据记录较大的维度,能够适当冗余一些子集。

2) 存储及生命周期治理

倡议按天分区。

  1. 3 个月内最大拜访跨度 <= 4 地利,倡议保留最近 7 天分区;
  2. 3 个月内最大拜访跨度 <=12 地利,倡议保留最近 15 天分区;
  3. 3 个月内最大拜访跨度 <=30 地利,倡议保留最近 33 天分区;
  4. 3 个月内最大拜访跨度 <=90 地利,倡议保留最近 120 天分区;
  5. 3 个月内最大拜访跨度 <=180 地利,倡议保留最近 240 天分区;
  6. 3 个月内最大拜访跨度 <=300 地利,倡议保留最近 400 天分区;

3. DWD 明细层设计规范

1) 存储及生命周期治理

倡议按天分区。

  1. 3 个月内最大拜访跨度 <= 4 地利,倡议保留最近 7 天分区;
  2. 3 个月内最大拜访跨度 <=12 地利,倡议保留最近 15 天分区;
  3. 3 个月内最大拜访跨度 <=30 地利,倡议保留最近 33 天分区;
  4. 3 个月内最大拜访跨度 <=90 地利,倡议保留最近 120 天分区;
  5. 3 个月内最大拜访跨度 <=180 地利,倡议保留最近 240 天分区;
  6. 3 个月内最大拜访跨度 <=300 地利,倡议保留最近 400 天分区;
2) 事务型事实表设计准则
  • 基于数据利用需要的剖析设计事务型事实表,联合上游较大的针对某个业务过程和剖析指标需要,可思考基于某个事件过程构建事务型实时表;
  • 个别选用事件的产生日期或工夫作为分区字段,便于扫描和裁剪;
  • 冗余子集准则,有利于升高后续 IO 开销;
  • 明细层事实表维度进化,缩小后续应用 join 老本。
3) 周期快照事实表
  • 周期快照事实表中的每行汇总了产生在某一规范周期,如某一天、某周、某月的多个度量事件。
  • 粒度是周期性的,不是个体的事务。
  • 通常蕴含许多事实,因为任何与事实表粒度统一的度量事件都是被容许的。
4) 累积快照事实表
  • 多个业务过程联结剖析而构建的事实表,如洽购单的流转环节。
  • 用于剖析事件工夫和工夫之间的距离周期。
  • 大量的且以后事务型不反对的,如敞开、发货等相干的统计。

4. DWS 公共汇总层设计规范

数据仓库的性能是数据仓库建设是否胜利的重要规范之一。汇集 次要是通过 汇总明细粒度数据 来取得改良查问性能的成果。通过拜访汇集数据,能够缩小数据库在响应查问时必须执行的工作量,可能疾速响应用户的查问,同时有利于缩小不同用拜访明细数据带来的后果不统一问题。

1) 汇集的根本准则
  • 一致性。汇集表必须提供与查问明细粒度数据统一的查问后果。
  • 防止繁多表设计。不要在同一个表中存储不同档次的汇集数据。
  • 汇集粒度可不同。汇集并不需要放弃与原始明细粒度数据一样的粒度,汇集只关怀所须要查问的维度。
2) 汇集的根本步骤

第一步:确定汇集维度

在原始明细模型中会存在多个形容事实的维度,如日期、商品类别、卖家等,这时候须要确定依据什么维度汇集,如果只关怀商品的交易额状况,那么就能够依据商品维度汇集数据。

第二步:确定一致性上钻

这时候要关怀是按月汇总还是按天汇总,是依照商品汇总还是依照类目汇总,如果依照类目汇总,还须要关怀是依照大类汇总还是小类汇总。当然,咱们要做的只是理解用户须要什么,而后依照他们想要的进行汇集。

第三步:确定汇集事实

在原始明细模型中可能会有多个事实的度量,比方在交易中有交易额、交易数量等,这时候要明确是依照交易额汇总还是依照成交数量汇总。

3) 公共汇总层设计准则

除了汇集根本的准则外,公共汇总层还必须遵循以下准则:

  • 数据专用性。汇总的汇集会有第三者应用吗?基于某个维度的汇集是不是常常用于数据分析中?如果答案是必定的,那么就有必要把明细数据通过汇总积淀到汇集表中。
  • 不跨数据域。数据域是在较高层次上对数据进行分类汇集的形象。如以业务
  • 辨别统计周期 。在表的命名上要能阐明数据的统计周期,如 _Id
    示意最近 1 天,_td 示意截至当天,_nd 示意最近 N 天。

3. 数仓命名标准

1. 词根设计规范

词根属于数仓建设中的标准,属于元数据管理的领域,当初把这个划到数据治理的一部分。残缺的数仓建设是蕴含数据治理的,只是当初谈到数仓偏差于数据建模,而谈到数据治理,更多的是对于数据标准、数据管理。

表命名,其实在很大水平上是对元数据形容的一种体现,表命名标准越欠缺,我 们能从表名获取到的信息就越多。比方:一部分业务是对于货架的,英文名是:rack,rack 就是一个词根,那咱们就在所有的表、字段等用到的中央都叫 rack,不要叫成 别的什么。这就是词根的作用,用来对立命名,表白同一个含意。

指标体系中有很多“率”的指标,都能够拆解成 XXX+ 率,率能够叫 rate,那我 们所有的指标都叫做 XXX+rate。

词根:能够用来对立表名、字段名、主题域名等等

举例:以流程图的形式来展现,更加直观和易懂,本图偏重 dwm 层表的命名 标准,其余命名是相似的情理:

第一个判断条件是该表的用处,是两头表、原始日志还是业务展现用的表 如果该表被判断为两头表,就会走入下一个判断条件:表是否有 group 操作 通过是否有 group 操作来判断该表该划分在 dwd 层还是 dwm 和 dws 层 如果不是 dwd 层,则须要判断该表是否是多个行为的汇总表(即宽表)最初再别离填上事业群、部门、业务线、自定义名称和更新频率等信息即可。

分层:表的应用范畴

事业群和部门:生产该表或者该数据的团队

业务线:表明该数据是哪个产品或者业务线相干

主题域:剖析问题的角度,对象实体

自定义:个别会尽可能多形容该表的信息,比方沉闷表、留存表等

更新周期:比如说天级还是月级更新

数仓表的命名标准如下

1. 数仓档次:

专用维度:dim

DM 层:dm

ODS 层:ods

DWD 层:dwd

DWS 层:dws

2. 周期 / 数据范畴:

日快照:d

增量:i

全量:f

周:w

拉链表:l

非分区全量表:a

2. 表命名标准

1) 惯例表

惯例表是咱们须要固化的表,是正式应用的表,是目前一段时间内须要去保护去 欠缺的表。

标准:分层前缀[dwd|dws|ads]_部门_业务域_主题域_XXX_更新周期 | 数据范畴

业务域、主题域咱们都能够用词根的形式枚举分明,不断完善。

更新周期次要的是工夫粒度、日、月、年、周等。

2) 两头表

两头表个别呈现在 Job 中,是 Job 中长期存储的两头数据的表,两头表的作 用域只限于以后 Job 执行过程中,Job 一旦执行实现,该两头表的使命就完 成了,是能够删除的(依照本人公司的场景自由选择,以前公司会保留几天 的两头表数据,用来排查问题)。

标准:mid_table_name_[0~9|dim]

table_name 是咱们工作中指标表的名字,通常来说一个工作只有一个指标表。
这里加上表名,是为了避免自由发挥的时候表名抵触,而开端大家能够抉择自由发挥,起一些有意义的名字,或者简略粗犷,应用数字代替,各有优劣吧,审慎抉择。

通常会遇到须要补全维度的表,这里应用 dim 结尾。

如果要保留历史的两头表,能够加上日期或者工夫戳。

3) 长期表

长期表是长期测试的表,是长期应用一次的表,就是临时保留下数据看看,后续个别不再应用的表,是能够随时删除的表。

标准:tmp_xxx

只有加上 tmp 结尾即可,其余名字随便,留神 tmp 结尾的表不要用来理论应用,只是测试验证而已。

4) 维度表

维度表是基于底层数据,形象进去的形容类的表。维度表能够主动从底层表形象进去,也能够手工来保护。

标准:dim_xxx

维度表,对立以 dim 结尾,前面加上,对该指标的形容。

5) 手工表

手工表是手工保护的表,手工初始化一次之后,个别不会主动扭转,前面变更,也是手工来保护。

一般来说,手工的数据粒度是偏细的,所以临时对立放在 dwd 层,前面如果有目标值或者其余类型手工数据,再依据理论状况分层。

标准:dwd_业务域_manual_xxx

手工表,减少非凡的主题域,manual,示意手工保护表。

3. 指标命名标准

公共规定
  • 所有单词小写
  • 单词之间下划线宰割(反例:appName 或 AppName)
  • 可读性优于长度 (词根,避免出现同一个指标,命名一致性)
  • 禁止应用 sql 关键字,如字段名与关键字抵触时 +col
  • 数量字段后缀 _cnt 等标识 …
  • 金额字段后缀 _price 标识
  • 天分区应用字段 dt,格局对立(yyyymmdd 或 yyyy-mm-dd)
  • 小时分区应用字段 hh,范畴(00-23)
  • 分钟分区应用字段 mi,范畴(00-59)
  • 布尔类型标识:is_{业务},不容许呈现空值

参考文档:

  1. 上百本优质大数据书籍,附必读清单(大数据宝藏)
  2. 最强最全面的数仓建设标准指南
  3. 美团数据平台及数仓建设实际,超十万字总结
  4. 五万字 | 耗时一个月整顿出这份 Hadoop 吐血宝典

正文完
 0