作者: 千浪 @阿里云研发工程师
MongoDB CDC 概述
MongoDB 是当下风行的一个基于文档的非关系性数据库。MongoDB CDC [1] 是 Flink CDC 社区 [2] 提供的一个用于捕捉变更数据(Change Data Capturing)的 Flink 连接器,可连贯到 MongoDB 数据库和汇合,并捕捉其中的文档减少、更新、替换、删除等变更操作,生成规范的 Flink Changelog 事件流,反对通过 Flink SQL 或 DataStream API 进行数据加工,加工之后能够不便地写入到 Flink 所反对的各种上游零碎中。
MongoDB CDC 外围性能
全增量一体化读取
在理论的业务场景中,经常须要同时采集 MongoDB 数据库中的存量数据以及增量数据。MongoDB CDC 可能一体化地读取全量数据和增量数据。在启动选项配置为 initial 模式时,CDC 会首先对指标汇合进行扫描,并对现存的每一条数据各发送一条 Insert 记录;快照实现后,CDC 会主动转换为增量模式,开始捕捉连接器启动后到来的变更数据。期间反对在任意时刻的故障复原,且保障提供不丢不重的准确一次(Exactly-Once)语义。
反对多种生产模式
针对不同的场景需要,MongoDB CDC 能够设定为从以下模式中启动:
-
latest 模式
- 在此模式下,MongoDB CDC 不会解决启动前曾经存在的数据,只针对启动后到来的变更数据产生变更记录。这意味着连接器只能读取在连接器启动之后的数据更改。
-
initial 模式
- 在此模式下,MongoDB CDC 会先对全副存量数据进行快照,待快照实现后再开始捕捉变更数据。
-
timestamp 模式
- 在此模式下,MongoDB CDC 会捕捉给定的工夫戳之后产生的变更数据。工夫戳的取值必须在 MongoDB 的无效日志记录范畴内。
反对产生残缺变更事件流
MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据;利用这些信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖残缺的 Insert、Update Before、Update After、Delete 变更流。如果须要补充缺失的变更前事件,一个天然的思路是在 Flink 状态中缓存所有文档的以后版本快照;在遇到被更新或删除的文档时,查表即可得悉变更前的状态。然而在最坏的状况下,这种操作可能须要保留相当于 100% 原始数据量的记录。
目前,Flink SQL Planner 会主动为 Upsert 类型的 Source 生成一个 ChangelogNormalize 节点,并依照上述操作将其转换为残缺的变更流;代价则是该算子节点须要存储体积微小的 State 数据。
MongoDB 6.0 的 Pre- and Post-Image 新性能 [6] 提供了一个更高效的解决方案:只有启用 changeStreamPreAndPostImages 性能,MongoDB 就会在每次变更产生时,在一个非凡的汇合中记录文档变更前后的残缺状态。MongoDB CDC 反对读取这些记录并产生残缺事件流,从而打消了对 ChangelogNormalize 节点的依赖。
基于心跳的标记推动机制
目前版本的 CDC 实现须要全局惟一的回溯标记(Resume Token)来定位变更流的地位。然而 MongoDB 并不会有限地存储所有的日志,较早的变更记录可能会在保留工夫超限或日志大小超限时被革除。
对于变更频繁的汇合,革除记录并不会带来什么问题,因为每次获取最新的变更条目时都会一并刷新回溯标记,始终保障回溯标记的有效性。但对于一些变更十分迟缓的汇合,可能呈现“上一次变更十分长远,导致其对应的回溯标记曾经被革除了”的状况,这意味着无奈再从流中进行复原并读取下一次变更(因为回溯标记不存在而无奈定位)。
MongoDB 提供了解决这一问题的“心跳机制”选项,在流中没有变更数据时,也能够通过发送心跳包以刷新回溯标记。这样对于变更迟缓的汇合也能放弃其回溯标记更新,而不至于过期。能够通过 MongoDB CDC 的 heartbeat.interval.ms 选项来启用心跳机制。
MongoDB CDC 的设计方案
依据应用的技术办法不同,MongoDB CDC 的技术演进过程大抵分为三个阶段:最早的 CDC(如晚期的 Debezium MongoDB 等)基于查问 OpLog 日志汇合实现,次要面向 MongoDB 的晚期版本;第二阶段降级到了基于 MongoDB 3.6 提供的 Change Stream API 设计;在第三阶段,也就是最新的版本中,Flink CDC 社区实现了基于 FLIP-27 和增量快照算法的设计。
第一阶段:基于 OpLog 的设计方案
晚期的 MongoDB 没有为变动监测的需要设置特地的 API。但为了反对主—副节点分布式部署状况下的数据同步和故障复原,MongoDB 会将数据库中的所有文档操作记录写入一个非凡的零碎汇合 sys.oplog [3]。每条记录的格局如下:
{"ts": Timestamp(1625660877, 2),
"t": NumberLong(2),
"h": NumberLong("5521980394145765083"),
"v": 2,
"op": "i",
"ns": "test.users",
"ui": UUID("edabbd93-76eb-42be-b54a-cdc29eb1f267"),
"wall": ISODate("2021-07-07T12:27:57.689Z"),
"o": {"_id": ObjectId("60e59dcd46db1fb4605f8b18"),
"name": "1"
}
}
其中 ts 段用于记录操作产生的惟一工夫戳(第一位为 Unix epoch 工夫戳,第二位为这一秒内的版本号);ns 记录了操作的数据库和汇合、op 是进行的操作(例如 i 代表插入)、o 是被插入的文档。连接器只需继续查问 OpLog 汇合,即可获取工夫程序的最新数据,并产生对应的日志流。
须要注意的是,MongoDB 出于日志记录开销的考量,只会记录更新操作发生变化的字段;删除操作中只蕴含被删除文档的 _id 字段。因而这类基于 OpLog 的 CDC 实现须要额定的操作(例如,在 Update 后查问残缺文档信息)能力产生 Flink Upsert(包含 Insert、Update After、Delete)事件流。因为 OpLog 并不记录更新前及删除前的文档数据,这类 CDC 通常不能产生 Update Before 事件。
除此之外,MongoDB 数据库的每个分片都有本人的 OpLog 汇合,因而连接器须要同时和每个分片建设连贯并解决同步问题,实现较为繁琐。
第二阶段:基于 Change Stream API 设计方案
MongoDB 3.6 引入了新的 Change Stream(变更流)API [4],反对数据库、汇合层面的变更信息订阅,并提供了基于 Resume Token 的断点续传机制。例如,应用 db.<collection_name>.watch(),就能够订阅对应汇合的操作变更,返回的每条变更记录数据的格局如下:
{
// Resume Token
_id: {_data: '...'},
// 操作类型
operationType: 'insert',
// oplogs 中记录的工夫戳
clusterTime: Timestamp({t: 1686638232, i: 1}),
// 精度更高的工夫戳,MongoDB 6.0 后可用
wallTime: ISODate("2023-06-13T06:37:12.290Z"),
// 插入的残缺文档信息
fullDocument: {_id: ObjectId("64880e8a9f46de45aa2630a4"),
fieldKey: 'fieldValue'
},
// 更新的 database 和 collection
ns: {db: 'testdb', coll: 'testtable'},
// 插入文档的惟一 ID
// 分片汇合中还包含分片键
documentKey: {_id: ObjectId("64880e8a9f46de45aa2630a4") }
}
相比于读取 Oplog 的形式,基于变更流 API 的 CDC 具备这些劣势:
- 对分片集群反对更好。要订阅分片汇合的全副变更操作,也只须要建设一个变更流游标;
- 不便进行复原。只须要记录每条记录的 Resume Token,即可在有效期限内任意回溯;
- 反对主动获取变更后残缺文档。能够通过参数配置获取蕴含更新后残缺文档的变更记录。
晚期版本的 MongoDB CDC 就通过变更流 API 实现了流式更新的订阅。
第三阶段:基于增量快照算法的设计方案
CDC 的变更监测操作通常分为两步:第一步是在启动时对以后数据库中的状态进行残缺快照(Snapshot),第二步是监控实时的流式数据变更。晚期版本的快照阶段为单并发读取,且不反对 Checkpoint 与故障复原。这意味着在数据量很大时,快照阶段执行将破费相当长的工夫,且一旦失败必须从头开始。FLIP-27 提案 [5] 给出了下一代 Flink Source 架构,将从源读取数据的职责形象为两个模块,如下图所示:
- SplitEnumerator,负责管理并将数据源拆分为多个形象分片;
- Reader,负责从形象分片中读取理论的数据。
运行时,读取数据的过程也分为两个步骤:
- 一开始,执行 SplitEnumerator,将全副数据拆分成形象分片;
- 将每个形象分片调配给 Reader,并执行理论的读取逻辑。
Enumerator 和每个 Reader 各有本人的 Checkpoints,均反对故障复原。各 Source 也无需本人保护分片、并发模型问题。
MongoDB CDC 在 Flink CDC 2.3 版本开始迁徙到这一新的 Source 架构。在进行快照时,MongoDB CDC 须要将待快照的汇合按 Key 进行拆分,策略如下:
- 如果指标汇合为分片汇合,则依照理论的物理分片进行拆分;
- 否则,应用 MongoDB 提供的 splitVector 函数平均分片;
- 如果无奈调用 splitVector 则应用启发式算法,抽样预计文档的均匀大小,并按数据行数进行拆分。
失去的每个分片都对应一个 MinKey 和 MaxKey 指定的文档范畴,称作快照分片(SnapshotSplit)。
而在流式读取阶段,咱们只须要指定数据流的起止工夫点,确定要监控的流数据记录范畴即可。这样的分片称为流式分片(StreamSplit)。如果将进行工夫点设定为 MAX_TIMESTAMP(可示意的最大工夫戳),则代表这是一个不限定进行工夫点的无界流分片。
目前,启用了增量快照性能的 MongoDB CDC 应用 FLIP-27 举荐的 Source 接口定义形式 SplitEnumerator 会首先拆分存量数据,产生快照分片;在监测到全副的快照分片均实现之后,才会持续产生流式分片,转为流式读取。
TaskFetcher 在收到待处理的分片后,就会依照其类型(快照或流式)将其传递给对应的 SplitFetchTask 进行理论的读取工作。ScanSplitFetchTask 会依据传入的快照分片界定的 Key 范畴从 MongoDB 数据库中读取存量数据;StreamSplitFetchTask 则会订阅 Change Stream API 来获取变更数据。
所有 SplitFetchTask 产生的记录都会被放入事件队列中,并由 RecordEmitter 转发给指定的反序列化器;反序列化器将其转化为最终的 RowData,交给上游生产。
出于兼容性思考,应用传统 SourceFunction 定义的 MongoDB Source 依然存在于目前版本的 MongoDB CDC 中,用于非增量快照模式。但这种定义形式已被 Flink 标记为不举荐的(Deprecated)用法,且未来可能会被删除。
MongoDB CDC 的底层实现
目前版本的 Mongo CDC 实现大量依赖 MongoDB 底层为变动数据的捕捉提供的反对,如变更流 API、分片汇合反对、变更前后快照等性能。上面具体介绍这些性能的底层机制,对底层原理不感兴趣的读者能够疾速跳过本章节。
基于 Change Stream API 的 CDC 技术
上文提到,基于变更流 API 捕捉变更数据相比于读取 OpLog 的实现更简略无效。然而实际上 MongoDB 的变更流 API 底层也是基于 OpLog 实现的,是在 OpLog 上提供的一层封装。它们存在以下的对应关系:
- 变更流中的每条记录都有着惟一的 _id(即 Resume Token),对应 OpLog 汇合中的一次操作日志;
- 变更流中的 namespace、updateDescription、operationType 等字段和 OpLog 中记录的内容一一对应。
在此基础上,变更流 API 提供了以下便当:
- 反对通过任意无效的 Resume Token 回溯变更流
因为通过 Resume Token(即 OpLog 记录的_id 字段)能够查找出 OpLog 中对应的操作记录,从而自对应位点开始生产接下来的变更数据。
- 反对从特定的 Timestamp 开始回溯变更流
MongoDB 提供了 startAtOperationTime 选项开启变更流,反对从给定的工夫戳处开始读取变动。因为 OpLog 汇合中蕴含了所有变更的工夫戳,且依照工夫有序排列,所以只需进行二分查找即可定位到给定工夫戳对应变更流中的地位。
目前的 MongoDB 实现有个非凡的限度:如果指定的 Timestamp 产生在过来(即须要在 OpLog 中查找变更的起始点),则须要保障这一 Timestamp 在 OpLog 记录的日志范畴内。这一限定很好了解:如果给定的工夫戳在将来,则无需进行二分查找,只需期待该工夫戳之后的变更数据到来再开始捕捉即可;但如果给定的工夫戳在过来,则只能在 OpLog 记录的范畴内能力进行牢靠的二分查找;如果该工夫戳比以后 OpLog 最早的数据还要早,则 MongoDB 无奈确定在它们之间是否有其余变更;即便有,也无奈从该处复原了,因为这些变更对应的 OpLog 日志曾经被革除了。MongoDB 在遇到这种状况时会回绝回溯。
- 反对 Full Document Lookup 性能
OpLog 为了节约存储开销,只会存储起码必要的变更数据。例如,对于一次更新操作,MongoDB 并不会将变更后残缺的文档记录下来,而是只存储那些产生了变动的字段;即 OpLog 外面的更新操作日志中记录的不是残缺的文档,不是很实用。如果须要残缺的文档信息,还须要对 OpLog 中每条更新记录手动查找一次。
变更流 API 则将这一查问需要进行了包装:只需应用 fullDocument: updateLookup 参数,MongoDB 就会在读取到 Update 事件时,在返回的记录中补充残缺的文档,并记录在 fullDocument 字段中。留神这仅实用于 Update 类型的操作,因为 Insert 操作总是蕴含残缺文档信息(因为插入操作前该文档还不存在);Delete 操作总是只蕴含 _id 信息(因为已被删除的文档无奈再查找)。
须要留神的是,MongoDB 不保障 updateLookup 给出的文档肯定与该更新操作的后果对应。也就是说,间断对同一文档进行 Update 操作产生的变更流记录,较早变更查出的 FullDocument 可能被较晚变更的内容笼罩。
这和手工读取 OpLog、再手动读取 Full Document 的问题是一样的。不过因为 MongoDB 的每个文档都具备惟一的 _id 字段,因而在 Upsert 模式的数据流中这一问题不会对后果产生太大影响。
MongoDB 分片汇合的反对
MongoDB 的每个分片节点都具备本人的 OpLog 汇合,别离记录属于本人分片的变更数据;这意味着如果须要基于 OpLog 监控分片汇合的变更,则须要并行地监控每个分片的 OpLog 汇合,并手动解决同步问题、依照工夫戳对来自不同分片的变更日志进行排序并输入,难度和危险都比拟高。
而变更流 API 对分片汇合的变更捕捉进行了封装,使其更加易用。即便通过变更流 API 订阅的是分片存储的汇合,也只会产生惟一一个变更流游标,其中蕴含了来自所有分片的变更数据;且提供了很强的有序性保障,即产生的变更数据总是工夫有序的。
MongoDB 的实现形式是给分片汇合设计一个中心化的治理节点,负责从各个节点获取记录并排序,从而产生有序的输入。每个节点外部的 OpLog 记录总是有序的。那么在所有节点都给出一条该节点内最早的记录之后,其中最早的那一个就能够认为是全局最早值,能够发送给订阅 Change Stream 的客户端了。
然而这样其实会有问题:如果其中有个节点的数据始终没有被更新,那么它就不会给出变更记录;这样,核心节点就无奈确认这个节点到底是很久以前有更新,只是始终没收回来记录,还是的确没有数据;这样就无奈推动全局工夫戳了。
MongoDB 对这一问题的解决方案和 Flink 的 Watermark 机制很相似:MongoDB 要求每个节点即便没有变更数据,也须要定期向核心节点发送一条空白指令。这样核心节点就能够确认每个节点的同步状态,从而推动全局工夫戳。
高效地反对变更前后快照
Change Stream 底层是基于 OpLog 实现的,而 OpLog 不记录更新和删除前的文档信息。为了反对“获取变更 / 删除前文档”这一需要,MongoDB 不得不额定在某个地位保留这一信息。出于兼容性的考量以及贮存开销起因,MongoDB 没有抉择批改 OpLog 格局来贮存额定字段,而是在非凡的汇合中存储变更前后的文档信息(称为 Pre- and Post-Images),并提供了汇合粒度的开关管制。
因为针对每次变更和删除操作都记录下前后快照的开销并不小,所以 MongoDB 提供了许多开关避免状态收缩,例如:
- 反对在零碎层面配置变更前后快照的过期工夫:
db.runCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: {expireAfterSeconds: '600' // 设定过期工夫为 600 秒}
}
}
})
- 反对对每个汇合独自配置变更前后快照开启与否:
db.runCommand({
collMod: "testtable",
changeStreamPreAndPostImages: {enabled: true // 配置对 testtable 汇合中的变更记录快照}
})
- 反对对每个变更流游标抉择是否读取变更前后快照:
db.testtable.watch({
// 配置是否读取变更后快照
fullDocument: 'required',
// 配置是否读取变更前快照
fullDocumentBeforeChange: 'required'
})
为什么要同时存储变更后的文档呢?之前提到的将 FullDocument 设定为 updateLookup 的办法,尽管能够失去变更后的残缺文档信息,但却不能保障失去的肯定是这一次更新之后的文档信息。例如,在间断进行两次更新操作时,可能会呈现前一次更新操作的 FullDocument 被后一次笼罩的状况。究其原因,是因为这一信息是预先独立查问(Lookup)的,而非和变更事件自身相关联。
而变更后文档记录是在每次变更触发器中存储的,和特定的 OpLog 变更条目相关联,因而能够保障记录的文档反映了该变更的后果;将 fullDocument 选项设定为 required 或 whenAvailable 时,即可要求从变更后快照中读取记录,而非进行“预先查问”。
在启用 scan.full-changelog 选项时,MongoDB CDC 会要求从变更后文档记录中产生 Update After 事件,从而确保每条 Update After 事件与理论变更操作一一对应。这是通过 ChangelogNormalize 算子将 Upsert 流正规化的实现形式无奈保障的。
Announce: Flink CDC 社区新增 Maintainer 成员
Flink CDC 社区的疾速倒退离不开贡献者们的无私奉献,截止目前社区的贡献者曾经超过 100 人,贡献者群体继续扩充。通过 Flink CDC 社区 Maintainer 成员提议,Flink CDC 社区曾经正式邀请 whhe (川粉) 退出 Flink CDC 社区的 Maintainer 列表。
川粉老师是 OceanBase 开源团队的研发工程师,次要负责 OceanBase 的开源生态对接和社区治理相干工作,他长期沉闷在 Flink CDC 社区,参加了多个版本的开发工作,作为外围贡献者在社区奉献了包含 OceanBase CDC Connector 在内的多个 PR,同时在用户群和 issue 列表中帮忙踊跃帮忙用户解决问题,为社区倒退做出了继续的奉献。期待川粉老师作为 Flink CDC 的 Maintainer,为社区的倒退带来更多数据库侧的视角,帮忙更多的社区贡献者和用户,也心愿将来能有更多的贡献者能够退出 Maintainer 列表,一直推动社区的倒退。
阿里云实时计算 Flink 基于 Flink 1.17 的企业版 Flink 已正式公布 [7],在该版本中 MongoDB CDC 连接器已开始公测,反对任意工夫戳生产、残缺事件流等性能 [8]。欢送应用!
参考
[1] MongoDB CDC 社区文档:
https://ververica.github.io/flink-cdc-connectors/release-2.4/…
[2] Flink CDC 社区官网:
https://ververica.github.io/flink-cdc-connectors/
[3] MongoDB Oplog 文档:
https://www.mongodb.com/docs/manual/core/replica-set-oplog/
[4] MongoDB 变更流文档:
https://www.mongodb.com/docs/manual/changeStreams/
[5] Apache Flink FLIP-27 提案:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+…
[6] MongoDB PreImages 文档:
https://www.mongodb.com/docs/atlas/app-services/mongodb/preim…
[7] 阿里云 MongoDB CDC 文档:
https://help.aliyun.com/zh/flink/developer-reference/mongodb-…
[8] 阿里云 Flink 实时计算 Release Notes:
https://help.aliyun.com/zh/flink/product-overview/august-21-2023
更多内容
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
首购 99 元包月试用,有机会赢取定制周边礼品!
产品官网:https://www.aliyun.com/product/bigdata/sc