本文介绍如何利用Canal实现异步、解耦的架构,后续有空再写文章剖析Canal原理和源代码。

Canal简介

Canal是用来获取数据库变更的中间件。
假装本人为MySQL从库,拉取主库binlog并解析、解决。处理结果可发送给MQ,不便其余服务获取数据库变更音讯,这一点十分有用。上面介绍一些典型用处。

其中,Canal+MQ作为一个整体,从外界看来就是一个数据管道服务服务,如下图。

Canal典型用处

异构数据(如ES、HBase、不同路由key的DB)

通过Canal自带的adapter,同步异构数据至ES、HBase,而不必自行实现繁琐的数据转换、同步操作。这里的adapter就是典型的适配器模式,把数据转成相应格局,并写入异构的存储系统。

当然,也能够同步数据至DB,甚至构建一份按不同字段分片路由的数据库。
比方:下单时按用户id分库分表订单记录,而后借助Canal数据通道,构建一份按商家id分库分表的订单记录,用于B端业务(如商家查问本人接到哪些订单)。

缓存刷新

缓存刷新的惯例做法是,先更新DB,再删除缓存,再提早删除(即cache-aside pattern+提早双删),这种多步操作可能失败,而且实现绝对简单。借助Canal刷新缓存,使主服务、主流程无需关怀缓存更新等一致性问题,保障最终一致性。

价格变动等重要业务音讯

上游服务可立刻感知价格变动。
惯例做法是,先批改价格,再收回音讯,此处的难点是要保障音讯肯定发送胜利,以及如果发送不胜利时如何解决。借助Canal,不必在业务层面放心音讯失落的问题。

数据库迁徙

  • 多机房数据同步
  • 拆库
    尽管能够本人在代码中实现双写逻辑,而后对历史数据做解决,然而历史数据也可能被更新,须要一直迭代比照、更新,总之很简单。

实时对账

惯例做法是定时工作跑对账逻辑,时效性低,不能及时发现不统一问题。借助Canal,可实时触发对账逻辑。
大抵流程如下:

  • 接收数据变更音讯
  • 写入hbase作为流水记录
  • 一段窗口工夫过后,触发比拟与对端数据做比拟

Canal客户端demo代码剖析

以下示例是客户端连贯Canal的例子,批改自官网github示例,楼主做了一些优化,并且在要害代码行中退出了正文。如果Canal把数据变更音讯发送至MQ,写法有所不同,不同之处只是一个是订阅Canal,一个是订阅MQ,然而解析和解决逻辑基本相同。

`

public void process() {    // 每批次解决的条数    int batchSize = 1024;    while (running) {        try {            // 连上Canal服务            connector.connect();            // 订阅数据(比方某个表)            connector.subscribe("table_xxx");            while (running) {                // 批量获取数据变更记录                Message message = connector.getWithoutAck(batchSize);                long batchId = message.getId();                int size = message.getEntries().size();                if (batchId == -1 || size == 0) {                    // 非预期状况,需做异样解决                } else {                    // 打印数据变更明细                    printEntry(message.getEntries());                }                if (batchId != -1) {                    // 应用batchId做ack操作:表明该批次解决实现,更新Canal侧生产进度                    connector.ack(batchId);                }            }        } catch (Throwable e) {            logger.error("process error!", e);            try {                Thread.sleep(1000L);            } catch (InterruptedException e1) {                // ignore            }            // 解决失败, 回滚进度            connector.rollback();        } finally {            // 断开连接            connector.disconnect();        }    }}private void printEntry(List<Entry> entrys) {    for (Entry entry : entrys) {        long executeTime = entry.getHeader().getExecuteTime();        long delayTime = new Date().getTime() - executeTime;        Date date = new Date(entry.getHeader().getExecuteTime());        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        // 只关怀数据变更的类型        if (entry.getEntryType() == EntryType.ROWDATA) {            RowChange rowChange = null;            try {                // 解析数据变更对象                rowChange = RowChange.parseFrom(entry.getStoreValue());            } catch (Exception e) {                throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);            }            EventType eventType = rowChange.getEventType();            logger.info(row_format,                new Object[] { entry.getHeader().getLogfileName(),                        String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),                        entry.getHeader().getTableName(), eventType,                        String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),                        entry.getHeader().getGtid(), String.valueOf(delayTime) });            // 不关怀查问,和DDL变更            if (eventType == EventType.QUERY || rowChange.getIsDdl()) {                logger.info("ddl : " + rowChange.getIsDdl() + " ,  sql ----> " + rowChange.getSql() + SEP);                continue;            }            for (RowData rowData : rowChange.getRowDatasList()) {                if (eventType == EventType.DELETE) {                    // 数据变更类型为 删除 时,打印变动前的列值                    printColumn(rowData.getBeforeColumnsList());                } else if (eventType == EventType.INSERT) {                    // 数据变更类型为 插入 时,打印变动后的列值                    printColumn(rowData.getAfterColumnsList());                } else {                    // 数据变更类型为 其余(即更新) 时,打印变动前后的列值                    printColumn(rowData.getBeforeColumnsList());                    printColumn(rowData.getAfterColumnsList());                }            }        }    }}// 打印列值private void printColumn(List<Column> columns) {    for (Column column : columns) {        StringBuilder builder = new StringBuilder();        try {            if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")                || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {                // get value bytes                builder.append(column.getName() + " : "                               + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));            } else {                builder.append(column.getName() + " : " + column.getValue());            }        } catch (UnsupportedEncodingException e) {        }        builder.append("    type=" + column.getMysqlType());        if (column.getUpdated()) {            builder.append("    update=" + column.getUpdated());        }        builder.append(SEP);        logger.info(builder.toString());    }}

`