1. Introduction
SqlClient is the interactive SQL command-line tool shipped with Flink. The Flink binary distribution contains a sql-client.sh under its bin directory, and launching that script brings up the interactive shell. The implementation of SqlClient lives in the flink-sql-client submodule of the flink-table module, with its entry point in org/apache/flink/table/client/SqlClient.java. After setting up the interactive environment, the entry point calls CliClient's open function, which enters an endless loop:
```java
public void open() {
    isRunning = true;

    // print welcome
    terminal.writer().append(CliStrings.MESSAGE_WELCOME);

    // begin reading loop
    while (isRunning) {
        // make some space to previous command
        terminal.writer().append("\n");
        terminal.flush();

        final String line;
        try {
            // 1. read the user input (terminated by ';')
            line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
        } catch (UserInterruptException e) {
            // user cancelled line with Ctrl+C
            continue;
        } catch (EndOfFileException | IOError e) {
            // user cancelled application with Ctrl+D or kill
            break;
        } catch (Throwable t) {
            throw new SqlClientException("Could not read from command line.", t);
        }
        if (line == null) {
            continue;
        }

        // 2. call parseCommand to parse the input into a command
        final Optional<SqlCommandCall> cmdCall = parseCommand(line);

        // 3. call callCommand to execute the command
        cmdCall.ifPresent(this::callCommand);
    }
}
```
2. Parsing a Command: parseCommand
2.1 SqlCommandCall
SqlCommandCall is a static nested class of SqlCommandParser, defined as follows:
```java
public static class SqlCommandCall {
    public final SqlCommand command;
    public final String[] operands;
}
```
SqlCommand is an enum that declares every SQL operation type; beyond that, it serves a second purpose: regex matching of SQL commands. For each operation type, SqlCommand defines two fields:
- String pattern: the regular expression used to match the SQL statement;
- Function<String[], Optional\<String[]>> operandConverter: a functional interface that further parses and converts the groups captured by the regex above; the converted result eventually ends up in operands.
As mentioned above, operands stores the parsed result of the SQL command.
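As an illustration (a hypothetical value, not taken from the Flink source), a statement such as SET execution.result-mode=tableau matched by the SET regex would end up represented roughly like this:

```java
// Hypothetical illustration of the shape of a parsed statement:
// `command` identifies the operation type, `operands` carries the extracted pieces
// (here, assumed to be the key and the value of the SET statement).
SqlCommandCall call =
        new SqlCommandCall(
                SqlCommand.SET, new String[] {"execution.result-mode", "tableau"});
```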
2.2 The Parse Entry Point
```java
public static SqlCommandCall parse(Parser sqlParser, String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }

    // parse statement via regex matching first
    Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
    if (callOpt.isPresent()) {
        return callOpt.get();
    } else {
        return parseBySqlParser(sqlParser, stmt);
    }
}
```
Flink first calls parseByRegexMatching to try to match the SQL against regular expressions; only if that fails does it call parseBySqlParser to parse the SQL (backed by the Calcite framework). On first reading I did not quite understand why there are two parsing schemes. It turns out that originally everything was matched with regular expressions, but since Flink SQL already uses Calcite as its parser to turn SQL into a logical plan, the SQL Parser path was added for consistency of design. Flink still keeps regex matching for a number of commands unrelated to data manipulation, such as QUIT, EXIT, HELP, and SET.
Before [FLINK-17893], SqlClient parsed in the opposite order: it would first try the SQL Parser and fall back to regex matching only if that failed.
2.3 parseByRegexMatching
The flow of parseByRegexMatching is as follows (a simplified sketch follows the list):
- Iterate over every member of the SqlCommand enum and, for each member that specifies a pattern, attempt a regex match;
- If the match succeeds, call operandConverter to parse and convert the captured groups;
- Otherwise, return Optional.empty() directly.
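A minimal sketch of that loop, assuming the pattern and operandConverter fields described in section 2.1 (pattern held as a regex string, operandConverter returning Optional\<String[]>); this is not the verbatim Flink source:

```java
// Simplified sketch of parseByRegexMatching: walk the SqlCommand enum, try each
// command's regex, and let operandConverter build the operands.
// Requires java.util.Optional, java.util.regex.Matcher and java.util.regex.Pattern.
private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
    for (SqlCommand cmd : SqlCommand.values()) {
        if (cmd.pattern != null) {
            final Matcher matcher =
                    Pattern.compile(cmd.pattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
                            .matcher(stmt);
            if (matcher.matches()) {
                // collect the groups captured by the regex
                final String[] groups = new String[matcher.groupCount()];
                for (int i = 0; i < groups.length; i++) {
                    groups[i] = matcher.group(i + 1);
                }
                // the command-specific converter turns the groups into operands
                return cmd.operandConverter
                        .apply(groups)
                        .map(operands -> new SqlCommandCall(cmd, operands));
            }
        }
    }
    return Optional.empty();
}
```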
2.4 parseBySqlParser
```java
private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
    List<Operation> operations;
    try {
        operations = sqlParser.parse(stmt);
    } catch (Throwable e) {
        throw new SqlExecutionException("Invalidate SQL statement.", e);
    }
    if (operations.size() != 1) {
        throw new SqlExecutionException("Only single statement is supported now.");
    }

    final SqlCommand cmd;
    String[] operands = new String[] {stmt};
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
        cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
    } else if (operation instanceof CreateTableOperation) {
        cmd = SqlCommand.CREATE_TABLE;
    }
    // ... other branches omitted

    return new SqlCommandCall(cmd, operands);
}
```
It first calls the Parser to parse the SQL text. Under the hood this uses the Calcite framework for lexing and parsing, turning the SQL text into a SqlNode, which SqlToOperationConverter then converts into an Operation.
It then maps the concrete Operation type to the corresponding SqlCommand, extracts the operands from the Operation, and wraps everything into a SqlCommandCall to return.
This logic can be summarized as a four-step conversion: String -> SqlNode -> Operation -> SqlCommand.
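To make this concrete, here is an illustrative round trip through the parse entry point for a DML statement (the table names are made up and sqlParser is assumed to be the Parser of the current session's TableEnvironment): the statement falls through regex matching, is parsed by Calcite into a CatalogSinkModifyOperation, and comes back as an INSERT_INTO call whose only operand is the statement text itself.

```java
// Illustrative usage of the entry point from section 2.2; not taken from the Flink source.
SqlCommandCall call =
        SqlCommandParser.parse(sqlParser, "INSERT INTO sink_table SELECT * FROM source_table;");
// call.command  -> SqlCommand.INSERT_INTO
// call.operands -> { "INSERT INTO sink_table SELECT * FROM source_table" }
```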
3. Executing a Command: callCommand
callCommand essentially inspects the command type inside the SqlCommandCall and branches into the corresponding execution logic.
```java
private void callCommand(SqlCommandCall cmdCall) {
    switch (cmdCall.command) {
        case QUIT:
            callQuit();
            break;
        // ......
        case SELECT:
            callSelect(cmdCall);
            break;
        case INSERT_INTO:
        case INSERT_OVERWRITE:
            callInsert(cmdCall);
            break;
        // ......
        default:
            throw new SqlClientException("Unsupported command: " + cmdCall.command);
    }
}
```
Starting from callSelect and digging down layer by layer, we get the call chain CliClient.callSelect => LocalExecutor.executeQuery => LocalExecutor.executeQueryInternal. The source of executeQueryInternal is as follows:
```java
private <C> ResultDescriptor executeQueryInternal(
        String sessionId, ExecutionContext<C> context, String query) {
    // create table
    final Table table = createTable(context, context.getTableEnvironment(), query);
    // TODO refactor this after Table#execute support all kinds of changes
    // initialize result
    final DynamicResult<C> result =
            resultStore.createResult(
                    context.getEnvironment(),
                    removeTimeAttributes(table.getSchema()),
                    context.getExecutionConfig());
    final String jobName = sessionId + ": " + query;
    final String tableName = String.format("_tmp_table_%s", Math.abs(query.hashCode()));
    final Pipeline pipeline;
    try {
        // writing to a sink requires an optimization step that might reference UDFs during code
        // compilation
        context.wrapClassLoader(
                () -> {
                    ((TableEnvironmentInternal) context.getTableEnvironment())
                            .registerTableSinkInternal(tableName, result.getTableSink());
                    table.insertInto(tableName);
                });
        pipeline = context.createPipeline(jobName);
    } catch (Throwable t) {
        // the result needs to be closed as long as
        // it not stored in the result store
        result.close();
        // catch everything such that the query does not crash the executor
        throw new SqlExecutionException("Invalid SQL query.", t);
    } finally {
        // Remove the temporal table object.
        context.wrapClassLoader(
                () -> {
                    context.getTableEnvironment().dropTemporaryTable(tableName);
                });
    }

    // create a copy so that we can change settings without affecting the original config
    Configuration configuration = new Configuration(context.getFlinkConfig());
    // for queries we wait for the job result, so run in attached mode
    configuration.set(DeploymentOptions.ATTACHED, true);
    // shut down the cluster if the shell is closed
    configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);

    // create execution
    final ProgramDeployer deployer =
            new ProgramDeployer(configuration, jobName, pipeline, context.getClassLoader());

    JobClient jobClient;
    // wrap in classloader because CodeGenOperatorFactory#getStreamOperatorClass
    // requires to access UDF in deployer.deploy().
    jobClient =
            context.wrapClassLoader(
                    () -> {
                        try {
                            // blocking deployment
                            return deployer.deploy().get();
                        } catch (Exception e) {
                            throw new SqlExecutionException("Error while submitting job.", e);
                        }
                    });

    String jobId = jobClient.getJobID().toString();
    // store the result under the JobID
    resultStore.storeResult(jobId, result);

    // start result retrieval
    result.startRetrieval(jobClient);

    return new ResultDescriptor(
            jobId,
            removeTimeAttributes(table.getSchema()),
            result.isMaterialized(),
            context.getEnvironment().getExecution().isTableauMode());
}
```
- Create a Table to hold the result schema;
- Create the Pipeline; this is the core step, translating the SQL into a concrete StreamGraph;
- Create a JobClient and deploy the job;
- Start a ResultRetrievalThread that continuously fetches new results and updates a local queue;
- Return the result descriptor.
Taking a closer look at the pipeline-creation code, pipeline = context.createPipeline(jobName), which eventually delegates to the table environment's getPipeline:
```java
public Pipeline getPipeline(String jobName) {
    return execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
}
```
translateAndClearBuffer is mainly responsible for converting the List\<ModifyOperation> buffered inside TableEnvironmentImpl into a List\<Transformation>. Its details are beyond the scope of this section, but a rough sketch follows below.
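A minimal sketch of translateAndClearBuffer, assuming TableEnvironmentImpl's buffering fields (bufferedModifyOperations, planner) and glossing over version-specific details; it is not the verbatim Flink source:

```java
// Simplified sketch (not the verbatim source): translate the buffered
// ModifyOperations into Transformations via the Planner, then clear the buffer.
private List<Transformation<?>> translateAndClearBuffer() {
    List<Transformation<?>> transformations;
    try {
        // the Planner turns the relational ModifyOperations into physical Transformations
        transformations = planner.translate(bufferedModifyOperations);
    } finally {
        // the buffer only holds the operations of the current execution
        bufferedModifyOperations.clear();
    }
    return transformations;
}
```

createPipeline, in turn, assembles these Transformations into a StreamGraph: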
```java
public Pipeline createPipeline(
        List<Transformation<?>> transformations, TableConfig tableConfig, String jobName) {
    StreamGraph streamGraph =
            ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations);
    streamGraph.setJobName(getNonEmptyJobName(jobName));
    return streamGraph;
}
```
ExecutorUtils.generateStreamGraph, shown below, creates a StreamGraphGenerator and calls its generate method. From here on, the logic is exactly the same StreamGraph generation described for the DataStream API, which is what finally connects SQL to Transformations.
```java
public static StreamGraph generateStreamGraph(
        StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot generate StreamGraph.");
    }
    StreamGraphGenerator generator =
            new StreamGraphGenerator(
                            transformations, execEnv.getConfig(), execEnv.getCheckpointConfig())
                    .setStateBackend(execEnv.getStateBackend())
                    .setChaining(execEnv.isChainingEnabled())
                    .setUserArtifacts(execEnv.getCachedFiles())
                    .setTimeCharacteristic(execEnv.getStreamTimeCharacteristic())
                    .setDefaultBufferTimeout(execEnv.getBufferTimeout());
    return generator.generate();
}
```