使用DataWorks来调度AnalyticDB任务

DataWorks作为阿里云上广受欢迎的大数据开发调度服务,最近加入了对于AnalyticDB的支持,意味着所有的AnalyticDB客户可以获得任务开发、任务依赖关系管理、任务调度、任务运维等等全方位强大的能力,现在就给大家仔细介绍下如何使用DataWorks来调度AnalyticDB任务。 开通AnalyticDB进入阿里云分析型数据库 MySQL版产品详情页,点击免费试用。最近上线了15天免费试用活动,需要首先填写申请表单,审批通过后即可享受免费试用AnalyticDB活动。进入购买页面,选择好地域、可用区、ECU类型、ECU数量和数据库名,点击立即购买,稍等几分钟时间就可以开通AnalyticDB实例。 开通DataWorks开通完AnalyticDB服务后,紧接着要开通DataWorks。选择好region后点击下一步。 填写工作空间名称,注意模式要改成“标准模式”,创建工作空间。 AnalyticDB中表和数据准备为了演示如何在DataWorks上调度AnalyticDB的任务,我们后面会用到一些测试数据,这里我们用著名的TPCH的测试数据集中的ORDERS表, 数据已经提前存入表中。前面开通成功后,我们就可以在AnalyticDB中找到数据库,登陆数据库后,创建ORDERS表,如下: CREATE TABLE ads_dla_test.orders ( o_orderkey int COMMENT '', o_custkey int COMMENT '', o_orderstatus varchar COMMENT '', o_totalprice double COMMENT '', o_orderdate date COMMENT '', o_orderpriority varchar COMMENT '', o_clerk varchar COMMENT '', o_shippriority int COMMENT '', o_comment varchar COMMENT '', PRIMARY KEY (O_ORDERKEY,O_CUSTKEY))PARTITION BY HASH KEY (O_ORDERKEY) PARTITION NUM 32TABLEGROUP tpch_50x_groupOPTIONS (UPDATETYPE='realtime')COMMENT ''CREATE TABLE ads_dla_test.finished_orders ( o_orderkey int COMMENT '', o_totalprice double COMMENT '', PRIMARY KEY (O_ORDERKEY))PARTITION BY HASH KEY (O_ORDERKEY) PARTITION NUM 32TABLEGROUP tpch_50x_groupOPTIONS (UPDATETYPE='realtime')COMMENT ''CREATE TABLE ads_dla_test.high_value_finished_orders ( o_orderkey int COMMENT '', o_totalprice double COMMENT '', PRIMARY KEY (O_ORDERKEY))PARTITION BY HASH KEY (O_ORDERKEY) PARTITION NUM 32TABLEGROUP tpch_50x_groupOPTIONS (UPDATETYPE='realtime')COMMENT ''任务调度其中一个重要的功能是任务之间的依赖,为了演示这个功能,我们这里会在DataWorks里面创建两个AnalyticDB任务, 我们的表、任务之间的关系如下图: ...

April 22, 2019 · 1 min · jiezi

利用blink CEP实现流计算中的超时统计问题

