关于微服务:使用canalKafka进行数据库同步实践

49次阅读

共计 4001 个字符,预计需要花费 11 分钟才能阅读完成。

在微服务拆分的架构中,各服务领有本人的数据库,所以经常会遇到服务之间数据通信的问题。比方,B 服务数据库的数据来源于 A 服务的数据库;A 服务的数据有变更操作时,须要同步到 B 服务中。

第一种解决方案:

在代码逻辑中,有相干 A 服务数据写操作时,以调用接口的形式,调用 B 服务接口,B 服务再将数据写到新的数据库中。这种形式看似简略,但其实“坑”很多。在 A 服务代码逻辑中会减少大量这种调用接口同步的代码,减少了我的项目代码的复杂度,当前会越来越难保护。并且,接口调用的形式并不是一个稳固的形式,没有重试机制,没有同步地位记录,接口调用失败了怎么解决,忽然的大量接口调用会产生的问题等,这些都要思考并且在业务中解决。这里会有不少工作量。想到这里,就将这个计划排除了。

第二种解决方案:

通过数据库的 binlog 进行同步。这种解决方案,与 A 服务是独立的,不会和 A 服务有代码上的耦合。能够间接 TCP 连贯进行传输数据,优于接口调用的形式。这是一套成熟的生产解决方案,也有不少 binlog 同步的中间件工具,所以咱们关注的就是哪个工具可能更好的构建稳固、性能满足且易于高可用部署的计划。

通过调研,咱们抉择了 canal[https://github.com/alibaba/canal]。canal 是阿里巴巴 MySQL binlog 增量订阅 & 生产组件,曾经有在生产上实际的例子,并且不便的反对和其余罕用的中间件组件组合,比方 kafka,elasticsearch 等,也有了canal-go go 语言的 client 库,满足咱们在 go 上的需要,其余具体内容参阅 canal 的 github 主页。

原理简图

OK,开始干!当初要将 A 数据库的数据变更同步到 B 数据库。依据 wiki 很快就用 docker 跑起了一台 canal-server 服务,间接用 canal-gocanal-client代码逻辑。用 canal-go 间接连 canal-servercanal-servercanal-client之间是 Socket 来进行通信的,传输协定是 TCP,交互协定采纳的是 Google Protocol Buffer 3.0。

工作流程

1.Canal 连贯到 A 数据库,模仿 slave

2.canal-client 与 Canal 建设连贯,并订阅对应的数据库表

3.A 数据库产生变更写入到 binlog,Canal 向数据库发送 dump 申请,获取 binlog 并解析,发送解析后的数据给 canal-client

4.canal-client 收到数据,将数据同步到新的数据库

Protocol Buffer 的序列化速度还是很快的。反序列化后失去的数据,是每一行的数据,依照字段名和字段的值的构造,放到一个数组中 代码简略示例:

func Handler(entry protocol.Entry)  {var keys []string
    rowChange := &protocol.RowChange{}
    proto.Unmarshal(entry.GetStoreValue(), rowChange)
    if rowChange != nil {eventType := rowChange.GetEventType()
        for _, rowData := range rowChange.GetRowDatas() { // 遍历每一行数据             if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE {columns := rowData.GetBeforeColumns() // 失去更改前的所有字段属性             } else if eventType == protocol.EventType_INSERT {columns := rowData.GetAfterColumns() // 失去更后前的所有字段属性             }
            ......
        }
    }
} 

遇到的问题

为了高可用和更高的性能,咱们会创立多个 canal-client 形成一个集群,来进行解析并同步到新的数据库。这里就呈现了一个比拟重要的问题,如何保障 canal-client 集群解析生产 binlog 的程序性呢?

咱们应用的 binlog 是 row 模式。每一个写操作都会产生一条 binlog 日志。举个简略的例子:插入了一条 a 记录,并且立马批改 a 记录。这样会有两个音讯发送给canal-client,如果因为网络等起因,更新的音讯早于插入的音讯被解决了,还没有插入记录,更新操作的最初成果是失败的。

怎么办呢?canal 能够和音讯队列组合呀! 而且反对 kafka,rabbitmq,rocketmq 多种抉择,如此优良。咱们在音讯队列这层来实现音讯的程序性。(前面会说怎么做)

抉择 canal+kafka 计划

咱们抉择了音讯队列的业界标杆: kafka UCloud 提供了 kafka 和 rocketMQ 音讯队列产品服务,应用它们可能疾速便捷的搭建起一套音讯队列零碎。减速开发,不便运维。

上面就让咱们来一探到底:

①抉择 kafka 音讯队列产品,并申请开明

②开明实现后,在治理界面,创立 kafka 集群,依据本身需要,抉择相应的硬件配置

③一个 kafka+ZooKeeper 集群就搭建进去了,给力!

并且蕴含了节点治理、Topic 治理、Consumer Group 治理,可能十分不便的间接在控制台对配置进行批改

监控视图方面,监控的数据包含 kafka 生成和生产 QPS,集群监控,ZooKeeper 的监控。可能比较完善的提供监控指标。

canal 的 kafka 配置

canal 配上 kafka 也十分的简略。vi /usr/local/canal/conf/canal.properties

# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# flagMessage 模式下能够调大该值, 但不要超过 MQ 音讯体大小下限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage 模式下请将该值改大, 倡议 50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal 的 batch size, 默认 50K, 因为 kafka 最大音讯体限度请勿超过 1M(900K 以下)
canal.mq.canalBatchSize = 50
# Canal get 数据的超时工夫, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为 flat json 格局对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka 音讯投递是否应用事务
canal.mq.transaction = false

# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2..*,.*..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable

具体见:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

解决程序生产问题

看到上面这一行配置

canal.mq.partitionHash=mydatabase.mytable

咱们配置了 kafka 的 partitionHash,并且咱们一个 Topic 就是一个表。这样的成果就是,一个表的数据只会推到一个固定的 partition 中,而后再推给 consumer 进行生产解决,同步到新的数据库。通过这种形式,解决了之前碰到的 binlog 日志程序解决的问题。这样即便咱们部署了多个 kafka consumer 端,形成一个集群,这样 consumer 从一个 partition 生产音讯,就是生产解决同一个表的数据。这样对于一个表来说,就义掉了并行处理,不过集体感觉,凭借 kafka 的性能弱小的解决架构,咱们的业务在 kafka 这个节点产生瓶颈并不容易。并且咱们的业务目标不是实时一致性,在肯定提早下,两个数据库保障最终一致性。

下图是最终的同步架构,咱们在每一个服务节点都实现了集群化。全都跑在 UCloud 的 UK8s 服务上,保障了服务节点的高可用性。

canal 也是集群换,然而某一时刻只会有一台 canal 在解决 binlog,其余都是冗余服务。当这台 canal 服务挂了,其中一台冗余服务就会切换到工作状态。同样的,也是因为要保障 binlog 的程序读取,所以只能有一台 canal 在工作。

并且,咱们还用这套架构进行缓存生效的同步。咱们应用的缓存模式是:Cache-Aside。同样的,如果在代码中数据更改的中央进行缓存生效操作,会将代码变得复杂。所以,在上述架构的根底上,将简单的触发缓存生效的逻辑放到 kafka-client 端对立解决,达到肯定解耦的目标。


目前这套同步架构失常运行中,后续有遇到问题再持续更新。

更多内容,欢送点击下方作者主页进行交换~

本文作者:UCloud 利用研发工程师 Cary

正文完
 0