Spring Boot + canal Data Synchronization Solution

I. Requirements

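In a microservice architecture with multiple databases, canal can take the place of database triggers. Alibaba built canal to meet its need for cross-data-center synchronization: it parses the database log to capture changes and offers incremental subscription to, and consumption of, those changes. Whether you are experimenting with canal or setting up incremental backup, master-slave replication, or recovery, the MySQL binlog must be enabled. Placing the data directory on a separate disk partition can reduce I/O wait.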

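How canal works

  1. canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
  2. The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
  3. canal parses the binary log objects (originally a byte stream)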
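
To make step 3 more concrete, here is a minimal sketch of decoding the 19-byte common header that precedes every event in the MySQL binlog (v4 format, little-endian fields). It is not canal's parser; the class name `BinlogEventHeader` is made up for illustration, and the sample values are synthetic, borrowed from the log output shown later in this article.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustrative decoder for the common header of a binlog event; not canal code. */
final class BinlogEventHeader {
    static final int LENGTH = 19; // fixed size of the v4 common header

    final long timestamp;    // seconds since the epoch
    final int eventType;     // numeric event type code
    final long serverId;     // server_id of the MySQL instance that wrote the event
    final long eventLength;  // header plus body, in bytes
    final long nextPosition; // binlog offset of the following event
    final int flags;

    private BinlogEventHeader(ByteBuffer buf) {
        // The fields are unsigned, so widen them before use.
        timestamp = buf.getInt() & 0xFFFFFFFFL;
        eventType = buf.get() & 0xFF;
        serverId = buf.getInt() & 0xFFFFFFFFL;
        eventLength = buf.getInt() & 0xFFFFFFFFL;
        nextPosition = buf.getInt() & 0xFFFFFFFFL;
        flags = buf.getShort() & 0xFFFF;
    }

    static BinlogEventHeader parse(byte[] raw) {
        if (raw.length < LENGTH) {
            throw new IllegalArgumentException("need at least " + LENGTH + " bytes");
        }
        return new BinlogEventHeader(ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN));
    }

    public static void main(String[] args) {
        // Synthetic header: type code 31 is UPDATE_ROWS_EVENT in MySQL 5.6 and later.
        byte[] raw = ByteBuffer.allocate(LENGTH).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(1590654201).put((byte) 31).putInt(1)
                .putInt(88).putInt(18593).putShort((short) 0)
                .array();
        BinlogEventHeader h = parse(raw);
        System.out.println("type=" + h.eventType + " serverId=" + h.serverId
                + " length=" + h.eventLength + " next=" + h.nextPosition);
        // prints: type=31 serverId=1 length=88 next=18593
    }
}
```

canal does this work (and the much larger job of decoding event bodies) for you, which is why the client code later in this article only ever sees parsed entries.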

II. Environment Setup

1. Log in to MySQL and check whether the binlog is enabled. In MySQL 5.7 (used here), log_bin is OFF by default

mysql> show variables like 'log_%';
+----------------------------------------+-------------------------------------------------------+
| Variable_name                          | Value                                                 |
+----------------------------------------+-------------------------------------------------------+
| log_bin                                | OFF                                                   |
| log_bin_basename                       |                                                       |
| log_bin_index                          |                                                       |
| log_bin_trust_function_creators        | OFF                                                   |
| log_bin_use_v1_row_events              | OFF                                                   |
| log_builtin_as_identified_by_password  | OFF                                                   |
| log_error                              | F:\tools\mysql-5.7.28-winx64\Data\DESKTOP-C1LU9IQ.err |
| log_error_verbosity                    | 3                                                     |
| log_output                             | FILE                                                  |
| log_queries_not_using_indexes          | OFF                                                   |
| log_slave_updates                      | OFF                                                   |
| log_slow_admin_statements              | OFF                                                   |
| log_slow_slave_statements              | OFF                                                   |
| log_statements_unsafe_for_binlog       | ON                                                    |
| log_syslog                             | ON                                                    |
| log_syslog_tag                         |                                                       |
| log_throttle_queries_not_using_indexes | 0                                                     |
| log_timestamps                         | UTC                                                   |
| log_warnings                           | 2                                                     |
+----------------------------------------+-------------------------------------------------------+
19 rows in set (0.03 sec)

2. Edit the configuration file

[mysqld]
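# Listen on port 3306
port=3306
# MySQL installation directory; adjust to your own setup.
# Use double backslashes (\\) in the path. A single backslash failed on my machine, although some tutorials use one; try it yourself.
basedir=F:\\tools\\mysql-5.7.28-winx64
# Directory where MySQL stores its data (same note about backslashes)
datadir=F:\\tools\\mysql-5.7.28-winx64\\Data
# Maximum number of connections allowed
max_connections=1000
# Number of failed connection attempts allowed per host; guards against attacks on the database from that host
max_connect_errors=10
# Default server-side character set: utf8
character-set-server=utf8
# Default storage engine for newly created tables
default-storage-engine=INNODB
# Authenticate with the "mysql_native_password" plugin by default
default_authentication_plugin=mysql_native_password
lower_case_table_names=2
sql_mode = STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# Key settings for this experiment
# Enable the binlog
log-bin=mysql-bin
# Use ROW format
binlog-format=ROW
# Required for MySQL replication; must not clash with canal's slaveId
server_id=1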

[mysql]
# Default character set for the mysql client
default-character-set=utf8
[client]
# Default port the mysql client uses to connect to the server
port=3306
port=3306
default-character-set=utf8
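
The three settings that matter for canal are easy to get wrong, so a quick sanity check can help. The sketch below scans the text of an option file and reports whether the `[mysqld]` group enables the binlog, selects ROW format, and uses a `server_id` different from canal's `slaveId`. The class and method names are hypothetical, and the parser is deliberately naive (for example, it treats any `#` as the start of a comment). It relies on the fact that MySQL accepts `-` and `_` interchangeably in option names.

```java
import java.util.HashMap;
import java.util.Map;

/** Naive option-file check for the binlog settings canal depends on; illustrative only. */
final class BinlogConfigCheck {

    /** Collects options of the [mysqld] group, normalizing '-' to '_' in option names. */
    static Map<String, String> mysqldOptions(String iniText) {
        Map<String, String> options = new HashMap<>();
        boolean inMysqld = false;
        for (String rawLine : iniText.split("\\R")) {
            String line = rawLine.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // blank line or full-line comment
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash).trim(); // drop a trailing comment
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            options.put(key, value); // a later duplicate overrides an earlier one
        }
        return options;
    }

    /** This sketch requires binlog_format to be set explicitly, as the article does. */
    static boolean readyForCanal(String iniText, String canalSlaveId) {
        Map<String, String> o = mysqldOptions(iniText);
        String serverId = o.get("server_id");
        return o.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(o.get("binlog_format"))
                && serverId != null
                && !serverId.equals(canalSlaveId);
    }

    public static void main(String[] args) {
        String ini = String.join("\n",
                "[mysqld]", "log-bin=mysql-bin", "binlog-format=ROW", "server_id=1");
        System.out.println(readyForCanal(ini, "1234")); // prints: true
        System.out.println(readyForCanal(ini, "1"));    // prints: false (ids clash)
    }
}
```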

3. Create a canal account with MySQL slave privileges and grant it remote access

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

4. Remember to restart the MySQL service

Linux: systemctl restart mysqld
Windows:
net stop mysql
net start mysql

III. Quick canal Deployment and Configuration

1. Edit conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to match your own database
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
# username/password; change to match your own database
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\..*
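
The doubled backslash in the filter is properties-file escaping: once the file is loaded, `.*\\..*` becomes the regex `.*\..*`, which is what the instance log prints as the table filter. The sketch below shows that unescaping with `java.util.Properties` and then approximates how a comma-separated filter is matched against `schema.table` names. The `accepts` helper is a simplification written for this article; canal's own filter may differ in details such as case handling. The second filter value, `demo\\.tb_.*`, is a made-up example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

/** Approximates how a table filter value is read and applied; not canal's implementation. */
final class TableFilterSketch {

    /** Loads a properties snippet and returns the filter value after escape processing. */
    static String loadFilter(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        return props.getProperty("canal.instance.filter.regex");
    }

    /** True if "schema.table" fully matches any of the comma-separated patterns. */
    static boolean accepts(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // The file content is: canal.instance.filter.regex = .*\\..*
        String all = loadFilter("canal.instance.filter.regex = .*\\\\..*");
        System.out.println(all); // prints: .*\..*
        System.out.println(accepts(all, "demo", "tb_ad")); // prints: true

        // The file content is: canal.instance.filter.regex = demo\\.tb_.*
        String onlyTb = loadFilter("canal.instance.filter.regex = demo\\\\.tb_.*");
        System.out.println(accepts(onlyTb, "demo", "t_user")); // prints: false
    }
}
```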

2. Start canal with the startup script: sh bin/startup.sh

3. Check the server log and the instance log

$ tail -f logs/canal/canal.log
2020-05-28 13:52:03.037 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-05-28 13:52:03.065 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-05-28 13:52:03.072 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-05-28 13:52:03.444 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.36.58.25(172.36.58.25):11111]
2020-05-28 13:52:04.604 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
$ tail -f logs/example/example.log
2020-05-28 13:52:04.238 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-05-28 13:52:04.264 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-05-28 13:52:04.265 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-05-28 13:52:04.568 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-05-28 13:52:04.572 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-05-28 13:52:04.573 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-05-28 13:52:04.577 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-05-28 13:52:06.556 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1590644973000] cost : 1935ms , the next step is binlog dump

IV. A First Listening Experiment

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored in this demo
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll the batch back
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
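
The example above always acknowledges a batch and leaves the rollback call commented out. A more defensive loop acknowledges only after the handler succeeds and rolls back otherwise, so the same entries are delivered again. The sketch below shows that control flow against a hypothetical `BatchSource` interface that mirrors the three connector calls (`getWithoutAck`, `ack`, `rollback`); `Batch` and `InMemorySource` are stand-ins invented here so the example runs without a canal server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

final class AckRollbackSketch {

    /** A fetched batch; id == -1 means nothing was available. */
    static final class Batch {
        final long id;
        final List<String> entries;

        Batch(long id, List<String> entries) { this.id = id; this.entries = entries; }
    }

    /** Hypothetical stand-in for the three connector calls used in the example above. */
    interface BatchSource {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Handles one batch: ack on success, rollback on failure. Returns true if acked. */
    static boolean consumeOnce(BatchSource source, int batchSize, Consumer<String> handler) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1 || batch.entries.isEmpty()) {
            return false; // nothing to process, nothing to confirm
        }
        try {
            batch.entries.forEach(handler);
            source.ack(batch.id);
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // the same entries will be delivered again
            return false;
        }
    }

    /** In-memory fake that redelivers a rolled-back batch. */
    static final class InMemorySource implements BatchSource {
        private final Deque<String> pending = new ArrayDeque<>();
        private List<String> inFlight = new ArrayList<>();
        private long inFlightId = -1;
        private long nextId = 1;

        InMemorySource(List<String> entries) { pending.addAll(entries); }

        @Override
        public Batch getWithoutAck(int batchSize) {
            if (pending.isEmpty()) {
                return new Batch(-1, new ArrayList<>());
            }
            inFlight = new ArrayList<>();
            while (inFlight.size() < batchSize && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            inFlightId = nextId++;
            return new Batch(inFlightId, new ArrayList<>(inFlight));
        }

        @Override
        public void ack(long batchId) {
            if (batchId == inFlightId) {
                inFlight = new ArrayList<>();
                inFlightId = -1;
            }
        }

        @Override
        public void rollback(long batchId) {
            if (batchId != inFlightId) return;
            for (int i = inFlight.size() - 1; i >= 0; i--) {
                pending.addFirst(inFlight.get(i)); // restore the original order
            }
            inFlight = new ArrayList<>();
            inFlightId = -1;
        }

        int remaining() { return pending.size(); }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(Arrays.asList("row-1", "row-2"));
        boolean first = consumeOnce(source, 10, e -> { throw new IllegalStateException("handler failed"); });
        boolean second = consumeOnce(source, 10, e -> System.out.println("handled " + e));
        System.out.println(first + " " + second); // prints: false true
    }
}
```

With the real client, the same shape would wrap `printEntry` in the try block and call `connector.rollback(batchId)` in the catch block.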

Insert any row to trigger an event:

INSERT INTO `demo`.`tb_ad`(`id`, `url`, `status`, `position`, `image`, `start_time`, `end_time`) VALUES (1, 'https://www.baidu.com/', '1', 'web_index_lb', 'https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg', '2020-05-22 10:58:08', '2021-06-01 10:58:14');

The console then shows:

empty count : 66
empty count : 67
empty count : 68
empty count : 69
empty count : 70
================> binlog[mysql-bin.000001:355] , name[demo,tb_ad] , eventType : INSERT
id : 2    update=true
url : https://www.baidu.com/    update=true
status : 1    update=true
position : web_index_lb    update=true
image : https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg    update=true
start_time : 2020-05-22 10:58:08    update=true
end_time : 2021-06-01 10:58:14    update=true

V. Data Monitoring Microservice

<!-- Third-party starter for quick canal integration: https://github.com/NormanGyllenhaal/canal-client -->
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

Subscribe to insert, update, and delete operations on the database

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@Component
@CanalTable(value = "t_user")
public class UserHandler implements EntryHandler<User> {

    private Logger logger = LoggerFactory.getLogger(UserHandler.class);

    public void insert(User user) {
        logger.info("insert message  {}", user);
    }

    public void update(User before, User after) {
        logger.info("update before {}", before);
        logger.info("update after {}", after);
    }

    public void delete(User user) {
        logger.info("delete  {}", user);
    }
}
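
The handler refers to a `User` class that the article does not show. The sketch below is a guess at what it might look like, reconstructed only from the `User{...}` lines in the log output of this section: the field names come from that output, while the field types are assumptions. How columns such as `user_name` are mapped onto fields such as `userName` is left to the starter and is not shown here.

```java
import java.util.Date;

/** Hypothetical model for the t_user table; names inferred from the logged toString() output. */
class User {
    private Integer id;
    private String userName;
    private Integer gender;
    private Integer countryId;
    private Date birthday;
    private Date createTime;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public Integer getGender() { return gender; }
    public void setGender(Integer gender) { this.gender = gender; }

    public Integer getCountryId() { return countryId; }
    public void setCountryId(Integer countryId) { this.countryId = countryId; }

    public Date getBirthday() { return birthday; }
    public void setBirthday(Date birthday) { this.birthday = birthday; }

    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    @Override
    public String toString() {
        return "User{id=" + id + ", userName='" + userName + "', gender=" + gender
                + ", countryId=" + countryId + ", birthday=" + birthday
                + ", createTime=" + createTime + "}";
    }
}
```

Wrapper types such as `Integer` are used rather than primitives because, in the logged `update before` object, every field except the changed `userName` is `null`.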

Start the data monitoring microservice, modify the user table, and watch the console output.

2020-05-28 16:23:22.667  INFO 24284 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : 获取消息 Message[id=23,entries=[header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 18380
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1590654201000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 68
}
entryType: TRANSACTIONBEGIN
storeValue: "\025"
, header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 18505
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1590654201000
  sourceType: MYSQL
  schemaName: "demo"
  tableName: "t_user"
  eventLength: 88
  eventType: UPDATE
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA
storeValue: "\b\210\002\020\002P\000b\370\003\n\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\n*\b\001\020\f\032\tuser_name \000(\0000\000B\005ZeldaR\fvarchar(255)\n*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\n\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\n&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\n7\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp\022\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\022.\b\001\020\f\032\tuser_name \000(\0010\000B\tZelda1111R\fvarchar(255)\022*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\022\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\022&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\0227\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp"
, header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 18593
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1590654201000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\0041574"
],raw=false,rawEntries=[]]
2020-05-28 16:23:22.668  INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler    : update before User{id=null, userName='Zelda', gender=null, countryId=null, birthday=null, createTime=null} 
2020-05-28 16:23:22.668  INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler    : update after User{id=21, userName='Zelda1111', gender=0, countryId=1, birthday=Sat Apr 18 00:00:00 CST 1998, createTime=Thu Jan 10 05:45:50 CST 1991}