本文来自取得《2021MongoDB技术实际与利用案例征集流动》入围案例奖作品
作者:vivo互联网技术

1.业务背景

随着公司业务倒退和用户规模的增多,很多我的项目都在打造本人的评论性能,而评论的业务状态根本相似。过后各我的项目都是各自设计实现,存在较多反复的工作量;并且不同业务之间数据存在孤岛,很难产生分割。因而咱们决定打造一款公司级的评论业务中台,为各业务方提供评论业务的疾速接入能力。在通过对各大支流app评论业务的竞品剖析,咱们发现大部分评论的业务状态都具备评论、回复、二次回复、点赞等性能。具体如下图所示:

波及到的外围业务概念有:
【主题 topic】 评论的主题,商城的商品、利用商店的app、社区的帖子。
【评论 comment】用户针对于主题发表的内容。
【回复 reply】用户针对于某条评论发表的内容,包含一级回复和二级回复。

2.为什么抉择MongoDB

团队在数据库选型设计时,比照了多种支流的数据库,最终在MySQL和MongoDB两种存储之进行抉择。

因为评论业务的特殊性,它须要如下能力:
【字段扩大】业务方不同评论模型存储的字段有肯定差别,须要反对动静的主动扩大。
【海量数据】作为公司中台服务,数据量随着业务方的增多成倍增长,须要具备疾速便捷的程度扩大和迁徙能力。
【高可用】作为中台产品,须要提供疾速和稳固的读写能力,可能读写拆散和主动复原
而评论业务不波及用户资产,对事务的要求性不高。因而咱们选用了MongoDB集群作为了最底层的数据存储形式。

3.MongoDB在评论中台的利用

3.1 MongoDB集群常识

集群架构
因为单台机器存在磁盘/IO/CPU等各方面的瓶颈,所以MongoDB提供集群形式的部署架构,如图所示:

次要由以下三个局部组成:
mongos:路由服务器,负责管理利用端的具体链接。利用端申请到mongos服务后,mongos把具体的读写申请转发应的shard节点上执行。一个集群能够有1~N个mongos节点。
config:配置服务器,用于分存储分片汇合的元数据和配置信息,必须为复制集(对于复制集概念戳我) 形式部署。mongos通过config配置服务器合的元数据信息。
shard:用于存储汇合的分片数据的MongoDB服务,同样必须以复制集形式部署。

片键
MongoDB数据是存在collection(对应MySQL表)中。集群模式下,collection依照片键(shard key)拆分成多个区间,每个区间组成一个chunk,依照规定散布在不同的shard中。并造成元数据注册到config服务中治理。

分片键只能在分片汇合创立时指定,指定后不能批改。

分片键次要有两大类型:
hash分片:通过hash算法进行散列,数据分布的更加均匀和扩散。反对单列和多列hash。
范畴分片:依照指定片键的值散布,间断的key往往散布在间断的区间,更加适用范围查问场景。单数据散列性由分片键自身保障。

3.2 评论中台的实际

片键的抉择
MongoDB集群中,一个汇合的数据部署是扩散在多个shard分片和chunk中的,而咱们心愿一个评论列表的查问最好只拜访到一个shard分片,因而确定了范畴分片的形式。
起初设置只应用单个key作为分片键,以comment评论表举例,次要字段有{"_id":惟一id,"topicId":主题id,"text":文本内容,"createDate":工夫} ,思考到一个主题id的评论尽可能间断散布,咱们设置的分片键为topicId。随着性能测试的染指,咱们发现了有两个十分致命的问题:
jumbo chunk问题
惟一键问题

jumbo chunk:
官网文档中,MongoDB中的chunk大小被限度在了1M-1024M。分片键的值是chunk划分的惟一根据,在数据量继续写入超过chunk size设定值时, MongoDB集群就会主动的进行决裂或迁徙。而对于同一个片键的写入是属于一个chunk,无奈被决裂,就会造成 jumbo chunk问题。
举例:若咱们设置1024M为一个chunk的大小,单个document 5KB计算,那么单个chunk可能存储21W左右document。思考热点的主题评论( 如微信评论),评论数可能达到40W+,因而单个chunk很容易超过1024M。超过最大size的chunk仍然可能提供读写服务,只是不会再进行决裂和迁徙,短暂以往会造成集群之间数据的不均衡。

