1.啥也不说,先上工具类代码:

import com.google.gson.Gson;import java.util.HashMap;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;public class FlinkUtils {    private static class FlinkPlanEdge {        private int id;        private String ship_strategy;        @SuppressWarnings("unused")        private String side;        public int getId() {            return id;        }        public String getShip_strategy() {            return ship_strategy;        }    }    private static class FlinkPlanNode {        private static final FlinkPlanEdge[] NO_EDGES = new FlinkPlanEdge[0];        private int id;        private String type;        @SuppressWarnings("unused")        private String pact;        private String contents;        private int parallelism;        private FlinkPlanEdge[] predecessors;        public int getId() {            return id;        }        public String getType() {            return type;        }        public String getContents() {            return contents;        }        public int getParallelism() {            return parallelism;        }        public FlinkPlanEdge[] getPredecessors() {            if (predecessors == null) {                return NO_EDGES;            } else {                return predecessors;            }        }    }    private static class FlinkPlan {        private FlinkPlanNode[] nodes;        public FlinkPlanNode[] getNodes() {            return nodes;        }    }    /**     * Given a JSON representation of a Flink Topology (from StreamExecutionEnvironment#getExecutionPlan), convert it to     * a standard .dot format suitable for visualizing with OmniGraffle and other programs.     *      * See http://www.graphviz.org/doc/info/lang.html See http://www.graphviz.org/doc/info/attrs.html     *      * @param plan     *            JSON version of plan     * @return dot format graph.     */    public static String planToDot(String plan) {        FlinkPlan flinkPlan = new Gson().fromJson(plan, FlinkPlan.class);        StringBuilder result = new StringBuilder("digraph G {\n");        // Keep track of iteration sources, which are implicit. So we map from        // the iteration number to the source id        Map<Integer, Integer> iterationMap = new HashMap<Integer, Integer>();        final Pattern ITERATION_SOURCE_NAME = Pattern.compile("IterationSource\\-(\\d+)");        final Pattern ITERATION_SINK_NAME = Pattern.compile("IterationSink\\-(\\d+)");        final Pattern SOURCE_OR_SINK_NAME = Pattern.compile("(Source|Sink): (.*)");        FlinkPlanNode[] nodes = flinkPlan.getNodes();        for (FlinkPlanNode node : nodes) {            Matcher m = SOURCE_OR_SINK_NAME.matcher(node.getType());            boolean isSourceOrSink = m.matches();            String nodeName = isSourceOrSink ? m.group(2) : node.getContents();            String nodeShape = isSourceOrSink ? "box" : "ellipse";            boolean isIterationSourceOrSink = ITERATION_SOURCE_NAME.matcher(node.getType())                    .matches() || ITERATION_SINK_NAME.matcher(node.getType()).matches();            String fillColor = isSourceOrSink || isIterationSourceOrSink ? "8EFF4C" : "FFFFFF";            result.append(                    String.format("  %d [shape=\"%s\", fillcolor=\"#%s\", label=\"%s (%d)\"];\n",                            node.getId(), nodeShape, fillColor, nodeName, node.getParallelism()));            m = ITERATION_SOURCE_NAME.matcher(node.getType());            if (m.matches()) {                // Map from iteration number to the source node id                iterationMap.put(Integer.parseInt(m.group(1)), node.getId());            }        }        // Now dump out the edges        // 1 -> 2 [label = "blah"];        for (FlinkPlanNode node : nodes) {            int nodeId = node.getId();            FlinkPlanEdge[] edges = node.getPredecessors();            for (FlinkPlanEdge edge : edges) {                result.append(String.format("  %d -> %d [label = \"%s\"];\n", edge.getId(), nodeId,                        edge.getShip_strategy()));            }            // Now check if this node is an iteration sink. If so, add an explicit edge            // from it to the corresponding source node.            Matcher m = ITERATION_SINK_NAME.matcher(node.getType());            if (m.matches()) {                int iterationID = Integer.parseInt(m.group(1));                result.append(String.format("  %d -> %d [label = \"ITERATION\"];\n", node.getId(),                        iterationMap.get(iterationID)));            }        }        result.append("}\n");        return result.toString();    }}

2.在flink中应用

val env = ......FileUtils.write(new File("./target/kmeans-graph.dot"), FlinkUtils.planToDot(env.getExecutionPlan))env.execute()

3.运行后在target目录下会生成一个.dot文件

4.生成可视化图形
这里应用在线graphviz工具:http://www.webgraphviz.com/
复制.dot文件内容到graphviz上,点击Generate Graph!就会生成一个可视化的执行打算图啦!