关于java:读Flink源码谈设计FileSystemConnector中的整洁架构

53次阅读

共计 5373 个字符,预计需要花费 14 分钟才能阅读完成。

本文首发于泊浮目标语雀:https://www.yuque.com/17sing

版本 日期 备注
1.0 2022.3.8 文章首发

本文基于 Flink 1.14 代码进行剖析。

0. 前言

前阵子在生产上碰到了一个诡异景象:全量作业无奈失常进行,日志中充斥着 java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container xxxx(HOSTNAME:PORT) timed out 的报错。

场景为 Oracle 全量抽取至 Hive,数据会流过 Kafka,数据量为 T 级别,依据工夫字段每天做一个分区。报错的 Job 负责抽取 Kafka 的数据并写至 Hive,应用的是 TableAPI。

1. 排查思路

这个问题报到我这边的时候,有同学曾经排查过一轮了。依据网上搜寻,会告知你可能是 yarn 的压力过大、网络短暂不稳固等,能够调大 heartbeat.timeout 来缓解这个问题,经调整改问题并未解决。

另外一个说法会告知你是 GC 频繁的起因。倡议调整内存,调整后,确实有肯定的成果(使出问题的工夫变慢)。那很显然和代码有关系了。

因为之前一个版本同步数据都没有出问题,因而开始寻找最近代码的改变,找了几圈下来并没有找到可疑的代码。登时感觉有点头皮发麻。于是让现场的同学切换到上个版本持续做全量,景象依旧会产生。

这时我就有点狐疑生产环境的个性了——比方数据个性,但现场的同学告知我数据并没有什么非凡之处。于是我要了一份现场的 HeapDump,丢到了剖析软件上进行查看,发现 org.apache.flink.streaming.api.functions.sink.filesystem.Bucket 的对象特地多。

于是看了一下 Bucket 对象的定义:


/**
 * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
 *
 * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified {@link
 * BucketAssigner} is queried to see in which bucket this element should be written to.
 */
@Internal
public class Bucket<IN, BucketID> {

好家伙。一个目录一个对象,此时此刻我曾经对现场的同学告知我的“数据没有什么非凡之处”产生了狐疑,不过为了实锤,我还是跟了一遍代码:

|-- HiveTableSink
   \-- createStreamSink
|-- StreamingFileSink
  \-- initializeState
|-- StreamingFileSinkHelper
  \-- constructor
|-- HadoopPathBasedBulkFormatBuilder
  \-- createBuckets
|-- Buckets
  \-- onElement
  \-- getOrCreateBucketForBucketId

过了一遍代码当前,心里便有了数。问了下现场,同步的数据时间跨度是不是特地大,现场同学确认后,时间跨度为 3 年多。于是倡议升高时间跨度,或者升高分区工夫。最终将全量批次进行切分后解决了这个问题。

2. 解决问题后的好奇

如果每个目录都会产生一个 Bucket,那如果运行一个流作业,岂不是迟早碰到雷同的问题。这么不言而喻的问题,社区的大神们必定早就想到了,好奇心驱使着我寻找答案——直到看到了这段代码:

    public void commitUpToCheckpoint(final long checkpointId) throws IOException {
        final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
                activeBuckets.entrySet().iterator();

        LOG.info("Subtask {} received completion notification for checkpoint with id={}.",
                subtaskIndex,
                checkpointId);

        while (activeBucketIt.hasNext()) {final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

            if (!bucket.isActive()) {
                // We've dealt with all the pending files and the writer for this bucket is not
                // currently open.
                // Therefore this bucket is currently inactive and we can remove it from our state.
                activeBucketIt.remove();
                notifyBucketInactive(bucket);
            }
        }
    }

做 Checkpoint 后的提交时,这里会依据 Bucket 是否处于沉闷状态来决定是否移除在内存中保护的数据结构。

那么怎么才算沉闷呢?代码很简短:

    boolean isActive() {
        return inProgressPart != null
                || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty()
                || !pendingFileRecoverablesPerCheckpoint.isEmpty();}

接下来就是讲清楚这三个的触发条件了。

2.1 inProgressPart == null

该对象的类型为InProgressFileWriter,触发条件和 FileSystem 的滚动策略非亲非故。


/**
 * The policy based on which a {@code Bucket} in the {@code Filesystem Sink} rolls its currently
 * open part file and opens a new one.
 */
@PublicEvolving
public interface RollingPolicy<IN, BucketID> extends Serializable {