惟一键问题:
MongoDB集群的惟一键设置减少了限度,必须是蕴含分片键的;如果_id不是分片键,_id索引只能保障单个shard上的唯一性。
You cannot specify a unique constraint on a hashed index
For a to-be-sharded collection, you cannot shard the collection if the collection has other unique indexes
For an already-sharded collection, you cannot create unique indexes on other fields

因而咱们删除了数据和汇合,调整topicId 和 _id 为联结分片键 从新创立了汇合。这样即突破了chunk size的限度,也解决了唯一性问题。

迁徙和扩容
随着数据的写入,当单个chunk中数据大小超过指定大小时(或chunk中的文件数量超过指定值)。MongoDB集群会在插入或更新时,主动触发chunk的拆分。

拆分会导致汇合中的数据块散布不平均,在这种状况下,MongoDB balancer组件会触发集群之间的数据块迁徙。balancer组件是一个治理数据迁徙的后盾过程,如果各个shard分片之间的chunk数差别超过阈值,balancer会进行主动的数据迁徙。

balancer是能够在线对数据迁徙的,然而迁徙的过程中对于集群的负载会有较大影响。个别倡议能够通过如下设置,在业务低峰时进行。
db.settings.update(
{ _id: "balancer" },
{ $set: { activeWindow : { start : "<start-time>", stop : "<stop-time>" } } },
{ upsert: true }
)
MongoDB的扩容也非常简单,只须要筹备好新的shard复制集后,在mongos节点中执行:
sh.addShard("<replica_set>/<hostname><:port>")
扩容期间因为chunk的迁徙,同样会导致集群可用性升高,因而只能在业务低峰进行。

集群的扩大
作为中台服务,对于不同的接入业务方,通过表隔离来辨别数据。以comment评论表举例,每个接入业务方都独自创立一张表,业务方A表为 comment_clientA ,业务方B表为 comment_clientB,均在接入时创立表和相应索引信息。但只是这样设计存在几个问题:
单个集群,不能满足局部业务数据物理隔离的须要
集群调优(如split迁徙工夫)很难业务个性差异化设置
程度扩容带来的单个业务方数据过于扩散问题

因而咱们扩大了MongoDB的集群架构:

扩大后的评论MongoDB集群 减少了 【逻辑集群】和【物理集群】的概念。一个业务方属于一个逻辑集群,一个物理集群蕴含多个逻辑集群。
减少了路由层设计,由利用负责扩大spring的MongoTemplate和连接池治理,实现了业务到MongoDB集群之间的切换抉择服务。
不同的MongoDB分片集群,实现了物理隔离和差别调优的可能。

3.3 自研MongoDB事件采集解决平台及利用

评论中台中有很多统计数据的查问:评论数、回复数、点赞数、未读回复数等等,因为评论数据量较大,单个业务的数据量都在亿级别以上,浏览器、短视频等内容型的业务评论数据量都是在百亿级别,前台业务查问时对数据库做实时count性能必定是无奈达到要求的,因而咱们抉择应用空间换工夫的形式,在数据表中减少相干的统计数据字段,每次有新的评论发表或者状态变更时对该字段的值做$inc原子操作。这种形式的确可能晋升数据查问的效率,在评论这种读多写少的场景下非常适合。

带来的问题:
作为中台类我的项目,须要适配vivo不同的评论业务场景,随着业务倒退,这部分统计口径和类型也会随之变动,各种统计类型越来越多(目前已有20多种统计),在代码主流程中耦合的统计逻辑越来越越重,并且各种统计逻辑的代码散落在零碎代码的各个角落,零碎也越来越臃肿,并且代码变更时很容易带来统计数据不准的问题,这类问题会影响评论及回复的下拉展现的成果,举一个较为典型的例子,如果以后评论下有1条回复,然而统计谬误,误认为没有回复,则以后人回复信息则无奈显示,如果须要修复此类问题,咱们发现涉及面比拟广,无奈收敛。因而咱们对统计相干的逻辑进行了一次大的重构。

