乐趣区

关于数据:万字长文解密数据异构最佳实践含完整代码实现

写在后面

在当今互联网行业,尤其是当初分布式、微服务开发环境下,为了进步搜寻效率,以及搜寻的精准度,会大量应用 Redis、Memcached 等 NoSQL 数据库,也会应用大量的 Solr、Elasticsearch 等全文检索服务和搜索引擎。那么,这个时候,就会有一个问题须要咱们来思考和解决:那就是数据同步的问题!如何将实时变动的数据库中的数据同步到 Redis/Memcached 或者 Solr/Elasticsearch 中呢?

互联网背景下的数据同步需要

在当今互联网行业,尤其是当初分布式、微服务开发环境下,为了进步搜寻效率,以及搜寻的精准度,会大量应用 Redis、Memcached 等 NoSQL 数据库,也会应用大量的 Solr、Elasticsearch 等全文检索服务。那么,这个时候,就会有一个问题须要咱们来思考和解决:那就是数据同步的问题!如何将实时变动的数据库中的数据同步到 Redis/Memcached 或者 Solr/Elasticsearch 中呢?

例如,咱们在分布式环境下向数据库中一直的写入数据,而咱们读数据可能须要从 Redis、Memcached 或者 Elasticsearch、Solr 等服务中读取。那么,数据库与各个服务中数据的实时同步问题,成为了咱们亟待解决的问题。

试想,因为业务须要,咱们引入了 Redis、Memcached 或者 Elasticsearch、Solr 等服务。使得咱们的应用程序可能会从不同的服务中读取数据,如下图所示。

实质上讲,无论咱们引入了何种服务或者中间件,数据最终都是从咱们的 MySQL 数据库中读取进去的。那么,问题来了,如何将 MySQL 中的数据实时同步到其余的服务或者中间件呢?

留神:为了更好的阐明问题,前面的内容以 MySQL 数据库中的数据同步到 Solr 索引库为例进行阐明。

数据同步解决方案

1. 在业务代码中同步

在减少、批改、删除之后,执行操作 Solr 索引库的逻辑代码。例如上面的代码片段。

public ResponseResult updateStatus(Long[] ids, String status){
    try{goodsService.updateStatus(ids, status);
        if("status_success".equals(status)){List<TbItem> itemList = goodsService.getItemList(ids, status);
            itemSearchService.importList(itemList);
            return new ResponseResult(true, "批改状态胜利")
        }
    }catch(Exception e){return new ResponseResult(false, "批改状态失败");
    }
}

长处:

操作简便。

毛病:

业务耦合度高。

执行效率变低。

2. 定时工作同步

在数据库中执行完减少、批改、删除操作后,通过定时工作定时的将数据库的数据同步到 Solr 索引库中。

定时工作技术有:SpringTask,Quartz。

哈哈,还有我开源的 mykit-delay 框架,开源地址为:https://github.com/sunshinely…。

这里执行定时工作时,须要留神的一个技巧是:第一次执行定时工作时,从 MySQL 数据库中以工夫字段进行倒序排列查问相应的数据,并记录以后查问数据的工夫字段的最大值,当前每次执行定时工作查问数据的时候,只有按工夫字段倒序查问数据表中的工夫字段大于上次记录的工夫值的数据,并且记录本次工作查问出的工夫字段的最大值即可,从而不须要再次查问数据表中的所有数据。

留神:这里所说的工夫字段指的是标识数据更新的工夫字段,也就是说,应用定时工作同步数据时,为了防止每次执行工作都会进行全表扫描,最好是在数据表中减少一个更新记录的工夫字段。

长处:

同步 Solr 索引库的操作与业务代码齐全解耦。

毛病:

数据的实时性并不高。

3. 通过 MQ 实现同步

在数据库中执行完减少、批改、删除操作后,向 MQ 中发送一条音讯,此时,同步程序作为 MQ 中的消费者,从音讯队列中获取音讯,而后执行同步 Solr 索引库的逻辑。

咱们能够应用下图来简略的标识通过 MQ 实现数据同步的过程。

咱们能够应用如下代码实现这个过程。

