共计 7996 个字符,预计需要花费 20 分钟才能阅读完成。
本文收录在个人博客:www.chengxy-nds.top,同进步
一、canal 是个啥?
canal
是阿里开发的一款基于数据库增量日志解析,提供增量数据订阅与消费的框架,整个框架纯 JAVA
开发,目前仅支持 Mysql
和MariaDB
(和 mysql 类似)。
那什么是数据库增量日志?
MySQL 的日志种类是比较多的,主要包含:错误日志、查询日志、慢查询日志、事务日志、二进制日志。而 MySQL
数据库所发生的数据变更(DML
(data manipulation language)数据操纵语言,也就是我们熟悉的增删改),都会以二进制日志 (binary log
) 形式存储。
二、canal 原理
在介绍 canal
原理之前,我们先来回顾一下 MySQL
主从同步的原理,这或许会让你更好的理解 canal
的工作机制。
1、MySQL 主从同步原理:
MySQL 主从同步也叫读写分离,可以提升数据库的负载和容错能力,实现数据库的高可用
先来分析一张 MySQL 主从同步原理图:
以上图片源自网络,如有侵权联系删除
master 节点操作过程:
当 master
节点数据发生更改后(delete、update、insert,还是创建函数、存储过程等操作),向 binary log
中写入记录日志,这些记录又叫做 二进制日志事件
(binary log events)。
show binlog events
这些事件会按照顺序写入 bin log 中。当 slave 节点启动连接到 master 节点的时候,master 节点会为 slave 节点开启 binlog dump 线程(负责传输 binlog 数据)。
一旦 master 节点的 bin log 发生变化时,bin log
dump 线程会通知 slave 节点有可以传输的 binlog,并将相应的 bin log 内容发送给 slave 节点。
slave 节点操作过程:
slave 节点上会创建两个线程:一个I/ O 线程,一个SQL 线程。I/ O 线程连接到 master 节点,master 节点上的binlog dump
线程会将 binlog 的内容发送给该 IO 线程。
该 I / O 线程接收到 binlog 内容后,再将内容写入到本地的 relay log。而 sql 线程读取到 I / O 线程写入的 ralay log,将 relay log 中的内容写入 slave 数据库。
2、canal 原理
懂了上边 MySQL 的主从同步原理,canal 的工作机制就很好理解了。其实 canal 是模拟了 MySQL 数据库中,slave 节点与 master 节点的交互协议,伪装自己为 MySQL slave 节点,向 MySQL master 节点发送dump 协议
,MySQL master 节点收到 dump 请求,开始推送 binary log 给 slave 节点(也就是canal
)。
以上图片源自网络,如有侵权联系删除
光说不练假把式,开干!
三、canal 实现“监控”MySQL
在写代码前我们先对 MySQL 进行一下改造,安装 MySQL 就不再细说了,基本操作。
1、查看一下 MySQL 是否开启了 binary log 功能
show binary logs
如果没有开启是图中的状态,一般用户是没有这个命令权限的,不过我有,啧啧啧!
如果没有需要手动开启,并且在 my.cnf
文件中配置 binlog-format
为Row
模式
log-bin=mysq-bin
binlog-format=Row
log-bin
是 binlog
文件存放位置binlog-format
设置 MySQL 复制 log-bin 的方式
MySQL 的三种复制方式:
基于 SQL 语句的复制(statement-based replication, SBR)
- 优点:将修改数据的 sql 保存在 binlog,不需要记录每一条 sql 和数据变化,binlog 体量会很小,IO 开销少,性能好
- 缺点:会导致 master-slave 中的数据不一致
基于行的复制(row-based replication, RBR)
- 优点:不记录每条 sql 语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了
- 缺点:binlog 体积很大,尤其是在 alter table 属性时,会产生大量 binlog 数据
混合模式复制(mixed-based replication, MBR)
- 对应的,binlog 的格式也有三种:STATEMENT,ROW,MIXED。
2、为 canal 创建一个有权限操作 MySQL 的用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3、安装 canal
下载地址:https://github.com/alibaba/ca…
下载后选择版本例如:canal.deployer-xxx.tar.gz
4、配置 canal
修改 instance.properties 文件,需要添加监听数据库和表的规则,canal 可以全量监听数据库,也可以针对某个表进行监听,比较灵活。
vim conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020
# position info 修改自己的数据库(canal 要监听的数据库 地址)canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password 修改成自己 数据库信息的账号(单独开一个 准备阶段创建的账号)canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex 表的监听规则
# canal.instance.filter.regex = blogs\.blog_info
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex =
启动 canal
sh bin/startup.sh
看一下 server 日志,确认一下 canal 是否正常启动
vi logs/canal/canal.log
显示 canal server is running now 即为成功
2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
5、编写 Java 客户端代码,实现 canal 监听
引入依赖包
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
这里只是简单实现
public class MainApp {public static void main(String... args) throws Exception {
/**
* 创建与
*/
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "","");
int batchSize = 1000;
int emptyCount = 0;
try {connector.connect();
/**
* 监控数据库中所有表
*/
connector.subscribe(".*\\..*");
/**
* 指定要监控的表,库名. 表名
*/
//connector.subscribe("xin-master.jk_order");
connector.rollback();
//120 次心跳过后未检测到,跳出
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count :" + emptyCount);
try {Thread.sleep(1000);
} catch (InterruptedException e) {}} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
/**
* 提交确认
*/
connector.ack(batchId);
/**
* 处理失败, 回滚数据
*/
connector.rollback(batchId);
}
System.out.println("empty too many times, exit");
} finally {connector.disconnect();
/**
* 手动开启事务回滚
*/
//TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry
.EntryType
.TRANSACTIONEND) {continue;}
CanalEntry.RowChange rowChage = null;
try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());
} else {System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + ":" + column.getValue() + "update=" + column.getUpdated());
}
}
}
代码到这就编写完成了,我们启动服务看下是什么效果,由于并没有操作数据库,所以监听的结果都是空的。
接下来我们在数据库执行一条 update
语句试试
update jk_orderset order_no = '1111' where id = 40
控制台检测到了数据库的修改,并生成 binlog 日志文件 mysql-bin.000009:3830
那么生成的 binlog 文件该怎么用,如何解析成 SQl 语句呢?
<!-- mysql binlog 解析 -->
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.13.0</version>
</dependency>
将刚才的 binlog 文件下载本地测试一下
public static void main(String[] args) throws IOException {
String filePath = "C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000009:3830";
File binlogFile = new File(filePath);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setChecksumType(ChecksumType.CRC32);
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
try {for (Event event; (event = reader.readEvent()) != null; ) {System.out.println(event.toString());
}
} finally {reader.close();
}
}
查看一下执行结果,发现数据库最近的一次操作是加了一个 idx_index 索引
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0}, data=QueryEventData{threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order`
DROP INDEX `idx_index` ,
ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'}}
Event{header=EventHeaderV4{timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0}, data=null}
至此我们就已经实现了监控 MySQL,
四、canal 应用场景
canal
应用场景大致有以下:
- 解决 MySQL 主从同步延迟的问题
-
实现数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- 实现业务 cache 刷新
- 价格变化等重要业务消息
重点分析一下 canal 是如何解决 MySQL 主从同步延迟的问题
生产环境下 MySQL
的主从同步模式(maser-slave
)很常见,但对于跨机房部署的集群,会出现同步延时的情况。举个栗子:
一条订单状态是未付款,master
节点修改成已付款,可由于某些原因出现延迟数据未能及时同步到slave
,这时用户立即查看订单状态(查询走slave
)显示还是未付款,哪个用户看到这种情况不得慌啊。
为什么会出现主从同步延迟呢?
当主库 master
的TPS
并发较高时,master
节点并发产生的修改操作,而 slave
节点的 sql 线程
是单线程处理同步数据,延时自然而言就产生了。
不过造成主从同步的原因不止这些,由于主从服务器存在跨机器并且跨机房,除了网络带宽原因之外,网络的稳定性以及机器之间的同步,都是主从同步应该考虑的主要原因。
总结
本文只是简单实现 canal 监听数据库的功能,旨在给大家提供一种解决问题的思路,还是反复絮叨的那句话,解决问题的技术方法很对,具体如何应用还需结合具体业务。
原创不易,燃烧秀发输出内容
习惯在 VX 看技术文章,想要获取更多 Java 资源的同学,可以关注我的公众号:程序员内点事,暗号:[ 666 ]