乐趣区

关于后端:实现淘宝母婴订单实时查询和可视化|FlinkLearning实战营

作者|吴西荣 @阿里云

为进一步帮忙开发者学习应用 Flink,Apache Flink 中文社区近期发动 Flink-Learning 实战营我的项目。本次实战营通过实在乏味的实战场景帮忙开发者实操体验 Flink,课程包含实时数据接入、实时数据分析、实时数据利用的场景实。并联合小松鼠助教模式,全方位帮忙入营开发者轻松玩转 Flink,点击下方图片扫码即刻入营

本期将持续具体介绍 Flink- Learning 实战营。


试验简介

随着“全面二孩”政策落地、居民可摆布支出稳步减少等因素的刺激,中国的母婴消费市场持续增长。与此同时,随着国民生产降级 90 后宝爸、宝妈人数剧增,生产需要与生产理念都产生了微小的变动。据罗兰贝格最新颁布的报告预计,曾经通过了 16 个年头倒退的母婴行业,到 2020 年,整体规模将达到 3.6 万亿元,2016-2020 年复合增速高达 17%, 行业前景看起来一片光明。如此大好形势下,母婴人群在母婴生产上有什么特点?生产最高的我的项目是什么?

本场景中订单和婴儿信息存储在 MySQL 中,对于订单表,为了不便进行剖析,咱们让它关联上其对应的婴儿信息,形成一张宽表,应用 Flink 实时把它写到 Elasticsearch 中;另一方面数据通过分组聚合后,计算出订单数量和婴儿出世的关系,实时把它写到 Elasticsearch 中并展现到 Kibana 大屏中。

试验资源

实验所开明的云产品因数据连通性要求,需应用同一 Region 可用区,倡议都选取北京 Region 的同一可用区。波及的云产品包含阿里云实时计算 Flink 版、检索剖析服务 Elasticsearch 版、阿里云数据库 RDS。

体验指标

本场景将以 阿里云实时计算 Flink 版为根底,应用 Flink 自带的 MySQL Connector 连贯 RDS 云数据库实例、Elasticsearch Connector 连贯 Elasticsearch 检索剖析服务实例,并以一个淘宝母婴订单实时查问的例子尝试上手 Connector 的数据捕捉、数据写入等性能。

按步骤实现本次试验后,您将把握的常识有:

  • 应用 Flink 实时计算平台创立并提交作业的办法;
  • 编写基于 Flink Table API SQL 语句的能力;
  • 应用 MySQL Connector 对数据库进行读取的办法;
  • 应用 Elasticsearch Connector 对数据库进行写入的办法。

步骤一:创立资源

开始试验之前,您须要先创立相干试验资源,确保 RDS 实例、Elasticsearch 实例、Flink 实例在同一个 VPC 网络下,并配置实现 RDS 白名单、Elasticsearch 白名单使网络买通。

步骤二:创立数据库表

在这个例子中,咱们将创立三张数据表,其中一张 orders_dataset_tmp 是导入数据的长期表,其余两张作为源表,体验淘宝母婴订单实时查问。

1. 点击云数据库 RDS 控制台「实例列表」,切换到下面创立实例所在的 region,点击本人的实例名称进入详情页,首次应用别离点击「账号治理」和「数据库治理」,创立账号和数据库并使账号绑定到指定数据库。

2. 点击云数据库 RDS 实例详情页上方「登录数据库」,会主动跳转到 DMS 数据管理平台,输出用户名和明码登录刚刚创立的实例,点击左侧「数据库实例」-「已登录实例」列表,双击要编辑的数据库名,而后在右侧 SQL Console 命令区输出以下建表指令并执行:

create table orders_dataset_tmp(
    user_id bigint comment '用户身份信息',            
    auction_id bigint comment '购买行为编号',        
    cat_id bigint comment '商品种类序列号',            
    cat1 bigint comment '商品序列号(根类别)',                
    property TEXT comment '商品属性',            
    buy_mount int comment '购买数量',            
    day TEXT comment '购买工夫'                
);

create table orders_dataset(
    order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '订单 id',
    user_id bigint comment '用户身份信息',            
    auction_id bigint comment '购买行为编号',        
    cat_id bigint comment '商品种类序列号',            
    cat1 bigint comment '商品序列号(根类别)',                
    property TEXT comment '商品属性',            
    buy_mount int comment '购买数量',            
    day TEXT comment '购买工夫'                
);

