1. Introduction
SqlClient is a SQL command-line interactive tool provided by Flink. The Flink binary distribution ships a sql-client.sh under its bin directory; launching this script opens the interactive session. The SqlClient implementation lives in the flink-sql-client submodule of the flink-table module, with its entry point in org/apache/flink/table/client/SqlClient.java. After setting up the interactive environment, the entry point calls CliClient's open method, which enters an endless loop:
public void open() {
    isRunning = true;

    // print welcome
    terminal.writer().append(CliStrings.MESSAGE_WELCOME);

    // begin reading loop
    while (isRunning) {
        // make some space to previous command
        terminal.writer().append("\n");
        terminal.flush();

        final String line;
        try {
            // 1. read user input (terminated by ";")
            line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
        } catch (UserInterruptException e) {
            // user cancelled line with Ctrl+C
            continue;
        } catch (EndOfFileException | IOError e) {
            // user cancelled application with Ctrl+D or kill
            break;
        } catch (Throwable t) {
            throw new SqlClientException("Could not read from command line.", t);
        }
        if (line == null) {
            continue;
        }

        // 2. call parseCommand to parse the input into a command
        final Optional<SqlCommandCall> cmdCall = parseCommand(line);
        // 3. call callCommand to execute the command
        cmdCall.ifPresent(this::callCommand);
    }
}
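The loop above boils down to three steps: read input, parse it into a command, and execute it. The same shape can be sketched as a minimal standalone REPL (illustrative only, not Flink's actual code; the class and method names are hypothetical):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.Optional;

// Minimal sketch of the read-parse-execute loop: read a line, parse it
// into a command if it is terminated by ';', then execute it.
class MiniRepl {

    /** Runs the loop until EOF or a QUIT command; returns how many commands executed. */
    public static int run(BufferedReader in) {
        int executed = 0;
        boolean running = true;
        while (running) {
            String line;
            try {
                line = in.readLine();             // 1. read user input
            } catch (IOException e) {
                break;                            // treat read errors like EOF
            }
            if (line == null) {                   // EOF, e.g. Ctrl+D
                break;
            }
            Optional<String> cmd = parse(line);   // 2. parse the input
            if (cmd.isPresent()) {                // 3. execute the command
                executed++;
                if (cmd.get().equals("QUIT")) {
                    running = false;
                }
            }
        }
        return executed;
    }

    /** Statements are terminated by ';', mirroring the SqlClient convention. */
    private static Optional<String> parse(String line) {
        String stmt = line.trim();
        if (!stmt.endsWith(";")) {
            return Optional.empty();
        }
        return Optional.of(stmt.substring(0, stmt.length() - 1).trim().toUpperCase());
    }
}
```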
2. Parsing Commands: parseCommand
2.1 SqlCommandCall
SqlCommandCall is a static nested class of SqlCommandParser, defined as follows:
public static class SqlCommandCall {
    public final SqlCommand command;
    public final String[] operands;
}
SqlCommand is an enum that declares every SQL operation type; beyond that, it serves a second purpose: regex matching of SQL commands. SqlCommand defines two fields for each operation type:
- String pattern: the regular expression used to match the SQL;
- Function<String[], Optional<String[]>> operandConverter: a functional interface that further converts the capture groups produced by the regex match above; the converted result is ultimately stored in operands.
As noted above, operands stores the parsed result of the SQL command.
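The enum-with-converter pattern can be sketched in simplified form. The following is illustrative only, in the spirit of SqlCommand; the member, regex, and field names here are assumptions, not Flink's actual definitions:

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

// Simplified sketch of an enum carrying both a regex pattern and an
// operand converter per member.
enum MiniSqlCommand {
    // SET with an optional "key=value" payload; the capture groups are
    // handed to operandConverter, which produces the operands array.
    SET("SET(\\s+(\\S+)\\s*=(.*))?",
        groups -> {
            if (groups[0] == null) {
                return Optional.of(new String[0]);              // plain "SET"
            }
            return Optional.of(new String[] {groups[0], groups[1].trim()});
        });

    public final Pattern pattern;
    public final Function<String[], Optional<String[]>> operandConverter;

    MiniSqlCommand(String regex, Function<String[], Optional<String[]>> converter) {
        this.pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
        this.operandConverter = converter;
    }
}
```

Keeping the pattern and the converter on the enum member means the parsing loop can stay generic: it only iterates members and delegates, without a per-command switch.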
2.2 The Parsing Entry Point
public static SqlCommandCall parse(Parser sqlParser, String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }
    // parse statement via regex matching first
    Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
    if (callOpt.isPresent()) {
        return callOpt.get();
    } else {
        return parseBySqlParser(sqlParser, stmt);
    }
}
Flink first calls parseByRegexMatching to match the SQL against regular expressions; only if that fails does it call parseBySqlParser for a full SQL parse (backed by the Calcite framework under the hood). On first reading I did not quite understand why there are two parsing schemes. I later learned that originally everything was matched with regexes, but since Flink SQL already uses Calcite as its parser to build the logical plan, the SQL-parser path was added for design consistency. Flink still keeps regex matching for a number of commands unrelated to data manipulation, such as QUIT, EXIT, HELP, and SET.
Before [FLINK-17893], SqlClient parsed in the opposite order: it tried the SQL parser first, and fell back to regex matching only when that failed.
2.3 parseByRegexMatching
The flow of parseByRegexMatching is:
- Iterate over the members of the SqlCommand enum; for each member that specifies a pattern, attempt a regex match;
- On a successful match, call operandConverter to convert the capture groups;
- Otherwise return Optional.empty() directly.
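The steps above can be sketched as a standalone fragment. This is a hypothetical simplification with hand-rolled patterns, not Flink's implementation; the class and field names are assumptions:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the regex-matching flow: try each command's pattern in turn;
// on a match, turn the capture groups into operands; otherwise return empty.
class RegexCommandMatcher {

    // A tiny stand-in for SqlCommandCall: the matched command name plus operands.
    static final class Call {
        final String command;
        final String[] operands;
        Call(String command, String[] operands) {
            this.command = command;
            this.operands = operands;
        }
    }

    private static final Pattern QUIT = Pattern.compile("QUIT|EXIT", Pattern.CASE_INSENSITIVE);
    private static final Pattern USE = Pattern.compile("USE\\s+(\\S+)", Pattern.CASE_INSENSITIVE);

    static Optional<Call> parseByRegexMatching(String stmt) {
        if (QUIT.matcher(stmt).matches()) {
            return Optional.of(new Call("QUIT", new String[0]));
        }
        Matcher m = USE.matcher(stmt);
        if (m.matches()) {
            // the capture group becomes the single operand
            return Optional.of(new Call("USE", new String[] {m.group(1)}));
        }
        // no pattern matched: fall back to the SQL parser
        return Optional.empty();
    }
}
```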
2.4 parseBySqlParser
private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
    List<Operation> operations;
    try {
        operations = sqlParser.parse(stmt);
    } catch (Throwable e) {
        throw new SqlExecutionException("Invalidate SQL statement.", e);
    }
    if (operations.size() != 1) {
        throw new SqlExecutionException("Only single statement is supported now.");
    }
    final SqlCommand cmd;
    String[] operands = new String[] {stmt};
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
        cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
    } else if (operation instanceof CreateTableOperation) {
        cmd = SqlCommand.CREATE_TABLE;
    }
    // remaining branches omitted
    return new SqlCommandCall(cmd, operands);
}
It first calls the Parser to parse the SQL text; under the hood this uses Calcite for lexing and parsing, turning the SQL text into a SqlNode, which is then converted into an Operation by SqlToOperationConverter.
Next, depending on the concrete type of the Operation, it maps to the corresponding SqlCommand, extracts the operands from the Operation, and wraps everything into a SqlCommandCall to return.
This logic can be summarized as a four-step conversion: String -> SqlNode -> Operation -> SqlCommand.
3. Executing Commands: callCommand
callCommand essentially inspects the command type inside the SqlCommandCall and branches into the corresponding execution logic.
private void callCommand(SqlCommandCall cmdCall) {
    switch (cmdCall.command) {
        case QUIT:
            callQuit();
            break;
        // ......
        case SELECT:
            callSelect(cmdCall);
            break;
        case INSERT_INTO:
        case INSERT_OVERWRITE:
            callInsert(cmdCall);
            break;
        // ......
        default:
            throw new SqlClientException("Unsupported command:" + cmdCall.command);
    }
}
Starting from callSelect and drilling down layer by layer, we arrive at the call chain: CliClient.callSelect => LocalExecutor.executeQuery => LocalExecutor.executeQueryInternal. The source of executeQueryInternal is as follows:
private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
    // create table
    final Table table = createTable(context, context.getTableEnvironment(), query);
    // TODO refactor this after Table#execute support all kinds of changes
    // initialize result
    final DynamicResult<C> result =
            resultStore.createResult(
                    context.getEnvironment(),
                    removeTimeAttributes(table.getSchema()),
                    context.getExecutionConfig());
    final String jobName = sessionId + ":" + query;
    final String tableName = String.format("_tmp_table_%s", Math.abs(query.hashCode()));
    final Pipeline pipeline;
    try {
        // writing to a sink requires an optimization step that might reference UDFs during code
        // compilation
        context.wrapClassLoader(() -> {
            ((TableEnvironmentInternal) context.getTableEnvironment())
                    .registerTableSinkInternal(tableName, result.getTableSink());
            table.insertInto(tableName);
        });
        pipeline = context.createPipeline(jobName);
    } catch (Throwable t) {
        // the result needs to be closed as long as
        // it not stored in the result store
        result.close();
        // catch everything such that the query does not crash the executor
        throw new SqlExecutionException("Invalid SQL query.", t);
    } finally {
        // Remove the temporal table object.
        context.wrapClassLoader(() -> {
            context.getTableEnvironment().dropTemporaryTable(tableName);
        });
    }
    // create a copy so that we can change settings without affecting the original config
    Configuration configuration = new Configuration(context.getFlinkConfig());
    // for queries we wait for the job result, so run in attached mode
    configuration.set(DeploymentOptions.ATTACHED, true);
    // shut down the cluster if the shell is closed
    configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
    // create execution
    final ProgramDeployer deployer =
            new ProgramDeployer(configuration, jobName, pipeline, context.getClassLoader());
    JobClient jobClient;
    // wrap in classloader because CodeGenOperatorFactory#getStreamOperatorClass
    // requires to access UDF in deployer.deploy().
    jobClient =
            context.wrapClassLoader(() -> {
                try {
                    // blocking deployment
                    return deployer.deploy().get();
                } catch (Exception e) {
                    throw new SqlExecutionException("Error while submitting job.", e);
                }
            });
    String jobId = jobClient.getJobID().toString();
    // store the result under the JobID
    resultStore.storeResult(jobId, result);
    // start result retrieval
    result.startRetrieval(jobClient);
    return new ResultDescriptor(
            jobId,
            removeTimeAttributes(table.getSchema()),
            result.isMaterialized(),
            context.getEnvironment().getExecution().isTableauMode());
}
In summary, executeQueryInternal:
- Creates a Table to hold the result schema;
- Creates the Pipeline; this is the core step, turning the SQL into a concrete StreamGraph;
- Creates a JobClient and deploys the job;
- Starts a ResultRetrievalThread that keeps fetching new results and updating a local queue;
- Returns the ResultDescriptor.
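The fourth step, background result retrieval into a local queue, is a common pattern that can be sketched on its own. The following is a hypothetical simplification; the class and method names are illustrative, not Flink's:

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

// Sketch of a retrieval thread that keeps pulling new rows from a source
// and appending them to a local queue the CLI can display from.
class ResultRetrieval {

    static Thread startRetrieval(Iterator<String> source, BlockingQueue<String> localQueue) {
        Thread t = new Thread(() -> {
            while (source.hasNext()) {
                try {
                    localQueue.put(source.next());   // hand each new row to the local queue
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        t.setDaemon(true);   // don't keep the JVM alive for retrieval alone
        t.start();
        return t;
    }

    /** Joins the thread, swallowing interrupts, so callers need no checked exceptions. */
    static void awaitQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```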
Looking further into the code that creates the Pipeline, pipeline = context.createPipeline(jobName):
public Pipeline getPipeline(String jobName) {
    return execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
}
translateAndClearBuffer is mainly responsible for converting the List<ModifyOperation> buffered in TableEnvironmentImpl into a List<Transformation> (the details are left aside for now), while createPipeline assembles these Transformations into a StreamGraph:
public Pipeline createPipeline(List<Transformation<?>> transformations, TableConfig tableConfig, String jobName) {
    StreamGraph streamGraph =
            ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations);
    streamGraph.setJobName(getNonEmptyJobName(jobName));
    return streamGraph;
}
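The translate-and-clear pattern behind translateAndClearBuffer can be sketched generically: drain a buffer of pending operations into their translated form so the next call starts from an empty buffer. This is a hypothetical class for illustration, not Flink's implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Generic buffer that accumulates operations and translates them in one go.
class OperationBuffer<O, T> {
    private final List<O> buffer = new ArrayList<>();
    private final Function<O, T> translator;

    OperationBuffer(Function<O, T> translator) {
        this.translator = translator;
    }

    void add(O operation) {
        buffer.add(operation);
    }

    // Mirrors the translateAndClearBuffer idea: translate everything buffered
    // so far, then clear the buffer for the next batch.
    List<T> translateAndClearBuffer() {
        List<T> translated = new ArrayList<>();
        for (O op : buffer) {
            translated.add(translator.apply(op));
        }
        buffer.clear();
        return translated;
    }
}
```

Clearing on translation is what makes repeated INSERT statements in one session independent: each pipeline only sees the operations buffered since the last execution.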
ExecutorUtils.generateStreamGraph, shown below, creates a StreamGraphGenerator and calls its generate method. From this point on, the logic is exactly the StreamGraph-generation logic discussed for the DataStream API, which ties SQL back to Transformations.
public static StreamGraph generateStreamGraph(StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot generate StreamGraph.");
    }
    StreamGraphGenerator generator =
            new StreamGraphGenerator(transformations, execEnv.getConfig(), execEnv.getCheckpointConfig())
                    .setStateBackend(execEnv.getStateBackend())
                    .setChaining(execEnv.isChainingEnabled())
                    .setUserArtifacts(execEnv.getCachedFiles())
                    .setTimeCharacteristic(execEnv.getStreamTimeCharacteristic())
                    .setDefaultBufferTimeout(execEnv.getBufferTimeout());
    return generator.generate();
}