简介: ods层数据同步时常常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。能够通过full outer join脚本来实现合并,然而数据量很大时十分耗费资源。本文将为您介绍在做增量数据的减少、更新时如何通过full outer join改写left anti join来实现的最佳实际。

背景

ods层数据同步时常常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。能够通过full outer join脚本来实现合并,然而数据量很大时十分耗费资源。

insert overwrite table tb_test partition(ds='${bizdate}')select case when a.id is not null then a.id esle b.id end as id         ,if(a.name is not null, a.name, b.name) as name      ,coalesce(a.age, b.age) as age       _--这3种写法一样,都是优先取delta表的字段_from(   select * from tb_test_delta where ds='${bizdate}') afull outer join(   select * from tb_test where ds='${bizdate-1}') bon a.id =b.id;

这种写法可实现新增和更新操作:

  • 新增是指增量表中新呈现的数据,而全量表中没有;
  • 更新是指增量表和全量表中都有的数据,但优先取增量表的数据,笼罩历史表的数据。
    如下图所示,R2_1是增量表当天去重后增量数据,M3是全量表前一天的数据,而J4_2_3则是full outer join的执行图。

将J4_2_3开展会发现外面将增量和全量进行了merge join,当数据量很大(1288亿条)时会产生很大的shuffle开销。此时优化计划就是将full outer join改成 union all,从而防止join shuffle

优化模型

论断:full outer join改成hash cluster + left join +union all能够无效地升高计算成本,且有两种利用场景。先将模型进行形象,假如有a和b两个表,a是增量表,b是全量表:

with  a as ( select * from values  (1,'111')                             ,(2,'two')                             ,(7,'777') as (id,name) ) _--增量_,b as ( select * from values  (1,'')                             ,(2,'222')                             ,(3,'333')                             ,(4,'444') as (id,name) )  _--全量_

场景1:只合并新增数据到全量表

left anti join相当于not in,增量not in全量,过滤后只剩下齐全新增的id,对全量中已有的id不批改:

_--查问齐全新增的id_select * from a left anti join b on a.id=b.id ;_--后果如下_+_------------+------+_| id         | name |+_------------+------+_| 7          | 777  |+_------------+------+_
_--齐全新增的合并全量表_select * from  a _--增量表_left anti join b on a.id=b.id  union all select * from b  _--全量表__--后果如下_+_------------+------+_| id         | name |+_------------+------+_| 1          |      || 2          | 222  || 3          | 333  || 4          | 444  || 7          | 777  |+_------------+------+_

场景2:合并新增数据到全量表,且更新历史数据

全量not in增量,过滤后只剩下历史的id,而后union all增量,既新增也批改

_--查问历史全量数据_select * from b left anti join a on a.id=b.id;_--后果如下_+_------------+------+_| id         | name |+_------------+------+_| 3          | 333  || 4          | 444  |+_------------+------+_
_--合并新增数据到全量表,且更新历史数据_select * from  b _--全量表_left anti join a on a.id=b.idunion all select * from a ; _--增量表__--后果如下_+_------------+------+_| id         | name |+_------------+------+_| 1          | 111  || 2          | two  || 7          | 777  || 3          | 333  || 4          | 444  |+_------------+------+_

优化实际

步骤1:表属性批改

表、作业属性批改,对原来的表、作业进行属性优化,能够晋升优化成果。

set odps.sql.reducer.instances=3072;  _--可选。默认最大1111个reducer,1111哈希桶。_alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;_--必选_

步骤2:依照上述模型的场景1 或者 场景2进行代码革新。

这里先给出代码革新后的资源耗费比照:

原来的full outer jion

left anti join初始化

原来的full outer jion

left anti join第二天当前

工夫耗费

8h30min38s

1h4min48s

7h32min30s

32min30s

cpu耗费

29666.02 Core * Min

65705.30 Core * Min

31126.86 Core * Min

30589.29 Core * Min

mem耗费

109640.80 GB * Min

133922.25 GB * Min

114764.80 GB * Min

65509.28 GB * Min

能够发现hash cluster分桶操作在初始化有额定的开销,次要是按主键进行散列和排序,然而这是值得的,可一劳永逸,后续的读取速度十分快。以前每天跑须要8小时,当初除了分桶初始化须要1小时,当前每天理论只须要30分钟。

初始化执行图

图1:

  • M2是读全量表。
  • M4是读取增量表,在场景2的模型中增量表被读取了两次,其中:

    • R5_4是对主键去重(row_number)后用于前面的union all,外面蕴含了所有的增量数据;
    • R1_4是对主键去重(row_number)后用于left anti join,外面只蕴含了主键。
  • J3_1_2是left anti join,将它开展后看到这里还是有mergJoin,然而这只是初始化的操作,前面每天就不会有了。开展后如图2。
  • R6_3_5是将增量和全量进行union all,开展后如图3。
  • R7_6则是将索引信息写入元数据,如图3的MetaCollector1会在R7_6中sink。
    因而:图1中除了R5_4和R1_4是去重必须的,有shuffle。还有J3_1_2和R6_3_5这两个中央有shuffle。

图2:

图3:

第二天当前的执行图

图1:

同上,图1中的R3_2和R1_2是对增量去重必要对操作,有shuffle,这里疏忽。

初始化执行图的J3_1_2和R6_3_5曾经被合并到了M4_1_3,将其开展后如图2。即left anti join 和 union all这两步操作在一个阶段实现了,且这个阶段是Map 工作(M4_1_3),而不是Join工作或Reduce工作。而且全量表不在独自占用一个Map工作,也被合并到了M4_1_3,因而整个过程下来没有shuffle操作,速度晋升非常明显。也就是说只须要一个M4_1_3就能实现所有到操作,间接sink到表。

R5_4则是将索引信息写入元数据,如图2的MetaCollector1会在R5_4中sink。

图2:

原文链接
本文为阿里云原创内容,未经容许不得转载。