关于canal:Canal的简单使用

38次阅读

共计 8914 个字符,预计需要花费 23 分钟才能阅读完成。

一、背景

工作中有个需要,当数据库的数据变更时,另外一个零碎中的数据要能及时感应到,通过调研晓得,监听数据库的 binlog 能够做到一个准实时的告诉,而 canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产,正好满足需要,此处记录一下 canal 的简略应用。

二、canal 的工作原理

步骤:

  1. canal 模仿 mysql slave 的交互协定,假装本人为 mysql slave,向 mysql master 发送 dump 协定
  2. mysql master 收到 dump 申请,开始推送 binary log 给 slave(也就是 canal)
  3. canal 解析 binary log 对象(原始为 byte 流)

三、装置 canal

1、mysql 配置相干

1、检测 binlog 是否开启

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

log_bin的值为 ON 阐明关上了。

2、mysql 开启 binlog

[mysqld]
#binlog 日志的根本文件名,须要留神的是启动 mysql 的用户须要对这个目录 (/usr/local/var/mysql/binlog) 有写入的权限
log_bin=/usr/local/var/mysql/binlog/mysql-bin
# 配置 binlog 日志的格局
binlog_format = ROW
# 配置 MySQL replaction 须要定义,不能和 canal 的 slaveId 反复
server-id=1
# 设置中继日志的门路
relay_log=/usr/local/var/mysql/relaylog/mysql-relay

3、创立 canal 用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

2、canal 配置相干

1、下载 canal

 # 1. 下载 deployer
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
 # 下载适配器,不是必须的
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz 
 # 下载治理台,不是必须的
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
 # 下载示例程序
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.example-1.1.5.tar.gz
 
 # 2. 解压 deployer (解压后的目录要存在)
 tar -zxvf canal.deployer-1.1.5.tar.gz -C /Users/huan/soft/canal/deployer/
 
 # 3. 查看 conf 目录构造
 $ tree conf                  
 conf
 ├── canal.properties
 ├── canal_local.properties
 ├── example
 │   └── instance.properties
 ├── logback.xml
 ├── metrics
 │   └── Canal_instances_tmpl.json
 └── spring
    ├── base-instance.xml
    ├── default-instance.xml
    ├── file-instance.xml
    ├── group-instance.xml
    ├── memory-instance.xml
    └── tsdb
        ├── h2-tsdb.xml
        ├── mysql-tsdb.xml
        ├── sql
        │   └── create_table.sql
        └── sql-map
            ├── sqlmap-config.xml
            ├── sqlmap_history.xml
            └── sqlmap_snapshot.xml

2、配置一个 instance

instance:一个 instance 就是一个音讯队列,每个 instance 通道都有各自的一份配置,因为每个 mysql 的 ip,帐号,明码等信息各不相同。

一个 canal server 能够存在多个instance

1、复制 conf/example 文件夹

cp -r conf/example conf/customer

customer能够简略了解为此处我须要链接 customer 数据库,因而命名为这个。

2、批改 instance 的配置

vim conf/customer/instance.properties

# mysql 集群配置中的 serverId 概念,须要保障和以后 mysql 集群中 id 惟一 (v1.1.x 版本之后 canal 会主动生成,不须要手工指定)
# canal.instance.mysql.slaveId=0
# mysql 主库链接地址
canal.instance.master.address=127.0.0.1:3306
# mysql 主库链接时起始的 binlog 文件
canal.instance.master.journal.name=
# mysql 主库链接时起始的 binlog 偏移量
canal.instance.master.position=
# mysql 主库链接时起始的 binlog 的工夫戳
canal.instance.master.timestamp=

# mysql 数据库帐号(此处的用户名和明码为 装置 canal#mysql 配置相干 #创立 canal 用户 这一步创立的用户名和明码)
canal.instance.dbUsername=canal
# mysql 数据库明码
canal.instance.dbPassword=canal
# mysql 数据解析编码
canal.instance.connectionCharset = UTF-8

# mysql 数据解析关注的表,Perl 正则表达式,即咱们须要关注那些库和那些表的 binlog 数据,也能够在 canal client api 中手动笼罩
canal.instance.filter.regex=.*\\..*
# table black regex
# mysql 数据解析表的黑名单,表达式规定见白名单的规定
canal.instance.filter.black.regex=mysql\\.slave_.*

3、instance 注意事项

1、配置须要关注那个库和那个表的 binlog

批改 instance.properties文件的这个属性canal.instance.filter.regex, 规定如下:

  1. 所有表:. or .*\\..
  2. canal schema 下所有表:canal\\..*
  3. canal 下的以 canal 打头的表:canal\\.canal.*
  4. canal schema 下的一张表:canal\\.test1
  5. 多个规定组合应用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
2、mysql 链接时的起始地位
  • canal.instance.master.journal.name + canal.instance.master.position : 准确指定一个 binlog 位点,进行启动
  • canal.instance.master.timestamp : 指定一个工夫戳,canal 会主动遍历 mysql binlog,找到对应工夫戳的 binlog 位点后,进行启动
  • 不指定任何信息:默认从以后数据库的位点,进行启动。(show master status)
3、mysql 解析关注表定义
  • 规范的 Perl 正则,留神本义时须要双斜杠:\
4、mysql 链接的编码
  • 目前 canal 版本仅反对一个数据库只有一种编码 ,如果一个库存在多个编码,须要通过 filter.regex 配置,将其拆分为多个 canal instance, 为每个 instance 指定不同的编码
5、instance.properties 配置参考链接:

https://github.com/alibaba/canal/wiki/AdminGuide

3、canal.properties 配置相干

vim conf/canal.properties

