两年前在我的项目上施行oracle etl同步时,客户就提出cdc(Change Data Capture)增量同步的需要,并且明确要求基于日志来捕捉数据变动。过后对于这方面的常识储备不够,只感觉这样的需要太刻薄。到了起初我施行分布式架构的计划越来越多,常常会思考如何保障数据的一致性,也让我回过头来,从新思考当年客户的需要。

本文的配角是canal,罕用来保障mysql到redis、elasticsearch等产品数据的增量同步。下文先讲canal的装置配置,再联合具体的代码,实现mysql到redis的实时同步。

1. 简介

1.1. 背景

分布式架构近些年很受推崇,咱们的零碎开发不再局限于一台mysql数据库,能够为了缓存而引入redis,为了搜寻而引入elasticsearch,等等,这些是分布式架构给咱们带来的便利性。

但带来的挑战也加大了,比方:微服务治理、分布式事务,明天还会讲到数据同步。以前对于关系型数据库的数据同步,曾经诞生了不少 ETL工具,比方咱们相熟的 oracle ODI。然而以当初微服务开发而论,还是不够灵便,咱们须要能够自在的将mysql数据同步到redis、elasticsearch等中央。这里就能够用到本文的配角 -- canal

1.2. canal

canal [k'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产。

晚期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需要,实现形式次要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐渐尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和生产业务。

基于日志增量订阅和生产的业务包含:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时保护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

以后的 canal 反对源端 mysql 版本包含 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 。

1.3. 工作原理

MySQL主备复制原理

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,能够通过 show binlog events 进行查看)

MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

MySQL slave 重放 relay log 中事件,将数据变更反映它本人的数据

canal 工作原理

canal 模仿 MySQL slave 的交互协定,假装本人为 MySQL slave ,向 MySQL master 发送dump 协定

MySQL master 收到 dump 申请,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

1.4. 长处

对于canal的长处,必定是要拿它和之前接触过 ETL工具做比拟。

  • 面向编程: 我用过的几个ETL工具都偏工具化。同步规定的自定义空间不大,作为开发人员更偏向于用编程的形式实现,我想那些用 spring cloud gateway 代替nginx 的人应该能了解。而 canal的客户端,则是齐全面向java编程的,开发起来更不便。
  • 增量同步: 大多ETL工具都专一于“全量同步”,对于实时性,也都靠设置定时策略来周期性执行,但 canal是专一于做实时“增量同步”的,而且它的做法也比拟好。不少ETL工具老的计划,是通过数据库trigger来实现增量同步的,会给数据库带来很大的压力,侵入性较高,而canal用的是新的计划,基于binlog日志来监听数据变动。

2. 装置配置

2.1. mysql配置

须要先开启mysql的 binlog 写入性能,配置 binlog-format 为 ROW 模式。这里批改 my.cnf 文件,增加下列配置:

log-bin=mysql-bin   # 开启 binlogbinlog-format=ROW   # 抉择 ROW 模式server_id=1        # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 反复

重启mysql,用以下命令检查一下binlog是否正确启动:

mysql> show variables like 'log_bin%';+---------------------------------+----------------------------------+| Variable_name                   | Value                            |+---------------------------------+----------------------------------+| log_bin                         | ON                               || log_bin_basename                | /data/mysql/data/mysql-bin       || log_bin_index                   | /data/mysql/data/mysql-bin.index || log_bin_trust_function_creators | OFF                              || log_bin_use_v1_row_events       | OFF                              |+---------------------------------+----------------------------------+5 rows in set (0.00 sec)mysql> show variables like 'binlog_format%';+---------------+-------+| Variable_name | Value |+---------------+-------+| binlog_format | ROW   |+---------------+-------+1 row in set (0.00 sec)

创立一个mysql用户canal 并且赋近程链接权限权限。

CREATE USER canal IDENTIFIED BY 'canal';GRANT ALL PRIVILEGES ON test_canal.user TO 'canal'@'%';FLUSH PRIVILEGES;

