乐趣区

关于存储:利用-ChangeStream-实现-Amazon-DocumentDB-表级别容灾复制

前言
与 MongoDB 兼容的 Amazon DocumentDB,应用齐全托管式文档数据库服务轻松扩大 JSON 工作负载,通过独立扩大计算和存储,反对每秒数以百万计文档的读取申请;自动化硬件预置、修补、设置和其余数据库治理工作;通过主动复制、间断备份和严格的网络隔离实现 99.999999999% 的持久性;将现有 MongoDB 驱动程序和工具与 Apache 2.0 开源 MongoDB 3.6 和 4.0 API 搭配应用。鉴于上述性能劣势,越来越多的企业曾经或行将应用 DocumentDB 来治理 JSON 文档数据库。

亚马逊云科技开发者社区为开发者们提供寰球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、流动与比赛等。帮忙中国开发者对接世界最前沿技术,观点,和我的项目,并将中国优良开发者或技术举荐给寰球云社区。如果你还没有关注 / 珍藏,看到这里请肯定不要匆匆划过,点这里让它成为你的技术宝库!

对很多行业而言,须要保证数据与业务的持续性,存在要害业务与数据的容灾诉求。亚马逊云科技于 2021 年 6 月推出了面向 Amazon DocumentDB(兼容 MongoDB)的全局集群(Global Cluster)。全局集群是一项新性能,可在产生区域范畴的中断时提供劫难复原,同时通过容许从最近的 Amazon DocumentDB 集群读取来实现低提早全局读取。客户能够将业务产生 Region 内的 DocumentDB 通过该性能同步至其余 Region,轻松实现数据层的跨区域容灾。但因为 Global Cluster 全局集群性能是基于存储的疾速复制,所以很遗憾,截止本文发稿时,DocumentDB Global Cluster 全局集群仅反对实例级别的数据同步与复制,暂不反对 Database 或者 Collection 级别的数据容灾。

亚马逊云科技还有另一款数据库产品 Amazon Data Migration Server(DMS),能够实现 Database 或者 Collection 级别的数据同步,以低提早与较低的 RPO 指标实现数据的跨区域同步与复制,以实现容灾的需要。但在面对容灾场景中的数据保护诉求,DMS 暂不反对对删除类型的操作进行过滤。

在本文中,咱们将向您介绍应用 Amazon Managed Streaming for Apache Kafka(MSK)作为消息中间件暂存 DocumentDB 的扭转流事件 Change Stream Events,来实现跨 Region 的数据库同步,并拦挡删除类型的操作的整体解决方案。本例中,咱们采纳 us-east-1 弗吉尼亚北部区域作为主区域 Primary Region,已有 DocumentDB 主实例,us-west- 2 俄勒冈区域作为灾备区域 DR Region,已有 DocumentDB 灾备实例,应用了 python 作为编程语言,除 python 外您还能够应用其余支流编程语言譬如 Java,Node.JS 实现业务逻辑,但因为驱动起因,暂不反对 Ruby;另外请应用 Amazon DocumentDB v4.0 以上版本。参考架构图如下图所示:

主 region 的 stream-capture 主机环境设置
1. 在主 region 的 stream-capture 主机上设置 OS 参数环境 Code 局部:

设置环境变量,请替换红色的文字局部为您理论的值,本文中默认采纳 bar.foo 为扭转流监控 collection,您能够替换为您自定义的其余 DB 与 collection

