关于mongodb:mongoShake基于go实践应用

MongoShake——基于MongoDB的跨数据中心的数据复制平台-阿里云开发者社区

通过阿里云自研的MongoShake开源工具,您能够实现MongoDB数据库间的数据同步,该性能可用于数据分析、灾备和多活等业务场景。本文以云数据库MongoDB实例间的数据实时同步为例介绍配置流程。

1. 下载

git clone https://github.com/alibaba/MongoShake

2. 介绍

MongoShake是阿里云以Golang语言编写的通用平台型服务工具,它通过读取MongoDB的Oplog操作日志来复制MongoDB的数据以实现特定需要。

MongoShake还提供了日志数据的订阅和生产性能,可通过SDK、Kafka、MetaQ等形式的灵便对接,实用于日志订阅、数据中心同步、Cache异步淘汰等场景

3. 利用场景

  1. MongoDB集群间数据的异步复制,免去业务双写开销。
  2. MongoDB集群间数据的镜像备份(以后1.0开源版本反对受限)
  3. 日志离线剖析
  4. 日志订阅
  5. 数据路由。依据业务需要,联合日志订阅和过滤机制,能够获取关注的数据,达到数据路由的性能。
  6. Cache同步。日志剖析的后果,晓得哪些Cache能够被淘汰,哪些Cache能够进行预加载,反向推动Cache的更新。
  7. 基于日志的集群监控

4. 性能

MongoShake从源库抓取oplog数据,而后发送到各个不同的tunnel通道。源库反对:ReplicaSet,Sharding,Mongod,目标库反对:Mongos,Mongod。现有通道类型有:

  1. Direct:间接写入目标MongoDB
  2. RPC:通过net/rpc形式连贯
  3. TCP:通过tcp形式连贯
  4. File:通过文件形式对接
  5. Kafka:通过Kafka形式对接
  6. Mock:用于测试,不写入tunnel,摈弃所有数据

消费者能够通过对接tunnel通道获取关注的数据,例如对接Direct通道间接写入目标MongoDB,或者对接RPC进行同步数据传输等。此外,用户还能够本人创立本人的API进行灵便接入。上面2张图给出了根本的架构和数据流。

5. 配置与应用

首先返回https://github.com/alibaba/MongoShake下载mongoshake,如下两种形式都能够

go get github.com/alibaba/MongoShake
git clone https://github.com/alibaba/MongoShake

在我的项目目录中次要关注如下两个目录

github.com/mongoShake/cmd
 ├── collector
 │   ├── collector.go
 │   └── sanitize.go
 └── receiver
     └── receiver.go

其中collector就是启动mongoShake监听mongodb的入口,而receiver则是用于接管collector监听mongo中的oplogs日志变动的数据处理程序实践上须要依据语言的不同而需进行独立开发,因为mongoShake整体的实现采纳的是go故此receiver自身也是基于go实现的。

如下为配置文件目录地址

github.com/mongoShake/conf
     ├── collector.conf
     └── receiver.conf

其中与启动mongoShake实现监听的次要配置是collector.conf,如下是我抉择整个配置中较为罕用的几项配置信息

# 源MongoDB连贯串信息,逗号分隔同一个正本集内的结点,分号分隔分片sharding实例,免密模式
mongo_urls = mongodb://192.168.145.10:27017

# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
# 通道模式。
tunnel = rpc

# tunnel.address = mongodb://127.0.0.1:20080
tunnel.address = 127.0.0.1:1234


# raw是默认的类型,其采纳聚合的模式进行写入和
# 读取,然而因为携带了一些管制信息,所以须要专门用receiver进行解析。
# json以json的格局写入kafka,便于用户间接读取。
# bson以bson二进制的格局写入kafka。
tunnel.message = raw

# 黑白名单过滤,目前不反对正则,白名单示意通过的namespace,黑名单示意过滤的namespace,
# 不能同时指定。分号宰割不同namespace,每个namespace能够是db,也能够是db.collection。
filter.namespace.black =
filter.namespace.white =

# checkpoint的具体写入的MongoDB地址,如果不配置,对于正本集和分片集群都将写入源库(db=mongoshake)
# 2.4版本当前不须要配置为源端cs的地址。
checkpoint.storage.url = mongodb://127.0.0.1:27017

其余的配置信息比拟多能够看官网文档也能够看配置正文(是中文的),第一次应用只须要关注如下三个配置信息即可。案例采纳rpc为示例。

# mongo源数据地址
mongo_urls = mongodb://192.168.145.10:27017

# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
# 通道模式。采集的数据发送形式
tunnel = rpc

# 数据发送的目的地
tunnel.address = 127.0.0.1:1234

6. 启动与应用

6.1 启动

启动形式你能够抉择在下载的目录下执行 ./build.sh 将整个程序编译,需注意要装置go版本抉择1.15以上。本次案例间接采纳go启动程序,如果呈现问题也好调试。

