
Flink Source Code Notes: From StreamGraph to JobGraph

Introduction

A JobGraph can be thought of as an optimized StreamGraph: it merges operators that meet certain conditions into a single operator chain, reducing the overhead of serializing/deserializing data and sending it over the network between nodes.

Entry Point

Similar to how the StreamGraph is generated, calling StreamGraph.getJobGraph() yields the corresponding JobGraph. Under the hood, a StreamingJobGraphGenerator is created to build it: new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph()
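For example, a minimal end-to-end sketch (the pipeline itself is made up for illustration; getStreamGraph() and getJobGraph() are the real entry points):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).map(x -> x * 2).print();

StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph = streamGraph.getJobGraph();

createJobGraph() then does the actual work: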

private JobGraph createJobGraph() {
    // ...
    
    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    setChaining(hashes, legacyHashes);
    // ...
    return jobGraph;
}

The core consists of these two steps:

  1. Call traverseStreamGraphAndGenerateHashes to generate a hash (a unique identifier) for each node;
  2. Call setChaining to optimize the operator flow by chaining some operators together, reducing serialization/deserialization and other communication overhead.

traverseStreamGraphAndGenerateHashes

As mentioned in the StreamGraph article, the StreamNode IDs created while building the StreamGraph are derived from Transformation IDs, and the Transformation ID comes from an ever-incrementing static counter. This leads to the following situation: within the same process, if we use the DataStream API to build two jobs A and B with exactly the same operator flow, their underlying Transformation IDs are completely different. From the perspective of the job and the graph structure, the two jobs are identical, so we need a separate ID mechanism to identify them: the Operator ID.
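A hypothetical illustration (the concrete ID values are examples only, and we assume getStreamGraph() clears the buffered transformations, as recent versions do):

// Two identical pipelines built in the same JVM process.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1).map(i -> i + 1).print();   // Transformation IDs, e.g., 1, 2, 3
StreamGraph graphA = env.getStreamGraph();

env.fromElements(1).map(i -> i + 1).print();   // same topology, but IDs, e.g., 4, 5, 6
StreamGraph graphB = env.getStreamGraph();

// graphA and graphB describe the same job, yet their StreamNode IDs differ,
// which is why a position-based Operator ID is needed as a stable identity.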

What traverseStreamGraphAndGenerateHashes does is generate a hash for each node, based on the node's position in the StreamGraph, to serve as its identifier. By default, Flink uses StreamGraphHasherV2 to generate node hashes.

// The hash function used to generate the hash
final HashFunction hashFunction = Hashing.murmur3_128(0);
final Map<Integer, byte[]> hashes = new HashMap<>();

First, the method collects all sources of the StreamGraph. To ensure that the same StreamGraph produces the same hashes on every run, the source IDs are sorted after they are collected.

We need to make the source order deterministic. The source IDs are not returned in the same order, which means that submitting the same program twice might result in different traversal, which breaks the deterministic hash assignment.

List<Integer> sources = new ArrayList<>();
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
    sources.add(sourceNodeId);
}
Collections.sort(sources);

Then StreamGraphHasherV2 traverses the nodes breadth-first (using a queue):

  1. For each node taken from the queue, try to generate its hash; if that succeeds, add all of the node's downstream nodes to the queue;
  2. If it fails, the node is not ready to be hashed yet (for example, some of its upstream nodes have not been traversed), so it is dropped from the queue for now; it will be re-added once another of its upstream nodes is traversed.
1    2
|    |
|    |
|    3
\    /
 \ /
  4

In the example above, 1 and 2 are both source nodes and are added to the queue first, so the queue is [1, 2]. After the first round of traversal, the hashes of nodes 1 and 2 have been computed, and their downstream nodes are enqueued in order, so the queue becomes [4, 3]. We then take node 4 from the queue and try to compute its hash; as described above, this attempt fails, so we hit the else branch and node 4 is removed from the queue. Next we take node 3, compute its hash, and afterwards put its downstream node 4 back into the queue: [4]. On the next iteration, node 4's hash is computed again; this time all of its upstream nodes have been traversed, so the hash is generated successfully.
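This retry behavior can be simulated outside Flink in a few lines of Java (a toy sketch; the node and edge maps are hand-written for the diamond above, and the "hash" is just the visit position):

import java.util.*;

public class BfsRetryDemo {
    // Diamond from the example: 1 -> 4, 2 -> 3, 3 -> 4.
    static Map<Integer, List<Integer>> out =
            Map.of(1, List.of(4), 2, List.of(3), 3, List.of(4), 4, List.of());
    static Map<Integer, List<Integer>> in =
            Map.of(1, List.of(), 2, List.of(), 3, List.of(2), 4, List.of(1, 3));

    public static void main(String[] args) {
        Map<Integer, Integer> hashes = new LinkedHashMap<>(); // node -> visit position
        Set<Integer> visited = new HashSet<>(List.of(1, 2));
        Queue<Integer> remaining = new ArrayDeque<>(List.of(1, 2));
        Integer node;
        while ((node = remaining.poll()) != null) {
            if (hashes.keySet().containsAll(in.get(node))) { // all inputs hashed?
                hashes.put(node, hashes.size());
                for (int child : out.get(node)) {
                    if (visited.add(child)) {
                        remaining.add(child);
                    }
                }
            } else {
                visited.remove(node); // retry later, when another parent re-adds it
            }
        }
        System.out.println(hashes); // {1=0, 2=1, 3=2, 4=3} -- node 4 is hashed last
    }
}

The corresponding Flink traversal code: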

//
// Traverse the graph in a breadth-first manner. Keep in mind that
// the graph is not a tree and multiple paths to nodes can exist.
//

Set<Integer> visited = new HashSet<>();
Queue<StreamNode> remaining = new ArrayDeque<>();

// Start with source nodes
for (Integer sourceNodeId : sources) {remaining.add(streamGraph.getStreamNode(sourceNodeId));
    visited.add(sourceNodeId);
}

StreamNode currentNode;
while ((currentNode = remaining.poll()) != null) {
    // Generate the hash code. Because multiple paths exist to each
    // node, we might not have all required inputs available to
    // generate the hash code.
    if (generateNodeHash(
        currentNode,
        hashFunction,
        hashes,
        streamGraph.isChainingEnabled(),
        streamGraph)) {
        // Add the child nodes
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            StreamNode child = streamGraph.getTargetVertex(outEdge);

            if (!visited.contains(child.getId())) {
                remaining.add(child);
                visited.add(child.getId());
            }
        }
    } else {
        // We will revisit this later.
        visited.remove(currentNode.getId());
    }
}

In generateNodeHash we can see how the hash is actually computed: depending on whether the user specified a Transformation UID, either generateDeterministicHash or generateUserSpecifiedHash is called, and the resulting StreamNode ID-to-hash mapping is put into hashes.

private boolean generateNodeHash(
        StreamNode node,
        HashFunction hashFunction,
        Map<Integer, byte[]> hashes,
        boolean isChainingEnabled,
        StreamGraph streamGraph) {

    // Check for user-specified ID
    String userSpecifiedHash = node.getTransformationUID();

    if (userSpecifiedHash == null) {
        // Check that all input nodes have their hashes computed
        for (StreamEdge inEdge : node.getInEdges()) {
            // If the input node has not been visited yet, the current
            // node will be visited again at a later point when all input
            // nodes have been visited and their hashes set.
            if (!hashes.containsKey(inEdge.getSourceId())) {
                return false;
            }
        }

        Hasher hasher = hashFunction.newHasher();
        byte[] hash =
                generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);

        if (hashes.put(node.getId(), hash) != null) {
            // Sanity check
            throw new IllegalStateException(
                    "Unexpected state. Tried to add node hash"
                            + "twice. This is probably a bug in the JobGraph generator.");
        }

        return true;
    } else {
        Hasher hasher = hashFunction.newHasher();
        byte[] hash = generateUserSpecifiedHash(node, hasher);

        for (byte[] previousHash : hashes.values()) {
            if (Arrays.equals(previousHash, hash)) {
                throw new IllegalArgumentException(
                        "Hash collision on user-specified ID "
                                + "\""
                                + userSpecifiedHash
                                + "\". "
                                + "Most likely cause is a non-unique ID. Please check that all IDs "
                                + "specified via `uid(String)` are unique.");
            }
        }

        if (hashes.put(node.getId(), hash) != null) {
            // Sanity check
            throw new IllegalStateException(
                    "Unexpected state. Tried to add node hash"
                            + "twice. This is probably a bug in the JobGraph generator.");
        }

        return true;
    }
}

generateDeterministicHash

The reason Flink uses hashes.size() as the input to the hash function (i.e., the node's traversal position in the StreamGraph serves as the input) is explained in the source comment:

Include stream node to hash. We use the current size of the computed hashes as the ID. We cannot use the node’s ID, because it is assigned from a static counter. This will result in two identical programs having different hashes.

In other words, using the StreamNode ID could cause two identical programs to end up with different hashes.

Note that a node's hash also depends on the number of downstream nodes that can be chained with it, and finally it is XORed with the hashes of its upstream nodes.

private byte[] generateDeterministicHash(
        StreamNode node,
        Hasher hasher,
        Map<Integer, byte[]> hashes,
        boolean isChainingEnabled,
        StreamGraph streamGraph) {

    // Include stream node to hash. We use the current size of the computed
    // hashes as the ID. We cannot use the node's ID, because it is
    // assigned from a static counter. This will result in two identical
    // programs having different hashes.
    // hasher.putInt(hashes.size())
    generateNodeLocalHash(hasher, hashes.size());

    // Include chained nodes to hash
    for (StreamEdge outEdge : node.getOutEdges()) {
        if (isChainable(outEdge, isChainingEnabled, streamGraph)) {

            // Use the hash size again, because the nodes are chained to
            // this node. This does not add a hash for the chained nodes.
            // hasher.putInt(hashes.size())
            generateNodeLocalHash(hasher, hashes.size());
        }
    }

    byte[] hash = hasher.hash().asBytes();

    // Make sure that all input nodes have their hash set before entering
    // this loop (calling this method).
    for (StreamEdge inEdge : node.getInEdges()) {
        byte[] otherHash = hashes.get(inEdge.getSourceId());

        // Sanity check
        if (otherHash == null) {
            throw new IllegalStateException(
                    "Missing hash for input node"
                            + streamGraph.getSourceVertex(inEdge)
                            + ". Cannot generate hash for"
                            + node
                            + ".");
        }

        for (int j = 0; j < hash.length; j++) {
            hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
        }
    }
    // ...

    return hash;
}

generateUserSpecifiedHash

generateUserSpecifiedHash is much simpler than generateDeterministicHash: it simply feeds the user-specified Transformation UID into the hash function to obtain the hash.
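Its body boils down to roughly the following (paraphrased from the Flink source; note that only the UID is hashed, not the node's position):

private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
    hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
    return hasher.hash().asBytes();
}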

setChaining

The most central task in creating the JobGraph is merging operators into chains; the entry point is setChaining, called from createJobGraph():

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // we separate out the sources that run as inputs to another operator (chained inputs)
    // from the sources that needs to run as the main (head) operator.
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Entry::getKey))
                    .map(Entry::getValue)
                    .collect(Collectors.toList());

    // iterate over a copy of the values, because this map gets concurrently modified
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}

Flink first creates a chain head for each source of the StreamGraph (chainEntryPoints), then greedily tries to build chains starting from each head. See createChain:

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
    Integer startNodeId = chainInfo.getStartNodeId();
    if (!builtVertices.contains(startNodeId)) {

        // all edges from the current chain to the next chains
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        // 1. For each outgoing edge of the current node, check whether it can be
        // chained with the downstream node (the exact conditions are explained later),
        // and add the StreamEdge to chainableOutputs or nonChainableOutputs accordingly.
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // 2. For chainable edges, recursively call createChain to pull the downstream
        // node into the current chain.
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }

        // 3. For downstream nodes that cannot be chained, start a new OperatorChainInfo,
        // and add the edge between the current node and that downstream node to
        // transitiveOutEdges, which is returned to the caller.
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }

        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                        Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));

        // 4. Add the current node to the chain and create its OperatorID.
        // The OperatorID is exactly the hash computed earlier, and later also serves
        // as the JobVertex ID.
        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // 5. If the current node is the head of its chain, create a JobVertex and
        // return that JobVertex's config; otherwise create a fresh StreamConfig.
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());

        setVertexConfig(
                currentNodeId,
                config,
                chainableOutputs,
                nonChainableOutputs,
                chainInfo.getChainedSources());

        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());

            // 6. Connect the current JobVertex to its downstream JobVertices. This
            // materializes as an IntermediateDataSet plus a JobEdge: the
            // IntermediateDataSet is added to the upstream vertex's results, and the
            // JobEdge to the downstream vertex's inputs.
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }

            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            // 7. If the current node is not the head of its chain, record its config
            // in chainedConfigs so that the chain head can retrieve it later.
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

The chain-building flow is fairly involved, but in short it is a greedy strategy. The method carries an OperatorChainInfo chainInfo describing the chain currently being built; it adds the current node to that chain and checks whether the node can be chained with its downstream nodes. If so, the current chainInfo is passed along to the downstream node in a recursive createChain call; if not, a new OperatorChainInfo is created as the downstream node's chain. The current node's information is then recorded into chainInfo, and the corresponding OperatorID is obtained; this OperatorID is exactly the hash computed in traverseStreamGraphAndGenerateHashes (and, if the node is the head of a chain, it is also the JobVertex ID).

Finally, the method checks whether the current node is the head of its chain. If it is, a JobVertex is generated for it; otherwise the node's information is recorded into chainedConfigs so that the chain head can pick it up later. When the JobVertex is created, connections to the downstream JobVertices are also established: an IntermediateDataSet and a JobEdge. Connections between JobVertices differ from an ordinary graph; they take the form JobVertex(A) – IntermediateDataSet – JobEdge – JobVertex(B), as the demo below shows.

JobVertexA                               JobVertexB
  results:                                 inputs:
    - [0]:                                   - [0]:
      consumer ---------- JobEdge ---------- - [1]:
    - [1]:                                   - [2]:
    - ...                                    - ...
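Internally, connect() looks up the two JobVertices and wires them together with connectNewDataSetAsInput, which creates both the IntermediateDataSet and the JobEdge in one call. An abridged, hypothetical sketch (lookups are simplified, and signatures and partition types vary across Flink versions):

// upstream chain head and downstream vertex
JobVertex headVertex = jobVertices.get(startNodeId);
JobVertex downStreamVertex = jobVertices.get(edge.getTargetId());

// ForwardPartitioner/RescalePartitioner map to POINTWISE; other partitioners to ALL_TO_ALL
JobEdge jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED_BOUNDED);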

isChainable

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled())) {
        return false;
    }

    // check that we do not have a union operation, because unions currently only work
    // through the network/byte-channel stack.
    // we check that by testing that each "type" (which means input position) is used only once
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}

From the checks in the code, we can summarize the conditions that must all hold for two nodes to be chained together (the sketch after the list shows how some of them break a chain):

  1. The downstream node has exactly one input, namely the upstream node;
  2. The upstream and downstream nodes are in the same slotSharingGroup (true by default);
  3. The ChainingStrategy of the upstream and downstream operators must meet certain conditions (for example, neither may be NEVER);
  4. The upstream node must send data to the downstream node through a ForwardPartitioner;
  5. The shuffle mode must not be BATCH;
  6. The upstream and downstream operators have the same parallelism;
  7. Chaining is enabled in the StreamGraph (i.e., org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining was not called).
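As a quick illustration of conditions 3 and 4 (a hedged sketch; the operator bodies are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b")
        .map(String::toUpperCase)   // forward partitioner, same parallelism -> chains with the source
        .rebalance()                // not a ForwardPartitioner -> breaks the chain (condition 4)
        .filter(s -> !s.isEmpty())
        .disableChaining()          // ChainingStrategy.NEVER -> breaks the chain (condition 3)
        .print();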