关于数据库:PolarDBX源码解读DDL的一生下

213次阅读

共计 6762 个字符,预计需要花费 17 分钟才能阅读完成。

概述

在《DDL 的毕生(上)》中,咱们以增加全局二级索引为例,从 DDL 开发者的视角介绍了如何在 DDL 引擎框架下实现一个逻辑 DDL。在本篇,作者将从 DDL 引擎的视角登程,向读者介绍 DDL 引擎的架构、实现,以及 DDL 引擎与 DDL Job 的交互逻辑。

DDL 引擎相干概念

DDL Job

DDL Job 是 DDL 引擎中的概念,它用于形容一个逻辑 DDL。DDL 引擎中,一个 DDL Job 对应一个逻辑 DDL,DDL Job 外部蕴含了执行一个逻辑 DDL 须要的一系列动作,因而在 DDL 引擎框架下,开发者新反对一条逻辑 DDL,本质就是定义一个新的 DDL Job。

DDL 开发者定义的是动态的 DDL Job,然而,DDL Job 在运行时,还领有状态属性。这一属性次要由 DDL 引擎负责管理。当然,用户也能够执行无限的 DDL 运维指令以治理 DDL Job 的状态,实现对 DDL 执行过程的治理。下图是 DDL Job 的状态转移图,图中彩色加粗线框代表 DDL Job 执行的初态和终态,每个 DDL Job 状态之间的连线上的标注了能够执行的运维指令。

DDL Task

DDL Task 是对 DDL Job 外部一系列行为的封装,如读写 metaDb、在内存中计算、过程通信、向 DN 下发需执行的物理 DDL 等,这些行为都会被别离封装为 DDL Task。因而一个 DDL Job 是由若干 DDL Task 形成的,这些 Task 须要按肯定程序被 DDL 引擎调度执行,DDL 开发者能够应用 Polardb- X 的 DDL 引擎提供的 DAG 图框架形容 Task 之间的依赖关系和执行程序。在 DDL 引擎框架下,开发者定义一个新的 DDL Job,本质就是定义若干 DDL Task,而后用 DAG 图把它们组合起来。

DDL Task 是 DDL 引擎实现 DDL 近似原子性的重要工具,而 DDL 原子性是 DDL 引擎谋求的指标。执行一条逻辑 DDL 波及到一系列操作,原子性要求这些操作要么都全副失效,要么全都不失效。具体来说,DDL 引擎要求每个 DDL Task 都是幂等的,每个 Task 必须有对应的反向幂等办法(此办法在回滚 Task 时被 DDL 引擎调用)。DDL 引擎执行 DDL 之前,会为该 DDL 生成由 DDL Task 组成的 DAG 图,并将其长久化到 MetaDb,这相当于保障 DDL 原子性的 undo Log。DDL 引擎依照 DAG 图顺次执行 Task 直到整个 DDL Job 执行胜利或者彻底回滚。

Worker 和 Leader

在 DDL 引擎的视角下,CN 节点被分为 Worker 节点和 Leader 节点(在集群中惟一)。Worker 节点负责接管用户发来的 DDL 申请,它将收到的申请进行简略的本地校验,而后把 DDL 转换成 DDL Job 并推送至 MetaDb,最初告诉 Leader 节点从 MetaDb 拉取 DDL 工作。

Leader 节点负责 DDL 的执行,它从 MetaDb 拉取到 DDL Job 后,复原成 DAG 图的模式,并对 Job 中的 Task 进行拓扑排序,而后依照肯定的并行度进行调度、执行 Task。

DDL 引擎源码目录

为了方面下文形容,本文先向读者阐明 DDL 引擎源码的目录。PolarDB- X 的 DDL 引擎的源码位于 com.alibaba.polardbx.executor.ddl.newengine,各模块阐明如下:

例子

上面,本文从 DDL 引擎的视角登程,向读者展现一条逻辑 DDL 是如何被 DDL 引擎调度并执行的。

DDL 任务调度

一条 DDL 语句由用户端的 Mysql Client 收回后,Worker 节点接管到该 DDL 语句,通过简略的优化器解析后失去 LogicalPlan,而后把该 LogicalPlan 分派到对应的 DDL Handler,这个 DDL Handler 负责生成 DDL Job。而后 DDL Handler 的公共基类的接口 com.alibaba.polardbx.executor.handler.ddl.LogicalCommonDdlHandler#handleDdlRequest 解决这个 DDL 申请,该函数调用 com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute 办法将之前生成的 DDL Job 及执行 DDL 所需的上下文写入 MetaDB,并告诉 Leader 节点解决。至此,Worker 节点实现了本人的工作,如果该 DDL 是阻塞型的,Worker 节点会期待 Leader 执行完 DDL 后,返回 Response 给用户端;如果该 DDL 是非阻塞型的,Worker 节点会间接返回。

