MongoDB-Change-Stream初体验

47次阅读

共计 3000 个字符,预计需要花费 8 分钟才能阅读完成。

Change Stream 是 MongoDB 从 3.6 开始支持的新特性。这个新特性有哪些奇妙之处,会给我们带来什么便利?本次的文章将就这个主题进行初步讨论。

Change Stream 是什么?

顾名思义,Change Stream 即变更流,是 MongoDB 向应用发布数据变更的一种方式。即当数据库中有任何数据发生变化,应用端都可以得到通知。我们可以将其理解为在应用中执行的触发器。至于应用想得到什么数据,以什么形式得到数据,则可以通过聚合框架加以过滤和转换。这点将在后文中讨论。

Change Stream 的原理

我们先来回顾一下 MongoDB 复制集大致是如何工作的:

  1. 应用通过驱动向数据库发起写入请求;
  2. 在同一个事务中,MongoDB 完成 oplog 和集合的修改;
  3. oplog 被其他从节点拉走;
  4. 从节点应用得到的 oplog,同样在一个事务中完成对 oplog 和集合的修改;

至此,复制集同步完成。可以发现,整个同步过程是依赖于 oplog 来进行的。也就是说 oplog 实际上已经包含了我们需要的所有变更数据。如果观测 oplog 的变化,是否就能够得到所有变更的数据了呢?对,change stream 正是基于这个原理实现的。但事情并没有这么简单!我们来看一下问题有可能出在什么地方。

如何从断点恢复

现实世界中,没有哪个应用是可以不间断运行的。不考虑 bug 导致的问题,正常的应用升级也会导致应用中断运行。那么在应用恢复的时候,从哪里开始继续获取变更呢?oplog 当然是可以帮我们做到这点的,但你必须对 MongoDB 足够了解,才知道有 oplogReplay 这样的参数,以及其他一些问题。

如何有效地处理订阅

假设在一个应用中需要订阅 10 个不同集合的变更情况,是否需要开 10 个 tailable cursor 去获取 oplog 的变更呢?如果是 100 个集合呢?出于效率考虑显然不应该这么做。那么整个过程就会变成一个生产者 - 消费者模式,由一个线程负责从 oplog 获取变更,由订阅的线程负责消费这些变更。虽然实现也不是那么复杂,并且多半可以找到开源实现,但是涉及多线程就已经足够让初学者头疼一阵的了。
公平地说,上面这些还不算严重的问题,下面这些问题可能会更让人头疼。

如何管理权限

想要 tail oplog,必须对 local.oplog.rs 有读权限。实际上这相当于对整个数据库都有了读权限,因为所有的变更都会在这里体现出来。DBA 可能会阻止你这么做,因为这实在不是一个很安全的做法。

如何数据回滚

极端情况下,如果应用处理不当,MongoDB 中可能发生数据回滚 rollback 的问题。如果仅仅通过跟踪 oplog,则会出现已经通知出去的变更被回滚的情况。

幸运的是上面这些问题现在都不是问题了,因为 change stream 帮我们规避了这些复杂的细节。

使用方法

由于各种驱动都会有不同的语法和 API,从 shell 中尝试使用 change stream 可能是最简便的方法。这并不妨碍你随后在各种驱动中的使用,因为 shell 中能实现的功能在驱动中一定有对应的语法。下面就以 shell 为例看看 change stream 应该如何使用。

打开一个 shell,订阅你需要关注的集合
比如:

var cursor = db.bar.watch();

为了便于演示,我们在这个 shell 中不断遍历这个游标以获取新数据:

while(true) {if (cursor.hasNext()) {print(JSON.stringify(cursor.next()));
    }
}

打开另一个 shell,向 bar 集合中插入一条数据:

db.bar.insert({y: 1})

此时第一个 shell 中会立即输出变更数据:

{"_id":{"_data":{"$binary":"glzquiIAAAACRmRfaWQAZFzquiK0lDNo+K0DpwBaEARUMrm0ruVACoftuxjt1RtCBA==","$type":"00"}},"operationType":"insert","fullDocument":{"_id":{"$oid":"5ceaba22b4943368f8ad03a7"},"y":1},"ns":{"db":"test","coll":"bar"},"documentKey":{"_id":{"$oid":"5ceaba22b4943368f8ad03a7"}}}

这里的一些字段的简单介绍。更完整的介绍请查阅文档 change events:

  • _id: 用于恢复断点时使用。即知道这个值,应用断开后下次重启里就可以从这个断点之后开始恢复获得变更;
  • operationType: 操作类型,常见的值包括:

    • insert
    • update
    • delete
  • ns: 正在操作的命名空间
  • fullDocument: 完整的文档

从断点恢复

var cursor = db.bar.watch([], {resumeAfter: <\_id>})

此时使用 hasNext()/next() 即可获取到随后的变更。

注意事项

{readConcern: ‘majority’}

为了避免被回滚的更新被发布出去,change stream 选择只在一个变更到达大多数节点(不可能被回滚)时,才会将这些变更发布到应用。使用的方式即{readConcern: "majority"}。因此以下这些情况下 change stream 都是不会向应用通知任何变更的:

  • 禁用了readConcern
  • 从旧版本升级,但没有更新featureCompatibilityVersion
  • PSA 架构中 S 宕机;

断点可恢复时间

因为 change stream 是依赖于 oplog 工作的,自然也会面临 oplog 面临的所有问题。问题之一就是 oplog 被覆盖。因此想要保证断点可以恢复,必须保证应用在 oplog window 的时间内请求断点。

删除集合

如果在订阅集合变更过程中集合被删除,则会收到一条 invalid 信息通知,表示集合已不再可用:

{
    "_id" : {"_data" : BinData(0,"glzqxCcAAAACFFoQBFQyubSu5UAKh+27GO3VG0IE")
    },
    "operationType" : "invalidate"
}

参考资料

  • Tailable cursor: https://docs.mongodb.com/manual/core/tailable-cursors/
  • 生产者 - 消费者模式: https://zh.wikipedia.org/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85%E9%97%AE%E9%A2%98
  • 关于回滚: https://docs.mongodb.com/manual/core/replica-set-rollbacks/
  • 变更事件: https://docs.mongodb.com/manual/reference/change-events/
  • Change Stream 介绍文档:https://docs.mongodb.com/manual/changeStreams/

作者简介

张耀星,MongoDB 亚太区首席技术咨询服务顾问。在 MongoDB 的开发、应用和咨询服务上有多年实践经验。作为 MongoDB 认证专家,曾经为不同行业的各类大型客户提供过培训、性能调优、架构设计等各类 MongoDB 相关技术服务。

正文完
 0