乐趣区

关于mongodb:mongoShake基于go实践应用

MongoShake——基于 MongoDB 的跨数据中心的数据复制平台 - 阿里云开发者社区

通过阿里云自研的 MongoShake 开源工具,您能够实现 MongoDB 数据库间的数据同步,该性能可用于数据分析、灾备和多活等业务场景。本文以云数据库 MongoDB 实例间的数据实时同步为例介绍配置流程。

1. 下载

git clone https://github.com/alibaba/MongoShake

2. 介绍

MongoShake 是阿里云以 Golang 语言编写的通用平台型服务工具,它通过读取 MongoDB 的 Oplog 操作日志来复制 MongoDB 的数据以实现特定需要。

MongoShake 还提供了日志数据的订阅和生产性能,可通过 SDK、Kafka、MetaQ 等形式的灵便对接,实用于日志订阅、数据中心同步、Cache 异步淘汰等场景

3. 利用场景

  1. MongoDB 集群间数据的异步复制,免去业务双写开销。
  2. MongoDB 集群间数据的镜像备份(以后 1.0 开源版本反对受限)
  3. 日志离线剖析
  4. 日志订阅
  5. 数据路由。依据业务需要,联合日志订阅和过滤机制,能够获取关注的数据,达到数据路由的性能。
  6. Cache 同步。日志剖析的后果,晓得哪些 Cache 能够被淘汰,哪些 Cache 能够进行预加载,反向推动 Cache 的更新。
  7. 基于日志的集群监控

4. 性能

MongoShake 从源库抓取 oplog 数据,而后发送到各个不同的 tunnel 通道。源库反对:ReplicaSet,Sharding,Mongod,目标库反对:Mongos,Mongod。现有通道类型有:

  1. Direct:间接写入目标 MongoDB
  2. RPC:通过 net/rpc 形式连贯
  3. TCP:通过 tcp 形式连贯
  4. File:通过文件形式对接
  5. Kafka:通过 Kafka 形式对接
  6. Mock:用于测试,不写入 tunnel,摈弃所有数据

消费者能够通过对接 tunnel 通道获取关注的数据,例如对接 Direct 通道间接写入目标 MongoDB,或者对接 RPC 进行同步数据传输等。此外,用户还能够本人创立本人的 API 进行灵便接入。上面 2 张图给出了根本的架构和数据流。

5. 配置与应用

首先返回 https://github.com/alibaba/MongoShake 下载 mongoshake, 如下两种形式都能够

go get github.com/alibaba/MongoShake
git clone https://github.com/alibaba/MongoShake

在我的项目目录中次要关注如下两个目录

github.com/mongoShake/cmd
 ├── collector
 │   ├── collector.go
 │   └── sanitize.go
 └── receiver
     └── receiver.go

其中 collector 就是启动 mongoShake 监听 mongodb 的入口,而 receiver 则是用于接管 collector 监听 mongo 中的 oplogs 日志变动的数据处理程序实践上须要依据语言的不同而需进行独立开发,因为 mongoShake 整体的实现采纳的是 go 故此 receiver 自身也是基于 go 实现的。

如下为配置文件目录地址

github.com/mongoShake/conf
     ├── collector.conf
     └── receiver.conf

其中与启动 mongoShake 实现监听的次要配置是 collector.conf,如下是我抉择整个配置中较为罕用的几项配置信息

# 源 MongoDB 连贯串信息,逗号分隔同一个正本集内的结点,分号分隔分片 sharding 实例,免密模式
mongo_urls = mongodb://192.168.145.10:27017

# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
# 通道模式。tunnel = rpc

# tunnel.address = mongodb://127.0.0.1:20080
tunnel.address = 127.0.0.1:1234


# raw 是默认的类型,其采纳聚合的模式进行写入和
# 读取,然而因为携带了一些管制信息,所以须要专门用 receiver 进行解析。# json 以 json 的格局写入 kafka,便于用户间接读取。# bson 以 bson 二进制的格局写入 kafka。tunnel.message = raw

