关于flink:Flink生成dot文件的可视化执行计划图工具类使用

9次阅读

共计 3521 个字符,预计需要花费 9 分钟才能阅读完成。

1. 啥也不说,先上工具类代码:

import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Helpers for visualising Flink execution plans.
 *
 * <p>The only entry point is {@link #planToDot(String)}, which turns the JSON execution plan into
 * Graphviz DOT text. The nested {@code FlinkPlan*} classes are plain holders that Gson fills by
 * reflection, so their field names must match the JSON property names exactly.
 */
public class FlinkUtils {

    /** Type of an implicit iteration source node; group 1 is the iteration number. */
    private static final Pattern ITERATION_SOURCE_NAME = Pattern.compile("IterationSource\\-(\\d+)");

    /** Type of an implicit iteration sink node; group 1 is the iteration number. */
    private static final Pattern ITERATION_SINK_NAME = Pattern.compile("IterationSink\\-(\\d+)");

    /** Type of an explicit source or sink; group 2 is the name shown in the label. */
    private static final Pattern SOURCE_OR_SINK_NAME = Pattern.compile("(Source|Sink): (.*)");

    /** One incoming edge of a node, as listed in the node's {@code predecessors} JSON array. */
    private static class FlinkPlanEdge {
        private int id;
        // Field name mirrors the JSON property so Gson can bind it without annotations.
        private String ship_strategy;
        @SuppressWarnings("unused")
        private String side;

        /** @return id of the upstream node this edge starts from */
        public int getId() {
            return id;
        }

        /** @return the ship strategy text used as the edge label; may be {@code null} */
        public String getShipStrategy() {
            return ship_strategy;
        }
    }

    /** One operator of the plan. */
    private static class FlinkPlanNode {
        private static final FlinkPlanEdge[] NO_EDGES = new FlinkPlanEdge[0];

        private int id;
        private String type;
        @SuppressWarnings("unused")
        private String pact;
        private String contents;
        private int parallelism;
        private FlinkPlanEdge[] predecessors;

        public int getId() {
            return id;
        }

        /** @return the node type, or an empty string when the JSON had none (never {@code null}) */
        public String getType() {
            return type == null ? "" : type;
        }

        public String getContents() {
            return contents;
        }

        public int getParallelism() {
            return parallelism;
        }

        /** @return the incoming edges, or an empty array when the JSON had none (never {@code null}) */
        public FlinkPlanEdge[] getPredecessors() {
            return predecessors == null ? NO_EDGES : predecessors;
        }
    }

    /** Root of the JSON document: just the list of nodes. */
    private static class FlinkPlan {
        private static final FlinkPlanNode[] NO_NODES = new FlinkPlanNode[0];

        private FlinkPlanNode[] nodes;

        /** @return the plan's nodes, or an empty array when the JSON had none (never {@code null}) */
        public FlinkPlanNode[] getNodes() {
            return nodes == null ? NO_NODES : nodes;
        }
    }

    /**
     * Given a JSON representation of a Flink Topology (from StreamExecutionEnvironment#getExecutionPlan), convert it to
     * a standard .dot format suitable for visualizing with OmniGraffle and other programs.
     *
     * <p>Sources, sinks and iteration sources/sinks are filled green; explicit sources and sinks are drawn
     * as boxes, everything else as ellipses. Every node label is followed by the parallelism in parentheses.
     * Each iteration sink gets an extra edge labelled {@code ITERATION} back to the iteration source with the
     * same number; if no such source exists in the plan the edge is omitted.
     *
     * <p>A {@code null} or empty plan, or a plan without a {@code nodes} array, yields an empty graph.
     *
     * See http://www.graphviz.org/doc/info/lang.html See http://www.graphviz.org/doc/info/attrs.html
     *
     * @param plan
     *            JSON version of plan
     * @return dot format graph.
     */
    public static String planToDot(String plan) {
        // Gson returns null for a null or empty document, so guard before dereferencing.
        FlinkPlan flinkPlan = new Gson().fromJson(plan, FlinkPlan.class);
        FlinkPlanNode[] nodes = flinkPlan == null ? FlinkPlan.NO_NODES : flinkPlan.getNodes();

        StringBuilder result = new StringBuilder("digraph G {\n");

        // Keep track of iteration sources, which are implicit. So we map from
        // the iteration number to the source id
        Map<Integer, Integer> iterationMap = new HashMap<Integer, Integer>();

        // All nodes are emitted (and all iteration sources recorded) before any edge, because a
        // sink may appear in the array before the source it loops back to.
        for (FlinkPlanNode node : nodes) {
            if (node != null) {
                appendNode(result, node, iterationMap);
            }
        }

        // Now dump out the edges
        // 1 -> 2 [label = "blah"];
        for (FlinkPlanNode node : nodes) {
            if (node != null) {
                appendEdges(result, node, iterationMap);
            }
        }

        result.append("}\n");
        return result.toString();
    }

    /** Writes the DOT statement for one node and records it in {@code iterationMap} if it is an iteration source. */
    private static void appendNode(StringBuilder out, FlinkPlanNode node, Map<Integer, Integer> iterationMap) {
        String type = node.getType();

        Matcher sourceOrSink = SOURCE_OR_SINK_NAME.matcher(type);
        boolean isSourceOrSink = sourceOrSink.matches();
        Matcher iterationSource = ITERATION_SOURCE_NAME.matcher(type);
        boolean isIterationSource = iterationSource.matches();
        boolean isIterationSink = ITERATION_SINK_NAME.matcher(type).matches();

        String nodeName = isSourceOrSink ? sourceOrSink.group(2) : node.getContents();
        String nodeShape = isSourceOrSink ? "box" : "ellipse";
        String fillColor = isSourceOrSink || isIterationSource || isIterationSink ? "8EFF4C" : "FFFFFF";

        // Plain append() instead of String.format("%d"): the latter localises digits with the
        // default locale, which can produce ids that Graphviz cannot parse.
        // style="filled" is required, otherwise Graphviz ignores fillcolor.
        out.append(node.getId())
                .append(" [shape=\"").append(nodeShape)
                .append("\", style=\"filled\", fillcolor=\"#").append(fillColor)
                .append("\", label=\"").append(escapeLabel(nodeName))
                .append(" (").append(node.getParallelism()).append(")\"];\n");

        if (isIterationSource) {
            // Map from iteration number to the source node id
            iterationMap.put(Integer.parseInt(iterationSource.group(1)), node.getId());
        }
    }

    /** Writes one DOT edge per predecessor of {@code node}, plus the loop-back edge if it is an iteration sink. */
    private static void appendEdges(StringBuilder out, FlinkPlanNode node, Map<Integer, Integer> iterationMap) {
        int nodeId = node.getId();

        for (FlinkPlanEdge edge : node.getPredecessors()) {
            if (edge == null) {
                continue;
            }
            out.append(edge.getId()).append(" -> ").append(nodeId)
                    .append(" [label = \"").append(escapeLabel(edge.getShipStrategy())).append("\"];\n");
        }

        // Now check if this node is an iteration sink. If so, add an explicit edge
        // from it to the corresponding source node.
        Matcher iterationSink = ITERATION_SINK_NAME.matcher(node.getType());
        if (iterationSink.matches()) {
            Integer sourceId = iterationMap.get(Integer.parseInt(iterationSink.group(1)));
            // A sink without a matching source would otherwise yield the invalid edge "N -> null".
            if (sourceId != null) {
                out.append(nodeId).append(" -> ").append(sourceId.intValue())
                        .append(" [label = \"ITERATION\"];\n");
            }
        }
    }

    /**
     * Makes {@code text} safe to place between double quotes in a DOT label: quotes and backslashes
     * are escaped, line feeds become the DOT {@code \n} escape, carriage returns are dropped, and
     * {@code null} becomes the empty string.
     */
    private static String escapeLabel(String text) {
        if (text == null) {
            return "";
        }
        StringBuilder escaped = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    escaped.append("\\\"");
                    break;
                case '\\':
                    escaped.append("\\\\");
                    break;
                case '\n':
                    escaped.append("\\n");
                    break;
                case '\r':
                    break;
                default:
                    escaped.append(c);
                    break;
            }
        }
        return escaped.toString();
    }
}

2. 在 Flink 中使用

// Usage sketch (Scala); the `...` placeholders stand for the environment setup and job definition.
// NOTE(review): FileUtils and File are not imported here — presumably Apache Commons IO's
// FileUtils and java.io.File; confirm against the real project.
val env = ...
...
// Write the DOT rendering of the execution plan to a file under ./target; this call comes before env.execute().
FileUtils.write(new File("./target/kmeans-graph.dot"), FlinkUtils.planToDot(env.getExecutionPlan))
env.execute()

3. 运行后在 target 目录下会生成一个.dot 文件

4. 生成可视化图形
这里使用在线 graphviz 工具:http://www.webgraphviz.com/
复制 .dot 文件内容到 graphviz 上,点击 Generate Graph! 就会生成一个可视化的执行计划图啦!

正文完
 0