About Canal: Canal Cluster in Practice


I. Preface

Canal is open-source middleware from Alibaba, written in pure Java. It is a data pipeline for data synchronization: it disguises itself as a MySQL slave and parses the binlog, serving downstream incremental-sync use cases. Multiple Canal nodes can form a highly available cluster together with zookeeper. Only one node in the cluster is active at a time, parsing the binlog (multi-threaded); if that node exits, the parse position and consumption position it has already reached are synced to zookeeper, and another node takes over binlog parsing (reading the parse and consumption positions back from zookeeper) to keep serving downstream clients.
Below we build a Canal HA cluster that delivers the parsed binlog directly as messages to Alibaba Cloud RocketMQ for business consumers, completing incremental synchronization of MySQL data into Elasticsearch.

  • Reference: the official wiki

II. Resource Preparation for the Cluster Setup

1. MySQL preparation

  • MySQL service targets to prepare
Service address | Database | User | Plaintext password
mysql.test.yiyaowang.com:3306 | b2c | b2c | d41d8cd98f00b204
mysql2.test.yiyaowang.com:3306 | yc_order | yc_order | d41d8cd98f00b204

Note: there are two database instances; in the setup below, Canal listens on multiple databases at the same time.

  • The database plaintext password is encrypted with druid
D:\yyw_mvn_repo\repository\com\alibaba\druid\1.1.21>java -cp druid-1.1.21.jar com.alibaba.druid.filter.config.ConfigTools d41d8cd98f00b204
privateKey:MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht
publicKey:MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==
password:KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==
  • druid encrypt/decrypt test
    /**
     * druid encrypt/decrypt test
     * @throws Exception
     */
    @Test
    public void druidDecryptTest() throws Exception {
        // private key
        String privateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht";
        // public key
        String publicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==";
        // encrypted password
        String password = "KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==";
        // druid decrypt (public key + ciphertext -> plaintext)
        log.info("ConfigTools.decrypt:{}", ConfigTools.decrypt(publicKey, password));
        // druid encrypt (private key + plaintext -> ciphertext)
        log.info("ConfigTools.encrypt:{}", ConfigTools.encrypt(privateKey, "d41d8cd98f00b204"));
    }

The MySQL server must have binlog support enabled

  • Edit the MySQL configuration file my.cnf
[mysqld]
# enable binlog
log-bin=mysql-bin

# use ROW format
binlog-format=ROW

# required for MySQL replication; must not collide with Canal's slaveId
server_id=1

Note: Canal is built on MySQL binlog, so binlog writing must be enabled here and the binlog format set to ROW.

  • Verify the my.cnf configuration
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

Prepare a MySQL user with the replication-related privileges

# create the user
CREATE USER b2c IDENTIFIED BY 'd41d8cd98f00b204';

# grant the privileges a replication slave needs
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'b2c'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'b2c'@'%';

# flush privileges so the grants take effect
FLUSH PRIVILEGES;

# check the user's grants
SHOW GRANTS FOR 'b2c';

2. ZooKeeper preparation

A Canal HA cluster relies on zookeeper as the central coordinator: canal server info, canal client info, parse positions, and consumption positions are all written to zookeeper. Canal clients fetch canal server info directly from zookeeper to consume, and zookeeper coordination guarantees that only one canal server is active at any time.

  • ZooKeeper node addresses
zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181

3. RocketMQ preparation

  • Prepare the MQ topics and GIDs
Environment | Service | TOPIC | GID
Test | Item | TOPIC_ITEM_ES_DATA_SYNC_TEST | GID_ITEM_ES_DATA_SYNC_TEST
Staging | Item | TOPIC_ITEM_ES_DATA_SYNC_STG | GID_ITEM_ES_DATA_SYNC_STG
Production | Item | TOPIC_ITEM_ES_DATA_SYNC | GID_ITEM_ES_DATA_SYNC
Test | Order | TOPIC_ORDER_ES_DATA_SYNC_TEST | GID_ORDER_ES_DATA_SYNC_TEST
Staging | Order | TOPIC_ORDER_ES_DATA_SYNC_STG | GID_ORDER_ES_DATA_SYNC_STG
Production | Order | TOPIC_ORDER_ES_DATA_SYNC | GID_ORDER_ES_DATA_SYNC

Note: the Canal HA cluster delivers parsed binlog messages directly to Alibaba Cloud RocketMQ for business consumers, completing incremental synchronization of MySQL data into Elasticsearch.

4. Canal machines

OS version | Machine IP | Deployed application | Version
CentOS 7.5 | 10.6.123.33 | canal.deployer | 1.1.4
CentOS 7.5 | 10.6.85.15 | canal.deployer | 1.1.4

III. Canal Cluster Setup and RocketMQ Message Delivery

1. Download Canal

  • Direct download via the official download link

  • Fetch with wget
# using v1.1.5-alpha-2 as an example
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
  • Build from source
git clone git@github.com:alibaba/canal.git
git checkout canal-$version  # switch to the target version
mvn clean install -Denv=release

Note: when the build finishes, a target directory is generated under the canal project root, containing canal.deployer-$version.tar.gz.

  • Directory layout of canal.deployer
drwxr-xr-x. 2 root root   76 Sep  4 13:11 bin   # start/stop scripts
drwxr-xr-x. 5 root root  130 Sep  4 13:28 conf  # configuration files
drwxr-xr-x. 2 root root 4096 Sep  3 15:03 lib   # dependency libraries
drwxrwxrwx. 4 root root   41 Sep  3 16:16 logs  # log directory
  • Extract the Canal package
# extract the canal package into the target directory
mkdir /usr/local/canal
tar zxvf /usr/local/canal.deployer-1.1.4.tar.gz  -C /usr/local/canal

# copy the canal directory from this machine (10.6.123.33) to /usr/local on the other machine (10.6.85.15)
scp -r /usr/local/canal root@10.6.85.15:/usr/local

2. Canal configuration

[root@localhost conf]# pwd
/usr/local/canal/conf
[root@localhost conf]# ll
total 16
-rwxrwxrwx. 1 root root  291 Sep  2  2019 canal_local.properties
-rwxrwxrwx. 1 root root 5394 Sep  4 13:28 canal.properties
-rwxrwxrwx. 1 root root 3119 Sep  2  2019 logback.xml
drwxrwxrwx. 2 root root   39 Sep  3 15:03 metrics
drwxrwxrwx. 2 root root   49 Sep  4 11:38 order_instance
drwxrwxrwx. 3 root root  149 Sep  3 15:03 spring

The only files we really need to configure are canal.properties plus our custom instance.properties. Each canal server can load multiple instance.properties files, and each instance syncs the binlog of one MySQL instance, which means canal supports syncing from multiple RDB databases. With the default configuration, Canal automatically scans our custom directories under conf (e.g. order_instance) and loads the instance.properties in each such directory to start an instance.
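For the two databases in this setup, that means one directory per instance under conf. A sketch of the resulting layout (the directory name item_instance for the b2c database is hypothetical; only order_instance appears later in this article):

```
conf/
├── canal.properties                    # global config, shared by all instances
├── order_instance/
│   └── instance.properties             # syncs the yc_order database
└── item_instance/
    └── instance.properties             # syncs the b2c database
```

With canal.auto.scan = true (set below), dropping a new directory like this into conf is enough to start another instance.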

  • Configure the global canal.properties; it is shared by all instances and can be overridden by per-instance config
#################################################
#########         common argument        #############
#################################################
# local IP the canal server binds to; if unset, canal picks a local IP automatically
canal.ip = 10.6.123.33
# IP the canal server registers to external zookeeper/admin
canal.register.ip = 10.6.123.33
# port of the canal server socket service
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

# zookeeper cluster connection string for the canal server
canal.zkServers = zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# server mode: tcp (canal clients), kafka (deliver messages to kafka), RocketMQ (deliver messages to RocketMQ)
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
canal.instance.parser.parallelThreadSize = 4
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# Alibaba Cloud account AK/SK; required when using Aliyun RocketMQ, support rds/mq
canal.aliyun.accessKey = xxxxxxxxxxxxxxxxxxxxxx
canal.aliyun.secretKey = xxxxxxxxxxxxxxxxxxxxxx

#################################################
#########         destinations        #############
#################################################
# instances deployed on this server; auto-detected when left empty
canal.destinations = 
# conf root dir
canal.conf.dir = ../conf
# enable instance auto scan; when true, instance config changes under canal.conf.dir trigger:
# a. instance directory added: the config is loaded, auto-started when lazy is true
# b. instance directory removed: the config is unloaded, and the instance is stopped if running
# c. instance.properties changed: the config is reloaded, and the instance restarted if running
canal.auto.scan = true
# interval between auto scans, in seconds
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########              MQ              #############
##################################################
# TCP endpoint of the Aliyun RocketMQ instance
canal.mq.servers = onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
#canal.mq.producerGroup = 
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = cloud
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
  • Create a custom directory under conf (e.g. order_instance) and add an instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=mysql2.test.yiyaowang.com:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=yc_order
# encrypted password
canal.instance.dbPassword=KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==
canal.instance.connectionCharset = UTF-8
# enable druid-style password encryption/decryption
canal.instance.enableDruid=true
# druid public key
canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==

# table regex
canal.instance.filter.regex=yc_order\\.t_order,yc_order\\.t_order_detail,yc_order\\.t_order_child,yc_order\\.t_order_delivery,yc_order\\.t_system_pay_type,yc_order\\.t_order_exception
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic= TOPIC_ORDER_ES_DATA_SYNC_TEST
canal.mq.producerGroup = GID_ORDER_ES_DATA_SYNC_TEST
#canal.mq.namespace = 
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
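canal.instance.filter.regex above is a comma-separated list of regexes matched against the full schema.table name (the startup log later shows canal anchoring each pattern with ^ and $). As a rough illustration of those matching semantics only, not canal's actual implementation (which uses an Aviater-based regex filter internally), the behavior can be sketched with plain java.util.regex:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Illustrative sketch of canal's table-filter semantics; NOT canal's real code.
public class TableFilterSketch {

    // a shortened version of the filter from instance.properties
    static final String FILTER =
            "yc_order\\.t_order,yc_order\\.t_order_detail,yc_order\\.t_order_child";

    // a table passes when any sub-pattern fully matches "schema.table"
    static boolean accepted(String schemaDotTable) {
        return Arrays.stream(FILTER.split(","))
                .anyMatch(p -> Pattern.matches(p, schemaDotTable));
    }

    public static void main(String[] args) {
        System.out.println(accepted("yc_order.t_order"));        // true
        System.out.println(accepted("yc_order.t_order_detail")); // true
        System.out.println(accepted("b2c.t_product"));           // false
    }
}
```

Because the match is anchored, yc_order\\.t_order does not accidentally admit t_order_detail; that table only passes because it has its own entry in the list.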

3. Canal commands and status

Canal commands

  • Start canal
sh bin/startup.sh
  • Stop canal
sh bin/stop.sh
  • View logs
# using the order_instance instance log as an example
[root@localhost order_instance]# tail -f -n 500 logs/order_instance/order_instance.log
2020-09-03 16:16:19.143 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-09-03 16:16:19.183 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties]
2020-09-03 16:16:20.751 [canal-instance-scan-0] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-09-03 16:16:21.160 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-09-03 16:16:21.161 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties]
2020-09-03 16:16:21.649 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-order_instance 
2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^yc_order\.t_system_pay_type$|^yc_order\.t_order_exception$|^yc_order\.t_order_delivery$|^yc_order\.t_order_detail$|^yc_order\.t_order_child$|^yc_order\.t_order$
2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 
2020-09-03 16:16:21.675 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-09-03 16:16:21.757 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-09-03 16:16:21.761 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-09-03 16:16:26.312 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000571,position=283581902,serverId=168252,gtid=,timestamp=1599120981000] cost : 4543ms , the next step is binlog dump

Note: in a canal cluster, only one machine shows the successful startup log, since only one node in the cluster is in the active state at a time.

Canal cluster registration status

Start each canal node and observe the state in zookeeper: zookeeper records all registered canal server nodes, the details of the currently running node, and consumer node information.

  • Registration status in zookeeper

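These znodes can be listed directly with zkCli.sh. The layout below is a sketch based on canal's default zookeeper paths; the destination name order_instance comes from this setup, and 1001 is canal's default clientId:

```
/otter/canal/cluster                                   # all registered canal servers
/otter/canal/destinations/order_instance/running       # the currently active server
/otter/canal/destinations/order_instance/1001/running  # the connected client
/otter/canal/destinations/order_instance/1001/cursor   # the client's consumption position
```

For example, connecting with `zkCli.sh -server zk1.test.yiyaowang.com:2181` and running `get /otter/canal/destinations/order_instance/running` should print the active-node JSON shown below.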
  • Current running node info
{"active":true,"address":"10.6.123.33:11111"}
  • Consumption position info
# after each successful consumption, the canal server records the last successfully consumed binlog position in zookeeper
# (when the client restarts, consumption resumes from this last position)
{  
 "@type":"com.alibaba.otter.canal.protocol.position.LogPosition",  
 "identity":{  
        "slaveId":-1,  
        "sourceAddress":{  
            "address":"mysql2.test.yiyaowang.com",  
            "port":3306  
        }  
    },  
 "postion":{"gtid":"","included":false,"journalName":"mysql-bin.000573","position":308524409,"serverId":168252,"timestamp":1599205987000}  
}

IV. Client Access
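Since canal.mq.flatMessage = true was set above, downstream consumers receive each change as a flat JSON message from RocketMQ rather than a binary canal entry. A hedged sketch of its shape for an UPDATE on yc_order.t_order follows; the field names are those of canal's FlatMessage, while the columns and row values are invented for illustration:

```json
{
  "id": 1,
  "database": "yc_order",
  "table": "t_order",
  "pkNames": ["id"],
  "isDdl": false,
  "type": "UPDATE",
  "es": 1599205987000,
  "ts": 1599205987123,
  "sql": "",
  "sqlType": {"id": 4, "order_status": 4},
  "mysqlType": {"id": "int(11)", "order_status": "int(4)"},
  "data": [{"id": "1001", "order_status": "2"}],
  "old": [{"order_status": "1"}]
}
```

Here es is the binlog event timestamp and ts the message build time, both in milliseconds. A consumer subscribing to TOPIC_ORDER_ES_DATA_SYNC_TEST with group GID_ORDER_ES_DATA_SYNC_TEST can parse this JSON and upsert the rows in data into Elasticsearch, using old to see which fields changed.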

V. Canal Admin Visual Configuration Management