public ResponseResult updateStatus(Long[] ids, String status){
    try{goodsService.updateStatus(ids, status);
        if("status_success".equals(status)){List<TbItem> itemList = goodsService.getItemList(ids, status);
            final String jsonString = JSON.toJSONString(itemList);
            jmsTemplate.send(queueSolr, new MessageCreator(){
                @Override
                public Message createMessage(Session session) throws JMSException{return session.createTextMessage(jsonString);
                }
            });
        }
        return new ResponseResult(true, "批改状态胜利");
    }catch(Exception e){return new ResponseResult(false, "批改状态失败");
    }
}

长处:

业务代码解耦,并且可能做到准实时。

毛病:

须要在业务代码中退出发送音讯到 MQ 的代码,数据调用接口耦合。

4. 通过 Canal 实现实时同步

Canal 是阿里巴巴开源的一款数据库日志增量解析组件,通过 Canal 来解析数据库的日志信息,来检测数据库中表构造和数据的变动,从而更新 Solr 索引库。

应用 Canal 能够做到业务代码齐全解耦,API 齐全解耦,能够做到准实时。

Canal 简介

阿里巴巴 MySQL 数据库 binlog 增量订阅与生产组件,基于数据库增量日志解析,提供增量数据订阅与生产,目前次要反对了 MySQL。

Canal 开源地址:https://github.com/alibaba/canal。

Canal 工作原理

MySQL 主从复制的实现

从上图能够看出,主从复制次要分成三步:

  • Master 节点将数据的扭转记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,能够通过 show binlog events 进行查看)。
  • Slave 节点将 Master 节点的二进制日志事件(binary log events)拷贝到它的中继日志(relay log)。
  • Slave 节点重做中继日志中的事件将扭转反映到本人自身的数据库中。

Canal 外部原理

首先,咱们来看下 Canal 的原理图,如下所示。

原理大抵形容如下:

  • Canal 模仿 MySQL slave 的交互协定,假装本人为 MySQL Slave,向 MySQL Master 发送 dump 协定
  • MySQL Master 收到 dump 申请,开始推送 binary log 给 Slave (即 Canal)
  • Canal 解析 binary log 对象(原始为 byte 流)

Canal 内部结构

阐明如下:

  • Server:代表一个 Canal 运行实例,对应一个 JVM 过程。
  • Instance:对应一个数据队列(1 个 Server 对应 1 个或者多个 Instance)。

接下来,咱们再来看下 Instance 下的子模块,如下所示。

  • EventParser:数据源接入,模仿 Slave 协定和 Master 节点进行交互,协定解析。
  • EventSink:EventParser 和 EventStore 的连接器,对数据进行过滤、加工、归并和散发等解决。
  • EventSore:数据存储。
  • MetaManager:增量订阅和生产信息管理。

Canal 环境筹备

设置 MySQL 近程拜访

grant all privileges on *.* to 'root'@'%' identified by '123456';
flush privileges;

MySQL 配置

留神:这里的 MySQL 是基于 5.7 版本进行阐明的。

Canal 的原理基于 MySQL binlog 技术,所以,要想应用 Canal 就要开启 MySQL 的 binlog 写入性能,倡议配置 binlog 的模式为 row。

能够在 MySQL 命令行输出如下命令来查看 binlog 的模式。

SHOW VARIABLES LIKE 'binlog_format';

执行成果如下所示。

能够看到,在 MySQL 中默认的 binlog 格局为 STATEMENT,这里咱们须要将 STATEMENT 批改为 ROW。批改 /etc/my.cnf 文件。

vim /etc/my.cnf

在 [mysqld] 上面新增如下三项配置。

log-bin=mysql-bin  #开启 MySQL 二进制日志
binlog_format=ROW #将二进制日志的格局设置为 ROW
server_id=1 #server_id 须要惟一,不能与 Canal 的 slaveId 反复

批改完 my.cnf 文件后,须要重启 MySQL 服务。

service mysqld restart

接下来,咱们再次查看 binlog 模式。

SHOW VARIABLES LIKE 'binlog_format';

能够看到,此时,MySQL 的 binlog 模式曾经被设置为 ROW 了。

MySQL 创立用户受权

Canal 的原理是模式本人为 MySQL Slave,所以肯定要设置 MySQL Slave 的相干权限。这里,须要创立一个主从同步的账户,并且赋予这个账户相干的权限。

CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;

Canal 部署装置

下载 Canal

这里,咱们以 Canal 1.1.1 版本进行阐明,小伙伴们能够到链接 https://github.com/alibaba/canal/releases/tag/canal-1.1.1 下载 Canal 1.1.1 版本。

