关于flink:Flink-流式写入Iceberg实现原理

45次阅读

共计 8646 个字符,预计需要花费 22 分钟才能阅读完成。

Iceberg 作为凌驾于 HDFS 和 S3 等存储系统之上的数据组织框架,提供了数据写入、读取、文件治理和元数据管理等基本功能,尽管 Iceberg 提供了丰盛的 API 接口,然而面向 API 开发须要应用方比拟理解其原理和实现细节,还是显得门槛过高。此外,在面向实时数据读写场景,须要有一个桥接框架来主动实现数据的读写,于是 Iceberg 和 Flink 成为天作之合,本文就来钻研下 Iceberg 是如何跟 Flink 对接的。

Flink 写入 Iceberg 总体流程介绍

Flink 典型的数据处理链路是 Source->Transform->Sink,对 Iceberg 来讲,也听从这一模式,比方下图:

Custom Souce 是自定义的数据源类型的 Source,用于向上游发送数据,比方上面的数据来源于动态 List 汇合:

DataStream<RowData> dataStream = env.fromCollection(list)

IcebergStreamWriter 起着数据变换作用,跟 Source 组成链式 Operator,IcebergFilesCommiter 作为 Sink,将数据提交到本地文件 test 表。

Source 端发送数据到 IcebergStreamWriter,IcebergFilesCommiter 将从 IcebergStreamWriter 获取的数据提交到元数据管理系统,比方 Hive Metastore 或者文件系统。当胜利提交元数据之后,写入的数据才对外部可见。在这一个过程中,IcebergStreamWriter 除了相当于上述链路模式中的 Transform 角色之外,还有一个重要起因:实现事务提交隔离。IcebergStreamWriter 将数据临时写入到一个缓冲文件,该文件临时对外部是不可见的,而后 IcebergFilesCommiter 再将 IcebergStreamWriter 写入的文件的元信息,比方门路、文件大小,记录行数等写入到 ManifestFile 中,最初将 ManifestFile 文件元信息再写入到 ManifestList(ManifestList 即快照信息),ManifestList 又被写入以版本号辨别的 metadata 文件中(v% 版本号 %.metadata.json),下图展现了一个残缺的数据包含元数据组织示例:

上面开展来讲下实现上述指标的细节内容:

首先 Source 端 DataStream 数据流通过 IcebergStreamWriter 变换,生成新的 DataStream:SingleOutputStreamOperator,输入类型是 WriteResult:

//DataStream<RowData> input
//IcebergStreamWriter streamWriter
SingleOutputStreamOperator<WriteResult> writerStream = input
    .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter)
    .setParallelism(parallelism);

其中 WriteResult 的定义如下:

public class WriteResult implements Serializable {private DataFile[] dataFiles;
  private DeleteFile[] deleteFiles;
  private CharSequence[] referencedDataFiles;
    ...
}

从类定义可知,IcebergStreamWriter 的输入后果其实只是该过程产生的数据文件,次要包含 DataFile 和 DeleteFile,referencedDataFiles 临时先不关注。

而后,IcebergFilesCommiter 对上游的 Operator 做变换,生成新的 DataStream:SingleOutputStreamOperator,这个过程只是提交元数据,自身不会再往上游发送数据,所以返回数据类型为 Void:

//SingleOutputStreamOperator<WriteResult> writerStream
SingleOutputStreamOperator<Void> committerStream = writerStream
    .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
    .setParallelism(1)
    .setMaxParallelism(1);

IcebergStreamWriter 和 IcebergFilesCommiter 实现详细分析

从下面介绍可知,IcebergStreamWriter 和 IcebergFilesCommiter 是最次要的两个数据处理过程,上面对其具体介绍。IcebergStreamWriter 和 IcebergFilesCommiter 都是 AbstractStreamOperator 的子类,自身除了要实现对单个元素的解决逻辑,还有对快照解决的相干逻辑,先说说对单个元素的解决逻辑:

class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
    implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {

    private transient TaskWriter<T> writer;
    @Override
    public void processElement(StreamRecord<T> element) throws Exception {writer.write(element.getValue());
    }

    @Override
    public void endInput() throws IOException {
        // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining
        // completed files to downstream before closing the writer so that we won't miss any of them.
        emit(writer.complete());
    }
}

IcebergStreamWriter 的 processElement 逻辑看起来比较简单,理论都封装到 TaskWriter 中去了。endInput 办法起着兜底作用,在敞开 writer 之前,将残余未发送的数据发到上游,由前文可知,发给上游的数据类型为 WriteResult。这里一个显著问题是 writer 是无止境地往文件中写吗?理论不是的,Writer 会依据写入的文件大小主动切换新的 Writer。

再来看 IcebergFilesCommitter:

class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {

  // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the 'dataFilesPerCheckpoint'.
  private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();

    @Override
  public void processElement(StreamRecord<WriteResult> element) {this.writeResultsOfCurrentCkpt.add(element.getValue());
  }

  @Override
  public void endInput() throws IOException {
    // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
    long currentCheckpointId = Long.MAX_VALUE;
    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
    writeResultsOfCurrentCkpt.clear();

    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
  }
}

IcebergFilesCommitter 的 processElement 将接管到的元素(WriteResult)放到内存 List 中,在 endInput 中提交。这里有一个显著的问题:如果数据无界,List writeResultsOfCurrentCkpt 是否可能被撑爆?因为 WriteResult 只是记录文件的元信息,比方地位等,理论数据曾经落盘了,尽管如此,这种实现也是实用于有界数据。如果实时无界数据流,就须要靠 Checkpoint 机制了。先来看 IcebergStreamWriter 如何实现 Checkpoint 快照逻辑:

class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
    implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {

  ...
  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // close all open files and emit files to downstream committer operator
    emit(writer.complete());

    this.writer = taskWriterFactory.create();}
  ...
}

IcebergStreamWriter 的 Checkpoint 基于默认实现,只重写了 prepareSnapshotPreBarrier 预发送逻辑:完结以后 Writer,创立 WriteResult,发送到上游,同时切换 writer 实例,仅此而已。Checkpoint 逻辑的重头戏在 IcebergFilesCommitter 中,因为 IcebergStreamWriter 反对并发创立 Checkpoint,它只负责将写入后果发送到上游,而上游的 IcebergFilesCommitter 逻辑波及到多 Checkpoint 的排序、ManifestFlie 和 ManifestList 的创立、snapshot 状态的保留与保护等,为保障 Checkpoint 的事务性,IcebergFilesCommitter 采纳串行化提交。

class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {

  // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the 'dataFilesPerCheckpoint'.
  private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();

   @Override
  public void initializeState(StateInitializationContext context) throws Exception {
     ...
      this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);

      NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
          .newTreeMap(checkpointsState.get().iterator().next())
          .tailMap(maxCommittedCheckpointId, false);
      if (!uncommittedDataFiles.isEmpty()) {
        // Committed all uncommitted data files from the old flink job to iceberg table.
        long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
        commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
      }
   ...
    }

  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    ...
    // Update the checkpoint state.
    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
    // Reset the snapshot state to the latest state.
    checkpointsState.clear();
    checkpointsState.add(dataFilesPerCheckpoint);
    ...
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {super.notifyCheckpointComplete(checkpointId);
    // It's possible that we have the following events:
    //   1. snapshotState(ckpId);
    //   2. snapshotState(ckpId+1);
    //   3. notifyCheckpointComplete(ckpId+1);
    //   4. notifyCheckpointComplete(ckpId);
    // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
    // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
    if (checkpointId > maxCommittedCheckpointId) {commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
      this.maxCommittedCheckpointId = checkpointId;
    }
  }

这就是 IcebergFilesCommitter 整个的 Checkpoint 的提交逻辑,不论是无 Checkpoint 还是有 Checkpoint,最终都落脚到 commitUpToCheckpoint 办法中进行元数据的提交,接下来独自看它的实现形式。

Checkpoint 元数据提交详解

Iceberg 把每次 Checkpoint 的变更操作用一个接口类 SnapshotUpdate 来定义,其中一个次要办法就是 commit,它的子类实现比拟多,比方上面的:

这些操作波及到重写文件、增加文件、删除文件、重写 Manifest 文件、替换分区、快照回滚等,同时在 Table 中也向用户裸露了一些操作接口,比方:

举两个例子,newAppend 用于向 table 中追加数据文件,newRowDelta 向表中同时追加数据文件和删除文件:

table.newAppend()
        .appendFile(dataFile)
        .commit();

table.newRowDelta()
        .addRows(dataFile)
        .addDeletes(deleteFile)
        .commit();

而在 SnapshotUpdate 实现类的 commit 办法中,又通过接口 TableOperations 的 commit 来实现:

TableOperations operations = ((BaseTable) table).operations();
TableMetadata metadata = operations.current();
operations.commit(metadata, metadata.upgradeToFormatVersion(2));

TableOperations 是干啥的呢?类正文是这么写的:

SPI interface to abstract table metadata access and updates.

从接口定义来看,它是对于 metadata 的拜访和更新操作的,其有三个次要实现类:HadoopTableOperations、HiveTableOperations 和 JdbcTableOperations。为什么会有这么多不同的实现类呢?回顾文章结尾局部的 Iceberg 的元数据组织构造和 Checkpoint 过程可知,Iceberg 反对并发 Checkpoint,然而在提交元数据阶段又要保障事务性,因而在提交元数据过程如何保障原子性地让 metadata 文件依照提交程序递增是一大挑战。

先来看 HadoopTableOperations 是如何实现的?HadoopTable 是将元数据存储在 HDFS 文件上,没有依赖 HMS,因而在生成 v% 版本号 %.metadata.json 文件时,须要有一个中央记录上次最大的版本序号,Iceberg 的做法是在 metadata 目录创立一个 version-hint.txt 文件,外面记录上次的版本序号,每次提交新的 metadata.json 文件时就更新这个值,而如何保障提交新的 metadata.json 不会导致抵触,Iceberg 是学生成一个随机序号的长期 metadata.json 文件,而后再通过 rename 到以后版本号 + 1 的 metadata.json 文件,如果 rename 胜利,则示意没有抵触,否则抛弃以后交。

再看 HiveTableOperations 是如何保障 metadata.json 的原子程序提交的?HiveTable 跟 HadoopTable 的区别是,HiveTable 依赖 HMS,HiveTableOperations 通过一个过程级别的全局锁来管制每个表一把锁,进而管制对表元数据批改的并发提交。

而 JdbcTableOperations 是将表的元数据信息存储在反对 JDBC 协定的数据库中的固定表 iceberg_tables 中,所有对表元信息的更新通过数据库自身的乐观锁实现。

总结

Flink 实时写入 Iceberg 的过程不管是否基于 Checkpoint,都是通过两阶段实现,并且做到事务隔离,即 IcebergStreamWriter 负责数据的写入落盘,而后写入后果 WriteResult 发送到上游,IcebergFilesCommitter 从 IcebergStreamWriter 发送的多个 WriteResult 中回放数据文件,依照数据类型的不同(DAtaFile、DeleteFile)创立对应类型的 ManifestFile,并创立 ManifestList 作为快照保留在序号递增的 metadata.json 文件中,为保障 metadata.json 序号的递增,Iceberg 采纳了多种形式来实现并发更新操作来满足不同场景的须要,比方 HadoopTable 基于 HDFS,不依赖 HMS,通过文件的 rename 操作进行并发管制,HiveTable 依赖 HMS,通过过程级的表锁来管制并发,而 JdbcTable 依赖反对 JDBC 协定的数据库自身的乐观锁机制来实现并发管制。

Iceberg 作为数据湖的代表性框架,可能解决离线数仓的很多痛点,后劲很大,然而因为比拟”年老“,还存在很多问题,比方不反对跨分区的、跨快照的数据去重,小文件合并在大数据集场景下的性能问题、Schema 不反对动静变更、索引能力较弱等,这些问题不解决,将会重大阻塞其推广应用,本文在后续将陆续介绍这些问题的解决办法。

正文完
 0