--
create table baby_dataset(
    user_id bigint NOT NULL PRIMARY KEY,    
    birthday text comment '婴儿生日',
    gender int comment '0 denotes female, 1 denotes male, 2 denotes unknown'
);

3. 在 DMS 数据管理平台,点击左侧「罕用性能」-「数据导入」,配置如下信息后点击提交申请,将 (sample)sam_tianchi_mum_baby_trade_history.csv 导入 orders_dataset_tmp 表,(sample)sam_tianchi_mum_baby.csv 导入 baby_dataset 表。

配置项 阐明
数据库 含糊搜寻数据库名后点击对应的 MySQL 实例
文件编码 自动识别
导入模式 极速模式
文件类型 CSV 格局
指标表 含糊搜寻要导入的表名后点击选中
数据地位 抉择第 1 行为属性
写入形式 INSERT
附件 点击上传按钮上传要导入到表的对应文件

导入实现之后执行以下 SQL 将订单数据导入到订单源表 orders_dataset 中。

insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day)
select * from orders_dataset_tmp;

步骤三:配置 Elasticsearch 主动创立索引

进入检索剖析服务控制台,Elasticsearch 实例列表找到本人的实例,而后点击实例名进入详情界面,点击「配置与治理」-「ES 集群配置」,点击「批改配置」,抉择「容许主动创立索引」,点击「确定」。

批改配置须要期待十几分钟,请急躁期待配置变更实现后再持续应用 Elasticsearch。

步骤四:创立实时查问 SQL 作业

1. 进入实时计算 Flink 平台,点击左侧边栏中的「利用」—「作业开发」菜单,并点击顶部工具栏的「新建」按钮新建一个作业。作业名字任意,类型抉择「流作业 / SQL」,其余设置放弃默认。如下所示:

2. 胜利创立作业后,右侧编辑窗格应该显示新作业的内容:

3. 接下来,咱们在右侧编辑窗格中输出以下语句来创立二张长期表,并应用 MySQL CDC 连接器实时捕捉 orders_datasetbaby_dataset的变动:

CREATE TEMPORARY TABLE orders_dataset (
    order_id BIGINT,
  `user_id` bigint,            
    auction_id bigint,        
    cat_id bigint,            
    cat1 bigint,                
    property varchar,            
    buy_mount int,            
    `day` varchar    ,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'orders_dataset'
);
CREATE TEMPORARY TABLE baby_dataset (
    `user_id` bigint,
    birthday varchar,
    gender int,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'baby_dataset'
);

须要将 hostname参数替换为早些时候创立资源的域名、将 usernamepassword参数替换为数据库登录用户名及明码、将 database-name参数替换为之前在 RDS 后盾中创立的数据库名称。

其中,'connector' = 'mysql'指定了应用 MySQL CDC 连接器来捕捉变动数据。您须要应用筹备步骤中申请的 RDS MySQL URL、用户名、明码,以及之前创立的数据库名替换对应局部。

任何时候您都能够点击顶部工具栏中的「验证」按钮,来确认作业 Flink SQL 语句中是否存在语法错误。

4. 为了测试是否胜利地捕捉了源表数据,紧接着在上面写一行 SELECT * FROM source_table;语句,选中长期表和 select 语句,并点击工具栏中的「执行」按钮。如果控制台中打印了相应的数据行,则阐明捕捉胜利,如下图所示:

5. 咱们在右侧编辑窗格中输出以下语句来创立一张长期表,并应用 Elasticsearch 连接器连贯到 Elasticsearch 实例:

CREATE TEMPORARY TABLE es_sink(
    order_id BIGINT,
    `user_id` bigint,            
    auction_id bigint,        
    cat_id bigint,            
    cat1 bigint,                
    property varchar,            
    buy_mount int,            
    `day` varchar    ,
    birthday varchar,
    gender int,
   PRIMARY KEY(order_id) NOT ENFORCED  -- 主键可选,如果定义了主键,则作为文档 ID,否则文档 ID 将为随机值。) WITH (
'connector' = 'elasticsearch-7',
  'hosts' = 'http://**********:9200',
  'index' = 'enriched_orders',
  'username' ='elastic',
  'password' ='*******'-- 创立 ES 实例时自定义的明码
);

须要将 hosts参数替换为早些时候创立资源的域名、将 password参数替换为登录 Kibana 明码。

其中,'connector' = 'elasticsearch-7'指定了应用 Elasticsearch 连接器来连贯 Elasticsearch 实例写入数据。您须要应用筹备步骤中申请的 Elasticsearch URL、用户名、明码。

