实时数仓次要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的 OLAP 剖析、实时的数据看板、业务指标实时监控等场景。尽管对于实时数仓的架构及技术选型与传统的离线数仓会存在差别,然而对于数仓建设的根本方法论是统一的。本文会分享基于 Flink SQL 从 0 到 1 搭建一个实时数仓的 demo,波及数据采集、存储、计算、可视化整个解决流程。通过本文你能够理解到:
- 实时数仓的根本架构
- 实时数仓的数据处理流程
- Flink1.11 的 SQL 新个性
- Flink1.11 存在的 bug
- 残缺的操作案例
今人学识无遗力,少壮时间老始成。
纸上得来终觉浅,绝知此事要躬行。
案例简介
本文会以电商业务为例,展现实时数仓的数据处理流程。另外,本文旨在阐明实时数仓的构建流程,所以不会波及太简单的数据计算。为了保障案例的可操作性和完整性,本文会给出具体的操作步骤。为了不便演示,本文的所有操作都是在 Flink SQL Cli 中实现的。
架构设计
具体的架构设计如图所示:首先通过 canal 解析 MySQL 的 binlog 日志,将数据存储在 Kafka 中。而后应用 Flink SQL 对原始数据进行荡涤关联,并将解决之后的明细宽表写入 kafka 中。维表数据存储在 MySQL 中,通过 Flink SQL 对明细宽表与维表进行 JOIN,将聚合后的数据写入 MySQL,最初通过 FineBI 进行可视化展现。
业务数据筹备
- 订单表(order_info)
CREATE TABLE `order_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
`order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户 id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款形式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方领取用)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '订单形容(第三方领取用)',
`create_time` datetime DEFAULT NULL COMMENT '创立工夫',
`operate_time` datetime DEFAULT NULL COMMENT '操作工夫',
`expire_time` datetime DEFAULT NULL COMMENT '生效工夫',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片门路',
`province_id` int(20) DEFAULT NULL COMMENT '地区',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
- 订单详情表(order_detail)
CREATE TABLE `order_detail` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku 名称(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时 sku 价格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
`create_time` datetime DEFAULT NULL COMMENT '创立工夫',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';
- 商品表(sku_info)
CREATE TABLE `sku_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
`spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
`price` decimal(10,0) DEFAULT NULL COMMENT '价格',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku 名称',
`sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格形容',
`weight` decimal(10,2) DEFAULT NULL COMMENT '分量',
`tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',
`category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类 id(冗余)',
`sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
`create_time` datetime DEFAULT NULL COMMENT '创立工夫',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';
- 商品一级类目表(base_category1)
CREATE TABLE `base_category1` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` varchar(10) NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';
- 商品二级类目表(base_category2)
CREATE TABLE `base_category2` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` varchar(200) NOT NULL COMMENT '二级分类名称',
`category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';
- 商品三级类目表(base_category3)
CREATE TABLE `base_category3` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` varchar(200) NOT NULL COMMENT '三级分类名称',
`category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';
- 省份表(base_province)
CREATE TABLE `base_province` (`id` int(20) DEFAULT NULL COMMENT 'id',
`name` varchar(20) DEFAULT NULL COMMENT '省名称',
`region_id` int(20) DEFAULT NULL COMMENT '大区 id',
`area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 区域表(base_region)
CREATE TABLE `base_region` (`id` int(20) NOT NULL COMMENT '大区 id',
`region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
留神:以上的建表语句是在 MySQL 中实现的,残缺的建表及模仿数据生成脚本见:
链接:https://pan.baidu.com/s/1fcMg… 提取码:zuqw
数据处理流程
ODS 层数据同步
对于 ODS 层的数据同步参见我的另一篇文章基于 Canal 与 Flink 实现数据实时增量同步(一)。次要应用 canal 解析 MySQL 的 binlog 日志,而后将其写入到 Kafka 对应的 topic 中。因为篇幅限度,不会对具体的细节进行阐明。同步之后的后果如下图所示:
DIM 层维表数据筹备
本案例中将维表存储在了 MySQL 中,理论生产中会用 HBase 存储维表数据。咱们次要用到两张维表:区域维表 和商品维表。处理过程如下:
- 区域维表
首先将 mydw.base_province
和mydw.base_region
这个主题对应的数据抽取到 MySQL 中,次要应用 Flink SQL 的 Kafka 数据源对应的 canal-json 格局,留神:在执行装载之前,须要先在 MySQL 中创立对应的表,本文应用的 MySQL 数据库的名字为dim,用于寄存维表数据。如下:
-- -------------------------
-- 省份
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
`id` INT,
`name` STRING,
`region_id` INT ,
`area_code`STRING
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_province',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 省份
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
`id` INT,
`name` STRING,
`region_id` INT ,
`area_code`STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_province', -- MySQL 中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 省份
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;
-- -------------------------
-- 区域
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
`id` INT,
`region_name` STRING
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_region',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 区域
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
`id` INT,
`region_name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_region', -- MySQL 中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 区域
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;
通过下面的步骤,将创立维表所须要的原始数据曾经存储到了 MySQL 中,接下来就须要在 MySQL 中创立维表,咱们应用下面的两张表,创立一张视图:dim_province
作为维表:
-- ---------------------------------
-- DIM 层, 区域维表,
-- 在 MySQL 中创立视图
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
bp.id AS province_id,
bp.name AS province_name,
br.id AS region_id,
br.region_name AS region_name,
bp.area_code AS area_code
FROM base_region br
JOIN base_province bp ON br.id= bp.region_id
;
这样咱们所须要的维表:dim_province 就创立好了,只须要在维表 join 时,应用 Flink SQL 创立 JDBC 的数据源,就能够应用该维表了。同理,咱们应用雷同的办法创立商品维表,具体如下:
-- -------------------------
-- 一级类目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
`id` BIGINT,
`name` STRING
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category1',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 一级类目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
`id` BIGINT,
`name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category1', -- MySQL 中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 一级类目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;
-- -------------------------
-- 二级类目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category2',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 二级类目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category2', -- MySQL 中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 二级类目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;
-- -------------------------
-- 三级类目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category3',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 三级类目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category3', -- MySQL 中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 三级类目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;
-- -------------------------
-- 商品表
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0)
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.sku_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 商品表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0),
PRIMARY KEY (tm_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'sku_info', -- MySQL 中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 商品
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;
通过下面的步骤,咱们能够将创立商品维表的根底数据表同步到 MySQL 中,同样须要提前创立好对应的数据表。接下来咱们应用下面的根底表在 mySQL 的 dim 库中创立一张视图:dim_sku_info
,用作后续应用的维表。
-- ---------------------------------
-- DIM 层, 商品维表,
-- 在 MySQL 中创立视图
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
si.id AS id,
si.sku_name AS sku_name,
si.category3_id AS c3_id,
si.weight AS weight,
si.tm_id AS tm_id,
si.price AS price,
si.spu_id AS spu_id,
c3.name AS c3_name,
c2.id AS c2_id,
c2.name AS c2_name,
c3.id AS c1_id,
c3.name AS c1_name
FROM
(
sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id =c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id
);
至此,咱们所须要的维表数据曾经筹备好了,接下来开始解决 DWD 层的数据。
DWD 层数据处理
通过下面的步骤,咱们曾经将所用的维表曾经筹备好了。接下来咱们将对 ODS 的原始数据进行解决,加工成 DWD 层的明细宽表。具体过程如下:
-- -------------------------
-- 订单详情
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
`id` BIGINT,
`order_id` BIGINT,
`sku_id` BIGINT,
`sku_name` STRING,
`img_url` STRING,
`order_price` DECIMAL(10,2),
`sku_num` INT,
`create_time` TIMESTAMP(0)
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.order_detail',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 订单信息
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
`id` BIGINT,
`consignee` STRING,
`consignee_tel` STRING,
`total_amount` DECIMAL(10,2),
`order_status` STRING,
`user_id` BIGINT,
`payment_way` STRING,
`delivery_address` STRING,
`order_comment` STRING,
`out_trade_no` STRING,
`trade_body` STRING,
`create_time` TIMESTAMP(0) ,
`operate_time` TIMESTAMP(0) ,
`expire_time` TIMESTAMP(0) ,
`tracking_no` STRING,
`parent_order_id` BIGINT,
`img_url` STRING,
`province_id` INT
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.order_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- ---------------------------------
-- DWD 层, 领取订单明细表 dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,0),
create_time TIMESTAMP(0),
pay_time TIMESTAMP(0)
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD 层, 已领取订单明细表
-- 向 dwd_paid_order_detail 装载数据
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM ods_order_info
WHERE order_status = '2' -- 已领取
) oi JOIN
(
SELECT *
FROM ods_order_detail
) od
ON oi.id = od.order_id;
ADS 层数据
通过下面的步骤,咱们创立了一张 dwd_paid_order_detail 明细宽表,并将该表存储在了 Kafka 中。接下来咱们将应用这张明细宽表与维表进行 JOIN,失去咱们 ADS 应用层数据。
- ads_province_index
首先在 MySQL 中创立对应的 ADS 指标表:ads_province_index
CREATE TABLE ads.ads_province_index(province_id INT(10),
area_code VARCHAR(100),
province_name VARCHAR(100),
region_id INT(10),
region_name VARCHAR(100),
order_amount DECIMAL(10,2),
order_count BIGINT(10),
dt VARCHAR(100),
PRIMARY KEY (province_id, dt)
) ;
向 MySQL 的 ADS 层指标装载数据:
-- Flink SQL Cli 操作
-- ---------------------------------
-- 应用 DDL 创立 MySQL 中的 ADS 层表
-- 指标:1. 每天每个省份的订单数
-- 2. 每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index(
province_id INT,
area_code STRING,
province_name STRING,
region_id INT,
region_name STRING,
order_amount DECIMAL(10,2),
order_count BIGINT,
dt STRING,
PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_province_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail 已领取订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总长期表
-- ---------------------------------
CREATE TABLE tmp_province_index(
province_id INT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
pay_date DATE
)WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总长期表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
province_id,
count(distinct order_id) order_count,-- 订单数
sum(order_price * sku_num) order_amount, -- 订单金额
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- 应用该长期汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
province_id INT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
pay_date DATE,
proctime as PROCTIME() -- 通过计算列产生一个解决工夫列) WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM 层, 区域维表,
-- 创立区域维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
province_id INT,
province_name STRING,
area_code STRING,
region_id INT,
region_name STRING ,
PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_province',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向 ads_province_index 装载数据
-- 维表 JOIN
-- ---------------------------------
INSERT INTO ads_province_index
SELECT
pc.province_id,
dp.area_code,
dp.province_name,
dp.region_id,
dp.region_name,
pc.order_amount,
pc.order_count,
cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
ON dp.province_id = pc.province_id;
当提交工作之后:察看 Flink WEB UI:
查看 ADS 层的 ads_province_index 表数据:
- ads_sku_index
首先在 MySQL 中创立对应的 ADS 指标表:ads_sku_index
CREATE TABLE ads_sku_index
(sku_id BIGINT(10),
sku_name VARCHAR(100),
weight DOUBLE,
tm_id BIGINT(10),
price DOUBLE,
spu_id BIGINT(10),
c3_id BIGINT(10),
c3_name VARCHAR(100) ,
c2_id BIGINT(10),
c2_name VARCHAR(100),
c1_id BIGINT(10),
c1_name VARCHAR(100),
order_amount DOUBLE,
order_count BIGINT(10),
sku_count BIGINT(10),
dt varchar(100),
PRIMARY KEY (sku_id,dt)
);
向 MySQL 的 ADS 层指标装载数据:
-- ---------------------------------
-- 应用 DDL 创立 MySQL 中的 ADS 层表
-- 指标:1. 每天每个商品对应的订单个数
-- 2. 每天每个商品对应的订单金额
-- 3. 每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
sku_id BIGINT,
sku_name VARCHAR,
weight DOUBLE,
tm_id BIGINT,
price DOUBLE,
spu_id BIGINT,
c3_id BIGINT,
c3_name VARCHAR ,
c2_id BIGINT,
c2_name VARCHAR,
c1_id BIGINT,
c1_name VARCHAR,
order_amount DOUBLE,
order_count BIGINT,
sku_count BIGINT,
dt varchar,
PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_sku_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail 已领取订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
sku_id BIGINT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
order_sku_num BIGINT,
pay_date DATE
)WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
sku_id,
count(distinct order_id) order_count,-- 订单数
sum(order_price * sku_num) order_amount, -- 订单金额
sum(sku_num) order_sku_num,
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_sku_index_source
-- 应用该长期汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
sku_id BIGINT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
order_sku_num BIGINT,
pay_date DATE,
proctime as PROCTIME() -- 通过计算列产生一个解决工夫列) WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM 层, 商品维表,
-- 创立商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
id BIGINT,
sku_name STRING,
c3_id BIGINT,
weight DECIMAL(10,2),
tm_id BIGINT,
price DECIMAL(10,2),
spu_id BIGINT,
c3_name STRING,
c2_id BIGINT,
c2_name STRING,
c1_id BIGINT,
c1_name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_sku_info',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向 ads_sku_index 装载数据
-- 维表 JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
sku_id ,
sku_name ,
weight ,
tm_id ,
price ,
spu_id ,
c3_id ,
c3_name,
c2_id ,
c2_name ,
c1_id ,
c1_name ,
sc.order_amount,
sc.order_count ,
sc.order_sku_num ,
cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc
JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
ON ds.id = sc.sku_id
;
当提交工作之后:察看 Flink WEB UI:
查看 ADS 层的 ads_sku_index 表数据:
FineBI 后果展现
其余留神点
Flink1.11.0 存在的 bug
当在代码中应用 Flink1.11.0 版本时,如果将一个 change-log 的数据源 insert 到一个 upsert sink 时,会报如下异样:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
该 bug 目前已被修复,修复能够在 Flink1.11.1 中应用。
总结
本文次要分享了构建一个实时数仓的 demo 案例,通过本文能够理解实时数仓的数据处理流程,在此基础之上,对 Flink SQL 的 CDC 会有更加粗浅的意识。另外,本文给出了十分具体的应用案例,你能够间接上手进行操作,在实践中摸索实时数仓的构建流程。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包