I. Preface

Canal is open-source middleware from Alibaba, written in pure Java. It is a data pipeline for data synchronization: it disguises itself as a MySQL slave and parses the binlog, providing incremental synchronization to downstream services. Multiple Canal nodes can join with ZooKeeper to form a high-availability cluster. Only one node in the cluster is active at a time and parses the binlog (with multiple threads); if a node drops out, the parse position and consume position it has already reached are synchronized to ZooKeeper, and another node takes over, reads those positions from ZooKeeper, continues parsing the binlog, and keeps serving downstream clients.
In the following we will build a Canal high-availability cluster that delivers the parsed binlog directly as messages to Alibaba Cloud RocketMQ for business consumers, completing the task of incrementally synchronizing MySQL data to Elasticsearch.

  • See the official wiki for reference

II. Preparing resources for the cluster

1. MySQL preparation

  • MySQL services that need to be prepared

Service address                | Database | User     | Plaintext password
mysql.test.yiyaowang.com:3306  | b2c      | b2c      | d41d8cd98f00b204
mysql2.test.yiyaowang.com:3306 | yc_order | yc_order | d41d8cd98f00b204

Note: there are two database instances, so the practice below requires Canal to listen to multiple databases at the same time.

  • The plaintext database passwords are encrypted with Druid
D:\yyw_mvn_repo\repository\com\alibaba\druid\1.1.21>java -cp druid-1.1.21.jar com.alibaba.druid.filter.config.ConfigTools d41d8cd98f00b204
privateKey:MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht
publicKey:MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==
password:KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==
  • Druid encryption/decryption test
    /**
     * Druid encryption/decryption test
     * @throws Exception
     */
    @Test
    public void druidDecryptTest() throws Exception {
        // private key
        String privateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht";
        // public key
        String publicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==";
        // encrypted password
        String password = "KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==";
        // druid decrypt
        log.info("ConfigTools.decrypt:{}", ConfigTools.decrypt(publicKey, password));
        // druid encrypt
        log.info("ConfigTools.encrypt:{}", ConfigTools.encrypt(privateKey, "d41d8cd98f00b204"));
    }

The MySQL server must have binlog enabled

  • Modify the MySQL configuration file my.cnf
[mysqld]
# enable binlog
log-bin=mysql-bin
# use row format
binlog-format=ROW
# server_id must be set for MySQL replication and must not clash with Canal's slaveId
server_id=1

Note: Canal is based on the MySQL binlog mechanism, so binlog writing must be enabled on MySQL and the binlog format must be set to row.

  • Verify the my.cnf settings
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

Prepare a MySQL user with the privileges required for replication

# create the user
CREATE USER b2c IDENTIFIED BY 'd41d8cd98f00b204';
# grant the privileges a replication slave needs
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'b2c'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'b2c'@'%' ;
# flush privileges so the grants take effect
FLUSH PRIVILEGES;
# check the privileges granted to the user
show grants for 'b2c';

2. ZooKeeper preparation

A Canal high-availability cluster relies on ZooKeeper as its coordinator: each Canal server's information, Canal client information, parse positions, and consume positions are written to ZooKeeper. Canal clients obtain the Canal server information directly from ZooKeeper in order to consume (a minimal client sketch follows the address list below), and ZooKeeper coordination guarantees that only one Canal server is active at a time.

  • ZooKeeper node addresses
zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181
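For reference only (this guide delivers messages through RocketMQ rather than the TCP mode), the sketch below shows how a TCP-mode client could use this ZooKeeper ensemble to locate the active Canal server through Canal's cluster connector API. The destination name order_instance, the subscription filter, and the batch size are assumptions for illustration, not the project's actual client.

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class CanalClusterClientDemo {
    public static void main(String[] args) {
        // The cluster connector resolves the currently active canal server from ZooKeeper
        CanalConnector connector = CanalConnectors.newClusterConnector(
                "zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181",
                "order_instance",   // destination = instance directory name (assumed)
                "", "");            // username/password, empty by default
        try {
            connector.connect();
            connector.subscribe("yc_order\\..*");   // same filter syntax as instance.properties
            connector.rollback();                   // re-deliver anything fetched but not acked
            while (true) {
                Message message = connector.getWithoutAck(100); // fetch up to 100 entries
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000L);            // nothing new yet
                } else {
                    // ... process message.getEntries() ...
                    connector.ack(batchId);         // records the consume position in ZooKeeper
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            connector.disconnect();
        }
    }
}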

3. RocketMQ preparation

  • Prepare the MQ topics and group IDs (GIDs)
Environment | Service | TOPIC                         | GID
Test        | Item    | TOPIC_ITEM_ES_DATA_SYNC_TEST  | GID_ITEM_ES_DATA_SYNC_TEST
Staging     | Item    | TOPIC_ITEM_ES_DATA_SYNC_STG   | GID_ITEM_ES_DATA_SYNC_STG
Production  | Item    | TOPIC_ITEM_ES_DATA_SYNC       | GID_ITEM_ES_DATA_SYNC
Test        | Order   | TOPIC_ORDER_ES_DATA_SYNC_TEST | GID_ORDER_ES_DATA_SYNC_TEST
Staging     | Order   | TOPIC_ORDER_ES_DATA_SYNC_STG  | GID_ORDER_ES_DATA_SYNC_STG
Production  | Order   | TOPIC_ORDER_ES_DATA_SYNC      | GID_ORDER_ES_DATA_SYNC

Note: the Canal high-availability cluster delivers the parsed binlog directly as messages to Alibaba Cloud RocketMQ for business consumers to consume, completing the incremental synchronization of MySQL data to Elasticsearch. A consumer sketch is shown below.
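As a rough illustration of the consuming side, the sketch below subscribes to the test order topic with the Alibaba Cloud ONS Java client and parses Canal's flat-message JSON (canal.mq.flatMessage = true is configured later). The access key, secret key, and endpoint value are placeholders, and fastjson plus the canal.protocol artifact are assumed to be on the classpath; treat it as a sketch under those assumptions rather than the project's actual consumer.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

public class OrderSyncConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(PropertyKeyConst.GROUP_ID, "GID_ORDER_ES_DATA_SYNC_TEST");
        props.put(PropertyKeyConst.AccessKey, "<your-access-key>");   // placeholder
        props.put(PropertyKeyConst.SecretKey, "<your-secret-key>");   // placeholder
        props.put(PropertyKeyConst.NAMESRV_ADDR,
                "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80"); // assumed endpoint

        Consumer consumer = ONSFactory.createConsumer(props);
        consumer.subscribe("TOPIC_ORDER_ES_DATA_SYNC_TEST", "*", (message, context) -> {
            // with canal.mq.flatMessage=true the message body is one FlatMessage JSON document
            FlatMessage flat = JSON.parseObject(
                    new String(message.getBody(), StandardCharsets.UTF_8), FlatMessage.class);
            // database/table/type/data would drive the Elasticsearch upsert in a real consumer
            System.out.printf("%s.%s %s%n", flat.getDatabase(), flat.getTable(), flat.getType());
            return Action.CommitMessage;   // ack; return Action.ReconsumeLater on failure
        });
        consumer.start();
    }
}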

4. Canal machine preparation

OS version | Machine IP  | Deployed application | Application version
CentOS 7.5 | 10.6.123.33 | canal.deployer       | 1.1.4
CentOS 7.5 | 10.6.85.15  | canal.deployer       | 1.1.4

III. Canal cluster setup and RocketMQ message delivery

1. Downloading Canal

  • Direct download via the official download link

  • Fetch with wget
# take v1.1.5-alpha-2 as an example
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
  • Build from source
git clone git@github.com:alibaba/canal.git
git checkout canal-$version   # switch to the desired version
mvn clean install -Denv=release

Note: after the build finishes, a target directory is generated under the canal project root containing canal.deployer-$version.tar.gz.

  • Directory layout of canal.deployer
drwxr-xr-x. 2 root root   76 Sep  4 13:11 bin   # start/stop scripts
drwxr-xr-x. 5 root root  130 Sep  4 13:28 conf  # configuration files
drwxr-xr-x. 2 root root 4096 Sep  3 15:03 lib   # dependency libraries
drwxrwxrwx. 4 root root   41 Sep  3 16:16 logs  # log directory
  • Unpack the Canal package
# unpack the canal package into the target directory
mkdir /usr/local/canal
tar zxvf /usr/local/canal.deployer-1.1.4.tar.gz -C /usr/local/canal
# copy the canal directory from this machine (10.6.123.33) to /usr/local on the other machine (10.6.85.15)
scp -r /usr/local/canal root@10.6.85.15:/usr/local

2. Canal configuration

[root@localhost conf]# pwd
/usr/local/canal/conf
[root@localhost conf]# ll
total 16
-rwxrwxrwx. 1 root root  291 Sep  2  2019 canal_local.properties
-rwxrwxrwx. 1 root root 5394 Sep  4 13:28 canal.properties
-rwxrwxrwx. 1 root root 3119 Sep  2  2019 logback.xml
drwxrwxrwx. 2 root root   39 Sep  3 15:03 metrics
drwxrwxrwx. 2 root root   49 Sep  4 11:38 order_instance
drwxrwxrwx. 3 root root  149 Sep  3 15:03 spring

In practice the only files we need to configure are canal.properties and our own instance.properties. Each canal server can load multiple instance.properties files, and each instance synchronizes the binlog of one MySQL instance, so Canal can serve synchronization requirements across multiple relational databases. With the default configuration, Canal automatically scans the custom directories under conf (for example order_instance) and loads the instance.properties in each of them to start an instance.

  • Configure the system-wide canal.properties; it is shared by all instances and can be overridden by instance configuration
#################################################
#########         common argument        #########
#################################################
# local IP that the canal server binds to; if not configured, a local IP is chosen automatically at startup
canal.ip = 10.6.123.33
# IP under which the canal server registers itself in external zookeeper/admin
canal.register.ip = 10.6.123.33
# port on which the canal server exposes its socket service
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# connection string of the zookeeper cluster used by the canal server
canal.zkServers = zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# server mode: tcp (canal client), kafka (deliver messages directly to Kafka), RocketMQ (deliver messages directly to RocketMQ)
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
canal.instance.parser.parallelThreadSize = 4
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# ak/sk of the Aliyun account, required when using Alibaba Cloud RocketMQ, support rds/mq
canal.aliyun.accessKey = xxxxxxxxxxxxxxxxxxxxxx
canal.aliyun.secretKey = xxxxxxxxxxxxxxxxxxxxxx
#################################################
#########         destinations        #########
#################################################
# list of instances deployed on this server; auto-detected if left empty
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# enable automatic instance scanning; when true, changes under canal.conf.dir trigger:
#   a. new instance directory: the instance config is loaded, and started automatically if lazy is true
#   b. instance directory removed: the instance config is unloaded and, if running, the instance is stopped
#   c. instance.properties changed: the instance config is reloaded and, if running, the instance is restarted
canal.auto.scan = true
# interval for automatic instance scanning, in seconds
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
#################################################
#########              MQ              #########
#################################################
# TCP client endpoint of the Alibaba Cloud RocketMQ instance
canal.mq.servers = onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
#canal.mq.producerGroup =
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = cloud
# aliyun mq namespace
#canal.mq.namespace =
#################################################
#########     Kafka Kerberos Info    #########
#################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
  • Create a custom directory under conf (e.g. order_instance) and add an instance.properties to it
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=mysql2.test.yiyaowang.com:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=yc_order
# encrypted password (Druid ciphertext)
canal.instance.dbPassword=KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==
canal.instance.connectionCharset = UTF-8
# enable druid encryption/decryption support
canal.instance.enableDruid=true
# druid public key
canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==
# table regex
canal.instance.filter.regex=yc_order\\.t_order,yc_order\\.t_order_detail,yc_order\\.t_order_child,yc_order\\.t_order_delivery,yc_order\\.t_system_pay_type,yc_order\\.t_order_exception
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic= TOPIC_ORDER_ES_DATA_SYNC_TEST
canal.mq.producerGroup = GID_ORDER_ES_DATA_SYNC_TEST
#canal.mq.namespace =
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

3. Canal commands and status

Canal commands

  • Start canal
sh bin/startup.sh
  • Stop canal
sh bin/stop.sh
  • Inspect the logs
# take the order_instance instance log as an example
[root@localhost order_instance]# tail -f -n 500 logs/order_instance/order_instance.log
2020-09-03 16:16:19.143 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-09-03 16:16:19.183 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties]
2020-09-03 16:16:20.751 [canal-instance-scan-0] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-09-03 16:16:21.160 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-09-03 16:16:21.161 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties]
2020-09-03 16:16:21.649 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-order_instance 
2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^yc_order\.t_system_pay_type$|^yc_order\.t_order_exception$|^yc_order\.t_order_delivery$|^yc_order\.t_order_detail$|^yc_order\.t_order_child$|^yc_order\.t_order$
2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 
2020-09-03 16:16:21.675 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-09-03 16:16:21.757 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-09-03 16:16:21.761 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-09-03 16:16:26.312 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000571,position=283581902,serverId=168252,gtid=,timestamp=1599120981000] cost : 4543ms , the next step is binlog dump

Note: in a canal cluster, only one machine will show a successful start-up log, because only one of the cluster's nodes is in the active state at any time.

Canal cluster registration status

Start all the canal nodes and inspect the state in ZooKeeper: it records all registered canal server nodes, the details of the currently running node, and the consumer's position information.

  • Registration status in ZooKeeper

  • Information about the currently running node
{"active":true,"address":"10.6.123.33:11111"}
  • Consume position information
# After data has been consumed successfully, the canal server records in zookeeper the binlog position of the last successful consumption.
# (The next time the client restarts, consumption resumes from this last position.)
{
    "@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
    "identity":{
        "slaveId":-1,
        "sourceAddress":{
            "address":"mysql2.test.yiyaowang.com",
            "port":3306
        }
    },
    "postion":{
        "gtid":"",
        "included":false,
        "journalName":"mysql-bin.000573",
        "position":308524409,
        "serverId":168252,
        "timestamp":1599205987000
    }
}
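To check this cursor programmatically instead of through the ZooKeeper CLI, a minimal sketch with the plain ZooKeeper Java client is shown below. It assumes Canal's default znode layout /otter/canal/destinations/{destination}/{clientId}/cursor and a client id of 1001; adjust the path to whatever you actually see in your ensemble.

import java.nio.charset.StandardCharsets;

import org.apache.zookeeper.ZooKeeper;

public class CanalCursorReader {
    public static void main(String[] args) throws Exception {
        // connect to the same ZooKeeper ensemble that the canal cluster uses
        ZooKeeper zk = new ZooKeeper(
                "zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181",
                30000, event -> { });
        try {
            // assumed default layout: /otter/canal/destinations/{destination}/{clientId}/cursor
            String path = "/otter/canal/destinations/order_instance/1001/cursor";
            byte[] data = zk.getData(path, false, null);
            // the cursor node holds the LogPosition JSON shown above
            System.out.println(new String(data, StandardCharsets.UTF_8));
        } finally {
            zk.close();
        }
    }
}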

IV. Client access

V. Canal Admin visual configuration management