关于flink:Flink源码笔记从-DataStream-生成-StreamGraph

10次阅读

共计 6107 个字符,预计需要花费 16 分钟才能阅读完成。

StreanGraph

是一个 DAG(以邻接表的模式存储),存储了整个流的拓扑构造信息,由一系列 StreamEdge 和 StreamNode 组成。

调用 StreamExecutionEnvironment.getStreamGraph() 即能够生成 StreamGraph。StreamExecutionEnvironment.getStreamGraph() 默认会革除 StreamExecutionEnvironment 中保留的 transformations,后续就无奈调用 StreamExecutionEnvironment.execute() 了,因而须要重复使用 transformations 的话能够应用重载办法 StreamExecutionEnvironment.getStreamGraph(String jobName, boolean clearTransformations),其源码如下:

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {this.transformations.clear();
    }
    return streamGraph;
}

这部分代码会生成一个 StreamGraphGenerator(生成过程中曾经存储了所有 transformations 的信息),调用其 generate 办法生成 StreamGraph。

StreamGraphGenerator

generate

public StreamGraph generate() {streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();

    for (Transformation<?> transformation : transformations) {transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}

这个办法外围做了四件事:

  1. 初始化 StreamGraph;
  2. 创立 alreadyTransformed,避免对同一个 transformation 反复创立 StreamNode;
  3. 对每个 transformation 进行转化,创立对应的 StreamNode 和 StreamEdge;
  4. 后置一些环境清理。

第 2 步创立的 alreadyTransformed 除了避免反复创立 StreamNode 外还有一个作用,建设转换前后 Transformation id 的映射关系,这样能通过 Transformation 间接检索到对应的 StreamNode。 在转换过程中,有的 Transformation 不会转成 StreamNode,而是转成一个虚节点,这些虚节点的 id 会从新生成,这时候就须要通过 alreadyTransformed 记录的映射关系检索到 。Transformation 中有个动态成员 idCounter,用来给每个 Transformation 指派惟一 id。调用 Transformation.getNewNodeId() 就能够获取新的惟一的 Transformation id,getNewNodeId 代码如下:

public static int getNewNodeId() {
    idCounter++;
    return idCounter;
}

在第 3 步做 transform 的时,StreamGraphGenerator 会递归对每个 Transformation 的上游局部先进行 transform,确保每个 transformations 都会被 transform(StreamGraphGenerator 中的 transformations 其实并没有保留所有的 transformations,只有局部 transformations 在应用 DataStream API 时会被增加到 StreamExecutionEnvironment 中,并在后续创立 StreamGraphGenerator 的时候赋给 StreamGraphGenerator)。

虚节点

虚节点的构造是一个 Tuple3:

Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>

外面记录了三局部信息:

  • 上游节点 id;
  • 分区器;
  • 数据处理模式(批 / 流)。

以上图的 PartitionTransformation 为例,它只记录了分区信息,不蕴含真正的操作,因而会转成一个虚节点。

假如从过程刚启动开始执行该 DataStream,transformation id 从 1 开始自增(上图自右向左从 1 开始递增赋值),PartitionTransformation 的 id 会是 3,在转成虚节点时,此时 idCounter 的值是 5(图中一共有 5 个 Transformation,因而在创立 DataStream 时 idCounter 就已自增到 5),此时再次调用 Transformation.getNewNodeId() 会失去新的值 6,赋给 PartitionTransformation 转换成的虚节点,并在 alreadyTransformed 记录 3 -> 6 的映射。

该 PartitionTransformation 的上游节点的 id 是 2,它由 keyBy 生成,分区器是 KeyGroupStreamPartitioner,因为咱们没有手动指定 shuffleMode,默认是 UNDEFINED,因而最初生成的虚节点具体信息则为:(2, KeyGroupStreamPartitioner, UNDEFINED)。

transform

StreamGraphGenerator 中有个 map—translatorMap,外面存储了 Transformation 到转换器(TransformationTranslator)的映射,在获取 transformation 对应的 TransformationTranslator 后,依据以后是流解决还是批处理环境执行 translateForStreaming 或 translateForBatch,咱们这里是流解决环境,因而间接看 translateForStreaming 的源码。

public final Collection<Integer> translateForStreaming(final T transformation, final Context context) {checkNotNull(transformation);
    checkNotNull(context);

    final Collection<Integer> transformedIds =
            translateForStreamingInternal(transformation, context);
    configure(transformation, context);

    return transformedIds;
}

translateForStreaming 采纳了模板办法的设计模式,定义在 SimpleTransformationTranslator 中,是一个 final 办法,每个 TransformationTranslator 依据各自的须要别离实现 translateForStreamingInternal 办法,个别最初的逻辑导向办法 translateInternal。

translateInternal

每个 TransformationTranslator 的具体实现可能不同,但根本做的是以下几件事中的一部分或其组合:

  • 对该 Transformation 构建 StreamNode,增加到 StreamGraph 中(addOperator);
  • 对该 StreamNode 和对应上游的 StreamNode 建设边,增加到 StreamGraph 中;
  • 对该 Transformation 构建虚节点,增加到 StreamGraph 中。

以上图为例,LegacySinkTransformation 在转换过程中,首先会找到对应的 Translator:LegacySinkTransformationTranslator,并调用其实现的 translateForStreamingInternal(最终导向到 translateInternal)。在 translateInternal 中最外围的逻辑是这两步:

  1. 调用 StreamGraph.addSink() 增加 StreamNode;
  2. 调用 StreamGraph.addEdge() 增加边。

StreamGraph.addSink()

addSink 实质也是调用了 addOperator 办法,只是额定在 StreamGraph 的成员 sinks 中增加了对该 StreamNode 的援用。同理,addSource 其实也只是在 addOperator 的根底上往 sources 里增加了对 StreamNode 的援用。

StreamGraph.addEdge()

在往 StreamGraph 里增加边的时候次要是建设上游节点到该节点的分割,因而须要先拿到所有 inputId,并将每个上游节点作为入参调用 addEdge,代码如下:

for (Integer inputId : context.getStreamNodeIds(input)) {streamGraph.addEdge(inputId, transformationId, 0);
}

context.getStreamNodeIds(input) 是从 StreamGraphGenerator.alreadyTransformed 中拿取上游节点的 id,上文提到过,有些 transformations 在转换过程中会变成虚节点从新生成 id,因而间接用 transformation 中保留的上游节点的 id 可能生效,须要做一次映射查问。

拿到上游节点 id 后,咱们就能够调用 streamGraph.addEdge(inputId, transformationId, 0) 增加边,这个办法最终导向 streamGraph.addEdgeInternal。在 addEdgeInternal 中,StreamNode 会对上游节点是虚节点的状况做一些解决,找到虚节点中记录的真正上游节点 X 后,建设 X 到以后 StreamNode 的关系,如下:

private void addEdgeInternal(
        Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        // 找到虚节点记录的上游节点
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                null,
                outputTag,
                shuffleMode);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        // 找到虚节点记录的上游节点
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                outputNames,
                outputTag,
                shuffleMode);
    } else {StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // 设置 partitioner 和 shuffleMode
       
        StreamEdge edge =
                new StreamEdge(
                        upstreamNode,
                        downstreamNode,
                        typeNumber,
                        partitioner,
                        outputTag,
                        shuffleMode);

        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

以 ReduceTransformation 为例(本身 id 为 4),它记录的 input 的 transformation id 为 3,首先通过 alreadyTransformed 找到映射后的节点 id:6。而后依据虚节点的信息 (2, KeyGroupStreamPartitioner, UNDEFINED) 找到真正的上游节点 2,创立 2-4 的 StreamEdge,在创立 StreamEdge 时,会在其中记录 partitioner 和 shuffleMode,并将这条边别离增加到 source vertex 的 outEdges 和 target vertex 的 inEdges 中。

正文完
 0