概述

在《DDL的毕生(上)》中,咱们以增加全局二级索引为例,从DDL开发者的视角介绍了如何在DDL引擎框架下实现一个逻辑DDL。在本篇,作者将从DDL引擎的视角登程,向读者介绍DDL引擎的架构、实现,以及DDL引擎与DDL Job的交互逻辑。

DDL引擎相干概念

DDL Job

DDL Job是DDL引擎中的概念,它用于形容一个逻辑DDL。DDL引擎中,一个DDL Job对应一个逻辑DDL,DDL Job外部蕴含了执行一个逻辑DDL须要的一系列动作,因而在DDL引擎框架下,开发者新反对一条逻辑DDL,本质就是定义一个新的DDL Job。

DDL开发者定义的是动态的DDL Job,然而,DDL Job在运行时,还领有状态属性。这一属性次要由DDL引擎负责管理。当然,用户也能够执行无限的DDL运维指令以治理DDL Job的状态,实现对DDL执行过程的治理。下图是DDL Job的状态转移图,图中彩色加粗线框代表DDL Job执行的初态和终态,每个DDL Job状态之间的连线上的标注了能够执行的运维指令。

DDL Task

DDL Task是对DDL Job外部一系列行为的封装,如读写metaDb、在内存中计算、过程通信、向DN下发需执行的物理DDL等,这些行为都会被别离封装为DDL Task。因而一个DDL Job是由若干DDL Task形成的,这些Task须要按肯定程序被DDL引擎调度执行,DDL开发者能够应用Polardb-X的DDL引擎提供的DAG图框架形容Task之间的依赖关系和执行程序。在DDL引擎框架下,开发者定义一个新的DDL Job,本质就是定义若干DDL Task,而后用DAG图把它们组合起来。

DDL Task是DDL引擎实现DDL近似原子性的重要工具,而DDL原子性是DDL引擎谋求的指标。执行一条逻辑DDL波及到一系列操作,原子性要求这些操作要么都全副失效,要么全都不失效。具体来说,DDL引擎要求每个DDL Task都是幂等的,每个Task必须有对应的反向幂等办法(此办法在回滚Task时被DDL引擎调用)。DDL引擎执行DDL之前,会为该DDL生成由DDL Task组成的DAG图,并将其长久化到MetaDb,这相当于保障DDL原子性的undo Log。DDL引擎依照DAG图顺次执行Task直到整个DDL Job执行胜利或者彻底回滚。

Worker和Leader

在DDL引擎的视角下,CN节点被分为Worker节点和Leader节点(在集群中惟一)。Worker节点负责接管用户发来的DDL申请,它将收到的申请进行简略的本地校验,而后把DDL转换成DDL Job并推送至MetaDb,最初告诉Leader节点从MetaDb拉取DDL工作。

Leader节点负责DDL的执行,它从MetaDb拉取到DDL Job后,复原成DAG图的模式,并对Job中的Task进行拓扑排序,而后依照肯定的并行度进行调度、执行Task。

DDL 引擎源码目录

为了方面下文形容,本文先向读者阐明DDL引擎源码的目录。PolarDB-X的DDL引擎的源码位于com.alibaba.polardbx.executor.ddl.newengine,各模块阐明如下:

例子

上面,本文从DDL引擎的视角登程,向读者展现一条逻辑DDL是如何被DDL引擎调度并执行的。

DDL 任务调度

一条DDL语句由用户端的Mysql Client收回后,Worker节点接管到该DDL语句,通过简略的优化器解析后失去LogicalPlan,而后把该LogicalPlan分派到对应的DDL Handler,这个DDL Handler负责生成DDL Job。而后DDL Handler的公共基类的接口com.alibaba.polardbx.executor.handler.ddl.LogicalCommonDdlHandler#handleDdlRequest解决这个DDL申请,该函数调用com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute办法将之前生成的DDL Job及执行DDL所需的上下文写入MetaDB,并告诉Leader节点解决。至此,Worker节点实现了本人的工作,如果该DDL是阻塞型的,Worker节点会期待Leader执行完DDL后,返回Response给用户端;如果该DDL是非阻塞型的,Worker节点会间接返回。

