关于后端:PolarDBX源码解读系列DML之Insert流程

8次阅读

共计 9347 个字符,预计需要花费 24 分钟才能阅读完成。

简介:Insert 类的 SQL 语句的流程可初略分为:解析、校验、优化器、执行器、物理执行 (GalaxyEngine 执行)。本文将以一条简略的 Insert 语句通过调试的形式进行解读。在浏览本文之前,强烈建议先浏览《PolarDB- X 源码解读系列:SQL 的毕生》,可能理解一条 SQL 的执行流程,也能晓得 GalaxySQL(CN) 的各个组件,而后再浏览本文,理解 Insert 的具体实现过程,加深各个组件的了解。Insert 类的 SQL 语句的流程可初略分为:解析、校验、优化器、执行器、物理执行(GalaxyEngine 执行)。本文将以一条简略的 Insert 语句通过调试的形式进行解读。建表语句:#一个简略的 PolarDB- X 中的分库分表 sbtestCREATE TABLE sbtest (id int(11) NOT NULL AUTO_INCREMENT,k int(11) NOT NULL DEFAULT ‘0’,c char(120) NOT NULL DEFAULT ”,pad char(60) NOT NULL DEFAULT ”,PRIMARY KEY (id))dbpartition by hash(id) tbpartition by hash(id) tbpartitions 2; #调试语句 insert into sbtest(id) values(100); 解析 连贯上 PolarDB- X 后,执行一条 Insert 语句 insert into sbtest(id) values(100);PolarDB- X 接管到该字符串语句后,开始执行该 SQL,可见 TConnection#executeSQL:

筹备执行该 SQL 语句,ExecutionContext 会保留该 Sql 执行的参数、配置、等上下文信息,该变量会始终陪伴该 Sql 通过解析、校验、优化器、执行器,直到下发给 GalaxyEngine(DN)。PolarDB- X 执行该 SQL 时,须要先获取执行打算,可见代码 TConnection#executeQuery:ExecutionPlan plan = Planner.getInstance().plan(sql, executionContext); 为了防止执行同一条 SQL 每次都要解析、校验、优化器等操作,PolarDB- X 内置了 PlanCache,会在 PlanCache 中获取该 SQL 的执行打算,当然,并不是依据纯字符串 SQL 来进行缓存,而是生成 SqlParameterized,如下图所示(Planner#plan),真正缓存的是 sql 模板,该类中的 sql 变量:INSERT INTO sbtest (id)\nVALUES (?),可实用于相似的语句,?代表可填入的值,不同的值都是同一类 SQL 语句。

如果 PlanCache 找不到的话,须要生成新的执行打算,具体代码见 PlanCache#getFromCache:

先将字符串通过 FastsqlParser 解析成形象语法树,查看有没有语法错误等,生成 SqlNode,本条 SQL 是 Insert 语句,解析成 SqlInsert 类,而后持续依据形象语法树获取执行打算,具体 SqlInsert 内容为:

