
A Look at Flink's Execution Plan Visualization


This article looks at Flink's Execution Plan Visualization.
Example
Code
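@Test
public void testExecutionPlan() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // WORDS and WordCountTest.Tokenizer are defined elsewhere in the test sources
    DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(WORDS)
            .flatMap(new WordCountTest.Tokenizer())
            .keyBy(0)   // partition by tuple field 0
            .sum(1);    // running sum over tuple field 1
    dataStream.print();
    // Print the plan as JSON; this has to be called before the job is executed
    System.out.println(env.getExecutionPlan());
}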
json
{
  "nodes": [
    {
      "id": 1,
      "type": "Source: Collection Source",
      "pact": "Data Source",
      "contents": "Source: Collection Source",
      "parallelism": 1
    },
    {
      "id": 2,
      "type": "Flat Map",
      "pact": "Operator",
      "contents": "Flat Map",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 1,
          "ship_strategy": "REBALANCE",
          "side": "second"
        }
      ]
    },
    {
      "id": 4,
      "type": "Keyed Aggregation",
      "pact": "Operator",
      "contents": "Keyed Aggregation",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 2,
          "ship_strategy": "HASH",
          "side": "second"
        }
      ]
    },
    {
      "id": 5,
      "type": "Sink: Print to Std. Out",
      "pact": "Data Sink",
      "contents": "Sink: Print to Std. Out",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 4,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ]
    }
  ]
}
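The plan is a flat list of nodes, and each entry under "predecessors" names an upstream node id together with the ship_strategy used on that edge. As a rough illustration of how to read it, here is a minimal sketch that lists the edges of such a plan using only the JDK. The class name PlanEdges and the abridged SAMPLE string are made up for this example, and the regex assumes the field order shown above ("id" followed directly by "type" or "ship_strategy"); for anything beyond a quick look, a real JSON parser would be the safer choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: lists "upstream -> downstream [strategy]" edges.
class PlanEdges {
    // Matches a node header ("id" then "type") or a predecessor entry
    // ("id" then "ship_strategy"). Relies on the field order of the plan above.
    private static final Pattern ENTRY = Pattern.compile(
        "\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"(type|ship_strategy)\"\\s*:\\s*\"([^\"]*)\"");

    // Abridged copy of the plan printed above (the "contents" field is left out).
    static final String SAMPLE =
        "{\"nodes\":["
        + "{\"id\":1,\"type\":\"Source: Collection Source\",\"pact\":\"Data Source\",\"parallelism\":1},"
        + "{\"id\":2,\"type\":\"Flat Map\",\"pact\":\"Operator\",\"parallelism\":4,"
        + "\"predecessors\":[{\"id\":1,\"ship_strategy\":\"REBALANCE\",\"side\":\"second\"}]},"
        + "{\"id\":4,\"type\":\"Keyed Aggregation\",\"pact\":\"Operator\",\"parallelism\":4,"
        + "\"predecessors\":[{\"id\":2,\"ship_strategy\":\"HASH\",\"side\":\"second\"}]},"
        + "{\"id\":5,\"type\":\"Sink: Print to Std. Out\",\"pact\":\"Data Sink\",\"parallelism\":4,"
        + "\"predecessors\":[{\"id\":4,\"ship_strategy\":\"FORWARD\",\"side\":\"second\"}]}"
        + "]}";

    static List<String> edges(String planJson) {
        List<String> edges = new ArrayList<>();
        String currentNode = null; // id of the node whose predecessors we are reading
        Matcher m = ENTRY.matcher(planJson);
        while (m.find()) {
            if (m.group(2).equals("type")) {
                currentNode = m.group(1);
            } else if (currentNode != null) {
                edges.add(m.group(1) + " -> " + currentNode + " [" + m.group(3) + "]");
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        // Prints:
        // 1 -> 2 [REBALANCE]
        // 2 -> 4 [HASH]
        // 4 -> 5 [FORWARD]
        for (String edge : edges(SAMPLE)) {
            System.out.println(edge);
        }
    }
}
```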
Visualization
Open the flink plan visualizer, paste the JSON above into the text box, and click Draw to render the plan as a graph.
StreamExecutionEnvironment.getExecutionPlan
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@Public
public abstract class StreamExecutionEnvironment {
    //……

    /**
     * Creates the plan with which the system will execute the program, and
     * returns it as a String using a JSON representation of the execution data
     * flow graph. Note that this needs to be called, before the plan is
     * executed.
     *
     * @return The execution plan of the program, as a JSON String.
     */
    public String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }

    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
     *
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }

    //……
}
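StreamExecutionEnvironment's getExecutionPlan method calls getStreamGraph, which uses StreamGraphGenerator.generate to build the StreamGraph (and throws an IllegalStateException if no transformations have been defined); it then calls StreamGraph.getStreamingPlanAsJSON to obtain the execution plan in JSON format.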
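To make the delegation chain concrete, here is a minimal stand-in written against plain JDK classes only. MiniEnvironment and MiniGraph are hypothetical names invented for this sketch and are far simpler than Flink's real classes; the sketch only mirrors the control flow of the excerpt above: the plan is produced by building a graph from the recorded transformations, and an empty topology is rejected before any graph is built.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the environment; not Flink's implementation.
class MiniEnvironment {
    // Stand-in for the list of transformations recorded by the environment.
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) {
        transformations.add(name);
    }

    // Mirrors the guard in getStreamGraph(): no operators means no graph.
    MiniGraph getStreamGraph() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        return new MiniGraph(new ArrayList<>(transformations));
    }

    // Mirrors getExecutionPlan(): build the graph, then ask it for its JSON form.
    String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }

    public static void main(String[] args) {
        MiniEnvironment env = new MiniEnvironment();
        try {
            env.getExecutionPlan();
        } catch (IllegalStateException e) {
            System.out.println("empty topology rejected: " + e.getMessage());
        }
        env.addOperator("Flat Map");
        System.out.println(env.getExecutionPlan());
    }
}

// Hypothetical stand-in for the graph; it only knows how to print itself.
class MiniGraph {
    private final List<String> nodes;

    MiniGraph(List<String> nodes) {
        this.nodes = nodes;
    }

    // Assumes operator names contain no quotes or backslashes (no escaping is done).
    String getStreamingPlanAsJSON() {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        for (int i = 0; i < nodes.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(i + 1)
              .append(",\"type\":\"").append(nodes.get(i)).append("\"}");
        }
        return sb.append("]}").toString();
    }
}
```

One practical consequence, visible in the excerpt itself: calling getExecutionPlan on an environment with no operators fails with the IllegalStateException rather than returning an empty plan.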
StreamGraph.getStreamingPlanAsJSON
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java
@Internal
public class StreamGraph extends StreamingPlan {

    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

    private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;

    private final StreamExecutionEnvironment environment;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;

    private boolean chaining;

    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;

    protected Map<Integer, String> vertexIDtoBrokerID;
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private StateBackend stateBackend;
    private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;

    //……

    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        }
        catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }

    //……
}
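StreamGraph's getStreamingPlanAsJSON method uses JSONGenerator to serialize the graph and returns the execution plan in JSON format; any exception raised during generation is wrapped in a RuntimeException.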
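The JSONGenerator source is not shown here, so the following is only a sketch of the general idea, not its actual implementation: walk the nodes of a graph and emit one JSON object per node, with a "predecessors" array for incoming edges. The Node and Edge classes are hypothetical, the "contents" and "side" fields from the sample output are left out, and string values are assumed to need no escaping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of serializing a node list into the plan's JSON shape.
class PlanJsonSketch {
    // An incoming edge: the upstream node id and the ship strategy on that edge.
    static final class Edge {
        final int sourceId;
        final String shipStrategy;

        Edge(int sourceId, String shipStrategy) {
            this.sourceId = sourceId;
            this.shipStrategy = shipStrategy;
        }
    }

    static final class Node {
        final int id;
        final String type;
        final String pact;
        final int parallelism;
        final List<Edge> predecessors = new ArrayList<>();

        Node(int id, String type, String pact, int parallelism) {
            this.id = id;
            this.type = type;
            this.pact = pact;
            this.parallelism = parallelism;
        }

        // Records an incoming edge and returns this node for chaining.
        Node from(int sourceId, String shipStrategy) {
            predecessors.add(new Edge(sourceId, shipStrategy));
            return this;
        }
    }

    // Assumes type and pact contain no quotes or backslashes (no escaping is done).
    static String toJson(List<Node> nodes) {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        for (int i = 0; i < nodes.size(); i++) {
            Node n = nodes.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(n.id)
              .append(",\"type\":\"").append(n.type).append('"')
              .append(",\"pact\":\"").append(n.pact).append('"')
              .append(",\"parallelism\":").append(n.parallelism);
            // Sources have no incoming edges, so the array is omitted for them.
            if (!n.predecessors.isEmpty()) {
                sb.append(",\"predecessors\":[");
                for (int j = 0; j < n.predecessors.size(); j++) {
                    Edge e = n.predecessors.get(j);
                    if (j > 0) {
                        sb.append(',');
                    }
                    sb.append("{\"id\":").append(e.sourceId)
                      .append(",\"ship_strategy\":\"").append(e.shipStrategy).append("\"}");
                }
                sb.append(']');
            }
            sb.append('}');
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node(1, "Source: Collection Source", "Data Source", 1));
        nodes.add(new Node(2, "Flat Map", "Operator", 4).from(1, "REBALANCE"));
        System.out.println(toJson(nodes));
    }
}
```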
Summary

Flink provides an online flink plan visualizer for visualizing execution plans; it accepts an execution plan in JSON form.
StreamExecutionEnvironment's getExecutionPlan method calls getStreamGraph, which uses StreamGraphGenerator.generate to build the StreamGraph.
StreamGraph's getStreamingPlanAsJSON method uses JSONGenerator to serialize the graph and returns the execution plan in JSON format.

doc

Execution Plans
flink plan visualizer
