作者:吴文池
背景
iceberg提供合并小文件性能,能够依照用户的配置文件,把多个合乎配置的小文件合并成多个大文件。该流程次要是对源数据做了一次复制。
但在理论生产环境中,数据是始终在变动的,有可能会呈现这种状况:在还未实现数据合并工作时,对之前的数据做出了批改,这就导致正在合并的数据与新数据可能会产生抵触。
那么iceberg是如何解决这种数据抵触的呢?
iceberg行级删除
iceberg的行级批改次要是通过行级删除记录再加上数据记录实现的,所以上面先说一下iceberg行级删除的实现。
iceberg行级删除实现概要
iceberg的行级更新和删除,次要是通过减少delete记录做到的。iceberg总共有两种delete类型,一种是position delete,一种是equality delete,次要区别是在于该数据的插入和批改操作是否在同一个checkpoint内。此次剖析的抵触场景次要是不同快照间的,所以以下阐明都以equality delete为背景,简略说下流程(非upsert模式)。
先在 writer 中设置 equalityFieldColumns :
DataStreamSink<Void> append = FlinkSink.forRowData(input)
.tableLoader(tableLoader).overwrite(false).distributionMode(DistributionMode.HASH).equalityFieldColumns(Lists.newArrayList("column1", "column2")) .append();
设置好之后,writer就会依据设置的列对数据做写入,如果是插入的数据,则写为数据记录,如果是删除数据,则写为删除记录(非upsert模式):
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
}
writer.write(row);
break;
case UPDATE_BEFORE:
if (upsert) {
break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
writer.delete(row);
break;
case DELETE:
writer.delete(row);
break;
在读取数据的时候,会依据数据生成的程序(即sequenceNumber),判断该数据记录是否有对应的删除记录,如果有,则认为该数据曾经被删除,如果没有,则认为该数据仍然存在。
利用删除记录的判断条件
iceberg中应用接口FileScanTask来记录以后扫描工作:
public interface FileScanTask extends ScanTask {
/**
- The {@link DataFile file} to scan.
* - @return the file to scan
*/
DataFile file();
/**
- A list of {@link DeleteFile delete files} to apply when reading the task's data file.
* - @return a list of delete files to apply
*/
List<DeleteFile> deletes();
.......
}
其中, file就是须要读取的数据记录文件,deletes就是该数据文件对应的删除记录文件列表,那么当iceberg要读取一个数据文件的时候,怎么找与之对应的删除文件列表呢?次要以上面两个作为判断根据:
1、sequenceNumber
每个快照对应一个sequenceNumber,同一个快照里所有的数据的number都雷同,这就能够用该number辨认数据的先后顺序。如果一条数据被批改,则该数据对应的number肯定要比删除数据的number小,所以在加载删除数据时,要判断该删除记录的number比以后的数据记录的number大。
2、parquet文件的min/max
parquet文件能够统计以后文件数据的min/max值,所以能够依据min/max值判断出要找的数据在不在以后的parquet文件中。即,当要查的数据的值与文件的min/max值有交加,则阐明该值在以后的文件中,须要读取该文件,如果没有交加,则阐明该值不在以后文件中,不须要读取该文件。
举例
当初须要批改一条数据,以后快照已存在一条数据记录(1,a),sequenceNumber为1,当初须要把它批改成(1,b),则iceberg会先写入一条删除记录(1,a),再写入一条数据记录(1,b),并生成新的快照,该快照的sequenceNumber为2。读取该表数据的时候,共有两个数据文件(共两条数据记录)和一个删除文件(共一条删除记录),因为删除文件的sequenceNumber大于第一个数据文件记录,所以,在读取第一个数据文件记录的时候,会先加载删除记录(1,a)并保留至一个dict中,而后读入第一个数据文件中的数据记录(1,a),而后发现保留着删除记录的dict中有一条与之对应的数据,则阐明该数据记录被删除,则(1,a)这条数据被抛弃。之后读取第二个数据文件中的数据记录(1,b),因为没有与之对应的删除记录,所以该数据返回给用户,最终后果上,用户只能读到一条(1,b)的记录。
剖析
数据抵触场景
场景一:
步骤1、已存在a,b快照,此时用户基于b快照做小文件合并。
步骤2、基于b快照的合并工作还未实现,此时用户把b快照中的数据做了批改,并生成了快照c。
场景二:
步骤1、已存在a,b三个快照,此时用户基于b快照做小文件合并。
步骤2、基于b快照的合并工作还未实现,此时用户新增了数据并生成了c快照,之后又批改c快照中的数据生成了d快照。
(该场景从数据上来说并没有抵触,但实际上因为iceberg的实现形式问题导致该场景也会呈现抵触)
数据抵触剖析
USE_STARTING_SEQUENCE_NUMBER
iceberg中能够设置 USE_STARTING_SEQUENCE_NUMBER 属性用来批改提交快照时快照对应的sequencyNumber,以下会剖析iceberg是如何应用该属性来解决数据抵触的。
先剖析一下场景一:
假如a,b,c快照对应的sequencyNumber别离是1,2,3,b快照中有记录(1,a),c快照中把(1,a)删除,并增加了(1,b),则此时最一开始的数据记录(1,a)的number为1,删除记录(1,a)的number为3,批改后的数据记录(1,b)的number为3。
如果设置USE_STARTING_SEQUENCE_NUMBER为false,假如没有解决数据抵触问题,则可能胜利生成d快照,其sequencyNumber为4,即合并后的(1,a)的sequencyNumber为4。那么在读取的时候,读取的数据记录有number为4的(1,a),number为3的(1,b),读取的删除记录有number为3的(1,a)。因为此时数据记录(1,a)的number比删除记录(1,a)的大,所以iceberg会认为是先做的删除(1,a),再做的增加(1,a),所以最终返回给用户的数据是(1,a)和(1,b)。这显著呈现了数据谬误。(实际上在做完合并工作提交快照时,会刷新以后表的最新快照,如果发现新快照有对合并数据做批改时,即产生了数据抵触,则间接报异样,具体起因会在剖析场景二中论述。以上阐明是建设在不查看抵触时,持续往下提交时会呈现的状况)
如果设置了USE_STARTING_SEQUENCE_NUMBER为true,则生成d快照的number为执行合并工作时的快照的number,即d快照的number为2。那么在读取的时候,读取的数据记录有number为2的(1,a),number为3的(1,b),删除记录有number为3的(1,a),因为此时删除记录(1,a)的number比数据记录(1,a)的大,且该删除记录对应的parquet文件的min/max肯定蕴含(1,a),所以会读取该删除记录(1,a)。最终,在读取数据记录(1,a)的时候,会依据删除记录把(1,a)过滤掉,最终返回给用户的数据是(1,b)。此时数据抵触被胜利解决。
再剖析一下场景二:
现有a,b,c 三个snapshot,对c的数据做更新生成d,再基于b做文件合并。
该过程,其实并不存在数据抵触,因为d中批改的数据是c的,与合并工作的数据并无关系,但为什么把USE_STARTING_SEQUENCE_NUMBER设置为false的状况下,iceberg会认为有数据抵触呢?
假如快照a,b,c,d对应的number别离是1,2,3,4,快照b里有条数据记录(1,a),c里有条数据记录(1,b),d批改了c里的数据记录为(1,c),所以此时有数据记录(1,a),(1,b),(1,c)和一条删除记录(1,b),这4条记录对应的number为2,3,4,4。
在做完合并工作时,以后快照为b,在做完合并工作、提交快照之前,会刷新以后表的最新快照为d。这里会判断试下是否会呈现数据抵触:读取数据记录(1,a),找到number比(1,a)大的删除记录(1,b),此时(1,b)应该不会用于过滤(1,a)的,但因为在做合并工作的时候,并没有读取parquet文件的min/max,所以这里number比2大的删除记录都会被利用于(1,a)上,就导致iceberg认为做合并的数据在前面有被批改,所以间接报了数据抵触,导致合并工作失败。
测试验证
场景一:
形容:
append形式生成 a,b,c三个snapshot,基于b做文件合并。
模仿场景:
1、已存在a,b快照。
2、当初基于b快照做小文件合并。
3、基于b的合并工作还未实现,另一条操作流基于b快照做了append类型的数据增加。
后果:
USE_STARTING_SEQUENCE_NUMBER | |
---|---|
true | 胜利 |
false | 胜利 |
胜利:生成新的快照,最终snapshot是 a,b,c,d。
新生成的大文件是基于a,b的数据文件的总和,
快照d中蕴含了c的数据,以及基于a,b合并的数据。
a,b,c,d对应的squenceNumber别离是1,2,3,4:
USE_STARTING_SEQUENCE_NUMBER 为 true时,d外面生成的新的大文件对应的manifest的squeceid是用的以前的2,删除的manifest用的是新的id4:
USE_STARTING_SEQUENCE_NUMBER 为 false 时,d外面合并工作应用的全都是最新的id:
场景二:
形容:
append形式生成 a,b,c 三个 snapshot,先基于c做一次合并,合并胜利后,基于c再做一次合并。
模仿场景:
1、基于c快照做小文件合并。
2、基于c的工作还未实现,又启动了一个完全相同的基于c快照做小文件合并的工作
后果:
USE_STARTING_SEQUENCE_NUMBER | true |
---|---|
true | 胜利 |
false | 胜利 |
场景三:
形容:
append形式生成 a,b 两个snapshot,对a的数据做更新生成c,再基于b做文件合并。
模仿场景:
1、基于b快照做合并。
2、合并工作还未实现,另一条数据流对a快照中的数据做了更新,且在合并工作实现前先提交胜利,此时生成了c快照。
后果:
USE_STARTING_SEQUENCE_NUMBER | |
---|---|
true | 胜利 |
false | 失败 |
失败:提醒 Cannot commit, found new delete for replaced data file。
胜利,生成最新快照d,生成快照a,b中数据合并的大文件
场景四:
形容:
append形式生成 a,b,c 三个snapshot,对c的数据做更新生成d,再基于b做文件合并。
模仿场景:
1、基于b快照做合并。
2、合并还未实现,另一条数据流先做append,生成了c快照。
3、之前的合并还未实现,另一条数据流对c快照里的数据做批改,生成了快照d
后果:
USE_STARTING_SEQUENCE_NUMBER | |
---|---|
true | 胜利 |
false | 失败 |
论断
测试后果与剖析相匹配。
设置USE_STARTING_SEQUENCE_NUMBER该参数为true,能够批改新生成的dataFile和manifest的sequenceNumber为原数据文件的sequenceNumber,这样在读取数据的时候,就能够把delteFile利用到新生成的dataFile中了,能够解决数据抵触的状况。