go run cmd/collector/* -conf=/www/go/src/github.com/MongoShake/conf/collector.conf

(如果是编译的则会生成相干零碎的可执行脚本,只需执行 collector.* -conf 即可启动)

[root@localhost MongoShake]# ll ./bin/
总用量 109868
-rwxr-xr-x. 1 root root 20191856 3月  23 17:29 collector.darwin
-rwxr-xr-x. 1 root root 20468754 3月  23 17:29 collector.linux
-rwxr-xr-x. 1 root root 20406272 3月  23 17:29 collector.windows
-rwxr-xr-x. 1 root root     9534 3月  23 17:29 comparison.py
-rwxr-xr-x. 1 root root    13808 3月  23 17:29 hypervisor
-rwxr-xr-x. 1 root root     9400 3月  23 17:29 mongoshake-stat
-rwxr-xr-x. 1 root root 17005680 3月  23 17:29 receiver.darwin
-rwxr-xr-x. 1 root root 17232383 3月  23 17:29 receiver.linux
-rwxr-xr-x. 1 root root 17138688 3月  23 17:29 receiver.windows
-rwxr-xr-x. 1 root root      621 3月  23 17:29 start.sh
-rwxr-xr-x. 1 root root      373 3月  23 17:29 stop.sh

6.2 配置mongo

对mongo的次要配置,如果是单节点则须要开启mongo的oplogs项

replSet=single

而后能够新建一个用户赋予复制的权限。如下是mongo的相干权限

Read:容许用户读取指定数据库
readWrite :容许用户读写指定数据库
dbAdmin:容许用户在指定数据库中执行治理函数,如索引创立、删除,查看统计或拜访system.profile
userAdmin:容许用户向system.users汇合写入,能够找指定数据库里创立、删除和治理用户
clusterAdmin:只在admin数据库中可用,赋予用户所有分片和复制集相干函数的管理权限。
readAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读权限
readWriteAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读写权限
userAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的userAdmin权限
dbAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的dbAdmin权限。
root:只在admin数据库中可用。超级账号,超级权限

创立角色 — 当然也能够抉择不整(仅仅举荐是在测试学习的时候不整)

use admin

//创立超级管理员,操作齐他users
db.createUser(
  {
    user: "root",
    pwd: "root",
    roles: [ { role: "root", db: "admin" } ]
  }
)
db.auth("root","root")

//创立其余用户
db.createUser(
{
    user: "admin",
    pwd: "0000",
    roles: [
            { role: "readAnyDatabase", db: "admin" },
            {role:"read",db:"local"}
        ]
    }
)

db.auth("admin","0000")

6.3 应用

实际上官网曾经提供了相干较好的用例即为receiver咱们能够间接应用。

剖析 官网receiver的实现

首先在入口 cmd/receiver/receiver.go 中,startup启动办法会通过tunnel工厂依据在 conf/receiver.conf配置文件中配置的tunnel形式创立相应的接收器。

// this is the main connector function
// MongoShake/cmd/receiver/receiver.go
func startup() {
    factory := tunnel.ReaderFactory{Name: conf.Options.Tunnel}
    reader := factory.Create(conf.Options.TunnelAddress)
    if reader == nil {
        return
    }

    /*
     * create re-players, the number of re-players number is equal to the
     * collector worker number to fulfill load balance. The tunnel that message
     * sent to is determined in the collector side: `TMessage.Shard`.
     */
    repList := make([]tunnel.Replayer, conf.Options.ReplayerNum)
    for i := range repList {
        repList[i] = replayer.NewExampleReplayer(i)
    }

    LOG.Info("receiver is starting...")
    if err := reader.Link(repList); err != nil {
        LOG.Critical("Replayer link to tunnel error %v", err)
        return
    }
}

// MongoShake/tunnel/tunnel.go
func (factory *ReaderFactory) Create(address string) Reader {
    switch factory.Name {
    case utils.VarTunnelKafka:
        return &KafkaReader{address: address}
    case utils.VarTunnelTcp:
        return &TCPReader{listenAddress: address}
    case utils.VarTunnelRpc:
        return &RPCReader{address: address}
    case utils.VarTunnelMock:
        return &MockReader{}
    case utils.VarTunnelFile:
        return &FileReader{File: address}
    case utils.VarTunnelDirect:
        LOG.Critical("direct mode not supported in reader")
        return nil
    default:
        LOG.Critical("Specific tunnel not found [%s]", factory.Name)
        return nil
    }
}

而后在startup中为接收器增加理论对mongo采集监听数据的处理器Replayer对象

/*
 * create re-players, the number of re-players number is equal to the
 * collector worker number to fulfill load balance. The tunnel that message
 * sent to is determined in the collector side: `TMessage.Shard`.
 */
repList := make([]tunnel.Replayer, conf.Options.ReplayerNum)
for i := range repList {
  repList[i] = replayer.NewExampleReplayer(i)
}

在MongoShake/receiver/replayer.go中ExampleReplayer对象的办法次要为如下

