A Look at Flink's Parallel Execution



This post takes a look at Flink's Parallel Execution.
Examples
Operator Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = […]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Operators, data sources, and data sinks can all call setParallelism() to set their own parallelism.
Execution Environment Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = […]
DataStream<Tuple2<String, Integer>> wordCounts = […]
wordCounts.print();

env.execute("Word Count Example");
Calling setParallelism on the ExecutionEnvironment sets the default parallelism for all operators, data sources, and data sinks; if an operator, data source, or data sink sets its own parallelism, that value overrides the one set on the ExecutionEnvironment.
Client Level
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
or
try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
With the CLI client, the parallelism can be specified on the command line with -p; when submitting from Java/Scala, it is passed as an argument to Client.run.
System Level
# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
The parallelism.default option in flink-conf.yaml sets the system-level default parallelism for all execution environments.
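Taken together, the four levels behave like a fallback chain. The sketch below is plain Java, not Flink code: the class and method names (ParallelismLevels, resolve) are hypothetical, and it assumes that a non-positive value means "not set" (Flink's ExecutionConfig.PARALLELISM_DEFAULT is -1) and that the most specific level wins, in the order operator, execution environment, client, system.

```java
// Hypothetical helper, not part of Flink: it only models the fallback chain.
final class ParallelismLevels {

    /** Returns the first positive value in the order operator, environment, client, system. */
    static int resolve(int operator, int environment, int client, int system) {
        int[] levels = {operator, environment, client, system};
        for (int value : levels) {
            if (value > 0) {
                return value;
            }
        }
        throw new IllegalStateException("no parallelism configured at any level");
    }

    public static void main(String[] args) {
        // Operator level set: it wins over everything else.
        System.out.println(resolve(5, 3, 10, 1));
        // Operator level unset (-1): the environment default applies.
        System.out.println(resolve(-1, 3, 10, 1));
        // Only client (-p 10) and system (parallelism.default: 1) are set.
        System.out.println(resolve(-1, -1, 10, 1));
        // Nothing but parallelism.default is set.
        System.out.println(resolve(-1, -1, -1, 1));
    }
}
```

The source walkthrough that follows shows where each of these fallbacks is actually applied in Flink 1.7.1.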
ExecutionEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
@Public
public abstract class ExecutionEnvironment {
//……

private final ExecutionConfig config = new ExecutionConfig();

/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
* x parallel instances.
*
* <p>This method overrides the default parallelism for this environment.
* The {@link LocalEnvironment} uses by default a value equal to the number of hardware
* contexts (CPU cores / threads). When executing the program via the command line client
* from a JAR file, the default parallelism is the one configured for that setup.
*
* @param parallelism The parallelism
*/
public void setParallelism(int parallelism) {
config.setParallelism(parallelism);
}

@Internal
public Plan createProgramPlan(String jobName, boolean clearSinks) {
if (this.sinks.isEmpty()) {
if (wasExecuted) {
throw new RuntimeException("No new data sinks have been defined since the " +
"last execution. The last execution refers to the latest call to " +
"'execute()', 'count()', 'collect()', or 'print()'.");
} else {
throw new RuntimeException("No data sinks have been created yet. " +
"A program needs at least one sink that consumes data. " +
"Examples are writing the data set or printing it.");
}
}

if (jobName == null) {
jobName = getDefaultName();
}

OperatorTranslation translator = new OperatorTranslation();
Plan plan = translator.translateToPlan(this.sinks, jobName);

if (getParallelism() > 0) {
plan.setDefaultParallelism(getParallelism());
}
plan.setExecutionConfig(getConfig());

// Check plan for GenericTypeInfo's and register the types at the serializers.
if (!config.isAutoTypeRegistrationDisabled()) {
plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

private final Set<Class<?>> registeredTypes = new HashSet<>();
private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

@Override
public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
if (!visitedOperators.add(visitable)) {
return false;
}
OperatorInformation<?> opInfo = visitable.getOperatorInfo();
Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
return true;
}

@Override
public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
});
}

try {
registerCachedFilesWithPlan(plan);
} catch (Exception e) {
throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
}

// clear all the sinks such that the next execution does not redo everything
if (clearSinks) {
this.sinks.clear();
wasExecuted = true;
}

// All types are registered now. Print information.
int registeredTypes = config.getRegisteredKryoTypes().size() +
config.getRegisteredPojoTypes().size() +
config.getRegisteredTypesWithKryoSerializerClasses().size() +
config.getRegisteredTypesWithKryoSerializers().size();
int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
config.getDefaultKryoSerializerClasses().size();
LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
}
if (config.isForceKryoEnabled()) {
LOG.info("Using KryoSerializer for serializing POJOs");
}
if (config.isForceAvroEnabled()) {
LOG.info("Using AvroSerializer for serializing POJOs");
}

if (LOG.isDebugEnabled()) {
LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

// print information about static code analysis
LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
}

return plan;
}

//……
}
ExecutionEnvironment provides a setParallelism method that sets the parallelism on the ExecutionConfig; after createProgramPlan creates the Plan, it reads the parallelism from the ExecutionConfig and, if it is greater than 0, sets it as the Plan's defaultParallelism.
LocalEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java
@Public
public class LocalEnvironment extends ExecutionEnvironment {

//……

public JobExecutionResult execute(String jobName) throws Exception {
if (executor == null) {
startNewSession();
}

Plan p = createProgramPlan(jobName);

// Session management is disabled, revert this commit to enable
//p.setJobId(jobID);
//p.setSessionTimeout(sessionTimeout);

JobExecutionResult result = executor.executePlan(p);

this.lastJobExecutionResult = result;
return result;
}

//……
}
LocalEnvironment's execute calls LocalExecutor's executePlan.
LocalExecutor
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java
public class LocalExecutor extends PlanExecutor {

//……

@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}

synchronized (this.lock) {

// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;

if (jobExecutorService == null) {
shutDownAtEnd = true;

// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
this.taskManagerNumSlots = maxParallelism;
}
}

// start the cluster for us
start();
}
else {
// we use the existing session
shutDownAtEnd = false;
}

try {
// TODO: Set job's default parallelism to max number of slots
final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
OptimizedPlan op = pc.compile(plan);

JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

return jobExecutorService.executeJobBlocking(jobGraph);
}
finally {
if (shutDownAtEnd) {
stop();
}
}
}
}

//……
}
LocalExecutor's executePlan additionally sets the plan's defaultParallelism to slotsPerTaskManager * numTaskManagers.
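The slot arithmetic in executePlan can be sketched in isolation. This is a simplified model, not Flink code: the class and method names are hypothetical, the sentinel value -1 for "slots not configured" is an assumption about DEFAULT_TASK_MANAGER_NUM_SLOTS, and the sketch ignores both the explicit configuration lookups and the fact that the slot adjustment only happens when no session is already running.

```java
// Hypothetical model of the slot arithmetic in LocalExecutor.executePlan.
final class LocalSlotMath {

    // Assumption: stands in for DEFAULT_TASK_MANAGER_NUM_SLOTS ("not configured").
    static final int SLOTS_NOT_CONFIGURED = -1;

    /**
     * If the slot count was left at its default and the plan has a positive
     * maximum parallelism, the slot count becomes that maximum. The plan's
     * default parallelism is then slots per TaskManager times TaskManagers.
     */
    static int defaultParallelism(int taskManagerNumSlots, int planMaxParallelism, int numTaskManagers) {
        int slotsPerTaskManager = taskManagerNumSlots;
        if (slotsPerTaskManager == SLOTS_NOT_CONFIGURED && planMaxParallelism > 0) {
            slotsPerTaskManager = planMaxParallelism;
        }
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        // Slots not configured, an operator in the plan asks for 4, one TaskManager.
        System.out.println(defaultParallelism(SLOTS_NOT_CONFIGURED, 4, 1));
        // Slots explicitly set to 2, three local TaskManagers.
        System.out.println(defaultParallelism(2, 4, 3));
    }
}
```

Note that in the listing above this value is applied with an unconditional plan.setDefaultParallelism call, so in local execution it replaces whatever default createProgramPlan put on the Plan.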
RemoteEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java
@Public
public class RemoteEnvironment extends ExecutionEnvironment {

//……

public JobExecutionResult execute(String jobName) throws Exception {
PlanExecutor executor = getExecutor();

Plan p = createProgramPlan(jobName);

// Session management is disabled, revert this commit to enable
//p.setJobId(jobID);
//p.setSessionTimeout(sessionTimeout);

JobExecutionResult result = executor.executePlan(p);

this.lastJobExecutionResult = result;
return result;
}

//……
}
RemoteEnvironment's execute calls RemoteExecutor's executePlan.
RemoteExecutor
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java
public class RemoteExecutor extends PlanExecutor {

private final Object lock = new Object();

private final List<URL> jarFiles;

private final List<URL> globalClasspaths;

private final Configuration clientConfiguration;

private ClusterClient<?> client;

//……

@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}

JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
return executePlanWithJars(p);
}

public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
if (program == null) {
throw new IllegalArgumentException("The job may not be null.");
}

synchronized (this.lock) {
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;

if (client == null) {
shutDownAtEnd = true;
// start the executor for us
start();
}
else {
// we use the existing session
shutDownAtEnd = false;
}

try {
return client.run(program, defaultParallelism).getJobExecutionResult();
}
finally {
if (shutDownAtEnd) {
stop();
}
}
}
}

//……
}
RemoteExecutor's executePlan calls executePlanWithJars, which in turn calls ClusterClient's run and passes defaultParallelism as an argument.
ClusterClient
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java
public abstract class ClusterClient<T> {
//……

public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
return run(program, parallelism, SavepointRestoreSettings.none());
}

public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
throws CompilerException, ProgramInvocationException {
ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
}

OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}

private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
throws CompilerException, ProgramInvocationException {
return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}

public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
Logger log = LoggerFactory.getLogger(ClusterClient.class);

if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
p.setDefaultParallelism(parallelism);
}
log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

return compiler.compile(p);
}

//……
}
The parallelism passed to ClusterClient's run is applied to the Plan only when parallelism > 0 and p.getDefaultParallelism() <= 0.
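The guard in getOptimizedPlan is what makes the client-level value a fallback rather than an override. A minimal sketch in plain Java, with a hypothetical class and method name, that mirrors only that condition:

```java
// Hypothetical model of the guard in ClusterClient.getOptimizedPlan; not Flink code.
final class ClientParallelismGuard {

    /**
     * Returns the plan's default parallelism after the client value has been
     * considered: the client value is used only if it is positive and the
     * plan does not already carry a positive default.
     */
    static int effectivePlanDefault(int planDefault, int clientParallelism) {
        if (clientParallelism > 0 && planDefault <= 0) {
            return clientParallelism;
        }
        return planDefault;
    }

    public static void main(String[] args) {
        // Plan has no default yet, the client passes 10: the client value is applied.
        System.out.println(effectivePlanDefault(-1, 10));
        // env.setParallelism(3) already reached the Plan: the client value is ignored.
        System.out.println(effectivePlanDefault(3, 10));
        // Neither is set: the plan default stays unset.
        System.out.println(effectivePlanDefault(-1, -1));
    }
}
```

This is consistent with the Execution Environment Level taking precedence over the Client Level.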
DataStreamSource
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

boolean isParallel;

public DataStreamSource(StreamExecutionEnvironment environment,
TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
boolean isParallel, String sourceName) {
super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

this.isParallel = isParallel;
if (!isParallel) {
setParallelism(1);
}
}

public DataStreamSource(SingleOutputStreamOperator<T> operator) {
super(operator.environment, operator.getTransformation());
this.isParallel = true;
}

@Override
public DataStreamSource<T> setParallelism(int parallelism) {
if (parallelism != 1 && !isParallel) {
throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
} else {
super.setParallelism(parallelism);
return this;
}
}
}
DataStreamSource extends SingleOutputStreamOperator; its setParallelism override rejects any value other than 1 for a non-parallel source and otherwise delegates to the parent SingleOutputStreamOperator's setParallelism.
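The non-parallel check can be illustrated without Flink on the classpath. The class below is a hypothetical stand-in that copies only the two rules visible in the listing: a non-parallel source is pinned to 1 in the constructor, and setParallelism throws for any other value.

```java
// Hypothetical stand-in for the check in DataStreamSource; not Flink code.
final class SourceParallelismSketch {

    private final boolean isParallel;
    private int parallelism;

    SourceParallelismSketch(boolean isParallel, int environmentParallelism) {
        this.isParallel = isParallel;
        // A non-parallel source is pinned to 1 at construction time.
        this.parallelism = isParallel ? environmentParallelism : 1;
    }

    SourceParallelismSketch setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("not a parallel source");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        SourceParallelismSketch parallel = new SourceParallelismSketch(true, 4);
        System.out.println(parallel.setParallelism(8).getParallelism());

        SourceParallelismSketch single = new SourceParallelismSketch(false, 4);
        System.out.println(single.getParallelism());
        try {
            single.setParallelism(2);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```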
SingleOutputStreamOperator
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
//……

/**
* Sets the parallelism for this operator.
*
* @param parallelism
* The parallelism for this operator.
* @return The operator with set parallelism.
*/
public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
Preconditions.checkArgument(canBeParallel() || parallelism == 1,
"The parallelism of non parallel operator must be 1.");

transformation.setParallelism(parallelism);

return this;
}

//……
}
SingleOutputStreamOperator's setParallelism ultimately sets the parallelism on the StreamTransformation.
DataStreamSink
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@Public
public class DataStreamSink<T> {

private final SinkTransformation<T> transformation;

//……

/**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
* @param parallelism The parallelism for this sink.
* @return The sink with set parallelism.
*/
public DataStreamSink<T> setParallelism(int parallelism) {
transformation.setParallelism(parallelism);
return this;
}

//……
}
DataStreamSink provides a setParallelism method that ultimately sets the parallelism on the SinkTransformation.
Summary

Flink lets you set parallelism at several levels: Operator Level, Execution Environment Level, Client Level, and System Level.
The parallelism.default option in flink-conf.yaml sets the system-level default parallelism for all execution environments; setParallelism on the ExecutionEnvironment sets the default parallelism for operators, data sources, and data sinks; an operator, data source, or data sink that sets its own parallelism overrides the value set on the ExecutionEnvironment.
ExecutionEnvironment's setParallelism sets the parallelism on the ExecutionConfig (with the CLI client, the parallelism can be specified on the command line with -p, or passed as an argument to Client.run when submitting from Java/Scala; the parallelism set through LocalEnvironment and RemoteEnvironment ultimately ends up on the Plan); DataStreamSource extends SingleOutputStreamOperator, and its setParallelism ultimately calls the parent SingleOutputStreamOperator's setParallelism; SingleOutputStreamOperator's setParallelism ultimately acts on the StreamTransformation; DataStreamSink's setParallelism ultimately acts on the SinkTransformation.

doc
Parallel Execution
