乐趣区

关于tidb:TiCDC-源码阅读二TiKV-CDC-模块介绍

内容概要

TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 能够将数据解析为有序的行级变更数据输入到上游。

本文是 TiCDC 源码解读的第二篇,将于大家介绍 TiCDC 的重要组成部分,TiKV 中的 CDC 模块。咱们会围绕 4 个问题和 2 个指标开展。

  1. TiKV 中的 CDC 模块是什么?
  2. TiKV 如何输入数据变更事件流?
  3. 数据变更事件有哪些?
  4. 如何确保残缺地捕获分布式事务的数据变更事件?

心愿在答复完这 4 个问题之后,大家能:

  • 🔔 理解数据从 TiDB 写入到 TiKV CDC 模块输入的流程。
  • 🗝️ 理解如何残缺地捕获分布式事务的数据变更事件。

在上面的内容中,咱们在和这两个指标相干的中央会标记上 🔔 和 🗝️,以便揭示读者注意本人感兴趣的中央。

TiKV 中的 CDC 模块是什么?

CDC 模块的状态

从代码上看,CDC 模块是 TiKV 源码的一部分,它是用 rust 写的,在 TiKV 代码库外面;从运行时上看,CDC 模块运行在 TiKV 过程中,是一个线程,专门解决 TiCDC 的申请和数据变更的捕获。

CDC 模块的作用

CDC 模块的作用有两个:

  1. 它负责捕获实时写入和读取历史数据变更。这里提一下历史数据变更指曾经写到 rocksdb 外面的变更。
  2. 它还负责计算 resolved ts。这个 resolved ts 是 CDC 模块外面特有的概念,模式上是一个 uint64 的 timestamp。它是 TiKV 事务变更流中的 perfect watermark,perfect watermark 的具体概念参考《Streaming System》的第三章,咱们能够用 resolved ts 来告知上游,也就是 TiCDC,在 tikv 上所有 commit ts 小于 resolved ts 事务都曾经残缺发送了,上游 TiCDC 能够残缺地解决这批事务了。

CDC 模块的代码散布

CDC 模块的代码在 TiKV 代码仓库的 compoenetns/cdccomponents/resolved_ts 模块。咱们在下图中的黑框外面用红色标注了几个重点文件。

delegate.rs 文件中有个同名的 Delegate 构造体,它能够认为是 Region 在 CDC 模块中的“委派”,负责解决这个 region 的变更数据,包含实时的 raft 写入和历史增量数据。

endpoint.rs 文件中有个 Endpoint 构造体,它运行在 CDC 的主线程中,驱动整个 CDC 模块,下面的 delegate 也是运行在整个线程中的。

initializer.rs 文件中的 Initializer 构造体负责增量扫逻辑,同时也负责 delegate 的初始化,这里的增量扫就是读取保留在 rocksdb 中的历史数据变更。

service.rs 文件中的 Service 构造体,它实现了 ChagneData gRPC 服务,运行在 gRPC 的线程中,它负责 TiKV 和 TiCDC 的 RPC 交互,同时它和 Endpoint 中的 DelegateInitializer 也会有交互,次要是承受来自它俩的数据变更事件,而后把这些事件通过 RPC 发送给 TiCDC。

最初一个重要文件是 resolver.rs,它与下面的文件不太一样,在 resolve_ts 这个 component 中,外面的 Resolver 负责计算 resolved ts。

TiKV 如何输入数据变更事件流?

咱们从端到端的角度残缺地走一遍数据的写入和流出。下图概括了数据的流动,咱们以数据保留到磁盘为界,红色箭头代表数据从 TiDB 写入 TiKV 磁盘的方向,蓝色箭头代表数据从 TiKV 磁盘流出到 TiCDC 的方向。

TiDB -> TiKV Service

  • txn prewrite: Tikv::kv_prewrite(PrewriteRequest)
  • txn commit: Tikv::kv_commit(CommitRequest)

咱们看下从 TiDB 指向 TiKV 的红线。咱们晓得数据来自 TiDB 的事务写入,对于一个失常的事务来说,TiDB 须要分两次调用 TiKV 的 gRPC 接口,别离是 kv_prewrite 和 kv_commit,对应了事务中的 prewrite 和 commit,在 request 申请中蕴含了要写入或者删除的 key 和它的 value,以及一些事务的元数据,比方 start ts,commit ts 等。

TiKV Service -> Txn

  • txn prewrite: Storage::sched_prewrite(PrewriteRequest)
  • txn commit: Storage::sched_commit(CommitRequest)

咱们再看从 gRPC 指向 Txn 的红线。它代表 RPC 申请从 gRPC 模块流到事务模块的这一步。这里相应的也有两个 API 的调用,别离是 sched_prewritesched_commit,在这两个 API 中,事务模块会对 request 做一些查看,比方查看 write conflict,计算 commit ts 等(事务的细节能够参考 TiKV 的源码阅读文章,在这里就先跳过了。)

Txn -> Raftstore

  • txn prewrite: Engine::async_write_ext(RaftCmdRequest)
  • txn commit: Engine::async_write_ext(RaftCmdRequest)

