作者:吴文池
背景
iceberg 提供合并小文件性能,能够依照用户的配置文件,把多个合乎配置的小文件合并成多个大文件。该流程次要是对源数据做了一次复制。
但在理论生产环境中,数据是始终在变动的,有可能会呈现这种状况:在还未实现数据合并工作时,对之前的数据做出了批改,这就导致正在合并的数据与新数据可能会产生抵触。
那么 iceberg 是如何解决这种数据抵触的呢?
iceberg 行级删除
iceberg 的行级批改次要是通过行级删除记录再加上数据记录实现的,所以上面先说一下 iceberg 行级删除的实现。
iceberg 行级删除实现概要
iceberg 的行级更新和删除,次要是通过减少 delete 记录做到的。iceberg 总共有两种 delete 类型,一种是 position delete,一种是 equality delete,次要区别是在于该数据的插入和批改操作是否在同一个 checkpoint 内。此次剖析的抵触场景次要是不同快照间的,所以以下阐明都以 equality delete 为背景,简略说下流程(非 upsert 模式)。
先在 writer 中设置 equalityFieldColumns :
DataStreamSink<Void> append = FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.overwrite(false)
.distributionMode(DistributionMode.HASH)
.equalityFieldColumns(Lists.newArrayList("column1", "column2"))
.append();
设置好之后,writer 就会依据设置的列对数据做写入,如果是插入的数据,则写为数据记录,如果是删除数据,则写为删除记录(非 upsert 模式):
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
}
writer.write(row);
break;
case UPDATE_BEFORE:
if (upsert) {
break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
writer.delete(row);
break;
case DELETE:
writer.delete(row);
break;
在读取数据的时候,会依据数据生成的程序(即 sequenceNumber),判断该数据记录是否有对应的删除记录,如果有,则认为该数据曾经被删除,如果没有,则认为该数据仍然存在。
利用删除记录的判断条件
iceberg 中应用接口 FileScanTask 来记录以后扫描工作:
public interface FileScanTask extends ScanTask {
/**
- The {@link DataFile file} to scan.
* - @return the file to scan
*/
DataFile file();
/**
- A list of {@link DeleteFile delete files} to apply when reading the task’s data file.
* - @return a list of delete files to apply
*/
List<DeleteFile> deletes();
…….
}
其中,file 就是须要读取的数据记录文件,deletes 就是该数据文件对应的删除记录文件列表,那么当 iceberg 要读取一个数据文件的时候,怎么找与之对应的删除文件列表呢?次要以上面两个作为判断根据:
1、sequenceNumber
每个快照对应一个 sequenceNumber,同一个快照里所有的数据的 number 都雷同,这就能够用该 number 辨认数据的先后顺序。如果一条数据被批改,则该数据对应的 number 肯定要比删除数据的 number 小,所以在加载删除数据时,要判断该删除记录的 number 比以后的数据记录的 number 大。
2、parquet 文件的 min/max
parquet 文件能够统计以后文件数据的 min/max 值,所以能够依据 min/max 值判断出要找的数据在不在以后的 parquet 文件中。即,当要查的数据的值与文件的 min/max 值有交加,则阐明该值在以后的文件中,须要读取该文件,如果没有交加,则阐明该值不在以后文件中,不须要读取该文件。
举例
当初须要批改一条数据,以后快照已存在一条数据记录 (1,a),sequenceNumber 为 1,当初须要把它批改成(1,b),则 iceberg 会先写入一条删除记录(1,a),再写入一条数据记录(1,b),并生成新的快照,该快照的 sequenceNumber 为 2。读取该表数据的时候,共有两个数据文件(共两条数据记录)和一个删除文件(共一条删除记录),因为删除文件的 sequenceNumber 大于第一个数据文件记录,所以,在读取第一个数据文件记录的时候,会先加载删除记录(1,a) 并保留至一个 dict 中,而后读入第一个数据文件中的数据记录 (1,a),而后发现保留着删除记录的 dict 中有一条与之对应的数据,则阐明该数据记录被删除,则(1,a) 这条数据被抛弃。之后读取第二个数据文件中的数据记录 (1,b),因为没有与之对应的删除记录,所以该数据返回给用户,最终后果上,用户只能读到一条(1,b) 的记录。
剖析
数据抵触场景
场景一:
步骤 1、已存在 a,b 快照,此时用户基于 b 快照做小文件合并。
步骤 2、基于 b 快照的合并工作还未实现,此时用户把 b 快照中的数据做了批改,并生成了快照 c。
场景二:
步骤 1、已存在 a,b 三个快照,此时用户基于 b 快照做小文件合并。
步骤 2、基于 b 快照的合并工作还未实现,此时用户新增了数据并生成了 c 快照,之后又批改 c 快照中的数据生成了 d 快照。
(该场景从数据上来说并没有抵触,但实际上因为 iceberg 的实现形式问题导致该场景也会呈现抵触)
数据抵触剖析
USE_STARTING_SEQUENCE_NUMBER
iceberg 中能够设置 USE_STARTING_SEQUENCE_NUMBER 属性用来批改提交快照时快照对应的 sequencyNumber,以下会剖析 iceberg 是如何应用该属性来解决数据抵触的。
先剖析一下场景一:
假如 a,b,c 快照对应的 sequencyNumber 别离是 1,2,3,b 快照中有记录 (1,a),c 快照中把(1,a) 删除,并增加了 (1,b),则此时最一开始的数据记录(1,a) 的 number 为 1,删除记录 (1,a) 的 number 为 3,批改后的数据记录 (1,b) 的 number 为 3。
如果设置 USE_STARTING_SEQUENCE_NUMBER 为 false,假如没有解决数据抵触问题,则可能胜利生成 d 快照,其 sequencyNumber 为 4,即合并后的 (1,a) 的 sequencyNumber 为 4。那么在读取的时候,读取的数据记录有 number 为 4 的 (1,a),number 为 3 的(1,b),读取的删除记录有 number 为 3 的(1,a)。因为此时数据记录(1,a) 的 number 比删除记录 (1,a) 的大,所以 iceberg 会认为是先做的删除 (1,a),再做的增加(1,a),所以最终返回给用户的数据是(1,a) 和(1,b)。这显著呈现了数据谬误。(实际上在做完合并工作提交快照时,会刷新以后表的最新快照,如果发现新快照有对合并数据做批改时,即产生了数据抵触,则间接报异样,具体起因会在剖析场景二中论述。以上阐明是建设在不查看抵触时,持续往下提交时会呈现的状况)
如果设置了 USE_STARTING_SEQUENCE_NUMBER 为 true,则生成 d 快照的 number 为执行合并工作时的快照的 number,即 d 快照的 number 为 2。那么在读取的时候,读取的数据记录有 number 为 2 的 (1,a),number 为 3 的(1,b),删除记录有 number 为 3 的(1,a),因为此时删除记录(1,a) 的 number 比数据记录 (1,a) 的大,且该删除记录对应的 parquet 文件的 min/max 肯定蕴含 (1,a),所以会读取该删除记录(1,a)。最终,在读取数据记录(1,a) 的时候,会依据删除记录把 (1,a) 过滤掉,最终返回给用户的数据是(1,b)。此时数据抵触被胜利解决。
再剖析一下场景二:
现有 a,b,c 三个 snapshot,对 c 的数据做更新生成 d,再基于 b 做文件合并。
该过程,其实并不存在数据抵触,因为 d 中批改的数据是 c 的,与合并工作的数据并无关系,但为什么把 USE_STARTING_SEQUENCE_NUMBER 设置为 false 的状况下,iceberg 会认为有数据抵触呢?
假如快照 a,b,c,d 对应的 number 别离是 1,2,3,4, 快照 b 里有条数据记录 (1,a),c 里有条数据记录(1,b),d 批改了 c 里的数据记录为(1,c),所以此时有数据记录(1,a),(1,b),(1,c) 和一条删除记录(1,b),这 4 条记录对应的 number 为 2,3,4,4。
在做完合并工作时,以后快照为 b,在做完合并工作、提交快照之前,会刷新以后表的最新快照为 d。这里会判断试下是否会呈现数据抵触:读取数据记录 (1,a),找到 number 比(1,a) 大的删除记录 (1,b),此时(1,b) 应该不会用于过滤 (1,a) 的,但因为在做合并工作的时候,并没有读取 parquet 文件的 min/max,所以这里 number 比 2 大的删除记录都会被利用于 (1,a) 上,就导致 iceberg 认为做合并的数据在前面有被批改,所以间接报了数据抵触,导致合并工作失败。
测试验证
场景一:
形容:
append 形式生成 a,b,c 三个 snapshot,基于 b 做文件合并。
模仿场景:
1、已存在 a,b 快照。
2、当初基于 b 快照做小文件合并。
3、基于 b 的合并工作还未实现,另一条操作流基于 b 快照做了 append 类型的数据增加。
后果:
USE_STARTING_SEQUENCE_NUMBER | |
---|---|
true | 胜利 |
false | 胜利 |
胜利:生成新的快照,最终 snapshot 是 a,b,c,d。
新生成的大文件是基于 a,b 的数据文件的总和,
快照 d 中蕴含了 c 的数据,以及基于 a,b 合并的数据。
a,b,c,d 对应的 squenceNumber 别离是 1,2,3,4:
USE_STARTING_SEQUENCE_NUMBER 为 true 时,d 外面生成的新的大文件对应的 manifest 的 squeceid 是用的以前的 2,删除的 manifest 用的是新的 id4:
USE_STARTING_SEQUENCE_NUMBER 为 false 时,d 外面合并工作应用的全都是最新的 id:
场景二:
形容:
append 形式生成 a,b,c 三个 snapshot,先基于 c 做一次合并,合并胜利后,基于 c 再做一次合并。
模仿场景:
1、基于 c 快照做小文件合并。
2、基于 c 的工作还未实现,又启动了一个完全相同的基于 c 快照做小文件合并的工作
后果:
USE_STARTING_SEQUENCE_NUMBER | true |
---|---|
true | 胜利 |
false | 胜利 |
场景三:
形容:
append 形式生成 a,b 两个 snapshot,对 a 的数据做更新生成 c,再基于 b 做文件合并。
模仿场景:
1、基于 b 快照做合并。
2、合并工作还未实现,另一条数据流对 a 快照中的数据做了更新,且在合并工作实现前先提交胜利,此时生成了 c 快照。
后果:
USE_STARTING_SEQUENCE_NUMBER | |
---|---|
true | 胜利 |
false | 失败 |
失败:提醒 Cannot commit, found new delete for replaced data file。
胜利,生成最新快照 d,生成快照 a,b 中数据合并的大文件
场景四:
形容:
append 形式生成 a,b,c 三个 snapshot,对 c 的数据做更新生成 d,再基于 b 做文件合并。
模仿场景:
1、基于 b 快照做合并。
2、合并还未实现,另一条数据流先做 append,生成了 c 快照。
3、之前的合并还未实现,另一条数据流对 c 快照里的数据做批改,生成了快照 d
后果:
USE_STARTING_SEQUENCE_NUMBER | |
---|---|
true | 胜利 |
false | 失败 |
论断
测试后果与剖析相匹配。
设置 USE_STARTING_SEQUENCE_NUMBER 该参数为 true,能够批改新生成的 dataFile 和 manifest 的 sequenceNumber 为原数据文件的 sequenceNumber,这样在读取数据的时候,就能够把 delteFile 利用到新生成的 dataFile 中了,能够解决数据抵触的状况。