本文来自取得《2021MongoDB 技术实际与利用案例征集流动》入围案例奖作品
作者:vivo 互联网技术
1. 业务背景
随着公司业务倒退和用户规模的增多,很多我的项目都在打造本人的评论性能,而评论的业务状态根本相似。过后各我的项目都是各自设计实现,存在较多反复的工作量;并且不同业务之间数据存在孤岛,很难产生分割。因而咱们决定打造一款公司级的评论业务中台,为各业务方提供评论业务的疾速接入能力。在通过对各大支流 app 评论业务的竞品剖析,咱们发现大部分评论的业务状态都具备评论、回复、二次回复、点赞等性能。具体如下图所示:
波及到的外围业务概念有:
【主题 topic】评论的主题,商城的商品、利用商店的 app、社区的帖子。
【评论 comment】用户针对于主题发表的内容。
【回复 reply】用户针对于某条评论发表的内容,包含一级回复和二级回复。
2. 为什么抉择 MongoDB
团队在数据库选型设计时,比照了多种支流的数据库,最终在 MySQL 和 MongoDB 两种存储之进行抉择。
因为评论业务的特殊性,它须要如下能力:
【字段扩大】业务方不同评论模型存储的字段有肯定差别,须要反对动静的主动扩大。
【海量数据】作为公司中台服务,数据量随着业务方的增多成倍增长,须要具备疾速便捷的程度扩大和迁徙能力。
【高可用】作为中台产品,须要提供疾速和稳固的读写能力,可能读写拆散和主动复原
而评论业务不波及用户资产,对事务的要求性不高。因而咱们选用了 MongoDB 集群作为了最底层的数据存储形式。
3.MongoDB 在评论中台的利用
3.1 MongoDB 集群常识
集群架构
因为单台机器存在磁盘 /IO/CPU 等各方面的瓶颈,所以 MongoDB 提供集群形式的部署架构,如图所示:
次要由以下三个局部组成:
mongos:路由服务器,负责管理利用端的具体链接。利用端申请到 mongos 服务后,mongos 把具体的读写申请转发应的 shard 节点上执行。一个集群能够有 1~N 个 mongos 节点。
config:配置服务器,用于分存储分片汇合的元数据和配置信息,必须为复制集(对于复制集概念戳我) 形式部署。mongos 通过 config 配置服务器合的元数据信息。
shard:用于存储汇合的分片数据的 MongoDB 服务,同样必须以复制集形式部署。
片键
MongoDB 数据是存在 collection(对应 MySQL 表) 中。集群模式下,collection 依照片键(shard key)拆分成多个区间,每个区间组成一个 chunk,依照规定散布在不同的 shard 中。并造成元数据注册到 config 服务中治理。
分片键只能在分片汇合创立时指定,指定后不能批改。
分片键次要有两大类型:
hash 分片:通过 hash 算法进行散列,数据分布的更加均匀和扩散。反对单列和多列 hash。
范畴分片:依照指定片键的值散布,间断的 key 往往散布在间断的区间,更加适用范围查问场景。单数据散列性由分片键自身保障。
3.2 评论中台的实际
片键的抉择
MongoDB 集群中,一个汇合的数据部署是扩散在多个 shard 分片和 chunk 中的,而咱们心愿一个评论列表的查问最好只拜访到一个 shard 分片,因而确定了范畴分片的形式。
起初设置只应用单个 key 作为分片键,以 comment 评论表举例,次要字段有 {“_id”: 惟一 id,”topicId”: 主题 id,”text”: 文本内容,”createDate”: 工夫} , 思考到一个主题 id 的评论尽可能间断散布,咱们设置的分片键为 topicId。随着性能测试的染指,咱们发现了有两个十分致命的问题:
jumbo chunk 问题
惟一键问题
jumbo chunk:
官网文档中,MongoDB 中的 chunk 大小被限度在了 1M-1024M。分片键的值是 chunk 划分的惟一根据,在数据量继续写入超过 chunk size 设定值时,MongoDB 集群就会主动的进行决裂或迁徙。而对于同一个片键的写入是属于一个 chunk,无奈被决裂,就会造成 jumbo chunk 问题。
举例:若咱们设置 1024M 为一个 chunk 的大小,单个 document 5KB 计算,那么单个 chunk 可能存储 21W 左右 document。思考热点的主题评论(如微信评论),评论数可能达到 40W+,因而单个 chunk 很容易超过 1024M。超过最大 size 的 chunk 仍然可能提供读写服务,只是不会再进行决裂和迁徙,短暂以往会造成集群之间数据的不均衡。
惟一键问题:
MongoDB 集群的惟一键设置减少了限度,必须是蕴含分片键的;如果_id 不是分片键,_id 索引只能保障单个 shard 上的唯一性。
You cannot specify a unique constraint on a hashed index
For a to-be-sharded collection, you cannot shard the collection if the collection has other unique indexes
For an already-sharded collection, you cannot create unique indexes on other fields
因而咱们删除了数据和汇合,调整 topicId 和 _id 为联结分片键 从新创立了汇合。这样即突破了 chunk size 的限度,也解决了唯一性问题。
迁徙和扩容
随着数据的写入,当单个 chunk 中数据大小超过指定大小时 (或 chunk 中的文件数量超过指定值)。MongoDB 集群会在插入或更新时,主动触发 chunk 的拆分。
拆分会导致汇合中的数据块散布不平均,在这种状况下,MongoDB balancer 组件会触发集群之间的数据块迁徙。balancer 组件是一个治理数据迁徙的后盾过程,如果各个 shard 分片之间的 chunk 数差别超过阈值,balancer 会进行主动的数据迁徙。
balancer 是能够在线对数据迁徙的,然而迁徙的过程中对于集群的负载会有较大影响。个别倡议能够通过如下设置,在业务低峰时进行。
db.settings.update(
{_id: “balancer”},
{$set: { activeWindow : { start : “<start-time>”, stop : “<stop-time>”} } },
{upsert: true}
)
MongoDB 的扩容也非常简单,只须要筹备好新的 shard 复制集后,在 mongos 节点中执行:
sh.addShard(“<replica_set>/<hostname><:port>”)
扩容期间因为 chunk 的迁徙,同样会导致集群可用性升高,因而只能在业务低峰进行。
集群的扩大
作为中台服务,对于不同的接入业务方,通过表隔离来辨别数据。以 comment 评论表举例,每个接入业务方都独自创立一张表,业务方 A 表为 comment_clientA,业务方 B 表为 comment_clientB,均在接入时创立表和相应索引信息。但只是这样设计存在几个问题:
单个集群,不能满足局部业务数据物理隔离的须要
集群调优 (如 split 迁徙工夫) 很难业务个性差异化设置
程度扩容带来的单个业务方数据过于扩散问题
因而咱们扩大了 MongoDB 的集群架构:
扩大后的评论 MongoDB 集群 减少了【逻辑集群】和【物理集群】的概念。一个业务方属于一个逻辑集群,一个物理集群蕴含多个逻辑集群。
减少了路由层设计,由利用负责扩大 spring 的 MongoTemplate 和连接池治理,实现了业务到 MongoDB 集群之间的切换抉择服务。
不同的 MongoDB 分片集群,实现了物理隔离和差别调优的可能。
3.3 自研 MongoDB 事件采集解决平台及利用
评论中台中有很多统计数据的查问:评论数、回复数、点赞数、未读回复数等等,因为评论数据量较大,单个业务的数据量都在亿级别以上,浏览器、短视频等内容型的业务评论数据量都是在百亿级别,前台业务查问时对数据库做实时 count 性能必定是无奈达到要求的,因而咱们抉择应用空间换工夫的形式,在数据表中减少相干的统计数据字段,每次有新的评论发表或者状态变更时对该字段的值做 $inc 原子操作。这种形式的确可能晋升数据查问的效率,在评论这种读多写少的场景下非常适合。
带来的问题:
作为中台类我的项目,须要适配 vivo 不同的评论业务场景,随着业务倒退,这部分统计口径和类型也会随之变动,各种统计类型越来越多(目前已有 20 多种统计),在代码主流程中耦合的统计逻辑越来越越重,并且各种统计逻辑的代码散落在零碎代码的各个角落,零碎也越来越臃肿,并且代码变更时很容易带来统计数据不准的问题,这类问题会影响评论及回复的下拉展现的成果,举一个较为典型的例子,如果以后评论下有 1 条回复,然而统计谬误,误认为没有回复,则以后人回复信息则无奈显示,如果须要修复此类问题,咱们发现涉及面比拟广,无奈收敛。因而咱们对统计相干的逻辑进行了一次大的重构。
事件数据服务
咱们抉择事件驱动的形式将统计逻辑从主流程中解构进去,每一个统计都有其对应的变更事件,例如:
评论发表事件 > 评论数统计点赞、
勾销点赞事件 > 点赞数统计
仅本身可见事件 > 用户仅本身可见数统计
然而这个事件如何收回来呢?一开始咱们想的是在主流程中依据场景发送不同类型的事件,然而这种形式不够灵便,每次减少一种统计类型就须要减少一种事件类型。转换下思路,其实每收回一种事件其本质上是数据库某些数据产生了变动,那咱们为何不监听数据库的操作日志,采集数据的变更,通过比照变更前后的数据来自定义事件,进而进行数据的统计,整体流程如下:
其中监听模块负责监听 MongoDB 数据变更,并对变更事件进行解决和散发,当被监听的表产生数据变更时会将变更前后的数据通过 mq 的形式分发给数据服务。数据服务专门负责评论中台数据统计、数据刷新等性能,这些性能对系统负载影响比拟大,须要和主零碎解耦。
从上图能够看到,整个监听模块包含了”bees 采集“和”业务事件平台“两个局部,一个负责采集,一个负责对事件进行解决和散发,这里为什么还须要一个独立的平台进行事件处理和散发,有 2 个起因:
间接采集的事件,其事件内容仍然无奈满足业务需要,无奈满足比照变更前后的数据来自定义事件,因而须要对相应的事件数据进行合并,具体的实现在后文进行介绍。
变更事件须要依照业务方自定义的条件进行过滤,只须要将满足条件的事件分发给上游数据服务。
因而这里复用了 vivo 自研的业务事件平台,通过 low Code 形式对事件数据进行解决和过滤后,发给上游的数据服务,整个流程为 bees 采集—> 事件平台—> 评论数据服务,接下来咱们先介绍下 MongoDB 的采集零碎局部。
MongoDB 采集架构和流程
对于 MongoDB 的采集,咱们复用了 vivo 自研的在 Bees 大数据采集平台能力,基于这个平台的 DB 数据采集的子组件(bees-dbsync),咱们开发了用于 MongoDB 的数据采集的链路,实用于实时获取 MongoDB 的变更信息,实现了对于 MongoDB 的全量和增量的采集能力。
性能介绍
现阶段 MongoDB 采集模块能够反对的性能如下:
反对在工作接入后进行一次全量采集,全量采集完结后会依据用户配置的延迟时间开启增量采集。
反对针对繁多工作的启动、进行、批改等操作。操作实现后会实时进行相应的变更。
增量采集断点续传性能。在任意时刻进行工作后进行复原采集,工作将由上一次实现的采集点位后一个点位开始持续进行采集。在 oplog 大小保证数据残缺的条件下,不会有数据漏采的状况产生。
全量、增量采集均反对依照用户自定义的 kafka 分区发送规定发送至相应的分区。
全量、增量采集均反对工作状态和数据量监控。
全量、增量采集均反对依据正本集、sharding 变动主动开启采集。
计划架构
在采集零碎中,对于 MongoDB 的采集模块架构图如下:
采集零碎中,MongoDB 采集模块内部结构如上图橙色方框内所示。次要包含:
MongoDBParser 首先将对数据进行解析和过滤。对于 MongoDB 全量数据采集,MongoDBParser 将间接对全表执行 find()操作,对于 MongoDB 增量数据采集,MongoDBParser 将读取 local 库下的 oplog.rs 表,依据递增的 ts 值顺次获取新增的 oplog 事件,因为 ts 值的唯一性,曾经发送实现的 ts 值将作为历史点位信息保留在 Bees-DBSync 本地并上传到 Bees-Manager 的数据库中,成为断点续传性能的必须条件。
StreamData,具体由 MongoDBStreamData 实现,负责寄存 Bees-DBSync 从业务 MongDB 解析后的数据。
PackageThread,负责对 MongoDBStreamData 格局的数据进行工作信息的填充,并将数据打包成对立的 PkgData 数据格式,用以保障 MongoDB、MySQL 等发送格局的一致性。
PkgData,是 Bees-DBSync 外部解决后对立的数据格式。
KafkaSink 模块,负责对目标端信息进行解决,依据用户配置的不同分区发送规定,将 PkgData 格局的数据发送到不同的 Kafka 分区。
数据完整性保障
借鉴了 RingBuffer 设计思路:
定义了三个点位:
Pull:最初一次拉取 oplog 的点位
Sink:最初一次发送 Kafka 的点位
Ack:最初一次胜利 Sink 到 Kafka 的点位
三个点位的关系是 Ack <= Sink <= Pull,三个点位记录了日志采集状况,出现异常状况时能够从记录的点位复原采集,最终保障数据完整性。
采集流程
MongoDB 采集工作执行流程图如下:
以上局部是采集零碎的整顿架构和流程,接下来介绍下,对变更事件进行合并和解决的架构和流程。
MongoDB 变更事件处理平台
在这里咱们复用了 vivo 自研的事件处理平台的能力,新增了对 MongoDB 变更事件的解决,这个平台的建设目标是面向咱们的服务端开发同学,通过画布拖拽形式帮忙业务对采集到的存储变更事件进行解决和散发。平台提供了一个低提早的流式解决解决方案,反对画布的形式对采集到的 MongoDB 变更事件进行过滤(filter)、转化解决(map、udf)和输入(sink),反对 UDF 插件的形式自定义解决采集到的数据,并将解决的数据散发到不同的输入端。目前反对将 MongoDB 采集的事件通过 filter、map 后 sink 到 es,hbase,kafka,rabbitmq 以及通过 dubbo 形式告诉到不同上游。
业务事件平台的整体架构图
在评论中台的业务场景中,评论中台须要根据 MongoDB 字段产生变更的条件来进行过滤和统计,例如,须要更加字段“commentNum”进行判断,以后字段的值是否是从少变多,是否数值有减少,这就须要依据 MongoDB 的变更前的值以及变更后的值进行比拟,对合乎评论业务场景条件的文档内容进行向下传递,因而咱们须要捕捉到 MongoDB 变更前的值,以及未变更字段的值。
如下图所示,须要汇合对 Oplog 的变更事件的采集,组合成残缺的文档内容,以后文档内容放在 data 属性下,old 属性下为波及到批改的字段在变更前的值。
然而咱们发现,采集 MongoDB 的 Oplog 或者基于 change stream 都无奈齐全满足咱们的需要。
MongoDB 原生的 Oplog 不同于 binlog,Oplog 只蕴含以后变更的字段值,Oplog 短少变更前的值,以及未变更字段的值,change stream 也无奈拿到变更前的值,基于这样的问题,咱们引入 Hbase 字段多版本的个性,先预热数据到 Hbase 中,而后通过查问到的字段旧版本值,进行数据合并,形成残缺的变更事件。
MongoDB 事件数据合并流程
4. 写在最初
MongoDB 集群在评论中台我的项目中已上线运行了两年,过程中实现了约 20+ 个业务方接入,承载了百亿级评论回复数据的存储,体现较为稳固。BSON 非结构化的数据,也撑持了咱们多个版本业务的疾速降级。而热门数据内存化存储引擎,较大的进步了数据读取的效率。另外咱们针对 MongoDB 实现了异步事件采集能力,进一步扩大了 MongoDB 的利用场景。
但对于 MongoDB 来说,集群化部署是一个不可逆的过程,集群化后也带来了索引,分片策略等较多的限度。因而个别业务在应用 MongoDB 时,正本集形式就能撑持 TB 级别的存储和查问,并非肯定须要应用集群化形式。
以上内容基于 MongoDB 4.0.9 版本个性,和最新版本的 MongoDB 细节上略有差别。
对于作者:
vivo 互联网技术:百灵评论我的项目开发团队,鲁班事件平台开发团队,bees 数据采集团队
参考资料:
https://docs.mongodb.com/manu…