## 设置环境变量,请替换红色的文字局部为您理论的值,本文中默认采纳 bar.foo 为扭转流监控 collection,您能够替换为您自定义的其余 DB 与 collection
echo -e "USERNAME="Your Primary MongoDB User"\nexport USERNAME\nPASSWORD="Your Primary MongoDB password"\nexport PASSWORD\nmongo_host="Primary MongoDB Cluster URI"\nexport mongo_host\nstate_tbl="YOUR STATE COLLECTION"\nexport state_tbl\nstate_db="YOUR STATE DB"\nexport state_db\nwatched_db_name="bar"\nexport watched_db_name\nwatched_tbl_name="foo"\nexport watched_tbl_name\nevents_remain=1\nexport events_remain\nDocuments_per_run=100000\nexport Documents_per_run\nkfk_host="YOUR KFK URI\nexport kfk_host\nkfk_topic="changevents"\nexport kfk_topic\nbucket_name="YOUR S3 BUCKET"\nexport bucket_name\nS3_prefix=""\nexport S3_prefix"" >> .bash_profile
## 利用环境变量
source .bash_profile
  1. 在主 region 的 stream-capture 主机上安装 pymongo 与 boto3 请参考如何在 Amazon Linux 2 上应用 Boto 3 库创立 Python 3 虚拟环境

实现 python3 与 boto3 的装置与配置,本文不再复述

## 装置 pymongo
sudo pip install pymongo
  1. 在主 region 的 stream-capture 主机上安装 MongoDB 客户端与证书
## 下载 SSL 证书到 /tmp 下
wget -P /tmp https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem

## 配置 MongoDB 的 YUM REPO
sudo echo -e "[mongodb-org-5.0]\nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/5.0/x86_64/\ngpgcheck=1\nenabled=\ngpgkey=https://www.mongodb.org/static/pgp/server-5.0.asc" >> /etc/yum.repos.d/mongodb-org-5.0.repo
## 装置 M ongoDB 客户端
sudo yum install -y mongodb-org-shell

创立 MSK 的 Topic 用以承受扭转流事件
请参照本文档【开始应用 MSK 第 3 步:创立主题】来创立 MSK 的 topic,本文不再复述。请将步骤 12 中的–topic MSKTutorialTopic 替换–topic changevents 之后,执行第步骤 12

咱们将能够看到如下音讯:

Created topic changevents.

启用 Amazon DocumentDB 扭转流
1. 应用 mongosh 客户端登陆主 DocumentDB 集群

Mongo --host $mongo_host:27017 --ssl --sslCAFile
/tmp/rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD

2. 对 bar.foo 启用扭转流

db.adminCommand({modifyChangeStreams: 1,database: "bar",collection: "foo", enable: true});

3. 确认胜利
{"ok" : 1}

主 region 的扭转流捕捉程序

#!/bin/env python

import json
import logging
import os
import time
import boto3
import datetime
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from kafka import KafkaProducer

db_client = None
kafka_client = None                                           
s3_client = None        
                                 
logging.basicConfig(Level=logging.ERROR)

# The error code returned when data for the requested resume token has been deleted
err_code_136 = 136


def get_db_client():

    # Use a global variable if CX has interest in Lambda function instead of long-running python
    global db_client

    if db_client is None:
        logging.debug('Creating a new DB client.')

        try:

            username = os.environ[‘USERNAME’]
            password = os.environ[‘PASSWORD’]
            cluster_uri = os.environ['mongo_host'] 
            db_client = MongoClient(cluster_uri, ssl=True, retryWrites=False, ssl_ca_certs='/tmp/rds-combined-ca-bundle.pem')
            # Make an attemp for connecting
            db_client.admin.command('ismaster')
            db_client["admin"].authenticate(name=username, password=password)
            logging.debug('Successfully created a new DB client.')
        except Exception as err:
            logging.error('Failed to create a new DB client: {}'.format(err))
            raise

    return db_client


def get_state_tbl_client():

    """Return a DocumentDB client for the collection in which we store processing state."""

        try:

            db_client = get_db_client()
            state_db_name = os.environ['state_db']
            state_tbl_name = os.environ['state_tbl']
            state_tbl = db_client[state_db_name][state_tbl_name]
        except Exception as err:
            logging.error('Failed to create new state collection client: {}'.format(err))
            raise

    return state_tbl