事件数据服务
咱们抉择事件驱动的形式将统计逻辑从主流程中解构进去,每一个统计都有其对应的变更事件,例如:
评论发表事件 > 评论数统计点赞、
勾销点赞事件 > 点赞数统计
仅本身可见事件 > 用户仅本身可见数统计

然而这个事件如何收回来呢?一开始咱们想的是在主流程中依据场景发送不同类型的事件,然而这种形式不够灵便,每次减少一种统计类型就须要减少一种事件类型。转换下思路,其实每收回一种事件其本质上是数据库某些数据产生了变动,那咱们为何不监听数据库的操作日志,采集数据的变更,通过比照变更前后的数据来自定义事件,进而进行数据的统计,整体流程如下:

其中监听模块负责监听MongoDB数据变更,并对变更事件进行解决和散发,当被监听的表产生数据变更时会将变更前后的数据通过mq的形式分发给数据服务。数据服务专门负责评论中台数据统计、数据刷新等性能,这些性能对系统负载影响比拟大,须要和主零碎解耦。

从上图能够看到,整个监听模块包含了”bees采集“和”业务事件平台“两个局部,一个负责采集,一个负责对事件进行解决和散发,这里为什么还须要一个独立的平台进行事件处理和散发,有2个起因:
间接采集的事件,其事件内容仍然无奈满足业务需要,无奈满足比照变更前后的数据来自定义事件,因而须要对相应的事件数据进行合并,具体的实现在后文进行介绍。
变更事件须要依照业务方自定义的条件进行过滤,只须要将满足条件的事件分发给上游数据服务。

因而这里复用了vivo自研的业务事件平台,通过low Code形式对事件数据进行解决和过滤后,发给上游的数据服务,整个流程为bees采集—>事件平台—>评论数据服务,接下来咱们先介绍下MongoDB的采集零碎局部。

MongoDB采集架构和流程
对于MongoDB的采集,咱们复用了vivo自研的在Bees大数据采集平台能力,基于这个平台的DB数据采集的子组件(bees-dbsync),咱们开发了用于MongoDB的数据采集的链路,实用于实时获取MongoDB的变更信息,实现了对于MongoDB的全量和增量的采集能力。

性能介绍
现阶段MongoDB采集模块能够反对的性能如下:
反对在工作接入后进行一次全量采集,全量采集完结后会依据用户配置的延迟时间开启增量采集。
反对针对繁多工作的启动、进行、批改等操作。操作实现后会实时进行相应的变更。
增量采集断点续传性能。在任意时刻进行工作后进行复原采集,工作将由上一次实现的采集点位后一个点位开始持续进行采集。在oplog大小保证数据残缺的条件下,不会有数据漏采的状况产生。
全量、增量采集均反对依照用户自定义的kafka分区发送规定发送至相应的分区。
全量、增量采集均反对工作状态和数据量监控。
全量、增量采集均反对依据正本集、sharding变动主动开启采集。

计划架构
在采集零碎中,对于MongoDB的采集模块架构图如下:

采集零碎中,MongoDB采集模块内部结构如上图橙色方框内所示。次要包含:
MongoDBParser首先将对数据进行解析和过滤。对于MongoDB全量数据采集,MongoDBParser将间接对全表执行find()操作,对于MongoDB 增量数据采集,MongoDBParser将读取local库下的oplog.rs表,依据递增的ts值顺次获取新增的oplog事件,因为ts值的唯一性,曾经发送实现的 ts值将作为历史点位信息保留在Bees-DBSync本地并上传到Bees-Manager的数据库中,成为断点续传性能的必须条件。
StreamData,具体由MongoDBStreamData实现,负责寄存Bees-DBSync从业务MongDB解析后的数据。
PackageThread,负责对MongoDBStreamData格局的数据进行工作信息的填充,并将数据打包成对立的PkgData数据格式,用以保障MongoDB、MySQL等发送格局的一致性。
PkgData,是Bees-DBSync外部解决后对立的数据格式。
KafkaSink模块,负责对目标端信息进行解决,依据用户配置的不同分区发送规定,将PkgData格局的数据发送到不同的Kafka分区。