上传解压

将下载好的 Canal 安装包,上传到服务器,并执行如下命令进行解压

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/

解压后的目录如下所示。

各目录的阐明如下:

  • bin:存储可执行脚本。
  • conf:寄存配置文件。
  • lib:寄存其余依赖或者第三方库。
  • logs:寄存的是日志文件。

批改配置文件

在 Canal 的 conf 目录下有一个 canal.properties 文件,这个文件中配置的是 Canal Server 相干的配置,在这个文件中有如下一行配置。

canal.destinations=example

这里的 example 就相当于 Canal 的一个 Instance,能够在这里配置多个 Instance,多个 Instance 之间以逗号分隔即可。同时,这里的 example 也对应着 Canal 的 conf 目录下的一个文件夹。也就是说,Canal 中的每个 Instance 实例都对应着 conf 目录下的一个子目录。

接下来,咱们须要批改 Canal 的 conf 目录下的 example 目录的一个配置文件 instance.properties。

vim instance.properties

批改如下配置项。

#################################################################
## canal slaveId, 留神:不要与 MySQL 的 server_id 反复
canal.instance.mysql.slaveId = 1234

#position info,须要改成本人的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

#username/password,须要改成本人的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =canaldb
canal.instance.connectionCharset = UTF-8

#table regex
canal.instance.filter.regex = canaldb\\..*
#################################################################

选项含意:

  • canal.instance.mysql.slaveId : mysql 集群配置中的 serverId 概念,须要保障和以后 mysql 集群中 id 惟一;
  • canal.instance.master.address: mysql 主库链接地址;
  • canal.instance.dbUsername : mysql 数据库帐号;
  • canal.instance.dbPassword : mysql 数据库明码;
  • canal.instance.defaultDatabaseName : mysql 链接时默认数据库;
  • canal.instance.connectionCharset : mysql 数据解析编码;
  • canal.instance.filter.regex : mysql 数据解析关注的表,Perl 正则表达式.

启动 Canal

配置完 Canal 后,就能够启动 Canal 了。进入到 Canal 的 bin 目录下,输出如下命令启动 Canal。

./startup.sh

测试 Canal

导入并批改源码

这里,咱们应用 Canal 的源码进行测试,下载 Canal 的源码后,将其导入到 IDEA 中。

接下来,咱们找到 example 下的 SimpleCanalClientTest 类进行测试。这个类的源码如下所示。

package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * 单机模式的测试例子
 * 
 * @author jianghang 2013-4-15 下午 04:19:20
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {public SimpleCanalClientTest(String destination){super(destination);
     }

    public static void main(String args[]) {
        // 依据 ip,间接创立链接,无 HA 的性能
        String destination = "example";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
                destination,
                "canal",
                "canal");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {
                try {logger.info("## stop the canal client");
                    clientTest.stop();} catch (Throwable e) {logger.warn("##something goes wrong when stopping canal:", e);
                } finally {logger.info("## canal client is down.");
                }
            }

        });
    }
}

能够看到,这个类中,应用的 destination 为 example。在这个类中,咱们只须要将 IP 地址批改为 Canal Server 的 IP 即可。

具体为:将如下一行代码。

String ip = AddressUtils.getHostIp();

批改为:

String ip = "192.168.175.100"

因为咱们在配置 Canal 时,没有指定用户名和明码,所以,咱们还须要将如下代码。

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
    destination,
    "canal",
    "canal");

批改为:

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
    destination,
    "","");

批改实现后,运行 main 办法启动程序。

测试数据变更

接下来,在 MySQL 中创立一个 canaldb 数据库。

create database canaldb;

此时会在 IDEA 的命令行输入相干的日志信息。

****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] 
****************************************************

接下来,我在 canaldb 数据库中创立数据表,并对数据表中的数据进行增删改查,程序输入的日志信息如下所示。

# 在 mysql 进行数据变更后,这里会显示 mysql 的 bin 日志。****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] 
****************************************************

================> binlog[mysql-bin.000007:6180] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6311] , name[canal,canal_table] , eventType : DELETE , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393 ms
id : 8    type=int(10) unsigned
name : 512    type=varchar(255)
----------------
 END ----> transaction id: 249
================> binlog[mysql-bin.000007:6356] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 394ms

