canal 是阿里巴巴旗下的一款开源我的项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅 & 生产,目前次要反对了 MySQL(也反对 mariaDB)。
筹备
常见的 binlog 命令
# 是否启用 binlog 日志
show variables like 'log_bin';
# 查看 binlog 类型
show global variables like 'binlog_format';
# 查看具体的日志配置信息
show global variables like '%log%';
# mysql 数据存储目录
show variables like '%dir%';
# 查看 binlog 的目录
show global variables like "%log_bin%";
# 查看以后服务器应用的 biglog 文件及大小
show binary logs;
# 查看最新一个 binlog 日志文件名称和 Position
show master status;
配置 MySQL 的 binlog
对于自建 MySQL , 须要先开启 Binlog 写入性能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 抉择 ROW 模式
server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 反复
受权
受权 canal 链接 MySQL 账号具备作为 MySQL slave 的权限, 如果已有账户可间接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
部署 canal
装置 canal
- 下载:点此下载
- 解压缩
[kms@kms-1 softwares]$ tar -xzvf canal.deployer-1.1.4.tar.gz -C /opt/modules/canal/
- 目录构造
drwxr-xr-x 2 root root 4096 Mar 5 14:19 bin
drwxr-xr-x 5 root root 4096 Mar 5 13:54 conf
drwxr-xr-x 2 root root 4096 Mar 5 13:04 lib
drwxrwxrwx 4 root root 4096 Mar 5 14:19 logs
配置批改
- 批改 conf/example/instance.properties,批改内容如下:
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,须要改成本人的数据库信息
canal.instance.master.address = kms-1.apache.com:3306
#username/password,须要改成本人的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# mq config,kafka topic 名称
canal.mq.topic=test
- 批改 conf/canal.properties,批改内容如下:
# 配置 zookeeper 地址
canal.zkServers =kms-2:2181,kms-3:2181,kms-4:2181
# 可选项: tcp(默认), kafka, RocketMQ,canal.serverMode = kafka
# 配置 kafka 地址
canal.mq.servers = kms-2:9092,kms-3:9092,kms-4:9092
启动 canal
sh bin/startup.sh
敞开 canal
sh bin/stop.sh
部署 Canal Admin(可选)
canal-admin 设计上是为 canal 提供整体配置管理、节点运维等面向运维的性能,提供绝对敌对的 WebUI 操作界面,不便更多用户疾速和平安的操作。
要求
canal-admin 的限定依赖:
- MySQL,用于存储配置和节点等相干数据
- canal 版本,要求 >=1.1.4 (须要依赖 canal-server 提供面向 admin 的动静运维治理接口)
装置 canal-admin
- 下载
点此下载
- 解压缩
[kms@kms-1 softwares]$ tar -xzvf canal.admin-1.1.4.tar.gz -C /opt/modules/canal-admin/
- 目录构造
drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 bin
drwxrwxr-x 3 kms kms 4096 Mar 6 11:25 conf
drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 lib
drwxrwxr-x 2 kms kms 4096 Sep 2 2019 logs
- 配置批改
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: kms-1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
- 初始化原数据库
mysql -uroot -p
# 导入初始化 SQL
#注:(1) 初始化 SQL 脚本里会默认创立 canal_manager 的数据库,倡议应用 root 等有超级权限的账号进行初始化
# (2)canal_manager.sql 默认会在 conf 目录下
> mysql> source /opt/modules/canal-admin/conf/canal_manager.sql
- 启动 canal-admin
sh bin/startup.sh
- 拜访
能够通过 http://kms-1:8089/ 拜访,默认明码:admin/123456
- canal-server 端配置
应用 canal_local.properties 的配置笼罩 canal.properties, 将上面配置内容配置在 canal_local.properties 文件外面,就能够了。
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
- 启动 canal-serve
sh bin/startup.sh local
留神:先启 canal-server, 而后再启动 canal-admin,之后登陆 canal-admin 就能够增加 serve 和 instance 了。
启动 kafka 控制台消费者测试
bin/kafka-console-consumer.sh --bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092 --topic test --from-beginning
此时 MySQL 数据表若有变动,会将 row 类型的 log 写进 Kakfa,具体格局为 JSON:
- insert 操作
{
"data":[
{
"id":"338",
"city":"成都",
"province":"四川省"
}
],
"database":"qfbap_ods",
"es":1583394964000,
"id":2,
"isDdl":false,
"mysqlType":{"id":"int(11)",
"city":"varchar(256)",
"province":"varchar(256)"
},
"old":null,
"pkNames":["id"],
"sql":"","sqlType":{"id":4,"city":12,"province":12},
"table":"code_city",
"ts":1583394964361,
"type":"INSERT"
}
- update 操作
{
"data":[
{
"id":"338",
"city":"绵阳市",
"province":"四川省"
}
],
"database":"qfbap_ods",
"es":1583395177000,
"id":3,
"isDdl":false,
"mysqlType":{"id":"int(11)",
"city":"varchar(256)",
"province":"varchar(256)"
},
"old":[
{"city":"成都"}
],
"pkNames":["id"],
"sql":"","sqlType":{"id":4,"city":12,"province":12},
"table":"code_city",
"ts":1583395177408,
"type":"UPDATE"
}
- delete 操作
{
"data":[
{
"id":"338",
"city":"绵阳市",
"province":"四川省"
}
],
"database":"qfbap_ods",
"es":1583395333000,
"id":4,
"isDdl":false,
"mysqlType":{"id":"int(11)",
"city":"varchar(256)",
"province":"varchar(256)"
},
"old":null,
"pkNames":["id"],
"sql":"","sqlType":{"id":4,"city":12,"province":12},
"table":"code_city",
"ts":1583395333208,
"type":"DELETE"
}
JSON 日志格局解释
- data:最新的数据,为 JSON 数组,如果是插入则示意最新插入的数据,如果是更新,则示意更新后的最新数据,如果是删除,则示意被删除的数据
- database:数据库名称
- es:事件工夫,13 位的工夫戳
- id:事件操作的序列号,1,2,3…
- isDdl:是否是 DDL 操作
- mysqlType:字段类型
- old:旧数据
- pkNames:主键名称
- sql:SQL 语句
- sqlType:是通过 canal 转换解决的,比方 unsigned int 会被转化为 Long,unsigned long 会被转换为 BigDecimal
- table:表名
- ts:日志工夫
- type:操作类型,比方 DELETE,UPDATE,INSERT
小结
本文首先介绍了 MySQL binlog 日志的配置以及 Canal 的搭建,而后形容了通过 canal 数据传输到 Kafka 的配置,最初对 canal 解析之后的 JSON 数据进行了具体解释。本文是基于 Canal 与 Flink 实现数据实时增量同步的第一篇,在下一篇介绍如何应用 Flink 实现实时增量数据同步。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包