关于后端:如何实时统计最近-15-秒的商品销售额|FlinkLearning-实战营

41次阅读

共计 4869 个字符,预计需要花费 13 分钟才能阅读完成。

为进一步帮忙开发者学习应用 Flink,Apache Flink 中文社区近期发动 Flink-Learning 实战营我的项目。本次实战营通过实在乏味的实战场景帮忙开发者实操体验 Flink,课程包含实时数据接入、实时数据分析、实时数据利用的场景实。并联合小松鼠助教模式,全方位帮忙入营开发者轻松玩转 Flink, 点击下方图片扫码即刻入营

本期内容具体介绍 Flink 实战营第一期实战。


想要理解如何应用 Flink 实时统计最近 15 秒的商品销售额吗?本试验将以阿里云实时计算 Flink 版为根底,应用 Flink 自带的 MySQL Connector 连贯 RDS 云数据库实例,并以实时商品销售数据统计的例子,疏导开发者上手 Connector 的数据捕捉、数据写入等性能。

实现本次试验后,您将把握的常识有:

  • 应用 Flink 实时计算平台创立并提交作业的办法;
  • 编写基于 Flink Table API SQL 语句的能力;
  • 应用 MySQL Connector 对数据库进行读写的办法。

Flink MySQL Connector 简介

MySQL Connector 能够将本地或近程的 MySQL 数据库连贯到 Flink 中,并不便地应用 Flink Table API 与之交互、捕捉数据变更、并将处理结果写回数据库。本试验次要介绍如何在阿里云实时计算平台上应用 Flink MySQL 连接器的相干性能,并应用 Table API 编写一个简略的例子,尝试 MySQL 作为源表、维表、汇表的不同性能。

步骤一:创立资源

开始试验之前,您须要开明阿里云实时计算 Flink 版,创立试验资源。

步骤二:创立数据库表

在这个例子中,咱们将创立三张数据表,别离作为源表、维表、汇表,演示 MySQL Connector 的不同性能。

  1. 进入云数据库 RDS 后盾,并登录刚刚创立资源的后盾页面。
  2. 点击左侧边栏的 + 加号按钮,创立一个测试用数据库,而后在右侧命令区输出以下建表指令并执行:
-- Source Table;
CREATE TABLE `source_table` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `good_id` int DEFAULT NULL,
  `amount` int DEFAULT NULL,
  `record_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
);

-- Dimension Table;
CREATE TABLE `dimension_table` (
  `good_id` int unsigned NOT NULL,
  `good_name` varchar(256) DEFAULT NULL,
  `good_price` int DEFAULT NULL,
  PRIMARY KEY (`good_id`)
);

-- Sink Table;
CREATE TABLE `sink_table` (
  `record_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `good_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `sell_amount` int DEFAULT NULL,
  PRIMARY KEY (`record_timestamp`)
);

步骤三:创立 Flink 作业

  1. 进入实时计算 Flink 平台,点击左侧边栏中的「利用」—「作业开发」菜单,并点击顶部工具栏的「新建」按钮新建一个作业。作业名字任意,类型抉择「流作业 / SQL」,其余设置放弃默认。如下所示:
  1. 胜利创立作业后,右侧编辑窗格应该显示新作业的内容:
  1. 接下来,咱们在右侧编辑窗格中输出以下语句来创立一张长期表,并应用 MySQL CDC 连接器实时捕捉 source_table 的变动:
CREATE TEMPORARY TABLE source_table (
    id INT NOT NULL PRIMARY KEY NOT ENFORCED,
    record_time TIMESTAMP_LTZ(3),
    good_id INT,
    amount INT,
    WATERMARK FOR record_time AS record_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'source_table'
);

须要将 hostname 参数替换为早些时候创立资源的域名、将 usernamepassword 参数替换为数据库登录用户名及明码、将 database-name 参数替换为之前在 RDS 后盾中创立的数据库名称。

其中,'connector' = 'mysql-cdc' 指定了应用 MySQL CDC 连接器来捕捉变动数据。您须要应用筹备步骤中申请的 RDS MySQL URL、用户名、明码,以及之前创立的数据库名替换对应局部。

任何时候您都能够点击顶部工具栏中的「验证」按钮,来确认作业 Flink SQL 语句中是否存在语法错误。

  1. 为了测试是否胜利地捕捉了源表数据,紧接着在上面写一行 SELECT * FROM source_table; 语句,并点击工具栏中的「执行」按钮。接着,向 source_table 表中插入一些数据。如果控制台中打印了相应的数据行,则阐明捕捉胜利,如下图所示:
  2. 接下来,咱们心愿对原始数据依照工夫窗口进行分组计算。咱们应用 TUMBLE 相干窗口函数联合 GROUP BY,将长度 15 秒内的订单数据依照商品 ID 进行归类,并应用 SUM 计算其销售总额。咱们在 Flink 作业编辑窗格中输出如下代码:
SELECT 
  good_id, 
  tumble_start(record_time, interval '15' seconds) AS record_timestamp, 
  sum(amount) AS total_amount 
FROM 
  source_table 
GROUP BY 
  tumble (record_time, interval '15' seconds), 
  good_id;

为了测试这一成果,须要向数据库中插入多条数据。你能够在 RDS 中执行附件中的「 示例数据.sql」来插入数据,或者应用「 示例数据生成.py」脚本实时地插入数据。

在保障源表中有数据的状况下,再次执行 Flink 作业,察看控制台的输入后果:

  1. 在这个业务场景中,购买商品信息应用 good_id 记录,而商品 ID 到可读商品名字的映射表、每件商品的价格等信息则存储在另一张维度表(Dimension Table)中。咱们同样能够应用 Flink SQL 连贯维度表,只需在 Flink 作业中编写上面的语句:
CREATE TEMPORARY TABLE dimension_table (
    good_id INT NOT NULL PRIMARY KEY NOT ENFORCED,
    good_name VARCHAR(256),
    good_price INT
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'dimension_table'
);

这里,咱们心愿依据上一步中统计出的「每 15 秒商品销售量」信息,计算出每件商品的销售额。因为商品名称及商品价格数据存储在另一张维度表 dimension_table 中,咱们须要将后果视图和 dimension_table 进行 JOIN 操作,并将「商品销售量」、「商品价格」相乘计算出「商品销售额」,并提取后果中的商品可读名称信息作为后果表。

须要确保 dimension_table 中存在对应商品 ID 的条目。你能够在 RDS 中执行附件中的「 示例数据.sql」来插入数据。

作业代码如下:

SELECT 
  record_timestamp, 
  good_name, 
  total_amount * good_price AS revenue 
FROM 
  (
    SELECT 
      good_id, 
      tumble_start(record_time, interval '15' seconds) AS record_timestamp, 
      sum(amount) AS total_amount 
    FROM 
      source_table 
    GROUP BY 
      tumble (record_time, interval '15' seconds), 
      good_id
  ) AS tumbled_table 
  LEFT JOIN dimension_table ON tumbled_table.good_id = dimension_table.good_id;

其中第 7 到 20 行和上一步骤的 SQL 语句统一。

执行下面的语句,并察看控制台中的统计数据:

  1. 最初,咱们将这些实时的统计数据写回数据库,Flink SQL 也能够简略地实现这一点。首先咱们须要创立一张用于连贯汇表的 Flink 长期表,如下所示:
CREATE TEMPORARY TABLE sink_table (record_timestamp TIMESTAMP(3) NOT NULL PRIMARY KEY NOT ENFORCED,
    good_name VARCHAR(128),
    sell_amount INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://******************.mysql.rds.aliyuncs.com:3306/***********',
  'table-name' = 'sink_table',
  'username' = '***********',
  'password' = '***********',
  'scan.auto-commit' = 'true'
);

而后,只须要将下面的 SELECT 语句的输入后果 INSERT 到该表就能够了:

INSERT INTO sink_table 
SELECT 
  record_timestamp, 
  -- ... 和下面的语句一样 

当初,点击管制台上的「上线」按钮,即可将咱们编写的 Flink SQL 作业部署上线执行。您能够应用数据库客户端等软件察看汇表中是否写入了正确的数据。

阿里云实时计算控制台在应用「执行」性能调试时,不会写入任何数据到上游中。因而为了测试应用 SQL Connector 写入汇表,您必须应用「上线」性能。

您也能够进入 Flink UI 控制台察看流数据处理图。在这个简略的示例中,首先进行的是源表数据的捕捉与窗口聚合;接着和维度表进行 JOIN 操作失去运算后果;最初将解决数据存入汇表。

想要理解更多商品销售额实时统计的试验信息吗?快来尝试一下吧!

<p style=”text-align:center”><font color=FF6a00 size=4> 入营立享权利 </font>
</p>


更多内容

<p style=”text-align:center”><img src=”https://ucc.alicdn.com/gfbp4bwpctdbo_20230518_4116f0d345704b99953e01018a7f526d.png” alt=”img” style=”zoom:100%;” /></p>


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://click.aliyun.com/m/1000372333/

正文完
 0