写在后面

在当今互联网行业,尤其是当初分布式、微服务开发环境下,为了进步搜寻效率,以及搜寻的精准度,会大量应用Redis、Memcached等NoSQL数据库,也会应用大量的Solr、Elasticsearch等全文检索服务和搜索引擎。那么,这个时候,就会有一个问题须要咱们来思考和解决:那就是数据同步的问题!如何将实时变动的数据库中的数据同步到Redis/Memcached或者Solr/Elasticsearch中呢?

互联网背景下的数据同步需要

在当今互联网行业,尤其是当初分布式、微服务开发环境下,为了进步搜寻效率,以及搜寻的精准度,会大量应用Redis、Memcached等NoSQL数据库,也会应用大量的Solr、Elasticsearch等全文检索服务。那么,这个时候,就会有一个问题须要咱们来思考和解决:那就是数据同步的问题!如何将实时变动的数据库中的数据同步到Redis/Memcached或者Solr/Elasticsearch中呢?

例如,咱们在分布式环境下向数据库中一直的写入数据,而咱们读数据可能须要从Redis、Memcached或者Elasticsearch、Solr等服务中读取。那么,数据库与各个服务中数据的实时同步问题,成为了咱们亟待解决的问题。

试想,因为业务须要,咱们引入了Redis、Memcached或者Elasticsearch、Solr等服务。使得咱们的应用程序可能会从不同的服务中读取数据,如下图所示。

实质上讲,无论咱们引入了何种服务或者中间件,数据最终都是从咱们的MySQL数据库中读取进去的。那么,问题来了,如何将MySQL中的数据实时同步到其余的服务或者中间件呢?

留神:为了更好的阐明问题,前面的内容以MySQL数据库中的数据同步到Solr索引库为例进行阐明。

数据同步解决方案

1.在业务代码中同步

在减少、批改、删除之后,执行操作Solr索引库的逻辑代码。例如上面的代码片段。

public ResponseResult updateStatus(Long[] ids, String status){    try{        goodsService.updateStatus(ids, status);        if("status_success".equals(status)){            List<TbItem> itemList = goodsService.getItemList(ids, status);            itemSearchService.importList(itemList);            return new ResponseResult(true, "批改状态胜利")        }    }catch(Exception e){        return new ResponseResult(false, "批改状态失败");    }}

长处:

操作简便。

毛病:

业务耦合度高。

执行效率变低。

2.定时工作同步

在数据库中执行完减少、批改、删除操作后,通过定时工作定时的将数据库的数据同步到Solr索引库中。

定时工作技术有:SpringTask,Quartz。

哈哈,还有我开源的mykit-delay框架,开源地址为:https://github.com/sunshinely...。

这里执行定时工作时,须要留神的一个技巧是:第一次执行定时工作时,从MySQL数据库中以工夫字段进行倒序排列查问相应的数据,并记录以后查问数据的工夫字段的最大值,当前每次执行定时工作查问数据的时候,只有按工夫字段倒序查问数据表中的工夫字段大于上次记录的工夫值的数据,并且记录本次工作查问出的工夫字段的最大值即可,从而不须要再次查问数据表中的所有数据。

留神:这里所说的工夫字段指的是标识数据更新的工夫字段,也就是说,应用定时工作同步数据时,为了防止每次执行工作都会进行全表扫描,最好是在数据表中减少一个更新记录的工夫字段。

长处:

同步Solr索引库的操作与业务代码齐全解耦。

毛病:

数据的实时性并不高。

3.通过MQ实现同步

在数据库中执行完减少、批改、删除操作后,向MQ中发送一条音讯,此时,同步程序作为MQ中的消费者,从音讯队列中获取音讯,而后执行同步Solr索引库的逻辑。

咱们能够应用下图来简略的标识通过MQ实现数据同步的过程。

咱们能够应用如下代码实现这个过程。

public ResponseResult updateStatus(Long[] ids, String status){    try{        goodsService.updateStatus(ids, status);        if("status_success".equals(status)){            List<TbItem> itemList = goodsService.getItemList(ids, status);            final String jsonString = JSON.toJSONString(itemList);            jmsTemplate.send(queueSolr, new MessageCreator(){                @Override                public Message createMessage(Session session) throws JMSException{                    return session.createTextMessage(jsonString);                }            });        }        return new ResponseResult(true, "批改状态胜利");    }catch(Exception e){        return new ResponseResult(false, "批改状态失败");    }}