****************************************************
* Batch Id: [8] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6387:1540286869000(2020-08-05 23:25:49)] 
* End : [mysql-bin.000007:6563:1540286869000(2020-08-05 23:25:49)] 
****************************************************

================> binlog[mysql-bin.000007:6387] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6518] , name[canal,canal_table] , eventType : INSERT , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976 ms
id : 21    type=int(10) unsigned    update=true
name : aaa    type=varchar(255)    update=true
----------------
 END ----> transaction id: 250
================> binlog[mysql-bin.000007:6563] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 977ms

****************************************************
* Batch Id: [9] ,count : [3] , memsize : [161] , Time : 2020-08-05 23:26:22
* Start : [mysql-bin.000007:6594:1540286902000(2020-08-05 23:26:22)] 
* End : [mysql-bin.000007:6782:1540286902000(2020-08-05 23:26:22)] 
****************************************************

================> binlog[mysql-bin.000007:6594] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6725] , name[canal,canal_table] , eventType : UPDATE , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712 ms
id : 21    type=int(10) unsigned
name : aaac    type=varchar(255)    update=true
----------------
 END ----> transaction id: 252
================> binlog[mysql-bin.000007:6782] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 713ms

数据同步实现

需要

将数据库数据的变动, 通过 canal 解析 binlog 日志, 实时更新到 solr 的索引库中。

具体实现

创立工程

创立 Maven 工程 mykit-canal-demo,并在 pom.xml 文件中增加如下配置。

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.8.9</version>
    </dependency>

    <dependency>
        <groupId>org.apache.solr</groupId>
        <artifactId>solr-solrj</artifactId>
        <version>4.10.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.9</version>
        <scope>test</scope>
    </dependency>

</dependencies>

创立 log4j 配置文件xml

在工程的 src/main/resources 目录下创立 log4j.properties 文件,内容如下所示。

log4j.rootCategory=debug, CONSOLE

# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n

# LOGFILE is set to be a File appender using a PatternLayout.
# log4j.appender.LOGFILE=org.apache.log4j.FileAppender
# log4j.appender.LOGFILE.File=d:\axis.log
# log4j.appender.LOGFILE.Append=true
# log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
# log4j.appender.LOGFILE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n

创立实体类

在 io.mykit.canal.demo.bean 包下创立一个 Book 实体类,用于测试 Canal 的数据传输,如下所示。

package io.mykit.canal.demo.bean;
import org.apache.solr.client.solrj.beans.Field;
import java.util.Date;
public class Book implements Serializable {
    private static final long serialVersionUID = -6350345408771427834L;{@Field("id")
    private Integer id;

    @Field("book_name")
    private String name;

    @Field("book_author")
    private String author;

    @Field("book_publishtime")
    private Date publishtime;

    @Field("book_price")
    private Double price;

    @Field("book_publishgroup")
    private String publishgroup;

    public Integer getId() {return id;}

    public void setId(Integer id) {this.id = id;}

    public String getName() {return name;}

    public void setName(String name) {this.name = name;}

    public String getAuthor() {return author;}

    public void setAuthor(String author) {this.author = author;}

    public Date getPublishtime() {return publishtime;}

    public void setPublishtime(Date publishtime) {this.publishtime = publishtime;}

    public Double getPrice() {return price;}

    public void setPrice(Double price) {this.price = price;}

    public String getPublishgroup() {return publishgroup;}

    public void setPublishgroup(String publishgroup) {this.publishgroup = publishgroup;}

    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", author='" + author + '\'' +
                ", publishtime=" + publishtime +
                ", price=" + price +
                ", publishgroup='" + publishgroup + '\'' +
                '}';
    }
}

其中,咱们在 Book 实体类中,应用 Solr 的注解 @Field 定义了实体类字段与 Solr 域之间的关系。

各种工具类的实现

接下来,咱们就在 io.mykit.canal.demo.utils 包下创立各种工具类。

  • BinlogValue

用于存储 binlog 剖析的每行每列的 value 值,代码如下所示。

package io.mykit.canal.demo.utils;
import java.io.Serializable;
/**
 * 
 * ClassName: BinlogValue <br/> 
 * 
 * binlog 剖析的每行每列的 value 值;<br>
 * 新增数据:beforeValue 和 value 均为现有值;<br>
 * 批改数据:beforeValue 是批改前的值;value 为批改后的值;<br>
 * 删除数据:beforeValue 和 value 均是删除前的值;这个比拟非凡次要是为了删除数据时不便获取删除前的值 <br>
 */
