一、背景

工作中有个需要,当数据库的数据变更时,另外一个零碎中的数据要能及时感应到,通过调研晓得,监听数据库的binlog能够做到一个准实时的告诉,而canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产,正好满足需要,此处记录一下canal的简略应用。

二、canal的工作原理

步骤:

  1. canal模仿mysql slave的交互协定,假装本人为mysql slave,向mysql master发送dump协定
  2. mysql master收到dump申请,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

三、装置canal

1、mysql配置相干

1、检测binlog是否开启

mysql> show variables like 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin       | ON    |+---------------+-------+1 row in set (0.00 sec)

log_bin的值为ON阐明关上了。

2、mysql开启binlog

[mysqld]#binlog日志的根本文件名,须要留神的是启动mysql的用户须要对这个目录(/usr/local/var/mysql/binlog)有写入的权限log_bin=/usr/local/var/mysql/binlog/mysql-bin# 配置binlog日志的格局binlog_format = ROW# 配置 MySQL replaction 须要定义,不能和 canal 的 slaveId 反复server-id=1# 设置中继日志的门路relay_log=/usr/local/var/mysql/relaylog/mysql-relay

3、创立canal用户

CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;

2、canal配置相干

1、下载canal

 # 1.下载 deployer $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz # 下载适配器,不是必须的 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz  # 下载治理台,不是必须的 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz # 下载示例程序 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.example-1.1.5.tar.gz  # 2.解压 deployer (解压后的目录要存在) tar -zxvf canal.deployer-1.1.5.tar.gz -C /Users/huan/soft/canal/deployer/  # 3. 查看 conf 目录构造 $ tree conf                   conf ├── canal.properties ├── canal_local.properties ├── example │   └── instance.properties ├── logback.xml ├── metrics │   └── Canal_instances_tmpl.json └── spring    ├── base-instance.xml    ├── default-instance.xml    ├── file-instance.xml    ├── group-instance.xml    ├── memory-instance.xml    └── tsdb        ├── h2-tsdb.xml        ├── mysql-tsdb.xml        ├── sql        │   └── create_table.sql        └── sql-map            ├── sqlmap-config.xml            ├── sqlmap_history.xml            └── sqlmap_snapshot.xml

2、配置一个instance

instance:一个instance就是一个音讯队列,每个instance通道都有各自的一份配置,因为每个mysql的ip,帐号,明码等信息各不相同。

一个canal server能够存在多个instance

1、复制conf/example文件夹

cp -r conf/example conf/customer

customer能够简略了解为此处我须要链接customer数据库,因而命名为这个。

2、批改instance的配置

vim conf/customer/instance.properties

# mysql集群配置中的serverId概念,须要保障和以后mysql集群中id惟一 (v1.1.x版本之后canal会主动生成,不须要手工指定)# canal.instance.mysql.slaveId=0# mysql主库链接地址canal.instance.master.address=127.0.0.1:3306# mysql主库链接时起始的binlog文件canal.instance.master.journal.name=# mysql主库链接时起始的binlog偏移量canal.instance.master.position=# mysql主库链接时起始的binlog的工夫戳canal.instance.master.timestamp=# mysql数据库帐号(此处的用户名和明码为 装置canal#mysql配置相干#创立canal用户 这一步创立的用户名和明码)canal.instance.dbUsername=canal# mysql数据库明码canal.instance.dbPassword=canal# mysql 数据解析编码canal.instance.connectionCharset = UTF-8# mysql 数据解析关注的表,Perl正则表达式,即咱们须要关注那些库和那些表的binlog数据,也能够在canal client api中手动笼罩canal.instance.filter.regex=.*\\..*# table black regex# mysql 数据解析表的黑名单,表达式规定见白名单的规定canal.instance.filter.black.regex=mysql\\.slave_.*

3、instance注意事项

1、配置须要关注那个库和那个表的binlog

