乐趣区

关于flink:深入解析-Flink-的算子链机制

作者:LittleMagic

“为什么我的 Flink 作业 Web UI 中只显示出了一个框,并且 Records Sent 和 Records Received 指标都是 0?是我的程序写得有问题吗?”

Flink 算子链简介

笔者在 Flink 社区群里常常能看到相似这样的疑难。这种状况简直都不是程序有问题,而是因为 Flink 的 operator chain ——即算子链机制导致的,即提交的作业的执行打算中,所有算子的并发实例(即 sub-task)都因为满足特定条件而串成了整体来执行,天然就察看不到算子之间的数据流量了。

当然上述是一种非凡状况。咱们更常见到的是只有局部算子失去了算子链机制的优化,如官网文档中呈现过屡次的下图所示,留神 Source 和 map() 算子。

算子链机制的益处是不言而喻的:所有 chain 在一起的 sub-task 都会在同一个线程(即 TaskManager 的 slot)中执行,可能缩小不必要的数据交换、序列化和上下文切换,从而进步作业的执行效率。

铺垫了这么多,接下来就通过源码简略看看算子链产生的条件,以及它是如何在 Flink Runtime 中实现的。

逻辑打算中的算子链

对 Flink Runtime 稍有理解的看官应该晓得,Flink 作业的执行打算会用三层图构造来示意,即:

  • StreamGraph —— 原始逻辑执行打算
  • JobGraph —— 优化的逻辑执行打算(Web UI 中看到的就是这个)
  • ExecutionGraph —— 物理执行打算

算子链是在优化逻辑打算时退出的,也就是由 StreamGraph 生成 JobGraph 的过程中。那么咱们来到负责生成 JobGraph 的 o.a.f.streaming.api.graph.StreamingJobGraphGenerator 类,查看其外围办法 createJobGraph() 的源码。

private JobGraph createJobGraph() {
    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    setPhysicalEdges();
    // 略......

    return jobGraph;
}

可见,该办法会先计算出 StreamGraph 中各个节点的哈希码作为惟一标识,并创立一个空的 Map 构造保留行将被链在一起的算子的哈希码,而后调用 setChaining() 办法,如下源码所示。

<pre class="cm-s-default" style="color: rgb(55, 61, 65); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {for (Integer sourceNodeId : streamGraph.getSourceIDs()) {createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);    }}</pre>

可见是一一遍历 StreamGraph 中的 Source 节点,并调用 createChain() 办法。createChain() 是逻辑打算层创立算子链的外围办法,残缺源码如下,有点长。

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {if (!builtVertices.contains(startNodeId)) {List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {if (isChainable(outEdge, streamGraph)) {chainableOutputs.add(outEdge);
            } else {nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes =
            chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);
        OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        if (currentNode.getInputFormat() != null) {getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
            for (StreamEdge edge : transitiveOutEdges) {connect(startNodeId, edge);
            }
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {config.setChainEnd();
        }
        return transitiveOutEdges;
    } else {return new ArrayList<>();
    }
}

先解释一下办法结尾创立的 3 个 List 构造:

  • transitiveOutEdges:以后算子链在 JobGraph 中的出边列表,同时也是 createChain() 办法的最终返回值;
  • chainableOutputs:以后可能链在一起的 StreamGraph 边列表;
  • nonChainableOutputs:以后不可能链在一起的 StreamGraph 边列表。

接下来,从 Source 开始遍历 StreamGraph 中以后节点的所有出边,调用 isChainable() 办法判断是否能够被链在一起(这个判断逻辑稍后会讲到)。能够链接的出边被放入 chainableOutputs 列表,否则放入 nonChainableOutputs 列表。对于 chainableOutputs 中的边,就会以这些边的间接上游为终点,持续递归调用 createChain() 办法延展算子链。对于 nonChainableOutputs 中的边,因为以后算子链的延展曾经到头,就会以这些“断点”为终点,持续递归调用 createChain() 办法试图创立新的算子链。也就是说,逻辑打算中整个创立算子链的过程都是递归的,亦即理论返回时,是从 Sink 端开始返回的。

而后要判断以后节点是不是算子链的起始节点。如果是,则调用 createJobVertex() 办法为算子链创立一个 JobVertex(即 JobGraph 中的节点),也就造成了咱们在 Web UI 中看到的 JobGraph 成果:

最初,还须要将各个节点的算子链数据写入各自的 StreamConfig 中,算子链的起始节点要额定保留下 transitiveOutEdges。StreamConfig 在后文的物理执行阶段会再次用到。

造成算子链的条件

来看看 isChainable() 办法的代码。

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();}

由此可得,上下游算子可能 chain 在一起的条件还是十分刻薄的(陈词滥调了),列举如下:

  • 上下游算子实例处于同一个 SlotSharingGroup 中(之后再提);
  • 上游算子的链接策略(ChainingStrategy)为 ALWAYS ——既能够与上游链接,也能够与上游链接。咱们常见的 map()、filter() 等都属此类;
  • 上游算子的链接策略为 HEAD 或 ALWAYS。HEAD 策略示意只能与上游链接,这在失常状况下是 Source 算子的专属;
  • 两个算子间的物理分区逻辑是 ForwardPartitioner,可参见之前写过的《聊聊 Flink DataStream 的八种物理分区逻辑》;
  • 两个算子间的 shuffle 形式不是批处理模式;
  • 上下游算子实例的并行度雷同;
  • 没有禁用算子链。

禁用算子链

用户能够在一个算子上调用 startNewChain() 办法强制开始一个新的算子链,或者调用 disableOperatorChaining() 办法指定它不参加算子链。代码位于 SingleOutputStreamOperator 类中,都是通过扭转算子的链接策略实现的。

@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {return setChainingStrategy(ChainingStrategy.NEVER);
}

@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {return setChainingStrategy(ChainingStrategy.HEAD);
}

如果要在整个运行时环境中禁用算子链,调用 StreamExecutionEnvironment.disableOperatorChaining() 办法即可。

物理打算中的算子链

在 JobGraph 转换成 ExecutionGraph 并交由 TaskManager 执行之后,会生成调度执行的根本工作单元 ——StreamTask,负责执行具体的 StreamOperator 逻辑。在 StreamTask.invoke() 办法中,初始化了状态后端、checkpoint 存储和定时器服务之后,能够发现:

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

结构出了一个 OperatorChain 实例,这就是算子链在理论执行时的状态。解释一下 OperatorChain 中的几个次要属性。

private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;
  • headOperator:算子链的第一个算子,对应 JobGraph 中的算子链起始节点;
  • allOperators:算子链中的所有算子,倒序排列,即 headOperator 位于该数组的开端;
  • streamOutputs:算子链的输入,能够有多个;
  • chainEntryPoint:算子链的“入口点”,它的含意将在后文阐明。

由上可知,所有 StreamTask 都会创立 OperatorChain。如果一个算子无奈进入算子链,也会造成一个只有 headOperator 的单个算子的 OperatorChain。OperatorChain 构造方法中的外围代码如下。

StreamEdge outEdge = outEdgesInOrder.get(i);
    RecordWriterOutput<?> streamOutput = createStreamOutput(recordWriters.get(i),
        outEdge,
        chainedConfigs.get(outEdge.getSourceId()),
        containingTask.getEnvironment());
    this.streamOutputs[i] = streamOutput;
    streamOutputMap.put(outEdge, streamOutput);
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
    containingTask,
    configuration,
    chainedConfigs,
    userCodeClassloader,
    streamOutputMap,
    allOps);

