一、前言
Canal 是 alibaba 开源的中间件,纯 java 开发,是一个用来数据同步的数据管道,它将本人伪装成 mysql 的 slaver,具备解析 bin log 的能力,为上游增量同步业务提供服务。Canal 能够多个节点联合 zookeeper 组成高可用集群,Canal 集群中同时只有一个 active 状态的节点用来解析(多线程)bin log,如果有节点退出,之前节点曾经解析实现的 解析位点 和生产位点 会同步到 zookeeper,另外的节点就会顶上去持续解析 bin log(从 zookeeper 读取解析位点和生产位点),为上游客户端提供服务。
以下将搭建一个 Canal 高可用集群并将解析的 bin log 间接投递音讯到阿里云 RocketMQ,供业务消费者生产。实现 mysql 数据增量同步到 Elasticsearch 的工作。
- 参考官网 wiki
二、集群搭建资源筹备
1、mysql 筹备
- 须要筹备 mysql 服务标的
服务地址 | 库名称 | 用户 | 明文明码 |
---|---|---|---|
mysql.test.yiyaowang.com:3306 | b2c | b2c | d41d8cd98f00b204 |
mysql2.test.yiyaowang.com:3306 | yc_order | yc_order | d41d8cd98f00b204 |
阐明:存在两个数据库实例,以下实际 Canal 须要同时监听多个库。
- 数据库明文明码采纳 druid 加密
D:\yyw_mvn_repo\repository\com\alibaba\druid\1.1.21>java -cp druid-1.1.21.jar com.alibaba.druid.filter.config.ConfigTools d41d8cd98f00b204
privateKey:MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht
publicKey:MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==
password:KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==
- druid 加解密测试
/**
* druid 加解密测试
* @throws Exception
*/
@Test
public void druidDecryptTest() throws Exception {
// 私钥
String privateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht";
// 公钥
String publicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==";
// 密文明码
String password = "KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==";
//druid 解密
log.info("ConfigTools.decrypt:{}", ConfigTools.decrypt(publicKey, password));
//druid 加密
log.info("ConfigTools.encrypt:{}", ConfigTools.encrypt(privateKey, "d41d8cd98f00b204"));
}
mysql 服务必须开启 bin log 反对
- 批改 mysql 配置文件 my.cnf
[mysqld]
#开启 bin log
log-bin=mysql-bin
#抉择 row 模式
binlog-format=ROW
#配置 mysql replaction 须要定义,不能和 canal 的 slaveId 反复
server_id=1
阐明:Canal 的原理是基于 mysql binlog 技术,所以这里肯定须要开启 mysql 的 binlog 写入性能,并且配置 binlog 模式为 row。
- 验证 mysql 配置文件 my.cnf
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
筹备一个具备复制相干权限的 mysql 用户
# 创立用户
CREATE USER b2c IDENTIFIED BY 'd41d8cd98f00b204';
#授予 slaver 复制所需的相干权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'b2c'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'b2c'@'%' ;
#刷新权限,使得创立的用户权限失效
FLUSH PRIVILEGES;
#查看用户被授予的权限
show grants for 'b2c'
2、zookeeper 筹备
Canal 高可用集群依赖于 zookeeper 作为对立协调者,各个 Canal server 信息、Canal client 信息、解析位点、生产位点会写入 zookeeper;Canal 客户端间接从 zookeeper 获取 Canal 服务端信息来生产,zk 对立协调,保障只有一个 Canal server 处于激活状态。
- zookeeper 节点地址
zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181
3、RocketMQ 筹备
- MQ 的 TOPIC 和 GID 筹备
环境 | 业务服务 | TOPIC | GID |
---|---|---|---|
测试环境 | 商品 | TOPIC_ITEM_ES_DATA_SYNC_TEST | GID_ITEM_ES_DATA_SYNC_TEST |
预公布环境 | 商品 | TOPIC_ITEM_ES_DATA_SYNC_STG | GID_ITEM_ES_DATA_SYNC_STG |
生产环境 | 商品 | TOPIC_ITEM_ES_DATA_SYNC | GID_ITEM_ES_DATA_SYNC |
测试环境 | 订单 | TOPIC_ORDER_ES_DATA_SYNC_TEST | GID_ORDER_ES_DATA_SYNC_TEST |
预公布环境 | 订单 | TOPIC_ORDER_ES_DATA_SYNC_STG | GID_ORDER_ES_DATA_SYNC_STG |
生产环境 | 订单 | TOPIC_ORDER_ES_DATA_SYNC | GID_ORDER_ES_DATA_SYNC |
阐明 Canal 高可用集群并将解析的 bin log 间接投递音讯到阿里云 RocketMQ,供业务消费者生产。实现 mysql 数据增量同步到 Elasticsearch 的工作。
4、canal 机器筹备
零碎版本 | 机器 IP | 部署利用 | 利用版本 |
---|---|---|---|
CentOS 7.5 | 10.6.123.33 | canal.deployer | 1.1.4 |
CentOS 7.5 | 10.6.85.15 | canal.deployer | 1.1.4 |
三、Canal 集群搭建及 RocketMQ 音讯投递
1、Canal 下载
- 间接下载,官网下载链接
- wget 形式获取
# 以 v1.1.5-alpha- 2 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
- 编译源码
git clone git@github.com:alibaba/canal.git
git co canal-$version #切换到对应的版本上
mvn clean install -Denv=release
阐明:执行实现后,会在 canal 工程根目录下生成一个 target 目录,外面会蕴含一个 canal.deployer-$verion.tar.gz
- canal deployer 的目录构造
drwxr-xr-x. 2 root root 76 Sep 4 13:11 bin #启停脚本
drwxr-xr-x. 5 root root 130 Sep 4 13:28 conf #配置文件
drwxr-xr-x. 2 root root 4096 Sep 3 15:03 lib #依赖库
drwxrwxrwx. 4 root root 41 Sep 3 16:16 logs #日志目录
- 解压 Canal 程序包
# 解压 canal 程序包至指定文件夹
mkdir /usr/local/canal
tar zxvf /usr/local/canal.deployer-1.1.4.tar.gz -C /usr/local/canal
#从以后机器(10.6.123.33)拷贝 canal 文件夹至另一台机器(10.6.85.15)的 /usr/local 目录下
scp -r /usr/local/canal root@10.6.85.15:/usr/local
2、Canal 配置
[root@localhost conf]# pwd
/usr/local/canal/conf
[root@localhost conf]# ll
total 16
-rwxrwxrwx. 1 root root 291 Sep 2 2019 canal_local.properties
-rwxrwxrwx. 1 root root 5394 Sep 4 13:28 canal.properties
-rwxrwxrwx. 1 root root 3119 Sep 2 2019 logback.xml
drwxrwxrwx. 2 root root 39 Sep 3 15:03 metrics
drwxrwxrwx. 2 root root 49 Sep 4 11:38 order_instance
drwxrwxrwx. 3 root root 149 Sep 3 15:03 spring
咱们须要配置的文件根本就只有 canal.properties 以及咱们自定义的 instance.properties,每个 canal server 都能够加载多个 instance.properties,每个 instance 实例能够向一个 mysql 实例同步 bin log,也就是说 canal 反对多个 rdb 库的同步业务需要。默认配置下 Canal 会主动扫描 conf 目录下咱们自定义的目录(如:order_instance),并加载改目录下的 instance.properties 来启动一个 instance 实例。
- 配置零碎 canal.properties,所有 instance 实例专用,可被 instance 配置笼罩
#################################################
######### common argument #############
#################################################
# canal server 绑定的本地 IP 信息,如果不配置,默认抉择一个本机 IP 进行启动服务
canal.ip = 10.6.123.33
# canal server 注册到内部 zookeeper、admin 的 ip 信息
canal.register.ip = 10.6.123.33
# canal server 提供 socket 服务的端口
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
#canal server 链接 zookeeper 集群的链接信息
canal.zkServers = zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# 服务模式,包含 tcp(canal 客户端), kafka(间接投递音讯到 kafka), RocketMQ(间接投递音讯到 RocketMQ)canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
canal.instance.parser.parallelThreadSize = 4
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun 账号的 ak/sk 信息,如果应用阿里云的 RocketMQ 服务必填, support rds/mq
canal.aliyun.accessKey = xxxxxxxxxxxxxxxxxxxxxx
canal.aliyun.secretKey = xxxxxxxxxxxxxxxxxxxxxx
#################################################
######### destinations #############
#################################################
#以后 server 上部署的 instance 列表, 不配置主动探测
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# 开启 instance 主动扫描,如果配置为 true,canal.conf.dir 目录下的 instance 配置变动会主动触发:# a. instance 目录新增:触发 instance 配置载入,lazy 为 true 时则主动启动
# b. instance 目录删除:卸载对应 instance 配置,如已启动则进行敞开
# c. instance.properties 文件变动:reload instance 配置,如已启动主动进行重启操作
canal.auto.scan = true
#instance 主动扫描的间隔时间,单位秒
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
#阿里云 RocketMQ 服务实例的 TCP 协定客户端接入点
canal.mq.servers = onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
#canal.mq.producerGroup =
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = cloud
# aliyun mq namespace
#canal.mq.namespace =
##################################################
######### Kafka Kerberos Info #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
- 在 conf 目录下自定义目录(如:order_instance)并创立 instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=mysql2.test.yiyaowang.com:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=yc_order
# 密文明码
canal.instance.dbPassword=KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==
canal.instance.connectionCharset = UTF-8
# 开启 druid 的加解密形式反对
canal.instance.enableDruid=true
# druid 公钥
canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==
# table regex
canal.instance.filter.regex=yc_order\\.t_order,yc_order\\.t_order_detail,yc_order\\.t_order_child,yc_order\\.t_order_delivery,yc_order\\.t_system_pay_type,yc_order\\.t_order_exception
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic= TOPIC_ORDER_ES_DATA_SYNC_TEST
canal.mq.producerGroup = GID_ORDER_ES_DATA_SYNC_TEST
#canal.mq.namespace =
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
3、Canal 相干命令与状态
Canal 相干命令
- 启动 canal
sh bin/startup.sh
- 进行 canal
sh bin/stop.sh
- 查看日志
# 以 order_instance 实例日志为例
[root@localhost order_instance]# tail -f -n 500 logs/order_instance/order_instance.log
2020-09-03 16:16:19.143 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-09-03 16:16:19.183 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties]
2020-09-03 16:16:20.751 [canal-instance-scan-0] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-09-03 16:16:21.160 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-09-03 16:16:21.161 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties]
2020-09-03 16:16:21.649 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-order_instance
2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^yc_order\.t_system_pay_type$|^yc_order\.t_order_exception$|^yc_order\.t_order_delivery$|^yc_order\.t_order_detail$|^yc_order\.t_order_child$|^yc_order\.t_order$
2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-09-03 16:16:21.675 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-09-03 16:16:21.757 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-09-03 16:16:21.761 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-09-03 16:16:26.312 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000571,position=283581902,serverId=168252,gtid=,timestamp=1599120981000] cost : 4543ms , the next step is binlog dump
阐明:如果是 canal 集群,只会看到一台机器上呈现了启动胜利的日志。因为 canal 集群中的各个节点仅有一个节点是处于 active 激活状态的。
Canal 集群注册状态
别离启动所有 canal 节点,察看 zookeeper 中的状态,zk 会记录曾经注册的所有 canal server 节点信息、以后运行的节点具体信息、生产节点信息。
- zookeeper 中的注册状态
- 以后运行节点信息
{"active":true,"address":"10.6.123.33:11111"}
- 生产位点信息
# 数据生产胜利后,canal server 会在 zookeeper 中记录下以后最初一次生产胜利的 binlog 位点.
# (下次你重启 client 时,会从这最初一个位点持续进行生产)
{
"@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
"identity":{
"slaveId":-1,
"sourceAddress":{
"address":"mysql2.test.yiyaowang.com",
"port":3306
}
},
"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000573","position":308524409,"serverId":168252,"timestamp":1599205987000}
}