易用的canaljava 客户端

canal 自身提供了简单的客户端,数据格式较为复杂,处理消费数据也不太方便,为了方便给业务使用,提供一种直接能获取实体对象的方式来进行消费才更方便。
先说一下实现的思路,首先canal 客户端的消息对象有两种,message 和 flatMessage,分别是普通的消息(protobuf格式)和消息队列的扁平消息(json格式),现在将这两种消息转化为我们直接使用的 model 对象,根据消息中的数据库表名称找到对应的的实体对象,那么如何根据数据库表名找到实体对象呢?
第一种方式,如果我们的实体对象都使用JPA 的 @Table注解来标识表和实体的对应关系,可以使用该注解来找到实体对象和表名的关系
第二种方式,可以使用自定义注解的来标注实体和表名的关系,为解耦各个表的处理,我们使用策略模式来封装各个表的增删改操作

canal 主要客户端类

ClientIdentity

canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的)

CanalConnector

SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制

CanalNodeAccessStrategy

SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.

ClientRunningMonitor/ClientRunningListener/ClientRunningData

client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.

Canal 客户端类型

canal 客户端可以主要分以下几种类型

单一ip 直连模式

这种方式下,可以启动多个客户端,连接同一个canal 服务端,多个客户端只有一个client 工作,其他的可以作为冷备,当一个client的挂了,其他的客户端会有一个进入工作模式
缺点:连接同一个服务端,如果服务端挂了将导致不可用

多ip 模式

这种方式下,客户端连接多个canal服务端,一个客户端随机选择一个canal server 消费,当这个server 挂了,会选择另外一个进行消费
缺点:不支持订阅消费

zookeeper 模式

使用zookeeper来server,client 的状态,当两个canal server 连接zookeeper 后,
优先连接的节点作为 活跃节点,client从活跃节点消费,当server挂了以后,从另外一个节点消费
缺点:不支持订阅消费

消息 队列模式

canal 支持消息直接发送到消息队列,从消息队列消费,目前支持的有kafka 和rocketMq,这种方式支持订阅消费

canal 客户端实现

EntryHandler 实体消息处理器

首先定义一个策略接口,定义增加,更新,删除功能,使用java 8声明方法为default,让客户端选择实现其中的方法,提高灵活性,客户端实现EntryHandler接口后,会返回基于handler中的泛型的实例对象,在对应的方法中实现自定义逻辑

public interface EntryHandler<T> {    default void insert(T t) {    }    default void update(T before, T after) {    }    default void delete(T t) {    }}

定义一个canalClient 的抽象类,封装canal 的链接开启关闭操作,启动一个线程不断去消费canal 数据,依赖一个 messageHandler 封装消息处理的逻辑

public abstract class AbstractCanalClient implements CanalClient {    @Override    public void start() {        log.info("start canal client");        workThread = new Thread(this::process);        workThread.setName("canal-client-thread");        flag = true;        workThread.start();    }    @Override    public void stop() {        log.info("stop canal client");        flag = false;        if (null != workThread) {            workThread.interrupt();        }    }    @Override    public void process() {        if (flag) {            try {                connector.connect();                connector.subscribe(filter);                while (flag) {                    Message message = connector.getWithoutAck(batchSize, timeout, unit);                    log.info("获取消息 {}", message);                    long batchId = message.getId();                    if (message.getId() != -1 && message.getEntries().size() != 0) {                        messageHandler.handleMessage(message);                    }                    connector.ack(batchId);                }            } catch (Exception e) {                log.error("canal client 异常", e);            } finally {                connector.disconnect();            }        }    }}

基于该抽象类,分别提供各种客户端的实现

  1. SimpleCanalClient
  2. ClusterCanalClient
  3. ZookeeperCanalClient
  4. KafkaCanalClient

消息处理器 messageHandler

消息处理器 messageHandler 封装了消息处理逻辑,其中定义了一个消息处理方法

public interface MessageHandler<T> {     void handleMessage(T t);}