长处:

业务代码解耦,并且可能做到准实时。

毛病:

须要在业务代码中退出发送音讯到MQ的代码,数据调用接口耦合。

4.通过Canal实现实时同步

Canal是阿里巴巴开源的一款数据库日志增量解析组件,通过Canal来解析数据库的日志信息,来检测数据库中表构造和数据的变动,从而更新Solr索引库。

应用Canal能够做到业务代码齐全解耦,API齐全解耦,能够做到准实时。

Canal简介

阿里巴巴MySQL数据库binlog增量订阅与生产组件,基于数据库增量日志解析,提供增量数据订阅与生产,目前次要反对了MySQL。

Canal开源地址:https://github.com/alibaba/canal。

Canal工作原理

MySQL主从复制的实现

从上图能够看出,主从复制次要分成三步:

  • Master节点将数据的扭转记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,能够通过show binlog events进行查看)。
  • Slave节点将Master节点的二进制日志事件(binary log events)拷贝到它的中继日志(relay log)。
  • Slave节点重做中继日志中的事件将扭转反映到本人自身的数据库中。

Canal外部原理

首先,咱们来看下Canal的原理图,如下所示。

原理大抵形容如下:

  • Canal 模仿 MySQL slave 的交互协定,假装本人为 MySQL Slave ,向 MySQL Master 发送dump 协定
  • MySQL Master 收到 dump 申请,开始推送 binary log 给 Slave (即 Canal )
  • Canal 解析 binary log 对象(原始为 byte 流)

Canal内部结构

阐明如下:

  • Server:代表一个Canal运行实例,对应一个JVM过程。
  • Instance:对应一个数据队列(1个Server对应1个或者多个Instance)。

接下来,咱们再来看下Instance下的子模块,如下所示。

  • EventParser:数据源接入,模仿Slave协定和Master节点进行交互,协定解析。
  • EventSink:EventParser和EventStore的连接器,对数据进行过滤、加工、归并和散发等解决。
  • EventSore:数据存储。
  • MetaManager:增量订阅和生产信息管理。

Canal环境筹备

设置MySQL近程拜访

grant all privileges on *.* to 'root'@'%' identified by '123456';flush privileges;

MySQL配置

留神:这里的MySQL是基于5.7版本进行阐明的。

Canal的原理基于MySQL binlog技术,所以,要想应用Canal就要开启MySQL的binlog写入性能,倡议配置binlog的模式为row。

能够在MySQL命令行输出如下命令来查看binlog的模式。

SHOW VARIABLES LIKE 'binlog_format';

执行成果如下所示。

能够看到,在MySQL中默认的binlog格局为STATEMENT,这里咱们须要将STATEMENT批改为ROW。批改/etc/my.cnf文件。

vim /etc/my.cnf

在[mysqld]上面新增如下三项配置。

log-bin=mysql-bin  #开启MySQL二进制日志binlog_format=ROW #将二进制日志的格局设置为ROWserver_id=1 #server_id须要惟一,不能与Canal的slaveId反复

批改完my.cnf文件后,须要重启MySQL服务。

service mysqld restart

接下来,咱们再次查看binlog模式。

SHOW VARIABLES LIKE 'binlog_format';

能够看到,此时,MySQL的binlog模式曾经被设置为ROW了。

MySQL创立用户受权

Canal的原理是模式本人为MySQL Slave,所以肯定要设置MySQL Slave的相干权限。这里,须要创立一个主从同步的账户,并且赋予这个账户相干的权限。

CREATE USER canal@'localhost' IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';FLUSH PRIVILEGES;

Canal部署装置

下载Canal

这里,咱们以Canal 1.1.1版本进行阐明,小伙伴们能够到链接 https://github.com/alibaba/canal/releases/tag/canal-1.1.1 下载Canal 1.1.1版本。

上传解压

将下载好的Canal安装包,上传到服务器,并执行如下命令进行解压