# canal server 绑定的本地 IP 信息,如果不配置,默认抉择一个本机 IP 进行启动服务
canal.ip = 127.0.0.1
# canal server 提供 socket 服务的端口
canal.port = 1111
# metrics 端口
canal.metrics.pull.port = 11112
# canal 服务的用户名(客户端连贯的时候须要这个用户名和明码,也能够不配置)canal.user = canal
# canal 服务的明码
canal.passwd = 123456
# tcp, kafka, rocketMQ, rabbitMQ(如果咱们要将数据发送到 kafka 中,则此处写 kafka,而后配置 kafka 的配置,此处以 tcp 演示)canal.serverMode = tcp
# 以后 server 上部署的 instance 列表, 此处写 customer , 则和 conf 目录同级下必须要有一个 customer 文件夹,即上一步咱们创立的,如果有多个 instance 说,则以英文的逗号隔开
canal.destinations = customer
# 如果零碎是 1 个 cpu,那么须要将这个并行设置成 false
canal.instance.parser.parallel = true

注意事项:

1、canal.destinations 配置

在 canal.properties 定义了 canal.destinations 后,须要在 canal.conf.dir 对应的目录下建设同名的文件

比方:

canal.destinations = example1,example2

这时须要创立 example1 和 example2 两个目录,每个目录里各自有一份 instance.properties.

ps. canal 自带了一份 instance.properties demo,可间接复制 conf/example 目录进行配置批改

cp -R example example1/
cp -R example example2/

2、canal.auto.scan 配置

如果 canal.properties 未定义 instance 列表,但开启了 canal.auto.scan 时

  • server 第一次启动时,会主动扫描 conf 目录下,将文件名做为 instance name,启动对应的 instance
  • server 运行过程中,会依据 canal.auto.scan.interval 定义的频率,进行扫描

    1. 发现目录有新增,启动新的 instance
    2. 发现目录有删除,敞开老的 instance
    3. 发现对应目录的 instance.properties 有变动,重启 instance

3、参考链接

参考链接:https://github.com/alibaba/canal/wiki/AdminGuide

4、启动 canal

1、启动 canal

# 启动 canal server
sh bin/startup.sh

2、查看日志

# canal 查看日志
tail -f -n200 logs/canal/canal.log
# 如果 canal 启动失败则须要查看此日志
tail -f -n200 logs/canal/canal_stdout.log

# 查看 instance 日志, 由下面的配置可知,咱们的 instance 的名字是 customer, 所以看这个日志. 
tail -f -n200 logs/customer/customer.log

3、jdk 版本

启动的时候须要留神一下本地 JDK 的版本,测试时发现应用 jdk11 不能启动,应用 jdk8 能够启动。

四、客户端生产 canal 数据

1、引入依赖

<dependencies>
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.common</artifactId>
    <version>1.1.5</version>
  </dependency>
</dependencies>

2、编写客户端代码

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * canal client api 的应用
 * https://github.com/alibaba/canal/wiki/ClientExample
 * 测试过程中发现,如果批改一个 sql 语句,然而批改的值没有发生变化,则此处不会监控到。* 同一个客户端启动屡次,只有一个客户端能够获取到数据
 *
 * @author huan.fu 2021/5/31 - 上午 10:31
 */
public class CanalClientApi {public static void main(String[] args) {

        String destination = "customer";
        // 创立一个 canal 链接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), destination, "admin", "admin");
        // 链接对应的 canal server
        canalConnector.connect();
        // 订阅那个库的那个表等
        /**
         * 订阅规定
         * 1.  所有表:.*   or  .*\\..*
         * 2.  canal schema 下所有表:canal\\..*
         * 3.  canal 下的以 canal 打头的表:canal\\.canal.*
         * 4.  canal schema 下的一张表:canal\\.test1
         * 5.  多个规定组合应用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
         */
        canalConnector.subscribe("temp_work\\.customer");
        // 回滚到未进行 #ack 的中央,下次 fetch 的时候,能够从最初一个没有 #ack 的中央开始拿
        canalConnector.rollback();
        int batchSize = 1000;
        while (true) {
            // 获取一批数据,不肯定会获取到 batchSize 条
            Message message = canalConnector.getWithoutAck(batchSize);
            // 获取批次 id
            long batchId = message.getId();
            // 获取数据
            List<CanalEntry.Entry> entries = message.getEntries();
            if (batchId == -1 || entries.isEmpty()) {System.out.println("没有获取到数据");
                try {TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                continue;
            }
            for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}

                CanalEntry.RowChange rowChange;
                try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {throw new RuntimeException("解析 binlog 数据出现异常 , data:" + entry.toString(), e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));

                if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {System.out.println("sql =>" + rowChange.getSql());
                }

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());
                    } else {System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
            canalConnector.ack(batchId);
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + ":" + column.getValue() + "update=" + column.getUpdated());
        }
    }
}

留神⚠️:

经测试,同一个客户端代码,启动多份,模仿多个客户端同时生产,只有一个客户端能够生产数据。

3、测试后果

没有获取到数据
没有获取到数据
================> binlog[mysql-bin.000014:5771] , name[temp_work,customer] , eventType : UPDATE
-------> before
id : 12    update=false
phone : 123    update=false
address : abcdefg    update=false
column_4 :     update=false
-------> after
id : 12    update=false
phone : 123    update=false
address : 数据扭转    update=true
column_4 :     update=false
没有获取到数据
没有获取到数据

能够看到获取到了,扭转后的数据。

五、残缺代码

代码门路:https://gitee.com/huan1993/spring-cloud-parent/blob/master/canal/canal-api/src/main/java/CanalClientApi.java

六、参考链接

1、https://github.com/alibaba/canal/wiki/ 简介

2、https://github.com/alibaba/canal/wiki/AdminGuide

3、https://github.com/alibaba/canal/wiki/ 常见问题解答

正文完
 0