关于数据湖:Hudi源码解读Archive-流程

23次阅读

共计 4989 个字符,预计需要花费 13 分钟才能阅读完成。

简介

在数据一直写入 Hudi 期间,Hudi 会一直生成 commit、deltacommit、clean 等 Instant 记录每一次操作类型、状态及具体的元数据,这些 Instant 最终都会存到 .hoodie 元数据目录下,为了防止元数据文件数量过多,ActiveTimeline 越来越长,须要对比拟长远的操作进行归档(archive),将这部分操作移到 .hoodie/archive 目录下,独自造成一个 ArchivedTimeline。

由此可知,archive 是 Hudi Timeline 上的操作,操作的数据是 instant 粒度。

相干配置

// org.apache.hudi.config.HoodieArchivalConfig
public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
    .key("hoodie.archive.async")
    .defaultValue("false")
    .sinceVersion("0.11.0")
    .withDocumentation("Only applies when" + AUTO_ARCHIVE.key() + "is turned on."
          + "When turned on runs archiver async with writing, which can speed up overall write performance.");

public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP_PROP = ConfigProperty
    .key("hoodie.keep.max.commits")
    .defaultValue("150")
    .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to"
        + "keep the metadata overhead constant, even as the table size grows."
        + "This config controls the maximum number of instants to retain in the active timeline.");

public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP_PROP = ConfigProperty
    .key("hoodie.keep.min.commits")
    .defaultValue("145")
    .withDocumentation("Similar to" + MAX_COMMITS_TO_KEEP_PROP.key() + ", but controls the minimum number of"
        + "instants to retain in the active timeline.");
        
public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE_PROP = ConfigProperty
  .key("hoodie.commits.archival.batch")
  .defaultValue(String.valueOf(10))
  .withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
      + "archive log. This config controls such archival batch size.");

MAX_COMMITS_TO_KEEP_PROP:ActiveTimeLine 最多保留的 instant 个数

MIN_COMMITS_TO_KEEP_PROP:ActiveTimeLine 起码保留的 instant 个数

COMMITS_ARCHIVAL_BATCH_SIZE_PROP:每次 archive 最多的 instant 个数

注:archive 配置与 clean 配置存在关系:hoodie.cleaner.commits.retained <= hoodie,keep.min.commits

入口

checkpoint 的时候提交完 writeStat 后(org.apache.hudi.client.BaseHoodieWriteClient#commitStats),会尝试进行 archive:

// HoodieFlinkWriteClient
protected void mayBeCleanAndArchive(HoodieTable table) {autoArchiveOnCommit(table);
}

依据是否配置了 hoodie.archive.async 决定是同步 / 异步归档。

归档

HoodieTimelineArchiver

public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
try {List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());

  boolean success = true;
  if (!instantsToArchive.isEmpty() && instantsToArchive.size() >= this.config.getCommitArchivalBatchSize()) {this.writer = openWriter();
    LOG.info("Archiving instants" + instantsToArchive);
    archive(context, instantsToArchive);
    LOG.info("Deleting archived instants" + instantsToArchive);
    success = deleteArchivedInstants(instantsToArchive);
  } else {LOG.info("No Instants to archive");
  }

  return success;
} finally {close();
}
}

该办法定义了整体 archive 的流程:

  1. 找出所有可归档的 instant;
  2. 执行归档,即写到 archive 目录下;
  3. 删除归档文件,即删除 .hoodie 下所有对应的 instant,以及 .aux 目录下所有的 compaction.requested。

找出所有可归档的 instant

获取所有已实现的 clean、rollback instant,按操作类型分组,如果某类操作 instant 超过了最大保留的 instant,则触发归档,只留下起码需保留的 instant。

getCommitInstantsToArchive:

  1. 获取最老的未实现的 compaction、replace commit instant;
  2. 获取最老的处于 inflight 状态的 commit、delta commit instant;
  3. 找到该表的第一个 savepoint instant;
  4. 获取 completed commit timline(COW 表为 commit、replace commit,MOR 为 commit、delta commit、replace commit),判断其 instant 个数是否超过 maxInstantsToKeep;
  5. 如果超过,针对 MOR 表找到尚未参加 compaction 的最早的 delta commit instant;
  6. 获取 clutering 场景下需保留的最早的 instant:

    1. 找到最近一次已实现的 clean instant,读取元数据中的 earliestInstantToRetain,取比 earliestInstantToRetain 新的 instants 中最老的 replace commit instant;
    2. 找到最老的处于 inflight 状态的 replace commit instant,如果存在,找到在此之前最近一次 commit/delta commit/replace commit;
    3. 取 1 和 2 中更老的 instant。

    不能归档 clustering instant,除非其对应的数据文件已被清理,不然 fs view 可能会反复读数据(替换前后文件都存在的状况下)

  7. 如果开启 hoodie.archive.beyond.savepoint,则归档的 instant 只跳过 savepoint instant,否则则跳过第一个 sapoveint 及之后所有的 instant;
  8. 筛选 instant time 比上述 instant 小的 compleated commit/delat commit/replace commit,且满足保留 instant 个数大于 minInstantsToKeep。

getCleanInstantsToArchive:

  1. 获取所有已实现的 clean、rollback instant;
  2. 如果 clean 或 rollback 的 instant 个数超过 maxInstantsToKeep,则去掉最新的 minInstantsToKeep 个数的 instants,否则返回空;

通过这两个把须要 archive 的 completed instants 筛选进去后,再联合 timeline 填充这些 instant 对应的 requested/inflight instants。

执行归档

删除这些 instant 对应的 markfile 目录,.hoodie/.temp/\<instantTs\>

将每个要归档的 instant 转成一个 IndexedRecord,对应的具体实现为 HoodieArchivedMetaEntry,次要蕴含四局部信息:

  • commitTime:instant 对应工夫戳;
  • actionState:操作状态;
  • actionType:操作类型;
  • metadata:操作的元数据;

最初将这些 record 组织成 HoodieAvroDataBlock 按 log 格局落盘,落盘的文件名格局与 logfile 统一,须要将 .log 后缀改为 .archive,比方 .hoodie/archived/commits_.archive.1。

删除已归档的 Instant

删除须要先删 requested/inflight 再删 completed。如果先删 completed 的文件,则其余基于 timeline 的服务(比方 compaction、clustering)可能会谬误的将某次操作状态认为是 pending,从而造成 bug。

如果 archive 的 instants 中蕴含 commit/delta commit,则删除对应 aux 目录下的元数据文件。

合并归档

从 0.11 版本开始,Hudi 反对对归档的后果文件进行合并 HoodieTimelineArchiver#mergeArchiveFilesIfNecessary。尽管每次归档都将多个 Instant 生成单个 .commit_.arvhive 文件,但归档的次数多后也会有大量的归档后果文件,所以对于较小的归档后果文件进行合并。
当启用归档合并时(hoodie.archive.merge.enable)且文件系统反对 append(StorageSchemas, 例如:file://),如果 arvhive 目录下存在大小小于 hoodie.archive.merge.small.file.limit.bytes 的文件数量达到 hoodie.archive.merge.files.batch.size,即触发合并

  1. 从新到旧获取间断所有大小小于 hoodie.archive.merge.small.file.limit.bytes 的候选文件 candidateFiles;
  2. 在这次归档生成的文件的根底上 rollover,作为合并归档用的数据文件;
  3. 生成合并打算,记录要合并的文件、合并后的文件名等信息,将这些内容保留在 arvhive 目录下的 mergeArchivePlan 文件;
  4. 合并文件,将小文件的内容都读出来写到新的合并文件中去;
  5. 胜利后删除候选文件。

正文完
 0