本文介绍如何利用 Canal 实现异步、解耦的架构,后续有空再写文章剖析 Canal 原理和源代码。
Canal 简介
Canal 是用来获取数据库变更的中间件。
假装本人为 MySQL 从库,拉取主库 binlog 并解析、解决。处理结果可发送给 MQ,不便其余服务获取数据库变更音讯,这一点十分有用。上面介绍一些典型用处。
其中,Canal+MQ 作为一个整体,从外界看来就是一个数据管道服务服务,如下图。
Canal 典型用处
异构数据(如 ES、HBase、不同路由 key 的 DB)
通过 Canal 自带的 adapter,同步异构数据至 ES、HBase,而不必自行实现繁琐的数据转换、同步操作。这里的 adapter 就是典型的适配器模式,把数据转成相应格局,并写入异构的存储系统。
当然,也能够同步数据至 DB,甚至构建一份按不同字段分片路由的数据库。
比方:下单时按用户 id 分库分表订单记录,而后借助 Canal 数据通道,构建一份按商家 id 分库分表的订单记录,用于 B 端业务(如商家查问本人接到哪些订单)。
缓存刷新
缓存刷新的惯例做法是,先更新 DB,再删除缓存,再提早删除(即 cache-aside pattern+ 提早双删),这种多步操作可能失败,而且实现绝对简单。借助 Canal 刷新缓存,使主服务、主流程无需关怀缓存更新等一致性问题,保障最终一致性。
价格变动等重要业务音讯
上游服务可立刻感知价格变动。
惯例做法是,先批改价格,再收回音讯,此处的难点是要保障音讯肯定发送胜利,以及如果发送不胜利时如何解决。借助 Canal,不必在业务层面放心音讯失落的问题。
数据库迁徙
- 多机房数据同步
- 拆库
尽管能够本人在代码中实现双写逻辑,而后对历史数据做解决,然而历史数据也可能被更新,须要一直迭代比照、更新,总之很简单。
实时对账
惯例做法是定时工作跑对账逻辑,时效性低,不能及时发现不统一问题。借助 Canal,可实时触发对账逻辑。
大抵流程如下:
- 接收数据变更音讯
- 写入 hbase 作为流水记录
- 一段窗口工夫过后,触发比拟与对端数据做比拟
Canal 客户端 demo 代码剖析
以下示例是客户端连贯 Canal 的例子,批改自官网 github 示例,楼主做了一些优化,并且在要害代码行中退出了正文。如果 Canal 把数据变更音讯发送至 MQ,写法有所不同,不同之处只是一个是订阅 Canal,一个是订阅 MQ,然而解析和解决逻辑基本相同。
`
public void process() {
// 每批次解决的条数
int batchSize = 1024;
while (running) {
try {
// 连上 Canal 服务
connector.connect();
// 订阅数据(比方某个表)connector.subscribe("table_xxx");
while (running) {
// 批量获取数据变更记录
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {// 非预期状况,需做异样解决} else {
// 打印数据变更明细
printEntry(message.getEntries());
}
if (batchId != -1) {
// 应用 batchId 做 ack 操作:表明该批次解决实现,更新 Canal 侧生产进度
connector.ack(batchId);
}
}
} catch (Throwable e) {logger.error("process error!", e);
try {Thread.sleep(1000L);
} catch (InterruptedException e1) {// ignore}
// 解决失败, 回滚进度
connector.rollback();} finally {
// 断开连接
connector.disconnect();}
}
}
private void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {long executeTime = entry.getHeader().getExecuteTime();
long delayTime = new Date().getTime() - executeTime;
Date date = new Date(entry.getHeader().getExecuteTime());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 只关怀数据变更的类型
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = null;
try {
// 解析数据变更对象
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChange.getEventType();
logger.info(row_format,
new Object[] { entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType,
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime) });
// 不关怀查问,和 DDL 变更
if (eventType == EventType.QUERY || rowChange.getIsDdl()) {logger.info("ddl :" + rowChange.getIsDdl() + ", sql ---->" + rowChange.getSql() + SEP);
continue;
}
for (RowData rowData : rowChange.getRowDatasList()) {if (eventType == EventType.DELETE) {
// 数据变更类型为 删除 时,打印变动前的列值
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
// 数据变更类型为 插入 时,打印变动后的列值
printColumn(rowData.getAfterColumnsList());
} else {
// 数据变更类型为 其余(即更新)时,打印变动前后的列值
printColumn(rowData.getBeforeColumnsList());
printColumn(rowData.getAfterColumnsList());
}
}
}
}
}
// 打印列值
private void printColumn(List<Column> columns) {for (Column column : columns) {StringBuilder builder = new StringBuilder();
try {if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
|| StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
// get value bytes
builder.append(column.getName() + ":"
+ new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
} else {builder.append(column.getName() + ":" + column.getValue());
}
} catch (UnsupportedEncodingException e) { }
builder.append("type=" + column.getMysqlType());
if (column.getUpdated()) {builder.append("update=" + column.getUpdated());
}
builder.append(SEP);
logger.info(builder.toString());
}
}
`