简略解释几个变量:keywords:关键字,例如:Insert Ignore 语句会加 Ignore 关键字,代表该语句特色;source:数据起源,插入数据的起源,这里是 values,如果是 Insert … Select 语句,则是 select 语句;updateList:批改信息,例如:Insert … ON DUPLICATE KEY 语句会把批改信息保留在该变量;至此,实现了字符串 SQL 语句到 SqlNode 的转变,即实现了解析局部。校验 校验过程即查看 SqlNode 的语义是否正确,例如表是否存在、列是否存在、类型是否正确等等,具体入口在 Planner#getPlan 函数中:SqlNode validatedNode = converter.validate(ast); 便是验证该 SQL 的有效性,PolarDB- X 沿用了 Apache Calcite 框架,validate 的实现也是相似的大框架,蕴含 Scope 和 Namespace 两个概念,在此基础上进行验证,SqlInsert 类型的验证入口在 SqlValidatorImpl#validateInsert(SqlInsert insert)中: …final SqlValidatorNamespace targetNamespace = getNamespace(insert);validateNamespace(targetNamespace, unknownType);…final SqlNode source = insert.getSource();if (source instanceof SqlSelect) {final SqlSelect sqlSelect = (SqlSelect) source;   validateSelect(sqlSelect, targetRowType);} else {final SqlValidatorScope scope = scopes.get(source);   validateQuery(source, scope, targetRowType);}… 大体流程查看两个局部:首先,查看 insert into sbtest 语句是否正确;而后查看 SqlInsert.source 局部是否无效。本条 SQL 是 Values,所以查看 Values 是否无效,如果是 Insert …Select 语句,source 是 SqlSelect,须要查看 Select 语句是否无效。没有报错,则阐明 SQL 语句语义没有谬误,校验通过,能够发现还是 SqlInsert:

 优化器 在通过优化器之前,还须要将 SqlNode(SqlInsert)转成 RelNode,大体含意就是将 sql 语法树转成关系表达式,入口在 Planner#getPlan:RelNode relNode = converter.toRel(validatedNode, plannerContext); 具体转换过程在 SqlConverter#toRel:…final SqlToRelConverter sqlToRelConverter = new TddlSqlToRelConverter(…);RelRoot root = sqlToRelConverter.convertQuery(validatedNode, false, true);… TddlSqlToRelConverter 类是 PolarDB- X 的转换器,继承 Calcite 的 SqlToRelConverter 类,转换 SqlInsert 的执行过程在 TddlSqlToRelConverter#convertInsert(SqlInsert call):RelNode relNode = super.convertInsert(call);if (relNode instanceof TableModify) {…} 能够发现,会调用 SqlToRelConverter#convertInsert,在该办法中,会将 SqlInsert 转成 LogicalTableModify,该类的内容如下:

能够留神到几个变量:operation:操作类型;input:输出起源,本条 sql 是 values;PolarDB- X 外部还有新的本人的 RelNode,所以还会把 RelNode 再转成本人定义的 RelNode,入口在 Planner#getPlan:ToDrdsRelVisitor toDrdsRelVisitor = new ToDrdsRelVisitor(validatedNode, plannerContext);RelNode drdsRelNode = relNode.accept(toDrdsRelVisitor); 转换过程在 ToDrdsRelVisitor#visit(RelNode other):if ((other instanceof LogicalTableModify)) {…    if (operation == TableModify.Operation.INSERT || …) {LogicalInsert logicalInsert = new LogicalInsert(modify);        …    }} Insert 类型会转成 LogicalInsert,就是 PolarDB- X 外部的 RelNode,执行也是基于该类,LogicalInsert 的内容如下(还有局部变量不在截图中):