数据完整性保障
借鉴了RingBuffer设计思路:

定义了三个点位:
Pull:最初一次拉取oplog的点位
Sink:最初一次发送Kafka的点位
Ack:最初一次胜利Sink到Kafka的点位
三个点位的关系是Ack <= Sink <= Pull,三个点位记录了日志采集状况,出现异常状况时能够从记录的点位复原采集,最终保障数据完整性。

采集流程
MongoDB采集工作执行流程图如下:

以上局部是采集零碎的整顿架构和流程,接下来介绍下,对变更事件进行合并和解决的架构和流程。

MongoDB变更事件处理平台
在这里咱们复用了vivo自研的事件处理平台的能力,新增了对MongoDB变更事件的解决,这个平台的建设目标是面向咱们的服务端开发同学,通过画布拖拽形式帮忙业务对采集到的存储变更事件进行解决和散发。平台提供了一个低提早的流式解决解决方案,反对画布的形式对采集到的MongoDB变更事件进行过滤(filter)、转化解决(map、udf)和输入(sink),反对UDF插件的形式自定义解决采集到的数据,并将解决的数据散发到不同的输入端。目前反对将MongoDB采集的事件通过filter、map后sink到es,hbase,kafka,rabbitmq以及通过dubbo形式告诉到不同上游。

业务事件平台的整体架构图

在评论中台的业务场景中,评论中台须要根据MongoDB字段产生变更的条件来进行过滤和统计,例如,须要更加字段“commentNum”进行判断,以后字段的值是否是从少变多,是否数值有减少,这就须要依据MongoDB的变更前的值以及变更后的值进行比拟,对合乎评论业务场景条件的文档内容进行向下传递,因而咱们须要捕捉到MongoDB变更前的值,以及未变更字段的值。

如下图所示,须要汇合对Oplog的变更事件的采集,组合成残缺的文档内容,以后文档内容放在data属性下,old属性下为波及到批改的字段在变更前的值。

然而咱们发现,采集MongoDB的Oplog或者基于change stream都无奈齐全满足咱们的需要。

MongoDB原生的Oplog不同于binlog,Oplog只蕴含以后变更的字段值,Oplog短少变更前的值,以及未变更字段的值,change stream也无奈拿到变更前的值,基于这样的问题,咱们引入Hbase字段多版本的个性,先预热数据到Hbase中,而后通过查问到的字段旧版本值,进行数据合并,形成残缺的变更事件。

MongoDB事件数据合并流程

4.写在最初

MongoDB集群在评论中台我的项目中已上线运行了两年,过程中实现了约20+个业务方接入,承载了百亿级评论回复数据的存储,体现较为稳固。BSON非结构化的数据,也撑持了咱们多个版本业务的疾速降级。而热门数据内存化存储引擎,较大的进步了数据读取的效率。另外咱们针对MongoDB实现了异步事件采集能力,进一步扩大了MongoDB的利用场景。

但对于MongoDB来说,集群化部署是一个不可逆的过程,集群化后也带来了索引,分片策略等较多的限度。因而个别业务在应用MongoDB时,正本集形式就能撑持TB级别的存储和查问,并非肯定须要应用集群化形式。

以上内容基于MongoDB 4.0.9版本个性,和最新版本的MongoDB细节上略有差别。

对于作者:

vivo互联网技术:百灵评论我的项目开发团队,鲁班事件平台开发团队,bees数据采集团队

参考资料:
https://docs.mongodb.com/manu...