本文是 TiCDC 源码解读的第四篇,次要内容是讲述 TiCDC 中 Scheduler 模块的工作原理。次要内容如下:
- Scheduler 模块的工作机制
- 两阶段调度原理
Scheduler 模块介绍
Scheduler 是 Changefeed 内的一个重要模块,它次要负责两件事件:
- 将一个 Changefeed 所有须要被同步的表,散发到不同的 TiCDC 节点上进行同步工作,以达到负载平衡的目标。
- 保护每张表的同步进度,同时推动 Changefeed 的全局同步进度。
本次介绍的 Scheduler 相干代码都在 tiflow/cdc/scheduler/internal/v3 目录下,蕴含多个文件夹,具体如下:
- Coordinator 运行在 Changefeed,是 Scheduler 的全局调度核心,负责发送表调度工作,保护全副同步状态。
- Agent 运行在 Processor,它接管表调度工作,汇报以后节点上的表同步状态给 Coordinator。
- Transport 是对底层 peer-2-peer 机制的封装,次要负责在 Coordinator 和 Agent 之间传递网络音讯。
- Member 次要是对集群中 Captures 状态的治理和保护。
- Replication 负责管理每张表的同步状态。
ReplicationSet
记录了每张表的同步信息,ReplicationManager
负责管理所有的ReplicationSet
。 - Scheduler 实现了多种不同的调度规定,能够由 OpenAPI 触发。
上面咱们具体介绍 Scheduler 模块的工作过程。
表 & 表调度工作 & 表同步单元
TiCDC 的工作是以表为单位,将数据同步到上游指标节点。所以对于一张表,能够通过如下模式来示意,该数据结构即刻画了一张表以后的同步进度。
type Table struct {
TableID model.TableID
Checkpoint uint64
ResolvedTs uint64
}
Scheduler 次要是通过 Add Table
/ Remove Table
/ Move Table
三类表调度工作来均衡每个 TiCDC 节点上的正在同步的表数量。对于这三类工作,能够被简略地刻画为:
- Add Table:「TableID, Checkpoint, CaptureID」,即在 CaptureID 所指代的 Capture 上从 Checkpoint 开始加载并且同步 TableID 所指代的表同步单元。
- Remove Table:「TableID, CaptureID」,即从 CaptureID 所指代的 Capture 上移除 TableID 所指代的表同步单元。
- Move Table:「TableID, Source CaptureID, Target CaptureID」,行将 TableID 所指代的表同步单元从 Source CaptureID 指代的 Capture 上移动到 Target CaptureID 指代的 Capture 之上。
表同步单元次要负责对一张表进行数据同步工作,在 TiCDC 内这由 Table Pipeline 实现。它的根本构造如下所示:
每个 Processor 开始同步一张表,即会为这张表创立一个 Table Pipeline,该过程能够分成两个局部:
- 加载表:创立 Table Pipeline,调配相干的系统资源。KV-Client 从上游 TiKV 拉取数据,经由 Puller 写入到 Sorter 中,然而此时不向上游指标数据系统写入数据。
- 复制表:在加载表的前提下,启动 Mounter 和 Sink 开始工作,从 Sorter 中读取数据,并且写入到上游指标数据系统。
Processor 实现了 TableExecutor 接口,如下所示:
type TableExecutor interface {
// AddTable add a new table with `startTs`
// if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol.
// if `isPrepare` is false, the 2nd phase.
AddTable(ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,) (done bool, err error)
// IsAddTableFinished make sure the requested table is in the proper status
IsAddTableFinished(tableID model.TableID, isPrepare bool) (done bool)
// RemoveTable remove the table, return true if the table is already removed
RemoveTable(tableID model.TableID) (done bool)
// IsRemoveTableFinished convince the table is fully stopped.
// return false if table is not stopped
// return true and corresponding checkpoint otherwise.
IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool)
// GetAllCurrentTables should return all tables that are being run,
// being added and being removed.
//
// NOTE: two subsequent calls to the method should return the same
// result, unless there is a call to AddTable, RemoveTable, IsAddTableFinished
// or IsRemoveTableFinished in between two calls to this method.
GetAllCurrentTables() []model.TableID
// GetCheckpoint returns the local checkpoint-ts and resolved-ts of
// the processor. Its calculation should take into consideration all
// tables that would have been returned if GetAllCurrentTables had been
// called immediately before.
GetCheckpoint() (checkpointTs, resolvedTs model.Ts)
// GetTableStatus return the checkpoint and resolved ts for the given table
GetTableStatus(tableID model.TableID) tablepb.TableStatus
}
在 Changefeed 的整个运行周期中,Scheduler 都处于工作状态,Agent 利用 Processor 提供的上述接口办法实现,理论地执行表调度工作,获取到表调度工作进行的水平,以及表同步单元以后的运行状态等,以供后续做出调度决策。
Coordinator & Agent
Scheduler 模块由 Coordinator 和 Agent 两局部组成。Coordinator 运行在 Changefeed 内,Agent 运行在 Processor 内,Coordinator 和 Agent 即是 Changefeed 和 Processor 之间的通信接口。二者应用 peer-2-peer 框架实现网络数据交换,该框架基于 gRPC 实现。下图展现了一个有 3 个 TiCDC 节点的集群中,一个 Changefeed 的 Scheduler 模块的通信拓扑状况。能够看到,Coordinator 和 Agent 之间会替换两类网络音讯,音讯格局由 Protobuf 定义,源代码位于 tiflow/cdc/scheduler/schedulepb。
- 第一类是
Heartbeat
音讯,Coordinator 周期性地向 Agent 发送HeartbeatRequest
,Agent 返回相应的HeartbeatResponse
,该类音讯次要目标是让 Coordinator 可能及时获取到所有表在不同 TiCDC 节点上的同步状态。 - 第二类是
DispatchTable
音讯,在有对表进行调度的需要的时候,Coordinator 向特定 Agent 发送DispatchTableRequest
,后者返回DispatchTableResponse
,用于及时同步每一张表的调度停顿。
上面咱们从消息传递的角度,别离看一下 Coordinator 和 Agent 的工作逻辑。
Coordinator 工作过程
Coordinator 会收到来自 Agent 的 HeartbeatReponse
和 DispatchTableResponse
这两类音讯。Coordinator 内的 CaptureM
负责保护 Capture 的状态,在每次接管到 HeartbeatResponse
之后,都会更新本身保护的 Captures 的状态,包含每个 Capture 以后的存活状态,Capture 上以后同步的所有表信息。同时也生成新的 HeartbeatRequest
音讯,再次发送到所有 Agents。ReplicationM
负责保护所有表的同步状态,它接管到 HeartbeatResponse
和 DispatchTableResponse
之后,依照音讯中记录的表信息,更新本人保护的这些表对应的同步状态。CaptureM
提供了以后集群中存活的所有 Captures 信息,ReplicationM
则提供了所有表的同步状态信息,SchedulerM
以二者提供的信息为输出,以让每个 Capture 上的表同步单元数量尽可能平衡为指标,生成表调度工作,这些表调度工作会被 ReplicationM
进一步解决,生成 DispatchTableRequest
,而后发送到对应的 Agent。
Agent 工作过程
Agent 会从 Coordinator 收到 HeartbeatRequest
和 DispatchTableRequest
这两类音讯。对于前者,Agent 会收集以后运行在以后 TiCDC 节点上的所有表同步单元的运行状态,结构 HeartbeatRespone
。对于后者,则通过拜访 Processor 来增加或者移除表同步单元,获取到表调度工作的执行进度,结构对应的 DispatchTableResponse
,最初发送到 Coordinator。
Changefeed 同步进度计算
一个 changefeed 内同步了多张表。对于每张表,有 Checkpoint
和 ResolvedTs
来标识它的同步进度,Coordinator 通过 HeartbeatResponse
周期性地收集所有表的同步进度信息,而后就能够计算失去一个 Changefeed 的同步进度。具体计算方法如下:
// AdvanceCheckpoint tries to advance checkpoint and returns current checkpoint.
func (r *Manager) AdvanceCheckpoint(currentTables []model.TableID) (newCheckpointTs, newResolvedTs model.Ts) {
newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
for _, tableID := range currentTables {table, ok := r.tables[tableID]
if !ok {
// Can not advance checkpoint there is a table missing.
return checkpointCannotProceed, checkpointCannotProceed
}
// Find the minimum checkpoint ts and resolved ts.
if newCheckpointTs > table.Checkpoint.CheckpointTs {newCheckpointTs = table.Checkpoint.CheckpointTs}
if newResolvedTs > table.Checkpoint.ResolvedTs {newResolvedTs = table.Checkpoint.ResolvedTs}
}
return newCheckpointTs, newResolvedTs
}
从下面的示例代码中咱们能够看出,一个 Changefeed 的 Checkpoint 和 ResolvedTs,即是它同步的所有表的对应指标的最小值。Changefeed 的 Checkpoint 的意义是,它的所有表的同步进度都不小于该值,所有工夫戳小于该值的数据变更事件曾经被同步到了上游;ResolvedTs 指的是 TiCDC 以后曾经捕捉到了所有工夫戳小于该值的数据变更事件。除此之外的一个重点是,只有当所有表都被散发到 Capture 上并且创立了对应的表同步单元之后,才能够推动同步进度。
以上从消息传递的角度对 Scheduler 模块根本工作原理的简略介绍。上面咱们更加具体地聊一下 Scheduler 对表表度工作的解决机制。
两阶段调度原理
两阶段调度是 Scheduler 外部对表调度工作的执行原理,次要目标是升高 Move Table
操作对同步提早的影响。
上图展现了将表 X 从 Agent-1 所在的 Capture 上移动到 Agent-2 所在的 Capture 上的过程,具体如下:
- Coordinator 让 Agent-2 筹备表 X 的数据。
- Agent-2 在筹备好了数据之后,告知 Coordinator 这一音讯。
- Coordinator 发送音讯到 Agent-1,告知它移除表 X 的同步工作。
- Agent-1 在移除了表 X 的同步工作之后,告知 Coordinator 这一音讯。
- Coordinator 再次发送音讯到 Agent-2,开始向上游复制表 X 的数据。
- Agent-2 再次发送音讯到 Coordinator,告知表 X 正处于复制数据到上游的状态。
上述过程的重点是在将一张表从原节点上移除之前,先在指标节点上调配相干的资源,筹备须要被同步的数据。筹备数据的过程,往往颇为耗时,这是引起移动表过程耗时长的次要起因。两阶段调度机制通过提前在指标节点上筹备表数据,同时保障其余节点上有该表的同步单元正在向上游复制数据,保障了该表始终处于同步状态,这样能够缩小整个移动表过程的工夫开销,升高对同步提早的影响。
Replication set 状态转换过程
在上文中讲述的两阶段调度移动表的根本过程中,能够看到在 Agent-2 执行了前两步之后,表 X 在 Agent-1 和 Agent-2 的 Capture 之上,均存在表同步单元。不同点在于,Agent-1 此时正在复制表,Agent-2 此时只是加载表。
Coordinator 应用 ReplicationSet
来跟踪一张表在多个 Capture 上的表同步单元的状态,并以此保护了该表实在的同步状态。根本定义如下:
// ReplicationSet is a state machine that manages replication states.
type ReplicationSet struct {
TableID model.TableID
State ReplicationSetState
Primary model.CaptureID
Secondary model.CaptureID
Checkpoint tablepb.Checkpoint
...
}
TableID
惟一地标识了一张表,State
则记录了以后该 ReplicationSet
所处的状态,Primary
记录了以后正在复制该表的 Capture 的 ID,而 Secondary
则记录了以后曾经加载了该表,然而尚未同步数据的 Capture 的 ID,Checkpoint 则记录了该表以后的同步状态。
在对表进行调度的过程中,一个 ReplicationSet
会处于多种状态。如下图所示:
- Absent 示意没有任何一个节点加载了该表的同步单元。
- Prepare 可能呈现在两种状况。第一种是表正处于
Absent
状态,调用Add Table
在某一个 Capture 上开始加载该表。第二种状况是须要将正在被同步的表移动到其余节点上,发动Move Table
申请,在指标节点上加载表。 - Commit 指的是在至多一个节点上,曾经筹备好了能够同步到上游的数据。
- Replicating 指的是有且只有一个节点正在复制该表的数据到上游指标零碎。
- Removing 阐明以后只有一个节点上加载了表的同步单元,并且以后正在进行向上游同步数据,同时开释该同步单元。个别产生在上游执行了
Drop table
的状况。在一张表被齐全移除之后,即再次回到 Absent 状态。
上面假如存在一张表 table-0,它在被调度时产生的各种状况。首先思考如何将表 X 加载到 Agent-0 所在的 Capture 之上,并且向上游复制数据。
首先 table-0 处于 Absent
状态,此时发动 Add Table
调度工作,让 Agent-0
从 checkpoint = 5 开始该表的同步工作,Agent-0
会创立相应的表同步单元,和上游 TiKV 集群中的 Regions 建设网络连接,拉取数据。当筹备好了能够向上游同步的数据之后,Agent-0
告知 Coordinator 该表同步单元以后曾经处于 Prepared
状态。Coordinator 会依据该音讯,将该 ReplicationSet
从 Prepare
切换到 Commit
状态,而后发动第二条音讯到 Agent-0
,让它开始从 checkpoint = 5
从上游开始同步数据。当 Agent-0
实现相干操作,返回响应到 Coordinator 之后,Coordinator 再次更新 table-0 的 ReplicationSet
,进入到 Replicating
状态。
再来看一下移除表 table-0 的过程,如上所示。最开始正处于 Replicating
状态,并且在 Capture-0 上同步。Coordinator 向 Agent-0
发送 Remove Table
申请,Agent-0
通过 Processor 来勾销该表的同步单元,开释相干的资源,待所有资源开释结束之后,返回音讯到 Coordinator,告知该表以后曾经没有被同步了,同时带有最初同步的 Checkpoint。在 Agent-0
正在勾销表的过程中,Coordinator 和 Agent-0 之间仍旧有放弃通过 Heartbeat 进行状态告诉,Coordinator 能够及时地晓得以后表 t = 0 正处于 Removing
状态,在后续收到表曾经被齐全勾销的音讯之后,则从 Removing
切换到 Absent
状态。
最初再来看一下 Move Table
,它实质上是先在指标节点 Add Table
,而后在原节点上 Remove Table
。
如上图所示,首先假如 table-0 正在 capture-0 上被同步,处于 Replicating
状态,当初须要将 table-0 从 capture-0 移动到 capture-1。首先 Coordinator 将 ReplicationSet
的状态从 Replicating
转移到 Prepare
,同时向 Agent-1
发动增加 table-0 的申请,Agent-1
加载完了该表的同步单元之后,会通知 Coordinator 这一音讯,此时 Coordinator 会再次更新 table-0 到 Commit
状态。此时能够晓得表 table-0 目前正在 capture-0 上被同步,在 agent-1 上也曾经有了它的同步单元和可同步数据。Coordinator 再向 Agent-0
上发送 Remove Table
,Agent-0
收到调度批示之后,进行并且开释表 table-0 的同步单元,再向 Coordinator 返回执行后果。Coordinator 在得悉 capture-0 上曾经没有该表的同步单元之后,将 Primary 从 capture-0 批改为 capture-1,告知 Agent-1 开始向上游同步表 table-0 的数据,Coordinator 在收到从 Agent-1 传来的响应之后,再次更新 table-0 的 状态为 Replicating
。
从下面三种调度操作中,能够看到 Coordinator
保护的 ReplicationSet 记录了整个调度过程中,一张表的同步状态,它由从 Agent 处收到的各种音讯来驱动状态的扭转。同时能够看到音讯中还有 Checkpoint 和 Resolved Ts 在不断更新。Coordinator 在解决收到的 Checkpoint 和 ResolvedTs 时,保障二者均不会产生会退。
总结
以上就是本文的全部内容。心愿在浏览下面的内容之后,读者可能对 TiCDC 的 Scheduler 模块的工作原理有一个根本的理解。