乐趣区

关于数据库:TiCDC-源码阅读四TiCDC-Scheduler-工作原理解析

本文是 TiCDC 源码解读的第四篇,次要内容是讲述 TiCDC 中 Scheduler 模块的工作原理。次要内容如下:

  1. Scheduler 模块的工作机制
  2. 两阶段调度原理

Scheduler 模块介绍

Scheduler 是 Changefeed 内的一个重要模块,它次要负责两件事件:

  1. 将一个 Changefeed 所有须要被同步的表,散发到不同的 TiCDC 节点上进行同步工作,以达到负载平衡的目标。
  2. 保护每张表的同步进度,同时推动 Changefeed 的全局同步进度。

本次介绍的 Scheduler 相干代码都在 tiflow/cdc/scheduler/internal/v3 目录下,蕴含多个文件夹,具体如下:

  • Coordinator 运行在 Changefeed,是 Scheduler 的全局调度核心,负责发送表调度工作,保护全副同步状态。
  • Agent 运行在 Processor,它接管表调度工作,汇报以后节点上的表同步状态给 Coordinator。
  • Transport 是对底层 peer-2-peer 机制的封装,次要负责在 Coordinator 和 Agent 之间传递网络音讯。
  • Member 次要是对集群中 Captures 状态的治理和保护。
  • Replication 负责管理每张表的同步状态。ReplicationSet 记录了每张表的同步信息,ReplicationManager 负责管理所有的 ReplicationSet
  • Scheduler 实现了多种不同的调度规定,能够由 OpenAPI 触发。

上面咱们具体介绍 Scheduler 模块的工作过程。

表 & 表调度工作 & 表同步单元

TiCDC 的工作是以表为单位,将数据同步到上游指标节点。所以对于一张表,能够通过如下模式来示意,该数据结构即刻画了一张表以后的同步进度。

type Table struct {
    TableID model.TableID
    Checkpoint uint64
    ResolvedTs uint64
}

Scheduler 次要是通过 Add Table / Remove Table / Move Table 三类表调度工作来均衡每个 TiCDC 节点上的正在同步的表数量。对于这三类工作,能够被简略地刻画为:

  • Add Table:「TableID, Checkpoint, CaptureID」,即在 CaptureID 所指代的 Capture 上从 Checkpoint 开始加载并且同步 TableID 所指代的表同步单元。
  • Remove Table:「TableID, CaptureID」,即从 CaptureID 所指代的 Capture 上移除 TableID 所指代的表同步单元。
  • Move Table:「TableID, Source CaptureID, Target CaptureID」,行将 TableID 所指代的表同步单元从 Source CaptureID 指代的 Capture 上移动到 Target CaptureID 指代的 Capture 之上。

表同步单元次要负责对一张表进行数据同步工作,在 TiCDC 内这由 Table Pipeline 实现。它的根本构造如下所示:

每个 Processor 开始同步一张表,即会为这张表创立一个 Table Pipeline,该过程能够分成两个局部:

  • 加载表:创立 Table Pipeline,调配相干的系统资源。KV-Client 从上游 TiKV 拉取数据,经由 Puller 写入到 Sorter 中,然而此时不向上游指标数据系统写入数据。
  • 复制表:在加载表的前提下,启动 Mounter 和 Sink 开始工作,从 Sorter 中读取数据,并且写入到上游指标数据系统。

Processor 实现了 TableExecutor 接口,如下所示:

type TableExecutor interface {
        // AddTable add a new table with `startTs`
        // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol.
        // if `isPrepare` is false, the 2nd phase.
        AddTable(ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,) (done bool, err error)

        // IsAddTableFinished make sure the requested table is in the proper status
        IsAddTableFinished(tableID model.TableID, isPrepare bool) (done bool)

        // RemoveTable remove the table, return true if the table is already removed
        RemoveTable(tableID model.TableID) (done bool)
        // IsRemoveTableFinished convince the table is fully stopped.
        // return false if table is not stopped
        // return true and corresponding checkpoint otherwise.
        IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool)

        // GetAllCurrentTables should return all tables that are being run,
        // being added and being removed.
        //
        // NOTE: two subsequent calls to the method should return the same
        // result, unless there is a call to AddTable, RemoveTable, IsAddTableFinished
        // or IsRemoveTableFinished in between two calls to this method.
        GetAllCurrentTables() []model.TableID

        // GetCheckpoint returns the local checkpoint-ts and resolved-ts of
        // the processor. Its calculation should take into consideration all
        // tables that would have been returned if GetAllCurrentTables had been
        // called immediately before.
        GetCheckpoint() (checkpointTs, resolvedTs model.Ts)

        // GetTableStatus return the checkpoint and resolved ts for the given table
        GetTableStatus(tableID model.TableID) tablepb.TableStatus
}