Leader节点上运行着com.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlDispatcherThread和com.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlSchedulerThread两个线程,它们别离对应着实例级别的DdlJobDispatcher和Schema级别的DdlJobScheduler。其中DdlJobDispatcher从全局惟一的Ddl Request 队列中取出Ddl Request,而后将其调配到Schema级别的Ddl Job队列。DdlJobScheduler是Schema级别的,它负责从Schema级别的Ddl Job队列中一直生产Ddl Job,这个过程中,DdlJobScheduler利用Schema级别的信号量对并行生产Ddl Job的并行度进行管制(同一Schema上的最大线程数为10)。DdlJobScheduler生产Ddl Job,本质上是从Schema级别的Ddl Job队列中取出Ddl Job,而后分派给DdlJobExecutor(Job级别),DdlJobExecutor负责将DDL Job转交给DdlEngineDagExecutor。至此,DDL Job正式进入DDL引擎中的执行器DdlEngineDagExecutor,由后者接管DDL Job的执行。

须要补充阐明的是,从上文能够看出DDL引擎反对多个DDL并发执行,为保障须要雷同资源的DDL之间互斥执行,DDL引擎提供了长久化的读写锁机制。作为DDL开发者,只须要在定义DDL Job的时候,提前申明该DDL所需的Schema、Table资源。当执行DDL的时候,DDL引擎会在com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute生成DDL Job并保留至MetaDB之前,先依据该DDL Job所需的资源进行读写锁的acquire。

DDL 工作执行

DdlEngineDagExecutor负责DDL工作的执行,它会调用restoreAndRun办法,从MetaDb中拉取并复原DDL Job为DAG模式。而后调用run办法,依据DDL Job的以后状态执行相应的回调办法。public class DdlEngineDagExecutor