# 黑白名单过滤,目前不反对正则,白名单示意通过的 namespace,黑名单示意过滤的 namespace,# 不能同时指定。分号宰割不同 namespace,每个 namespace 能够是 db,也能够是 db.collection。filter.namespace.black =
filter.namespace.white =

# checkpoint 的具体写入的 MongoDB 地址,如果不配置,对于正本集和分片集群都将写入源库 (db=mongoshake)
# 2.4 版本当前不须要配置为源端 cs 的地址。checkpoint.storage.url = mongodb://127.0.0.1:27017

其余的配置信息比拟多能够看官网文档也能够看配置正文(是中文的),第一次应用只须要关注如下三个配置信息即可。案例采纳 rpc 为示例。

# mongo 源数据地址
mongo_urls = mongodb://192.168.145.10:27017

# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
# 通道模式。采集的数据发送形式
tunnel = rpc

# 数据发送的目的地
tunnel.address = 127.0.0.1:1234

6. 启动与应用

6.1 启动

启动形式你能够抉择在下载的目录下执行 ./build.sh 将整个程序编译,需注意要装置 go 版本抉择 1.15 以上。本次案例间接采纳 go 启动程序,如果呈现问题也好调试。

go run cmd/collector/* -conf=/www/go/src/github.com/MongoShake/conf/collector.conf

(如果是编译的则会生成相干零碎的可执行脚本,只需执行 collector.* -conf 即可启动)

[root@localhost MongoShake]# ll ./bin/
总用量 109868
-rwxr-xr-x. 1 root root 20191856 3 月  23 17:29 collector.darwin
-rwxr-xr-x. 1 root root 20468754 3 月  23 17:29 collector.linux
-rwxr-xr-x. 1 root root 20406272 3 月  23 17:29 collector.windows
-rwxr-xr-x. 1 root root     9534 3 月  23 17:29 comparison.py
-rwxr-xr-x. 1 root root    13808 3 月  23 17:29 hypervisor
-rwxr-xr-x. 1 root root     9400 3 月  23 17:29 mongoshake-stat
-rwxr-xr-x. 1 root root 17005680 3 月  23 17:29 receiver.darwin
-rwxr-xr-x. 1 root root 17232383 3 月  23 17:29 receiver.linux
-rwxr-xr-x. 1 root root 17138688 3 月  23 17:29 receiver.windows
-rwxr-xr-x. 1 root root      621 3 月  23 17:29 start.sh
-rwxr-xr-x. 1 root root      373 3 月  23 17:29 stop.sh

6.2 配置 mongo

对 mongo 的次要配置,如果是单节点则须要开启 mongo 的 oplogs 项

replSet=single

而后能够新建一个用户赋予复制的权限。如下是 mongo 的相干权限

Read:容许用户读取指定数据库
readWrite:容许用户读写指定数据库
dbAdmin:容许用户在指定数据库中执行治理函数,如索引创立、删除,查看统计或拜访 system.profile
userAdmin:容许用户向 system.users 汇合写入,能够找指定数据库里创立、删除和治理用户
clusterAdmin:只在 admin 数据库中可用,赋予用户所有分片和复制集相干函数的管理权限。readAnyDatabase:只在 admin 数据库中可用,赋予用户所有数据库的读权限
readWriteAnyDatabase:只在 admin 数据库中可用,赋予用户所有数据库的读写权限
userAdminAnyDatabase:只在 admin 数据库中可用,赋予用户所有数据库的 userAdmin 权限
dbAdminAnyDatabase:只在 admin 数据库中可用,赋予用户所有数据库的 dbAdmin 权限。root:只在 admin 数据库中可用。超级账号,超级权限 

创立角色 — 当然也能够抉择不整(仅仅举荐是在测试学习的时候不整)

use admin

// 创立超级管理员,操作齐他 users
db.createUser(
  {
    user: "root",
    pwd: "root",
    roles: [{ role: "root", db: "admin"} ]
  }
)
db.auth("root","root")

// 创立其余用户
db.createUser(
{
    user: "admin",
    pwd: "0000",
    roles: [{ role: "readAnyDatabase", db: "admin"},
            {role:"read",db:"local"}
        ]
    }
)

db.auth("admin","0000")

6.3 应用

实际上官网曾经提供了相干较好的用例即为 receiver 咱们能够间接应用。

剖析 官网 receiver 的实现

首先在入口 cmd/receiver/receiver.go 中,startup 启动办法会通过 tunnel 工厂依据在 conf/receiver.conf 配置文件中配置的 tunnel 形式创立相应的接收器。

// this is the main connector function
// MongoShake/cmd/receiver/receiver.go
func startup() {factory := tunnel.ReaderFactory{Name: conf.Options.Tunnel}
    reader := factory.Create(conf.Options.TunnelAddress)
    if reader == nil {return}

    /*
     * create re-players, the number of re-players number is equal to the
     * collector worker number to fulfill load balance. The tunnel that message
     * sent to is determined in the collector side: `TMessage.Shard`.
     */
    repList := make([]tunnel.Replayer, conf.Options.ReplayerNum)
    for i := range repList {repList[i] = replayer.NewExampleReplayer(i)
    }

    LOG.Info("receiver is starting...")
    if err := reader.Link(repList); err != nil {LOG.Critical("Replayer link to tunnel error %v", err)
        return
    }
}

