A look at the execute method of Flink's LocalEnvironment



This article examines the execute method of Flink's LocalEnvironment.
Example
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
        .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

DataSet<Tuple2<String, Integer>> groupedByCountry = csvInput
        .flatMap(new FlatMapFunction<RecordDto, Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(RecordDto record, Collector<Tuple2<String, Integer>> out) throws Exception {

                out.collect(new Tuple2<String, Integer>(record.getCountry(), 1));
            }
        }).groupBy(0).sum(1);
System.out.println("===groupedByCountry===");
groupedByCountry.print();
Here a DataSet reads records from a CSV file, applies flatMap, groupBy, and sum, and finally calls print to output the result.
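To make the intent of the pipeline concrete, here is a minimal plain-Java sketch of the same computation on an in-memory list. It does not use Flink at all; the class name CountByCountrySketch and the sample country values are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CountByCountrySketch {

    // Mirrors flatMap -> (country, 1), groupBy(0), sum(1): one count per country.
    static Map<String, Integer> countByCountry(List<String> countries) {
        return countries.stream()
                .collect(Collectors.groupingBy(c -> c, TreeMap::new, Collectors.summingInt(c -> 1)));
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for the "country" column of the CSV.
        List<String> countries = Arrays.asList("China", "USA", "China", "Kenya");
        System.out.println(countByCountry(countries));
    }
}
```

The difference in Flink is that nothing runs until a sink such as print triggers execution, which is what the rest of this article traces.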
DataSet.print
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java
/**
 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
 * the print() method. For programs that are executed in a cluster, this method needs
 * to gather the contents of the DataSet back to the client, to print it there.
 *
 * <p>The string written for each element is defined by the {@link Object#toString()} method.
 *
 * <p>This method immediately triggers the program execution, similar to the
 * {@link #collect()} and {@link #count()} methods.
 *
 * @see #printToErr()
 * @see #printOnTaskManager(String)
 */
public void print() throws Exception {
    List<T> elements = collect();
    for (T e: elements) {
        System.out.println(e);
    }
}
The print method mainly calls collect to obtain the results and then prints them one by one.
DataSet.collect
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java
/**
 * Convenience method to get the elements of a DataSet as a List.
 * As DataSet can contain a lot of data, this method should be used with caution.
 *
 * @return A List containing the elements of the DataSet
 */
public List<T> collect() throws Exception {
    final String id = new AbstractID().toString();
    final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

    this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
    JobExecutionResult res = getExecutionEnvironment().execute();

    ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
    if (accResult != null) {
        try {
            return SerializedListAccumulator.deserializeList(accResult, serializer);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Cannot find type class of collected data type.", e);
        } catch (IOException e) {
            throw new RuntimeException("Serialization error while deserializing collected data", e);
        }
    } else {
        throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
    }
}
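collect first registers a Utils.CollectHelper sink under a randomly generated id, then calls getExecutionEnvironment().execute() to obtain the JobExecutionResult and reads the serialized elements back from the accumulator with that id; the executionEnvironment here is a LocalEnvironment.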
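The round trip through the accumulator can be sketched in plain Java. This is only a rough model under stated assumptions: runJob stands in for the whole job execution, a HashMap stands in for the accumulator results, and DataOutputStream/DataInputStream stand in for Flink's TypeSerializer. None of these names come from Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class CollectSketch {

    // Hypothetical stand-in for the job: serializes each element and stores the
    // bytes under the given accumulator id, as a collecting sink would.
    static Map<String, List<byte[]>> runJob(String id, List<String> elements) {
        List<byte[]> serialized = new ArrayList<>();
        for (String element : elements) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(element);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            serialized.add(bytes.toByteArray());
        }
        Map<String, List<byte[]>> accumulators = new HashMap<>();
        accumulators.put(id, serialized);
        return accumulators;
    }

    // Same shape as collect(): pick an id, run the job, look up the id, deserialize.
    static List<String> collect(List<String> elements) {
        String id = UUID.randomUUID().toString();
        List<byte[]> accResult = runJob(id, elements).get(id);
        if (accResult == null) {
            throw new RuntimeException("The call to collect() could not retrieve the data.");
        }
        List<String> result = new ArrayList<>();
        for (byte[] bytes : accResult) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                result.add(in.readUTF());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collect(java.util.Arrays.asList("a", "b", "c")));
    }
}
```

Since every element travels back to the caller as bytes, the warning in the Javadoc about using collect with caution on large data sets follows directly.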
ExecutionEnvironment.execute
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
/**
 * Triggers the program execution. The environment will execute all parts of the program that have
 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
 * writing results (e.g. {@link DataSet#writeAsText(String)},
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>The program execution will be logged and displayed with a generated default name.
 *
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception Thrown, if the program executions fails.
 */
public JobExecutionResult execute() throws Exception {
    return execute(getDefaultName());
}

/**
 * Gets a default job name, based on the timestamp when this method is invoked.
 *
 * @return A default job name.
 */
private static String getDefaultName() {
    return "Flink Java Job at " + Calendar.getInstance().getTime();
}

/**
 * Triggers the program execution. The environment will execute all parts of the program that have
 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
 * writing results (e.g. {@link DataSet#writeAsText(String)},
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>The program execution will be logged and displayed with the given job name.
 *
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception Thrown, if the program executions fails.
 */
public abstract JobExecutionResult execute(String jobName) throws Exception;
The abstract execute(String jobName) method is implemented by subclasses; here we look mainly at LocalEnvironment's execute method.
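The relationship between the two overloads is a plain template-method arrangement, which a small sketch can show. The class names below (ExecuteOverloadSketch, Env, LocalEnv) and the String return type are assumptions chosen for illustration and are not Flink types.

```java
import java.util.Calendar;

final class ExecuteOverloadSketch {

    // Base class: the no-argument overload only supplies a default name and delegates.
    abstract static class Env {
        String execute() {
            return execute(defaultName());
        }

        static String defaultName() {
            return "Job at " + Calendar.getInstance().getTime();
        }

        // Each kind of environment decides how a named job is actually run.
        abstract String execute(String jobName);
    }

    // Hypothetical local implementation; a real one would build and run a plan.
    static final class LocalEnv extends Env {
        @Override
        String execute(String jobName) {
            return "local:" + jobName;
        }
    }

    public static void main(String[] args) {
        Env env = new LocalEnv();
        System.out.println(env.execute("wordcount"));
        System.out.println(env.execute());
    }
}
```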
LocalEnvironment.execute
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    if (executor == null) {
        startNewSession();
    }

    Plan p = createProgramPlan(jobName);

    // Session management is disabled, revert this commit to enable
    //p.setJobId(jobID);
    //p.setSessionTimeout(sessionTimeout);

    JobExecutionResult result = executor.executePlan(p);

    this.lastJobExecutionResult = result;
    return result;
}

@Override
@PublicEvolving
public void startNewSession() throws Exception {
    if (executor != null) {
        // we need to end the previous session
        executor.stop();
        // create also a new JobID
        jobID = JobID.generate();
    }

    // create a new local executor
    executor = PlanExecutor.createLocalExecutor(configuration);
    executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());

    // if we have a session, start the mini cluster eagerly to have it available across sessions
    if (getSessionTimeout() > 0) {
        executor.start();

        // also install the reaper that will shut it down eventually
        executorReaper = new ExecutorReaper(executor);
    }
}

If executor is null, execute calls startNewSession, which creates the executor through PlanExecutor.createLocalExecutor(configuration); if sessionTimeout is greater than 0, executor.start() is called right away, but the default value is 0
It then creates the plan with createProgramPlan
Finally it obtains the JobExecutionResult through executor.executePlan(p)
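The lazy creation of the executor can be reduced to a few lines. The following is a simplified sketch, not the Flink implementation: the Executor class, the string-based plan, and the executorsCreated counter are all hypothetical and exist only to make the behaviour observable.

```java
final class LazyExecutorSketch {

    // Hypothetical executor that just echoes the plan it was given.
    static final class Executor {
        String executePlan(String plan) {
            return "result of " + plan;
        }
    }

    private Executor executor;
    int executorsCreated = 0;

    // Same order as the three steps above: ensure executor, build plan, execute plan.
    String execute(String jobName) {
        if (executor == null) {
            startNewSession();
        }
        String plan = "plan(" + jobName + ")";
        return executor.executePlan(plan);
    }

    void startNewSession() {
        executor = new Executor();
        executorsCreated++;
    }

    public static void main(String[] args) {
        LazyExecutorSketch env = new LazyExecutorSketch();
        System.out.println(env.execute("first"));
        System.out.println(env.execute("second"));
        System.out.println("executors created: " + env.executorsCreated);
    }
}
```

In this model the second call reuses the executor created by the first one, which matches the null check at the top of LocalEnvironment.execute.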

PlanExecutor.createLocalExecutor
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/PlanExecutor.java
private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";

/**
 * Creates an executor that runs the plan locally in a multi-threaded environment.
 *
 * @return A local executor.
 */
public static PlanExecutor createLocalExecutor(Configuration configuration) {
    Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);

    try {
        return leClass.getConstructor(Configuration.class).newInstance(configuration);
    }
    catch (Throwable t) {
        throw new RuntimeException("An error occurred while loading the local executor ("
                + LOCAL_EXECUTOR_CLASS + ").", t);
    }
}

private static Class<? extends PlanExecutor> loadExecutorClass(String className) {
    try {
        Class<?> leClass = Class.forName(className);
        return leClass.asSubclass(PlanExecutor.class);
    }
    catch (ClassNotFoundException cnfe) {
        throw new RuntimeException("Could not load the executor class (" + className
                + "). Do you have the 'flink-clients' project in your dependencies?");
    }
    catch (Throwable t) {
        throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
    }
}
PlanExecutor.createLocalExecutor creates org.apache.flink.client.LocalExecutor through reflection, so flink-core needs no compile-time dependency on flink-clients.
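The same reflection pattern (Class.forName, asSubclass, then a one-argument constructor) can be tried with JDK classes only. In this sketch the helper name create and the choice of java.lang.StringBuilder as the class to load are assumptions for illustration; they merely play the roles of createLocalExecutor and LocalExecutor.

```java
final class ReflectiveLoadSketch {

    // Loads className, checks that it is a subtype of baseType, and calls its
    // public constructor taking a single String argument.
    static <T> T create(String className, Class<T> baseType, String argument) {
        try {
            Class<? extends T> clazz = Class.forName(className).asSubclass(baseType);
            return clazz.getConstructor(String.class).newInstance(argument);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not load class (" + className
                    + "). Is it on the classpath?", e);
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new RuntimeException("An error occurred while instantiating (" + className + ").", e);
        }
    }

    public static void main(String[] args) {
        CharSequence loaded = create("java.lang.StringBuilder", CharSequence.class, "abc");
        System.out.println(loaded + " via " + loaded.getClass().getName());
    }
}
```

The caller only needs the base type at compile time; a missing implementation class shows up as a runtime error with a hint, much like the 'flink-clients' message in loadExecutorClass.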
LocalExecutor.executePlan
flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java
/**
 * Executes the given program on a local runtime and waits for the job to finish.
 *
 * <p>If the executor has not been started before, this starts the executor and shuts it down
 * after the job finished. If the job runs in session mode, the executor is kept alive until
 * no more references to the executor exist.</p>
 *
 * @param plan The plan of the program to execute.
 * @return The net runtime of the program, in milliseconds.
 *
 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 *                   caused an exception.
 */
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
    if (plan == null) {
        throw new IllegalArgumentException("The plan may not be null.");
    }

    synchronized (this.lock) {

        // check if we start a session dedicated for this execution
        final boolean shutDownAtEnd;

        if (jobExecutorService == null) {
            shutDownAtEnd = true;

            // configure the number of local slots equal to the parallelism of the local plan
            if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                int maxParallelism = plan.getMaximumParallelism();
                if (maxParallelism > 0) {
                    this.taskManagerNumSlots = maxParallelism;
                }
            }

            // start the cluster for us
            start();
        }
        else {
            // we use the existing session
            shutDownAtEnd = false;
        }

        try {
            // TODO: Set job's default parallelism to max number of slots
            final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
            final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
            plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

            Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
            OptimizedPlan op = pc.compile(plan);

            JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
            JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

            return jobExecutorService.executeJobBlocking(jobGraph);
        }
        finally {
            if (shutDownAtEnd) {
                stop();
            }
        }
    }
}

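When jobExecutorService is null, executePlan calls start to launch the cluster and create jobExecutorService; in that case shutDownAtEnd is true and the cluster is stopped again in the finally block
It then sets the plan's default parallelism to slotsPerTaskManager * numTaskManagers, compiles the plan into an OptimizedPlan with Optimizer, and uses JobGraphGenerator.compileJobGraph to turn that into a JobGraph
Finally it calls jobExecutorService.executeJobBlocking(jobGraph) to run the JobGraph and return the JobExecutionResult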
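The start-if-needed and stop-in-finally logic is easy to misread in the full listing, so here is a reduced sketch of just that part, together with the default-parallelism arithmetic. The class ExecutePlanSketch, its counters, and the string-based plan are hypothetical; only the control flow is meant to mirror executePlan.

```java
final class ExecutePlanSketch {

    private boolean running = false;
    int starts = 0;
    int stops = 0;

    void start() {
        if (running) {
            throw new IllegalStateException("The local executor was already started.");
        }
        running = true;
        starts++;
    }

    void stop() {
        running = false;
        stops++;
    }

    // Default parallelism as computed in executePlan: slots per task manager times task managers.
    static int defaultParallelism(int slotsPerTaskManager, int numTaskManagers) {
        return slotsPerTaskManager * numTaskManagers;
    }

    String executePlan(String plan) {
        // Only a cluster started for this call is shut down at the end.
        final boolean shutDownAtEnd;
        if (!running) {
            shutDownAtEnd = true;
            start();
        } else {
            shutDownAtEnd = false;
        }
        try {
            return "executed " + plan;
        } finally {
            if (shutDownAtEnd) {
                stop();
            }
        }
    }

    public static void main(String[] args) {
        ExecutePlanSketch oneShot = new ExecutePlanSketch();
        oneShot.executePlan("p1");
        System.out.println("one-shot: starts=" + oneShot.starts + ", stops=" + oneShot.stops);

        ExecutePlanSketch session = new ExecutePlanSketch();
        session.start(); // started beforehand, as in session mode
        session.executePlan("p2");
        System.out.println("session: starts=" + session.starts + ", stops=" + session.stops);
    }
}
```

With the default sessionTimeout of 0 nothing starts the executor ahead of time, so each execution in this example falls into the one-shot branch.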

LocalExecutor.start
flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java
@Override
public void start() throws Exception {
    synchronized (lock) {
        if (jobExecutorService == null) {
            // create the embedded runtime
            jobExecutorServiceConfiguration = createConfiguration();

            // start it up
            jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
        } else {
            throw new IllegalStateException("The local executor was already started.");
        }
    }
}

private Configuration createConfiguration() {
    Configuration newConfiguration = new Configuration();
    newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
    newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());

    newConfiguration.addAll(baseConfiguration);

    return newConfiguration;
}

private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
    final JobExecutorService newJobExecutorService;
    if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {

        if (!configuration.contains(RestOptions.PORT)) {
            configuration.setInteger(RestOptions.PORT, 0);
        }

        final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumTaskManagers(
                configuration.getInteger(
                    ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
                    ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
            .setRpcServiceSharing(RpcServiceSharing.SHARED)
            .setNumSlotsPerTaskManager(
                configuration.getInteger(
                    TaskManagerOptions.NUM_TASK_SLOTS, 1))
            .build();

        final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
        miniCluster.start();

        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

        newJobExecutorService = miniCluster;
    } else {
        final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
        localFlinkMiniCluster.start();

        newJobExecutorService = localFlinkMiniCluster;
    }

    return newJobExecutorService;
}

The start method first builds the Configuration object with createConfiguration and then creates the JobExecutorService with createJobExecutorService
createConfiguration mainly sets TaskManagerOptions.NUM_TASK_SLOTS and CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, and then adds all entries from baseConfiguration
createJobExecutorService creates a different newJobExecutorService depending on the value of configuration.getString(CoreOptions.MODE)
The default is CoreOptions.NEW_MODE: it first builds a MiniClusterConfiguration, then creates a MiniCluster (a JobExecutorService), calls MiniCluster.start, and returns it
In any mode other than CoreOptions.NEW_MODE it creates a LocalFlinkMiniCluster (also a JobExecutorService), calls LocalFlinkMiniCluster.start(), and returns it
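One detail of createConfiguration that is easy to miss is the ordering: the two values are set first and addAll(baseConfiguration) runs afterwards, so, assuming addAll overwrites existing keys the way Map.putAll does, an entry in baseConfiguration wins over the computed one. The sketch below models this with a plain HashMap; the class name and the use of string values are assumptions, and only the key "taskmanager.numberOfTaskSlots" is meant to correspond to TaskManagerOptions.NUM_TASK_SLOTS.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CreateConfigurationSketch {

    static final String NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";

    // Computed default first, then everything from the base configuration on top.
    static Map<String, String> createConfiguration(int taskManagerNumSlots,
                                                   Map<String, String> baseConfiguration) {
        Map<String, String> newConfiguration = new HashMap<>();
        newConfiguration.put(NUM_TASK_SLOTS, Integer.toString(taskManagerNumSlots));
        newConfiguration.putAll(baseConfiguration);
        return newConfiguration;
    }

    public static void main(String[] args) {
        Map<String, String> empty = Collections.emptyMap();
        System.out.println(createConfiguration(4, empty));

        Map<String, String> base = new HashMap<>();
        base.put(NUM_TASK_SLOTS, "8");
        System.out.println(createConfiguration(4, base));
    }
}
```

This would also explain why executePlan reads the slot count back from jobExecutorServiceConfiguration instead of using taskManagerNumSlots directly.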

MiniCluster.executeJobBlocking
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java
/**
 * This method runs a job in blocking mode. The method returns only after the job
 * completed successfully, or after it failed terminally.
 *
 * @param job  The Flink job to execute
 * @return The result of the job execution
 *
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 *         or if the job terminally failed.
 */
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    final JobResult jobResult;

    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
    }

    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}

MiniCluster.executeJobBlocking first calls submitJob(job) to submit the JobGraph, which returns a CompletableFuture (submissionFuture)
That CompletableFuture is chained through thenCompose to requestJobResult, which requests the JobResult by jobId (jobResultFuture)
Finally jobResultFuture.get() blocks until the JobResult is available, and jobResult.toJobExecutionResult converts it into the JobExecutionResult
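The submit, compose, and block sequence uses only standard CompletableFuture calls, so it can be reproduced without Flink. In the following sketch submitJob and requestJobResult are hypothetical stand-ins that return an acknowledgement string and the length of the job id; only the chaining and the exception handling are meant to resemble executeJobBlocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class BlockingSubmitSketch {

    // Hypothetical asynchronous submission that completes with an acknowledgement.
    static CompletableFuture<String> submitJob(String jobId) {
        return CompletableFuture.supplyAsync(() -> "ack:" + jobId);
    }

    // Hypothetical asynchronous result lookup; the "result" is just the id length.
    static CompletableFuture<Integer> requestJobResult(String jobId) {
        return CompletableFuture.supplyAsync(jobId::length);
    }

    static int executeBlocking(String jobId) {
        // The result is requested only after the submission has completed.
        CompletableFuture<Integer> resultFuture =
                submitJob(jobId).thenCompose(ignored -> requestJobResult(jobId));
        try {
            return resultFuture.get(); // blocks the calling thread
        } catch (ExecutionException e) {
            // Unwrap the cause, in the spirit of ExceptionUtils.stripExecutionException.
            throw new RuntimeException("Could not retrieve job result.", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(executeBlocking("job-1"));
    }
}
```

thenCompose is used instead of thenApply because requestJobResult itself returns a future; thenApply would yield a nested CompletableFuture.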

Summary

DataSet's print method calls collect, and collect calls getExecutionEnvironment().execute() to obtain the JobExecutionResult; the executionEnvironment here is a LocalEnvironment
ExecutionEnvironment.execute internally calls the abstract method execute(String jobName), which subclasses implement; here that is LocalEnvironment.execute. It first uses startNewSession, which creates a LocalExecutor through PlanExecutor.createLocalExecutor, then builds the plan with createProgramPlan, and finally calls LocalExecutor.executePlan to obtain the JobExecutionResult
LocalExecutor.executePlan first checks jobExecutorService; if it is null, it calls start to create one (depending on CoreOptions.MODE this is a MiniCluster for CoreOptions.NEW_MODE and a LocalFlinkMiniCluster otherwise; in this example it is a MiniCluster). It then converts the plan into a JobGraph through Optimizer and JobGraphGenerator, and finally calls jobExecutorService.executeJobBlocking(jobGraph) to run the JobGraph and return the JobExecutionResult

doc

LocalEnvironment
LocalExecutor
MiniCluster
