概述

canal是阿里巴巴旗下的一款开源我的项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&生产,目前次要反对了MySQL(也反对mariaDB)。

背景

晚期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需要。不过晚期的数据库同步业务,次要是基于trigger的形式获取增量变更,不过从2010年开始,阿里系公司开始逐渐的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&生产的业务,从此开启了一段新纪元。ps. 目前外部应用的同步,曾经反对mysql5.x和oracle局部版本的日志解析

基于日志增量订阅&生产反对的业务:
  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变动等重要业务音讯
以后的 canal 反对源端 MySQL 版本包含 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

Mysql的BinLog

它记录了所有的DDL和DML(除了数据查问语句)语句,以事件模式记录,还蕴含语句所执行的耗费的工夫。次要用来备份和数据同步。

binlog 有三种模式:STATEMENT、ROW、MIXED

  1. STATEMENT 记录的是执行的sql语句
  2. ROW 记录的是实在的行数据记录
  3. MIXED 记录的是1+2,优先依照1的模式记录
举例说明
举例来说,上面的sql
COPYupdate user set age=20

对应STATEMENT模式只有一条记录,对应ROW模式则有可能有成千上万条记录(取决数据库中的记录数)。

MySQL主备复制原理

  1. Slave 下面的IO线程连贯上 Master,并申请从指定日志文件的指定地位(或者从最开始的日志)之后的日志内容;
  2. Master 接管到来自 Slave 的 IO 线程的申请后,通过负责复制的 IO 线程依据申请信息读取指定日志指定地位之后的日志信息,返回给 Slave 端的 IO 线程。返回信息中除了日志所蕴含的信息之外,还包含本次返回的信息在 Master 端的 Binary Log 文件的名称以及在 Binary Log 中的地位;
  3. Slave 的 IO 线程接管到信息后,将接管到的日志内容顺次写入到 Slave 端的Relay Log文件(mysql-relay-bin.xxxxxx)的最末端,并将读取到的Master端的bin-log的文件名和地位记录到master- info文件中,以便在下一次读取的时候可能分明的高速Master“我须要从某个bin-log的哪个地位开始往后的日志内容,请发给我”
  4. Slave 的 SQL 线程检测到 Relay Log 中新减少了内容后,会马上解析该 Log 文件中的内容成为在 Master 端实在执行时候的那些可执行的 Query 语句,并在本身执行这些 Query。这样,实际上就是在 Master 端和 Slave 端执行了同样的 Query,所以两端的数据是齐全一样的。
    当然这个过程实质上还是存在肯定的提早的。
mysql的binlog文件长这个样子。
COPYmysql-bin.003831mysql-bin.003840  mysql-bin.003849  mysql-bin.003858 
启用Binlog留神以下几点:
  1. Master主库个别会有多台Slave订阅,且Master主库要反对业务零碎实时变更操作,服务器资源会有瓶颈;
  2. 须要同步的数据表肯定要有主键;

canal可能同步数据的原理

了解了mysql的主从同步的机制再来看canal就比拟清晰了,canal次要是听过伪装成mysql从server来向主server拉取数据。

  1. canal模仿mysql slave的交互协定,假装本人为mysql slave,向mysql master发送dump协定
  2. mysql master收到dump申请,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

Canal架构

canal的设计理念

canal的组件化设计十分好,有点相似于tomcat的设计。应用组合设计,依赖倒置,面向接口的设计。

canal的组件

  1. canal server 这个代表了咱们部署的一个canal 利用
  2. canal instance 这个代表了一个canal server中的多个 mysql instance ,从这一点阐明一个canal server能够收集多个库的数据,在canal中叫 destionation。
每个canal instance 有多个组件形成。在conf/spring/default-instance.xml中配置了这些组件。他其实是应用了spring的容器来进行这些组件治理的。

instance 蕴含的组件