if (operatorFactory != null) {WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
    headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {headOperator = null;}

// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

首先会遍历算子链整体的所有出边,并调用 createStreamOutput() 办法创立对应的上游输入 RecordWriterOutput。而后就会调用 createOutputCollector() 办法创立物理的算子链,并返回 chainEntryPoint,这个办法比拟重要,局部代码如下。

 StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators) {List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

    // create collectors for the network outputs
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {@SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {int outputId = outputEdge.getTargetId();
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
        WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
            containingTask,
            chainedOpConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators,
            outputEdge.getOutputTag());
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
    // 以下略......
}

该办法从上一节提到的 StreamConfig 中别离取出出边和链接边的数据,并创立各自的 Output。出边的 Output 就是将数据发往算子链之外上游的 RecordWriterOutput,而链接边的输入要靠 createChainedOperator() 办法。

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators,
        OutputTag<IN> outputTag) {
    // create the output that the operator writes to first. this may recursively create more operators
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
        containingTask,
        operatorConfig,
        chainedConfigs,
        userCodeClassloader,
        streamOutputs,
        allOperators);

    // now create the operator and give it the output collector to write its output to
    StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
    OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(containingTask, operatorConfig, chainedOperatorOutput);

    allOperators.add(chainedOperator);

    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
    }
    else {TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
    }

    // wrap watermark gauges since registered metrics must be unique
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
    return currentOperatorOutput;
}

咱们一眼就能够看到,这个办法递归调用了上述 createOutputCollector() 办法,与逻辑打算阶段相似,通过一直延长 Output 来产生 chainedOperator(即算子链中除了 headOperator 之外的算子),并逆序返回,这也是 allOperators 数组中的算子程序为倒序的起因。

chainedOperator 产生之后,将它们通过 ChainingOutput 连接起来,造成如下图所示的构造。

图片来自:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/

最初来看看 ChainingOutput.collect() 办法是如何输入数据流的。

@Override
public void collect(StreamRecord<T> record) {if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }
    pushToOperator(record);
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        // we are only responsible for emitting to the side-output specified by our
        // OutputTag.
        return;
    }
    pushToOperator(record);
}

protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        operator.setKeyContextElement1(castRecord);
        operator.processElement(castRecord);
    }
    catch (Exception e) {throw new ExceptionInChainedOperatorException(e);
    }
}

可见是通过调用链接算子的 processElement() 办法,间接将数据推给上游解决了。也就是说,OperatorChain 齐全能够看做一个由 headOperator 和 streamOutputs 组成的单个算子,其外部的 chainedOperator 和 ChainingOutput 都像是被黑盒遮蔽,同时没有引入任何 overhead。

买通了算子链在执行层的逻辑,看官应该会明确 chainEntryPoint 的含意了。因为它位于递归返回的起点,所以它就是流入算子链的起始 Output,即上图中指向 headOperator 的 RecordWriterOutput。

文章转载自简书,作者:LittleMagic。
原文链接:https://www.jianshu.com/p/799744e347c7

退出移动版