事务模块到 Raftstore 的红线代表:Request 通过查看后,会被事务模块序列化成对 KV 的操作,而后被组装成 RaftCmdRequestRaftCmdRequest 再经由 Engine::async_commit_ext API 被发送至 Raftstore 模块。

大家能够看到 prewrite 和 commit 都是变成了 RaftCmdRequest,也都是通过 Engine::async_commit_ext 发送到 Raftstore 模块的。这阐明了什么呢?它阐明了到 Engine 这一层,TiDB 的申请中的事务信息曾经被“抹去”了,所有的事务信息都存到了 key 和 value 外面。

Raftstore 模块会将这些 key value 提交到 Raft Log 中,如果 Raft Log Commit 胜利,Apply 线程会将这些 key 和 value 写入到 Rocksdb。(这外面的细节能够参考 TiKV 的源码阅读文章,在这里就先跳过了。)

Rafstore -> CDC

  • RaftCmd: CoprocessorHost::on_flush_applied_cmd_batch(Vec<RaftCmdRequest>)
  • Txn Record: Engine::async_snapshot()

从这里起,数据开始流出了,从 Raftstore 到 CDC 模块有两条蓝线,对应这里的两个重要的 API,别离为 on_flush_applied_cmd_batch 实时数据的流出,和 async_snapshot 历史增量数据的流出(前面会说细节)。

CDC -> gRPC -> TiCDC

  • ChangeDataEvent: Service::event_feed() -> ChangeDataEvent

最初就是从 CDC 模块到 TiCDC 这几条蓝线了。数据进入 CDC 模块后,通过一系列转换,组装成 Protobuf message,最初交给 gRPC 通过 ChangeData service 中的 EventFeed 这个 RPC 发送到上游的 TiCDC。

CDC 模块中的数据流动

上图示意了数据从 Raftstore 发送到 TiCDC 模块的细节。

数据从 Raftstore 到 CDC 模块,能够分成两个阶段,对应两条链路:

  • 阶段 1,增量扫,Initializer -> Delegate。

    Initializer 从 Raftstore 拿一个 Snapshot,而后在 Snapshot 上读一些历史数据变更,读的范畴有两个维度:

    1. 工夫维度 (checkpoint ts, current ts],checkpoint ts 能够了解成 changefeed 上的 checkpoint,current ts 代表 PD leader 上的以后工夫。
    2. key 范畴 [start key, end key),个别为 region 的 start key 和 end key。
  • 阶段 2,实时写入监听,CdcObserver -> Delegate

    CdcObserver 实现对实时写入的监听。它运行在 Raftstore 的 Apply 线程中,只有在 TiCDC 对一个 Region 发动监听后才会启动运行。咱们晓得所有的数据都是通过 Apply 线程写入的,所以说 CdcObserver 能轻松地在第一工夫把数据捕捉到,而后交给 Delegate

咱们再看一下数据从 CDC 模块到 gRPC 的流程,大体也有两局部。第一局部是汇总增量扫和实时写入;第二局部将这些数据是从 KV 数据反序列化成蕴含事务信息的 Protobuf message。咱们再将这些事务构造体外面的信息给提取进去,填到一个 Protobuf message 外面。

Raftstore 和 TiCDC 的交互

上图是 Raftstore 和 CDC 模块的交互时序图。第一条线是 TiCDC,第二条是 CDC 线程,第三条是 Raftsotre 线程,第四条是 Apply 线程,图中每个点都是产生在线程上的一些事件,蕴含发消息、收音讯和过程外部的解决逻辑。在这里咱们重点说 Apply 线程。

Apply 线程在解决 Change 这个音讯的时候,它会先要把缓存在内存中的 KV 的写入给刷到 RocksDB,而后获取 RocksDB 的 Snapshot,把 Snapshot 发送给 CDC 线程。这三步是串行的,保障了 Snapshot 能够看到之前所有的写入。有了这个机制保障,咱们就能够确保 CDC 模块既不漏数据,也不多数据。

数据变更事件有哪些?

数据变更事件可分为两大类,第一类是 Event;第二类是 ResolvedTs。上图是 CDC Protobuf 的简化版定义,只保留了要害的 field。咱们从上到下看下这个 Protobuf 定义。

EventFeed 定义了 TiCDC 和 TiKV 之间的音讯交互方式,TiCDC 在一个 RPC 上能够发动对多个 Region 的监听,TiKV 以 ChangeDataEvent 模式将多个 Region 的数据变更事件发送给 TiCDC。

Event 代表着是 Region 级别的数据变更事件,蕴含了至多一个用户数据变更事件或者或者 Region 元数据的变更。它们是从单条 Raft Log 翻译失去的。咱们能够留神到 Eventrepeat 润饰了,也就是它可能蕴含了一个 region 多个数据变更,也可能蕴含多个不同 region 的数据变更。

Entries 蕴含了多个 Row。因为在 oneof 外面不能呈现 repeated,所以咱们用 Entries 包装了下。

