共计 6262 个字符,预计需要花费 16 分钟才能阅读完成。
内容概要
TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 能够将数据解析为有序的行级变更数据输入到上游。
本文是 TiCDC 源码解读的第二篇,将于大家介绍 TiCDC 的重要组成部分,TiKV 中的 CDC 模块。咱们会围绕 4 个问题和 2 个指标开展。
- TiKV 中的 CDC 模块是什么?
- TiKV 如何输入数据变更事件流?
- 数据变更事件有哪些?
- 如何确保残缺地捕获分布式事务的数据变更事件?
心愿在答复完这 4 个问题之后,大家能:
- 🔔 理解数据从 TiDB 写入到 TiKV CDC 模块输入的流程。
- 🗝️ 理解如何残缺地捕获分布式事务的数据变更事件。
在上面的内容中,咱们在和这两个指标相干的中央会标记上 🔔 和 🗝️,以便揭示读者注意本人感兴趣的中央。
TiKV 中的 CDC 模块是什么?
CDC 模块的状态
从代码上看,CDC 模块是 TiKV 源码的一部分,它是用 rust 写的,在 TiKV 代码库外面;从运行时上看,CDC 模块运行在 TiKV 过程中,是一个线程,专门解决 TiCDC 的申请和数据变更的捕获。
CDC 模块的作用
CDC 模块的作用有两个:
- 它负责捕获实时写入和读取历史数据变更。这里提一下历史数据变更指曾经写到 rocksdb 外面的变更。
- 它还负责计算 resolved ts。这个 resolved ts 是 CDC 模块外面特有的概念,模式上是一个 uint64 的 timestamp。它是 TiKV 事务变更流中的 perfect watermark,perfect watermark 的具体概念参考《Streaming System》的第三章,咱们能够用 resolved ts 来告知上游,也就是 TiCDC,在 tikv 上所有 commit ts 小于 resolved ts 事务都曾经残缺发送了,上游 TiCDC 能够残缺地解决这批事务了。
CDC 模块的代码散布
CDC 模块的代码在 TiKV 代码仓库的 compoenetns/cdc
和 components/resolved_ts
模块。咱们在下图中的黑框外面用红色标注了几个重点文件。
在 delegate.rs
文件中有个同名的 Delegate
构造体,它能够认为是 Region 在 CDC 模块中的“委派”,负责解决这个 region 的变更数据,包含实时的 raft 写入和历史增量数据。
在 endpoint.rs
文件中有个 Endpoint
构造体,它运行在 CDC 的主线程中,驱动整个 CDC 模块,下面的 delegate 也是运行在整个线程中的。
initializer.rs
文件中的 Initializer
构造体负责增量扫逻辑,同时也负责 delegate 的初始化,这里的增量扫就是读取保留在 rocksdb 中的历史数据变更。
service.rs
文件中的 Service
构造体,它实现了 ChagneData gRPC 服务,运行在 gRPC 的线程中,它负责 TiKV 和 TiCDC 的 RPC 交互,同时它和 Endpoint
中的 Delegate
和 Initializer
也会有交互,次要是承受来自它俩的数据变更事件,而后把这些事件通过 RPC 发送给 TiCDC。
最初一个重要文件是 resolver.rs
,它与下面的文件不太一样,在 resolve_ts 这个 component 中,外面的 Resolver
负责计算 resolved ts。
TiKV 如何输入数据变更事件流?
咱们从端到端的角度残缺地走一遍数据的写入和流出。下图概括了数据的流动,咱们以数据保留到磁盘为界,红色箭头代表数据从 TiDB 写入 TiKV 磁盘的方向,蓝色箭头代表数据从 TiKV 磁盘流出到 TiCDC 的方向。
TiDB -> TiKV Service
- txn prewrite: Tikv::kv_prewrite(PrewriteRequest)
- txn commit: Tikv::kv_commit(CommitRequest)
咱们看下从 TiDB 指向 TiKV 的红线。咱们晓得数据来自 TiDB 的事务写入,对于一个失常的事务来说,TiDB 须要分两次调用 TiKV 的 gRPC 接口,别离是 kv_prewrite 和 kv_commit,对应了事务中的 prewrite 和 commit,在 request 申请中蕴含了要写入或者删除的 key 和它的 value,以及一些事务的元数据,比方 start ts,commit ts 等。
TiKV Service -> Txn
- txn prewrite: Storage::sched_prewrite(PrewriteRequest)
- txn commit: Storage::sched_commit(CommitRequest)
咱们再看从 gRPC 指向 Txn 的红线。它代表 RPC 申请从 gRPC 模块流到事务模块的这一步。这里相应的也有两个 API 的调用,别离是 sched_prewrite
和 sched_commit
,在这两个 API 中,事务模块会对 request 做一些查看,比方查看 write conflict,计算 commit ts 等(事务的细节能够参考 TiKV 的源码阅读文章,在这里就先跳过了。)
Txn -> Raftstore
- txn prewrite: Engine::async_write_ext(RaftCmdRequest)
- txn commit: Engine::async_write_ext(RaftCmdRequest)
事务模块到 Raftstore 的红线代表:Request 通过查看后,会被事务模块序列化成对 KV 的操作,而后被组装成 RaftCmdRequest
。RaftCmdRequest
再经由 Engine::async_commit_ext API
被发送至 Raftstore 模块。
大家能够看到 prewrite 和 commit 都是变成了 RaftCmdRequest
,也都是通过 Engine::async_commit_ext
发送到 Raftstore 模块的。这阐明了什么呢?它阐明了到 Engine 这一层,TiDB 的申请中的事务信息曾经被“抹去”了,所有的事务信息都存到了 key 和 value 外面。
Raftstore 模块会将这些 key value 提交到 Raft Log 中,如果 Raft Log Commit 胜利,Apply 线程会将这些 key 和 value 写入到 Rocksdb。(这外面的细节能够参考 TiKV 的源码阅读文章,在这里就先跳过了。)
Rafstore -> CDC
- RaftCmd: CoprocessorHost::on_flush_applied_cmd_batch(Vec<RaftCmdRequest>)
- Txn Record: Engine::async_snapshot()
从这里起,数据开始流出了,从 Raftstore 到 CDC 模块有两条蓝线,对应这里的两个重要的 API,别离为 on_flush_applied_cmd_batch
实时数据的流出,和 async_snapshot
历史增量数据的流出(前面会说细节)。
CDC -> gRPC -> TiCDC
- ChangeDataEvent: Service::event_feed() -> ChangeDataEvent
最初就是从 CDC 模块到 TiCDC 这几条蓝线了。数据进入 CDC 模块后,通过一系列转换,组装成 Protobuf message,最初交给 gRPC 通过 ChangeData service 中的 EventFeed
这个 RPC 发送到上游的 TiCDC。
CDC 模块中的数据流动
上图示意了数据从 Raftstore 发送到 TiCDC 模块的细节。
数据从 Raftstore 到 CDC 模块,能够分成两个阶段,对应两条链路:
阶段 1,增量扫,Initializer -> Delegate。
Initializer 从 Raftstore 拿一个 Snapshot,而后在 Snapshot 上读一些历史数据变更,读的范畴有两个维度:
- 工夫维度
(checkpoint ts, current ts]
,checkpoint ts 能够了解成 changefeed 上的 checkpoint,current ts 代表 PD leader 上的以后工夫。 - key 范畴
[start key, end key)
,个别为 region 的 start key 和 end key。
- 工夫维度
阶段 2,实时写入监听,CdcObserver -> Delegate
CdcObserver
实现对实时写入的监听。它运行在 Raftstore 的 Apply 线程中,只有在 TiCDC 对一个 Region 发动监听后才会启动运行。咱们晓得所有的数据都是通过 Apply 线程写入的,所以说CdcObserver
能轻松地在第一工夫把数据捕捉到,而后交给Delegate
。
咱们再看一下数据从 CDC 模块到 gRPC 的流程,大体也有两局部。第一局部是汇总增量扫和实时写入;第二局部将这些数据是从 KV 数据反序列化成蕴含事务信息的 Protobuf message。咱们再将这些事务构造体外面的信息给提取进去,填到一个 Protobuf message 外面。
Raftstore 和 TiCDC 的交互
上图是 Raftstore 和 CDC 模块的交互时序图。第一条线是 TiCDC,第二条是 CDC 线程,第三条是 Raftsotre 线程,第四条是 Apply 线程,图中每个点都是产生在线程上的一些事件,蕴含发消息、收音讯和过程外部的解决逻辑。在这里咱们重点说 Apply 线程。
Apply 线程在解决 Change 这个音讯的时候,它会先要把缓存在内存中的 KV 的写入给刷到 RocksDB,而后获取 RocksDB 的 Snapshot,把 Snapshot 发送给 CDC 线程。这三步是串行的,保障了 Snapshot 能够看到之前所有的写入。有了这个机制保障,咱们就能够确保 CDC 模块既不漏数据,也不多数据。
数据变更事件有哪些?
数据变更事件可分为两大类,第一类是 Event;第二类是 ResolvedTs。上图是 CDC Protobuf 的简化版定义,只保留了要害的 field。咱们从上到下看下这个 Protobuf 定义。
EventFeed
定义了 TiCDC 和 TiKV 之间的音讯交互方式,TiCDC 在一个 RPC 上能够发动对多个 Region 的监听,TiKV 以 ChangeDataEvent
模式将多个 Region 的数据变更事件发送给 TiCDC。
Event
代表着是 Region 级别的数据变更事件,蕴含了至多一个用户数据变更事件或者或者 Region 元数据的变更。它们是从单条 Raft Log 翻译失去的。咱们能够留神到 Event
被 repeat
润饰了,也就是它可能蕴含了一个 region 多个数据变更,也可能蕴含多个不同 region 的数据变更。
Entries
蕴含了多个 Row
。因为在 oneof
外面不能呈现 repeated
,所以咱们用 Entries
包装了下。
Row
外面的内容十分靠近 TiDB 层面的数据了,它是行级别的数据变更,蕴含:
- 事务的 start ts;
- 事务的 commit ts;
- 事务写入的类型,Prewrite/Commit/Rollback;
- 事务对数据的操作,
op_type
,put 笼罩写一行和 delete 删除一行; - 事务写入的 key;
- 事务写入的 value;
- 该事务之前的 value,old value 在很多 CDC 协定上都会有体现,比如说 MySQL 的 maxwell 协定中的“old”字段。
如何确保残缺地捕获分布式事务的数据变更事件?
什么是“残缺”?
咱们须要定义残缺是什么。在这里,“残缺”的主体是 TiDB 中的事务,咱们晓得 TiDB 的事务会有两个写入事件,第一个是 prewrite,第二是 commit 或者 rollback。同时,TiDB 事务可能会波及多个 key,这些有可能散布在不同的 region 上。所以,咱们说“残缺”地捕获一个事务须要捕获它波及的 所有的 key 和 所有的写入事件。
上图描述了一个波及了三个 key 的事务,P 代表事务的 prewrite,C 代表事务的 commit,虚线代表一次捕获。
后面两条虚线是不“残缺”的捕获,第一条虚线漏了所有 key 的 commit 事件,第二条虚线捕捉到了 k1 和 k2 的 prewrite 和 commit,但漏了 k3 的 commit。如果咱们强行认为第二条虚线是“残缺”的,则会毁坏事务的原子性。
最初一条虚线才是“残缺”的捕获,因为它捕捉到了所有 key 的所有写入。
如何确认曾经“残缺”?
确认“残缺”的办法有很多种,最简略的方法就是 – 等。一般来说,只有咱们等的工夫足够长,比方等一轮 GC lifetime,咱们也能确认残缺。然而这个方法会导致 TiCDC 的 RPO 不达标。
上图最初两条虚线是两次“残缺”的捕获,如果第四条线十年之后才产生的,显然它对咱们来说是没有意义的。第四条尽管是“残缺”的,然而不是咱们想要的。所以咱们须要一种机制可能尽快地告知咱们曾经捕获残缺了,也就是图中第三条虚线,在工夫上要尽可能地凑近最初一个变更的捕获。那这个机制的话就是后面提到的 resolved ts。
ResolvedTs 事件及性质
ResolvedTs 在 Protobuf 中的定义比较简单,一个 Region ID 数组和一个 resolved ts。它记录了 一批 Region 中 最小的 resolved ts,会混在数据变更事件流中发送给 TiCDC。从 resolved ts 事件生成的时候开始,TiDB 集群就不会产生 commit ts 小于 resolved ts 的事务了。从而 TiCDC 收到这个事件之后,便能确认这些 Region 上的数据变更事件的完整性了。
resolved ts 的计算
Resolved ts 的计算逻辑在 resolver.rs 文件中,能够用简略三行伪代码示意:
- 第一行,它要从 PD 那边取一个 TS,称它为
min_ts
。 - 第二行,咱们拿
min_ts
和 Region 中的所有 lock 的 start ts 做比拟,取最小值,咱们称它为new_resolved_ts
。 - 咱们拿
new_resolved_ts
和之前的resolved_ts
做比拟,取最大值,这就是以后时刻的 resolved ts。因为它小于所有 lock 的 start ts,所有它肯定小于这些 lock 的将来的 commit ts。同时,在没有 lcok 的时候,min_ts
会变成 resolved ts,也是就以后时刻 PD 上最新的 ts 将会变成 resolved ts,这确保了它有足够的实时性。
数据变更事件流的例子
上图是一个数据变更事件流的例子,也就是 gRPC EventFeed 中的 stream ChangeDataEvent
。
例子中有三个事务和三个 resolved ts 事件:
- 第一个事务波及了 k1 和 k2,它的 start ts 是 1,commit ts 是 2。
- 第二个事务只蕴含了 k1 这一个 key,它的 start ts 是 3,commit ts 是 6,留神,这个事务在事件流中呈现了乱序,它的 commit 先于 prewrite 呈现在这条流中。
- 第三个事务蕴含了 k2 的一个事务,留神它只有一个 prewrite 事件,commit 事件还没产生,是一个正在进行中的一个事务。
- 第一个 resolved ts 事件中的 resolved ts 是 2,代表 commit ts 小于等于 2 的事务曾经残缺发送,在这个例子中能够把第一个事务平安的还原进去。
- 第二个 resolved ts 事件中的 resolved ts 是 4,这时 k1 的 commit 事件曾经发送了,然而 prewrite 事件没有,4 就阻止了还原第二个事务。
- 第三个 resolved ts 事件呈现后,咱们就能够还原第二个事务了。
结尾
以上就是本文的全部内容。心愿在浏览下面的内容后,读者能晓得文章结尾的四个问题和理解:
- 🔔数据从 TiDB 写入到 TiKV CDC 模块输入的流程
- 🗝️理解如何残缺地捕获分布式事务的数据变更事件