五分钟了解MongoDB-1Change-Stream-和MongoDB-4x

32次阅读

共计 3026 个字符,预计需要花费 8 分钟才能阅读完成。

充分获知数据库的数据变动是从 MongoDB 向其他数据服务进行数据同步的关键点。与直接查询 collection 来获取数据变动相比,通过流式的方式进行监听会有效并及时的多。这是一种非常强大的“响应式编程”模式。随着 MongoDB 的版本更新,流式的获取方式将变得原来越易用。

让我们来一同回顾一下。在 MongoDB3.6 之前,如果我们希望对 MongoDB 数据库中的数据变动进行监听,我们通常是通过“监听并回放 oplog”(“tail the oplog”)的模式(oplog 表将会记录复制集中的数据变动)。在生产环境中这种方式(“监听并回放 oplog”)通常较为复杂,并且难以保证其稳定与可靠性。

Change Streams and Collections

从 MongoDB3.6 开始支持的 Change Streams 打破了这个僵局。Change Streams 使得数据的变动监听变得简单易用。以下是一个示例,该示例演示了通过 Node.js 对“movieDetails”表的变动监听。

<pre>
javascript
const MongoClient = require(“mongodb”).MongoClient;
const uri = “MONGODBURL”;
const client = new MongoClient(uri, { useNewUrlParser: true});
client.connect().then(db => {
const changeStream = client.db(“video”).collection(“movieDetails”).watch();
changeStream.on(“change”, next => {
console.log(next);
});
});
</pre>

上述代码首先连接进入了数据实例,并通过 watch()函数对“video”库的“movieDetails”表建立了 change stream。而后通过.on(“change”,… 建立了一个事件 trigger,该事件将监听该 change stream 上的所有变动并调用对应的后续函数。在上述示例中,监听到变动后将会将变动事件打印出来。下面是我们在 MongoDB Compass 中进行对应修改后的输出示例:

<pre>
javascript
{_id:
{_data: ‘825C51D03F0000000129295A1004E515B4338C574BA2B9603CB1C7FB3B0446645F696400645C0EC4B74B052F9E2EF0C3810004’}, operationType: ‘replace’,
clusterTime:
Timestamp {_bsontype: ‘Timestamp’, low_: 1, high_: 1548865599},
fullDocument:
{_id: 5c0ec4b74b052f9e2ef0c381,

 title: 'PS I Love You',
 year: 2007,
 ...
 awards: {wins: 2, nominations: 4, text: '2 wins & 4 nominations.'},
 type: 'movie' },

ns: {db: ‘video’, coll: ‘movieDetails’},
documentKey: {_id: 5c0ec4b74b052f9e2ef0c381}
}
</pre>

在上述的返回中可以快速的找到此次 change stream 的重要信息,即通过 operationType 了解到变动的类型,有关完整的返回说明请参考 [Change Events documentation]。当监听某一个 collection 的时候,operationType 的值通常是 insert , update , replace , delete 或 invalidate,前四种的含义通过名字可以清楚的获知,上述返回的 replace 类型是我们通过 Compass 同 collection 进行 replace 操作的反馈。

当我们监听的 collection 被 drop、改名或者其所属的 db 被 drop 的时候,我们将会看到类型为 invalidate 的 operationType。于此同时这也意味着是时候关闭 change stream 了。上述返回中剩下的部分是变动的详细信息,变动发生在什么 namespace,数据是什么样的,何时发生的变更。

以上的示例是在 MongoDB4.x 版本中生成的,相比 3.6 版本,4.x 版本新增了一个_data 字段。该字段是一个恢复 token(resume token),应用程序能够在重连后从该点进行继续监听。

Beyond Collections

如果你只需要针对某一个 collection 进行变动监听,MongoDB3.6 就可以满足你的需求,但是对于那些此前通过 oplog 来进行变动监听的同学,他们的诉求往往是希望监听数据库中的所有变动,以此来将变动应用到其他系统中。MongoDB4.0 很好的满足了这个诉求,在 4.0 版本中我们可以针对若干个数据库或者整个实例(复制集或者 sharding)进行变动监听。与 watch() 某一个 collection 不同,4.0 中我们可以 watch() 某个数据库或者整个实例。

<pre>
javascript
const MongoClient = require(“mongodb”).MongoClient;
const uri =”MONGODBURL”;
undefined
const client = new MongoClient(uri, { useNewUrlParser: true});
client.connect().then(db => {
const changeStream = client.watch();
changeStream.on(“change”, next => {
console.log(next);
});
</pre>

上述示例将对于任何数据库、任何表的任何变动进行输出,这些也不是我们所捕获的全部信息。由于我们将监听范围放到了最广,我们也将会看到在删除 collection 时候的删除事件、删除数据库的时间以及重命名 collection 的事件。

What Next?

我们可以根据实际需要选择监听某一个 collection 的变动、或者某个数据库中所有 collection 的变动又或者是整个实例中所有的数据库与 collection 的变动。需要注意的是创建新 collection、数据库的变动将不会被直接监听到,不过我们可以通过变动中的内容间接获知。

当然,这也不是什么大问题,如果我们希望监听数据库或者 collection 的创建,我们可以通过变动内容中的 collection 来判断是否该表为此前未创建的新表这一方法进行。另外,索引的创建由于不是为表数据变动也不会被监听捕获。

MongoDB4.0 为我们带来了一个全新且强大的数据变动监听方式,尤其是该方式可以实时进行变动捕获。我们十分建议你去尝试下这个功能。Change Stream 的详细文档可以参考 [Change Streams]。如果你还未安装 MongoDB4.0 实例,你也可以在 MongoDB Atlas 中[注册] 并获取 M0 的免费集群节点进行学习和测试。

MongoDB 中文社区,一个 mongoers 都会来的社区(微信公众号:mongoing-mongoing)
译者:周李洋
Teambition 运维总监
MongoDB 中文社区联席主席

正文完
 0