2.2. canal.deployer装置配置

  1. 在 canal下载页找到对应版本的部署包(canal.deployer-1.x.x.tar.gz)。
  2. 解压安装包tar -zxvf canal.deployer-1.4.0.tar.gz,失去四个目录bin、conf、lib、logs。
  3. 批改配置conf/example/instance.properties,配置参数比拟多,上面就列几个常见的数据库配置信息
canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal  canal.instance.dbPassword = canal
  1. 通过脚本启动 canal
# 启动sh bin/startup.sh# 敞开sh bin/stop.sh# 查看日志tail -500f logs/canal/canal.log# 查看具体实例日志tail -500f logs/example/example.log

3. 基于adapter同步

3.1. 简述

canal作为mysql的实时数据订阅组件,实现了对mysql binlog数据的抓取。

尽管阿里也开源了一个纯正从mysql同步数据到mysql的我的项目otter(github.com/alibaba/otter,基于canal的),实现了mysql的单向同步、双向同步等能力。然而咱们常常有从mysql同步数据到es、hbase等存储的需要,就须要用户本人用canal-client获取数据进行生产,比拟麻烦。

从1.1.1版本开始,canal实现了一个配套的落地模块,实现对canal订阅的音讯进行生产,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。

目前的最新稳定版1.1.4版本中,client-adapter曾经实现了同步数据到RDS、ES、HBase的能力。

目前Adapter具备以下根本能力:

  • 对接上游音讯,包含kafka、rocketmq、canal-server
  • 实现mysql数据的增量同步
  • 实现mysql数据的全量同步
  • 上游写入反对rds、es、hbase

本文不关注这部分canal.adapter的配置,具体的配置形式,请参考github官网文档。

3.2. elasticsearch示例

简略贴一贴上游es的配置,批改启动器配置: application.yml

server:  port: 8081logging:  level:    com.alibaba.otter.canal.client.adapter.es: DEBUGspring:  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8    default-property-inclusion: non_nullcanal.conf:  canalServerHost: 127.0.0.1:11111  flatMessage: true  srcDataSources:    defaultDS:      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true      username: root      password: 121212  canalInstances:  - instance: example    adapterGroups:    - outAdapters:      - name: es        hosts: 127.0.0.1:9300               # es 集群地址, 逗号分隔        properties:          cluster.name: elasticsearch       # es cluster name

adapter将会主动加载 conf/es 下的所有.yml结尾的配置文件,再批改 conf/es/mytest_user.yml文件:

dataSourceKey: defaultDS        # 源数据源的key, 对应下面配置的srcDataSources中的值destination: example            # cannal的instance或者MQ的topicesMapping:  _index: mytest_user           # es 的索引名称  _type: _doc                   # es 的doc名称  _id: _id                      # es 的_id, 如果不配置该项必须配置上面的pk项_id则会由es主动调配#  pk: id                       # 如果不须要_id, 则须要指定一个属性为主键属性  # sql映射  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,        a.c_time as _c_time, c.labels as _labels from user a        left join role b on b.id=a.role_id        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label        group by user_id) c on c.user_id=a.id"#  objFields:#    _labels: array:;           # 数组或者对象属性, array:; 代表以;字段外面是以;分隔的#    _obj: obj:{"test":"123"}  etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数  commitBatch: 3000                         # 提交批大小

sql映射阐明:

sql反对多表关联自由组合, 然而有肯定的限度:

  1. 主表不能为子查问语句
  2. 只能应用left outer join即最左表肯定要是主表
  3. 关联从表如果是子查问不能有多张表
  4. 主sql中不能有where查问条件(从表子查问中能够有where条件然而不举荐, 可能会造成数据同步的不统一, 比方批改了where条件中的字段内容)
  5. 关联条件只容许主外键的'='操作不能呈现其余常量判断比方: on a.role_id=b.id and b.statues=1
  6. 关联条件必须要有一个字段呈现在主查问语句中比方: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须呈现在主select语句中

Elastic Search的mapping 属性与sql的查问值将一一对应(不反对 select *), 比方: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id能够填写到配置文件的 _id: _id映射.