mkdir -p /usr/local/canaltar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/

解压后的目录如下所示。

各目录的阐明如下:

  • bin:存储可执行脚本。
  • conf:寄存配置文件。
  • lib:寄存其余依赖或者第三方库。
  • logs:寄存的是日志文件。

批改配置文件

在Canal的conf目录下有一个canal.properties文件,这个文件中配置的是Canal Server相干的配置,在这个文件中有如下一行配置。

canal.destinations=example

这里的example就相当于Canal的一个Instance,能够在这里配置多个Instance,多个Instance之间以逗号分隔即可。同时,这里的example也对应着Canal的conf目录下的一个文件夹。也就是说,Canal中的每个Instance实例都对应着conf目录下的一个子目录。

接下来,咱们须要批改Canal的conf目录下的example目录的一个配置文件instance.properties。

vim instance.properties

批改如下配置项。

################################################################### canal slaveId,留神:不要与MySQL的server_id反复canal.instance.mysql.slaveId = 1234#position info,须要改成本人的数据库信息canal.instance.master.address = 127.0.0.1:3306canal.instance.master.journal.name =canal.instance.master.position =canal.instance.master.timestamp =#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#username/password,须要改成本人的数据库信息canal.instance.dbUsername = canalcanal.instance.dbPassword = canalcanal.instance.defaultDatabaseName =canaldbcanal.instance.connectionCharset = UTF-8#table regexcanal.instance.filter.regex = canaldb\\..*#################################################################

选项含意:

  • canal.instance.mysql.slaveId : mysql集群配置中的serverId概念,须要保障和以后mysql集群中id惟一;
  • canal.instance.master.address: mysql主库链接地址;
  • canal.instance.dbUsername : mysql数据库帐号;
  • canal.instance.dbPassword : mysql数据库明码;
  • canal.instance.defaultDatabaseName : mysql链接时默认数据库;
  • canal.instance.connectionCharset : mysql 数据解析编码;
  • canal.instance.filter.regex : mysql 数据解析关注的表,Perl正则表达式.

启动Canal

配置完Canal后,就能够启动Canal了。进入到Canal的bin目录下,输出如下命令启动Canal。

./startup.sh

测试Canal

导入并批改源码

这里,咱们应用Canal的源码进行测试,下载Canal的源码后,将其导入到IDEA中。

接下来,咱们找到example下的SimpleCanalClientTest类进行测试。这个类的源码如下所示。

package com.alibaba.otter.canal.example;import java.net.InetSocketAddress;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.common.utils.AddressUtils;/** * 单机模式的测试例子 *  * @author jianghang 2013-4-15 下午04:19:20 * @version 1.0.4 */public class SimpleCanalClientTest extends AbstractCanalClientTest {    public SimpleCanalClientTest(String destination){           super(destination);     }    public static void main(String args[]) {        // 依据ip,间接创立链接,无HA的性能        String destination = "example";        String ip = AddressUtils.getHostIp();        CanalConnector connector = CanalConnectors.newSingleConnector(            new InetSocketAddress(ip, 11111),                destination,                "canal",                "canal");        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);        clientTest.setConnector(connector);        clientTest.start();        Runtime.getRuntime().addShutdownHook(new Thread() {            public void run() {                try {                    logger.info("## stop the canal client");                    clientTest.stop();                } catch (Throwable e) {                    logger.warn("##something goes wrong when stopping canal:", e);                } finally {                    logger.info("## canal client is down.");                }            }        });    }}

能够看到,这个类中,应用的destination为example。在这个类中,咱们只须要将IP地址批改为Canal Server的IP即可。

具体为:将如下一行代码。

String ip = AddressUtils.getHostIp();

批改为:

String ip = "192.168.175.100"

因为咱们在配置Canal时,没有指定用户名和明码,所以,咱们还须要将如下代码。

CanalConnector connector = CanalConnectors.newSingleConnector(    new InetSocketAddress(ip, 11111),    destination,    "canal",    "canal");

批改为:

CanalConnector connector = CanalConnectors.newSingleConnector(    new InetSocketAddress(ip, 11111),    destination,    "",    "");

批改实现后,运行main办法启动程序。

测试数据变更

接下来,在MySQL中创立一个canaldb数据库。

create database canaldb;

此时会在IDEA的命令行输入相干的日志信息。

***************************************************** Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] * End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] ****************************************************

