简介: ods层数据同步时常常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。能够通过full outer join脚本来实现合并,然而数据量很大时十分耗费资源。本文将为您介绍在做增量数据的减少、更新时如何通过full outer join改写left anti join来实现的最佳实际。
背景
ods层数据同步时常常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表
。能够通过full outer join脚本来实现合并,然而数据量很大时十分耗费资源。
insert overwrite table tb_test partition(ds='${bizdate}')select case when a.id is not null then a.id esle b.id end as id ,if(a.name is not null, a.name, b.name) as name ,coalesce(a.age, b.age) as age _--这3种写法一样,都是优先取delta表的字段_from( select * from tb_test_delta where ds='${bizdate}') afull outer join( select * from tb_test where ds='${bizdate-1}') bon a.id =b.id;
这种写法可实现新增和更新操作:
- 新增是指增量表中新呈现的数据,而全量表中没有;
- 更新是指增量表和全量表中都有的数据,但优先取增量表的数据,笼罩历史表的数据。
如下图所示,R2_1是增量表当天去重后增量数据,M3是全量表前一天的数据,而J4_2_3则是full outer join的执行图。
将J4_2_3开展会发现外面将增量和全量进行了merge join,当数据量很大(1288亿条)时会产生很大的shuffle开销。此时优化计划就是将full outer join改成 union all,从而防止join shuffle。
优化模型
论断:full outer join改成hash cluster + left join +union all能够无效地升高计算成本,且有两种利用场景。先将模型进行形象,假如有a和b两个表,a是增量表,b是全量表:
with a as ( select * from values (1,'111') ,(2,'two') ,(7,'777') as (id,name) ) _--增量_,b as ( select * from values (1,'') ,(2,'222') ,(3,'333') ,(4,'444') as (id,name) ) _--全量_
场景1:只合并新增数据到全量表
left anti join相当于not in,增量not in全量,过滤后只剩下齐全新增的id,对全量中已有的id不批改:
_--查问齐全新增的id_select * from a left anti join b on a.id=b.id ;_--后果如下_+_------------+------+_| id | name |+_------------+------+_| 7 | 777 |+_------------+------+_
_--齐全新增的合并全量表_select * from a _--增量表_left anti join b on a.id=b.id union all select * from b _--全量表__--后果如下_+_------------+------+_| id | name |+_------------+------+_| 1 | || 2 | 222 || 3 | 333 || 4 | 444 || 7 | 777 |+_------------+------+_
场景2:合并新增数据到全量表,且更新历史数据
全量not in增量,过滤后只剩下历史的id,而后union all增量,既新增也批改
_--查问历史全量数据_select * from b left anti join a on a.id=b.id;_--后果如下_+_------------+------+_| id | name |+_------------+------+_| 3 | 333 || 4 | 444 |+_------------+------+_
_--合并新增数据到全量表,且更新历史数据_select * from b _--全量表_left anti join a on a.id=b.idunion all select * from a ; _--增量表__--后果如下_+_------------+------+_| id | name |+_------------+------+_| 1 | 111 || 2 | two || 7 | 777 || 3 | 333 || 4 | 444 |+_------------+------+_
优化实际
步骤1:表属性批改
表、作业属性批改,对原来的表、作业进行属性优化,能够晋升优化成果。
set odps.sql.reducer.instances=3072; _--可选。默认最大1111个reducer,1111哈希桶。_alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;_--必选_
步骤2:依照上述模型的场景1 或者 场景2进行代码革新。
这里先给出代码革新后的资源耗费比照:
原来的full outer jion
left anti join初始化
原来的full outer jion
left anti join第二天当前
工夫耗费
8h30min38s
1h4min48s
7h32min30s
32min30s
cpu耗费
29666.02 Core * Min
65705.30 Core * Min
31126.86 Core * Min
30589.29 Core * Min
mem耗费
109640.80 GB * Min
133922.25 GB * Min
114764.80 GB * Min
65509.28 GB * Min
能够发现hash cluster分桶操作在初始化有额定的开销,次要是按主键进行散列和排序,然而这是值得的,可一劳永逸,后续的读取速度十分快。以前每天跑须要8小时,当初除了分桶初始化须要1小时,当前每天理论只须要30分钟。
初始化执行图
图1:
- M2是读全量表。
M4是读取增量表,在场景2的模型中增量表被读取了两次,其中:
- R5_4是对主键去重(row_number)后用于前面的union all,外面蕴含了所有的增量数据;
- R1_4是对主键去重(row_number)后用于left anti join,外面只蕴含了主键。
- J3_1_2是left anti join,将它开展后看到这里还是有mergJoin,然而这只是初始化的操作,前面每天就不会有了。开展后如图2。
- R6_3_5是将增量和全量进行union all,开展后如图3。
- R7_6则是将索引信息写入元数据,如图3的MetaCollector1会在R7_6中sink。
因而:图1中除了R5_4和R1_4是去重必须的,有shuffle。还有J3_1_2和R6_3_5这两个中央有shuffle。
图2:
图3:
第二天当前的执行图
图1:
同上,图1中的R3_2和R1_2是对增量去重必要对操作,有shuffle,这里疏忽。
初始化执行图的J3_1_2和R6_3_5曾经被合并到了M4_1_3,将其开展后如图2。即left anti join 和 union all这两步操作在一个阶段实现了,且这个阶段是Map 工作(M4_1_3),而不是Join工作或Reduce工作。而且全量表不在独自占用一个Map工作,也被合并到了M4_1_3,因而整个过程下来没有shuffle操作,速度晋升非常明显。也就是说只须要一个M4_1_3就能实现所有到操作,间接sink到表。
R5_4则是将索引信息写入元数据,如图2的MetaCollector1会在R5_4中sink。
图2:
原文链接
本文为阿里云原创内容,未经容许不得转载。