乐趣区

关于flink:基于Canal与Flink实现数据实时增量同步一

canal 是阿里巴巴旗下的一款开源我的项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅 & 生产,目前次要反对了 MySQL(也反对 mariaDB)。

筹备

常见的 binlog 命令

# 是否启用 binlog 日志
show variables like 'log_bin';
# 查看 binlog 类型
show global variables like 'binlog_format';
# 查看具体的日志配置信息
show global variables like '%log%';
# mysql 数据存储目录
show variables like '%dir%';
# 查看 binlog 的目录
show global variables like "%log_bin%";
# 查看以后服务器应用的 biglog 文件及大小
show binary logs;
# 查看最新一个 binlog 日志文件名称和 Position
show master status;

配置 MySQL 的 binlog

对于自建 MySQL , 须要先开启 Binlog 写入性能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 抉择 ROW 模式
server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 反复 

受权

受权 canal 链接 MySQL 账号具备作为 MySQL slave 的权限, 如果已有账户可间接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

部署 canal

装置 canal

  • 下载:点此下载
  • 解压缩
[kms@kms-1 softwares]$ tar -xzvf canal.deployer-1.1.4.tar.gz  -C /opt/modules/canal/
  • 目录构造
drwxr-xr-x 2 root root 4096 Mar  5 14:19 bin
drwxr-xr-x 5 root root 4096 Mar  5 13:54 conf
drwxr-xr-x 2 root root 4096 Mar  5 13:04 lib
drwxrwxrwx 4 root root 4096 Mar  5 14:19 logs

配置批改

  • 批改 conf/example/instance.properties,批改内容如下:
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,须要改成本人的数据库信息
canal.instance.master.address = kms-1.apache.com:3306 
#username/password,须要改成本人的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
# mq config,kafka topic 名称
canal.mq.topic=test
  • 批改 conf/canal.properties,批改内容如下:
# 配置 zookeeper 地址
canal.zkServers =kms-2:2181,kms-3:2181,kms-4:2181
# 可选项: tcp(默认), kafka, RocketMQ,canal.serverMode = kafka
# 配置 kafka 地址
canal.mq.servers = kms-2:9092,kms-3:9092,kms-4:9092

启动 canal

sh bin/startup.sh

敞开 canal

sh bin/stop.sh

部署 Canal Admin(可选)

canal-admin 设计上是为 canal 提供整体配置管理、节点运维等面向运维的性能,提供绝对敌对的 WebUI 操作界面,不便更多用户疾速和平安的操作。

要求

canal-admin 的限定依赖:

  • MySQL,用于存储配置和节点等相干数据
  • canal 版本,要求 >=1.1.4 (须要依赖 canal-server 提供面向 admin 的动静运维治理接口)

装置 canal-admin

  • 下载

    点此下载

  • 解压缩
[kms@kms-1 softwares]$ tar -xzvf canal.admin-1.1.4.tar.gz  -C /opt/modules/canal-admin/
  • 目录构造
drwxrwxr-x 2 kms kms 4096 Mar  6 11:25 bin
drwxrwxr-x 3 kms kms 4096 Mar  6 11:25 conf
drwxrwxr-x 2 kms kms 4096 Mar  6 11:25 lib
drwxrwxr-x 2 kms kms 4096 Sep  2  2019 logs
  • 配置批改
vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: kms-1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
  • 初始化原数据库
mysql -uroot -p
# 导入初始化 SQL
#注:(1) 初始化 SQL 脚本里会默认创立 canal_manager 的数据库,倡议应用 root 等有超级权限的账号进行初始化 
#    (2)canal_manager.sql 默认会在 conf 目录下
> mysql> source /opt/modules/canal-admin/conf/canal_manager.sql
  • 启动 canal-admin
sh bin/startup.sh
  • 拜访

能够通过 http://kms-1:8089/ 拜访,默认明码:admin/123456

  • canal-server 端配置

应用 canal_local.properties 的配置笼罩 canal.properties, 将上面配置内容配置在 canal_local.properties 文件外面,就能够了。

# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
  • 启动 canal-serve
sh bin/startup.sh  local

留神:先启 canal-server, 而后再启动 canal-admin,之后登陆 canal-admin 就能够增加 serve 和 instance 了。

启动 kafka 控制台消费者测试

bin/kafka-console-consumer.sh --bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092  --topic test --from-beginning 

此时 MySQL 数据表若有变动,会将 row 类型的 log 写进 Kakfa,具体格局为 JSON:

  • insert 操作
{
    "data":[
        {
            "id":"338",
            "city":"成都",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583394964000,
    "id":2,
    "isDdl":false,
    "mysqlType":{"id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":null,
    "pkNames":["id"],
    "sql":"","sqlType":{"id":4,"city":12,"province":12},
    "table":"code_city",
    "ts":1583394964361,
    "type":"INSERT"
}
  • update 操作
{
    "data":[
        {
            "id":"338",
            "city":"绵阳市",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583395177000,
    "id":3,
    "isDdl":false,
    "mysqlType":{"id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":[
        {"city":"成都"}
    ],
    "pkNames":["id"],
    "sql":"","sqlType":{"id":4,"city":12,"province":12},
    "table":"code_city",
    "ts":1583395177408,
    "type":"UPDATE"
}
  • delete 操作
{
    "data":[
        {
            "id":"338",
            "city":"绵阳市",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583395333000,
    "id":4,
    "isDdl":false,
    "mysqlType":{"id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":null,
    "pkNames":["id"],
    "sql":"","sqlType":{"id":4,"city":12,"province":12},
    "table":"code_city",
    "ts":1583395333208,
    "type":"DELETE"
}

JSON 日志格局解释

  • data:最新的数据,为 JSON 数组,如果是插入则示意最新插入的数据,如果是更新,则示意更新后的最新数据,如果是删除,则示意被删除的数据
  • database:数据库名称
  • es:事件工夫,13 位的工夫戳
  • id:事件操作的序列号,1,2,3…
  • isDdl:是否是 DDL 操作
  • mysqlType:字段类型
  • old:旧数据
  • pkNames:主键名称
  • sql:SQL 语句
  • sqlType:是通过 canal 转换解决的,比方 unsigned int 会被转化为 Long,unsigned long 会被转换为 BigDecimal
  • table:表名
  • ts:日志工夫
  • type:操作类型,比方 DELETE,UPDATE,INSERT

小结

本文首先介绍了 MySQL binlog 日志的配置以及 Canal 的搭建,而后形容了通过 canal 数据传输到 Kafka 的配置,最初对 canal 解析之后的 JSON 数据进行了具体解释。本文是基于 Canal 与 Flink 实现数据实时增量同步的第一篇,在下一篇介绍如何应用 Flink 实现实时增量数据同步。


公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版