Leader 节点上运行着 com.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlDispatcherThread 和 com.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlSchedulerThread 两个线程,它们别离对应着实例级别的 DdlJobDispatcher 和 Schema 级别的 DdlJobScheduler。其中 DdlJobDispatcher 从全局惟一的 Ddl Request 队列中取出 Ddl Request,而后将其调配到 Schema 级别的 Ddl Job 队列。DdlJobScheduler 是 Schema 级别的,它负责从 Schema 级别的 Ddl Job 队列中一直生产 Ddl Job,这个过程中,DdlJobScheduler 利用 Schema 级别的信号量对并行生产 Ddl Job 的并行度进行管制(同一 Schema 上的最大线程数为 10)。DdlJobScheduler 生产 Ddl Job,本质上是从 Schema 级别的 Ddl Job 队列中取出 Ddl Job,而后分派给 DdlJobExecutor(Job 级别),DdlJobExecutor 负责将 DDL Job 转交给 DdlEngineDagExecutor。至此,DDL Job 正式进入 DDL 引擎中的执行器 DdlEngineDagExecutor,由后者接管 DDL Job 的执行。

须要补充阐明的是,从上文能够看出 DDL 引擎反对多个 DDL 并发执行,为保障须要雷同资源的 DDL 之间互斥执行,DDL 引擎提供了长久化的读写锁机制。作为 DDL 开发者,只须要在定义 DDL Job 的时候,提前申明该 DDL 所需的 Schema、Table 资源。当执行 DDL 的时候,DDL 引擎会在 com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute 生成 DDL Job 并保留至 MetaDB 之前,先依据该 DDL Job 所需的资源进行读写锁的 acquire。

DDL 工作执行

DdlEngineDagExecutor 负责 DDL 工作的执行,它会调用 restoreAndRun 办法,从 MetaDb 中拉取并复原 DDL Job 为 DAG 模式。而后调用 run 办法,依据 DDL Job 的以后状态执行相应的回调办法。public class DdlEngineDagExecutor

{public static void restoreAndRun(String schemaName, Long jobId, ExecutionContext executionContext){boolean restoreSuccess = DdlEngineDagExecutorMap.restore(schemaName, jobId, executionContext);
        DdlEngineDagExecutor dag = DdlEngineDagExecutorMap.get(schemaName, jobId);
        dag.run();}
 
    private void run() {
        // Start the job state machine.
        if (ddlContext.getState() == DdlState.QUEUED) {onQueued();
        }
        if (ddlContext.getState() == DdlState.RUNNING) {onRunning();
        }
        if (ddlContext.getState() == DdlState.ROLLBACK_RUNNING) {onRollingBack();
        }
        // Handle the terminated states.
        switch (ddlContext.getState()) {
        case ROLLBACK_PAUSED:
        case PAUSED:
            onTerminated();
            break;
        case ROLLBACK_COMPLETED:
        case COMPLETED:
            onFinished();
            break;
        default:
            break;
        }
 }
}

com.alibaba.polardbx.executor.ddl.newengine.DdlEngineDagExecutor#run 会依据 DDL Job 以后的状态,执行对应的回调办法,这实质上是一个在 DDL Job 的状态转移图上游走的过程。

DDL Job 的初始状态个别为 QUEUED,它示意以后被 DDL 引擎新调度到 Schema 级别队列。此时 run 办法会根据此状态调用 onQueued() 办法。onQueued() 办法的作用是将 DDL Job 的状态批改为 RUNNING。

当 DDL Job 以后的状态是 RUNNING 时,run 办法就会调用 onRunning 回调办法,依照 DAG 图的依赖关系执行 DDL Job 外部的 Task。