    /**
     * Determines if the in-progress part file for a bucket should roll on every checkpoint.
     *
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on its current state,
     * e.g. its size.
     *
     * @param element the element being processed.
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)
            throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on a time condition.
     *
     * @param partFileState the state of the currently open part file of the bucket.
     * @param currentTime the current processing time.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
}

这三个接口别离对应在某些状况下,是否应该敞开以后关上的文件:

  • shouldRollOnCheckpoint:做 Checkpoint 之前查看。
  • shouldRollOnEvent:依据以后的状态查看是否应该敞开。比方以后的 buffer 大小是否超过了限度。
  • shouldRollOnProcessingTime:查看以后关上工夫是否太长来盘判断合乎敞开的条件。

2.2 pendingFileRecoverablesForCurrentCheckpoint isNotEmpty

其中的元素也是依据 RollingPolicy 来触发的,不做过多的解释。

2.3 pendingFileRecoverablesPerCheckpoint isNotEmpty

基于 pendingFileRecoverablesForCurrentCheckpoint isNotEmpty。用字典来保留一个 CheckpointId 与 List<InProgressFileWriter.PendingFileRecoverable> 的关系。

2.4 非沉闷 Bucket

联合后面的条件来说,其实就是曾经敞开并做完所有 Checkpoint 的目录,则为非沉闷Bucket。查看的机会个别是:

  1. Task 从新复原时,从 StateBackend 中读取之前的状态,并做查看
  2. 做完 Checkpoint 后,会进行一次查看

当 Bucket 变成非沉闷状态时,会做一次告诉 Inactive 的告诉。告知上游该分区的数据已提交,变成可读状态。见 issue:artition commit is delayed when records keep coming

3. FileSystemConnector 中的整洁架构

在理解完上文的知识点后,我关注到了有这么一个 Proposal:FLIP-115: Filesystem connector in Table。依据这个 Proposal,我简略的翻阅了一下相干的源码,发现其实现也是一种整洁架构的体现。

在下面咱们曾经进行过源码剖析了,接下来咱们就外面的形象设计以及职责、分层进行剖析:

|-- HiveTableSink  #Table 级 API,负责对外,用户能够间接调用
|-- StreamingFileSink  #Streaming 级 API,也能够对外,位于 TableAPI 下方
|-- StreamingFileSinkHelper #集成了对于 TimeService 的逻辑,便于定期敞开 Bucket;以及对于数据到 Bucket 的散发。这个类也被 AbstractStreamingWriter 应用,正文上也倡议复用于 RichSinkFunction or StreamOperator
|-- BucketsBuilder #场景中调到的具体类是 HadoopPathBasedBulkFormatBuilder,这个类会关注 Buckets 的具体实现以 BucketWriter 的具体实现
|-- Buckets #这是一个治理 Bucket 生命周期的类。其中有几个要害成员对象
  |-- BucketWriter  #会对应具体的 FileSystem 实现与写入的 Format
  |-- RolingPolicy  #滚动策略,后面提到过,不再深刻探讨
  |-- BucketAssigner #决定每个元素输入到哪个 Bucket 中。比方是 key 还是 date 等等
  |-- BucketFactory #负责每个 Bucket 的创立

因为职责切分粒度较细,数据的流转逻辑与内部具体实现是解耦的,咱们举几个例子:

  1. 如果咱们要基于本人的 DSL 来调用 Hive 的写入,那么只须要写个和 HiveTableSink 相似的 HiveDSLSink。
  2. 如果一个数仓(数据湖)始终在减少本人底层的文件系统的反对,那么当第一套代码构筑结束时,后续只须要实现相应的 BucketWriterFileSystem即可。
  3. 如果一个数仓(数据湖)始终在减少本人反对的 Format,那么当第一套代码构筑结束时,后续只须要实现相应的 BucketWriter 即可。

基于这种设计,外围逻辑往往不会产生变动,并将容易变动的局部隔离开来,整个模块的品质将更容易失去保障。

正文完
 0