func (er *ExampleReplayer) Sync(message *tunnel.TMessage, completion func()) int64
func (er *ExampleReplayer) GetAcked() int64
func (er *ExampleReplayer) handler()

其中Sync是接管音讯的入口负责数据的验证解析等相干工作,handler就是咱们最终解决数据的外围代码

func (er *ExampleReplayer) handler() {
    for msg := range er.pendingQueue {
        count := uint64(len(msg.message.RawLogs))
        if count == 0 {
            // probe request
            continue
        }

        // parse batched message
        oplogs := make([]oplog.ParsedLog, len(msg.message.RawLogs))
        for i, raw := range msg.message.RawLogs {
            oplogs[i] = oplog.ParsedLog{}
            if err := bson.Unmarshal(raw, &oplogs[i]); err != nil {
                // impossible switch, need panic and exit
                LOG.Crashf("unmarshal oplog[%v] failed[%v]", raw, err)
                return
            }
            LOG.Info(oplogs[i]) // just print for test, users can modify to fulfill different needs
            // fmt.Println(oplogs[i])
        }

        if callback := msg.completion; callback != nil {
            callback() // exec callback
        }

        // get the newest timestamp
        n := len(oplogs)
        lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
        er.Ack = lastTs

        LOG.Debug("handle ack[%v]", er.Ack)

        // add logical code below
    }
}

而咱们的业务代码只须要减少到 // add logical code below 之后即可,数据对象是 []oplog.ParsedLog

应用

能够利用go启动即可,你能够在 // add logical code below 减少对于 oplogs 后果的打印输出

go run cmd/receiver/* -conf=/www/go/src/github.com/MongoShake/conf/receiver.conf

最佳实现(只针对数据CURD操作)

在基于下面代码的状况下,定义好解决的对象与接口以及路由

import "github.com/alibaba/MongoShake/v2/oplog"

type Handler interface {
    Namespace() string
    Insert(oplogs *oplog.ParsedLog) (err error)
    Update(oplogs *oplog.ParsedLog) (err error)
    Delete(oplogs *oplog.ParsedLog) (err error)
}

type Routes map[string]Handler

var routes = make(Routes)

func RegistryRouter(handler Handler) {
    routes[handler.Namespace()] = handler
}

批改MongoShake/receiver/replayer.go中ExampleReplayer.handler办法

func (r *Replayer) handler() {
    for msg := range r.pendingQueue {
        count := uint64(len(msg.message.RawLogs))
        if count == 0 {
            continue
        }

        oplogs, err := r.parseMsg(msg)
        if err != nil {
            log.Error("parseMsg msg err ", zap.Error(err))
            return
        }

        if callback := msg.completion; callback != nil {
            callback()
        }

        // get the newest timestamp
        n := len(oplogs)
        lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
        r.Ack = lastTs

        // add logical code below
        for _, oplog := range oplogs {
            handler, ok := routes[oplog.Namespace]
            if ok {
                var err error
                op := Op(oplog.Operation)
                switch op {
                case OpI:
                    err = handler.Insert(oplog)
                case OpU:
                    err = handler.Update(oplog)
                case OpD:
                    err = handler.Delete(oplog)
                }

                if err != nil {
                    // 同步失败解决
                    log.Errorf("oplog sync err mongo db = %s, option = %s, query = %s, err = %s", oplog.Namespace, op.String(), oplog.Query, err.Error())
                }
            } else {
                log.Warn("oplog hander not found", zap.String("namespace", oplog.Namespace))
            }
        }
    }
}

在业务端只须要实现Handler接口并进行注册即可,留神注册的Namespace是mongo中的 collection.db 的名称格局

示例代码

https://gitee.com/dn-jinmin/mongoShake/tree/master/tunnelx

import (
    "fmt"
    "gitee.com/dn-jinmin/mongoShake/tunnelx"
)


func loadHandler() []tunnelx.Handler {
    return []tunnelx.Handler{
        test.NewUser(),
    }
}

func Run() {
    handlers := loadHandler()

    for _, handler := range handlers {
        tunnelx.RegistryRouter(handler)
    }

    tunnelx.Startup(&tunnelx.Configuration{
        Tunnel:        configs.MongoShake.Tunnel,
        TunnelAddress: configs.MongoShake.TunnelAddress,
        ReplayerNum:   configs.MongoShake.Replayer,
    })

    select {}
}


type User struct {
}

func NewUser() tunnelx.Handler {
    return &Manager{
    }
}

func (m *User) Namespace() string {
    return "test.user"
}

func (m *User) Insert(oplogs *oplog.ParsedLog) (err error) {
    fmt.Println("参数:", oplogs.Object)
    return nil
}

func (m *User) Update(oplogs *oplog.ParsedLog) (err error) {
    fmt.Println("参数:", oplogs.Object)
    fmt.Println("条件:", oplogs.query)
    return nil
}
func (m *User) Delete(oplogs *oplog.ParsedLog) (err error) {
    fmt.Println("条件:", oplogs.Object)
    return nil
}

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据