6. 接下来,咱们心愿对原始数据依照 user_id 进行 JOIN,形成一张宽表。并把宽表数据写入到 Elasticsearch 的 enriched_orders 索引中。咱们在 Flink 作业编辑窗格中输出如下代码:

INSERT INTO es_sink
SELECT o.*,
    b.birthday,
    b.gender
FROM orders_dataset /*+ OPTIONS('server-id'='123450-123452') */ o
LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123453-123455') */ as b
    ON o.user_id = b.user_id;

在保障源表中有数据的状况下,再次执行 Flink 作业,察看控制台的输入后果:

当初,点击管制台上的「上线」按钮,即可将咱们编写的 Flink SQL 作业部署上线执行。您能够登录 Kibana 点击「Stack Management」-「Index Management」搜寻 enriched_orders 查看 enriched_orders 索引是否胜利创立。

阿里云实时计算控制台在应用「执行」性能调试时,不会写入任何数据到上游中。因而为了测试应用 SQL Connector 写入汇表,您必须应用「上线」性能。

您也能够进入 Flink UI 控制台察看流数据处理图。

7.Elasticsearch 的 enriched_orders 索引创立胜利后,点击「Discover」-「create index pattern」,输出enriched_orders,点击「Next step」–「create index pattern」,创立实现后就能够在「Kibana」-「Discover」看到写入的数据了。

8. 接下来,咱们通过对 MySQL 中源表的数据进行增改删操作,每执行一步就刷新一下「Kibana」-「Discover」界面,察看数据的变动。

8.1 order_dataset 表增加一条数据

insert into orders_dataset values (DEFAULT ,2222222,2222222,50018855,38,'21458:33304;6933666:4421827;21475:137319;12121566:3861755',1,'20130915');

8.2 baby_dataset 表中增加一条数据

insert into baby_dataset values(144335047,'20150523',1);

写入前

写入后

8.3 order_dataset 表更新一条数据

select order_id from orders_dataset where user_id = 2757;
-- 依据查到的 order_id 更新数据
UPDATE orders_dataset SET auction_id = 2222223 WHERE order_id = ;

更新前

更新后

8.4 order_dataset 表中删除一条数据

select order_id from orders_dataset where user_id = 2222222;
DELETE FROM orders_dataset WHERE order_id = ;

删除前

删除后

步骤五:创立实时大屏 SQL 作业

后面四步和 步骤四 的后面四步雷同,区别在于前面步骤作业的解决逻辑 SQL 不同,要统计的指标不同,所以 Elasticsearch 的 Schema 与之前不同。

1. 进入实时计算 Flink 平台,点击左侧边栏中的「利用」—「作业开发」菜单,并点击顶部工具栏的「新建」按钮新建一个作业。作业名字任意,类型抉择「流作业 / SQL」,其余设置放弃默认。如下所示:

2. 胜利创立作业后,右侧编辑窗格应该显示新作业的内容:

3. 接下来,咱们在右侧编辑窗格中输出以下语句来创立二张长期表,并应用 MySQL CDC 连接器实时捕捉 orders_datasetbaby_dataset的变动:

CREATE TEMPORARY TABLE orders_dataset (
    order_id BIGINT,
  `user_id` bigint,            
    auction_id bigint,        
    cat_id bigint,            
    cat1 bigint,                
    property varchar,            
    buy_mount int,            
    `day` varchar    ,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'orders_dataset'
);
CREATE TEMPORARY TABLE baby_dataset (
    `user_id` bigint,
    birthday varchar,
    gender int,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'baby_dataset'
);

须要将 hostname参数替换为早些时候创立资源的域名、将 usernamepassword参数替换为数据库登录用户名及明码、将 database-name参数替换为之前在 RDS 后盾中创立的数据库名称。

其中,'connector' = 'mysql'指定了应用 MySQL CDC 连接器来捕捉变动数据。您须要应用筹备步骤中申请的 RDS MySQL URL、用户名、明码,以及之前创立的数据库名替换对应局部。

任何时候您都能够点击顶部工具栏中的「验证」按钮,来确认作业 Flink SQL 语句中是否存在语法错误。

4. 为了测试是否胜利地捕捉了源表数据,紧接着在上面写一行 SELECT * FROM source_table;语句,选中长期表和 select 语句,并点击工具栏中的「执行」按钮。如果控制台中打印了相应的数据行,则阐明捕捉胜利,如下图所示:

5. 咱们在右侧编辑窗格中输出以下语句来创立一张长期表,并应用 Elasticsearch 连接器连贯到 Elasticsearch 实例:

CREATE TEMPORARY TABLE es_sink(
  day_year varchar,
  `buy_num` bigint,            
    baby_num bigint,
  PRIMARY KEY(day_year) NOT ENFORCED  -- 主键可选,如果定义了主键,则作为文档 ID,否则文档 ID 将为随机值。) WITH (
'connector' = 'elasticsearch-7',
  'hosts' = 'http://**********:9200',
  'index' = 'enriched_orders_view',
  'username' ='elastic',
  'password' ='*******'-- 创立 ES 实例时自定义的明码
);

须要将 hosts参数替换为早些时候创立资源的域名、将 password参数替换为登录 Kibana 明码。

其中,'connector' = 'elasticsearch-7'指定了应用 Elasticsearch 连接器来连贯 Elasticsearch 实例写入数据。您须要应用筹备步骤中申请的 Elasticsearch URL、用户名、明码。

6. 接下来,咱们心愿对原始数据依照 user_id 进行 JOIN,形成一张宽表。而后对宽表数据的订单工夫取到月份进行分组 GROUP BY,并统计每个分组中订单的购买数量 SUM 和出世婴儿的数量 COUNT,并将后果数据写入到 Elasticsearch 的 enriched_orders_view 索引中。咱们在 Flink 作业编辑窗格中输出如下代码:

INSERT INTO es_sink
SELECT 
    SUBSTRING(tmp1.`day` FROM 1 FOR 6) as day_year,
    SUM(tmp1.buy_mount) as buy_num,
    COUNT(birthday) as baby_num
FROM(
    SELECT o.*,
        b.birthday,
        b.gender
    FROM orders_dataset /*+ OPTIONS('server-id'='123456-123457') */ o
    LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123458-123459') */ as b
        ON o.user_id = b.user_id
) tmp1
GROUP BY SUBSTRING(tmp1.`day` FROM 1 FOR 6)

在保障源表中有数据的状况下,再次执行 Flink 作业,察看控制台的输入后果:

当初,点击管制台上的「上线」按钮,即可将咱们编写的 Flink SQL 作业部署上线执行。您能够登录 Kibana 点击「Stack Management」-「Index Management」搜寻 enriched_orders_view 查看 enriched_orders_view 索引是否胜利创立。

阿里云实时计算控制台在应用「执行」性能调试时,不会写入任何数据到上游中。因而为了测试应用 SQL Connector 写入汇表,您必须应用「上线」性能。

您也能够进入 Flink UI 控制台察看流数据处理图。

7.Elasticsearch 的 enriched_orders_view 索引创立胜利后,点击「Discover」-「create index pattern」,输出enriched_orders_view,点击「Next step」–「create index pattern」,创立实现后就能够在「Kibana」-「Discover」看到写入的数据了。

8. 在「Discover」界面点击左下角「Available fields」-「baby_num」,点击后会展现「TOP 5 VALUES」小窗口,点击窗口下方的「Visualize」,即可跳转到可视化图表界面。

跳转界面后切换图形格局为柱状图 Bar。

配置右侧X-axisY-axis

X-axis 配置 Select a field 为 day_year.keyword,Number of values抉择到最大 100,order by抉择 alphabetical,order direction 抉择 ascending,Display name自定义横轴名称,此处定义为 day_year_month,而后点击Close

Y-axis 配置 Select a field 为 buy_num,Display name自定义纵轴名称,此处定义为 buy_num,Axis side 抉择 Left,而后点击Close。界面两头即生成了对应的折线图。

9. 点击右下角的+,新建一个 layer,切换新建的 layer 的图格局为折线图 Line

配置右侧X-axisY-axis

X-axis 配置 Select a field 为 day_year.keyword,Number of values抉择到最大 100,order by抉择 alphabetical,order direction 抉择 ascending,Display name自定义横轴名称,此处定义为 day_year_month,而后点击Close

与上一个 X-axis 配置完全相同。

Y-axis 配置 Select a field 为 baby_num,Display name自定义纵轴名称,此处定义为 baby_num,Value format抉择 Pecent,Axis side 抉择 Right,而后点击Close。界面两头即生成了对应的折线图与柱状图的复合图。

10. 最初点击右上角的Save,定义此图表的名称即可保留。

想要理解更多商品销售额实时统计的试验信息吗?快来尝试一下吧!

点击即刻入营


更多内容

流动举荐阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)理解流动详情:https://click.aliyun.com/m/1000372333/

退出移动版