接下来,我在canaldb数据库中创立数据表,并对数据表中的数据进行增删改查,程序输入的日志信息如下所示。

#在mysql进行数据变更后,这里会显示mysql的bin日志。***************************************************** Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] * End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] ****************************************************================> binlog[mysql-bin.000007:6180] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393ms BEGIN ----> Thread id: 43----------------> binlog[mysql-bin.000007:6311] , name[canal,canal_table] , eventType : DELETE , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393 msid : 8    type=int(10) unsignedname : 512    type=varchar(255)---------------- END ----> transaction id: 249================> binlog[mysql-bin.000007:6356] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 394ms***************************************************** Batch Id: [8] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35* Start : [mysql-bin.000007:6387:1540286869000(2020-08-05 23:25:49)] * End : [mysql-bin.000007:6563:1540286869000(2020-08-05 23:25:49)] ****************************************************================> binlog[mysql-bin.000007:6387] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976ms BEGIN ----> Thread id: 43----------------> binlog[mysql-bin.000007:6518] , name[canal,canal_table] , eventType : INSERT , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976 msid : 21    type=int(10) unsigned    update=truename : aaa    type=varchar(255)    update=true---------------- END ----> transaction id: 250================> binlog[mysql-bin.000007:6563] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 977ms***************************************************** Batch Id: [9] ,count : [3] , memsize : [161] , Time : 2020-08-05 23:26:22* Start : [mysql-bin.000007:6594:1540286902000(2020-08-05 23:26:22)] * End : [mysql-bin.000007:6782:1540286902000(2020-08-05 23:26:22)] ****************************************************================> binlog[mysql-bin.000007:6594] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712ms BEGIN ----> Thread id: 43----------------> binlog[mysql-bin.000007:6725] , name[canal,canal_table] , eventType : UPDATE , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712 msid : 21    type=int(10) unsignedname : aaac    type=varchar(255)    update=true---------------- END ----> transaction id: 252================> binlog[mysql-bin.000007:6782] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 713ms

数据同步实现

需要

将数据库数据的变动, 通过canal解析binlog日志, 实时更新到solr的索引库中。

具体实现

创立工程

创立Maven工程mykit-canal-demo,并在pom.xml文件中增加如下配置。

<dependencies>    <dependency>        <groupId>com.alibaba.otter</groupId>        <artifactId>canal.client</artifactId>        <version>1.0.24</version>    </dependency>    <dependency>        <groupId>com.alibaba.otter</groupId>        <artifactId>canal.protocol</artifactId>        <version>1.0.24</version>    </dependency>    <dependency>        <groupId>commons-lang</groupId>        <artifactId>commons-lang</artifactId>        <version>2.6</version>    </dependency>    <dependency>        <groupId>org.codehaus.jackson</groupId>        <artifactId>jackson-mapper-asl</artifactId>        <version>1.8.9</version>    </dependency>    <dependency>        <groupId>org.apache.solr</groupId>        <artifactId>solr-solrj</artifactId>        <version>4.10.3</version>    </dependency>    <dependency>        <groupId>junit</groupId>        <artifactId>junit</artifactId>        <version>4.9</version>        <scope>test</scope>    </dependency></dependencies>

创立log4j配置文件xml

在工程的src/main/resources目录下创立log4j.properties文件,内容如下所示。

log4j.rootCategory=debug, CONSOLE# CONSOLE is set to be a ConsoleAppender using a PatternLayout.log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppenderlog4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayoutlog4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n# LOGFILE is set to be a File appender using a PatternLayout.# log4j.appender.LOGFILE=org.apache.log4j.FileAppender# log4j.appender.LOGFILE.File=d:\axis.log# log4j.appender.LOGFILE.Append=true# log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout# log4j.appender.LOGFILE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n

创立实体类

在io.mykit.canal.demo.bean包下创立一个Book实体类,用于测试Canal的数据传输,如下所示。