这里是一个cannalInstance工作所蕴含的大组件。截取自 conf/spring/default-instance.xml
COPY<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">    <property name="destination" value="${canal.instance.destination}" />    <property name="eventParser">        <ref local="eventParser" />    </property>    <property name="eventSink">        <ref local="eventSink" />    </property>    <property name="eventStore">        <ref local="eventStore" />    </property>    <property name="metaManager">        <ref local="metaManager" />    </property>    <property name="alarmHandler">        <ref local="alarmHandler" />    </property></bean>  
EventParser设计
eventParser 最根本的组件,相似于mysql从库的dump线程,负责从master中获取bin_log

整个parser过程大抵可分为几步:
  1. Connection获取上一次解析胜利的地位 (如果第一次启动,则获取初始指定的地位或者是以后数据库的binlog位点)
  2. Connection建设链接,发送BINLOG_DUMP指令
    // 0. write command number
    // 1. write 4 bytes bin-log position to start at
    // 2. write 2 bytes bin-log flags
    // 3. write 4 bytes server id of the slave
    // 4. write bin-log file name
  3. Mysql开始推送Binaly Log
  4. 接管到的Binaly Log的通过Binlog parser进行协定解析,补充一些特定信息
    // 补充字段名字,字段类型,主键信息,unsigned类型解决
  5. 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储胜利
  6. 存储胜利后,定时记录Binaly Log地位
EventSink设计
eventSink 数据的归集,应用设置的filter对bin log进行过滤,工作的过程如下。

阐明:

数据过滤:反对通配符的过滤模式,表名,字段内容等

数据路由/散发:解决1:n (1个parser对应多个store的模式)

数据归并:解决n:1 (多个parser对应1个store)

数据加工:在进入store之前进行额定的解决,比方join

数据1:n业务

为了正当的利用数据库资源, 个别常见的业务都是依照schema进行隔离,而后在mysql下层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理地位对开发的影响,阿里系次要是通过cobar/tddl来解决数据源路由问题。

所以,个别一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注

数据n:1业务

同样,当一个业务的数据规模达到肯定的量级后,必然会波及到程度拆分和垂直拆分的问题,针对这些拆分的数据须要解决时,就须要链接多个store进行解决,生产的位点就会变成多份,而且数据生产的进度无奈失去尽可能有序的保障。

所以,在肯定业务场景下,须要将拆分后的增量数据进行归并解决,比方依照工夫戳/全局id进行排序归并.

EventStore设计
eventStore 用来存储filter过滤后的数据,canal目前的数据只在这里存储,工作流程如下
  • 目前仅实现了Memory内存模式,后续打算减少本地file存储,mixed混合模式
  • 借鉴了Disruptor的RingBuffer的实现思路

定义了3个cursor
  • Put : Sink模块进行数据存储的最初一次写入地位
  • Get : 数据订阅获取的最初一次提取地位
  • Ack : 数据生产胜利的最初一次生产地位
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:

实现阐明:
  • Put/Get/Ack cursor用于递增,采纳long型存储
  • buffer的get操作,通过取余或者与操作。(与操作: cusor & (size - 1) , size须要为2的指数,效率比拟高)
metaManager
metaManager 用来存储一些原数据,比方生产到的游标,以后流动的server等信息
alarmHandler
alarmHandler 报警,这个个别状况下就是谬误日志,实践上应该是能够定制成邮件等模式,然而目前不反对

各个组件目前反对的类型

canal采纳了spring bean container的形式来组装一个canal instance ,目标是为了可能更加灵便。

canal通过这些组件的选取能够达到不同应用场景的成果,比方单机的话,个别应用file来存储metadata就行了,HA的话个别应用zookeeper来存储metadata。

eventParser
eventParser 目前只有三种
  • MysqlEventParser 用于解析mysql的日志
  • GroupEventParser 多个eventParser的汇合,实践上是对应了分表的状况,能够通过这个合并到一起
  • RdsLocalBinlogEventParser 基于rds的binlog 的复制
eventSink
eventSink 目前只有EntryEventSink 就是基于mysql的binlog数据对象的解决操作
eventStore
eventStore 目前只有一种 MemoryEventStoreWithBuffer,外部应用了一个ringbuffer 也就是说canal解析的数据都是存在内存中的,并没有到zookeeper当中。
metaManager
metaManager 这个比拟多,其实依据元数据寄存的地位能够分为三大类,memory,file,zookeeper

