利用blink CEP实现流计算中的超时统计问题

53次阅读

共计 5189 个字符,预计需要花费 13 分钟才能阅读完成。

案例与解决方案汇总页:阿里云实时计算产品案例 & 解决方案汇总

一. 背景介绍
如 < 利用 blink+MQ 实现流计算中的延时统计问题 > 一文中所描述的场景,我们将其简化为以下案例:实时流的数据源结构如下:

物流订单号
支付时间
仓接单时间
仓出库时间

LP1
2018-08-01 08:00

LP1
2018-08-01 08:00
2018-08-01 09:00

LP2
2018-08-01 09:10

LP2
2018-08-01 09:10
2018-08-01 09:50

LP2
2018-08-01 09:10
2018-08-01 09:50
​2018-08-01 12:00

我们期望通过以上数据源,按照支付日期统计,每个仓库的仓接单量、仓出库量、仓接单超 2H 未出库单量、仓接单超 6H 未出库单量。可以看出,其中 LP1 仓接单时间是 2018-08-01 09:00,但一直到 2018-08-01 12:00 点之前,一直都没有出库,LP1 满足仓接单超 2H 未出库的行为。
该场景的难点就在于:订单未出库。而对于 TT 中的源消息流,订单未出库,TT 就不会下发新的消息,不下发新的消息,blink 就无法被触发计算。而针对上述的场景,对于 LP1,我们需要在仓接单时间是 2018-08-01 09:00+2H,也就是 2018-08-01 11:00 的之后,就要知道 LP1 已经仓接单但超 2H 未出库了。
二. 解决方案
本文主要是利用 blink CEP 来实现上述场景,具体实现步骤如下所述。第一步:在 source DDL 中定义 event_timestamp,并定义 sink,如下:
—- 定义 source
create table sourcett_dwd_ri
(
lg_order_code varchar comment ‘ 物流订单号 ’
,ded_pay_time varchar comment ‘ 支付时间 ’
,store_code varchar comment ‘ 仓库编码 ’
,store_name varchar comment ‘ 仓库名称 ’
,wms_create_time varchar comment ‘ 仓接单时间 ’
,wms_consign_create_time varchar comment ‘ 仓出库时间 ’
,evtstamp as case when coalesce(wms_create_time, ”) <> ”
then to_timestamp(wms_create_time, ‘yyyy-MM-dd HH:mm:ss’)
else to_timestamp(‘1970-01-01 00:00:00’, ‘yyyy-MM-dd HH:mm:ss’)
end – 构造 event_timestamp,如果源表本身带有消息的 occur_time, 可直接选择 occur_time 作为 event_timestamp
,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000) – 设置延迟 10 秒处理
)
with
(
type=’tt’
,topic=’dwd_ri’
,accessKey=’xxxxxx’
,accessId=’xxxxxx’
,lengthCheck=’PAD’
,nullValues=’\\N|’
);

—- 定义 sink
create table sink_hybrid_blink_cep
(
ded_pay_date varchar comment ‘ 支付日期 ’
,store_code varchar comment ‘ 仓库编码 ’
,store_name varchar comment ‘ 仓库名称 ’
,wms_create_ord_cnt bigint comment ‘ 仓接单量 ’
,wms_confirm_ord_cnt bigint comment ‘ 仓出库量 ’
,wmsin_nowmsout_2h_ord_cnt bigint comment ‘ 仓接单超 2 小时未出库单量 ’
,wmsin_nowmsout_6h_ord_cnt bigint comment ‘ 仓接单超 6 小时未出库单量 ’
,sub_partition bigint comment ‘ 二级分区(支付日期 )’
,PRIMARY KEY (ded_pay_date, store_code, sub_partition)
)
with
(
type=’PetaData’
,url = ‘xxxxxx’
,tableName=’blink_cep’
,userName=’xxxxxx’
,password=’xxxxxx’
,bufferSize=’30000′
,batchSize=’3000′
,batchWriteTimeoutMs=’15000′
);

