关于后端:实战Spring-Boot-整合-阿里开源中间件-Canal-实现数据增量同步

3次阅读

共计 5868 个字符,预计需要花费 15 分钟才能阅读完成。

数据同步始终是一个令人头疼的问题。在业务量小,场景不多,数据量不大的状况下咱们可能会抉择在我的项目中间接写一些定时工作手动解决数据,例如从多个表将数据查出来,再汇总解决,再插入到相应的中央。
然而随着业务量增大,数据质变多以及各种简单场景下的分库分表的实现,使数据同步变得越来越艰难。
明天这篇文章应用阿里开源的中间件 Canal 解决数据增量同步的痛点。
文章目录如下:

Canal 是什么?
canal 译意为水道 / 管道 / 沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产。
从这句话了解到了什么?
基于 MySQL,并且通过 MySQL 日志进行的增量解析,这也就意味着对原有的业务代码齐全是无侵入性的。

工作原理:解析 MySQL 的 binlog 日志,提供增量数据。

基于日志增量订阅和生产的业务包含

数据库镜像
数据库实时备份
索引构建和实时保护 (拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理

以后的 canal 反对源端 MySQL 版本包含 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

官网文档:github.com/alibaba/can…

Canal 数据如何传输?
先来一张官网图:

Canal 分为服务端和客户端,这也是阿里罕用的套路,比方后面讲到的注册核心 Nacos:

服务端:负责解析 MySQL 的 binlog 日志,传递增量数据给客户端或者消息中间件
客户端:负责解析服务端传过来的数据,而后定制本人的业务解决。

目前为止反对的消息中间件很全面了,比方 Kafka、RocketMQ,RabbitMQ。
数据同步还有其余中间件吗?
有,当然有,还有一些开源的中间件也是相当不错的,比方 Bifrost。
常见的几款中间件的区别如下:

当然要我抉择的话,首选阿里的中间件 Canal。
Canal 服务端装置
服务端须要下载压缩包,下载地址:github.com/alibaba/can…
目前最新的是 v1.1.5,点击下载:

下载实现解压,目录如下:

本文应用 Canal+RabbitMQ 进行数据的同步,因而上面步骤齐全依照这个 base 进行。
1、关上 MySQL 的 binlog 日志
批改 MySQL 的日志文件,my.cnf 配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 抉择 ROW 模式
server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 反复
复制代码
2、设置 MySQL 的配置
须要设置服务端配置文件中的 MySQL 配置,这样 Canal 能力晓得须要监听哪个库、哪个表的日志文件。
一个 Server 能够配置多个实例监听,Canal 性能默认自带的有个 example 实例,本篇就用 example 实例。如果减少实例,复制 example 文件夹内容到同级目录下,而后在 canal.properties 指定增加实例的名称。
批改 canal.deployer-1.1.5\conf\example\instance.properties 配置文件

url

canal.instance.master.address=127.0.0.1:3306

username/password

canal.instance.dbUsername=root
canal.instance.dbPassword=root

监听的数据库

canal.instance.defaultDatabaseName=test

监听的表,能够指定,多个用逗号宰割,这里正则是监听所有

canal.instance.filter.regex=.\..

复制代码
3、设置 RabbitMQ 的配置
服务端默认的传输方式是 tcp,须要在配置文件中设置 MQ 的相干信息。
这里须要批改两处配置文件,如下;
1、canal.deployer-1.1.5\conf\canal.properties
这个配置文件次要是设置 MQ 相干的配置,比方 URL,用户名、明码 …

传输方式:tcp, kafka, rocketMQ, rabbitMQ

canal.serverMode = rabbitMQ

RabbitMQ

rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host =/

exchange

rabbitmq.exchange =canal.exchange

用户名、明码

rabbitmq.username =guest
rabbitmq.password =guest

是否长久化

rabbitmq.deliveryMode = 2
复制代码
2、canal.deployer-1.1.5\conf\example\instance.properties
这个文件设置 MQ 的路由 KEY,这样能力路由到指定的队列中,如下:
canal.mq.topic=canal.routing.key
复制代码
4、RabbitMQ 新建 exchange 和 Queue
在 RabbitMQ 中须要新建一个 canal.exchange(必须和配置中的雷同)的 exchange 和一个名称为 canal.queue(名称随便)的队列。
其中绑定的路由 KEY 为:canal.routing.key(必须和配置中的雷同),如下图:

5、启动服务端
点击 bin 目录下的脚本,windows 间接双击 startup.bat,启动胜利如下:

6、测试
在本地数据库 test 中的 oauth_client_details 插入一条数据,如下:
INSERT INTO oauth_client_details VALUES (‘myjszl’, ‘res1’, ‘$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W’, ‘all’, ‘password,refresh_token,authorization_code,client_credentials,implicit’, ‘http://www.baidu.com’, NULL, 1000, 1000, NULL, ‘false’);
复制代码
此时查看 MQ 中的 canal.queue 曾经有了数据,如下:

其实就是一串 JSON 数据,这个 JSON 如下:
{

"data": [{
    "client_id": "myjszl",
    "resource_ids": "res1",
    "client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
    "scope": "all",
    "authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
    "web_server_redirect_uri": "http://www.baidu.com",
    "authorities": null,
    "access_token_validity": "1000",
    "refresh_token_validity": "1000",
    "additional_information": null,
    "autoapprove": "false"
}],
"database": "test",
"es": 1640337532000,
"id": 7,
"isDdl": false,
"mysqlType": {"client_id": "varchar(48)",
    "resource_ids": "varchar(256)",
    "client_secret": "varchar(256)",
    "scope": "varchar(256)",
    "authorized_grant_types": "varchar(256)",
    "web_server_redirect_uri": "varchar(256)",
    "authorities": "varchar(256)",
    "access_token_validity": "int(11)",
    "refresh_token_validity": "int(11)",
    "additional_information": "varchar(4096)",
    "autoapprove": "varchar(256)"
},
"old": null,
"pkNames": ["client_id"],
"sql": "","sqlType": {"client_id": 12,"resource_ids": 12,"client_secret": 12,"scope": 12,"authorized_grant_types": 12,"web_server_redirect_uri": 12,"authorities": 12,"access_token_validity": 4,"refresh_token_validity": 4,"additional_information": 12,"autoapprove": 12},
"table": "oauth_client_details",
"ts": 1640337532520,
"type": "INSERT"

}
复制代码
每个字段的意思曾经很分明了,有表名称、办法、参数、参数类型、参数值 …..
客户端要做的就是监听 MQ 获取 JSON 数据,而后将其解析进去,解决本人的业务逻辑。
Canal 客户端搭建
客户端很简略实现,要做的就是生产 Canal 服务端传递过去的音讯,监听 canal.queue 这个队列。
1、创立音讯实体类
MQ 传递过去的是 JSON 数据,当然要创立个实体类接收数据,如下:
/**

  • @author
  • Canal 音讯接管实体类
    */

@NoArgsConstructor
@Data
public class CanalMessage<T> {

@JsonProperty("type")
private String type;

@JsonProperty("table")
private String table;

@JsonProperty("data")
private List<T> data;

@JsonProperty("database")
private String database;

@JsonProperty("es")
private Long es;

@JsonProperty("id")
private Integer id;

@JsonProperty("isDdl")
private Boolean isDdl;

@JsonProperty("old")
private List<T> old;

@JsonProperty("pkNames")
private List<String> pkNames;

@JsonProperty("sql")
private String sql;

@JsonProperty("ts")
private Long ts;

}
复制代码
2、MQ 音讯监听业务
接下来就是监听队列,一旦有 Canal 服务端有数据推送可能及时的生产。
代码很简略,只是给出个接管的案例,具体的业务逻辑能够依据业务实现,如下:
import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**

  • 监听 MQ 获取 Canal 增量的数据音讯
    */

@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {

@RabbitListener(bindings = {
        @QueueBinding(value = @Queue(value = "canal.queue", durable = "true"),
                exchange = @Exchange(value = "canal.exchange"),
                key = "canal.routing.key"
        )
})
public void handleDataChange(String message) {
    // 将 message 转换为 CanalMessage
    CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
    String tableName = canalMessage.getTable();
    log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
    //TODO 业务逻辑本人欠缺...............
}

}

复制代码
3、测试
上面向表中插入数据,看下接管的音讯是什么样的,SQL 如下:
INSERT INTO oauth_client_details
VALUES

('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');

复制代码
客户端转换后的音讯如下图:

上图能够看出所有的数据都曾经胜利接管到,只须要依据数据欠缺本人的业务逻辑即可。

总结
数据增量同步的开源工具并不只有 Canal 一种,依据本人的业务须要抉择适合的组件。

正文完
 0