private void onRunning() {while (true) {if (hasFailureOnState(DdlState.RUNNING)) {if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {LOGGER.info(String.format("JobId:[%s], all tasks stopped", ddlContext.getJobId()));
                return;
            } else {continue;}
        }
        if (executingTaskScheduler.isAllTaskDone()) {updateDdlState(DdlState.RUNNING, DdlState.COMPLETED);
            return;
        }
        if (executingTaskScheduler.hasMoreExecutable()) {
            // fetch & execute next batch
            submitDdlTask(executingTaskScheduler.pollBatch(), true, executingTaskScheduler);
            continue;
        }
        //get some rest
        sleep(50L);
}

onRunning 的流程如下:

  • 先查看以后 DDL Job 的状态是否为 RUNNING,如果不是则间接返回。
  • 查看以后 DAG 图上是否还有待执行的 Task 节点,如果没有,则更新 Job 状态为 COMPLETED,而后返回。
  • 如果以后 DAG 图上存在能够执行的 Task,则用拓扑排序的形式,从 DAG 图上取出所有可执行的 Task,依照并行度的限度,调用 submitDdlTask 办法并发执行。留神,Task 并不一定能执行胜利,如果有 Task 执行失败,submitDdlTask 办法会依照 Task 的开发者事后定义的失败策略,批改以后 DDL Job 的状态。最典型的,当有 Task 失败时,批改以后 DDL Job 状态为 PAUSED 或 ROLLBACK_RUNNING。具体的错误处理与复原机制,将在下一大节介绍。

如果有 DDL Job 的状态为 ROLLBACK_RUNNING,run 办法就会调用 onRollingBack() 回调办法,实现 DDL 的回滚。相干代码如下

private void onRollingBack() {if (!allowRollback()) {updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_PAUSED);
        return;
    }
 
    reverseTaskDagForRollback();
 
    // Rollback the tasks.
    while (true) {if (hasFailureOnState(DdlState.ROLLBACK_RUNNING)) {if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {LOGGER.info(String.format("JobId:[%s], all tasks stoped", ddlContext.getJobId()));
                return;
            } else {continue;}
        }
        if (reveredTaskScheduler.isAllTaskDone()) {updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_COMPLETED);
            return;
        }
        if (reveredTaskScheduler.hasMoreExecutable()) {
            // fetch & execute next batch
            submitDdlTask(reveredTaskScheduler.pollBatch(), false, reveredTaskScheduler);
            continue;
        }
        //get some rest
        sleep(50L);
    }
}

onRollingBack 的流程如下:

  • 首先查看,在以后 DAG 图的执行进度下,是否容许回滚(一旦越过了 fail point task,则不容许回滚)。如果不可回滚,则标记以后 DDL Job 的状态为 PAUSED,而后退出。
  • 当 DDL Job 的状态为 ROLLBACK_RUNNING 时,可能还存在其余正在执行中的 Task。此时 DDL 引擎将不再容许新的 Task 开始执行,并且会期待正在执行中的 Task 胜利或失败,此时该 DDL Job 就达到了一个一致性的状态。
  • 达了一致性状态后能够开始回滚流程,首先逆转 DAG 图的所有有向边,使整个 DDL Job 的执行流程反过来。而后依照逆转后的 DAG 图进行拓扑排序,取出之前执行结束或执行过但未实现的 Task,执行它们的反向幂等办法。
  • 当 DAG 图中没有可执行的 Task 节点时,标记 DDL Job 状态为 ROLLBACK_COMPLETED,回滚胜利。

其余状态的回调函数逻辑较为简单,这里不再赘述,请感兴趣的读者自行浏览代码。

错误处理与复原

DDL 引擎谋求的指标之一是 DDL 的原子性,如果在执行 DDL 的过程中局部 Task 失败,DDL 引擎须要采取适当措施让 DDL Job 变成齐全未执行或执行胜利的状态(即状态转移图中的终态)。DDL 引擎采取的方法是给 Task 增加 DdlExceptionAction 属性,该属性用于批示 DDL 引擎执行 Task 出现异常时如何处理。DDL 开发者能够在定义 DDL Task 的时候设置该属性。

DdlExceptionAction 一共有 4 种取值

  • TRY_RECOVERY_THEN_PAUSE:执行该 Task 出现异常后,重试 3 次,如果仍失败,则将 Task 对应的 DDL Job 状态设置为 PAUSED。
  • ROLLBACK:执行 Task 出现异常后,将该 Task 所在 DDL Job 状态设置为 ROLLBACK_RUNNING,随后 DDL 引擎会依据该状态进行回滚 DDL。
  • TRY_RECOVERY_THEN_ROLLBACK:执行该 Task 出现异常后,重试 3 次,如果仍失败,将该 Task 所在 DDL Job 状态设置为 ROLLBACK_RUNNING,随后由 DDL 引擎回滚该 DDL。
  • PAUSE:执行该 Task 出现异常后,将 Task 对应的 DDL Job 状态设置为 PAUSED。

一般来说,PAUSED 状态意味着该 DDL Job 没有达到终态,须要开发者染指解决,这罕用于出现异常后无奈复原的 Task,或者对外界产生了影响以至无奈回滚的 Task。前者举例,如 drop table 指令,一旦执行了删除元信息或删除物理表的 Task,就无奈再复原到删除前的状态了,这时如果某 Task 失败且重试 3 次后仍失败,就会导致该 DDL Job 进入 PAUSED 状态;后者举例,如 Polardb- X 中大部分 DDL Job 都含有一个 CDC 打标的 Task,用于对外生成 bin log,该 Task 执行实现意味着外界曾经能够获取相应 DDL 的 bin log,因而无奈回滚。

总结

本文从 DDL 引擎的视角,向读者介绍了 DDL 引擎的架构、实现,以及 DDL 引擎与 DDL Job 的交互逻辑。理解更多对于 Polardb- X 源码的解析,请继续关注咱们后续公布的文章。

原文链接

本文为阿里云原创内容,未经容许不得转载。

正文完
 0