1、单表映射索引示例
select a.id as _id, a.name, a.role_id, a.c_time from user a

该sql对应的es mapping示例:

{    "mytest_user": {        "mappings": {            "_doc": {                "properties": {                    "name": {                        "type": "text"                    },                    "role_id": {                        "type": "long"                    },                    "c_time": {                        "type": "date"                    }                }            }        }    }}
2、单表映射索引示例sql带函数或运算操作
select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a

函数字段后必须跟上别名, 该sql对应的es mapping示例:

{    "mytest_user": {        "mappings": {            "_doc": {                "properties": {                    "name": {                        "type": "text"                    },                    "role_id": {                        "type": "long"                    },                    "c_time": {                        "type": "date"                    }                }            }        }    }}
3、多表映射(一对一, 多对一)索引示例sql:
select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a left join role b on b.id = a.role_id

注:这里join操作只能是left outer join, 第一张表必须为主表!!该sql对应的es mapping示例:

{    "mytest_user": {        "mappings": {            "_doc": {                "properties": {                    "name": {                        "type": "text"                    },                    "role_id": {                        "type": "long"                    },                    "role_name": {                        "type": "text"                    },                    "c_time": {                        "type": "date"                    }                }            }        }    }}
4、多表映射(一对多)索引示例sql:
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a left join (select user_id, group_concat(label order by id desc separator ';') as labels from label        group by user_id) c on c.user_id=a.id

注:left join 后的子查问只容许一张表, 即子查问中不能再蕴含子查问或者关联!!该sql对应的es mapping示例:

{    "mytest_user": {        "mappings": {            "_doc": {                "properties": {                    "name": {                        "type": "text"                    },                    "role_id": {                        "type": "long"                    },                    "c_time": {                        "type": "date"                    },                    "labels": {                        "type": "text"                    }                }            }        }    }}

4. 基于代码同步

如果你是同步到es、rds、hbase,然而adapter实现不了你的需要,能够用下列代码的形式实现。
如果你是想要同步到redis、mongo等数据库,因为adapter目前还不反对,同样能够用代码的形式实现。

如果你是用springboot开发,目前有两种常见的形式:

  1. 阿里原生的canal.client,比拟举荐这种,可参考GitHub官网文档。
  2. 集体基于canal.client开发的starter,可参考GitHub官网文档。

本文选用第一种形式,实现 canal 到 redis的实时同步。

4.1. 代码(redis同步)

pom.xml
        <!--lombok-->        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <version>1.18.16</version>        </dependency>        <!-- canal-->        <dependency>            <groupId>com.alibaba.otter</groupId>            <artifactId>canal.client</artifactId>            <version>1.1.3</version>        </dependency>        <!--redis-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-data-redis</artifactId>        </dependency>
