共计 6107 个字符,预计需要花费 16 分钟才能阅读完成。
StreanGraph
是一个 DAG(以邻接表的模式存储),存储了整个流的拓扑构造信息,由一系列 StreamEdge 和 StreamNode 组成。
调用 StreamExecutionEnvironment.getStreamGraph() 即能够生成 StreamGraph。StreamExecutionEnvironment.getStreamGraph() 默认会革除 StreamExecutionEnvironment 中保留的 transformations,后续就无奈调用 StreamExecutionEnvironment.execute() 了,因而须要重复使用 transformations 的话能够应用重载办法 StreamExecutionEnvironment.getStreamGraph(String jobName, boolean clearTransformations),其源码如下:
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {this.transformations.clear();
}
return streamGraph;
}
这部分代码会生成一个 StreamGraphGenerator(生成过程中曾经存储了所有 transformations 的信息),调用其 generate 办法生成 StreamGraph。
StreamGraphGenerator
generate
public StreamGraph generate() {streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
configureStreamGraph(streamGraph);
alreadyTransformed = new HashMap<>();
for (Transformation<?> transformation : transformations) {transform(transformation);
}
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
这个办法外围做了四件事:
- 初始化 StreamGraph;
- 创立 alreadyTransformed,避免对同一个 transformation 反复创立 StreamNode;
- 对每个 transformation 进行转化,创立对应的 StreamNode 和 StreamEdge;
- 后置一些环境清理。
第 2 步创立的 alreadyTransformed 除了避免反复创立 StreamNode 外还有一个作用,建设转换前后 Transformation id 的映射关系,这样能通过 Transformation 间接检索到对应的 StreamNode。 在转换过程中,有的 Transformation 不会转成 StreamNode,而是转成一个虚节点,这些虚节点的 id 会从新生成,这时候就须要通过 alreadyTransformed 记录的映射关系检索到 。Transformation 中有个动态成员 idCounter,用来给每个 Transformation 指派惟一 id。调用 Transformation.getNewNodeId() 就能够获取新的惟一的 Transformation id,getNewNodeId 代码如下:
public static int getNewNodeId() {
idCounter++;
return idCounter;
}
在第 3 步做 transform 的时,StreamGraphGenerator 会递归对每个 Transformation 的上游局部先进行 transform,确保每个 transformations 都会被 transform(StreamGraphGenerator 中的 transformations 其实并没有保留所有的 transformations,只有局部 transformations 在应用 DataStream API 时会被增加到 StreamExecutionEnvironment 中,并在后续创立 StreamGraphGenerator 的时候赋给 StreamGraphGenerator)。
虚节点
虚节点的构造是一个 Tuple3:
Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>
外面记录了三局部信息:
- 上游节点 id;
- 分区器;
- 数据处理模式(批 / 流)。
以上图的 PartitionTransformation 为例,它只记录了分区信息,不蕴含真正的操作,因而会转成一个虚节点。
假如从过程刚启动开始执行该 DataStream,transformation id 从 1 开始自增(上图自右向左从 1 开始递增赋值),PartitionTransformation 的 id 会是 3,在转成虚节点时,此时 idCounter 的值是 5(图中一共有 5 个 Transformation,因而在创立 DataStream 时 idCounter 就已自增到 5),此时再次调用 Transformation.getNewNodeId() 会失去新的值 6,赋给 PartitionTransformation 转换成的虚节点,并在 alreadyTransformed 记录 3 -> 6 的映射。
该 PartitionTransformation 的上游节点的 id 是 2,它由 keyBy 生成,分区器是 KeyGroupStreamPartitioner,因为咱们没有手动指定 shuffleMode,默认是 UNDEFINED,因而最初生成的虚节点具体信息则为:(2, KeyGroupStreamPartitioner, UNDEFINED)。
transform
StreamGraphGenerator 中有个 map—translatorMap,外面存储了 Transformation 到转换器(TransformationTranslator)的映射,在获取 transformation 对应的 TransformationTranslator 后,依据以后是流解决还是批处理环境执行 translateForStreaming 或 translateForBatch,咱们这里是流解决环境,因而间接看 translateForStreaming 的源码。
public final Collection<Integer> translateForStreaming(final T transformation, final Context context) {checkNotNull(transformation);
checkNotNull(context);
final Collection<Integer> transformedIds =
translateForStreamingInternal(transformation, context);
configure(transformation, context);
return transformedIds;
}
translateForStreaming 采纳了模板办法的设计模式,定义在 SimpleTransformationTranslator 中,是一个 final 办法,每个 TransformationTranslator 依据各自的须要别离实现 translateForStreamingInternal 办法,个别最初的逻辑导向办法 translateInternal。
translateInternal
每个 TransformationTranslator 的具体实现可能不同,但根本做的是以下几件事中的一部分或其组合:
- 对该 Transformation 构建 StreamNode,增加到 StreamGraph 中(addOperator);
- 对该 StreamNode 和对应上游的 StreamNode 建设边,增加到 StreamGraph 中;
- 对该 Transformation 构建虚节点,增加到 StreamGraph 中。
以上图为例,LegacySinkTransformation 在转换过程中,首先会找到对应的 Translator:LegacySinkTransformationTranslator,并调用其实现的 translateForStreamingInternal(最终导向到 translateInternal)。在 translateInternal 中最外围的逻辑是这两步:
- 调用 StreamGraph.addSink() 增加 StreamNode;
- 调用 StreamGraph.addEdge() 增加边。
StreamGraph.addSink()
addSink 实质也是调用了 addOperator 办法,只是额定在 StreamGraph 的成员 sinks 中增加了对该 StreamNode 的援用。同理,addSource 其实也只是在 addOperator 的根底上往 sources 里增加了对 StreamNode 的援用。
StreamGraph.addEdge()
在往 StreamGraph 里增加边的时候次要是建设上游节点到该节点的分割,因而须要先拿到所有 inputId,并将每个上游节点作为入参调用 addEdge,代码如下:
for (Integer inputId : context.getStreamNodeIds(input)) {streamGraph.addEdge(inputId, transformationId, 0);
}
context.getStreamNodeIds(input) 是从 StreamGraphGenerator.alreadyTransformed 中拿取上游节点的 id,上文提到过,有些 transformations 在转换过程中会变成虚节点从新生成 id,因而间接用 transformation 中保留的上游节点的 id 可能生效,须要做一次映射查问。
拿到上游节点 id 后,咱们就能够调用 streamGraph.addEdge(inputId, transformationId, 0) 增加边,这个办法最终导向 streamGraph.addEdgeInternal。在 addEdgeInternal 中,StreamNode 会对上游节点是虚节点的状况做一些解决,找到虚节点中记录的真正上游节点 X 后,建设 X 到以后 StreamNode 的关系,如下:
private void addEdgeInternal(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
// 找到虚节点记录的上游节点
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
null,
outputTag,
shuffleMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
// 找到虚节点记录的上游节点
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputNames,
outputTag,
shuffleMode);
} else {StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// 设置 partitioner 和 shuffleMode
StreamEdge edge =
new StreamEdge(
upstreamNode,
downstreamNode,
typeNumber,
partitioner,
outputTag,
shuffleMode);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
以 ReduceTransformation 为例(本身 id 为 4),它记录的 input 的 transformation id 为 3,首先通过 alreadyTransformed 找到映射后的节点 id:6。而后依据虚节点的信息 (2, KeyGroupStreamPartitioner, UNDEFINED) 找到真正的上游节点 2,创立 2-4 的 StreamEdge,在创立 StreamEdge 时,会在其中记录 partitioner 和 shuffleMode,并将这条边别离增加到 source vertex 的 outEdges 和 target vertex 的 inEdges 中。