乐趣区

关于大数据:Maxcompute-数据上云一致性比对

我写过很多如何去对数、如何批量对数的技术文档,最近我的项目遇到这个问题,我才发现在官网博客上还没有公布过这个课题的文章。这就像灯下黑,太长用到的知识点,反而没有意识到其重要性。

注:这里对数的场景就是指在阿里云平台应用 dataworks 等大数据开发工具集成业务零碎数据库(oracle 等)数据上云到 maxcompute 的场景,所以,示例的 SQL 也是针对 maxcompute。

先说说个别业务上怎么对数的,咱们做了一个报表,出了一个数据“某个产品卖了 30 个”。这个不只是在大数据平台上有这个数据,在业务零碎也有这个数据,这些统计动作在业务零碎通过程序和人工也会有一份,个别做好报表后会先对这个数据。

所以,第一线反馈回来的数据就是这个汇总数据不统一的问题。然而这个后果是十分概括的,因为就像我感觉这个月工资少发了 5 毛一样,如果我不看我的工资条我其实不晓得本人是不是少发了。工资条不只是一个汇总数据,外面有我税前工资、奖金(浮动)、社保、扣税等一系列的明细数据,这些数据让我去判断我是不是少了 5 毛,而加工过的数据是简单的。

说到这里,我其实就像表白一个事件,对数是要对明细数据。这是所有计算后事实的根底,能够拿进去作证的。

所以,两边都查一下这个汇总值应用的表的对应的记录,比如说查问“明天这个产品 ID 的售卖记录”。后果就发现业务零碎有 31 笔,而大数据平台有 30 笔。

即使到了这里,其实咱们依然不晓得期间产生了什么,为什么会失落数据。另外咱们还不晓得其余商品 ID 的数据是不是也有失落的,还有其余的表的数据是不是也会产生相似的状况。

1. 明细数据比对

既然最终都是对明细数据,那么我是不是能够间接比对明细数据呢?答复是:正确。

个别产生这种状况,首先要比对业务零碎和大数据平台两个表的数据。

1. 再利用全量集成工具,从业务零碎的数据库全量抽取一遍数据到大数据平台。比对数据肯定要把数据放到一起,隔空比对是不存在的。因为大数据平台的容量是数百倍于业务零碎的,所以,个别都在大数据平台比对。(这里有一个悖论,如果集成工具自身就有缺点,导致抽取过程中就丢数据,岂不是永远没方法比对了。所以,如果对这个工具也不确定就得从数据库导出数据到文件,而后再加载到某个数据库下来比对。在这里,通过我对离线集成这个产品的长年应用教训,这个工具是十分牢靠的,还未遇到过这个问题。)

2. 依据主键关联,比对 2 个表中的主键的差别。如果是下面提到的记录失落的问题,这一步做完就很容易比对进去了。这里还会发现一个问题,就是业务零碎的表是一直变动的,所以,这时与大数据平台的表比照会有差别。这个差别的外围起因是:大数据平台的表是业务零碎表在每日的日末(00:00:00)的一个时点数据,而业务零碎的数据是始终在变动的。所以,即使有差别超出预期也不要惊恐。如果是应用实时同步能够从归档日志中获取到这期间数据的每一条变动,能够追溯变动起因。如果没有实时同步,也能够通过表中与工夫相干字段去判断数据是否被更新掉。要是什么都没有(这种状况也是存在的),那就去骂骂设计表的业务零碎开发(没错,是他们的锅),也能够跟业务去具体理解一下,这行记录是不是明天做的,而不是昨天。

3. 还有一种状况,就是主键统一,数据内容(主键之外的字段)不统一。这种状况,还是须要思考数据变动的状况,能够从日志、工夫字段、业务等几个角度去比对。如果发现数据的确不合乎预期,就须要查问同步工具的问题。

2. 比对 SQL 剖析

在下面的章节,我形容了比对明天新抽取的全量表和上日在 maxcompute 上应用前日全量和上日增量合并的上日全量的环节。比对两张表汇合是否统一的 SQL 办法其实比较简单,大家第一工夫就会想到汇合操作。在 oracle 外面有 Minus、except,同样在 maxcompute 外面也有。然而为了便于剖析问题,我还是本人写了一个 SQL。示例 SQL(maxcompute sql)如下:

-- 限定日期分区,比对上日
select count(t1.BATCH_NUMBER) as cnt_left
,count(t2.BATCH_NUMBER) as cnt_right
,count(concat(t1.BATCH_NUMBER,t2.BATCH_NUMBER)) as pk_inner
,count(case when t1.BATCH_NUMBER is not null and t2.BATCH_NUMBER is null then 1 end) as pk_left
,count(case when t2.BATCH_NUMBER is not null and t1.BATCH_NUMBER is null then 1 end) as pk_right
,count(case when nvl(t1.rec_id ,'') = nvl(t2.rec_id ,'') then 1 end) as col_diff_rec_id
,count(case when nvl(t2.rec_creator ,'') = nvl(t1.rec_creator ,'') then 1 end) as col_diff_rec_creator
,count(case when nvl(t2.rec_create_time,'') = nvl(t1.rec_create_time,'') then 1 end) as col_diff_rec_create_time
from ods_dev.o_rz_lms_im_timck01 t1 -- 开发环境从新初始化的明天数据
full join ods.o_rz_lms_im_timck01 t2 -- 生产环节昨日长期增量合并的数据
on t1.BATCH_NUMBER =t2.BATCH_NUMBER 
and t1.IN_STOCK_TIME =t2.IN_STOCK_TIME
and t1.OP_NO =t2.OP_NO 
and t1.STOCK_CODE =t2.STOCK_CODE 
and t1.YP_ID =t2.YP_ID 
and t2.ds='20230426'
where t1.ds='20230426'
;
--cnt_left 9205131 阐明:左表有记录数 9205131
--cnt_right 9203971 阐明:右表有记录数 9203971
--pk_inner 9203971 阐明:主键关联统一记录数 9203971
--pk_left 1160 阐明:左表比右表多记录数 1160
--pk_right 0 阐明:右表比左表多有记录数 0
--col_diff_rec_id 9203971 阐明:字段统一记录数与主键统一雷同,阐明关联上的两个表该字段统一
--col_diff_rec_creator 9203971 阐明:同上
--col_diff_rec_create_time 9203971 阐明:同上 

在下面的例子中,左表是明天从新初始化的数据,右表是在 maxcompute 上 merge 的上日全量数据。在比对之前,咱们其实就应该理解这两个表的数据必然是不统一的。尽管是同一张表,然而时点是不统一的。

不统一包含几种:

1.t1 表中存在的主键,在 t2 表中不存在;

2.t2 表中存在的主键,在 t1 表中不存在;

3.t1 和 t2 表中都存在的主键,然而主键之外的字段值不统一;

4.t1 和 t2 表中都存在的主键,然而主键之外的字段值统一;

除去第 4 中状况,其余 3 种状态都是两个表的值不统一,都须要进一步核查。如果是失常状况下,第 1 种状况是在明天零点当前新 insert 到业务库中的数据,第 2 种状况是明天零点当前 delete 的业务库中的数据,第 3 种状况是明天零点之后 update 的数据,第 4 种状况是业务表中明天零点之后未被更新的数据。

理解这些情况后,咱们就能够根据可循去辨认

3.Dataworks 实时同步日志表

如果同步才有的是 DataWorks 的离线同步,其实观测以上数据变动是有些艰难的。如果咱们采纳实时同步,数据库数据的变动是都会保留下来的。上一章节中提到的变动,都能够从日志中观测到。上面 SQL,就是我查问这个变动应用的 SQL,查问(dataworks 实时数据同步日志表)示例 SQL 如下:

select from_unixtime(cast(substr(to_char(_execute_time_),1,10) as bigint)) as yy
,get_json_object(cast(_data_columns_ as string),"$.rec_id") item0
,x.*
from ods.o_rz_lms_odps_first_log x -- 实时同步数据源 o_rz_lms 的日志表
where year='2023' and month='04' and day>='10' -- 数据区间限度
--and hour ='18'
and _dest_table_name_='o_rz_lms_im_timck01' -- 数据表限度
-- 以下为主键字段
and get_json_object(cast(_data_columns_ as string),"$.yp_id") ='L1'
and get_json_object(cast(_data_columns_ as string),"$.batch_number") ='Y1'
and get_json_object(cast(_data_columns_ as string),"$.in_stock_time") ='2'
and get_json_object(cast(_data_columns_ as string),"$.op_no") ='9'
and get_json_object(cast(_data_columns_ as string),"$.stock_code") ='R'
--and _operation_type_='D'
order by _execute_time_ desc 
limit 1000
;
-- _execute_time_ 数据操作工夫
-- _operation_type_ 操作类型 增删改 UDI 
-- _sequence_id_ 序列号,不会反复
-- _before_image_ 批改前数据
-- _after_image_ 批改后数据
-- _dest_table_name_ 操作的表名
-- _data_columns_ 操作的数据内容 JSON

DataWorks 的实时同步的数据源每隔一段时间就会实时写入到一张以“数据源名 +_odps_first_log”命名的表中,表有年、月、日、时四级分区。该表的主键并不是数据操作工夫,而是序列号“_execute_time_”,所以,一行数据主键的更新程序是依照“_execute_time_”更新。

一行数据更新是有前后两个状态的,所以有“_before_image_ 批改前数据、_after_image_ 批改后数据”这两个字段来标识前后状态。

数据是在字段“_data_columns_”中,以 JSON 格局存储。为了辨认其中的某一行数据,我应用函数解析了对应的字段,以此来确定本人要的数据。

4. 继续的品质保障

到这里,我并未讲如何去解决数据不统一的状况。如果的确发现数据不统一,可用的解决形式就是从新初始化全量数据。在这里要强调一点,如果离线全量集成工具是可信的,全量初始化的数据就不会丢。然而如果这种形式不可信,则就要更换办法。

在很多状况下,源端做一些业务变更也会偶发数据异样。在数据失落起因未查明的状况下,须要常常的去做数据一致性的比对,做到防患于未然。所以,日常监控数据数据一致性也是十分重要的。

原文链接

本文为阿里云原创内容,未经容许不得转载。

退出移动版