RedisTableUtil.java
@Slf4j@Componentpublic class RedisTableUtil {    private static final String PK_NAME="id";    private static final String NULL_ID="NULL";    private final RedisTemplate redisTemplate;    public RedisTableUtil(RedisTemplate redisTemplate){        this.redisTemplate=redisTemplate;    }    /**     * 新增     * @param columnList     * @param databaseName     * @param tableName     */    public void insert(List<CanalEntry.Column> columnList,String databaseName,String tableName){        String keyName=this.generateKeyName(columnList,databaseName,tableName);        HashOperations hashOperations= redisTemplate.opsForHash();        columnList.stream()                .forEach((column -> {                    hashOperations.put(keyName,column.getName(),column.getValue());                }));    }    /**     * 删除     * @param columnList     * @param databaseName     * @param tableName     */    public void delete(List<CanalEntry.Column> columnList,String databaseName,String tableName){        String keyName=this.generateKeyName(columnList,databaseName,tableName);        redisTemplate.delete(keyName);    }    /**     * 更新     * @param columnList     * @param databaseName     * @param tableName     */    public void update(List<CanalEntry.Column> columnList,String databaseName,String tableName){        String keyName=this.generateKeyName(columnList,databaseName,tableName);        HashOperations hashOperations= redisTemplate.opsForHash();        columnList.stream()                .filter(CanalEntry.Column::getUpdated)                .forEach((column -> {                    hashOperations.put(keyName,column.getName(),column.getValue());                }));    }    /**     * 生成 行记录 key     * @param columnList     * @param databaseName     * @param tableName     * @return     */    private String generateKeyName(List<CanalEntry.Column> columnList,String databaseName,String tableName){        Optional<String> id= columnList.stream()                .filter(column -> PK_NAME.equals(column.getName()))                .map(CanalEntry.Column::getValue)                .findFirst();        return databaseName+"_"+tableName+"_"+id.orElse(NULL_ID);    }}
RedisConfig.java
@Configurationpublic class RedisConfig {    @Bean    public RedisTemplate redisTemplate(RedisTemplate redisTemplate){        RedisSerializer<String> stringSerializer = new StringRedisSerializer();        redisTemplate.setKeySerializer(stringSerializer);        redisTemplate.setValueSerializer(stringSerializer);        redisTemplate.setHashKeySerializer(stringSerializer);        redisTemplate.setHashValueSerializer(stringSerializer);        return redisTemplate;    }}
CanalServer.java
@Slf4j@Componentpublic class CanalServer {    private static final String THREAD_NAME_PREFIX="canalStart-";    private final RedisTableUtil redisTableUtil;    public CanalServer(RedisTableUtil redisTableUtil){        this.redisTableUtil=redisTableUtil;    }    /**     * 初始化     * 单线程启动 canal客户端     */    @PostConstruct    public void init() {        //须要开启一个新的线程来执行 canal 服务        Thread initThread = new CanalStartThread();        initThread.setName(THREAD_NAME_PREFIX);        initThread.start();    }    /**     * 定义 canal服务线程     */    public class CanalStartThread extends Thread {        @Override        public void run() {            // 创立链接            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),                    "example", "", "");            connector.connect();            connector.subscribe(".*\\..*");            connector.rollback();            try {                while (true) {                    // 获取指定数量的数据                    Message message = connector.getWithoutAck(1000);                    long batchId = message.getId();                    if (batchId != -1 &&  message.getEntries().size() > 0) {                        entryHandler(message.getEntries());                    }                    connector.ack(batchId); // 提交确认                    Thread.sleep(1000);                }            }catch (Exception e){                log.error("Canal线程异样,已终止:"+e.getMessage());            } finally {                connector.disconnect();            }        }    }    /**     * canal 入口处理器     * @param entrys     */    private  void entryHandler( List<Entry> entrys) {        for (Entry entry : entrys) {            //操作事物 疏忽            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }            CanalEntry.RowChange rowChage = null;            String databaseName=null;            String tableName=null;            try {                databaseName=entry.getHeader().getSchemaName();                tableName=entry.getHeader().getTableName();                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());            } catch (InvalidProtocolBufferException e) {                log.error("获取数据失败:"+e.getMessage());            }            //获取执行的事件            CanalEntry.EventType eventType = rowChage.getEventType();            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {                //删除操作                if (eventType.equals(CanalEntry.EventType.DELETE)) {                    redisTableUtil.delete(rowData.getBeforeColumnsList(),databaseName,tableName);                }                //增加操作                else if (eventType.equals(CanalEntry.EventType.INSERT)) {                    redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);                }                //批改操作                else if(eventType.equals(CanalEntry.EventType.UPDATE)) {                    redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);                }            }        }    }    }

CanalServer 中独自起了一个线程,每秒获取mysql日志。当监听到mysql表数据的新增、更新、删除操作时,会在redis中做出对应的数据操作,具体redis的更新逻辑在RedisTableUtil类中定义。

个别生产环境日志量大,能够将监听到的binlog事件推到消息中间件,再在消息中间件的生产端做上游数据同步的解决。

另外通过binlog监听到的数据库操作不止DML,其实还有DQL和DDL等,只不过代码中没有体现。因而能够设想的到,应用canal能实现的性能,远不止数据同步这点性能。