两年前在我的项目上施行 oracle etl 同步时,客户就提出 cdc(Change Data Capture)增量同步的需要,并且明确要求基于日志来捕捉数据变动。过后对于这方面的常识储备不够,只感觉这样的需要太刻薄。到了起初我施行分布式架构的计划越来越多,常常会思考如何保障数据的一致性,也让我回过头来,从新思考当年客户的需要。
本文的配角是 canal,罕用来保障 mysql 到 redis、elasticsearch 等产品数据的增量同步。下文先讲 canal 的装置配置,再联合具体的代码,实现 mysql 到 redis 的实时同步。
1. 简介
1.1. 背景
分布式架构近些年很受推崇,咱们的零碎开发不再局限于一台 mysql 数据库,能够为了缓存而引入 redis,为了搜寻而引入 elasticsearch,等等,这些是分布式架构给咱们带来的便利性。
但带来的挑战也加大了,比方:微服务治理、分布式事务,明天还会讲到数据同步。以前对于关系型数据库的数据同步,曾经诞生了不少 ETL 工具,比方咱们相熟的 oracle ODI。然而以当初微服务开发而论,还是不够灵便,咱们须要能够自在的将 mysql 数据同步到 redis、elasticsearch 等中央。这里就能够用到本文的配角 — canal。
1.2. canal
canal [kə’næl],译意为水道 / 管道 / 沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产。
晚期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需要,实现形式次要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐渐尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和生产业务。
基于日志增量订阅和生产的业务包含:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时保护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
以后的 canal 反对源端 mysql 版本包含 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
1.3. 工作原理
MySQL 主备复制原理
MySQL master 将数据变更写入二进制日志(binary log, 其中记录叫做二进制日志事件 binary log events,能够通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它本人的数据
canal 工作原理
canal 模仿 MySQL slave 的交互协定,假装本人为 MySQL slave,向 MySQL master 发送 dump 协定
MySQL master 收到 dump 申请,开始推送 binary log 给 slave (即 canal)
canal 解析 binary log 对象(原始为 byte 流)
1.4. 长处
对于 canal 的长处,必定是要拿它和之前接触过 ETL 工具做比拟。
- 面向编程: 我用过的几个 ETL 工具都偏工具化。同步规定的自定义空间不大,作为开发人员更偏向于用编程的形式实现,我想那些用 spring cloud gateway 代替 nginx 的人应该能了解。而 canal 的客户端,则是齐全面向 java 编程的,开发起来更不便。
- 增量同步: 大多 ETL 工具都专一于“全量同步”,对于实时性,也都靠设置定时策略来周期性执行,但 canal 是专一于做实时“增量同步”的,而且它的做法也比拟好。不少 ETL 工具老的计划,是通过数据库 trigger 来实现增量同步的,会给数据库带来很大的压力,侵入性较高,而 canal 用的是新的计划,基于 binlog 日志来监听数据变动。
2. 装置配置
2.1. mysql 配置
须要先开启 mysql 的 binlog 写入性能,配置 binlog-format 为 ROW 模式。这里批改 my.cnf 文件,增加下列配置:
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 抉择 ROW 模式
server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 反复
重启 mysql,用以下命令检查一下 binlog 是否正确启动:
mysql> show variables like 'log_bin%';
+---------------------------------+----------------------------------+
| Variable_name | Value |
+---------------------------------+----------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql/data/mysql-bin |
| log_bin_index | /data/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+----------------------------------+
5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
创立一个 mysql 用户 canal 并且赋近程链接权限权限。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON test_canal.user TO 'canal'@'%';
FLUSH PRIVILEGES;
2.2. canal.deployer 装置配置
- 在 canal 下载页找到对应版本的部署包(canal.deployer-1.x.x.tar.gz)。
- 解压安装包
tar -zxvf canal.deployer-1.4.0.tar.gz
,失去四个目录 bin、conf、lib、logs。 - 批改配置 conf/example/instance.properties,配置参数比拟多,上面就列几个常见的数据库配置信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
- 通过脚本启动 canal
# 启动
sh bin/startup.sh
# 敞开
sh bin/stop.sh
# 查看日志
tail -500f logs/canal/canal.log
# 查看具体实例日志
tail -500f logs/example/example.log
3. 基于 adapter 同步
3.1. 简述
canal 作为 mysql 的实时数据订阅组件,实现了对 mysql binlog 数据的抓取。
尽管阿里也开源了一个纯正从 mysql 同步数据到 mysql 的我的项目 otter(github.com/alibaba/otter,基于 canal 的),实现了 mysql 的单向同步、双向同步等能力。然而咱们常常有从 mysql 同步数据到 es、hbase 等存储的需要,就须要用户本人用 canal-client 获取数据进行生产,比拟麻烦。
从 1.1.1 版本开始,canal 实现了一个配套的落地模块,实现对 canal 订阅的音讯进行生产,就是 client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。
目前的最新稳定版 1.1.4 版本中,client-adapter 曾经实现了同步数据到 RDS、ES、HBase 的能力。
目前 Adapter 具备以下根本能力:
- 对接上游音讯,包含 kafka、rocketmq、canal-server
- 实现 mysql 数据的增量同步
- 实现 mysql 数据的全量同步
- 上游写入反对rds、es、hbase
本文不关注这部分 canal.adapter 的配置,具体的配置形式,请参考 github 官网文档。
3.2. elasticsearch 示例
简略贴一贴上游 es 的配置,批改启动器配置: application.yml
server:
port: 8081
logging:
level:
com.alibaba.otter.canal.client.adapter.es: DEBUG
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
canalServerHost: 127.0.0.1:11111
flatMessage: true
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
username: root
password: 121212
canalInstances:
- instance: example
adapterGroups:
- outAdapters:
- name: es
hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
properties:
cluster.name: elasticsearch # es cluster name
adapter 将会主动加载 conf/es 下的所有.yml 结尾的配置文件,再批改 conf/es/mytest_user.yml 文件:
dataSourceKey: defaultDS # 源数据源的 key, 对应下面配置的 srcDataSources 中的值
destination: example # cannal 的 instance 或者 MQ 的 topic
esMapping:
_index: mytest_user # es 的索引名称
_type: _doc # es 的 doc 名称
_id: _id # es 的_id, 如果不配置该项必须配置上面的 pk 项_id 则会由 es 主动调配
# pk: id # 如果不须要_id, 则须要指定一个属性为主键属性
# sql 映射
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time, c.labels as _labels from user a
left join role b on b.id=a.role_id
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
group by user_id) c on c.user_id=a.id"
# objFields:
# _labels: array:; # 数组或者对象属性, array:; 代表以; 字段外面是以; 分隔的
# _obj: obj:{"test":"123"}
etlCondition: "where a.c_time>='{0}'" # etl 的条件参数
commitBatch: 3000 # 提交批大小
sql 映射阐明:
sql 反对多表关联自由组合, 然而有肯定的限度:
- 主表不能为子查问语句
- 只能应用 left outer join 即最左表肯定要是主表
- 关联从表如果是子查问不能有多张表
- 主 sql 中不能有 where 查问条件(从表子查问中能够有 where 条件然而不举荐, 可能会造成数据同步的不统一, 比方批改了 where 条件中的字段内容)
- 关联条件只容许主外键的 ’=’ 操作不能呈现其余常量判断比方: on a.role_id=b.id and b.statues=1
- 关联条件必须要有一个字段呈现在主查问语句中比方: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须呈现在主 select 语句中
Elastic Search 的 mapping 属性与 sql 的查问值将一一对应 (不反对 select *), 比方: select a.id as _id, a.name, a.email as _email from user, 其中 name 将映射到 es mapping 的 name field, _email 将 映射到 mapping 的_email field, 这里以别名(如果有别名) 作为最终的映射字段. 这里的_id 能够填写到配置文件的 _id: _id 映射.
1、单表映射索引示例
select a.id as _id, a.name, a.role_id, a.c_time from user a
该 sql 对应的 es mapping 示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {"type": "text"},
"role_id": {"type": "long"},
"c_time": {"type": "date"}
}
}
}
}
}
2、单表映射索引示例 sql 带函数或运算操作
select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a
函数字段后必须跟上别名, 该 sql 对应的 es mapping 示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {"type": "text"},
"role_id": {"type": "long"},
"c_time": {"type": "date"}
}
}
}
}
}
3、多表映射 (一对一, 多对一) 索引示例 sql:
select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a
left join role b on b.id = a.role_id
注: 这里 join 操作只能是 left outer join, 第一张表必须为主表!! 该 sql 对应的 es mapping 示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {"type": "text"},
"role_id": {"type": "long"},
"role_name": {"type": "text"},
"c_time": {"type": "date"}
}
}
}
}
}
4、多表映射 (一对多) 索引示例 sql:
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
group by user_id) c on c.user_id=a.id
注:left join 后的子查问只容许一张表, 即子查问中不能再蕴含子查问或者关联!! 该 sql 对应的 es mapping 示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {"type": "text"},
"role_id": {"type": "long"},
"c_time": {"type": "date"},
"labels": {"type": "text"}
}
}
}
}
}
4. 基于代码同步
如果你是同步到 es、rds、hbase,然而 adapter 实现不了你的需要,能够用下列代码的形式实现。
如果你是想要同步到 redis、mongo 等数据库,因为 adapter 目前还不反对,同样能够用代码的形式实现。
如果你是用 springboot 开发,目前有两种常见的形式:
- 阿里原生的 canal.client,比拟举荐这种,可参考 GitHub 官网文档。
- 集体基于 canal.client 开发的 starter,可参考 GitHub 官网文档。
本文选用第一种形式,实现 canal 到 redis 的实时同步。
4.1. 代码(redis 同步)
pom.xml
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!-- canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
RedisTableUtil.java
@Slf4j
@Component
public class RedisTableUtil {
private static final String PK_NAME="id";
private static final String NULL_ID="NULL";
private final RedisTemplate redisTemplate;
public RedisTableUtil(RedisTemplate redisTemplate){this.redisTemplate=redisTemplate;}
/**
* 新增
* @param columnList
* @param databaseName
* @param tableName
*/
public void insert(List<CanalEntry.Column> columnList,String databaseName,String tableName){String keyName=this.generateKeyName(columnList,databaseName,tableName);
HashOperations hashOperations= redisTemplate.opsForHash();
columnList.stream()
.forEach((column -> {hashOperations.put(keyName,column.getName(),column.getValue());
}));
}
/**
* 删除
* @param columnList
* @param databaseName
* @param tableName
*/
public void delete(List<CanalEntry.Column> columnList,String databaseName,String tableName){String keyName=this.generateKeyName(columnList,databaseName,tableName);
redisTemplate.delete(keyName);
}
/**
* 更新
* @param columnList
* @param databaseName
* @param tableName
*/
public void update(List<CanalEntry.Column> columnList,String databaseName,String tableName){String keyName=this.generateKeyName(columnList,databaseName,tableName);
HashOperations hashOperations= redisTemplate.opsForHash();
columnList.stream()
.filter(CanalEntry.Column::getUpdated)
.forEach((column -> {hashOperations.put(keyName,column.getName(),column.getValue());
}));
}
/**
* 生成 行记录 key
* @param columnList
* @param databaseName
* @param tableName
* @return
*/
private String generateKeyName(List<CanalEntry.Column> columnList,String databaseName,String tableName){Optional<String> id= columnList.stream()
.filter(column -> PK_NAME.equals(column.getName()))
.map(CanalEntry.Column::getValue)
.findFirst();
return databaseName+"_"+tableName+"_"+id.orElse(NULL_ID);
}
}
RedisConfig.java
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate redisTemplate(RedisTemplate redisTemplate){RedisSerializer<String> stringSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringSerializer);
redisTemplate.setValueSerializer(stringSerializer);
redisTemplate.setHashKeySerializer(stringSerializer);
redisTemplate.setHashValueSerializer(stringSerializer);
return redisTemplate;
}
}
CanalServer.java
@Slf4j
@Component
public class CanalServer {
private static final String THREAD_NAME_PREFIX="canalStart-";
private final RedisTableUtil redisTableUtil;
public CanalServer(RedisTableUtil redisTableUtil){this.redisTableUtil=redisTableUtil;}
/**
* 初始化
* 单线程启动 canal 客户端
*/
@PostConstruct
public void init() {
// 须要开启一个新的线程来执行 canal 服务
Thread initThread = new CanalStartThread();
initThread.setName(THREAD_NAME_PREFIX);
initThread.start();}
/**
* 定义 canal 服务线程
*/
public class CanalStartThread extends Thread {
@Override
public void run() {
// 创立链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),
"example", "","");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
try {while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(1000);
long batchId = message.getId();
if (batchId != -1 && message.getEntries().size() > 0) {entryHandler(message.getEntries());
}
connector.ack(batchId); // 提交确认
Thread.sleep(1000);
}
}catch (Exception e){log.error("Canal 线程异样,已终止:"+e.getMessage());
} finally {connector.disconnect();
}
}
}
/**
* canal 入口处理器
* @param entrys
*/
private void entryHandler(List<Entry> entrys) {for (Entry entry : entrys) {
// 操作事物 疏忽
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}
CanalEntry.RowChange rowChage = null;
String databaseName=null;
String tableName=null;
try {databaseName=entry.getHeader().getSchemaName();
tableName=entry.getHeader().getTableName();
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {log.error("获取数据失败:"+e.getMessage());
}
// 获取执行的事件
CanalEntry.EventType eventType = rowChage.getEventType();
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
// 删除操作
if (eventType.equals(CanalEntry.EventType.DELETE)) {redisTableUtil.delete(rowData.getBeforeColumnsList(),databaseName,tableName);
}
// 增加操作
else if (eventType.equals(CanalEntry.EventType.INSERT)) {redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);
}
// 批改操作
else if(eventType.equals(CanalEntry.EventType.UPDATE)) {redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);
}
}
}
}
}
CanalServer 中独自起了一个线程,每秒获取 mysql 日志。当监听到 mysql 表数据的新增、更新、删除操作时,会在 redis 中做出对应的数据操作,具体 redis 的更新逻辑在 RedisTableUtil 类中定义。
个别生产环境日志量大,能够将监听到的 binlog 事件推到消息中间件,再在消息中间件的生产端做上游数据同步的解决。
另外通过 binlog 监听到的数据库操作不止 DML,其实还有 DQL 和 DDL 等,只不过代码中没有体现。因而能够设想的到,应用 canal 能实现的性能,远不止数据同步这点性能。