canal in Practice: Parsing MySQL Incremental Logs (binlog)

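Introduction

canal is an open-source tool from Alibaba. It parses MySQL incremental logs (binlog) and provides incremental data subscription and consumption.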

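Use cases

  • Real-time database backup
  • Refreshing business caches
  • Building and maintaining indexes in real time, e.g. pushing product data to es to build an inverted index
  • Incremental data processing with business logic, e.g. pushing incremental data to a third-party platform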

Project page

https://github.com/alibaba/canal

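How it works

  1. The MySQL master writes data changes to the binlog
  2. canal emulates a MySQL slave and sends the dump protocol request to the master
  3. The master receives the dump request and pushes the binlog to canal
  4. canal parses the binlog and can deliver the data to an MQ system; Kafka and RocketMQ are currently supported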

Installation

Configure MySQL

The recommended MySQL version is 5.7.x.
For MySQL 8.0.x, see the official notes: https://github.com/alibaba/ca…

Edit the MySQL configuration file my.cnf

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # must differ from canal's slaveId
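
As a quick sanity check before starting canal, the three settings above can be verified with a small script. This is only a sketch: the function name `check_binlog_settings` is made up for illustration, and it assumes a simple my.cnf without `!include` directives (which Python's `configparser` cannot read).

```python
import configparser


def check_binlog_settings(cnf_text, canal_slave_id):
    """Return a list of problems found in the [mysqld] section (empty if none)."""
    parser = configparser.ConfigParser(
        allow_no_value=True,             # my.cnf allows bare flags without a value
        inline_comment_prefixes=("#",),  # strip trailing "# ..." comments
        strict=False,
        interpolation=None,
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]

    # MySQL accepts '-' and '_' interchangeably in option names
    opts = {key.replace("-", "_"): value for key, value in parser.items("mysqld")}

    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format is not ROW")
    server_id = opts.get("server_id")
    if server_id is None:
        problems.append("server_id is not set")
    elif int(server_id) == canal_slave_id:
        problems.append("server_id clashes with canal's slaveId")
    return problems


if __name__ == "__main__":
    sample = "[mysqld]\nlog-bin=mysql-bin # enable binlog\nbinlog-format=ROW\nserver_id=1\n"
    print(check_binlog_settings(sample, canal_slave_id=1234))
```

An empty list means the file passes these three checks; it does not prove that the running server picked the settings up, so restarting MySQL and checking `SHOW VARIABLES LIKE 'log_bin'` is still worthwhile.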

Create a user and grant it privileges. For testing you can simply use the root user.

-- The password must match canal.instance.dbPassword in instance.properties
CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

Configure canal

Check whether Java is installed

➜  canal-admin java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

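If Java is not installed, download version 1.8 from the Oracle website (do not pick a newer version) and install the build that matches your system. On macOS, download the dmg and double-click it to install; the details are omitted here.

Go to the release page and download the latest stable version (1.1.4 at the time of writing)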
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

Extract it to a target directory

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.4.tar.gz  -C /tmp/canal

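Go into the canal directory and list its contents

  • bin directory: startup and shutdown scripts
  • conf directory: configuration files
  • logs directory: runtime logs
  • lib directory: jar files
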
➜  ll                                  
total 0
drwxr-xr-x   7 jiao  staff   224B  6  3 17:17 bin
drwxr-xr-x   9 jiao  staff   288B  6  3 20:12 conf
drwxr-xr-x  83 jiao  staff   2.6K  6  1 17:54 lib
drwxr-xr-x   7 jiao  staff   224B  6  3 17:05 logs
drwxr-xr-x   7 jiao  staff   224B  5 29 17:43 pierced

Edit the instance configuration file

vi conf/example/instance.properties

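Parameter reference: https://github.com/alibaba/ca…

Here I changed the database connection address, the username and password, and the sync rule: only the sc_user table in the test database is synced.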

## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change these to your own database settings
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = 123456
# binlog file name (standby database)
#canal.instance.standby.journal.name =
# binlog offset (standby database)
#canal.instance.standby.position =
canal.instance.filter.regex = test\\.sc_user
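
To see what the filter rule above selects, the sketch below mimics the matching in Python. It rests on two assumptions that are not stated in the original: that the value is a comma-separated list of regular expressions, and that each one must match the whole `schema.table` name. canal's own filter may differ in details such as case handling. The helper names are made up for illustration.

```python
import re


def unescape_properties_value(raw):
    # A Java .properties file treats backslash as an escape character,
    # so "\\." written in the file is read as the regex "\." (a literal dot).
    return raw.replace("\\\\", "\\")


def table_matches(filter_value, schema, table):
    """Return True if schema.table fully matches any comma-separated pattern."""
    full_name = f"{schema}.{table}"
    patterns = [p.strip() for p in filter_value.split(",") if p.strip()]
    return any(re.fullmatch(p, full_name) for p in patterns)


if __name__ == "__main__":
    # The value exactly as it is typed in instance.properties
    rule = unescape_properties_value(r"test\\.sc_user")
    for schema, table in [("test", "sc_user"), ("test", "sc_user_log"), ("prod", "sc_user")]:
        print(schema, table, table_matches(rule, schema, table))
```

With this rule only `test.sc_user` is accepted. A broader rule such as `test\\..*` would accept every table in the test schema.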

Edit the server configuration file
vim conf/canal.properties

#tcp bind ip; needed when delivering over tcp
canal.ip = 192.168.101.47
#register ip to zookeeper
canal.register.ip = 192.168.101.47
canal.port = 11111
canal.metrics.pull.port = 11112
#canal instance user/passwd
#canal.user = canal
#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

#required for canal-admin; left unset for now
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

#ZooKeeper settings; not needed for a standalone canal
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
#flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
#delivery target: tcp, kafka or RocketMQ (set to tcp for the TCP client test)
canal.serverMode = RocketMQ
#flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
##memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
##memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
##memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

##detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

#support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
#mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

#network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

#binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

#binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

#binlog ddl isolation
canal.instance.get.ddl.isolation = false

#parallel parser config
canal.instance.parser.parallel = true
##concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
##disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

#table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
#purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########         destinations        #############
#################################################
#instances to load; here the example instance configured above
canal.destinations = example
#conf root dir
canal.conf.dir = ../conf
#auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########              MQ              #############
##################################################
#RocketMQ name server address
canal.mq.servers = 192.168.101.47:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
#Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
#aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
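
The 40-character values of canal.passwd and canal.admin.passwd above are password hashes, not plaintext. Assuming they follow the MySQL `PASSWORD()` scheme without the leading `*` (SHA-1 applied twice, printed as uppercase hex), a value for your own password could be generated as sketched below. The function name is hypothetical; check the result against the canal documentation before relying on it.

```python
import hashlib


def mysql_style_hash(plaintext):
    """SHA-1 of the raw SHA-1 digest, as uppercase hex (assumed canal.passwd format)."""
    first_pass = hashlib.sha1(plaintext.encode("utf-8")).digest()
    return hashlib.sha1(first_pass).hexdigest().upper()


if __name__ == "__main__":
    # Paste the printed value after "canal.passwd = " (assumption, see above)
    print(mysql_style_hash("my-secret"))
```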

Test TCP mode

Download the canal PHP client: https://github.com/xingwenge/canal-php

Edit the file src/sample/client.php

<?php

namespace xingwenge\canal_php\sample;

use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;

require_once __DIR__ . '/../../vendor/autoload.php';

ini_set('display_errors', 'On');
error_reporting(E_ALL);

try {
    $host   = '192.168.101.47'; // canal server address
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET);
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

    $client->connect($host, 11111);
    // Credentials; these correspond to canal.user and canal.passwd in canal.properties
    $client->checkValid('canal', 'E3619321C1A937C46A0D8BD1DAC39F93B27D4458');
    // $client->subscribe("1001", "example", ".*\\..*");
    $client->subscribe("1001", "example", "test\\.sc_user"); // filter rule: only test.sc_user

    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }

    $client->disConnect(); // not reached while the loop above runs forever
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}

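Run canal
sh bin/startup.sh
Check the log for errors
tail -n 50 -f logs/example/example.log
Output like the following means canal has started normally: it has connected to MySQL and is about to send the dump command to start syncing data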

2020-06-03 19:30:47.625 [destination = example , address = /127.0.0.1:3307 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=303179,serverId=1,gtid=,timestamp=1591183839000] cost : 5ms , the next step is binlog dump

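Run php client.php and modify a row in the sc_user table; the client prints the change made in the database (the sample output below was captured from a different table, sc_status_history)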
================> binlog[mysql-bin.000001 : 208118],name[wcc-scm-service,sc_status_history], eventType: 2

-------> before
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂 1 update= false
remark : update= false
created : 2020-05-28 18:40:02 update= false
-------> after
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂 12 update= true
remark : update= false
created : 2020-05-28 18:40:02 update= false
TSocket: Could not read 4 bytes from 192.168.101.47:11111

Test RocketMQ mode

Install RocketMQ

Here it is installed directly in Docker

1. Install the name server
docker run -d -p 9876:9876 --name rocketmq-server -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
2. Create the broker configuration file
echo "brokerIP1=192.168.101.47" > broker.properties
3. Install the broker (replace /path/broker.properties with the path of the file created in step 2)
docker run -d -p 10911:10911 -p 10909:10909 -v /path/broker.properties:/opt/rocketmq-4.4.0/bin/broker.properties --name rocketmq-broker --link rocketmq-server -e "NAMESRV_ADDR=rocketmq-server:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c broker.properties
4. Install the RocketMQ web console (-d must come before the image name to take effect)
docker run -d -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.101.47:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

Open the RocketMQ console
http://localhost:8080/
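
Before pointing canal at RocketMQ, it can help to confirm that the ports published by the containers above are reachable from the canal host. The following is a minimal sketch using plain TCP connects; the function names are made up for illustration, and a successful connect only shows that something is listening, not that RocketMQ is healthy.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_rocketmq_ports(host, ports=(9876, 10909, 10911, 8080)):
    """Map each port published by the docker commands above to its reachability."""
    return {port: port_open(host, port) for port in ports}


if __name__ == "__main__":
    # 192.168.101.47 is the host address used throughout this article
    for port, ok in check_rocketmq_ports("192.168.101.47").items():
        print(port, "reachable" if ok else "NOT reachable")
```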

Modify the configuration

Edit canal.properties
Switch delivery from tcp to RocketMQ
canal.serverMode = RocketMQ
canal.mq.servers = 192.168.101.47:9876

Run canal

sh bin/startup.sh
Check the log files for errors
tail -n 50 -f logs/canal/canal.log
tail -n 50 -f logs/example/example.log

If everything went well, the new topic and its messages show up in RocketMQ, and the message content can be viewed directly in the RocketMQ console.
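
With canal.mq.flatMessage = true, each message body is a JSON document. The sketch below shows one way a consumer might pull the changed columns out of an UPDATE message. The field names (`type`, `isDdl`, `data`, `old`) follow canal's flat message layout as I understand it, and the sample payload is invented for illustration, so compare it with a real message from the console first.

```python
import json


def changed_columns(flat_message_json):
    """For an UPDATE message, return one {column: (old, new)} dict per changed row."""
    msg = json.loads(flat_message_json)
    if msg.get("isDdl") or msg.get("type") != "UPDATE":
        return []
    new_rows = msg.get("data") or []
    old_rows = msg.get("old") or []  # assumed to hold only the columns that changed
    changes = []
    for new_row, old_row in zip(new_rows, old_rows):
        changes.append({col: (old_val, new_row.get(col)) for col, old_val in old_row.items()})
    return changes


if __name__ == "__main__":
    # Hypothetical payload, not captured from a real canal server
    sample = json.dumps({
        "database": "test",
        "table": "sc_user",
        "type": "UPDATE",
        "isDdl": False,
        "data": [{"id": "1", "user_name": "new name"}],
        "old": [{"user_name": "old name"}],
    })
    print(changed_columns(sample))
```

Returning old and new values side by side keeps the consumer logic (cache refresh, index update) independent of how the message was transported.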

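Afterword

I ran into a few problems while installing and using canal, such as errors with newer Java versions and failures when delivering to the message queue. Resolving them takes patience: read through the logs and work out the cause. canal also touches quite a few technologies (ZooKeeper, Kafka, RocketMQ), which I plan to study in more depth later.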