案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总一. 背景介绍如<利用blink+MQ实现流计算中的延时统计问题>一文中所描述的场景,我们将其简化为以下案例:实时流的数据源结构如下:物流订单号支付时间仓接单时间仓出库时间LP12018-08-01 08:00 LP12018-08-01 08:002018-08-01 09:00 LP22018-08-01 09:10 LP22018-08-01 09:102018-08-01 09:50 LP22018-08-01 09:102018-08-01 09:502018-08-01 12:00我们期望通过以上数据源,按照支付日期统计,每个仓库的仓接单量、仓出库量、仓接单超2H未出库单量、仓接单超6H未出库单量。可以看出,其中LP1仓接单时间是2018-08-01 09:00,但一直到2018-08-01 12:00点之前,一直都没有出库,LP1满足仓接单超2H未出库的行为。该场景的难点就在于:订单未出库。而对于TT中的源消息流,订单未出库,TT就不会下发新的消息,不下发新的消息,blink就无法被触发计算。而针对上述的场景,对于LP1,我们需要在仓接单时间是2018-08-01 09:00+2H,也就是2018-08-01 11:00的之后,就要知道LP1已经仓接单但超2H未出库了。二. 解决方案本文主要是利用blink CEP来实现上述场景,具体实现步骤如下所述。第一步:在source DDL中定义event_timestamp,并定义sink,如下:—-定义sourcecreate table sourcett_dwd_ri( lg_order_code varchar comment ‘物流订单号’ ,ded_pay_time varchar comment ‘支付时间’ ,store_code varchar comment ‘仓库编码’ ,store_name varchar comment ‘仓库名称’ ,wms_create_time varchar comment ‘仓接单时间’ ,wms_consign_create_time varchar comment ‘仓出库时间’ ,evtstamp as case when coalesce(wms_create_time, ‘’) <> ’’ then to_timestamp(wms_create_time, ‘yyyy-MM-dd HH:mm:ss’) else to_timestamp(‘1970-01-01 00:00:00’, ‘yyyy-MM-dd HH:mm:ss’) end –构造event_timestamp,如果源表本身带有消息的occur_time,可直接选择occur_time作为event_timestamp ,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000) –设置延迟10秒处理)with( type=‘tt’ ,topic=‘dwd_ri’ ,accessKey=‘xxxxxx’ ,accessId=‘xxxxxx’ ,lengthCheck=‘PAD’ ,nullValues=’\N|’);—-定义sinkcreate table sink_hybrid_blink_cep( ded_pay_date varchar comment ‘支付日期’ ,store_code varchar comment ‘仓库编码’ ,store_name varchar comment ‘仓库名称’ ,wms_create_ord_cnt bigint comment ‘仓接单量’ ,wms_confirm_ord_cnt bigint comment ‘仓出库量’ ,wmsin_nowmsout_2h_ord_cnt bigint comment ‘仓接单超2小时未出库单量’ ,wmsin_nowmsout_6h_ord_cnt bigint comment ‘仓接单超6小时未出库单量’ ,sub_partition bigint comment ‘二级分区(支付日期)’ ,PRIMARY KEY (ded_pay_date, store_code, sub_partition))with( type=‘PetaData’ ,url = ‘xxxxxx’ ,tableName=‘blink_cep’ ,userName=‘xxxxxx’ ,password=‘xxxxxx’ ,bufferSize=‘30000’ ,batchSize=‘3000’ ,batchWriteTimeoutMs=‘15000’);第二步:根据blink CEP的标准语义进行改写,如下:create view blink_cep_v1asselect ‘仓接单-仓出库超时’ as timeout_type ,lg_order_code ,wms_create_time as start_time ,wms_consign_create_time as end_timefrom source_dwd_csn_whc_lgt_fl_ord_riMATCH_RECOGNIZE( PARTITION BY lg_order_code ORDER BY evtstamp MEASURES e1.wms_create_time as wms_create_time ,e2.wms_consign_create_time as wms_consign_create_time ONE ROW PER MATCH WITH TIMEOUT ROWS –重要,必须设置延迟也下发 AFTER MATCH SKIP TO NEXT ROW PATTERN (e1 -> e2) WITHIN INTERVAL ‘6’ HOUR EMIT TIMEOUT (INTERVAL ‘2’ HOUR, INTERVAL ‘6’ HOUR) DEFINE e1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null ,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null)where wms_create_time is not null –重要,可以大大减少进入CEP的消息量and wms_consign_create_time is null –重要,可以大大减少进入CEP的消息量;第三步:根据blink的执行机制,我们通过源实时流sourcett_dwd_ri与超时消息流blink_cep_v1关联,来触发blink对超时消息进行聚合操作,如下:create view blink_cep_v2asselect a.lg_order_code as lg_order_code ,last_value(a.store_code ) as store_code ,last_value(a.store_name ) as store_name ,last_value(a.ded_pay_time ) as ded_pay_time ,last_value(a.wms_create_time ) as wms_create_time ,last_value(a.real_wms_confirm_time ) as real_wms_confirm_time ,last_value(case when coalesce(a.wms_create_time, ‘’) <> ’’ and coalesce(a.real_wms_confirm_time, ‘’) = ’’ and now() - unix_timestamp(a.wms_create_time,‘yyyy-MM-dd HH:mm:ss’) >= 7200 then ‘Y’ else ‘N’ end) as flag_01 ,last_value(case when coalesce(a.wms_create_time, ‘’) <> ’’ and coalesce(a.real_wms_confirm_time, ‘’) = ’’ and now() - unix_timestamp(a.wms_create_time,‘yyyy-MM-dd HH:mm:ss’) >= 21600 then ‘Y’ else ‘N’ end) as flag_02from (select lg_order_code as lg_order_code ,last_value(store_code ) as store_code ,last_value(store_name ) as store_name ,last_value(ded_pay_time ) as ded_pay_time ,last_value(wms_create_time ) as wms_create_time ,last_value(wms_consign_create_time) as real_wms_confirm_time from sourcett_dwd_ri group by lg_order_code ) aleft outer join (select lg_order_code ,count(*) as cnt from blink_cep_v1 group by lg_order_code ) bon a.lg_order_code = b.lg_order_codegroup by a.lg_order_code;insert into sink_hybrid_blink_cepselect regexp_replace(substring(a.ded_pay_time, 1, 10), ‘-’, ‘’) as ded_pay_date ,a.store_code ,max(a.store_name) as store_name ,count(case when coalesce(a.wms_create_time, ‘’) <> ’’ then a.lg_order_code end) as wmsin_ord_cnt ,count(case when coalesce(a.real_wms_confirm_time, ‘’) <> ’’ then a.lg_order_code end) as wmsout_ord_cnt ,count(case when a.flag_01 = ‘Y’ then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt ,count(case when a.flag_02 = ‘Y’ then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt ,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), ‘-’, ‘’) as bigint) as sub_partitionfrom blink_cep_v2 as t1where coalesce(lg_cancel_time, ‘’) = ‘‘and coalesce(ded_pay_time, ‘’) <> ‘‘group by regexp_replace(substring(ded_pay_time, 1, 10), ‘-’, ‘’) ,a.store_code;三. 问题拓展blink CEP的参数比较多,要完全看懂,着实需要一些时间,但CEP的强大是毋庸置疑的。CEP不仅可以解决物流场景中的超时统计问题,风控中的很多场景也是信手拈来。这里有一个风控中的场景,通过上述物流案例的用法,我们是否能推敲出这个场景的用法呢?风控案例测试数据如下:刷卡时间银行卡ID刷卡地点2018-04-13 12:00:001WW2018-04-13 12:05:001WW12018-04-13 12:10:001WW22018-04-13 12:20:001WW我们认为,当一张银行卡在10min之内,在不同的地点被刷卡大于等于两次,我们就期望对消费者出发预警机制。blink CEP是万能的么?答案是否定的,当消息乱序程度比较高的时候,实时性和准确性就成了一对矛盾的存在。要想实时性比较高,必然要求设置的offset越小越好,但offset设置比较小,就直接可能导致很多eventtime<watermark-offset的消息,直接被丢弃,准确性很难保证。比如,在CP回传物流详情的时候,经常回传的时间跟实操的时间差异很大(实操时间是10点,但回传时间是15点),如果以实操时间作为eventtime,可能就会导致这种差异很大的消息被直接丢掉,无法进入CEP,进而无法触发CEP后续的计算,在使用CEP的过程中,应该注意这一点。四. 作者简介花名:缘桥,来自菜鸟-CTO-数据部-仓配数据研发,主要负责菜鸟仓配业务的离线和实时数据仓库建设以及创新数据技术和工具的探索和应用。本文作者:付空阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