public class BinlogValue implements Serializable {

    private static final long serialVersionUID = -6350345408773943086L;
    
    private String value;
    private String beforeValue;
    
    /**
     * binlog 剖析的每行每列的 value 值;<br>
     * 新增数据:value:为现有值;<br>
     * 批改数据:value 为批改后的值;<br>
     * 删除数据:value 是删除前的值;这个比拟非凡次要是为了删除数据时不便获取删除前的值 <br>
     */
    public String getValue() {return value;}
    public void setValue(String value) {this.value = value;}
    
    /**
     * binlog 剖析的每行每列的 beforeValue 值;<br>
     * 新增数据:beforeValue 为现有值;<br>
     * 批改数据:beforeValue 是批改前的值;<br>
     * 删除数据:beforeValue 为删除前的值;<br>
     */
    public String getBeforeValue() {return beforeValue;}
    public void setBeforeValue(String beforeValue) {this.beforeValue = beforeValue;}
}
  • CanalDataParser

用于解析数据,代码如下所示。

package io.mykit.canal.demo.utils;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * 解析数据
 */
public class CanalDataParser {
    
    protected static final String DATE_FORMAT   = "yyyy-MM-dd HH:mm:ss";
    protected static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
    protected static final String yyyyMMdd      = "yyyyMMdd";
    protected static final String SEP           = SystemUtils.LINE_SEPARATOR;
    protected static String  context_format     = null;
    protected static String  row_format         = null;
    protected static String  transaction_format = null;
    protected static String row_log = null;
    
    private static Logger logger = LoggerFactory.getLogger(CanalDataParser.class);
    
