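Data Synchronization with Spring Boot and canal
I. Requirements
In a microservice architecture with multiple databases, canal can be used in place of triggers. canal grew out of Alibaba's need for cross-datacenter synchronization: it parses the database log to capture changes and offers incremental subscription and consumption of those changes. Whether for this canal experiment or for incremental backup, master-slave replication, and recovery, the MySQL binlog must be enabled. Putting the data directory on a separate disk partition can reduce I/O wait.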
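How canal works
- canal emulates the MySQL slave interaction protocol: it poses as a MySQL slave and sends a dump request to the MySQL master
- The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
- canal parses the binary log objects (originally a byte stream)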
II. Environment Setup
1. Log in to MySQL and check whether binlog is enabled. The log_bin variable is OFF by default.
mysql> show variables like 'log_%';
+----------------------------------------+-------------------------------------------------------+
| Variable_name                          | Value                                                 |
+----------------------------------------+-------------------------------------------------------+
| log_bin                                | OFF                                                   |
| log_bin_basename                       |                                                       |
| log_bin_index                          |                                                       |
| log_bin_trust_function_creators        | OFF                                                   |
| log_bin_use_v1_row_events              | OFF                                                   |
| log_builtin_as_identified_by_password  | OFF                                                   |
| log_error                              | F:\tools\mysql-5.7.28-winx64\Data\DESKTOP-C1LU9IQ.err |
| log_error_verbosity                    | 3                                                     |
| log_output                             | FILE                                                  |
| log_queries_not_using_indexes          | OFF                                                   |
| log_slave_updates                      | OFF                                                   |
| log_slow_admin_statements              | OFF                                                   |
| log_slow_slave_statements              | OFF                                                   |
| log_statements_unsafe_for_binlog       | ON                                                    |
| log_syslog                             | ON                                                    |
| log_syslog_tag                         |                                                       |
| log_throttle_queries_not_using_indexes | 0                                                     |
| log_timestamps                         | UTC                                                   |
| log_warnings                           | 2                                                     |
+----------------------------------------+-------------------------------------------------------+
19 rows in set (0.03 sec)
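2. Edit the configuration file
[mysqld]
# Listen on port 3306
port=3306
# MySQL installation directory; adjust to your own setup.
# Be sure to use double backslashes (\\) here. A single backslash failed for me, although some tutorials use one; try it yourself.
basedir=F:\\tools\\mysql-5.7.28-winx64
# Directory where MySQL stores its data (same backslash rule as above)
datadir=F:\\tools\\mysql-5.7.28-winx64\\Data
# Maximum number of connections
max_connections=200
# Number of failed connection attempts allowed, to guard against attacks on the database from that host
max_connect_errors=10
# Character set used by the server, UTF8 by default
character-set-server=utf8
# Default storage engine for newly created tables
default-storage-engine=INNODB
# Authenticate with the "mysql_native_password" plugin by default
default_authentication_plugin=mysql_native_password
lower_case_table_names=2
sql_mode = STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# Repeated option: this later value overrides max_connections=200 above
max_connections=1000

# Key settings for this experiment
# Enable binlog
log-bin=mysql-bin
# Use ROW format
binlog-format=ROW
# Required for MySQL replication; must not be the same as canal's slaveId
server_id=1

[mysql]
# Default character set for the mysql client
default-character-set=utf8

[client]
# Default port the mysql client uses to connect to the server
port=3306
default-character-set=utf8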
3. Create a canal account with MySQL slave privileges and grant it remote access
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
4. Remember to restart the MySQL service
Linux:
systemctl restart mysqld
Windows:
net stop mysql
net start mysql
III. Quick canal Deployment and Configuration
1. Edit conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to your own database settings
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database settings
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
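The canal.instance.filter.regex value decides which schema.table names the instance captures. As a rough illustration only, the sketch below approximates that matching with java.util.regex. The class CanalFilterDemo and its accepts method are hypothetical names, and canal's own filter implementation may differ in details such as case handling. It assumes a comma-separated list of patterns, each matched against the whole schema.table name.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of canal: approximates table filtering with java.util.regex.
class CanalFilterDemo {

    /** Returns true if "schema.table" fully matches at least one comma-separated pattern. */
    static boolean accepts(String filter, String schemaDotTable) {
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // ".*\\..*" as a Java literal is the regex .*\..* (any schema, any table).
        System.out.println(accepts(".*\\..*", "demo.tb_ad"));
        // Only tables in the demo schema.
        System.out.println(accepts("demo\\..*", "other.t_user"));
        // An explicit list of tables.
        System.out.println(accepts("demo\\.tb_ad,demo\\.t_user", "demo.t_user"));
    }
}
```

Narrowing the pattern to the schemas or tables you actually need keeps unrelated changes out of the client.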
2. Start canal with the startup script: sh bin/startup.sh
3. Check the server log and the instance log
$ tail -f logs/canal/canal.log
2020-05-28 13:52:03.037 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-05-28 13:52:03.065 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-05-28 13:52:03.072 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-05-28 13:52:03.444 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.36.58.25(172.36.58.25):11111]
2020-05-28 13:52:04.604 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

$ tail -f logs/example/example.log
2020-05-28 13:52:04.238 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-05-28 13:52:04.264 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-05-28 13:52:04.265 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-05-28 13:52:04.568 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-05-28 13:52:04.572 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-05-28 13:52:04.573 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-05-28 13:52:04.577 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-05-28 13:52:06.556 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1590644973000] cost : 1935ms , the next step is binlog dump
IV. First Listening Experiment
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

    public static void main(String args[]) {
        // Create the connection to the canal server (destination "example", no credentials)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            // Poll until 120 consecutive empty batches have been seen
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // on processing failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
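The client above acknowledges every batch and leaves the rollback call commented out. One possible way to wire in the failure path is sketched below: acknowledge only after the handler succeeds, otherwise roll the batch back so it can be delivered again. To stay self-contained, the sketch does not use the canal client library; BatchSource and FakeSource are hypothetical stand-ins for the getWithoutAck, ack, and rollback calls of CanalConnector, and entries are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Sketch of an ack-on-success / rollback-on-failure loop body (all names are hypothetical).
class AckOrRollbackDemo {

    /** Hypothetical stand-in for the three CanalConnector calls used by the client above. */
    interface BatchSource {
        long getWithoutAck(List<String> out); // fills out, returns the batch id (-1 if empty)
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Processes one batch; returns true only if the batch was handled and acknowledged. */
    static boolean processOnce(BatchSource source, Consumer<List<String>> handler) {
        List<String> entries = new ArrayList<>();
        long batchId = source.getWithoutAck(entries);
        if (batchId == -1 || entries.isEmpty()) {
            return false; // nothing to process
        }
        try {
            handler.accept(entries);
            source.ack(batchId);      // success: confirm the batch
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId); // failure: make the batch available again
            return false;
        }
    }

    /** In-memory fake that records which batch ids were acknowledged or rolled back. */
    static final class FakeSource implements BatchSource {
        final List<Long> acked = new ArrayList<>();
        final List<Long> rolledBack = new ArrayList<>();
        private final long batchId;
        private final List<String> rows;

        FakeSource(long batchId, String... rows) {
            this.batchId = batchId;
            this.rows = Arrays.asList(rows);
        }

        @Override public long getWithoutAck(List<String> out) { out.addAll(rows); return batchId; }
        @Override public void ack(long id) { acked.add(id); }
        @Override public void rollback(long id) { rolledBack.add(id); }
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource(7L, "row-1");
        boolean acked = processOnce(source, entries -> System.out.println("handling " + entries));
        System.out.println("acked=" + acked + " ids=" + source.acked);
    }
}
```

With the real client, the same shape would wrap printEntry in the try block, with connector.ack(batchId) after it and connector.rollback(batchId) in the catch.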
Insert any row to trigger an event
INSERT INTO `demo`.`tb_ad`(`id`, `url`, `status`, `position`, `image`, `start_time`, `end_time`) VALUES (1, 'https://www.baidu.com/', '1', 'web_index_lb', 'https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg', '2020-05-22 10:58:08', '2021-06-01 10:58:14');
The console shows:
empty count : 66
empty count : 67
empty count : 68
empty count : 69
empty count : 70
================> binlog[mysql-bin.000001:355] , name[demo,tb_ad] , eventType : INSERT
id : 2 update=true
url : https://www.baidu.com/ update=true
status : 1 update=true
position : web_index_lb update=true
image : https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg update=true
start_time : 2020-05-22 10:58:08 update=true
end_time : 2021-06-01 10:58:14 update=true
V. Data Monitoring Microservice
<!-- Third-party starter for quick canal integration: https://github.com/NormanGyllenhaal/canal-client -->
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
Subscribe to insert, update, and delete operations on the database
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@Component
@CanalTable(value = "t_user")
public class UserHandler implements EntryHandler<User> {

    private Logger logger = LoggerFactory.getLogger(UserHandler.class);

    public void insert(User user) {
        logger.info("insert message {}", user);
    }

    public void update(User before, User after) {
        logger.info("update before {} ", before);
        logger.info("update after {}", after);
    }

    public void delete(User user) {
        logger.info("delete {}", user);
    }
}
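The handler refers to a User class that the article does not show. A minimal sketch of such an entity follows. The field names and the toString format are taken from the log output of this service; the Java types are assumptions inferred from the t_user column types in that log (int, varchar, tinyint, date, timestamp), and the mapping from snake_case columns such as user_name to camelCase fields is likewise inferred from the output rather than confirmed.

```java
import java.util.Date;

// Assumed entity for the t_user table. In a real project, declare it as a
// public class in its own User.java so that frameworks can instantiate it.
class User {
    private Integer id;         // id int(11)
    private String userName;    // user_name varchar(255)
    private Integer gender;     // gender tinyint(4)
    private Integer countryId;  // country_id int(11)
    private Date birthday;      // birthday date
    private Date createTime;    // create_time timestamp

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public Integer getGender() { return gender; }
    public void setGender(Integer gender) { this.gender = gender; }

    public Integer getCountryId() { return countryId; }
    public void setCountryId(Integer countryId) { this.countryId = countryId; }

    public Date getBirthday() { return birthday; }
    public void setBirthday(Date birthday) { this.birthday = birthday; }

    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    @Override
    public String toString() {
        return "User{id=" + id
                + ", userName='" + userName + "'"
                + ", gender=" + gender
                + ", countryId=" + countryId
                + ", birthday=" + birthday
                + ", createTime=" + createTime + "}";
    }
}
```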
Start the data monitoring microservice, modify the user table, and watch the console output. In the log below, 获取消息 means "message received".
2020-05-28 16:23:22.667 INFO 24284 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=23,entries=[header { version: 1 logfileName: "mysql-bin.000001" logfileOffset: 18380 serverId: 1 serverenCode: "UTF-8" executeTime: 1590654201000 sourceType: MYSQL schemaName: "" tableName: "" eventLength: 68}
entryType: TRANSACTIONBEGIN
storeValue: " \025", header { version: 1 logfileName: "mysql-bin.000001" logfileOffset: 18505 serverId: 1 serverenCode: "UTF-8" executeTime: 1590654201000 sourceType: MYSQL schemaName: "demo" tableName: "t_user" eventLength: 88 eventType: UPDATE props { key: "rowsCount" value: "1" }}
entryType: ROWDATA
storeValue: "\b\210\002\020\002P\000b\370\003\n\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\n*\b\001\020\f\032\tuser_name \000(\0000\000B\005ZeldaR\fvarchar(255)\n*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\n\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\n&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\n7\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp\022\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\022.\b\001\020\f\032\tuser_name \000(\0010\000B\tZelda1111R\fvarchar(255)\022*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\022\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\022&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\0227\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp", header { version: 1 logfileName: "mysql-bin.000001" logfileOffset: 18593 serverId: 1 serverenCode: "UTF-8" executeTime: 1590654201000 sourceType: MYSQL schemaName: "" tableName: "" eventLength: 31}
entryType: TRANSACTIONEND
storeValue: "\022\0041574"],raw=false,rawEntries=[]]
2020-05-28 16:23:22.668 INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler : update before User{id=null, userName='Zelda', gender=null, countryId=null, birthday=null, createTime=null}
2020-05-28 16:23:22.668 INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler : update after User{id=21, userName='Zelda1111', gender=0, countryId=1, birthday=Sat Apr 18 00:00:00 CST 1998, createTime=Thu Jan 10 05:45:50 CST 1991}
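In this output the "before" object carries a value only for user_name, the column that changed, while every other field is null. Judging from this one log, the starter appears to populate "before" with the old values of changed columns only. If the full pre-update row is needed, one option is to start from the "after" values and overlay the changed columns. The sketch below shows that idea with plain maps; BeforeImageDemo and fullBefore are hypothetical names, and the approach cannot tell a column whose old value was NULL from a column that did not change.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: rebuilds the full pre-update row from a partial "before" image.
class BeforeImageDemo {

    /** Copies the after-image, then overwrites changed columns with their old values. */
    static Map<String, String> fullBefore(Map<String, String> changedBefore,
                                          Map<String, String> after) {
        Map<String, String> row = new LinkedHashMap<>(after);
        row.putAll(changedBefore);
        return row;
    }

    public static void main(String[] args) {
        // Values taken from the log output above.
        Map<String, String> after = new LinkedHashMap<>();
        after.put("id", "21");
        after.put("user_name", "Zelda1111");
        after.put("country_id", "1");

        Map<String, String> changedBefore = new LinkedHashMap<>();
        changedBefore.put("user_name", "Zelda");

        System.out.println(fullBefore(changedBefore, after));
    }
}
```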