批改 instance.properties文件的这个属性canal.instance.filter.regex,规定如下:

  1. 所有表:. or .*\\..
  2. canal schema下所有表: canal\\..*
  3. canal下的以canal打头的表:canal\\.canal.*
  4. canal schema下的一张表:canal\\.test1
  5. 多个规定组合应用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
2、mysql链接时的起始地位
  • canal.instance.master.journal.name + canal.instance.master.position : 准确指定一个binlog位点,进行启动
  • canal.instance.master.timestamp : 指定一个工夫戳,canal会主动遍历mysql binlog,找到对应工夫戳的binlog位点后,进行启动
  • 不指定任何信息:默认从以后数据库的位点,进行启动。(show master status)
3、mysql解析关注表定义
  • 规范的Perl正则,留神本义时须要双斜杠:\
4、mysql链接的编码
  • 目前canal版本仅反对一个数据库只有一种编码,如果一个库存在多个编码,须要通过filter.regex配置,将其拆分为多个canal instance,为每个instance指定不同的编码
5、instance.properties配置参考链接:

https://github.com/alibaba/canal/wiki/AdminGuide

3、canal.properties配置相干

vim conf/canal.properties

# canal server绑定的本地IP信息,如果不配置,默认抉择一个本机IP进行启动服务canal.ip = 127.0.0.1# canal server提供socket服务的端口canal.port = 1111# metrics 端口canal.metrics.pull.port = 11112# canal 服务的用户名(客户端连贯的时候须要这个用户名和明码,也能够不配置)canal.user = canal# canal 服务的明码canal.passwd = 123456# tcp, kafka, rocketMQ, rabbitMQ(如果咱们要将数据发送到kafka中,则此处写kafka,而后配置kafka的配置,此处以tcp演示)canal.serverMode = tcp# 以后server上部署的instance列表,此处写 customer ,则和 conf 目录同级下必须要有一个 customer 文件夹,即上一步咱们创立的,如果有多个instance说,则以英文的逗号隔开canal.destinations = customer# 如果零碎是1个cpu,那么须要将这个并行设置成falsecanal.instance.parser.parallel = true

注意事项:

1、canal.destinations配置

在canal.properties定义了canal.destinations后,须要在canal.conf.dir对应的目录下建设同名的文件

比方:

canal.destinations = example1,example2

这时须要创立example1和example2两个目录,每个目录里各自有一份instance.properties.

ps. canal自带了一份instance.properties demo,可间接复制conf/example目录进行配置批改

cp -R example example1/cp -R example example2/

2、canal.auto.scan配置

如果canal.properties未定义instance列表,但开启了canal.auto.scan时

  • server第一次启动时,会主动扫描conf目录下,将文件名做为instance name,启动对应的instance
  • server运行过程中,会依据canal.auto.scan.interval定义的频率,进行扫描

    1. 发现目录有新增,启动新的instance
    2. 发现目录有删除,敞开老的instance
    3. 发现对应目录的instance.properties有变动,重启instance

3、参考链接

参考链接:https://github.com/alibaba/canal/wiki/AdminGuide

4、启动canal

1、启动canal

# 启动canal serversh bin/startup.sh

2、查看日志

# canal查看日志tail -f -n200 logs/canal/canal.log# 如果canal启动失败则须要查看此日志tail -f -n200 logs/canal/canal_stdout.log# 查看instance日志,由下面的配置可知,咱们的instance的名字是customer,所以看这个日志. tail -f -n200 logs/customer/customer.log

3、jdk版本

启动的时候须要留神一下本地JDK的版本,测试时发现应用jdk11不能启动,应用jdk8能够启动。

四、客户端生产canal数据

1、引入依赖

<dependencies>  <dependency>    <groupId>com.alibaba.otter</groupId>    <artifactId>canal.client</artifactId>    <version>1.1.5</version>  </dependency>  <dependency>    <groupId>com.alibaba.otter</groupId>    <artifactId>canal.protocol</artifactId>    <version>1.1.5</version>  </dependency>  <dependency>    <groupId>com.alibaba.otter</groupId>    <artifactId>canal.common</artifactId>    <version>1.1.5</version>  </dependency></dependencies>