March 6, 2019 · 3 min · jiezi

利用blink+MQ实现流计算中的超时统计问题

案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总一. 背景介绍菜鸟的物流数据本身就有链路复杂、实操节点多、汇总维度多、考核逻辑复杂的特点,对于实时数据的计算存在很大挑战。经过仓配ETL团队的努力,目前仓配实时数据已覆盖了绝大多数场景,但是有这样一类特殊指标:“晚点超时指标”(例如:出库超6小时未揽收的订单量),仍存在实时汇总计算困难。原因在于:流计算是基于消息触发计算的,若没有消息到达到则无法计算,这类指标恰好是要求在指定的超时时间计算出有多少未达到的消息。然而,这类指标对于指导实操有着重要意义,可以告知运营小二当前多少订单积压在哪些作业节点,应该督促哪些实操人员加快作业,这对于物流的时效KPI达成至关重要。之前的方案是:由产品前端根据用户的请求查询OLAP数据库,由OLAP从明细表出结果。大促期间,用户请求量大,加之数据量大,故对OLAP的明细查询造成了比较大的压力。二. 解决方案2.1 问题定义“超时晚点指标” 是指,一笔订单的两个相邻的实操节点node_n-1 、node_n 的完成时间 time_n-1、time_n,当满足 : time_n is null && current_time - time_n-1 > kpi_length 时,time_flag_n 为 true , 该笔订单计入 超时晚点指标的计数。如下图,有一笔订单其 node_1 为出库节点,时间为time_1 = ‘2018-06-18 00:00:00’ ,运营对出库与揽收之间考核的时长 kpi_length = 6h, 那么当前自然时间 current_time > ‘2018-06-18 06:00:00’ 时,且node_2揽收节点的time_2 为null,则该笔订单的 timeout_flag_2 = true , “出库超6小时未揽收订单量” 加1。由于要求time_2 为null,即要求没有揽收消息下发的情况下让流计算做汇总值更新,这违背了流计算基于消息触发的基本原理,故流计算无法直接算出这种“超时晚点指标”。决问题的基本思路是:在考核时刻(即 kpi_time = time_n-1+kpi_length )“制造”出一条消息下发给流计算,触发汇总计算。继续上面的例子:在考核时刻“2018-06-18 06:00:00”利用MetaQ定时消息功能“制造”出一条消息下发给流计算汇总任务,触发对该笔订单的 time_out_flag_2 的判断,增加汇总计数。同时,还利用 Blink 的Retraction 机制,当time_2 由null变成有值的时候,Blink 可以对 time_out_flag_2 更新,重新计数。2.2 方案架构如上图所示:Step1: Blink job1 接收来自上游系统的订单数据,做清洗加工,生成订单明细表:dwd_ord_ri,利用TT下发给Blink job2 和 Blink job3。Step2:Blink job2 收到 dwd_ord_ri后,对每笔订单算出考核时刻 kpi_time = time_n-1+kpi_length,作为MetaQ消息的“TIMER_DELIVER_MS” 属性,写入MetaQ。MetaQ的定时消息功能,可以根据用户写入的TIMER_DELIVER_MS 在指定时刻下发给消费者,即上图中的Blink job3。Step3:Blink job3 接收 TT、MetaQ 两个消息源,先做Join,再对time_flag判断,最后做Aggregate计算。同一笔订单,dwd_ord_ri、timing_msg任意一个消息到来,都会触发join,time_flag判断,aggregate重新计算一遍,Blink的Retraction可对结果进行实时更新。2.3 实现细节本方案根据物流场景中多种实操节点、多种考核时长的特点,从Blink SQL代码 和 自定义Sink两方面做了特殊设计,从而实现了灵活配置、高效开发。(1) Blink job2 — 生成定时消息关键Blink SQL 代码如下。约定每条record的第一个字段为投递时间列表,即MetaQ向消费者下发消息的时刻List,也就是上面所说的多个考核时刻。第二个字段为保序字段,比如在物流场景中经常以订单code、运单号作为保序主键。该代码实现了对每个出库的物流订单,根据其出库时间,向后延迟6小时(21600000毫秒)、12小时(43200000毫秒)、24小时(86400000毫秒)由MetaQ向消费者下发三个定时消息。create table metaq_timing_msg(deliver_time_list varchar comment ‘投递时间列表’, – 约定第一个字段为投递时间listlg_code varchar comment ‘物流订单code’, – 约定第二字段为保序主键node_name varchar comment ‘节点名称’,node_time varchar comment ‘节点时间’,)WITH(type = ‘custom’,class = ‘com.alibaba.xxx.xxx.udf.MetaQTimingMsgSink’,tag = ‘store’,topic = ‘blink_metaq_delay_msg_test’,producergroup = ‘blinktest’,retrytimes = ‘5’,sleeptime = ‘1000’);insert into metaq_timing_msgselectconcat_ws(’,’,cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 21600000) as varchar), –6小时cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 43200000) as varchar), –12小时cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 86400000) as varchar) –24小时) as deliver_time_list,lg_code,‘wms’ as node_name,store_out_time as node_timefrom(selectlg_code,FIRST_VALUE(store_out_time) as store_out_timefrom srctablegroup by lg_code)bwhere store_out_time is not null ;(2) Blink 自定义Sink — MetaQTimingMsg SinkBlink的当前版本还不支持 MetaQ的定时消息功能的Sink,故利用 Blink的自定义Sink功能,并结合菜鸟物流数据的特点开发了MetaQTimingMsg Sink。关键代码如下(实现 writeAddRecord 方法)。@Overridepublic void writeAddRecord(Row row) throws IOException {Object deliverTime = row.getField(0);String[] deliverTimeList = deliverTime.toString().split(",");for(String dTime:deliverTimeList){ String orderCode = row.getField(1).toString(); String key = orderCode + “_” + dTime; Message message = newMessage(row, dTime, key); boolean result = sendMessage(message,orderCode); if(!result){ LOG.error(orderCode + " : " + dTime + " send failed"); } }}private Message newMessage(Row row,String deliverMillisec,String key){ //Support Varbinary Type Insert Into MetaQ Message message = new Message(); message.setKeys(key); message.putUserProperty(“TIMER_DELIVER_MS”,deliverMillisec); int arity = row.getArity(); Object[] values = new Object[arity]; for(int i=0;i<arity;i++){ values[i]=row.getField(i); } String lineStr=org.apache.commons.lang3.StringUtils.join(values, FIELD_DELIMITER); try { byte[] bytes = lineStr.getBytes(ENCODING); message.setBody(bytes); message.setWaitStoreMsgOK(true); } catch (UnsupportedEncodingException e) { LOG.error(“create new message error”,e); } return message;}private boolean sendMessage(Message message,String orderCode){ long retryTime = 0; boolean isSendSuccess = true; if(message != null){ message.setTopic(topicName); message.setTags(tagName); } SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { …. // 针对物流订单code的hash算法 return list.get(index.intValue()); } },orderCode); if(!result.getSendStatus().equals(SendStatus.SEND_OK)){ LOG.error("" + orderCode +" write to metaq result is " + result.getSendStatus().toString()); isSendSuccess = false; } return isSendSuccess;}}(3)Blink job3 — 汇总计算关键Blink SQL 代码如下,统计了每个仓库的“出库超6小时未揽收物理订单”、“出库超12小时未揽收物理订单”、“出库超24小时未揽收物理订单”的汇总值。代码中使用了“stringLast()”函数处理来自dwd_ord_ri的每条消息,以取得每个物流订单的最新出库揽收情况,利用Blink Retraction机制,更新汇总值。create view dws_store_view as select t1.store_code, max(t1.store_name) as store_name, count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 21600 then t2.lg_code end ) as tms_not_collect_6h_ord_cnt, —出库超6小时未揽收物流订单量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 43200 then t2.lg_code end ) as tms_not_collect_12h_ord_cnt,—出库超6小时未揽收物流订单量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 86400 then t2.lg_code end ) as tms_not_collect_24h_ord_cnt —出库超6小时未揽收物流订单量from ( select lg_code, coalesce(store_code,’-1’) as store_code, store_name, store_out_time, tms_collect_time from ( select lg_code, max(store_code) as store_code, max(store_name) as store_name, stringLast(store_out_time) as store_out_time, stringLast(tms_collect_time)as tms_collect_time, from dwd_ord_ri group by lg_code ) a ) t1left outer join ( select lg_code, from timing_msg where node_name = ‘wms’ group by lg_code) t2on t1.lg_code = t2.lg_codegroup by t1.store_code ;三. 方案优势3.1 配置灵活我们从“Blink SQL 代码” 和“自定义MetaQ” 两个方面设计,用户可以根据具体的业务场景,在Blink SQL的一个view里就能实现多种节点多种考核时间的定时消息生成,而不是针对每一个实操节点的每一种定时指标都要写一个view,这样大大节省了代码量,提升了开发效率。例如对于仓库节点的出库超6小时未揽收、超12小时未揽收、超24小时未揽收,这三个指标利用上述方案,仅需在Blink job2的中metaq_timing_msg的第一个字段deliver_time_list中拼接三个kpi_length,即6小时、12小时、24小时为一个字符串即可,由MetaQTimingMsg Sink自动拆分成三条消息下发给MetaQ。对于不同的节点的考核,仅需在node_name,node_time填写不同的节点名称和节点实操时间即可。3.2 主键保序如2.3节所述,自定义的Sink中 实现了MetaQ的 MessageQueueSelector 接口的 select() 方法,同时在Blink SQL 生成的MetaQ消息默认第二个字段为保序主键字段。从而,可以根据用户自定义的主键,保证同一主键的所有消息放在同一个通道内处理,从而保证按主键保序,这对于流计算非常关键,能够实现数据的实时准确性。3.3 性能优良让专业的团队做专业的事。个人认为,这种大规模的消息存储、消息下发的任务本就应该交给“消息中间件”来处理,这样既可以做到计算与消息存储分离,也可以方便消息的管理,比如针对不同的实操节点,我们还可以定义不同的MetaQ的tag。另外,正如2.2节所述,我们对定时消息量做了优化。考虑到一笔订单的属性字段或其他节点更新会下发多条消息,我们利用了Blink的FIRST_VALUE函数,在Blink job2中同一笔订单的的一种考核指标只下发一条定时消息,大大减少了消息量,减轻了Blink的写压力,和MetaQ的存储。四. 自我介绍马汶园 阿里巴巴 -菜鸟网络—数据部 数据工程师菜鸟仓配实时研发核心成员,主导多次仓配大促实时数据研发,对利用Blink的原理与特性解决物流场景问题有深入思考与理解。本文作者:付空阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

March 6, 2019 · 3 min · jiezi