共计 3190 个字符,预计需要花费 8 分钟才能阅读完成。
简介: ods 层数据同步时常常会遇到增全量合并的模型,即 T - 1 天增量表 + T- 2 全量表 = T- 1 全量表。能够通过 full outer join 脚本来实现合并,然而数据量很大时十分耗费资源。本文将为您介绍在做增量数据的减少、更新时如何通过 full outer join 改写 left anti join 来实现的最佳实际。
背景
ods 层数据同步时常常会遇到 增全量合并 的模型,即T- 1 天增量表 + T- 2 全量表 = T- 1 全量表
。能够通过 full outer join 脚本来实现合并,然而数据量很大时十分耗费资源。
insert overwrite table tb_test partition(ds='${bizdate}')
select case when a.id is not null then a.id esle b.id end as id
,if(a.name is not null, a.name, b.name) as name
,coalesce(a.age, b.age) as age
_-- 这 3 种写法一样,都是优先取 delta 表的字段_
from
(select * from tb_test_delta where ds='${bizdate}'
) a
full outer join
(select * from tb_test where ds='${bizdate-1}'
) b
on a.id =b.id;
这种写法可实现新增和更新操作:
- 新增是指增量表中新呈现的数据,而全量表中没有;
- 更新是指增量表和全量表中都有的数据,但优先取增量表的数据,笼罩历史表的数据。
如下图所示,R2_1 是增量表当天去重后增量数据,M3 是全量表前一天的数据,而 J4_2_3 则是 full outer join 的执行图。
将 J4_2_3 开展会发现外面将增量和全量进行了 merge join,当数据量很大(1288 亿条)时会产生很大的 shuffle 开销。此时优化计划就是将 full outer join 改成 union all,从而防止 join shuffle。
优化模型
论断:full outer join 改成 hash cluster + left join +union all 能够无效地升高计算成本,且有两种利用场景。先将模型进行形象,假如有 a 和 b 两个表,a 是增量表,b 是全量表:
with
a as (select * from values (1,'111')
,(2,'two')
,(7,'777') as (id,name) ) _-- 增量_
,b as (select * from values (1,'')
,(2,'222')
,(3,'333')
,(4,'444') as (id,name) ) _-- 全量_
场景 1: 只合并新增数据到全量表
left anti join 相当于 not in,增量 not in 全量, 过滤后只剩下齐全新增的 id,对全量中已有的 id 不批改:
_-- 查问齐全新增的 id_
select * from a left anti join b on a.id=b.id ;
_-- 后果如下_
+_------------+------+_
| id | name |
+_------------+------+_
| 7 | 777 |
+_------------+------+_
_-- 齐全新增的合并全量表_
select * from a _-- 增量表_
left anti join b on a.id=b.id
union all
select * from b _-- 全量表_
_-- 后果如下_
+_------------+------+_
| id | name |
+_------------+------+_
| 1 | |
| 2 | 222 |
| 3 | 333 |
| 4 | 444 |
| 7 | 777 |
+_------------+------+_
场景 2: 合并新增数据到全量表,且更新历史数据
全量 not in 增量, 过滤后只剩下历史的 id,而后 union all 增量,既新增也批改
_-- 查问历史全量数据_
select * from b left anti join a on a.id=b.id;
_-- 后果如下_
+_------------+------+_
| id | name |
+_------------+------+_
| 3 | 333 |
| 4 | 444 |
+_------------+------+_
_-- 合并新增数据到全量表,且更新历史数据_
select * from b _-- 全量表_
left anti join a on a.id=b.id
union all
select * from a ; _-- 增量表_
_-- 后果如下_
+_------------+------+_
| id | name |
+_------------+------+_
| 1 | 111 |
| 2 | two |
| 7 | 777 |
| 3 | 333 |
| 4 | 444 |
+_------------+------+_
优化实际
步骤 1:表属性批改
表、作业属性批改, 对原来的表、作业进行属性优化,能够晋升优化成果。
set odps.sql.reducer.instances=3072; _-- 可选。默认最大 1111 个 reducer,1111 哈希桶。_
alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;_-- 必选_
步骤 2:依照上述模型的场景 1 或者 场景 2 进行代码革新。
这里先给出代码革新后的资源耗费比照:
原来的 full outer jion
left anti join 初始化
原来的 full outer jion
left anti join 第二天当前
工夫耗费
8h30min38s
1h4min48s
7h32min30s
32min30s
cpu 耗费
29666.02 Core * Min
65705.30 Core * Min
31126.86 Core * Min
30589.29 Core * Min
mem 耗费
109640.80 GB * Min
133922.25 GB * Min
114764.80 GB * Min
65509.28 GB * Min
能够发现 hash cluster 分桶操作在初始化有额定的开销,次要是按主键进行散列和排序,然而这是值得的,可一劳永逸,后续的读取速度十分快。以前每天跑须要 8 小时,当初除了分桶初始化须要 1 小时,当前每天理论只须要 30 分钟。
初始化执行图
图 1:
- M2 是读全量表。
-
M4 是读取增量表, 在场景 2 的模型中增量表被读取了两次,其中:
- R5_4 是对主键去重(row_number)后用于前面的 union all,外面蕴含了所有的增量数据;
- R1_4 是对主键去重(row_number)后用于 left anti join,外面只蕴含了主键。
- J3_1_2 是 left anti join, 将它开展后看到这里还是有 mergJoin,然而这只是初始化的操作,前面每天就不会有了。开展后如图 2。
- R6_3_5 是将增量和全量进行 union all,开展后如图 3。
- R7_6 则是将索引信息写入元数据,如图 3 的 MetaCollector1 会在 R7_6 中 sink。
因而:图 1 中除了 R5_4 和 R1_4 是去重必须的,有 shuffle。还有 J3_1_2 和 R6_3_5 这两个中央有 shuffle。
图 2:
图 3:
第二天当前的执行图
图 1:
同上,图 1 中的 R3_2 和 R1_2 是对增量去重必要对操作,有 shuffle,这里疏忽。
初始化执行图的 J3_1_2 和 R6_3_5 曾经被合并到了 M4_1_3,将其开展后如图 2。即 left anti join 和 union all 这两步操作在一个阶段实现了,且这个阶段是 Map 工作(M4_1_3),而不是 Join 工作或 Reduce 工作。而且全量表不在独自占用一个 Map 工作,也被合并到了 M4_1_3,因而整个过程下来没有 shuffle 操作,速度晋升非常明显。也就是说只须要一个 M4_1_3 就能实现所有到操作,间接 sink 到表。
R5_4 则是将索引信息写入元数据,如图 2 的 MetaCollector1 会在 R5_4 中 sink。
图 2:
原文链接
本文为阿里云原创内容,未经容许不得转载。