def get_last_position():

            last_position = None
            logging.debug('Locate the last position.’)
        try:

            state_tbl = get_state_tbl_client()
            if "watched_tbl_name" in os.environ:
            position_point = state_tbl.find_one({'currentposition': True, 'watched_db': str(os.environ['watched_db_name']), 
                'watched_tbl': str(os.environ['watched_tbl_name']), 'db_level': False})
            else:
            position_point = state_tbl.find_one({'currentposition': True, 'db_level': True, 
                'watched_db': str(os.environ['watched_db_name'])})
           
            if position_point is not None:
            if 'lastProcessed' in position_point: 
                last_position = position_point['lastProcessed']
            else:
            if "watched_tbl_name" in os.environ:
                state_tbl.insert_one({'watched_db': str(os.environ['watched_db_name']),
                    'watched_tbl': str(os.environ['watched_tbl_name']), 'currentposition': True, 'db_level': False})
            else:
                state_tbl.insert_one({'watched_db': str(os.environ['watched_db_name']), 'currentposition': True, 
                    'db_level': True})

        except Exception as err:
            logging.error('Failed to locate the last processed id: {}'.format(err))
            raise

    return last_position


def save_last_position(resume_token):

            """Save the resume token by the last successfully processed change event."""

            logging.debug('Saving last processed id.')
        try:

            state_tbl = get_state_tbl_client()
            if "watched_tbl_name" in os.environ:
            state_tbl.update_one({'watched_db': str(os.environ['watched_db_name']), 
                'watched_tbl': str(os.environ['watched_tbl_name'])},{'$set': {'lastProcessed': resume_token}})
            else:
            state_tbl.update_one({'watched_db': str(os.environ['watched_db_name']), 'db_level': True, },
                {'$set': {'lastProcessed': resume_token}})

        except Exception as err:
            logging.error('Failed to save last processed id: {}'.format(err))
            raise


def conn_kfk_producer():

            # Use a global variable if CX has interest in Lambda function instead of long-running python
            global kafka_client
    
            if kafka_client is None:
            logging.debug('Creating a new Kafka client.')

        try:

            kafka_client = KafkaProducer(bootstrap_servers=os.environ['kfk_host'])
        except Exception as err:
            logging.error('Failed to create a new Kafka client: {}'.format(err))
            raise
    
    return kafka_client


def produce_msg(producer_instance, topic_name, key, value):

    """Produce change events to MSK."""
    
        try:

            topic_name = os.environ['kfk_topic']
            producer_instance = KafkaProducer(key_serializer=lambda key: json.dumps(key).encode('utf-8’),value_serializer=lambda value: json.dumps(value).encode('utf-8’),retries=3)
            producer_instance.send(topic_name, key, value)
            producer_instance.flush()
        except Exception as err:
            logging.error('Error in publishing message: {}'.format(err))
            raise


def write_S3(event, database, collection, doc_id):

            global s3_client

            if s3_client is None:
            s3_client = boto3.resource('s3')  

        try:
            logging.debug('Publishing message to S3.') #, str(os.environ['S3_prefix'])
            if "S3_prefix" in os.environ:
            s3_client.Object(os.environ['bucket_name'], str(os.environ['S3_prefix']) + '/' + database + '/' +
                collection + '/' + datetime.datetime.now().strftime('%Y/%m/%d/') + doc_id).put(Body=event)
            else: 
            s3_client.Object(os.environ['bucket_name'], database + '/' + collection + '/' + 
                datetime.datetime.now().strftime('%Y/%m/%d/') + doc_id).put(Body=event)

        except Exception as err:
            logging.error('Error in publishing message to S3: {}'.format(err))
            raise

def main(event, context):
    """Read change events from DocumentDB and push them to MSK&S3."""
    
            events_processed = 0
            watcher = None
            kafka_client = None

        try:
        
            # Kafka client set up    
            if "kfk_host" in os.environ:
            kafka_client = conn_kfk_producer()  
            logging.debug('Kafka client set up.')    

            # DocumentDB watched collection set up
            db_client = get_db_client()
            watched_db = os.environ['watched_db_name']
            if "watched_tbl_name" in os.environ:
            watched_tbl = os.environ['watched_tbl_name']
            watcher = db_client[watched_db][watched_tbl]
            else: 
            watcher = db_client[watched_db]
            logging.debug('Watching table {}'.format(watcher))

            # DocumentDB sync set up
            state_sync_count = int(os.environ['events_remain'])
            last_position = get_last_position()
            logging.debug("last_position: {}".format(last_position))

            with watcher.watch(full_document='updateLookup', resume_after=last_position) as change_stream:
            i = 0
            state = 0

            while change_stream.alive and i < int(os.environ['Documents_per_run']):
            
                i += 1
                change_event = change_stream.try_next()
                logging.debug('Event: {}'.format(change_event))
                
                
                if change_event is None:
                        Time.sleep(0.5)
                        Continue
                else:
                    op_type = change_event['operationType']
                    op_id = change_event['_id']['_data']

                    if op_type == insert':             
                        doc_body = change_event['fullDocument']
                        doc_id = str(doc_body.pop("_id", None))
                        insert_body = doc_body
                        readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
                        doc_body.update({'operation':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
                        doc_body.update({'insert_body':json.dumps(insert_body)})
                        doc_body.update({'db':str(change_event['ns']['db']),'coll':str(change_event['ns']['coll'])})
                        payload = {'_id':doc_id}
                        payload.update(doc_body)
                        # Publish event to MSK
                            produce_msg(kafka_client, kfk_topic, op_id, payload)


                    if op_type == 'update':             
                        doc_id = str(documentKey["_id"])
                        readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
                        doc_body.update({'operation':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
                        doc_body.update({'updateDescription':json.dumps(updateDescription)})
                        doc_body.update({'db':str(change_event['ns']['db']),'coll':str(change_event['ns']['coll'])})
                        payload = {'_id':doc_id}
                        payload.update(doc_body)
                        # Publish event to MSK
                            produce_msg(kafka_client, kfk_topic, op_id, payload)


                    if op_type == 'delete':
                        doc_id = str(change_event['documentKey']['_id'])
                        readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
                        doc_body.update({'operation':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
                        doc_body.update({'db':str(change_event['ns']['db']),'coll':str(change_event['ns']['coll'])})
                        payload = {'_id':doc_id}
                        payload.update(doc_body)
                        # Append event for S3
                        if "bucket_name" in os.environ:
                            write_S3(op_id, json.dumps(payload))

                        logging.debug('Processed event ID {}'.format(op_id))

                    events_processed += 1

    except OperationFailure as of:
            if of.code == err_code_136:
            # Data for the last processed ID has been deleted in the change stream,
            # Store the last known good state so our next invocation
            # starts from the most recently available data
            save_last_position(None)
        raise

    except Exception as err:
            logging.error(‘Positionpoint lost: {}'.format(err))
        raise

    else:
        
        if events_processed > 0:

            save_last_position(change_stream.resume_token)
            logging.debug('Synced token {} to state collection'.format(change_stream.resume_token))
            return{
                'statusCode': 200,
                'description': 'Success',
                'detail': json.dumps(str(events_processed)+ 'records processed successfully.')
            }
        else:
                return{
                    'statusCode': 201,
                    'description': 'Success',
                    'detail': json.dumps('No records to process.')
                }

    finally:

        # Close Kafka client
        if "kfk_host" in os.environ:                                                 
            kafka_client.close()

容灾 region 的 stream-apply 主机环境设置
Code 局部:

## 设置环境变量,请替换红色的文字局部为您理论的值
echo -e "DR_USERNAME="Your DR MongoDB User"\nexport DR_USERNAME\nDR_PASSWORD="Your DR MongoDB Password"\nexport DR_PASSWORD\nDR_mongo_host="Your DR MongoDB cluster URI"\nexport DR_mongo_host\nkfk_host="YOUR KFK URI\nexport kfk_host\nkfk_topic="changevents"\nexport kfk_topic \nDocuments_per_run=100000\nexport Documents_per_run" >> .bash_profile
## 利用环境变量
source .bash_profile

容灾 region 的扭转流应用程序
在 stream-apply 主机上部署下列 python 代码并运行

Python Code:

#!/bin/env python

import json
import logging
import os
import string
import sys
import time
import boto3
import datetime
from pymongo import MongoClient
from kafka import KafkaConsumer
                                                
db_client = None 
kafka_client = None                                                  

"""ERROR level for deployment."""                                
logging.basicConfig(Level=logging.ERROR)

def get_db_client():
    global db_client

    if db_client is None:
            logging.debug('Creating a new DB client.')

        try:

            username = os.environ[‘DR_USERNAME’]
            password = os.environ[‘DR_PASSWORD’]
            cluster_uri = os.environ[‘DR_mongo_host'] 
            db_client = MongoClient(cluster_uri, ssl=True, retryWrites=False, ssl_ca_certs='/tmp/rds-combined-ca-bundle.pem')
            # Make an attemp for connecting
            db_client.admin.command('ismaster')
            db_client["admin"].authenticate(name=username, password=password)
            logging.debug('Successfully created a new DB client.')
        except Exception as err:
            logging.error('Failed to create a new DB client: {}'.format(err))
            raise

    return db_client

def conn_kfk_comsumer():
    global kafka_client
    
            if kafka_client is None:
            logging.debug('Creating a new Kafka client.')

        try:

            kafka_client = KafkaConsumer(bootstrap_servers=os.environ['kfk_host'])
        except Exception as err:
            logging.error('Failed to create a new Kafka client: {}'.format(err))
            raise
    
    return kafka_client

def poll_msg(consumer, topic_name, key, value):
    """Poll documentdb changes from MSK."""
    
        try:

                topic_name = os.environ['kfk_topic']
                consumer = KafkaConsumer(topic_name, bootstrap_servers= os.environ['kfk_host'], auto_offset_reset=‘latest’, group_id=‘docdb’, key_deserializer=lambda key: json.loads(key).decode('utf-8’), value_deserializer=lambda value: json.loads(value).decode('utf-8’))
                consumer.subscribe(topic_name, key, value)
                consumer.poll(max_records=1)
        except Exception as err:
                logging.error('Error in polling message: {}'.format(err))
                raise


def apply2mongodb(message,db_client)

    try:
        
                # Kafka client set up    
                if "kfk_host" in os.environ:
                kafka_client = conn_kfk_consumer()  
                logging.debug('Kafka client set up.')    

                db_client = get_db_client()

                partition = KafkaConsumer.assignment()
                next_offset = KafkaConsumer.position(partition)
            
                if next_offset is None:
                    Time.sleep(0.5)
                    Continue
                else:
                poll_msg(kafka_client, kfk_topic, op_id, payload)
                for message in consumer:
                event_body = message.value()
              op_type = json.loads(event_body[‘operation'])

                if op_type == 'insert':
                    coll = json.loads(event_body['coll'])
                    coll_client = db_client(coll)
                    insert_body = json.loads(event_body[‘insert_body'])
                    payload = {'_id':ObjectId(json.loads(event_body['_id']))}
                  payload.update(insert_body)
                    coll_client.insert_one(payload)

                if op_type == 'update':
                    coll = json.loads(event_body['coll'])
                    coll_client = db_client(coll)
                    update_body = json.loads(event_body[‘updateDescription']['updatedFields'])
                    update_set = {"$set":update_body}
                    payload = {'_id':(json.loads(event_body['_id']))}
                    coll_client.update_one(payload,update_set)

                    events_processed += 1

def main(event, context):
     events_processed = 0
    kafka_client = None

    try:

                # DocumentDB watched collection set up
                db_client = get_db_client()
                dr_db = os.environ['DR_mongo_host']
                dr_db_client = db_client(dr_db)
                while i < int(os.environ['Documents_per_run']):
                apply2mongodb(message,dr_db_client)
                i += 1

        else:

                if events_processed > 0:

                logging.debug('{} events been processed successfully'.format(events_processed))
                return{
                'statusCode': 200,
                'description': 'Success',
                'detail': json.dumps(str(events_processed)+ 'events processed successfully.')
            }
        else:
                return{
                    'statusCode': 201,
                    'description': 'Success',
                    'detail': json.dumps('No records to process.')
                }

    finally:

        # Close Kafka client
        if "kfk_host" in os.environ:                                                 
            kafka_client.close()

后果验证

  1. 别离登陆主 region 与容灾 region 的 DocumentDB
    主 region:
mongo --host $mongo_host:27017 --ssl --sslCAFile
/tmp/rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD

容灾 region:

mongo --host $DR_mongo_host:27017 --ssl --sslCAFile
/tmp/rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD
  1. 在主 region 插入数据
use bar;
db.foo.insertOne({"x":1}) ;
  1. 在灾备 region 察看
use bar;
db.foo.find();
## 失去后果
{"_id":ObjectId(9416e4a253875177a816b3d6),"x":1}
  1. 在主 region 更新数据
db.foo.updateOne({"x":1},
{$set:{"x":2}}
);
  1. 在灾备 region 察看
db.foo.find();
## 失去后果
{"_id":ObjectId(9416e4a253875177a816b3d6),"x":2}

5. 在主 region 非监控表 exa 插入数据 y =1

db.exa.insertOne({"y":1});

6. 在主 region 察看有哪些表,发现新减少了 exa 这张表

show tables;
exa
foo
  1. 在灾备 region 察看,并没有 exa 呈现,因为 exa 并不在咱们的 watched collection 里,不会捕获相干的扭转流
show tables;
foo
  1. 在主 region 的 foo 表删除 x 记录
db.foo.deleteOne({"x":2}) ;

## 察看失去后果,主 region DocumentDB foo 表已被清空
db.foo.find();
## 失去后果为空
  1. 在灾备 region 验证 foo 表内容
db.foo.find();
## 失去后果
{"_id":ObjectId(9416e4a253875177a816b3d6),"x":2}
## 删除操作被拦挡

10. 下载 S3 中的文件,并关上,其中内容为

{"_id":"ObjectId(9416e4a253875177a816b3d6)", "operation":"delete", "timestamp":1658233222,"timestampReadable":"2022-07-19 20:20:22", "db":"bar","coll":"foo"}
## 验证了本条 delete 命令被拦挡并保留在 S3 中。

总结
咱们在此文中,应用了 MSK 来异步保留 DocumentDB 的 insert/update 扭转流,拦挡 delete 类型的扭转流存储在 S3 中备查。如果须要进一步对删除事件做出剖析,能够引入 Amazon Glue 与 Amazon Athena 对存储于 S3 中的日志文件即席查问。MSK 中的扭转流事件,咱们将其利用在灾备区域的 DocumentDB,做到数据只增不减,防止主 region 的数据库因为意外误操作导致的数据损失或者高工夫老本数据恢复操作。

参考资源
Amazon Linux 2 上应用 Boto 3 库创立 Python 3 虚拟环境

https://aws.amazon.com/cn/premiumsupport/knowledge-center/ec2…

创立 MSK 的 Topic

https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/c…

本篇作者

付晓明 Amazon 解决方案架构师,负责云计算解决方案的征询与架构设计,同时致力于数据库,边缘计算方面的钻研和推广。在退出亚马逊云科技之前曾在金融行业 IT 部门负责互联网券商架构的设计,对分布式,高并发,中间件等具备丰盛教训。

刘冰冰 Amazon 数据库解决方案架构师,负责基于 Amazon 的数据库解决方案的征询与架构设计,同时致力于大数据方面的钻研和推广。在退出 Amazon 之前曾在 Oracle 工作多年,在数据库云布局、设计运维调优、DR 解决方案、大数据和数仓以及企业应用等方面有丰盛的教训。

文章起源:https://dev.amazoncloud.cn/column/article/630994cd86218f3ca3e…

退出移动版