这一次 TiCDC 浏览系列文章将会从源码层面来解说 TiCDC 的基本原理,心愿可能帮忙读者深刻地理解 TiCDC。本篇文章是这一系列文章的第一期,次要叙述了 TiCDC 的目标、架构和数据同步链路,旨在让读者可能初步理解 TiCDC,为浏览其余源码阅读文章起到一个引子的作用。
TiCDC 是什么?
TiCDC 是 TiDB 生态中的一个数据同步工具,它可能将上游 TiDB 集群中产生的增量数据实时的同步到上游目的地。除了能够将 TiDB 的数据同步至 MySQL 兼容的数据库之外,还提供了同步至 Kafka 和 s3 的能力,反对 canal 和 avro 等多种凋谢音讯协定供其余零碎订阅数据变更。
上图形容了 TiCDC 在整个 TiDB 生态系统中的地位,它处于一个上游 TiDB 集群和上游其它数据系统的两头,充当了一个数据传输管道的角色。
TiCDC 典型的利用场景为搭建多套 TiDB 集群间的主从复制,或者配合其余异构的零碎搭建数据集成服务。以下将从这两方面为大家介绍:
主从复制
应用 TiCDC 来搭建主从复制的 TiDB 集群时,依据从集群的应用目标,可能会对主从集群的数据一致性有不同的要求。目前 TiCDC 提供了如下两种级别的数据一致性:
- 快照一致性:通过开启 Syncpoint 性能,可能在实时的同步过程中,保障上下游集群在某个 TSO 的具备快照一致性。具体内容能够参考文档:TiDB 主从集群的数据校验
- 最终一致性:通过开启 Redo Log 性能,可能在上游集群产生故障的时候,保障上游集群的数据达到最终统一的状态。具体内容能够参考文档:应用 Redo Log 确保数据一致性
数据集成
目前 TiCDC 提供将变更数据同步至 Kafka 和 S3 的能力,用户能够应用该性能将 TiDB 的数据集成进其余数据处理系统。在这种利用场景下,用户对数据采集的实时性和反对的音讯格局的多样性会由较高的要求。以后咱们提供了多种可供订阅的音讯格局 (能够参考 配置 Kafka),并在最近一段时间内对该场景的同步速度做了一系列优化,读者能够从之后的文章中理解相干内容。
TiCDC 的架构
确保数据传输的稳定性、实时性和一致性是 TiCDC 设计的外围指标。为了实现该指标,TiCDC 采纳了分布式架构和无状态的服务模式,具备高可用和程度扩大的个性。想要深刻理解 CDC 的架构,咱们须要先意识上面这些概念:
零碎组件
-
TiKV:
- TiKV 外部的 CDC 组件会扫描和拼装 kv change log。
- 提供输入 kv change logs 的接口供 TiCDC 订阅。
-
Capture:
- TiCDC 运行过程,多个 capture 组成一个 TiCDC 集群。
- 同步工作将会依照肯定的调度规定被划分给一个或者多个 Capture 解决。
逻辑概念
- KV change log:TiKV 提供的暗藏大部分外部实现细节的的 row changed event,TiCDC 从 TiKV 拉取这些 Event。
- Owner:一种 Capture 的角色,每个 TiCDC 集群同一时刻最多只存在一个 Capture 具备 Owner 身份,它负责响应用户的申请、调度集群和同步 DDL 等工作。
- ChangeFeed:由用户启动同步工作,一个同步工作中可能蕴含多张表,这些表会被 Owner 划分为多个子任务分配到不同的 Capture 进行解决。
- Processor:Capture 外部的逻辑线程,一个 Capture 节点中能够运行多个 Processor。每个 Processor 负责解决 ChangeFeed 的一个子工作。
- TablePipeline:Processor 外部的数据同步管道,每个 TablePipeline 负责解决一张表,表的数据会在这个管道中解决和流转,最初被发送到上游。
根本个性
- 分布式:具备高可用能力,反对程度扩大。
- 实时性:惯例场景下提供秒级的同步能力。
- 有序性:输入的数据行级别有序,并且提供 At least once 输入的保障。
- 原子性:提供单表事务的原子性。
TiCDC 的生命周期
意识了以上的基本概念之后,咱们能够持续理解一下 TiCDC 的生命周期。
Owner
首先,咱们须要晓得,TiCDC 集群的元数据都会被存储到 PD 内置的 Etcd 中。当一个 TiCDC 集群被部署起来时,每个 Capture 都会向 Etcd 注册本人的信息,这样 Capture 就可能发现彼此的存在。接着,各个 Capture 之间会竞选出一个 Owner,Owner 选举流程在 cdc/capture.go 文件的 campaignOwner
函数内,上面的代码删除了一些错误处理逻辑和参数设置,只保留次要的流程:
for {
// Campaign to be the owner, it blocks until it been elected.
err := c.campaign(ctx)
...
owner := c.newOwner(c.upstreamManager)
c.setOwner(owner)
...
err = c.runEtcdWorker(ownerCtx, owner,...)
c.owner.AsyncStop()
c.setOwner(nil)
}
每一个 Capture 过程都会调用该函数,进入一个竞选的 Loop 中,每个 Capture 都会继续一直地在竞选 Owner。同一时间段内只有一个 Capture 会入选,其它候选者则会阻塞在这个 Loop 中,直到上一个 Owner 退出就会有新的 Capture 入选。
最初真正的竞选是通过在 c.campaign(ctx)
函数外部调用 Etcd 的 election.Campaign
接口实现的,Etcd 保障了同一时间只有一个 Key 可能入选为 Owner。因为 Etcd 是高可用的服务,TiCDC 借助其力量实现了人造的高可用。
竞选到 Owner 角色的 Capture 会作为集群的管理者,也负责监听和响应来自用户的申请。
ChangeFeed
TiCDC 集群启动结束之后,用户即可应用 TiCDC 命令行工具或者 OpenAPI 创立 ChangeFeed (同步工作)。
一个 ChangeFeed 被创立之后,Owner 会负责对它进行检查和初始化,而后将以表为单位将划分为多个子任务分配给集群内的 Capture 进行同步。同步工作初始化的代码在 cdc/owner/changefeed.go 文件中。该函数的次要工作为:
- 向上游查问该同步工作须要同步的表的 Schema 信息,为接下来调度器调配同步工作做筹备。
- 创立一个
ddlPuller
来拉取 DDL。因为咱们须要在同步的过程中放弃多个 Capture 节点上 Schema 信息的统一,并且保障 DML 与 DDL 同步程序。所以咱们抉择仅由 Owner 这个领有 ChangeFeed 所以信息的角色同步 DDL。 - 创立
scheduler
,它会负责把该同步工作拆分成多个子工作,发送给别的 Capture 进行解决。
Capture 接管到 Owner 发送过去的子工作之后,就会创立出一个 Processor 来解决它接管到的子工作,Processor 会为每张表创立出一个 TablePipeline 来同步对应的表的数据。Processor 会周期性的把每个 TablePipeline 的状态和进度信息汇报给 Owner,由 Owner 来决定是否进行调度和状态更新等操作。
总而言之,TiCDC 集群和同步工作的状态信息会在 Owner 和 Processor 之间流转,而用户须要同步的数据信息则通过 TablePipeline 这个管道传递到上游,下一个大节将会对 TablePipeline 进行解说,了解了它,就可能了解 TiCDC 是怎么同步数据的。
TablePipeline
顾名思义,TablePipeline 是一个表数据流动和解决的管道。Processor 接管到一个同步子工作之后,会为每一张表创立出一个 TablePipeline,如上图所示,它次要由 Puller、Sorter、Mounter 和 Sink 形成。
- Puller:负责拉取对应表在上游的变更数据,它暗藏了外部大量的实现细节,包含与 TiKV CDC 模块建设 gRPC 连贯和反解码数据流等。
- Sorter:负责对 Puller 输入的乱序数据进行排序,并且会把 Sink 来不及生产的数据进行落盘,起到一个蓄水池的作用。
- Mounter:依据事务提交时的表构造信息解析和填充行变更,将行变更转化为 TiCDC 能间接解决的数据结构。在这里,Mounter 须要和一个叫做 SchemaStorage 的组件进行交互,这个组件在 TiCDC 外部保护了所需表的 Schema 信息,后续会有内容对这其进行解说。
- Sink:将 Mounter 解决过后的数据进行编解码,转化为 SQL 语句或者 Kafka 音讯发送到对应上游。
这种模块化的设计形式,比拟有利于代码的保护和重构。值得一提的是,如果你对 TiCDC 有趣味,心愿可能让它接入到以后 CDC 还不反对的上游零碎,那么只有本人编码实现一个对应的 Sink 接口,就能够达到目标。
接下来,咱们以一个具体例子的形式来解说数据在 TiCDC 外部的流转。假如咱们当初建设如下表构造:
CREATE TABLE TEST(NAME VARCHAR (20) NOT NULL,
AGE INT NOT NULL,
PRIMARY KEY (NAME)
);
+-------+-------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+------+---------+-------+
| NAME | varchar(20) | NO | PRI | NULL | |
| AGE | int(11) | NO | | NULL | |
+-------+-------------+------+------+---------+-------+
此时,在上游 TiDB 执行以下 DML:
INSERT INTO TEST (NAME,AGE)
VALUES ('Jack',20);
UPDATE TEST
SET AGE = 25
WHERE NAME = 'Jack';
上面咱们就来看一看这两条 DML 会通过什么样的模式通过 TablePipeline,最初写入上游。
Puller 拉取数据
上文中提到 Puller 负责与 TiKV CDC 组件建设 gPRC 连贯而后拉取数据,这是 /pipeline/puller.go 中的 Puller 大抵的工作逻辑:
n.plr = puller.New(... n.startTs, n.tableSpan(),n.tableID,n.tableName ...)
n.wg.Go(func() error {ctx.Throw(errors.Trace(n.plr.Run(ctxC)))
...
})
n.wg.Go(func() error {
for {
select {case <-ctxC.Done():
return nil
case rawKV := <-n.plr.Output():
if rawKV == nil {continue}
pEvent := model.NewPolymorphicEvent(rawKV)
sorter.handleRawEvent(ctx, pEvent)
}
}
})
以上是通过简化的代码,能够看到在 puller.New
办法中,有两个比拟重要的参数 startTs
和 tableSpan()
,它们别离从工夫和空间这两个维度上形容了咱们想要拉取的数据范畴。在 Puller 被创立进去之后,上面局部的代码别离启动了两个 goroutine,第一个负责运行 Puller 的外部逻辑,第二个则是期待 Puller 输入数据,而后把数据发给 Sorter。从 plr.Output()
中吐出来的数据长这个样子:
// RawKVEntry notify the KV operator
type RawKVEntry struct {
OpType OpType `msg:"op_type"`
Key []byte `msg:"key"`
// nil for delete type
Value []byte `msg:"value"`
// nil for insert type
OldValue []byte `msg:"old_value"`
StartTs uint64 `msg:"start_ts"`
// Commit or resolved TS
CRTs uint64 `msg:"crts"`
...
}
所以,在上游 TiDB 写入的那两条 DML 语句,在达到 Puller 的时候会是这样这样的一个数据结构
咱们能够看到 Insert 语句扫描出的数据只有 value 没有 old_value,而 Update 语句则被转化为一条既有 value 又有 old_value 的行变更数据。
这样这两条数据就胜利的被 Puller 拉取到了 TiCDC,然而因为 TiDB 中一张表的数据会被扩散到多个 Region 上,所以 Puller 会与多个 TiKV Region Leader 节点建设连贯,而后拉取数据。那实际上 TiCDC 拉取到的变更数据可能是乱序的,咱们须要对拉取到的所有数据进行排序能力正确的将事务依照程序同步到上游。
Sorter 排序
TablePipeline 中的 Sorter 只是一个领有 Sorter 名字的中转站,实际上负责对数据进行排序的是它背地的 Sorter Engine,Sorter Engine 的生命周期是和 Capture 统一的,一个 Capture 节点上的所有 Processor 会共享一个 Sorter Engine。想要理解它是怎么工作的,能够浏览 EventSorter 接口和其具体实现的相干代码。
在这里,咱们只须要晓得数据进入 TablePipeline 中的 Sorter 后会被排序即可。假如咱们当初除了上述的两条数据之外,在该表上又进行了其余的写入操作,并且该操作的数据在另外一个 Region。最终 Puller 拉到的数据如下:
除了数据之外,咱们还能够看到 Resolved
的事件,这是一个在 TiCDC 零碎中很重要的工夫标记。当 TiCDC 收到 Resolved
时, 能够认为小于等于这个工夫点提交的数据都曾经被接管了,并且当前不会再有早于这个工夫点的数据再发送下来,此时 TiCDC 能够此为界线来将收到的数据同步至上游。
此外,咱们能够看到拉取到的数据并不是依照 commit_ts 严格排序的,Sorter 会依据 commit_ts 将它们进行排序,最终失去如下的数据:
当初排好程序的事件就能够往上游同步了,然而在这之前咱们须要先对数据做一些转换,因为此时的数据是从 TiKV 中扫描出的 key-value,它们实际上只是一堆 bytes 数据,而不是上游想要生产的音讯格局。
Mounter 解析
以上的 Event 数据从 Sorter 进去之后,Mounter 会依据其对应的表的 Schema 信息将它还原成依照表构造组织的数据。
type RowChangedEvent struct {
StartTs uint64
CommitTs uint64
Table *TableName
ColInfos []rowcodec.ColInfo
Columns []*Column
PreColumns []*Column
IndexColumns [][]int
...
}
能够看到,该构造体中还原出了所有的表和列信息,并且 Columns 和 PreColumns 就对应于 value 和 old_value。当 TiCDC 拿到这些信息之后咱们就能够将数据持续下发至 Sink 组件,让其依据表信息和行变更数据去写上游数据库或者生产 Kafka 音讯。值得注意的是,Mounter 进行的是一项 CPU 密集型工作,当一个表中所蕴含的字段较多时,Mounter 会耗费大量的计算资源。
Sink 下发数据
当 RowChangedEvent
被下发至 Sink 组件时,它身上曾经蕴含了充沛的信息,咱们能够将其转化为 SQL 或者特定音讯格局的 Kafka 音讯。在上文的架构图中咱们能够看到有两种 Sink,一种是接入在 Table Pipeline 中的 TableSink,另外一种是 Processor 级别共用的 ProcessorSink。它们在零碎中有不同的作用:
- TableSink 作为一种 Table 级别的治理单位,缓存着要下发到 ProcessorSink 的数据,它的次要作用是不便 TiCDC 依照表为单位治理资源和进行调度
- ProcessorSink 作为实在要与数据库或者 Kafka 建设连贯的 Sink 负责 SQL/Kafka 音讯的转换和同步
咱们再来看一看 ProcessorSink 到底如何转换这些行变更:
- 如果上游是数据库,ProcessorSink 会依据
RowChangedEvent
中的 Columns 和 PreColumns 来判断它到底是一个Insert
、Update
还是Delete
操作,而后依据不同的操作类型,将其转化为 SQL 语句,而后再将其通过数据库连贯写入上游:
/*
因为只有 Columns 所以是 Insert 语句。*/
INSERT INTO TEST (NAME,AGE)
VALUES ('Jack',20);
/*
因为既有 Columns 且有 PreColumns 所以是 Update 语句。*/
UPDATE TEST
SET AGE = 25
WHERE NAME = 'Jack';
- 如果上游是 Kafka, ProcessorSink 会作为一个 Kafka Producer 依照特定的音讯格局将数据发送至 Kafka。以 Canal-JSON 为例,咱们上述的 Insert 语句最终会以如下的 JSON 格局写入 Kafka:
{
"id": 0,
"database": "test",
"table": "TEST",
"pkNames": ["NAME"],
"isDdl": false,
"type": "INSERT",
...
"ts": 2,
"sql": "",
...
"data": [
{
"NAME": "Jack",
"AGE": "25"
}
],
"old": null
}
这样,上游 TiDB 执行的 DML 就胜利的被发送到上游零碎了。
结尾
以上就是本文的全部内容。心愿在浏览完下面的内容之后,读者可能对 TiCDC 是什么?为什么?怎么实现?这几个问题有一个根本的答案。