第二步:根据 blink CEP 的标准语义进行改写,如下:
create view blink_cep_v1
as
select ‘ 仓接单 - 仓出库超时 ’ as timeout_type
,lg_order_code
,wms_create_time as start_time
,wms_consign_create_time as end_time
from source_dwd_csn_whc_lgt_fl_ord_ri
MATCH_RECOGNIZE
(
PARTITION BY lg_order_code
ORDER BY evtstamp
MEASURES
e1.wms_create_time as wms_create_time
,e2.wms_consign_create_time as wms_consign_create_time
ONE ROW PER MATCH WITH TIMEOUT ROWS – 重要,必须设置延迟也下发
AFTER MATCH SKIP TO NEXT ROW
PATTERN (e1 -> e2) WITHIN INTERVAL ‘6’ HOUR
EMIT TIMEOUT (INTERVAL ‘2’ HOUR, INTERVAL ‘6’ HOUR)
DEFINE
e1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null
,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null
)
where wms_create_time is not null – 重要,可以大大减少进入 CEP 的消息量
and wms_consign_create_time is null – 重要,可以大大减少进入 CEP 的消息量
;
第三步:根据 blink 的执行机制,我们通过源实时流 sourcett_dwd_ri 与超时消息流 blink_cep_v1 关联,来触发 blink 对超时消息进行聚合操作,如下:
create view blink_cep_v2
as
select a.lg_order_code as lg_order_code
,last_value(a.store_code) as store_code
,last_value(a.store_name) as store_name
,last_value(a.ded_pay_time) as ded_pay_time
,last_value(a.wms_create_time) as wms_create_time
,last_value(a.real_wms_confirm_time) as real_wms_confirm_time
,last_value(case when coalesce(a.wms_create_time, ”) <> ”
and coalesce(a.real_wms_confirm_time, ”) = ”
and now() – unix_timestamp(a.wms_create_time,’yyyy-MM-dd HH:mm:ss’) >= 7200
then ‘Y’ else ‘N’ end) as flag_01
,last_value(case when coalesce(a.wms_create_time, ”) <> ”
and coalesce(a.real_wms_confirm_time, ”) = ”
and now() – unix_timestamp(a.wms_create_time,’yyyy-MM-dd HH:mm:ss’) >= 21600
then ‘Y’ else ‘N’ end) as flag_02
from
(select lg_order_code as lg_order_code
,last_value(store_code) as store_code
,last_value(store_name) as store_name
,last_value(ded_pay_time) as ded_pay_time
,last_value(wms_create_time) as wms_create_time
,last_value(wms_consign_create_time) as real_wms_confirm_time
from sourcett_dwd_ri
group by lg_order_code
) a
left outer join
(select lg_order_code
,count(*) as cnt
from blink_cep_v1
group by lg_order_code
) b
on a.lg_order_code = b.lg_order_code
group by a.lg_order_code
;

insert into sink_hybrid_blink_cep
select regexp_replace(substring(a.ded_pay_time, 1, 10), ‘-‘, ”) as ded_pay_date
,a.store_code
,max(a.store_name) as store_name
,count(case when coalesce(a.wms_create_time, ”) <> ” then a.lg_order_code end) as wmsin_ord_cnt
,count(case when coalesce(a.real_wms_confirm_time, ”) <> ” then a.lg_order_code end) as wmsout_ord_cnt
,count(case when a.flag_01 = ‘Y’ then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt
,count(case when a.flag_02 = ‘Y’ then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt
,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), ‘-‘, ”) as bigint) as sub_partition
from blink_cep_v2 as t1
where coalesce(lg_cancel_time, ”) = ”
and coalesce(ded_pay_time, ”) <> ”
group by regexp_replace(substring(ded_pay_time, 1, 10), ‘-‘, ”)
,a.store_code
;
三. 问题拓展
blink CEP 的参数比较多,要完全看懂,着实需要一些时间,但 CEP 的强大是毋庸置疑的。CEP 不仅可以解决物流场景中的超时统计问题,风控中的很多场景也是信手拈来。这里有一个风控中的场景,通过上述物流案例的用法,我们是否能推敲出这个场景的用法呢?风控案例测试数据如下:

刷卡时间
银行卡 ID
刷卡地点

2018-04-13 12:00:00
1
WW

2018-04-13 12:05:00
1
WW1

2018-04-13 12:10:00
1
WW2

2018-04-13 12:20:00
1
WW

我们认为,当一张银行卡在 10min 之内,在不同的地点被刷卡大于等于两次,我们就期望对消费者出发预警机制。
blink CEP 是万能的么?答案是否定的,当消息乱序程度比较高的时候,实时性和准确性就成了一对矛盾的存在。要想实时性比较高,必然要求设置的 offset 越小越好,但 offset 设置比较小,就直接可能导致很多 eventtime<watermark-offset 的消息,直接被丢弃,准确性很难保证。比如,在 CP 回传物流详情的时候,经常回传的时间跟实操的时间差异很大(实操时间是 10 点,但回传时间是 15 点),如果以实操时间作为 eventtime,可能就会导致这种差异很大的消息被直接丢掉,无法进入 CEP,进而无法触发 CEP 后续的计算,在使用 CEP 的过程中,应该注意这一点。
四. 作者简介
花名:缘桥,来自菜鸟 -CTO- 数据部 - 仓配数据研发,主要负责菜鸟仓配业务的离线和实时数据仓库建设以及创新数据技术和工具的探索和应用。

本文作者:付空阅读原文
本文为云栖社区原创内容,未经允许不得转载。

正文完
 0