Iceberg作为凌驾于HDFS和S3等存储系统之上的数据组织框架,提供了数据写入、读取、文件治理和元数据管理等基本功能,尽管Iceberg提供了丰盛的API接口,然而面向API开发须要应用方比拟理解其原理和实现细节,还是显得门槛过高。此外,在面向实时数据读写场景,须要有一个桥接框架来主动实现数据的读写,于是Iceberg和Flink成为天作之合,本文就来钻研下Iceberg是如何跟Flink对接的。
Flink写入Iceberg总体流程介绍
Flink典型的数据处理链路是Source->Transform->Sink,对Iceberg来讲,也听从这一模式,比方下图:
Custom Souce是自定义的数据源类型的Source,用于向上游发送数据,比方上面的数据来源于动态List汇合:
DataStream<RowData> dataStream = env.fromCollection(list)
IcebergStreamWriter起着数据变换作用,跟Source 组成链式Operator,IcebergFilesCommiter作为Sink,将数据提交到本地文件test表。
Source端发送数据到IcebergStreamWriter,IcebergFilesCommiter将从IcebergStreamWriter获取的数据提交到元数据管理系统,比方Hive Metastore或者文件系统。当胜利提交元数据之后,写入的数据才对外部可见。在这一个过程中,IcebergStreamWriter除了相当于上述链路模式中的Transform角色之外,还有一个重要起因:实现事务提交隔离。IcebergStreamWriter将数据临时写入到一个缓冲文件,该文件临时对外部是不可见的,而后IcebergFilesCommiter再将IcebergStreamWriter写入的文件的元信息,比方门路、文件大小,记录行数等写入到ManifestFile中,最初将ManifestFile文件元信息再写入到ManifestList(ManifestList即快照信息),ManifestList又被写入以版本号辨别的metadata文件中(v%版本号%.metadata.json),下图展现了一个残缺的数据包含元数据组织示例:
上面开展来讲下实现上述指标的细节内容:
首先Source端DataStream数据流通过IcebergStreamWriter变换,生成新的DataStream: SingleOutputStreamOperator ,输入类型是WriteResult:
//DataStream<RowData> input//IcebergStreamWriter streamWriterSingleOutputStreamOperator<WriteResult> writerStream = input .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter) .setParallelism(parallelism);
其中WriteResult的定义如下:
public class WriteResult implements Serializable { private DataFile[] dataFiles; private DeleteFile[] deleteFiles; private CharSequence[] referencedDataFiles; ...}
从类定义可知,IcebergStreamWriter的输入后果其实只是该过程产生的数据文件,次要包含DataFile和DeleteFile,referencedDataFiles临时先不关注。
而后,IcebergFilesCommiter对上游的Operator 做变换,生成新的DataStream:SingleOutputStreamOperator,这个过程只是提交元数据,自身不会再往上游发送数据,所以返回数据类型为Void:
//SingleOutputStreamOperator<WriteResult> writerStreamSingleOutputStreamOperator<Void> committerStream = writerStream .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter) .setParallelism(1) .setMaxParallelism(1);
IcebergStreamWriter和IcebergFilesCommiter实现详细分析
从下面介绍可知,IcebergStreamWriter和IcebergFilesCommiter是最次要的两个数据处理过程,上面对其具体介绍。IcebergStreamWriter和IcebergFilesCommiter都是AbstractStreamOperator的子类,自身除了要实现对单个元素的解决逻辑,还有对快照解决的相干逻辑,先说说对单个元素的解决逻辑:
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput { private transient TaskWriter<T> writer; @Override public void processElement(StreamRecord<T> element) throws Exception { writer.write(element.getValue()); } @Override public void endInput() throws IOException { // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining // completed files to downstream before closing the writer so that we won't miss any of them. emit(writer.complete()); }}
IcebergStreamWriter的processElement逻辑看起来比较简单,理论都封装到TaskWriter中去了。endInput办法起着兜底作用,在敞开writer之前,将残余未发送的数据发到上游,由前文可知,发给上游的数据类型为WriteResult。这里一个显著问题是writer是无止境地往文件中写吗?理论不是的,Writer会依据写入的文件大小主动切换新的Writer。
再来看IcebergFilesCommitter:
class IcebergFilesCommitter extends AbstractStreamOperator<Void> implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput { // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the 'dataFilesPerCheckpoint'. private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList(); @Override public void processElement(StreamRecord<WriteResult> element) { this.writeResultsOfCurrentCkpt.add(element.getValue()); } @Override public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. long currentCheckpointId = Long.MAX_VALUE; dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); writeResultsOfCurrentCkpt.clear(); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId); }}
IcebergFilesCommitter的processElement将接管到的元素(WriteResult)放到内存List中,在endInput中提交。这里有一个显著的问题:如果数据无界,List writeResultsOfCurrentCkpt是否可能被撑爆?因为WriteResult只是记录文件的元信息,比方地位等,理论数据曾经落盘了,尽管如此,这种实现也是实用于有界数据。如果实时无界数据流,就须要靠Checkpoint机制了。先来看IcebergStreamWriter如何实现Checkpoint快照逻辑:
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput { ... @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { // close all open files and emit files to downstream committer operator emit(writer.complete()); this.writer = taskWriterFactory.create(); } ...}
IcebergStreamWriter的Checkpoint 基于默认实现,只重写了prepareSnapshotPreBarrier预发送逻辑:完结以后Writer,创立WriteResult,发送到上游,同时切换writer实例,仅此而已。Checkpoint逻辑的重头戏在IcebergFilesCommitter中,因为IcebergStreamWriter反对并发创立Checkpoint,它只负责将写入后果发送到上游,而上游的IcebergFilesCommitter逻辑波及到多Checkpoint的排序、ManifestFlie和ManifestList的创立、snapshot 状态的保留与保护等,为保障Checkpoint的事务性,IcebergFilesCommitter采纳串行化提交。
class IcebergFilesCommitter extends AbstractStreamOperator<Void> implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput { // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the 'dataFilesPerCheckpoint'. private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList(); @Override public void initializeState(StateInitializationContext context) throws Exception { ... this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId); NavigableMap<Long, byte[]> uncommittedDataFiles = Maps .newTreeMap(checkpointsState.get().iterator().next()) .tailMap(maxCommittedCheckpointId, false); if (!uncommittedDataFiles.isEmpty()) { // Committed all uncommitted data files from the old flink job to iceberg table. long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey(); commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId); } ... } @Override public void snapshotState(StateSnapshotContext context) throws Exception { ... // Update the checkpoint state. dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId)); // Reset the snapshot state to the latest state. checkpointsState.clear(); checkpointsState.add(dataFilesPerCheckpoint); ... } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); // It's possible that we have the following events: // 1. snapshotState(ckpId); // 2. snapshotState(ckpId+1); // 3. notifyCheckpointComplete(ckpId+1); // 4. notifyCheckpointComplete(ckpId); // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files, // Besides, we need to maintain the max-committed-checkpoint-id to be increasing. if (checkpointId > maxCommittedCheckpointId) { commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId); this.maxCommittedCheckpointId = checkpointId; } }
这就是IcebergFilesCommitter整个的Checkpoint的提交逻辑,不论是无Checkpoint还是有Checkpoint,最终都落脚到commitUpToCheckpoint办法中进行元数据的提交,接下来独自看它的实现形式。
Checkpoint元数据提交详解
Iceberg把每次Checkpoint的变更操作用一个接口类SnapshotUpdate来定义,其中一个次要办法就是commit,它的子类实现比拟多,比方上面的:
这些操作波及到重写文件、增加文件、删除文件、重写Manifest文件、替换分区、快照回滚等,同时在Table中也向用户裸露了一些操作接口,比方:
举两个例子,newAppend用于向table中追加数据文件,newRowDelta向表中同时追加数据文件和删除文件:
table.newAppend() .appendFile(dataFile) .commit();table.newRowDelta() .addRows(dataFile) .addDeletes(deleteFile) .commit();
而在SnapshotUpdate实现类的commit办法中,又通过接口TableOperations的commit来实现:
TableOperations operations = ((BaseTable) table).operations();TableMetadata metadata = operations.current();operations.commit(metadata, metadata.upgradeToFormatVersion(2));
TableOperations是干啥的呢?类正文是这么写的:
SPI interface to abstract table metadata access and updates.
从接口定义来看,它是对于metadata的拜访和更新操作的,其有三个次要实现类:HadoopTableOperations、HiveTableOperations和JdbcTableOperations。为什么会有这么多不同的实现类呢?回顾文章结尾局部的Iceberg的元数据组织构造和Checkpoint过程可知,Iceberg反对并发Checkpoint,然而在提交元数据阶段又要保障事务性,因而在提交元数据过程如何保障原子性地让metadata文件依照提交程序递增是一大挑战。
先来看HadoopTableOperations是如何实现的?HadoopTable是将元数据存储在HDFS文件上,没有依赖HMS,因而在生成v%版本号%.metadata.json文件时,须要有一个中央记录上次最大的版本序号,Iceberg的做法是在metadata目录创立一个version-hint.txt文件,外面记录上次的版本序号,每次提交新的metadata.json文件时就更新这个值,而如何保障提交新的metadata.json不会导致抵触,Iceberg是学生成一个随机序号的长期metadata.json文件,而后再通过rename到以后版本号+1的metadata.json文件,如果rename胜利,则示意没有抵触,否则抛弃以后交。
再看HiveTableOperations是如何保障metadata.json的原子程序提交的?HiveTable跟HadoopTable的区别是,HiveTable依赖HMS,HiveTableOperations通过一个过程级别的全局锁来管制每个表一把锁,进而管制对表元数据批改的并发提交。
而JdbcTableOperations是将表的元数据信息存储在反对JDBC协定的数据库中的固定表iceberg_tables中,所有对表元信息的更新通过数据库自身的乐观锁实现。
总结
Flink实时写入Iceberg的过程不管是否基于Checkpoint,都是通过两阶段实现,并且做到事务隔离,即IcebergStreamWriter负责数据的写入落盘,而后写入后果WriteResult发送到上游,IcebergFilesCommitter从IcebergStreamWriter发送的多个WriteResult中回放数据文件,依照数据类型的不同(DAtaFile、DeleteFile)创立对应类型的ManifestFile,并创立ManifestList作为快照保留在序号递增的metadata.json文件中,为保障metadata.json序号的递增,Iceberg采纳了多种形式来实现并发更新操作来满足不同场景的须要,比方HadoopTable基于HDFS,不依赖HMS,通过文件的rename操作进行并发管制,HiveTable依赖HMS,通过过程级的表锁来管制并发,而JdbcTable依赖反对JDBC协定的数据库自身的乐观锁机制来实现并发管制。
Iceberg作为数据湖的代表性框架,可能解决离线数仓的很多痛点,后劲很大,然而因为比拟”年老“,还存在很多问题,比方不反对跨分区的、跨快照的数据去重,小文件合并在大数据集场景下的性能问题、Schema不反对动静变更、索引能力较弱等,这些问题不解决,将会重大阻塞其推广应用,本文在后续将陆续介绍这些问题的解决办法。