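While translating a StreamGraph into a JobGraph, Flink generates an OperatorID for every operator and passes it on to the JobVertex. A JobVertex is a node in the JobGraph; each JobVertex contains either a single operator or an operator chain made of several operators chained together. If a JobVertex contains only one operator, the JobVertex id is that operator's OperatorID; if it contains an operator chain, the JobVertex id is the OperatorID of the chain's head operator. Each OperatorID uniquely identifies one operator, and when restoring state Flink uses the OperatorID to find the state that belongs to the current node.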

Entry point

As mentioned above, the OperatorID is generated while the StreamGraph is translated into a JobGraph. The entry point is StreamingJobGraphGenerator#createJobGraph:

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes =
        defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

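  • defaultStreamGraphHasher: the default implementation is StreamGraphHasherV2. It computes the OperatorID of every node; what gets hashed depends on whether the StreamNode has a transformationUID set.
  • legacyHashes: produced by legacyStreamGraphHashers, which contains only a single StreamGraphUserHashHasher. If the user set a userHash on an operator, that userHash is extracted here and used as the OperatorID.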

StreamGraphHasherV2

  1. Find all source operators and add them to the remaining queue;
  2. Traverse the remaining queue breadth-first and compute the OperatorID of every node.

    public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
        // The hash function used to generate the hash
        final HashFunction hashFunction = Hashing.murmur3_128(0);
        final Map<Integer, byte[]> hashes = new HashMap<>();

        Set<Integer> visited = new HashSet<>();
        Queue<StreamNode> remaining = new ArrayDeque<>();

        // We need to make the source order deterministic. The source IDs are
        // not returned in the same order, which means that submitting the same
        // program twice might result in different traversal, which breaks the
        // deterministic hash assignment.
        List<Integer> sources = new ArrayList<>();
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            sources.add(sourceNodeId);
        }
        Collections.sort(sources);

        //
        // Traverse the graph in a breadth-first manner. Keep in mind that
        // the graph is not a tree and multiple paths to nodes can exist.
        //

        // Start with source nodes: put them into the queue
        for (Integer sourceNodeId : sources) {
            remaining.add(streamGraph.getStreamNode(sourceNodeId));
            visited.add(sourceNodeId);
        }

        // Breadth-first traversal
        StreamNode currentNode;
        while ((currentNode = remaining.poll()) != null) {
            // Generate the hash code. Because multiple path exist to each
            // node, we might not have all required inputs available to
            // generate the hash code.
            //
            // If generation fails, the hashes of the nodes this node depends on
            // have not all been computed yet, so the node is removed from visited
            // and waits for a later pass.
            // If generation succeeds, the node's downstream nodes are added to the
            // queue and to visited; marking them visited keeps a node that is
            // reachable over several paths from being enqueued twice.
            if (generateNodeHash(
                    currentNode,
                    hashFunction,
                    hashes,
                    streamGraph.isChainingEnabled(),
                    streamGraph)) {
                // Add the child nodes
                for (StreamEdge outEdge : currentNode.getOutEdges()) {
                    StreamNode child = streamGraph.getTargetVertex(outEdge);

                    if (!visited.contains(child.getId())) {
                        remaining.add(child);
                        visited.add(child.getId());
                    }
                }
            } else {
                // We will revisit this later.
                visited.remove(currentNode.getId());
            }
        }

        return hashes;
    }
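
The deferral behaviour of this traversal is easier to see on a toy graph. The sketch below is not Flink code: the class ToyTraversal, its adjacency maps, and the "all inputs done" check that stands in for generateNodeHash are hypothetical. It only reproduces the ordering logic, assuming a node can be processed once every one of its inputs has been processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical toy model of the breadth-first traversal; not part of Flink.
public class ToyTraversal {

    /** Returns the order in which nodes would receive their hash. */
    public static List<Integer> traverse(
            List<Integer> sourceIds,
            Map<Integer, List<Integer>> outEdges,
            Map<Integer, List<Integer>> inEdges) {

        List<Integer> order = new ArrayList<>();
        Set<Integer> done = new HashSet<>();      // nodes that already have a "hash"
        Set<Integer> visited = new HashSet<>();   // nodes that are queued or done
        Queue<Integer> remaining = new ArrayDeque<>();

        // Sort the sources so the traversal order is deterministic.
        List<Integer> sources = new ArrayList<>(sourceIds);
        Collections.sort(sources);
        for (Integer s : sources) {
            remaining.add(s);
            visited.add(s);
        }

        Integer current;
        while ((current = remaining.poll()) != null) {
            // Stand-in for generateNodeHash: succeeds only if all inputs are done.
            List<Integer> inputs = inEdges.getOrDefault(current, Collections.emptyList());
            if (done.containsAll(inputs)) {
                done.add(current);
                order.add(current);
                for (Integer child : outEdges.getOrDefault(current, Collections.emptyList())) {
                    // Set.add returns false if the child is already queued or done.
                    if (visited.add(child)) {
                        remaining.add(child);
                    }
                }
            } else {
                // Forget the node; a parent that finishes later re-enqueues it.
                visited.remove(current);
            }
        }
        return order;
    }
}
```

With the edges 1→2, 1→4, 2→3 and 3→4, node 4 is polled once before node 3 is done, dropped from visited, and enqueued again when node 3 finishes.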

The execution logic of the generateNodeHash method is shown in the figure below:

Depending on whether the user has set a transformationUID on the StreamNode, different data is used as the hash input:

  • generateUserSpecifiedHash uses the transformationUID set by the user as the source data for the hash:

    private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
        hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));

        return hasher.hash().asBytes();
    }

  • generateDeterministicHash computes the OperatorID from the topology of the job:

    private byte[] generateDeterministicHash(
            StreamNode node,
            Hasher hasher,
            Map<Integer, byte[]> hashes,
            boolean isChainingEnabled,
            StreamGraph streamGraph) {

        // Include stream node to hash. We use the current size of the computed
        // hashes as the ID. We cannot use the node's ID, because it is
        // assigned from a static counter. This will result in two identical
        // programs having different hashes.
        generateNodeLocalHash(hasher, hashes.size());

        // Include chained nodes to hash
        for (StreamEdge outEdge : node.getOutEdges()) {
            if (isChainable(outEdge, isChainingEnabled, streamGraph)) {
                // Use the hash size again, because the nodes are chained to
                // this node. This does not add a hash for the chained nodes.
                generateNodeLocalHash(hasher, hashes.size());
            }
        }

        byte[] hash = hasher.hash().asBytes();

        // Make sure that all input nodes have their hash set before entering
        // this loop (calling this method).
        for (StreamEdge inEdge : node.getInEdges()) {
            byte[] otherHash = hashes.get(inEdge.getSourceId());

            // Sanity check
            if (otherHash == null) {
                throw new IllegalStateException(
                    "Missing hash for input node "
                        + streamGraph.getSourceVertex(inEdge)
                        + ". Cannot generate hash for "
                        + node
                        + ".");
            }

            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
            }
        }

        // ... debug log

        return hash;
    }

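  1. Put hashes.size() into the buffer of hash source data;
  2. Check how many downstream operators can be chained to this operator, and put hashes.size() into the buffer once more for each of them;
  3. Hash the buffer to obtain this operator's own hash value;
  4. Look up the hashes of this operator's upstream operators and combine each of them with the operator's hash value byte by byte (multiply by 37, then XOR).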
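
Step 4 can be isolated in a few lines. In the sketch below, HashMixSketch and mix are hypothetical names and the byte arrays are made-up placeholders rather than real murmur3 output; only the loop body is taken from generateDeterministicHash above. It illustrates that every input hash changes the result and that the order in which inputs are folded in matters.

```java
// Hypothetical helper that isolates the input-mixing loop; not part of Flink.
public class HashMixSketch {

    /** Folds each input hash into a copy of the node-local hash, in the given order. */
    public static byte[] mix(byte[] localHash, byte[]... inputHashes) {
        byte[] hash = localHash.clone(); // leave the caller's array untouched
        for (byte[] otherHash : inputHashes) {
            for (int j = 0; j < hash.length; j++) {
                // Same expression as in generateDeterministicHash:
                // multiply by 37 first, then XOR with the input's byte.
                hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
            }
        }
        return hash;
    }

    public static void main(String[] args) {
        byte[] local = {1, 2};
        byte[] a = {3, 4};
        byte[] b = {5, 6};
        System.out.println(java.util.Arrays.toString(mix(local, a)));
        System.out.println(java.util.Arrays.toString(mix(local, a, b)));
        System.out.println(java.util.Arrays.toString(mix(local, b, a)));
    }
}
```

Because `*` binds tighter than `^` in Java, the first byte of mix({1, 2}, {3, 4}) is (1 * 37) ^ 3 = 38 and the second is (2 * 37) ^ 4 = 78.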

Chaining strategy

Two operators can be chained together when all of the following conditions hold:

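  • Chaining is enabled globally by the user: it is on by default and can be turned off with StreamExecutionEnvironment#disableOperatorChaining;
  • The downstream operator has the current operator as its only upstream;
  • Both operators belong to the same slotSharingGroup;
  • The operators' chaining strategy is not NEVER: the default is ALWAYS, and some transformations can change it with setChainingStrategy;
  • Data is forwarded between the two operators with a ForwardPartitioner;
  • The ShuffleMode is not BATCH;
  • The upstream and downstream operators have the same parallelism;
  • The chaining setting of the StreamGraph is true: it defaults to true and can be changed with StreamGraph::setChaining.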
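
The conditions above can be read as one boolean conjunction. The sketch below is a simplified model, not Flink's isChainable: the classes Node and Edge, their fields, and the two-value ChainingStrategy enum are hypothetical stand-ins that mirror the list one condition per line.

```java
// Hypothetical, simplified model of the chaining check; not Flink's implementation.
public class ChainableSketch {

    public enum ChainingStrategy { ALWAYS, NEVER }

    public static final class Node {
        final String slotSharingGroup;
        final int parallelism;
        final ChainingStrategy strategy;
        final int inEdgeCount;

        public Node(String slotSharingGroup, int parallelism,
                    ChainingStrategy strategy, int inEdgeCount) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.strategy = strategy;
            this.inEdgeCount = inEdgeCount;
        }
    }

    public static final class Edge {
        final Node upstream;
        final Node downstream;
        final boolean forwardPartitioner; // edge uses a ForwardPartitioner
        final boolean batchShuffle;       // edge's ShuffleMode is BATCH

        public Edge(Node upstream, Node downstream,
                    boolean forwardPartitioner, boolean batchShuffle) {
            this.upstream = upstream;
            this.downstream = downstream;
            this.forwardPartitioner = forwardPartitioner;
            this.batchShuffle = batchShuffle;
        }
    }

    /** True only if every condition from the list holds for this edge. */
    public static boolean isChainable(
            Edge e, boolean envChainingEnabled, boolean graphChainingEnabled) {
        return envChainingEnabled
                && e.downstream.inEdgeCount == 1
                && e.upstream.slotSharingGroup.equals(e.downstream.slotSharingGroup)
                && e.upstream.strategy != ChainingStrategy.NEVER
                && e.downstream.strategy != ChainingStrategy.NEVER
                && e.forwardPartitioner
                && !e.batchShuffle
                && e.upstream.parallelism == e.downstream.parallelism
                && graphChainingEnabled;
    }
}
```

Failing any single condition, for example a parallelism mismatch, makes the edge non-chainable, which in turn changes how many times hashes.size() is fed into the upstream operator's hash.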

Summary

The logic for generating an OperatorID can be summarized as follows:

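  1. If the user set a userHash when building the DataStream, that userHash is used as the OperatorID;
  2. If the user set a transformationUID when building the DataStream, the result of hashing the transformationUID once is used as the OperatorID;
  3. By default, a hash is computed from the current operator's position and from how it chains with its downstream operators, and this hash is then combined bitwise with the hashes of the upstream operators to obtain the OperatorID.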