2、编写客户端代码

import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;import java.util.List;import java.util.concurrent.TimeUnit;/** * canal client api 的应用 * https://github.com/alibaba/canal/wiki/ClientExample * 测试过程中发现,如果批改一个sql语句,然而批改的值没有发生变化,则此处不会监控到。 * 同一个客户端启动屡次,只有一个客户端能够获取到数据 * * @author huan.fu 2021/5/31 - 上午10:31 */public class CanalClientApi {    public static void main(String[] args) {        String destination = "customer";        // 创立一个 canal 链接        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), destination, "admin", "admin");        // 链接对应的canal server        canalConnector.connect();        // 订阅那个库的那个表等        /**         * 订阅规定         * 1.  所有表:.*   or  .*\\..*         * 2.  canal schema下所有表: canal\\..*         * 3.  canal下的以canal打头的表:canal\\.canal.*         * 4.  canal schema下的一张表:canal\\.test1         * 5.  多个规定组合应用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)         */        canalConnector.subscribe("temp_work\\.customer");        // 回滚到未进行 #ack 的中央,下次fetch的时候,能够从最初一个没有 #ack 的中央开始拿        canalConnector.rollback();        int batchSize = 1000;        while (true) {            // 获取一批数据,不肯定会获取到 batchSize 条            Message message = canalConnector.getWithoutAck(batchSize);            // 获取批次id            long batchId = message.getId();            // 获取数据            List<CanalEntry.Entry> entries = message.getEntries();            if (batchId == -1 || entries.isEmpty()) {                System.out.println("没有获取到数据");                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    e.printStackTrace();                }                continue;            }            for (CanalEntry.Entry entry : entries) {                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {                    continue;                }                CanalEntry.RowChange rowChange;                try {                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                } catch (Exception e) {                    throw new RuntimeException("解析binlog数据出现异常 , data:" + entry.toString(), e);                }                CanalEntry.EventType eventType = rowChange.getEventType();                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),                        eventType));                if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {                    System.out.println("sql => " + rowChange.getSql());                }                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {                    if (eventType == CanalEntry.EventType.DELETE) {                        printColumn(rowData.getBeforeColumnsList());                    } else if (eventType == CanalEntry.EventType.INSERT) {                        printColumn(rowData.getAfterColumnsList());                    } else {                        System.out.println("-------> before");                        printColumn(rowData.getBeforeColumnsList());                        System.out.println("-------> after");                        printColumn(rowData.getAfterColumnsList());                    }                }            }            canalConnector.ack(batchId);        }    }    private static void printColumn(List<CanalEntry.Column> columns) {        for (CanalEntry.Column column : columns) {            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());        }    }}

留神⚠️:

经测试,同一个客户端代码,启动多份,模仿多个客户端同时生产,只有一个客户端能够生产数据。

3、测试后果

没有获取到数据没有获取到数据================> binlog[mysql-bin.000014:5771] , name[temp_work,customer] , eventType : UPDATE-------> beforeid : 12    update=falsephone : 123    update=falseaddress : abcdefg    update=falsecolumn_4 :     update=false-------> afterid : 12    update=falsephone : 123    update=falseaddress : 数据扭转    update=truecolumn_4 :     update=false没有获取到数据没有获取到数据

能够看到获取到了,扭转后的数据。

五、残缺代码

代码门路:https://gitee.com/huan1993/spring-cloud-parent/blob/master/canal/canal-api/src/main/java/CanalClientApi.java

六、参考链接

1、https://github.com/alibaba/canal/wiki/简介

2、https://github.com/alibaba/canal/wiki/AdminGuide

3、https://github.com/alibaba/canal/wiki/常见问题解答