简介
在数据一直写入 Hudi 期间,Hudi 会一直生成 commit、deltacommit、clean 等 Instant 记录每一次操作类型、状态及具体的元数据,这些 Instant 最终都会存到 .hoodie 元数据目录下,为了防止元数据文件数量过多,ActiveTimeline 越来越长,须要对比拟长远的操作进行归档(archive),将这部分操作移到 .hoodie/archive 目录下,独自造成一个 ArchivedTimeline。
由此可知,archive 是 Hudi Timeline 上的操作,操作的数据是 instant 粒度。
相干配置
// org.apache.hudi.config.HoodieArchivalConfig
public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
.key("hoodie.archive.async")
.defaultValue("false")
.sinceVersion("0.11.0")
.withDocumentation("Only applies when" + AUTO_ARCHIVE.key() + "is turned on."
+ "When turned on runs archiver async with writing, which can speed up overall write performance.");
public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP_PROP = ConfigProperty
.key("hoodie.keep.max.commits")
.defaultValue("150")
.withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to"
+ "keep the metadata overhead constant, even as the table size grows."
+ "This config controls the maximum number of instants to retain in the active timeline.");
public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP_PROP = ConfigProperty
.key("hoodie.keep.min.commits")
.defaultValue("145")
.withDocumentation("Similar to" + MAX_COMMITS_TO_KEEP_PROP.key() + ", but controls the minimum number of"
+ "instants to retain in the active timeline.");
public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE_PROP = ConfigProperty
.key("hoodie.commits.archival.batch")
.defaultValue(String.valueOf(10))
.withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
+ "archive log. This config controls such archival batch size.");
MAX_COMMITS_TO_KEEP_PROP:ActiveTimeLine 最多保留的 instant 个数
MIN_COMMITS_TO_KEEP_PROP:ActiveTimeLine 起码保留的 instant 个数
COMMITS_ARCHIVAL_BATCH_SIZE_PROP:每次 archive 最多的 instant 个数
注:archive 配置与 clean 配置存在关系:hoodie.cleaner.commits.retained <= hoodie,keep.min.commits
入口
checkpoint 的时候提交完 writeStat 后(org.apache.hudi.client.BaseHoodieWriteClient#commitStats),会尝试进行 archive:
// HoodieFlinkWriteClient
protected void mayBeCleanAndArchive(HoodieTable table) {autoArchiveOnCommit(table);
}
依据是否配置了 hoodie.archive.async 决定是同步 / 异步归档。
归档
HoodieTimelineArchiver
public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
try {List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
boolean success = true;
if (!instantsToArchive.isEmpty() && instantsToArchive.size() >= this.config.getCommitArchivalBatchSize()) {this.writer = openWriter();
LOG.info("Archiving instants" + instantsToArchive);
archive(context, instantsToArchive);
LOG.info("Deleting archived instants" + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive);
} else {LOG.info("No Instants to archive");
}
return success;
} finally {close();
}
}
该办法定义了整体 archive 的流程:
- 找出所有可归档的 instant;
- 执行归档,即写到 archive 目录下;
- 删除归档文件,即删除 .hoodie 下所有对应的 instant,以及 .aux 目录下所有的 compaction.requested。
找出所有可归档的 instant
获取所有已实现的 clean、rollback instant,按操作类型分组,如果某类操作 instant 超过了最大保留的 instant,则触发归档,只留下起码需保留的 instant。
getCommitInstantsToArchive:
- 获取最老的未实现的 compaction、replace commit instant;
- 获取最老的处于 inflight 状态的 commit、delta commit instant;
- 找到该表的第一个 savepoint instant;
- 获取 completed commit timline(COW 表为 commit、replace commit,MOR 为 commit、delta commit、replace commit),判断其 instant 个数是否超过 maxInstantsToKeep;
- 如果超过,针对 MOR 表找到尚未参加 compaction 的最早的 delta commit instant;
-
获取 clutering 场景下需保留的最早的 instant:
- 找到最近一次已实现的 clean instant,读取元数据中的 earliestInstantToRetain,取比 earliestInstantToRetain 新的 instants 中最老的 replace commit instant;
- 找到最老的处于 inflight 状态的 replace commit instant,如果存在,找到在此之前最近一次 commit/delta commit/replace commit;
- 取 1 和 2 中更老的 instant。
不能归档 clustering instant,除非其对应的数据文件已被清理,不然 fs view 可能会反复读数据(替换前后文件都存在的状况下)
- 如果开启 hoodie.archive.beyond.savepoint,则归档的 instant 只跳过 savepoint instant,否则则跳过第一个 sapoveint 及之后所有的 instant;
- 筛选 instant time 比上述 instant 小的 compleated commit/delat commit/replace commit,且满足保留 instant 个数大于 minInstantsToKeep。
getCleanInstantsToArchive:
- 获取所有已实现的 clean、rollback instant;
- 如果 clean 或 rollback 的 instant 个数超过 maxInstantsToKeep,则去掉最新的 minInstantsToKeep 个数的 instants,否则返回空;
通过这两个把须要 archive 的 completed instants 筛选进去后,再联合 timeline 填充这些 instant 对应的 requested/inflight instants。
执行归档
删除这些 instant 对应的 markfile 目录,.hoodie/.temp/\<instantTs\>
将每个要归档的 instant 转成一个 IndexedRecord,对应的具体实现为 HoodieArchivedMetaEntry,次要蕴含四局部信息:
- commitTime:instant 对应工夫戳;
- actionState:操作状态;
- actionType:操作类型;
- metadata:操作的元数据;
最初将这些 record 组织成 HoodieAvroDataBlock 按 log 格局落盘,落盘的文件名格局与 logfile 统一,须要将 .log 后缀改为 .archive,比方 .hoodie/archived/commits_.archive.1。
删除已归档的 Instant
删除须要先删 requested/inflight 再删 completed。如果先删 completed 的文件,则其余基于 timeline 的服务(比方 compaction、clustering)可能会谬误的将某次操作状态认为是 pending,从而造成 bug。
如果 archive 的 instants 中蕴含 commit/delta commit,则删除对应 aux 目录下的元数据文件。
合并归档
从 0.11 版本开始,Hudi 反对对归档的后果文件进行合并 HoodieTimelineArchiver#mergeArchiveFilesIfNecessary
。尽管每次归档都将多个 Instant 生成单个 .commit_.arvhive 文件,但归档的次数多后也会有大量的归档后果文件,所以对于较小的归档后果文件进行合并。
当启用归档合并时(hoodie.archive.merge.enable)且文件系统反对 append(StorageSchemas, 例如:file://),如果 arvhive 目录下存在大小小于 hoodie.archive.merge.small.file.limit.bytes 的文件数量达到 hoodie.archive.merge.files.batch.size,即触发合并
- 从新到旧获取间断所有大小小于 hoodie.archive.merge.small.file.limit.bytes 的候选文件 candidateFiles;
- 在这次归档生成的文件的根底上 rollover,作为合并归档用的数据文件;
- 生成合并打算,记录要合并的文件、合并后的文件名等信息,将这些内容保留在 arvhive 目录下的 mergeArchivePlan 文件;
- 合并文件,将小文件的内容都读出来写到新的合并文件中去;
- 胜利后删除候选文件。