Canal-HA机制

canal是反对HA的,其实现机制也是依赖zookeeper来实现的,用到的个性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA相似。

canal的ha分为两局部,canal server和canal client别离有对应的ha实现

  • canal server: 为了缩小对mysql dump的申请,不同server上的instance(不同server上的雷同instance)要求同一时间只能有一个处于running,其余的处于standby状态(standby是instance的状态)。
  • canal client: 为了保障有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接管无奈保障有序。
server ha的架构图如下

大抵步骤:
  1. canal server要启动某个canal instance时都先向zookeeper_进行一次尝试启动判断_(实现:创立EPHEMERAL节点,谁创立胜利就容许谁启动)
  2. 创立zookeeper节点胜利后,对应的canal server就启动对应的canal instance,没有创立胜利的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创立的instance节点隐没后,立刻告诉其余的canal server再次进行步骤1的操作,从新选出一个canal server启动instance。
  4. canal client每次进行connect时,会首先向zookeeper询问以后是谁启动了canal instance,而后和其建设链接,一旦链接不可用,会从新尝试connect。

Canal Client的形式和canal server形式相似,也是利用zookeeper的抢占EPHEMERAL节点的形式进行管制.

canal的工作过程

dump日志

启动时去MySQL 进行dump操作的binlog 地位确定

工作的过程。在启动一个canal instance 的时候,首先启动一个eventParser 线程来进行数据的dump 当他去master拉取binlog的时候须要binlog的地位,这个地位的确定是依照如下的程序来确定的(这个中央讲述的是HA模式哈)。

  1. 在启动的时候判断是否应用zookeeper,如果是zookeeper,看是否拿到 cursor (也就是binlog的信息),如果可能拿到,把这个信息存到内存中(MemoryLogPositionManager),而后拿这个信息去mysql中dump binlog
  2. 通过1拿不到的话(个别是zookeeper当中每一,比方第一次搭建的时候,或者因为某些起因zk中的数据被删除了),就去配置文件配置当中的去拿,把这个信息存到内存中(MemoryLogPositionManager),而后拿这个信息去mysql中dump binlog
  3. 通过2仍然没有拿到的话,就去mysql 中执行一个sql show master status 这个语句会显示以后mysql binlog最初地位的信息,也就是刚写入的binlog所在的地位信息。把这个信息存到内存中(MemoryLogPositionManager),而后拿这个信息去mysql中dump binlog。

前面的eventParser的操作就会以内存中(MemoryLogPositionManager)存储的binlog地位去master进行dump操作了。

