Flink 源码剖析之 Client 解析流程剖析
抛出问题
首先来思考一个问题,咱们在提交 Flink 工作的时候,在 Flink 客户端执行了上面的命令后,Flink 客户端做了哪些事件?又是怎么执行咱们本人写的代码的?上面就来一层一层的揭开 flink-client 的神秘面纱。
flink run -d -m yarn-cluster \
-Dyarn.application.name=FlinkStreamingNewDemoHome \
-Dyarn.application.queue=flink \
-Dmetrics.reporter.promgateway.groupingKey="jobname=FlinkStreamingNewDemoHome" \
-Dmetrics.reporter.promgateway.jobName=FlinkStreamingNewDemoHome \
-c flink.stream.FlinkStreamingNewDemo \
-Denv.java.opts="-Dflink_job_name=FlinkStreamingNewDemoHome" \
/home/jason/bigdata/jar/flink-1.14.0-1.0-SNAPSHOT.jar
要解答这个问题, 就先要弄明确,当执行下面命令的时候,实际上底层是在执行哪些代码?咱们能够通过查看 flink 脚本找到答案。
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
你会发现脚本的最初一行,实际上是通过 Java 命令执行 org.apache.flink.client.cli.CliFrontend 这个对象的,而后把下面的一大堆 Flink 命令当成参数传入到 main 办法里, 咱们先在 IDEA 外面找到对应的代码。
CliFrontend
能够看到 CliFrontend 这个类是位于 flink-clients 模块下的,接着来看一下 CliFrontend 类的几个重要成员变量。
间接找到 CliFrontend#main 办法。
CliFrontend#main 源码剖析
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
// 获取 JVM 信息、hadoop 信息等打印日志
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
// 获取 flink 的配置文件门路 即: flink/conf/flink-conf.yaml
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
// 解析并加载 flink-conf.yaml 配置文件中的配置到 Configuration(实质上是一个 Map)
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
// 初始化 3 种不同的 CLI 别离是 GenericCLI 对应的是 per-job 模式,flinkYarnSessionCLI 对应的是 yarn-session 模式,以及 DefaultCLI 对应的是 standalone 模式
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
// 初始化 CliFrontend 客户端对象
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// 调用 parseAndRun 执行
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();} finally {System.exit(retCode);
}
}
main 办法的代码逻辑十分清晰,大抵能够分为上面 5 个步骤:
- 获取 flink 的配置文件门路 即: flink/conf/flink-conf.yaml
- 解析并加载 flink-conf.yaml 配置文件中的配置到 Configuration(实质上是一个 Map)
- 初始化 3 种不同的 CLI 别离是 GenericCLI 对应的是 per-job 模式,flinkYarnSessionCLI 对应的是 yarn-session 模式,以及 DefaultCLI 对应的是 standalone 模式
- 初始化 CliFrontend 客户端对象
- 调用 parseAndRun 解析并执行程序
上面就来看一下每个步骤具体做了哪些事件。
flink-conf.yaml 获取配置文件源码
public static String getConfigurationDirectoryFromEnv() {String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
if (location != null) {if (new File(location).exists()) {return location;} else {
throw new RuntimeException(
"The configuration directory'"
+ location
+ "', specified in the'"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "'environment variable, does not exist.");
}
} else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {location = CONFIG_DIRECTORY_FALLBACK_1;} else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {location = CONFIG_DIRECTORY_FALLBACK_2;} else {
throw new RuntimeException(
"The configuration directory was not specified."
+ "Please specify the directory containing the configuration file through the'"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "'environment variable.");
}
return location;
}
代码比较简单,间接获取零碎环境变量的值,而后返回 flink-conf.yaml 配置文件的门路。
flink-conf.yaml 解析配置文件源码
private static Configuration loadYAMLResource(File file) {final Configuration config = new Configuration();
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
String line;
int lineNo = 0;
while ((line = reader.readLine()) != null) {
lineNo++;
// 1. check for comments
String[] comments = line.split("#", 2);
String conf = comments[0].trim();
// 2. get key and value
if (conf.length() > 0) {String[] kv = conf.split(":", 2);
// skip line with no valid key-value pair
if (kv.length == 1) {
LOG.warn(
"Error while trying to split key and value in configuration file"
+ file
+ ":"
+ lineNo
+ ": \""
+ line
+ "\"");
continue;
}
// 获取配置的 key: value
// 比方,jobmanager.rpc.address: storm1
// key: jobmanager.rpc.address
// value: storm1
String key = kv[0].trim();
String value = kv[1].trim();
// sanity check
if (key.length() == 0 || value.length() == 0) {
LOG.warn(
"Error after splitting key and value in configuration file"
+ file
+ ":"
+ lineNo
+ ": \""
+ line
+ "\"");
continue;
}
LOG.info("Loading configuration property: {}, {}",
key,
isSensitive(key) ? HIDDEN_CONTENT : value);
// 退出到 config,相当于是一个 map
config.setString(key, value);
}
}
} catch (IOException e) {throw new RuntimeException("Error parsing YAML configuration.", e);
}
return config;
}
loadConfiguration 办法最终会调用 loadYAMLResource 办法进行解析 flink-conf.yaml 配置文件,通过一行行的读取配置,而后把配置的 key,value 退出到 Configuration 中,Configuration 的实质就是一个 map,用来保留 flink 的配置信息。
CustomCommandLine 初始化源码
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {List<CustomCommandLine> customCommandLines = new ArrayList<>();
customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
customCommandLines.add(
loadCustomCommandLine(
flinkYarnSessionCLI,
configuration,
configurationDirectory,
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {LOG.info("Loading FallbackYarnSessionCli");
customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
}
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
// the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
customCommandLines.add(new DefaultCLI());
return customCommandLines;
}
loadCustomCommandLines 次要是用来初始化 CustomCommandLine 的,返回一个 CustomCommandLine 的汇合。
这里次要有三种不同的 CustomCommandLine 实现类,别离是 GenericCLI,FlinkYarnSessionCli,DefaultCLI。
三种不同的实现对应三种不同的模式,GenericCLI 对应的是 per-job 模式,flinkYarnSessionCLI 对应的是 yarn-session 模式,以及 DefaultCLI 对应的是 standalone 模式。
CliFrontend 初始化源码
// 初始化 CliFrontend 客户端对象
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
public CliFrontend(
Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
List<CustomCommandLine> customCommandLines) {this.configuration = checkNotNull(configuration);
this.customCommandLines = checkNotNull(customCommandLines);
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
this.customCommandLineOptions = new Options();
for (CustomCommandLine customCommandLine : customCommandLines) {customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
通过下面第二步和第三步获取到的 configuration 和 customCommandLines 信息初始化 CliFrontend 对象。
parseAndRun 解析并运行程序源码解析
public int parseAndRun(String[] args) {
// check for action
if (args.length < 1) {CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
// get action
// 其实这里就是 run
String action = args[0];
// remove action from parameters
final String[] params = Arrays.copyOfRange(args, 1, args.length);
try {
// do action
switch (action) {
case ACTION_RUN:
// 所以会走到这里
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
case ACTION_INFO:
info(params);
return 0;
case ACTION_CANCEL:
cancel(params);
return 0;
case ACTION_STOP:
stop(params);
return 0;
case ACTION_SAVEPOINT:
savepoint(params);
return 0;
case "-h":
case "--help":
CliFrontendParser.printHelp(customCommandLines);
return 0;
case "-v":
case "--version":
String version = EnvironmentInformation.getVersion();
String commitID = EnvironmentInformation.getRevisionInformation().commitId;
System.out.print("Version:" + version);
System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN)
? "":", Commit ID: " + commitID);
return 0;
default:
System.out.printf("\"%s\"is not a valid action.\n", action);
System.out.println();
System.out.println("Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
System.out.println();
System.out.println("Specify the version option (-v or --version) to print Flink version.");
System.out.println();
System.out.println("Specify the help option (-h or --help) to get help on the command.");
return 1;
}
} catch (CliArgsException ce) {return handleArgException(ce);
} catch (ProgramParametrizationException ppe) {return handleParametrizationException(ppe);
} catch (ProgramMissingJobException pmje) {return handleMissingJobException();
} catch (Exception e) {return handleError(e);
}
}
后面 4 个步骤都是在做一些筹备工作,最初一步才是真正开始执行程序,因为咱们执行的是 flink run 命令,所以会走到 run(params) 办法外面。
run(params) 源码
protected void run(String[] args) throws Exception {LOG.info("Running'run'command.");
// 获取所有的 flink 命令
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
// 获取输出参数外面的 flink 命令
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
// 如果是 help 打印帮忙命令信息
if (commandLine.hasOption(HELP_OPTION.getOpt())) {CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
// 获取处于 active 状态的 CLI
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
// 构建 ProgramOptions 对象
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
// 获取用户提交的 jar 包和依赖包
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
final Configuration effectiveConfiguration =
getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
// 真正的执行程序
executeProgram(effectiveConfiguration, program);
}
}
首先会获取 flink 所有的 options, 而后在获取咱们输出的 flink 命令。如果有 h 的话就会打印 help 信息。接下来会获取处于 active 状态的 CustomCommandLine,这里获取到的应该是 GenericCLI。而后获取用户提交的 jar 包和依赖包,最初调用 executeProgram 开始真正的执行程序。
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {checkNotNull(executorServiceLoader);
// 获取用户代码的类加载器, 默认状况下是 ChildFirstClassLoader 这个能够在配置文件外面配置
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
// 获取以后线程的类加载器
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
// 把以后线程的类加载器设置为 ChildFirstClassLoader
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info("Starting program (detached: {})",
!configuration.getBoolean(DeploymentOptions.ATTACHED));
// 初始化上下文的配置信息 ContextEnvironment
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
// 初始化 StreamContextEnvironment
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
// 通过反射去执行用户编写的代码
program.invokeInteractiveModeForExecution();} finally {ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();}
} finally {
// 最初在把类加载器切换回去
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
先是会获取用户代码的类加载器, 默认状况下是 ChildFirstClassLoader 这个能够在 flink-conf.yaml 配置文件外面配置
#Flink 的类加载策略
classloader.resolve-order: child-first/parent-first
这里有一个十分有意思的中央是,获取以后线程的类加载器 contextClassLoader , 而后把以后线程的类加载器设置为 ChildFirstClassLoader 或者 ParentFirstClassLoader 紧接着初始化 ContextEnvironment 和 StreamContextEnvironment 的上下文配置信息,最终通过反射的形式调用 invokeInteractiveModeForExecution 办法,也就是在执行用户的代码,留神最初在 finally 外面又把线程的类加载器切换到了之前的 contextClassLoader,相当于做了一个线程类加载器的切换,也就是通过这种形式,实现了用户代码和 flink 框架代码不发生冲突。
callMainMethod 源码
private static void callMainMethod(Class<?> entryClass, String[] args)
throws ProgramInvocationException {
Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
throw new ProgramInvocationException("The class" + entryClass.getName() + "must be public.");
}
try {
// 获取到用户编写代码类的 main 办法
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException("The class" + entryClass.getName() + "has no main(String[]) method.");
} catch (Throwable t) {
throw new ProgramInvocationException("Could not look up the main(String[]) method from the class"
+ entryClass.getName()
+ ":"
+ t.getMessage(),
t);
}
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException("The class" + entryClass.getName() + "declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException("The class" + entryClass.getName() + "declares a non-public main method.");
}
try {
// 调用 invoke 办法的时候就会走到用户代码的 main 办法外面
mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
throw new ProgramInvocationException("Access to the main method was denied:" + e.getMessage(), e);
} catch (InvocationTargetException e) {Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramParametrizationException) {throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {throw (ProgramInvocationException) exceptionInMethod;
} else {
throw new ProgramInvocationException("The main method caused an error:" + exceptionInMethod.getMessage(),
exceptionInMethod);
}
} catch (Throwable t) {
throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(),
t);
}
}
最终调用的是 callMainMethod 办法。首先会判断该类是否是 public 的,如果是的话,会获取到 main 办法,而后再次判断 mainMethod 是否是 public static 润饰的,都满足条件的话,最初调用 invoke 办法,这个时候就会来到用户本人的代码,比方下面提交的代码是 flink.stream.FlinkStreamingNewDemo 那么就会执行 FlinkStreamingNewDemo 类的 main 办法。通过下面一系列的解析配置,初始化最终终于走到咱们的代码外面了。
本文由 mdnice 多平台公布