序
本文主要研究一下flink的Execution Plan Visualization
实例
代码
@Test public void testExecutionPlan(){ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(WORDS) .flatMap(new WordCountTest.Tokenizer()) .keyBy(0) .sum(1); dataStream.print(); System.out.println(env.getExecutionPlan()); }
json
{ "nodes": [ { "id": 1, "type": "Source: Collection Source", "pact": "Data Source", "contents": "Source: Collection Source", "parallelism": 1 }, { "id": 2, "type": "Flat Map", "pact": "Operator", "contents": "Flat Map", "parallelism": 4, "predecessors": [ { "id": 1, "ship_strategy": "REBALANCE", "side": "second" } ] }, { "id": 4, "type": "Keyed Aggregation", "pact": "Operator", "contents": "Keyed Aggregation", "parallelism": 4, "predecessors": [ { "id": 2, "ship_strategy": "HASH", "side": "second" } ] }, { "id": 5, "type": "Sink: Print to Std. Out", "pact": "Data Sink", "contents": "Sink: Print to Std. Out", "parallelism": 4, "predecessors": [ { "id": 4, "ship_strategy": "FORWARD", "side": "second" } ] } ]}
可视化
打开flink plan visualizer,将上面的json输入到文本框,点击Draw进行可视化,如下:
StreamExecutionEnvironment.getExecutionPlan
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@Public public abstract class StreamExecutionEnvironment { //…… /** * Creates the plan with which the system will execute the program, and * returns it as a String using a JSON representation of the execution data * flow graph. Note that this needs to be called, before the plan is * executed. * * @return The execution plan of the program, as a JSON String. */ public String getExecutionPlan() { return getStreamGraph().getStreamingPlanAsJSON(); } /** * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. 
* * @return The streamgraph representing the transformations */ @Internal public StreamGraph getStreamGraph() { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } return StreamGraphGenerator.generate(this, transformations); } //……}
StreamExecutionEnvironment的getExecutionPlan方法调用了getStreamGraph方法;getStreamGraph方法使用StreamGraphGenerator.generate生成了StreamGraph;之后就是调用StreamGraph.getStreamingPlanAsJSON来获取json格式的execution plan
StreamGraph.getStreamingPlanAsJSON
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java
@Internal public class StreamGraph extends StreamingPlan { private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME; private final StreamExecutionEnvironment environment; private final ExecutionConfig executionConfig; private final CheckpointConfig checkpointConfig; private boolean chaining; private Map<Integer, StreamNode> streamNodes; private Set<Integer> sources; private Set<Integer> sinks; private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes; private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes; private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes; protected Map<Integer, String> vertexIDtoBrokerID; protected Map<Integer, Long> vertexIDtoLoopTimeout; private StateBackend stateBackend; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; //…… public String getStreamingPlanAsJSON() { try { return new JSONGenerator(this).getJSON(); } catch (Exception e) { throw new RuntimeException("JSON plan creation failed", e); } } //……}
StreamGraph的getStreamingPlanAsJSON方法使用JSONGenerator来序列化自己,返回json格式的execution plan
小结
flink提供了flink plan visualizer的在线地址,用于进行execution plan的可视化,它接收json形式的execution 
plan
StreamExecutionEnvironment的getExecutionPlan方法调用了getStreamGraph方法;getStreamGraph方法使用StreamGraphGenerator.generate生成了StreamGraph
StreamGraph的getStreamingPlanAsJSON方法使用JSONGenerator来序列化自己,返回json格式的execution plan
doc
Execution Plans
flink plan visualizer