package io.mykit.canal.demo.bean;import org.apache.solr.client.solrj.beans.Field;import java.util.Date;public class Book implements Serializable {    private static final long serialVersionUID = -6350345408771427834L;{    @Field("id")    private Integer id;    @Field("book_name")    private String name;    @Field("book_author")    private String author;    @Field("book_publishtime")    private Date publishtime;    @Field("book_price")    private Double price;    @Field("book_publishgroup")    private String publishgroup;    public Integer getId() {        return id;    }    public void setId(Integer id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public String getAuthor() {        return author;    }    public void setAuthor(String author) {        this.author = author;    }    public Date getPublishtime() {        return publishtime;    }    public void setPublishtime(Date publishtime) {        this.publishtime = publishtime;    }    public Double getPrice() {        return price;    }    public void setPrice(Double price) {        this.price = price;    }    public String getPublishgroup() {        return publishgroup;    }    public void setPublishgroup(String publishgroup) {        this.publishgroup = publishgroup;    }    @Override    public String toString() {        return "Book{" +                "id=" + id +                ", name='" + name + '\'' +                ", author='" + author + '\'' +                ", publishtime=" + publishtime +                ", price=" + price +                ", publishgroup='" + publishgroup + '\'' +                '}';    }}

其中,咱们在Book实体类中,应用Solr的注解@Field定义了实体类字段与Solr域之间的关系。

各种工具类的实现

接下来,咱们就在io.mykit.canal.demo.utils包下创立各种工具类。

  • BinlogValue

用于存储binlog剖析的每行每列的value值,代码如下所示。

package io.mykit.canal.demo.utils;import java.io.Serializable;/** *  * ClassName: BinlogValue <br/>  *  * binlog剖析的每行每列的value值;<br> * 新增数据:beforeValue 和 value 均为现有值;<br> * 批改数据:beforeValue是批改前的值;value为批改后的值;<br> * 删除数据:beforeValue和value均是删除前的值; 这个比拟非凡次要是为了删除数据时不便获取删除前的值<br> */public class BinlogValue implements Serializable {    private static final long serialVersionUID = -6350345408773943086L;        private String value;    private String beforeValue;        /**     * binlog剖析的每行每列的value值;<br>     * 新增数据: value:为现有值;<br>     * 批改数据:value为批改后的值;<br>     * 删除数据:value是删除前的值; 这个比拟非凡次要是为了删除数据时不便获取删除前的值<br>     */    public String getValue() {        return value;    }    public void setValue(String value) {        this.value = value;    }        /**     * binlog剖析的每行每列的beforeValue值;<br>     * 新增数据:beforeValue为现有值;<br>     * 批改数据:beforeValue是批改前的值;<br>     * 删除数据:beforeValue为删除前的值; <br>     */    public String getBeforeValue() {        return beforeValue;    }    public void setBeforeValue(String beforeValue) {        this.beforeValue = beforeValue;    }}
  • CanalDataParser

用于解析数据,代码如下所示。

package io.mykit.canal.demo.utils;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.commons.lang.SystemUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.util.CollectionUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;import com.google.protobuf.InvalidProtocolBufferException;/** * 解析数据 */public class CanalDataParser {        protected static final String DATE_FORMAT   = "yyyy-MM-dd HH:mm:ss";    protected static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";    protected static final String yyyyMMdd      = "yyyyMMdd";    protected static final String SEP           = SystemUtils.LINE_SEPARATOR;    protected static String  context_format     = null;    protected static String  row_format         = null;    protected static String  transaction_format = null;    protected static String row_log = null;        private static Logger logger = LoggerFactory.getLogger(CanalDataParser.class);        static {        context_format = SEP + "****************************************************" + SEP;        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;        context_format += "* Start : [{}] " + SEP;        context_format += "* End : [{}] " + SEP;        context_format += "****************************************************" + SEP;        row_format = SEP                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"                     + SEP;        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;        row_log = "schema[{}], table[{}]";    }    public static List<InnerBinlogEntry> convertToInnerBinlogEntry(Message message) {        List<InnerBinlogEntry> innerBinlogEntryList = new ArrayList<InnerBinlogEntry>();                if(message == null) {            logger.info("接管到空的 message; 疏忽");            return innerBinlogEntryList;        }                long batchId = message.getId();        int size = message.getEntries().size();        if (batchId == -1 || size == 0) {            logger.info("接管到空的message[size=" + size + "]; 疏忽");            return innerBinlogEntryList;        }        printLog(message, batchId, size);        List<Entry> entrys = message.getEntries();        //输入日志        for (Entry entry : entrys) {            long executeTime = entry.getHeader().getExecuteTime();            long delayTime = new Date().getTime() - executeTime;                        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {                    TransactionBegin begin = null;                    try {                        begin = TransactionBegin.parseFrom(entry.getStoreValue());                    } catch (InvalidProtocolBufferException e) {                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);                    }                    // 打印事务头信息,执行的线程id,事务耗时                    logger.info("BEGIN ----> Thread id: {}",  begin.getThreadId());                    logger.info(transaction_format, new Object[] {entry.getHeader().getLogfileName(),                                String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {                    TransactionEnd end = null;                    try {                        end = TransactionEnd.parseFrom(entry.getStoreValue());                    } catch (InvalidProtocolBufferException e) {                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);                    }                    // 打印事务提交信息,事务id                    logger.info("END ----> transaction id: {}", end.getTransactionId());                    logger.info(transaction_format,                        new Object[] {entry.getHeader().getLogfileName(),  String.valueOf(entry.getHeader().getLogfileOffset()),                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });                }                continue;            }            //解析后果            if (entry.getEntryType() == EntryType.ROWDATA) {                RowChange rowChage = null;                try {                    rowChage = RowChange.parseFrom(entry.getStoreValue());                } catch (Exception e) {                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);                }                EventType eventType = rowChage.getEventType();                logger.info(row_format, new Object[] { entry.getHeader().getLogfileName(),                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),                            entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });                //组装数据后果                if (eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE) {                    String schemaName = entry.getHeader().getSchemaName();                    String tableName = entry.getHeader().getTableName();                    List<Map<String, BinlogValue>> rows = parseEntry(entry);                    InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry();                    innerBinlogEntry.setEntry(entry);                    innerBinlogEntry.setEventType(eventType);                    innerBinlogEntry.setSchemaName(schemaName);                    innerBinlogEntry.setTableName(tableName.toLowerCase());                    innerBinlogEntry.setRows(rows);                    innerBinlogEntryList.add(innerBinlogEntry);                } else {                    logger.info(" 存在 INSERT INSERT UPDATE 操作之外的SQL [" + eventType.toString() + "]");                }                continue;            }        }        return innerBinlogEntryList;    }    private static List<Map<String, BinlogValue>> parseEntry(Entry entry) {        List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();        try {            String schemaName = entry.getHeader().getSchemaName();            String tableName = entry.getHeader().getTableName();            RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());            EventType eventType = rowChage.getEventType();            // 解决每个Entry中的每行数据            for (RowData rowData : rowChage.getRowDatasList()) {                StringBuilder rowlog = new StringBuilder("rowlog schema[" + schemaName + "], table[" + tableName + "], event[" + eventType.toString() + "]");                                Map<String, BinlogValue> row = new HashMap<String, BinlogValue>();                List<Column> beforeColumns = rowData.getBeforeColumnsList();                List<Column> afterColumns = rowData.getAfterColumnsList();                beforeColumns = rowData.getBeforeColumnsList();                if (eventType == EventType.DELETE) {//delete                    for(Column column : beforeColumns) {                        BinlogValue binlogValue = new BinlogValue();                        binlogValue.setValue(column.getValue());                        binlogValue.setBeforeValue(column.getValue());                        row.put(column.getName(), binlogValue);                    }                } else if(eventType == EventType.UPDATE) {//update                    for(Column column : beforeColumns) {                        BinlogValue binlogValue = new BinlogValue();                        binlogValue.setBeforeValue(column.getValue());                        row.put(column.getName(), binlogValue);                    }                    for(Column column : afterColumns) {                        BinlogValue binlogValue = row.get(column.getName());                        if(binlogValue == null) {                            binlogValue = new BinlogValue();                        }                        binlogValue.setValue(column.getValue());                        row.put(column.getName(), binlogValue);                    }                } else { // insert                    for(Column column : afterColumns) {                        BinlogValue binlogValue = new BinlogValue();                        binlogValue.setValue(column.getValue());                        binlogValue.setBeforeValue(column.getValue());                        row.put(column.getName(), binlogValue);                    }                }                                rows.add(row);                String rowjson = JacksonUtil.obj2str(row);                                logger.info("#################################### Data Parse Result ####################################");                logger.info(rowlog + " , " + rowjson);                logger.info("#################################### Data Parse Result ####################################");                logger.info("");            }        } catch (InvalidProtocolBufferException e) {            throw new RuntimeException("parseEntry has an error , data:" + entry.toString(), e);        }        return rows;    }    private static void printLog(Message message, long batchId, int size) {        long memsize = 0;        for (Entry entry : message.getEntries()) {            memsize += entry.getHeader().getEventLength();        }        String startPosition = null;        String endPosition = null;        if (!CollectionUtils.isEmpty(message.getEntries())) {            startPosition = buildPositionForDump(message.getEntries().get(0));            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));        }        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);        logger.info(context_format, new Object[] {batchId, size, memsize, format.format(new Date()), startPosition, endPosition });    }    private static String buildPositionForDump(Entry entry) {        long time = entry.getHeader().getExecuteTime();        Date date = new Date(time);        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";    }}
  • DateUtils

工夫工具类,代码如下所示。

package io.mykit.canal.demo.utils;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;public class DateUtils {        private static final String FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";        private static SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_PATTERN);        public static Date parseDate(String datetime) throws ParseException{        if(datetime != null && !"".equals(datetime)){            return sdf.parse(datetime);        }        return null;    }            public static String formatDate(Date datetime) throws ParseException{        if(datetime != null ){            return sdf.format(datetime);        }        return null;    }        public static Long formatStringDateToLong(String datetime) throws ParseException{        if(datetime != null && !"".equals(datetime)){            Date d =  sdf.parse(datetime);            return d.getTime();        }        return null;    }        public static Long formatDateToLong(Date datetime) throws ParseException{        if(datetime != null){            return datetime.getTime();        }        return null;    }}
  • InnerBinlogEntry

Binlog实体类,代码如下所示。

package io.mykit.canal.demo.utils;import java.util.ArrayList;import java.util.List;import java.util.Map;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;public class InnerBinlogEntry {        /**     * canal原生的Entry     */    private Entry entry;        /**     * 该Entry归属于的表名     */    private String tableName;        /**     * 该Entry归属数据库名     */    private String schemaName;        /**     * 该Entry本次的操作类型,对应canal原生的枚举;EventType.INSERT; EventType.UPDATE; EventType.DELETE;     */    private EventType eventType;        private List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();            public Entry getEntry() {        return entry;    }    public void setEntry(Entry entry) {        this.entry = entry;    }    public String getTableName() {        return tableName;    }    public void setTableName(String tableName) {        this.tableName = tableName;    }    public EventType getEventType() {        return eventType;    }    public void setEventType(EventType eventType) {        this.eventType = eventType;    }    public String getSchemaName() {        return schemaName;    }    public void setSchemaName(String schemaName) {        this.schemaName = schemaName;    }    public List<Map<String, BinlogValue>> getRows() {        return rows;    }    public void setRows(List<Map<String, BinlogValue>> rows) {        this.rows = rows;    }}
  • JacksonUtil

Json工具类,代码如下所示。

package io.mykit.canal.demo.utils;import java.io.IOException;import org.codehaus.jackson.JsonGenerationException;import org.codehaus.jackson.JsonParseException;import org.codehaus.jackson.map.JsonMappingException;import org.codehaus.jackson.map.ObjectMapper;public class JacksonUtil {    private static ObjectMapper mapper = new ObjectMapper();    public static String obj2str(Object obj) {        String json = null;        try {            json = mapper.writeValueAsString(obj);        } catch (JsonGenerationException e) {            e.printStackTrace();        } catch (JsonMappingException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }        return json;    }    public static <T> T str2obj(String content, Class<T> valueType) {        try {            return mapper.readValue(content, valueType);        } catch (JsonParseException e) {            e.printStackTrace();        } catch (JsonMappingException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }        return null;    }}

同步程序的实现

筹备好实体类和工具类后,咱们就能够编写同步程序来实现MySQL数据库中的数据实时同步到Solr索引库了,咱们在io.mykit.canal.demo.main包中常见MykitCanalDemoSync类,代码如下所示。

package io.mykit.canal.demo.main;import io.mykit.canal.demo.bean.Book;import io.mykit.canal.demo.utils.BinlogValue;import io.mykit.canal.demo.utils.CanalDataParser;import io.mykit.canal.demo.utils.DateUtils;import io.mykit.canal.demo.utils.InnerBinlogEntry;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import org.apache.solr.client.solrj.SolrServer;import org.apache.solr.client.solrj.impl.HttpSolrServer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;import java.text.ParseException;import java.util.List;import java.util.Map;public class SyncDataBootStart {    private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class);    public static void main(String[] args) throws Exception {        String hostname = "192.168.175.100";        Integer port = 11111;        String destination = "example";        //获取CanalServer 连贯        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, "", "");        //连贯CanalServer        canalConnector.connect();        //订阅Destination        canalConnector.subscribe();        //轮询拉取数据        Integer batchSize = 5*1024;        while (true){            Message message = canalConnector.getWithoutAck(batchSize);            long messageId = message.getId();            int size = message.getEntries().size();            if(messageId == -1 || size == 0){                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }else{                //进行数据同步                //1. 解析Message对象                List<InnerBinlogEntry> innerBinlogEntries = CanalDataParser.convertToInnerBinlogEntry(message);                //2. 将解析后的数据信息 同步到Solr的索引库中.                syncDataToSolr(innerBinlogEntries);            }            //提交确认            canalConnector.ack(messageId);        }    }    private static void syncDataToSolr(List<InnerBinlogEntry> innerBinlogEntries) throws Exception {        //获取solr的连贯        SolrServer solrServer = new HttpSolrServer("http://192.168.175.101:8080/solr");        //遍历数据汇合 , 依据数据汇合中的数据信息, 来决定执行减少, 批改 , 删除操作 .        if(innerBinlogEntries != null){            for (InnerBinlogEntry innerBinlogEntry : innerBinlogEntries) {                CanalEntry.EventType eventType = innerBinlogEntry.getEventType();                //如果是Insert, update , 则须要同步数据到 solr 索引库                if(eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE){                    List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();                    if(rows != null){                        for (Map<String, BinlogValue> row : rows) {                            BinlogValue id = row.get("id");                            BinlogValue name = row.get("name");                            BinlogValue author = row.get("author");                            BinlogValue publishtime = row.get("publishtime");                            BinlogValue price = row.get("price");                            BinlogValue publishgroup = row.get("publishgroup");                            Book book = new Book();                            book.setId(Integer.parseInt(id.getValue()));                            book.setName(name.getValue());                            book.setAuthor(author.getValue());                            book.setPrice(Double.parseDouble(price.getValue()));                            book.setPublishgroup(publishgroup.getValue());                            book.setPublishtime(DateUtils.parseDate(publishtime.getValue()));                            //导入数据到solr索引库                            solrServer.addBean(book);                            solrServer.commit();                        }                    }                }else if(eventType == CanalEntry.EventType.DELETE){                    //如果是Delete操作, 则须要删除solr索引库中的数据 .                    List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();                    if(rows != null){                        for (Map<String, BinlogValue> row : rows) {                            BinlogValue id = row.get("id");                            //依据ID删除solr的索引库                            solrServer.deleteById(id.getValue());                            solrServer.commit();                        }                    }                }            }        }    }}

接下来,启动SyncDataBootStart类的main办法,监听Canal Server,而Canal Server监听MySQL binlog的日志变动,一旦MySQL的binlog日志发生变化,则SyncDataBootStart会立即收到变更信息,并将变更信息解析成Book对象实时更新到Solr库中。如果在MySQL数据库中删除了数据,则也会实时删除Solr库中的数据。

局部参考Canal官网文档:https://github.com/alibaba/canal。

好了,明天就到这儿吧,我是冰河,咱们下期见~~