大多数变量和 LogicalTableModify 一样,新增了像 PolarDB- X 特有的 gsi 相干变量等等。而后便是通过优化器阶段,优化器执行过程代码在 Planner#sqlRewriteAndPlanEnumerate:private RelNode sqlRewriteAndPlanEnumerate(RelNode input, PlannerContext plannerContext) {CalcitePlanOptimizerTrace.getOptimizerTracer().get().addSnapshot(“Start”, input, plannerContext);//RBO 优化   RelNode logicalOutput = optimizeBySqlWriter(input, plannerContext);   CalcitePlanOptimizerTrace.getOptimizerTracer().get()       .addSnapshot(“PlanEnumerate”, logicalOutput, plannerContext);    //CBO 优化   RelNode bestPlan = optimizeByPlanEnumerator(logicalOutput, plannerContext);    // finally we should clear the planner to release memory   bestPlan.getCluster().getPlanner().clear();   bestPlan.getCluster().invalidateMetadataQuery();   return bestPlan;} Insert 的优化器次要在 RBO 过程,定义了一些规定,CBO 规定对 Insert 简直没有扭转。能够重点关注 RBO 的 OptimizeLogicalInsertRule 规定,会依据 GMS(PolarDB- X 的元数据管理)的信息来判断该 SQL 的执行打算,可能会将 LogicalInsert 转变成其它的 RelNode 去执行,不便辨别不同的 SQL 执行形式,首先会确定该 SQL 的执行策略,次要分为三种:public enum ExecutionStrategy {/    Foreach row, exists only one target partition.    Pushdown origin statement, with function call not pushable (like sequence call) replaced by RexCallParam.    Typical for single table and partitioned table without gsi.    /   PUSHDOWN,   /    Foreach row, might exists more than one target partition.    Pushdown origin statement, with nondeterministic function call replaced by RexCallParam.    Typical for broadcast table.    /   DETERMINISTIC_PUSHDOWN,   /*    Foreach row, might exists more than one target partition, and data in different target partitions might be different.    Select then execute, with all function call replaced by RexCallParam.    Typical for table with gsi or table are doing scale out.    */   LOGICAL;}; 因为本条 SQL 较为简单,策略是 PUSHDOWN,处理过程也比较简单,而后生成 InsertWriter,该类负责生成下发到 DN 的 SQL 语句,保留在 LogicalInsert 中,OptimizeLogicalInsertRule 解决规定较为细节,感兴趣的能够自行查看 onMatch 办法。通过优化器后,还是 LogicalInsert 类的 RelNode,至此,意味着优化器执行结束。最终会生成执行打算,在 PlanCache#getFromCache,见下图(图中非全副变量):

ExecutionPlan.plan 就是执行打算,能够发现是 LogicalInsert,对于简略的 Insert,PolarDB- X 还会改写执行打算,代码在 PlanCache#getFromCache:BuildFinalPlanVisitor visitor = new BuildFinalPlanVisitor(executionPlan.getAst(), plannerContext);executionPlan = executionPlan.copy(executionPlan.getPlan().accept(visitor)); insert into sbtest(id) values(100); 语句执行 BuildFinalPlanVisitor#buildNewPlanForInsert(LogicalInsert logicalInsert, ExecutionContext ec),因为该 Insert 语句比较简单,只有一个 values,蕴含拆分键和 auto_increment 列,只须要依据拆分键就能确定下发到 DN 的哪一个分片,在 CN 端无需更多操作,所以会简化执行打算,在 BuildFinalPlanVisitor#buildSingleTableInsert 转成 SingleTableOperation,并保留了分库分表规定,最终的执行打算如下:

执行打算变成 SingleTableOperation,至此,执行打算生成结束。执行器 SQL 语句生成执行打算后,将由执行器进行执行,执行入口在 TConnection#executeQuery:ResultCursor resultCursor = executor.execute(plan, executionContext); 而后会由 ExecutorHelper#execute 办法执行 ExecutionPlan.plan,也就是后面的 SingleTableOperation,执行策略有 CURSOR、TP_LOCAL、AP_LOCAL、MPP,Insert 类型根本都是走 CURSOR,接着依据执行打算拿对应的 Handler 进行解决,具体可查看 CommandHandlerFactoryMyImp 类,例如:SingleTableOperation 是 MySingleTableModifyHandler,LogicalInsert 是 LogicalInsertHandler。会在对应的 Handler 外面进行执行,个别会返回一个 Cursor,Cursor 外面会调用真正的执行过程,调用 Cursor.next 便会获取后果,Insert 语句的后果是 affect Rows,本条 SQL 会创立一个 MyPhyTableModifyCursor,入口在 MySingleTableModifyHandler#handleInner:…MyPhyTableModifyCursor modifyCursor = (MyPhyTableModifyCursor) repo.getCursorFactory().repoCursor(executionContext, logicalPlan);…affectRows = modifyCursor.batchUpdate();… 依据 ExecutionContext 和 SingleTableOperation 创立一个 MyPhyTableModifyCursor,而后间接执行:public int[] batchUpdate() {   try {       return handler.executeUpdate(this.plan);   } catch (SQLException e) {throw GeneralUtil.nestedException(e);   }} 这里的 this.plan 就是 SingleTableOperation,handler 是 PolarDB- X 的 CN 与 DN 间交互的 MyJdbcHandler,能够认为是执行物理打算的 handler,会依据 plan 生成真正的物理 SQL,下发到 DN 执行。因为这条 SQL 较为简单,CN 不须要过多解决,再举一例 Insert 语句:insert into sbtest(k) values(101),(102); 通过优化器后,该语句的执行打算是 LogicalInsert,如下图:

能够发现 sqlTemplate 为 INSERT\nINTO ? (id,k)\nVALUES(?, ?),表名可能要换成物理表名,同时减少了一列 id,因为该列是 auto_increment,会有一个全局的 sequence 表来记录该列的值,能力保障全局惟一,插入的 values 的参数保留在 ExecutionContext 的 params 中,如下图:

id 列的值会在真正生成物理执行打算的时候才会去获取,LogicalInsert 打算实用 LogicalInsertHandler 来执行,执行过程:public Cursor handle(RelNode logicalPlan, ExecutionContext executionContext){…   LogicalInsert logicalInsert = (LogicalInsert) logicalPlan;   …   if (!logicalInsert.isSourceSelect()) {affectRows = doExecute(logicalInsert, executionContext, handlerParams);   } else {affectRows = selectForInsert(logicalInsert, executionContext, handlerParams);   }   …} 会依据起源是否是 Select 语句抉择不同的执行形式,具体执行过程在 LogicalInsertHandler#executeInsert,如下:…// 生成主表的物理执行打算 final InsertWriter primaryWriter = logicalInsert.getPrimaryInsertWriter();List inputs = primaryWriter.getInput(executionContext);…// 如果有 GSI,生成 GSI 表的物理执行打算 final List gsiWriters = logicalInsert.getGsiInsertWriters();gsiWriters.stream().map(gsiWriter -> gsiWriter.getInput(executionContext))…;…// 执行所有物理执行打算 final int totalAffectRows = executePhysicalPlan(allPhyPlan, executionContext, schemaName, isBroadcast);… 主表生成物理执行打算过程中,会先获取 id 的值,因为 id 也是拆分键,所以两个 values 会依据拆分键定位到不同的物理分库分表上,会生成有两个物理执行打算,如下:

其中 dbIndex 是物理库名,tableNames 是物理表名,param 保留了这条 slqTemplate 的参数值,填充上就是残缺的 SQL,而后执行所有物理执行打算,就实现了该 SQL 的执行。物理执行 PolarDB- X 中 CN 与 DN 的交互都在 MyJdbcHandler 中,以 SingleTableOperation 为例,看看具体交互过程:public int[] executeUpdate(BaseQueryOperation phyTableModify) throws SQLException {…   // 获取物理执行打算的库名和参数   Pair> dbIndexAndParam =           phyTableModify.getDbIndexAndParam(executionContext.getParams() == null ? null : executionContext.getParams()               .getCurrentParameter(), executionContext);…   // 依据库名获取连贯   connection = getPhyConnection(transaction, rw, groupName);…    // 依据参数组成字符串 SQL    String sql = buildSql(sqlAndParam.sql, executionContext);…    // 依据连贯创立 prepareStatement    ps = prepareStatement(sql, connection, executionContext, isInsert, false);…    // 设置参数    ParameterMethod.setParameters(ps, sqlAndParam.param);…    // 执行    affectRow = ((PreparedStatement) ps).executeUpdate();…} 将物理执行打算发送到 DN 执行,执行实现后,依据 affectRow 返回到执行器,最终会把后果返回给用户,至此,一条残缺 SQL 就执行实现。小结 本文通过调试简略的 Insert 语句,介绍了 PolarDB- X 在解析、校验、优化器、执行器对 Insert 语句的解决,当然,Insert 语句也有很多非凡的用法,本文并没有一一概述,感兴趣的同学能够在相应代码处进行查看。原文链接:http://click.aliyun.com/m/100… 本文为阿里云原创内容,未经容许不得转载。

正文完
 0