序本文主要研究一下flink LocalEnvironment的execute方法实例 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath) .pojoType(RecordDto.class, “playerName”, “country”, “year”, “game”, “gold”, “silver”, “bronze”, “total”); DataSet<Tuple2<String, Integer>> groupedByCountry = csvInput .flatMap(new FlatMapFunction<RecordDto, Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @Override public void flatMap(RecordDto record, Collector<Tuple2<String, Integer>> out) throws Exception { out.collect(new Tuple2<String, Integer>(record.getCountry(), 1)); } }).groupBy(0).sum(1); System.out.println("===groupedByCountry==="); groupedByCountry.print();这里使用DataSet从csv读取数据,然后进行flatMap、groupBy、sum操作,最后调用print输出DataSet.printflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java /** * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls * the print() method. For programs that are executed in a cluster, this method needs * to gather the contents of the DataSet back to the client, to print it there. * * <p>The string written for each element is defined by the {@link Object#toString()} method. * * <p>This method immediately triggers the program execution, similar to the * {@link #collect()} and {@link #count()} methods. * * @see #printToErr() * @see #printOnTaskManager(String) / public void print() throws Exception { List<T> elements = collect(); for (T e: elements) { System.out.println(e); } }print方法这里主要是调用collect方法,获取结果,然后挨个打印DataSet.collectflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java /* * Convenience method to get the elements of a DataSet as a List. * As DataSet can contain a lot of data, this method should be used with caution. * * @return A List containing the elements of the DataSet / public List<T> collect() throws Exception { final String id = new AbstractID().toString(); final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig()); this.output(new Utils.CollectHelper<>(id, serializer)).name(“collect()”); JobExecutionResult res = getExecutionEnvironment().execute(); ArrayList<byte[]> accResult = res.getAccumulatorResult(id); if (accResult != null) { try { return SerializedListAccumulator.deserializeList(accResult, serializer); } catch (ClassNotFoundException e) { throw new RuntimeException(“Cannot find type class of collected data type.”, e); } catch (IOException e) { throw new RuntimeException(“Serialization error while deserializing collected data”, e); } } else { throw new RuntimeException(“The call to collect() could not retrieve the DataSet.”); } }这里调用了getExecutionEnvironment().execute()来获取JobExecutionResult;executionEnvironment这里是LocalEnvironmentExecutionEnvironment.executeflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java /* * Triggers the program execution. The environment will execute all parts of the program that have * resulted in a “sink” operation. Sink operations are for example printing results ({@link DataSet#print()}, * writing results (e.g. {@link DataSet#writeAsText(String)}, * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}. * * <p>The program execution will be logged and displayed with a generated default name. * * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception Thrown, if the program executions fails. / public JobExecutionResult execute() throws Exception { return execute(getDefaultName()); } /* * Gets a default job name, based on the timestamp when this method is invoked. * * @return A default job name. / private static String getDefaultName() { return “Flink Java Job at " + Calendar.getInstance().getTime(); } /* * Triggers the program execution. The environment will execute all parts of the program that have * resulted in a “sink” operation. Sink operations are for example printing results ({@link DataSet#print()}, * writing results (e.g. {@link DataSet#writeAsText(String)}, * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}. * * <p>The program execution will be logged and displayed with the given job name. * * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception Thrown, if the program executions fails. / public abstract JobExecutionResult execute(String jobName) throws Exception;具体的execute抽象方法由子类去实现,这里我们主要看一下LocalEnvironment的execute方法LocalEnvironment.executeflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java @Override public JobExecutionResult execute(String jobName) throws Exception { if (executor == null) { startNewSession(); } Plan p = createProgramPlan(jobName); // Session management is disabled, revert this commit to enable //p.setJobId(jobID); //p.setSessionTimeout(sessionTimeout); JobExecutionResult result = executor.executePlan(p); this.lastJobExecutionResult = result; return result; } @Override @PublicEvolving public void startNewSession() throws Exception { if (executor != null) { // we need to end the previous session executor.stop(); // create also a new JobID jobID = JobID.generate(); } // create a new local executor executor = PlanExecutor.createLocalExecutor(configuration); executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); // if we have a session, start the mini cluster eagerly to have it available across sessions if (getSessionTimeout() > 0) { executor.start(); // also install the reaper that will shut it down eventually executorReaper = new ExecutorReaper(executor); } }这里判断executor为null的话,会调用startNewSession,startNewSession通过PlanExecutor.createLocalExecutor(configuration)来创建executor;如果sessionTimeout大于0,则这里会立马调用executor.start(),默认该值为0之后通过createProgramPlan方法来创建plan最后通过executor.executePlan(p)来获取JobExecutionResultPlanExecutor.createLocalExecutorflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/PlanExecutor.java private static final String LOCAL_EXECUTOR_CLASS = “org.apache.flink.client.LocalExecutor”; /* * Creates an executor that runs the plan locally in a multi-threaded environment. * * @return A local executor. / public static PlanExecutor createLocalExecutor(Configuration configuration) { Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS); try { return leClass.getConstructor(Configuration.class).newInstance(configuration); } catch (Throwable t) { throw new RuntimeException(“An error occurred while loading the local executor (” + LOCAL_EXECUTOR_CLASS + “).”, t); } } private static Class<? extends PlanExecutor> loadExecutorClass(String className) { try { Class<?> leClass = Class.forName(className); return leClass.asSubclass(PlanExecutor.class); } catch (ClassNotFoundException cnfe) { throw new RuntimeException(“Could not load the executor class (” + className + “). Do you have the ‘flink-clients’ project in your dependencies?”); } catch (Throwable t) { throw new RuntimeException(“An error occurred while loading the executor (” + className + “).”, t); } }PlanExecutor.createLocalExecutor方法通过反射创建org.apache.flink.client.LocalExecutorLocalExecutor.executePlanflink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java /* * Executes the given program on a local runtime and waits for the job to finish. * * <p>If the executor has not been started before, this starts the executor and shuts it down * after the job finished. If the job runs in session mode, the executor is kept alive until * no more references to the executor exist.</p> * * @param plan The plan of the program to execute. * @return The net runtime of the program, in milliseconds. * * @throws Exception Thrown, if either the startup of the local execution context, or the execution * caused an exception. / @Override public JobExecutionResult executePlan(Plan plan) throws Exception { if (plan == null) { throw new IllegalArgumentException(“The plan may not be null.”); } synchronized (this.lock) { // check if we start a session dedicated for this execution final boolean shutDownAtEnd; if (jobExecutorService == null) { shutDownAtEnd = true; // configure the number of local slots equal to the parallelism of the local plan if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) { int maxParallelism = plan.getMaximumParallelism(); if (maxParallelism > 0) { this.taskManagerNumSlots = maxParallelism; } } // start the cluster for us start(); } else { // we use the existing session shutDownAtEnd = false; } try { // TODO: Set job’s default parallelism to max number of slots final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots); final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers); Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration); OptimizedPlan op = pc.compile(plan); JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration); JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); return jobExecutorService.executeJobBlocking(jobGraph); } finally { if (shutDownAtEnd) { stop(); } } } }这里当jobExecutorService为null的时候,会调用start方法启动cluster创建jobExecutorService之后创建JobGraphGenerator,然后通过JobGraphGenerator.compileJobGraph方法,将plan构建为JobGraph最后调用jobExecutorService.executeJobBlocking(jobGraph),执行这个jobGraph,然后返回JobExecutionResultLocalExecutor.startflink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java @Override public void start() throws Exception { synchronized (lock) { if (jobExecutorService == null) { // create the embedded runtime jobExecutorServiceConfiguration = createConfiguration(); // start it up jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration); } else { throw new IllegalStateException(“The local executor was already started.”); } } } private Configuration createConfiguration() { Configuration newConfiguration = new Configuration(); newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots()); newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles()); newConfiguration.addAll(baseConfiguration); return newConfiguration; } private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception { final JobExecutorService newJobExecutorService; if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) { if (!configuration.contains(RestOptions.PORT)) { configuration.setInteger(RestOptions.PORT, 0); } final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) .setNumTaskManagers( configuration.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)) .setRpcServiceSharing(RpcServiceSharing.SHARED) .setNumSlotsPerTaskManager( configuration.getInteger( TaskManagerOptions.NUM_TASK_SLOTS, 1)) .build(); final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); miniCluster.start(); configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); newJobExecutorService = miniCluster; } else { final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true); localFlinkMiniCluster.start(); newJobExecutorService = localFlinkMiniCluster; } return newJobExecutorService; }start方法这里先通过createConfiguration创建配置文件,再通过createJobExecutorService创建JobExecutorServicecreateConfiguration主要设置了TaskManagerOptions.NUM_TASK_SLOTS以及CoreOptions.FILESYTEM_DEFAULT_OVERRIDEcreateJobExecutorService方法这里主要是根据configuration.getString(CoreOptions.MODE)的配置来创建不同的newJobExecutorService默认是CoreOptions.NEW_MODE模式,它先创建MiniClusterConfiguration,然后创建MiniCluster(JobExecutorService),然后调用MiniCluster.start方法启动之后返回非CoreOptions.NEW_MODE模式,则创建的是LocalFlinkMiniCluster(JobExecutorService),然后调用LocalFlinkMiniCluster.start()启动之后返回MiniCluster.executeJobBlockingflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java /* * This method runs a job in blocking mode. The method returns only after the job * completed successfully, or after it failed terminally. * * @param job The Flink job to execute * @return The result of the job execution * * @throws JobExecutionException Thrown if anything went amiss during initial job launch, * or if the job terminally failed. */ @Override public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException { checkNotNull(job, “job is null”); final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job); final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose( (JobSubmissionResult ignored) -> requestJobResult(job.getJobID())); final JobResult jobResult; try { jobResult = jobResultFuture.get(); } catch (ExecutionException e) { throw new JobExecutionException(job.getJobID(), “Could not retrieve JobResult.”, ExceptionUtils.stripExecutionException(e)); } try { return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader()); } catch (IOException | ClassNotFoundException e) { throw new JobExecutionException(job.getJobID(), e); } }MiniCluster.executeJobBlocking方法,先调用submitJob(job)方法,提交这个JobGraph,它返回一个CompletableFuture(submissionFuture)该CompletableFuture(submissionFuture)通过thenCompose连接了requestJobResult方法来根据jobId请求jobResult(jobResultFuture)最后通过jobResultFuture.get()获取JobExecutionResult小结DataSet的print方法调用了collect方法,而collect方法则调用getExecutionEnvironment().execute()来获取JobExecutionResult,executionEnvironment这里是LocalEnvironmentExecutionEnvironment.execute方法内部调用了抽象方法execute(String jobName),该抽象方法由子类实现,这里是LocalEnvironment.execute,它先通过startNewSession,使用PlanExecutor.createLocalExecutor创建LocalExecutor,之后通过createProgramPlan创建plan,最后调用LocalExecutor.executePlan来获取JobExecutionResultLocalExecutor.executePlan方法它先判断jobExecutorService,如果为null,则调用start方法创建jobExecutorService(这里根据CoreOptions.MODE配置,如果是CoreOptions.NEW_MODE则创建的jobExecutorService是MiniCluster,否则创建的jobExecutorService是LocalFlinkMiniCluster),这里创建的jobExecutorService为MiniCluster;之后通过JobGraphGenerator将plan转换为jobGraph;最后调用jobExecutorService.executeJobBlocking(jobGraph),执行这个jobGraph,然后返回JobExecutionResultdocLocalEnvironmentLocalExecutorMiniCluster