{     public static void restoreAndRun(String schemaName, Long jobId, ExecutionContext executionContext){        boolean restoreSuccess = DdlEngineDagExecutorMap.restore(schemaName, jobId, executionContext);        DdlEngineDagExecutor dag = DdlEngineDagExecutorMap.get(schemaName, jobId);        dag.run();    }     private void run() {        // Start the job state machine.        if (ddlContext.getState() == DdlState.QUEUED) {            onQueued();        }        if (ddlContext.getState() == DdlState.RUNNING) {            onRunning();        }        if (ddlContext.getState() == DdlState.ROLLBACK_RUNNING) {            onRollingBack();        }        // Handle the terminated states.        switch (ddlContext.getState()) {        case ROLLBACK_PAUSED:        case PAUSED:            onTerminated();            break;        case ROLLBACK_COMPLETED:        case COMPLETED:            onFinished();            break;        default:            break;        } }}

com.alibaba.polardbx.executor.ddl.newengine.DdlEngineDagExecutor#run会依据DDL Job以后的状态,执行对应的回调办法,这实质上是一个在DDL Job的状态转移图上游走的过程。

DDL Job的初始状态个别为QUEUED,它示意以后被DDL引擎新调度到Schema级别队列。此时run办法会根据此状态调用onQueued()办法。onQueued()办法的作用是将DDL Job的状态批改为RUNNING。

当DDL Job以后的状态是RUNNING时,run办法就会调用onRunning回调办法,依照DAG图的依赖关系执行DDL Job外部的Task。

private void onRunning() {    while (true) {        if (hasFailureOnState(DdlState.RUNNING)) {            if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {                LOGGER.info(String.format("JobId:[%s], all tasks stopped", ddlContext.getJobId()));                return;            } else {                continue;            }        }        if (executingTaskScheduler.isAllTaskDone()) {            updateDdlState(DdlState.RUNNING, DdlState.COMPLETED);            return;        }        if (executingTaskScheduler.hasMoreExecutable()) {            // fetch & execute next batch            submitDdlTask(executingTaskScheduler.pollBatch(), true, executingTaskScheduler);            continue;        }        //get some rest        sleep(50L);}

onRunning的流程如下:

  • 先查看以后DDL Job的状态是否为RUNNING,如果不是则间接返回。
  • 查看以后DAG图上是否还有待执行的Task节点,如果没有,则更新Job状态为COMPLETED,而后返回。
  • 如果以后DAG图上存在能够执行的Task,则用拓扑排序的形式,从DAG图上取出所有可执行的Task,依照并行度的限度,调用submitDdlTask办法并发执行。留神,Task并不一定能执行胜利,如果有Task执行失败,submitDdlTask办法会依照Task的开发者事后定义的失败策略,批改以后DDL Job的状态。最典型的,当有Task失败时,批改以后DDL Job状态为 PAUSED 或 ROLLBACK_RUNNING。具体的错误处理与复原机制,将在下一大节介绍。

如果有DDL Job的状态为ROLLBACK_RUNNING,run办法就会调用onRollingBack()回调办法,实现DDL的回滚。相干代码如下

private void onRollingBack() {    if (!allowRollback()) {        updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_PAUSED);        return;    }     reverseTaskDagForRollback();     // Rollback the tasks.    while (true) {        if (hasFailureOnState(DdlState.ROLLBACK_RUNNING)) {            if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {                LOGGER.info(String.format("JobId:[%s], all tasks stoped", ddlContext.getJobId()));                return;            } else {                continue;            }        }        if (reveredTaskScheduler.isAllTaskDone()) {            updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_COMPLETED);            return;        }        if (reveredTaskScheduler.hasMoreExecutable()) {            // fetch & execute next batch            submitDdlTask(reveredTaskScheduler.pollBatch(), false, reveredTaskScheduler);            continue;        }        //get some rest        sleep(50L);    }}

onRollingBack的流程如下:

  • 首先查看,在以后DAG图的执行进度下,是否容许回滚(一旦越过了fail point task,则不容许回滚)。如果不可回滚,则标记以后DDL Job的状态为PAUSED,而后退出。
  • 当DDL Job的状态为ROLLBACK_RUNNING时,可能还存在其余正在执行中的Task。此时DDL引擎将不再容许新的Task开始执行,并且会期待正在执行中的Task胜利或失败,此时该DDL Job就达到了一个一致性的状态。
  • 达了一致性状态后能够开始回滚流程,首先逆转DAG图的所有有向边,使整个DDL Job的执行流程反过来。而后依照逆转后的DAG图进行拓扑排序,取出之前执行结束或执行过但未实现的Task,执行它们的反向幂等办法。
  • 当DAG图中没有可执行的Task节点时,标记DDL Job状态为ROLLBACK_COMPLETED,回滚胜利。

其余状态的回调函数逻辑较为简单,这里不再赘述,请感兴趣的读者自行浏览代码。

错误处理与复原

DDL引擎谋求的指标之一是DDL的原子性,如果在执行DDL的过程中局部Task失败,DDL引擎须要采取适当措施让DDL Job变成齐全未执行或执行胜利的状态(即状态转移图中的终态)。DDL引擎采取的方法是给Task增加DdlExceptionAction属性,该属性用于批示DDL引擎执行Task出现异常时如何处理。DDL开发者能够在定义DDL Task的时候设置该属性。

DdlExceptionAction一共有4种取值

  • TRY_RECOVERY_THEN_PAUSE:执行该Task出现异常后,重试3次,如果仍失败,则将Task对应的DDL Job状态设置为PAUSED。
  • ROLLBACK:执行Task出现异常后,将该Task所在DDL Job状态设置为ROLLBACK_RUNNING,随后DDL引擎会依据该状态进行回滚DDL。
  • TRY_RECOVERY_THEN_ROLLBACK:执行该Task出现异常后,重试3次,如果仍失败,将该Task所在DDL Job状态设置为ROLLBACK_RUNNING,随后由DDL引擎回滚该DDL。
  • PAUSE:执行该Task出现异常后,将Task对应的DDL Job状态设置为PAUSED。

一般来说,PAUSED状态意味着该DDL Job没有达到终态,须要开发者染指解决,这罕用于出现异常后无奈复原的Task,或者对外界产生了影响以至无奈回滚的Task。前者举例,如drop table指令,一旦执行了删除元信息或删除物理表的Task,就无奈再复原到删除前的状态了,这时如果某Task失败且重试3次后仍失败,就会导致该DDL Job进入PAUSED状态;后者举例,如Polardb-X中大部分DDL Job都含有一个CDC打标的Task,用于对外生成bin log,该Task执行实现意味着外界曾经能够获取相应DDL的bin log,因而无奈回滚。

总结

本文从DDL引擎的视角,向读者介绍了DDL引擎的架构、实现,以及DDL引擎与DDL Job的交互逻辑。理解更多对于Polardb-X源码的解析,请继续关注咱们后续公布的文章。

原文链接

本文为阿里云原创内容,未经容许不得转载。