在 Changefeed 的整个运行周期中,Scheduler 都处于工作状态,Agent 利用 Processor 提供的上述接口办法实现,理论地执行表调度工作,获取到表调度工作进行的水平,以及表同步单元以后的运行状态等,以供后续做出调度决策。

Coordinator & Agent

Scheduler 模块由 Coordinator 和 Agent 两局部组成。Coordinator 运行在 Changefeed 内,Agent 运行在 Processor 内,Coordinator 和 Agent 即是 Changefeed 和 Processor 之间的通信接口。二者应用 peer-2-peer 框架实现网络数据交换,该框架基于 gRPC 实现。下图展现了一个有 3 个 TiCDC 节点的集群中,一个 Changefeed 的 Scheduler 模块的通信拓扑状况。能够看到,Coordinator 和 Agent 之间会替换两类网络音讯,音讯格局由 Protobuf 定义,源代码位于 tiflow/cdc/scheduler/schedulepb。

  • 第一类是 Heartbeat 音讯,Coordinator 周期性地向 Agent 发送 HeartbeatRequest,Agent 返回相应的 HeartbeatResponse,该类音讯次要目标是让 Coordinator 可能及时获取到所有表在不同 TiCDC 节点上的同步状态。
  • 第二类是 DispatchTable 音讯,在有对表进行调度的需要的时候,Coordinator 向特定 Agent 发送 DispatchTableRequest,后者返回 DispatchTableResponse,用于及时同步每一张表的调度停顿。

上面咱们从消息传递的角度,别离看一下 Coordinator 和 Agent 的工作逻辑。

Coordinator 工作过程

Coordinator 会收到来自 Agent 的 HeartbeatReponseDispatchTableResponse 这两类音讯。Coordinator 内的 CaptureM 负责保护 Capture 的状态,在每次接管到 HeartbeatResponse 之后,都会更新本身保护的 Captures 的状态,包含每个 Capture 以后的存活状态,Capture 上以后同步的所有表信息。同时也生成新的 HeartbeatRequest 音讯,再次发送到所有 Agents。ReplicationM 负责保护所有表的同步状态,它接管到 HeartbeatResponseDispatchTableResponse 之后,依照音讯中记录的表信息,更新本人保护的这些表对应的同步状态。CaptureM 提供了以后集群中存活的所有 Captures 信息,ReplicationM 则提供了所有表的同步状态信息,SchedulerM 以二者提供的信息为输出,以让每个 Capture 上的表同步单元数量尽可能平衡为指标,生成表调度工作,这些表调度工作会被 ReplicationM 进一步解决,生成 DispatchTableRequest,而后发送到对应的 Agent。

Agent 工作过程

Agent 会从 Coordinator 收到 HeartbeatRequestDispatchTableRequest 这两类音讯。对于前者,Agent 会收集以后运行在以后 TiCDC 节点上的所有表同步单元的运行状态,结构 HeartbeatRespone。对于后者,则通过拜访 Processor 来增加或者移除表同步单元,获取到表调度工作的执行进度,结构对应的 DispatchTableResponse,最初发送到 Coordinator。

Changefeed 同步进度计算

一个 changefeed 内同步了多张表。对于每张表,有 CheckpointResolvedTs 来标识它的同步进度,Coordinator 通过 HeartbeatResponse 周期性地收集所有表的同步进度信息,而后就能够计算失去一个 Changefeed 的同步进度。具体计算方法如下:

// AdvanceCheckpoint tries to advance checkpoint and returns current checkpoint.
func (r *Manager) AdvanceCheckpoint(currentTables []model.TableID) (newCheckpointTs, newResolvedTs model.Ts) {
    newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
    for _, tableID := range currentTables {table, ok := r.tables[tableID]
        if !ok {
            // Can not advance checkpoint there is a table missing.
            return checkpointCannotProceed, checkpointCannotProceed
        }
        // Find the minimum checkpoint ts and resolved ts.
        if newCheckpointTs > table.Checkpoint.CheckpointTs {newCheckpointTs = table.Checkpoint.CheckpointTs}
        if newResolvedTs > table.Checkpoint.ResolvedTs {newResolvedTs = table.Checkpoint.ResolvedTs}
    }
    return newCheckpointTs, newResolvedTs
}

从下面的示例代码中咱们能够看出,一个 Changefeed 的 Checkpoint 和 ResolvedTs,即是它同步的所有表的对应指标的最小值。Changefeed 的 Checkpoint 的意义是,它的所有表的同步进度都不小于该值,所有工夫戳小于该值的数据变更事件曾经被同步到了上游;ResolvedTs 指的是 TiCDC 以后曾经捕捉到了所有工夫戳小于该值的数据变更事件。除此之外的一个重点是,只有当所有表都被散发到 Capture 上并且创立了对应的表同步单元之后,才能够推动同步进度。

以上从消息传递的角度对 Scheduler 模块根本工作原理的简略介绍。上面咱们更加具体地聊一下 Scheduler 对表表度工作的解决机制。

两阶段调度原理

两阶段调度是 Scheduler 外部对表调度工作的执行原理,次要目标是升高 Move Table 操作对同步提早的影响。

上图展现了将表 X 从 Agent-1 所在的 Capture 上移动到 Agent-2 所在的 Capture 上的过程,具体如下:

  1. Coordinator 让 Agent-2 筹备表 X 的数据。
  2. Agent-2 在筹备好了数据之后,告知 Coordinator 这一音讯。
  3. Coordinator 发送音讯到 Agent-1,告知它移除表 X 的同步工作。
  4. Agent-1 在移除了表 X 的同步工作之后,告知 Coordinator 这一音讯。
  5. Coordinator 再次发送音讯到 Agent-2,开始向上游复制表 X 的数据。
  6. Agent-2 再次发送音讯到 Coordinator,告知表 X 正处于复制数据到上游的状态。

上述过程的重点是在将一张表从原节点上移除之前,先在指标节点上调配相干的资源,筹备须要被同步的数据。筹备数据的过程,往往颇为耗时,这是引起移动表过程耗时长的次要起因。两阶段调度机制通过提前在指标节点上筹备表数据,同时保障其余节点上有该表的同步单元正在向上游复制数据,保障了该表始终处于同步状态,这样能够缩小整个移动表过程的工夫开销,升高对同步提早的影响。

Replication set 状态转换过程

在上文中讲述的两阶段调度移动表的根本过程中,能够看到在 Agent-2 执行了前两步之后,表 X 在 Agent-1 和 Agent-2 的 Capture 之上,均存在表同步单元。不同点在于,Agent-1 此时正在复制表,Agent-2 此时只是加载表。

Coordinator 应用 ReplicationSet 来跟踪一张表在多个 Capture 上的表同步单元的状态,并以此保护了该表实在的同步状态。根本定义如下:

// ReplicationSet is a state machine that manages replication states.
type ReplicationSet struct {
    TableID    model.TableID
    State      ReplicationSetState
    Primary model.CaptureID
    Secondary model.CaptureID
    Checkpoint tablepb.Checkpoint
    ...
}