// MongoShake/tunnel/tunnel.go
func (factory *ReaderFactory) Create(address string) Reader {
    switch factory.Name {
    case utils.VarTunnelKafka:
        return &KafkaReader{address: address}
    case utils.VarTunnelTcp:
        return &TCPReader{listenAddress: address}
    case utils.VarTunnelRpc:
        return &RPCReader{address: address}
    case utils.VarTunnelMock:
        return &MockReader{}
    case utils.VarTunnelFile:
        return &FileReader{File: address}
    case utils.VarTunnelDirect:
        LOG.Critical("direct mode not supported in reader")
        return nil
    default:
        LOG.Critical("Specific tunnel not found [%s]", factory.Name)
        return nil
    }
}

而后在 startup 中为接收器增加理论对 mongo 采集监听数据的处理器 Replayer 对象

/*
 * create re-players, the number of re-players number is equal to the
 * collector worker number to fulfill load balance. The tunnel that message
 * sent to is determined in the collector side: `TMessage.Shard`.
 */
repList := make([]tunnel.Replayer, conf.Options.ReplayerNum)
for i := range repList {repList[i] = replayer.NewExampleReplayer(i)
}

在 MongoShake/receiver/replayer.go 中 ExampleReplayer 对象的办法次要为如下

func (er *ExampleReplayer) Sync(message *tunnel.TMessage, completion func()) int64
func (er *ExampleReplayer) GetAcked() int64
func (er *ExampleReplayer) handler()

其中 Sync 是接管音讯的入口负责数据的验证解析等相干工作,handler 就是咱们最终解决数据的外围代码

func (er *ExampleReplayer) handler() {
    for msg := range er.pendingQueue {count := uint64(len(msg.message.RawLogs))
        if count == 0 {
            // probe request
            continue
        }

        // parse batched message
        oplogs := make([]oplog.ParsedLog, len(msg.message.RawLogs))
        for i, raw := range msg.message.RawLogs {oplogs[i] = oplog.ParsedLog{}
            if err := bson.Unmarshal(raw, &oplogs[i]); err != nil {
                // impossible switch, need panic and exit
                LOG.Crashf("unmarshal oplog[%v] failed[%v]", raw, err)
                return
            }
            LOG.Info(oplogs[i]) // just print for test, users can modify to fulfill different needs
            // fmt.Println(oplogs[i])
        }

        if callback := msg.completion; callback != nil {callback() // exec callback
        }

        // get the newest timestamp
        n := len(oplogs)
        lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
        er.Ack = lastTs

        LOG.Debug("handle ack[%v]", er.Ack)

        // add logical code below
    }
}