Row 外面的内容十分靠近 TiDB 层面的数据了,它是行级别的数据变更,蕴含:

  1. 事务的 start ts;
  2. 事务的 commit ts;
  3. 事务写入的类型,Prewrite/Commit/Rollback;
  4. 事务对数据的操作,op_type,put 笼罩写一行和 delete 删除一行;
  5. 事务写入的 key;
  6. 事务写入的 value;
  7. 该事务之前的 value,old value 在很多 CDC 协定上都会有体现,比如说 MySQL 的 maxwell 协定中的“old”字段。

如何确保残缺地捕获分布式事务的数据变更事件?

什么是“残缺”?

咱们须要定义残缺是什么。在这里,“残缺”的主体是 TiDB 中的事务,咱们晓得 TiDB 的事务会有两个写入事件,第一个是 prewrite,第二是 commit 或者 rollback。同时,TiDB 事务可能会波及多个 key,这些有可能散布在不同的 region 上。所以,咱们说“残缺”地捕获一个事务须要捕获它波及的 所有的 key 所有的写入事件

上图描述了一个波及了三个 key 的事务,P 代表事务的 prewrite,C 代表事务的 commit,虚线代表一次捕获。

后面两条虚线是不“残缺”的捕获,第一条虚线漏了所有 key 的 commit 事件,第二条虚线捕捉到了 k1 和 k2 的 prewrite 和 commit,但漏了 k3 的 commit。如果咱们强行认为第二条虚线是“残缺”的,则会毁坏事务的原子性。

最初一条虚线才是“残缺”的捕获,因为它捕捉到了所有 key 的所有写入。

如何确认曾经“残缺”?

确认“残缺”的办法有很多种,最简略的方法就是 – 等。一般来说,只有咱们等的工夫足够长,比方等一轮 GC lifetime,咱们也能确认残缺。然而这个方法会导致 TiCDC 的 RPO 不达标。

上图最初两条虚线是两次“残缺”的捕获,如果第四条线十年之后才产生的,显然它对咱们来说是没有意义的。第四条尽管是“残缺”的,然而不是咱们想要的。所以咱们须要一种机制可能尽快地告知咱们曾经捕获残缺了,也就是图中第三条虚线,在工夫上要尽可能地凑近最初一个变更的捕获。那这个机制的话就是后面提到的 resolved ts。

ResolvedTs 事件及性质

ResolvedTs 在 Protobuf 中的定义比较简单,一个 Region ID 数组和一个 resolved ts。它记录了 一批 Region 中 最小的 resolved ts,会混在数据变更事件流中发送给 TiCDC。从 resolved ts 事件生成的时候开始,TiDB 集群就不会产生 commit ts 小于 resolved ts 的事务了。从而 TiCDC 收到这个事件之后,便能确认这些 Region 上的数据变更事件的完整性了。

resolved ts 的计算

Resolved ts 的计算逻辑在 resolver.rs 文件中,能够用简略三行伪代码示意:

  • 第一行,它要从 PD 那边取一个 TS,称它为 min_ts
  • 第二行,咱们拿 min_ts 和 Region 中的所有 lock 的 start ts 做比拟,取最小值,咱们称它为 new_resolved_ts
  • 咱们拿 new_resolved_ts 和之前的 resolved_ts 做比拟,取最大值,这就是以后时刻的 resolved ts。因为它小于所有 lock 的 start ts,所有它肯定小于这些 lock 的将来的 commit ts。同时,在没有 lcok 的时候,min_ts 会变成 resolved ts,也是就以后时刻 PD 上最新的 ts 将会变成 resolved ts,这确保了它有足够的实时性。

数据变更事件流的例子

上图是一个数据变更事件流的例子,也就是 gRPC EventFeed 中的 stream ChangeDataEvent

例子中有三个事务和三个 resolved ts 事件:

  • 第一个事务波及了 k1 和 k2,它的 start ts 是 1,commit ts 是 2。
  • 第二个事务只蕴含了 k1 这一个 key,它的 start ts 是 3,commit ts 是 6,留神,这个事务在事件流中呈现了乱序,它的 commit 先于 prewrite 呈现在这条流中。
  • 第三个事务蕴含了 k2 的一个事务,留神它只有一个 prewrite 事件,commit 事件还没产生,是一个正在进行中的一个事务。
  • 第一个 resolved ts 事件中的 resolved ts 是 2,代表 commit ts 小于等于 2 的事务曾经残缺发送,在这个例子中能够把第一个事务平安的还原进去。
  • 第二个 resolved ts 事件中的 resolved ts 是 4,这时 k1 的 commit 事件曾经发送了,然而 prewrite 事件没有,4 就阻止了还原第二个事务。
  • 第三个 resolved ts 事件呈现后,咱们就能够还原第二个事务了。

结尾

以上就是本文的全部内容。心愿在浏览下面的内容后,读者能晓得文章结尾的四个问题和理解:

  • 🔔数据从 TiDB 写入到 TiKV CDC 模块输入的流程
  • 🗝️理解如何残缺地捕获分布式事务的数据变更事件
退出移动版