消息处理器可能要适配4种情况,分别是消费message,flatMessage和两种消息的同步与异步消费
消息处理的工作主要有两个

  1. 获取增删改的行数据,交给行处理器继续处理
  2. 在上下文对象中保存其他的数据,例如库名,表名,binlog 时间戳等等数据

首先我们封装一个抽象的 message 消息处理器,实现MessageHandler接口

public abstract class AbstractMessageHandler implements MessageHandler<Message> {    @Override    public void handleMessage(Message message) {        List<CanalEntry.Entry> entries = message.getEntries();        for (CanalEntry.Entry entry : entries) {            if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {                try {                    EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName());                    if(entryHandler!=null){                        CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())                                .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();                        CanalContext.setModel(model);                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();                        CanalEntry.EventType eventType = rowChange.getEventType();                        for (CanalEntry.RowData rowData : rowDataList) {                            rowDataHandler.handlerRowData(rowData,entryHandler,eventType);                        }                    }                } catch (Exception e) {                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);                }finally {                   CanalContext.removeModel();                }            }        }    }}

分别定义两个实现类,同步与异步实现类,继承AbstractMessageHandler抽象类

public class SyncMessageHandlerImpl extends AbstractMessageHandler {    public SyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {        super(entryHandlers, rowDataHandler);    }    @Override    public void handleMessage(Message message) {        super.handleMessage(message);    }}
public class AsyncMessageHandlerImpl extends AbstractMessageHandler {    private ExecutorService executor;    public AsyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler, ExecutorService executor) {        super(entryHandlers, rowDataHandler);        this.executor = executor;    }    @Override    public void handleMessage(Message message) {        executor.execute(() -> super.handleMessage(message));    }}

RowDataHandler 行消息处理器

消息处理器依赖的行消息处理器主要是将原始的column list 转为 实体对象,并将相应的增删改消息交给相应的hangler对象方法,行消息处理器分别需要处理两种对象,一个是 message的行数据 和 flatMessage 的行数据

public interface RowDataHandler<T> {    void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception;}

两个行处理器的实现为

public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {    private IModelFactory<List<CanalEntry.Column>> modelFactory;    public RowDataHandlerImpl(IModelFactory modelFactory) {        this.modelFactory = modelFactory;    }    @Override    public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception {        if (entryHandler != null) {            switch (eventType) {                case INSERT:                    Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());                    entryHandler.insert(object);                    break;                case UPDATE:                    Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)                            .map(CanalEntry.Column::getName).collect(Collectors.toSet());                    Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);                    Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());                    entryHandler.update(before, after);                    break;                case DELETE:                    Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());                    entryHandler.delete(o);                    break;                default:                    break;            }        }    }}
public class MapRowDataHandlerImpl implements RowDataHandler<List<Map<String, String>>> {    private IModelFactory<Map<String,String>> modelFactory;    public MapRowDataHandlerImpl(IModelFactory<Map<String, String>> modelFactory) {        this.modelFactory = modelFactory;    }    @Override    public void handlerRowData(List<Map<String, String>> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{        if (entryHandler != null) {            switch (eventType) {                case INSERT:                    Object object = modelFactory.newInstance(entryHandler, list.get(0));                    entryHandler.insert(object);                    break;                case UPDATE:                    Object before = modelFactory.newInstance(entryHandler, list.get(1));                    Object after = modelFactory.newInstance(entryHandler, list.get(0));                    entryHandler.update(before, after);                    break;                case DELETE:                    Object o = modelFactory.newInstance(entryHandler, list.get(0));                    entryHandler.delete(o);                    break;                default:                    break;            }        }    }}

IModelFactory bean实例创建工厂

行消息处理的依赖的工厂 主要是是通过反射创建与表名称对应的bean实例

public interface IModelFactory<T> {    Object newInstance(EntryHandler entryHandler, T t) throws Exception;    default Object newInstance(EntryHandler entryHandler, T t, Set<String> updateColumn) throws Exception {        return null;    }}

CanalContext canal 消息上下文

目前主要用于保存bean实例以外的其他数据,使用threadLocal实现

代码已在github开源canal-client