而咱们的业务代码只须要减少到 // add logical code below 之后即可,数据对象是 []oplog.ParsedLog

应用

能够利用 go 启动即可,你能够在 // add logical code below 减少对于 oplogs 后果的打印输出

go run cmd/receiver/* -conf=/www/go/src/github.com/MongoShake/conf/receiver.conf

最佳实现(只针对数据 CURD 操作)

在基于下面代码的状况下,定义好解决的对象与接口以及路由

import "github.com/alibaba/MongoShake/v2/oplog"

type Handler interface {Namespace() string
    Insert(oplogs *oplog.ParsedLog) (err error)
    Update(oplogs *oplog.ParsedLog) (err error)
    Delete(oplogs *oplog.ParsedLog) (err error)
}

type Routes map[string]Handler

var routes = make(Routes)

func RegistryRouter(handler Handler) {routes[handler.Namespace()] = handler
}

批改 MongoShake/receiver/replayer.go 中 ExampleReplayer.handler 办法

func (r *Replayer) handler() {
    for msg := range r.pendingQueue {count := uint64(len(msg.message.RawLogs))
        if count == 0 {continue}

        oplogs, err := r.parseMsg(msg)
        if err != nil {log.Error("parseMsg msg err", zap.Error(err))
            return
        }

        if callback := msg.completion; callback != nil {callback()
        }

        // get the newest timestamp
        n := len(oplogs)
        lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
        r.Ack = lastTs

        // add logical code below
        for _, oplog := range oplogs {handler, ok := routes[oplog.Namespace]
            if ok {
                var err error
                op := Op(oplog.Operation)
                switch op {
                case OpI:
                    err = handler.Insert(oplog)
                case OpU:
                    err = handler.Update(oplog)
                case OpD:
                    err = handler.Delete(oplog)
                }

                if err != nil {
                    // 同步失败解决
                    log.Errorf("oplog sync err mongo db = %s, option = %s, query = %s, err = %s", oplog.Namespace, op.String(), oplog.Query, err.Error())
                }
            } else {log.Warn("oplog hander not found", zap.String("namespace", oplog.Namespace))
            }
        }
    }
}

在业务端只须要实现 Handler 接口并进行注册即可,留神注册的 Namespace 是 mongo 中的 collection.db 的名称格局

示例代码

https://gitee.com/dn-jinmin/mongoShake/tree/master/tunnelx

import (
    "fmt"
    "gitee.com/dn-jinmin/mongoShake/tunnelx"
)


func loadHandler() []tunnelx.Handler {return []tunnelx.Handler{test.NewUser(),
    }
}

func Run() {handlers := loadHandler()

    for _, handler := range handlers {tunnelx.RegistryRouter(handler)
    }

    tunnelx.Startup(&tunnelx.Configuration{
        Tunnel:        configs.MongoShake.Tunnel,
        TunnelAddress: configs.MongoShake.TunnelAddress,
        ReplayerNum:   configs.MongoShake.Replayer,
    })

    select {}}


type User struct {
}

func NewUser() tunnelx.Handler {return &Manager{}
}

func (m *User) Namespace() string {return "test.user"}

func (m *User) Insert(oplogs *oplog.ParsedLog) (err error) {fmt.Println("参数:", oplogs.Object)
    return nil
}

func (m *User) Update(oplogs *oplog.ParsedLog) (err error) {fmt.Println("参数:", oplogs.Object)
    fmt.Println("条件:", oplogs.query)
    return nil
}
func (m *User) Delete(oplogs *oplog.ParsedLog) (err error) {fmt.Println("条件:", oplogs.Object)
    return nil
}
退出移动版