mysql的show master status 操作
COPYmysql> show master status\G*************************** 1. row ***************************             File: mysql-bin.000028         Position: 635762367     Binlog_Do_DB: Binlog_Ignore_DB:Executed_Gtid_Set: 18db0532-6a08-11e8-a13e-52540042a113:1-2784514,318556ef-4e47-11e6-81b6-52540097a9a8:1-30002,ac5a3780-63ad-11e8-a9ac-52540042a113:1-5,be44d87c-4f25-11e6-a0a8-525400de9ffd:1-1563497821 row in set (0.00 sec

归集(sink)和存储(store)

数据在dump回来之后进行的归集(sink)和存储(store)

sink操作是能够撑持将多个eventParser的数据进行过滤filter

filter应用的是instance.properties中配置的filter,当然这个filter也能够由canal的client端在进行subscribe的时候进行设置。如果在client端进行了设置,那么服务端配置文件instance.properties的配置都会生效

sink 之后将过滤后的数据存储到eventStore当中去。

目前eventStore的实现只有一个MemoryEventStoreWithBuffer,也就是基于内存的ringbuffer,应用这个store有一个特点,这个ringbuffer是基于内存的,大小是有限度的(bufferSize = 16 * 1024 也就是16M),所以,当canal的客户端生产比较慢的时候,ringbuffer中存满了就会阻塞sink操作,那么正读取mysql binlogeventParser线程也会碰壁。
  这种设计其实也是有情理的。 因为canal的操作是pull 模型,不是producer push的模型,所以他没必要存储太多数据,这样就能够防止了数据存储和长久化治理的一些问题。使数据管理的复杂度大大降低。

  下面这些整个是canal的parser 线程的工作流程,次要对应的就是将数据从mysql搞下来,做一些根本的归集和过滤,而后存储到内存中。

binlog的消费者

canal从mysql订阅了binlog当前次要还是想要给消费者应用。那么binlog是在什么时候被生产呢。这就是另一条主线了。就像咱们做一个toC的零碎,管理系统是必须的,用户应用的app或者web又是一套,eventParser 线程就像是管理系统,往里面录入根底数据。canal的client就像是app端一样,是这些数据的生产方。
  binlog的次要消费者就是canal的client端。应用的协定是基于tcp的google.protobuf,当然tcp的模式是io多路复用,也就是nio。当咱们的client发动申请之后,canal的server端就会从eventStore中将数据传输给客户端。依据客户端的ack机制,将binlog的元数据信息定期同步到zookeeper当中。

canal的目录构造

配置父目录:
在上面能够看到
COPYcanal├── bin│   ├── canal.pid│   ├── startup.bat│   ├── startup.sh│   └── stop.sh└── conf    ├── canal.properties    ├── gamer ---目录    ├── ww_social ---目录    ├── wother ---目录    ├── nihao ---目录    ├── liveim ---目录    ├── logback.xml    ├── spring ---目录    ├── ym ---目录    └── xrm_ppp ---目录
这里是全副开展的目录
COPYcanal├── bin│   ├── canal.pid│   ├── startup.bat│   ├── startup.sh│   └── stop.sh└── conf    ├── canal.properties    ├── game_center    │   └── instance.properties    ├── ww_social    │   ├── h2.mv.db    │   ├── h2.trace.db    │   └── instance.properties    ├── wwother    │   ├── h2.mv.db    │   └── instance.properties    ├── nihao    │   ├── h2.mv.db    │   ├── h2.trace.db    │   └── instance.properties    ├── movie    │   ├── h2.mv.db    │   └── instance.properties    ├── logback.xml    ├── spring    │   ├── default-instance.xml    │   ├── file-instance.xml    │   ├── group-instance.xml    │   ├── local-instance.xml    │   ├── memory-instance.xml    │   └── tsdb    │       ├── h2-tsdb.xml    │       ├── mysql-tsdb.xml    │       ├── sql    │       └── sql-map    └── ym        └── instance.properties

Canal利用场景

同步缓存redis/全文搜寻ES

canal一个常见利用场景是同步缓存/全文搜寻,当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新呈现问题时,应该回退binlog到过来某个地位进行从新同步,并提供全量刷新缓存/ES的办法,如下图所示。

下发工作

  另一种常见利用场景是下发工作,当数据变更时须要告诉其余依赖零碎。其原理是工作零碎监听数据库变更,而后将变更的数据写入MQ/kafka进行工作下发,比方商品数据变更后须要告诉商品详情页、列表页、搜寻页等先关零碎。这种形式能够保证数据下发的精确性,通过MQ发送音讯告诉变更缓存是无奈做到这一点的,而且业务零碎中不会散落着各种下发MQ的代码,从而实现了下发归集,如下图所示。

数据异构

在大型网站架构中,DB都会采纳分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比方不同维度的查问或者聚合查问,此时就会十分辣手。个别咱们会通过数据异构机制来解决此问题。

所谓的数据异构,那就是将须要join查问的多表依照某一个维度又聚合在一个DB中。让你去查问。canal就是实现数据异构的伎俩之一。

本文由传智教育博学谷狂野架构师教研团队公布。

如果本文对您有帮忙,欢送关注点赞;如果您有任何倡议也可留言评论私信,您的反对是我保持创作的能源。

转载请注明出处!