实时数仓次要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP剖析、实时的数据看板、业务指标实时监控等场景。尽管对于实时数仓的架构及技术选型与传统的离线数仓会存在差别,然而对于数仓建设的根本方法论是统一的。本文会分享基于Flink SQL从0到1搭建一个实时数仓的demo,波及数据采集、存储、计算、可视化整个解决流程。通过本文你能够理解到:
实时数仓的根本架构实时数仓的数据处理流程Flink1.11的SQL新个性Flink1.11存在的bug残缺的操作案例今人学识无遗力,少壮时间老始成。纸上得来终觉浅,绝知此事要躬行。
案例简介本文会以电商业务为例,展现实时数仓的数据处理流程。另外,本文旨在阐明实时数仓的构建流程,所以不会波及太简单的数据计算。为了保障案例的可操作性和完整性,本文会给出具体的操作步骤。为了不便演示,本文的所有操作都是在Flink SQL Cli中实现的。
架构设计具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。而后应用Flink SQL对原始数据进行荡涤关联,并将解决之后的明细宽表写入kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行JOIN,将聚合后的数据写入MySQL,最初通过FineBI进行可视化展现。
业务数据筹备订单表(order_info)CREATE TABLE `order_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `consignee` varchar(100) DEFAULT NULL COMMENT '收货人', `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话', `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额', `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态', `user_id` bigint(20) DEFAULT NULL COMMENT '用户id', `payment_way` varchar(20) DEFAULT NULL COMMENT '付款形式', `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址', `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注', `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方领取用)', `trade_body` varchar(200) DEFAULT NULL COMMENT '订单形容(第三方领取用)', `create_time` datetime DEFAULT NULL COMMENT '创立工夫', `operate_time` datetime DEFAULT NULL COMMENT '操作工夫', `expire_time` datetime DEFAULT NULL COMMENT '生效工夫', `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号', `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号', `img_url` varchar(200) DEFAULT NULL COMMENT '图片门路', `province_id` int(20) DEFAULT NULL COMMENT '地区', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';订单详情表(order_detail)CREATE TABLE `order_detail` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号', `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id', `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)', `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)', `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)', `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数', `create_time` datetime DEFAULT NULL COMMENT '创立工夫', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';商品表(sku_info)CREATE TABLE `sku_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)', `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid', `price` decimal(10,0) DEFAULT NULL COMMENT '价格', `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称', `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格形容', `weight` decimal(10,2) DEFAULT NULL COMMENT '分量', `tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)', `category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类id(冗余)', `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)', `create_time` datetime DEFAULT NULL COMMENT '创立工夫', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';商品一级类目表(base_category1)CREATE TABLE `base_category1` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `name` varchar(10) NOT NULL COMMENT '分类名称', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';商品二级类目表(base_category2)CREATE TABLE `base_category2` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `name` varchar(200) NOT NULL COMMENT '二级分类名称', `category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';商品三级类目表(base_category3)CREATE TABLE `base_category3` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `name` varchar(200) NOT NULL COMMENT '三级分类名称', `category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';省份表(base_province)CREATE TABLE `base_province` ( `id` int(20) DEFAULT NULL COMMENT 'id', `name` varchar(20) DEFAULT NULL COMMENT '省名称', `region_id` int(20) DEFAULT NULL COMMENT '大区id', `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码') ENGINE=InnoDB DEFAULT CHARSET=utf8;区域表(base_region)CREATE TABLE `base_region` ( `id` int(20) NOT NULL COMMENT '大区id', `region_name` varchar(20) DEFAULT NULL COMMENT '大区名称', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;留神:以上的建表语句是在MySQL中实现的,残缺的建表及模仿数据生成脚本见:链接:https://pan.baidu.com/s/1fcMg... 提取码:zuqw
...