    static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}]" + SEP;
        context_format += "* End : [{}]" + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
                     + SEP;

        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;

        row_log = "schema[{}], table[{}]";
    }

    public static List<InnerBinlogEntry> convertToInnerBinlogEntry(Message message) {List<InnerBinlogEntry> innerBinlogEntryList = new ArrayList<InnerBinlogEntry>();
        
        if(message == null) {logger.info("接管到空的 message; 疏忽");
            return innerBinlogEntryList;
        }
        
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {logger.info("接管到空的 message[size=" + size + "]; 疏忽");
            return innerBinlogEntryList;
        }

        printLog(message, batchId, size);
        List<Entry> entrys = message.getEntries();

        // 输入日志
        for (Entry entry : entrys) {long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() - executeTime;
            
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务头信息,执行的线程 id,事务耗时
                    logger.info("BEGIN ----> Thread id: {}",  begin.getThreadId());
                    logger.info(transaction_format, new Object[] {entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务提交信息,事务 id
                    logger.info("END ----> transaction id: {}", end.getTransactionId());
                    logger.info(transaction_format,
                        new Object[] {entry.getHeader().getLogfileName(),  String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
                }
                continue;
            }

            // 解析后果
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }

                EventType eventType = rowChage.getEventType();

                logger.info(row_format, new Object[] {entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                // 组装数据后果
                if (eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE) {String schemaName = entry.getHeader().getSchemaName();
                    String tableName = entry.getHeader().getTableName();
                    List<Map<String, BinlogValue>> rows = parseEntry(entry);

                    InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry();
                    innerBinlogEntry.setEntry(entry);
                    innerBinlogEntry.setEventType(eventType);
                    innerBinlogEntry.setSchemaName(schemaName);
                    innerBinlogEntry.setTableName(tableName.toLowerCase());
                    innerBinlogEntry.setRows(rows);

                    innerBinlogEntryList.add(innerBinlogEntry);
                } else {logger.info("存在 INSERT INSERT UPDATE 操作之外的 SQL [" + eventType.toString() + "]");
                }
                continue;
            }
        }
        return innerBinlogEntryList;
    }

    private static List<Map<String, BinlogValue>> parseEntry(Entry entry) {List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
        try {String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChage.getEventType();

            // 解决每个 Entry 中的每行数据
            for (RowData rowData : rowChage.getRowDatasList()) {StringBuilder rowlog = new StringBuilder("rowlog schema[" + schemaName + "], table[" + tableName + "], event[" + eventType.toString() + "]");
                
                Map<String, BinlogValue> row = new HashMap<String, BinlogValue>();
                List<Column> beforeColumns = rowData.getBeforeColumnsList();
                List<Column> afterColumns = rowData.getAfterColumnsList();
                beforeColumns = rowData.getBeforeColumnsList();
                if (eventType == EventType.DELETE) {//delete
                    for(Column column : beforeColumns) {BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else if(eventType == EventType.UPDATE) {//update
                    for(Column column : beforeColumns) {BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                    for(Column column : afterColumns) {BinlogValue binlogValue = row.get(column.getName());
                        if(binlogValue == null) {binlogValue = new BinlogValue();
                        }
                        binlogValue.setValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else { // insert
                    for(Column column : afterColumns) {BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } 
               
                rows.add(row);
                String rowjson = JacksonUtil.obj2str(row);
                
                logger.info("#################################### Data Parse Result ####################################");
                logger.info(rowlog + "," + rowjson);
                logger.info("#################################### Data Parse Result ####################################");
                logger.info("");
            }
        } catch (InvalidProtocolBufferException e) {throw new RuntimeException("parseEntry has an error , data:" + entry.toString(), e);
        }
        return rows;
    }

    private static void printLog(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {memsize += entry.getHeader().getEventLength();}

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
        }

        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[] {batchId, size, memsize, format.format(new Date()), startPosition, endPosition });
    }

    private static String buildPositionForDump(Entry entry) {long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
    }
}
  • DateUtils

工夫工具类,代码如下所示。

package io.mykit.canal.demo.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtils {
    
    private static final String FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
    
    private static SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_PATTERN);
    
    public static Date parseDate(String datetime) throws ParseException{if(datetime != null && !"".equals(datetime)){return sdf.parse(datetime);
        }
        return null;
    }
    
    
    public static String formatDate(Date datetime) throws ParseException{if(datetime != null){return sdf.format(datetime);
        }
        return null;
    }
    
    public static Long formatStringDateToLong(String datetime) throws ParseException{if(datetime != null && !"".equals(datetime)){Date d =  sdf.parse(datetime);
            return d.getTime();}
        return null;
    }
    
    public static Long formatDateToLong(Date datetime) throws ParseException{if(datetime != null){return datetime.getTime();
        }
        return null;
    }
}
  • InnerBinlogEntry

Binlog 实体类,代码如下所示。

package io.mykit.canal.demo.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

public class InnerBinlogEntry {
    
    /**
     * canal 原生的 Entry
     */
    private Entry entry;
    
    /**
     * 该 Entry 归属于的表名
     */
    private String tableName;
    
    /**
     * 该 Entry 归属数据库名
     */
    private String schemaName;
    
    /**
     * 该 Entry 本次的操作类型,对应 canal 原生的枚举;EventType.INSERT; EventType.UPDATE; EventType.DELETE;
     */
    private EventType eventType;
    
    private List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
    
    
    public Entry getEntry() {return entry;}
    public void setEntry(Entry entry) {this.entry = entry;}
    public String getTableName() {return tableName;}
    public void setTableName(String tableName) {this.tableName = tableName;}
    public EventType getEventType() {return eventType;}
    public void setEventType(EventType eventType) {this.eventType = eventType;}
    public String getSchemaName() {return schemaName;}
    public void setSchemaName(String schemaName) {this.schemaName = schemaName;}
    public List<Map<String, BinlogValue>> getRows() {return rows;}
    public void setRows(List<Map<String, BinlogValue>> rows) {this.rows = rows;}
}
  • JacksonUtil

Json 工具类,代码如下所示。

package io.mykit.canal.demo.utils;

import java.io.IOException;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;


public class JacksonUtil {private static ObjectMapper mapper = new ObjectMapper();

    public static String obj2str(Object obj) {
        String json = null;
        try {json = mapper.writeValueAsString(obj);
        } catch (JsonGenerationException e) {e.printStackTrace();
        } catch (JsonMappingException e) {e.printStackTrace();
        } catch (IOException e) {e.printStackTrace();
        }
        return json;
    }

    public static <T> T str2obj(String content, Class<T> valueType) {
        try {return mapper.readValue(content, valueType);
        } catch (JsonParseException e) {e.printStackTrace();
        } catch (JsonMappingException e) {e.printStackTrace();
        } catch (IOException e) {e.printStackTrace();
        }
        return null;
    }
}

同步程序的实现

筹备好实体类和工具类后,咱们就能够编写同步程序来实现 MySQL 数据库中的数据实时同步到 Solr 索引库了,咱们在 io.mykit.canal.demo.main 包中常见 MykitCanalDemoSync 类,代码如下所示。

package io.mykit.canal.demo.main;

import io.mykit.canal.demo.bean.Book;
import io.mykit.canal.demo.utils.BinlogValue;
import io.mykit.canal.demo.utils.CanalDataParser;
import io.mykit.canal.demo.utils.DateUtils;
import io.mykit.canal.demo.utils.InnerBinlogEntry;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.List;
import java.util.Map;

public class SyncDataBootStart {private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class);

    public static void main(String[] args) throws Exception {

        String hostname = "192.168.175.100";
        Integer port = 11111;
        String destination = "example";

        // 获取 CanalServer 连贯
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, "","");

        // 连贯 CanalServer
        canalConnector.connect();

        // 订阅 Destination
        canalConnector.subscribe();

        // 轮询拉取数据
        Integer batchSize = 5*1024;
        while (true){Message message = canalConnector.getWithoutAck(batchSize);

            long messageId = message.getId();
            int size = message.getEntries().size();

            if(messageId == -1 || size == 0){
                try {Thread.sleep(1000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }else{
                // 进行数据同步
                //1. 解析 Message 对象
                List<InnerBinlogEntry> innerBinlogEntries = CanalDataParser.convertToInnerBinlogEntry(message);

                //2. 将解析后的数据信息 同步到 Solr 的索引库中.
                syncDataToSolr(innerBinlogEntries);
            }

            // 提交确认
            canalConnector.ack(messageId);

        }

    }
    private static void syncDataToSolr(List<InnerBinlogEntry> innerBinlogEntries) throws Exception {
        // 获取 solr 的连贯
        SolrServer solrServer = new HttpSolrServer("http://192.168.175.101:8080/solr");

        // 遍历数据汇合 , 依据数据汇合中的数据信息, 来决定执行减少, 批改 , 删除操作 .
        if(innerBinlogEntries != null){for (InnerBinlogEntry innerBinlogEntry : innerBinlogEntries) {CanalEntry.EventType eventType = innerBinlogEntry.getEventType();

                // 如果是 Insert, update , 则须要同步数据到 solr 索引库
                if(eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE){List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
                    if(rows != null){for (Map<String, BinlogValue> row : rows) {BinlogValue id = row.get("id");
                            BinlogValue name = row.get("name");
                            BinlogValue author = row.get("author");
                            BinlogValue publishtime = row.get("publishtime");
                            BinlogValue price = row.get("price");
                            BinlogValue publishgroup = row.get("publishgroup");

                            Book book = new Book();
                            book.setId(Integer.parseInt(id.getValue()));
                            book.setName(name.getValue());
                            book.setAuthor(author.getValue());
                            book.setPrice(Double.parseDouble(price.getValue()));
                            book.setPublishgroup(publishgroup.getValue());
                            book.setPublishtime(DateUtils.parseDate(publishtime.getValue()));


                            // 导入数据到 solr 索引库
                            solrServer.addBean(book);
                            solrServer.commit();}
                    }

                }else if(eventType == CanalEntry.EventType.DELETE){
                    // 如果是 Delete 操作, 则须要删除 solr 索引库中的数据 .
                    List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
                    if(rows != null){for (Map<String, BinlogValue> row : rows) {BinlogValue id = row.get("id");

                            // 依据 ID 删除 solr 的索引库
                            solrServer.deleteById(id.getValue());
                            solrServer.commit();}
                    }

                }
            }
        }
    }
}

接下来,启动 SyncDataBootStart 类的 main 办法,监听 Canal Server,而 Canal Server 监听 MySQL binlog 的日志变动,一旦 MySQL 的 binlog 日志发生变化,则 SyncDataBootStart 会立即收到变更信息,并将变更信息解析成 Book 对象实时更新到 Solr 库中。如果在 MySQL 数据库中删除了数据,则也会实时删除 Solr 库中的数据。

局部参考 Canal 官网文档:https://github.com/alibaba/canal。

好了,明天就到这儿吧,我是冰河,咱们下期见~~

退出移动版