背景
交易数据的实时统计是电商网站一个核心功能,可以帮助用户实时统计网站的整体销售情况,快速验证“新销售策略”的效果。我们今天介绍一个基于表格存储 (Tablestore) 实现交易数据的实时计算,给大家提供一个新使用方式。
Tablestore 作为在线的结构化数据库,提供了毫秒级的访问延时和丰富的查询方式,能高效的支撑交易数据的存储和查询,同时 Tablestore 已经原生支持阿里云的流计算框架 Flink/Blink,可以实现数据的实时计算。
架构
示例设计
基本场景
注意:示例是模拟一个电商网站的交易数据的存储和实时计算,目的是为了展示 Tablestore + Blink 的使用流程。
用户通过 SDK 将网站交易数据实时的存储 (PutRow/BatchWrite/TableStoreWriter) 到 Tablestore 的 source_order 表中,Tablestore 通过 Tunnel 服务,将实时增量的数据流入到 Flink/Blink 中,每 5 秒进行一次聚合计算,并将计算的结果重新写回 Tablestore 的 sink_order 表中。最后提供给“大屏”实时读取 (GetRange) 展示。
Source 表(源表)- source_order
source 表是原始数据表,存储了所有交易记录。
字段 | 类型 | 注释 |
---|---|---|
metering(PrimaryKey) | string | 计量类型,样例中默认是 web |
orderid(PrimaryKey) | string | 订单号 ID |
ts | integer | 交易时间(Unix 时间戳,毫秒精度) |
price | double | 交易金额 |
buyerid | integer | 买家 ID |
sellerid | integer | 卖家 ID |
productid | integer | 商品 ID |
Sink 表(结果表)- sink_order
字段 | 类型 | 注释 |
---|---|---|
metering(PrimaryKey) | string | 计量类型,样例中默认是 web |
ts(PrimaryKey) | integer | 交易时间(Unix 时间戳,毫秒精度) |
price | double | 交易金额 |
ordercount | integer | 交易次数 |
Flink SQL
DDL 参考
注意:当前 Blink 在支持 Tablestore source 上还有些限制,只能运行 ProcessingTime 的方式,未来会支持 EventTime 模式,按照用户数据参数的时间进行计算。
-- Source 源表创建
CREATE TABLE ots_input (
metering VARCHAR,
orderid VARCHAR,
price DOUBLE,
byerid BIGINT,
sellerid BIGINT,
productid BIGINT,
primary key(metering,orderid),
ts AS PROCTIME()) WITH (
type = 'ots',
instanceName = 'ordertest',
tableName = 'source_order',
accessId = '******************',
accessKey = '******************',
endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
tunnelName = 'blink_agg'
);
-- Sink 结果表创建
CREATE TABLE ots_output (
metering VARCHAR,
ts BIGINT,
price DOUBLE,
ordercount BIGINT,
primary key(metering,ts)
) WITH (
type = 'ots',
instanceName = 'ordertest',
tableName = 'sink_order',
accessId = '******************',
accessKey = '******************',
endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
valueColumns = 'price,ordercount'
);
-- 计算
INSERT INTO ots_output
SELECT
DISTINCT metering as metering,
CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts,
SUM(price) as price,
COUNT(orderid) as ordercount
FROM ots_input
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;
实战
第一步:准备账户与开服
准备表格存储实例,可以参考《表格存储实例创建》
准备 Flink/Blink 项目,可以参考《Blink 如何购买》
第二步:创建资源
表格存储资源
表格存储控制台入口,创建表格存储实例 ordertest (用户自定义即可,下面对于参数位置更换为自定义的实例名),并记录实例的 VPC 地址
同时在控制台创建 Source 表和 Sink 表,并为 Source 表(source_order)开启一个 Tunnel 服务 blink_agg
Blink 资源
Blink 控制台入口,创建一个 Blink 项目(独享模式要创建集群之后才能创建项目),分别创建一个作业,agg_order,并将上面的 Flink SQL 粘贴到窗口中,上线服务
在运维窗口中启动该任务
第三步:压入数据 并 实时获取结算结果
1 准备配置文件
程序默认会从 ’~/tablestoreConf.json’ 获取配置
vim ~/tablestoreConf.json
# 内容
{
"endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com",
"accessId":"************",
"accessKey":"************",
"instanceName":"ordertest"
}
2 构建源码
mvn install
cd target
tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz
3 启动压力器和模拟大屏
可以直接下载工具包:stream-compute-1.0-SNAPSHOT-release.tar.gz
# 窗口 1
./bin/mock_order_generator
# 窗口 2
./bin/data_show_screen
4 效果
源码
源码:https://github.com/aliyun/tablestore-examples/tree/master/demos/StreamCompute
本文作者:chen1255
阅读原文
本文为云栖社区原创内容,未经允许不得转载。