TableID 惟一地标识了一张表,State 则记录了以后该 ReplicationSet 所处的状态,Primary 记录了以后正在复制该表的 Capture 的 ID,而 Secondary 则记录了以后曾经加载了该表,然而尚未同步数据的 Capture 的 ID,Checkpoint 则记录了该表以后的同步状态。
在对表进行调度的过程中,一个 ReplicationSet 会处于多种状态。如下图所示:

  • Absent 示意没有任何一个节点加载了该表的同步单元。
  • Prepare 可能呈现在两种状况。第一种是表正处于 Absent 状态,调用 Add Table 在某一个 Capture 上开始加载该表。第二种状况是须要将正在被同步的表移动到其余节点上,发动 Move Table 申请,在指标节点上加载表。
  • Commit 指的是在至多一个节点上,曾经筹备好了能够同步到上游的数据。
  • Replicating 指的是有且只有一个节点正在复制该表的数据到上游指标零碎。
  • Removing 阐明以后只有一个节点上加载了表的同步单元,并且以后正在进行向上游同步数据,同时开释该同步单元。个别产生在上游执行了 Drop table 的状况。在一张表被齐全移除之后,即再次回到 Absent 状态。

上面假如存在一张表 table-0,它在被调度时产生的各种状况。首先思考如何将表 X 加载到 Agent-0 所在的 Capture 之上,并且向上游复制数据。

首先 table-0 处于 Absent 状态,此时发动 Add Table 调度工作,让 Agent-0 从 checkpoint = 5 开始该表的同步工作,Agent-0 会创立相应的表同步单元,和上游 TiKV 集群中的 Regions 建设网络连接,拉取数据。当筹备好了能够向上游同步的数据之后,Agent-0 告知 Coordinator 该表同步单元以后曾经处于 Prepared 状态。Coordinator 会依据该音讯,将该 ReplicationSetPrepare 切换到 Commit 状态,而后发动第二条音讯到 Agent-0,让它开始从 checkpoint = 5 从上游开始同步数据。当 Agent-0 实现相干操作,返回响应到 Coordinator 之后,Coordinator 再次更新 table-0 的 ReplicationSet,进入到 Replicating 状态。

再来看一下移除表 table-0 的过程,如上所示。最开始正处于 Replicating 状态,并且在 Capture-0 上同步。Coordinator 向 Agent-0 发送 Remove Table 申请,Agent-0 通过 Processor 来勾销该表的同步单元,开释相干的资源,待所有资源开释结束之后,返回音讯到 Coordinator,告知该表以后曾经没有被同步了,同时带有最初同步的 Checkpoint。在 Agent-0 正在勾销表的过程中,Coordinator 和 Agent-0 之间仍旧有放弃通过 Heartbeat 进行状态告诉,Coordinator 能够及时地晓得以后表 t = 0 正处于 Removing 状态,在后续收到表曾经被齐全勾销的音讯之后,则从 Removing 切换到 Absent 状态。

最初再来看一下 Move Table,它实质上是先在指标节点 Add Table,而后在原节点上 Remove Table

如上图所示,首先假如 table-0 正在 capture-0 上被同步,处于 Replicating 状态,当初须要将 table-0 从 capture-0 移动到 capture-1。首先 Coordinator 将 ReplicationSet 的状态从 Replicating 转移到 Prepare,同时向 Agent-1 发动增加 table-0 的申请,Agent-1 加载完了该表的同步单元之后,会通知 Coordinator 这一音讯,此时 Coordinator 会再次更新 table-0 到 Commit 状态。此时能够晓得表 table-0 目前正在 capture-0 上被同步,在 agent-1 上也曾经有了它的同步单元和可同步数据。Coordinator 再向 Agent-0 上发送 Remove TableAgent-0 收到调度批示之后,进行并且开释表 table-0 的同步单元,再向 Coordinator 返回执行后果。Coordinator 在得悉 capture-0 上曾经没有该表的同步单元之后,将 Primary 从 capture-0 批改为 capture-1,告知 Agent-1 开始向上游同步表 table-0 的数据,Coordinator 在收到从 Agent-1 传来的响应之后,再次更新 table-0 的 状态为 Replicating

从下面三种调度操作中,能够看到 Coordinator 保护的 ReplicationSet 记录了整个调度过程中,一张表的同步状态,它由从 Agent 处收到的各种音讯来驱动状态的扭转。同时能够看到音讯中还有 Checkpoint 和 Resolved Ts 在不断更新。Coordinator 在解决收到的 Checkpoint 和 ResolvedTs 时,保障二者均不会产生会退。

总结

以上就是本文的全部内容。心愿在浏览下面的内容之后,读者可能对 TiCDC 的 Scheduler 模块的工作原理有一个根本的理解。

退出移动版