Tablestore-Blink实战交易数据的实时统计

47次阅读

共计 2761 个字符,预计需要花费 7 分钟才能阅读完成。

背景

交易数据的实时统计是电商网站一个核心功能,可以帮助用户实时统计网站的整体销售情况,快速验证“新销售策略”的效果。我们今天介绍一个基于表格存储 (Tablestore) 实现交易数据的实时计算,给大家提供一个新使用方式。

Tablestore 作为在线的结构化数据库,提供了毫秒级的访问延时和丰富的查询方式,能高效的支撑交易数据的存储和查询,同时 Tablestore 已经原生支持阿里云的流计算框架 Flink/Blink,可以实现数据的实时计算。

架构

示例设计

基本场景

注意:示例是模拟一个电商网站的交易数据的存储和实时计算,目的是为了展示 Tablestore + Blink 的使用流程。

用户通过 SDK 将网站交易数据实时的存储 (PutRow/BatchWrite/TableStoreWriter) 到 Tablestore 的 source_order 表中,Tablestore 通过 Tunnel 服务,将实时增量的数据流入到 Flink/Blink 中,每 5 秒进行一次聚合计算,并将计算的结果重新写回 Tablestore 的 sink_order 表中。最后提供给“大屏”实时读取 (GetRange) 展示。

Source 表(源表)- source_order

source 表是原始数据表,存储了所有交易记录。

字段 类型 注释
metering(PrimaryKey) string 计量类型,样例中默认是 web
orderid(PrimaryKey) string 订单号 ID
ts integer 交易时间(Unix 时间戳,毫秒精度)
price double 交易金额
buyerid integer 买家 ID
sellerid integer 卖家 ID
productid integer 商品 ID

Sink 表(结果表)- sink_order

字段 类型 注释
metering(PrimaryKey) string 计量类型,样例中默认是 web
ts(PrimaryKey) integer 交易时间(Unix 时间戳,毫秒精度)
price double 交易金额
ordercount integer 交易次数

Flink SQL

DDL 参考

注意:当前 Blink 在支持 Tablestore source 上还有些限制,只能运行 ProcessingTime 的方式,未来会支持 EventTime 模式,按照用户数据参数的时间进行计算。

-- Source 源表创建
CREATE TABLE ots_input (
    metering VARCHAR,
    orderid VARCHAR,
    price DOUBLE,
    byerid BIGINT,
    sellerid BIGINT,
    productid BIGINT,
    primary key(metering,orderid),
    ts AS PROCTIME()) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'source_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    tunnelName = 'blink_agg'
);
-- Sink 结果表创建
CREATE TABLE ots_output (
    metering VARCHAR,
    ts BIGINT,
    price DOUBLE,
    ordercount BIGINT,
    primary key(metering,ts)
) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'sink_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    valueColumns = 'price,ordercount'
);

-- 计算
INSERT INTO ots_output
SELECT 
    DISTINCT metering as metering,
    CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts,
    SUM(price) as price,
    COUNT(orderid) as ordercount
FROM ots_input
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;

实战

第一步:准备账户与开服

准备表格存储实例,可以参考《表格存储实例创建》

准备 Flink/Blink 项目,可以参考《Blink 如何购买》

第二步:创建资源

表格存储资源

表格存储控制台入口,创建表格存储实例 ordertest (用户自定义即可,下面对于参数位置更换为自定义的实例名),并记录实例的 VPC 地址

同时在控制台创建 Source 表和 Sink 表,并为 Source 表(source_order)开启一个 Tunnel 服务 blink_agg

Blink 资源

Blink 控制台入口,创建一个 Blink 项目(独享模式要创建集群之后才能创建项目),分别创建一个作业,agg_order,并将上面的 Flink SQL 粘贴到窗口中,上线服务

在运维窗口中启动该任务

第三步:压入数据 并 实时获取结算结果

1 准备配置文件

程序默认会从 ’~/tablestoreConf.json’ 获取配置

vim ~/tablestoreConf.json

# 内容
{
    "endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com",
    "accessId":"************",
    "accessKey":"************",
    "instanceName":"ordertest"
}

2 构建源码

mvn install
cd target
tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz

3 启动压力器和模拟大屏

可以直接下载工具包:stream-compute-1.0-SNAPSHOT-release.tar.gz

# 窗口 1
./bin/mock_order_generator
# 窗口 2
./bin/data_show_screen

4 效果

源码

源码:https://github.com/aliyun/tablestore-examples/tree/master/demos/StreamCompute


本文作者:chen1255

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

正文完
 0