乐趣区

易用的-canal-java-客户端-canalclient

易用的 canaljava 客户端

canal 自身提供了简单的客户端,数据格式较为复杂,处理消费数据也不太方便,为了方便给业务使用,提供一种直接能获取实体对象的方式来进行消费才更方便。
先说一下实现的思路,首先 canal 客户端的消息对象有两种,message 和 flatMessage,分别是普通的消息(protobuf 格式)和消息队列的扁平消息(json 格式),现在将这两种消息转化为我们直接使用的 model 对象,根据消息中的数据库表名称找到对应的的实体对象,那么如何根据数据库表名找到实体对象呢?
第一种方式,如果我们的实体对象都使用 JPA 的 @Table 注解来标识表和实体的对应关系,可以使用该注解来找到实体对象和表名的关系
第二种方式,可以使用自定义注解的来标注实体和表名的关系,为解耦各个表的处理,我们使用策略模式来封装各个表的增删改操作

canal 主要客户端类

ClientIdentity

canal client 和 server 交互之间的身份标识,目前 clientId 写死为 1001. (目前 canal server 上的一个 instance 只能有一个 client 消费,clientId 的设计是为 1 个 instance 多 client 消费模式而预留的)

CanalConnector

SimpleCanalConnector/ClusterCanalConnector : 两种 connector 的实现,simple 针对的是简单的 ip 直连模式,cluster 针对多 ip 的模式,可依赖 CanalNodeAccessStrategy 进行 failover 控制

CanalNodeAccessStrategy

SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种 failover 的实现,simple 针对给定的初始 ip 列表进行 failover 选择,cluster 基于 zookeeper 上的 cluster 节点动态选择正在运行的 canal server.

ClientRunningMonitor/ClientRunningListener/ClientRunningData

client running 相关控制,主要为解决 client 自身的 failover 机制。canal client 允许同时启动多个 canal client,通过 running 机制,可保证只有一个 client 在工作,其他 client 做为冷备. 当运行中的 client 挂了,running 会控制让冷备中的 client 转为工作模式,这样就可以确保 canal client 也不会是单点. 保证整个系统的高可用性.

Canal 客户端类型

canal 客户端可以主要分以下几种类型

单一 ip 直连模式

这种方式下,可以启动多个客户端,连接同一个 canal 服务端,多个客户端只有一个 client 工作,其他的可以作为冷备,当一个 client 的挂了,其他的客户端会有一个进入工作模式
缺点:连接同一个服务端,如果服务端挂了将导致不可用

多 ip 模式

这种方式下,客户端连接多个 canal 服务端,一个客户端随机选择一个 canal server 消费,当这个 server 挂了,会选择另外一个进行消费
缺点:不支持订阅消费

zookeeper 模式

使用 zookeeper 来 server,client 的状态,当两个 canal server 连接 zookeeper 后,
优先连接的节点作为 活跃节点,client 从活跃节点消费,当 server 挂了以后,从另外一个节点消费
缺点:不支持订阅消费

消息 队列模式

canal 支持消息直接发送到消息队列,从消息队列消费,目前支持的有 kafka 和 rocketMq,这种方式支持订阅消费

canal 客户端实现

EntryHandler 实体消息处理器

首先定义一个策略接口,定义增加,更新,删除功能,使用 java 8 声明方法为 default,让客户端选择实现其中的方法,提高灵活性,客户端实现 EntryHandler 接口后,会返回基于 handler 中的泛型的实例对象,在对应的方法中实现自定义逻辑

public interface EntryHandler<T> {default void insert(T t) { }


    default void update(T before, T after) { }


    default void delete(T t) {}}

定义一个 canalClient 的抽象类,封装 canal 的链接开启关闭操作,启动一个线程不断去消费 canal 数据,依赖一个 messageHandler 封装消息处理的逻辑

public abstract class AbstractCanalClient implements CanalClient {



    @Override
    public void start() {log.info("start canal client");
        workThread = new Thread(this::process);
        workThread.setName("canal-client-thread");
        flag = true;
        workThread.start();}

    @Override
    public void stop() {log.info("stop canal client");
        flag = false;
        if (null != workThread) {workThread.interrupt();
        }

    }

    @Override
    public void process() {if (flag) {
            try {connector.connect();
                connector.subscribe(filter);
                while (flag) {Message message = connector.getWithoutAck(batchSize, timeout, unit);
                    log.info("获取消息 {}", message);
                    long batchId = message.getId();
                    if (message.getId() != -1 && message.getEntries().size() != 0) {messageHandler.handleMessage(message);
                    }
                    connector.ack(batchId);
                }
            } catch (Exception e) {log.error("canal client 异常", e);
            } finally {connector.disconnect();
            }
        }
    }

}

基于该抽象类,分别提供各种客户端的实现

  1. SimpleCanalClient
  2. ClusterCanalClient
  3. ZookeeperCanalClient
  4. KafkaCanalClient

消息处理器 messageHandler

消息处理器 messageHandler 封装了消息处理逻辑,其中定义了一个消息处理方法

public interface MessageHandler<T> {void handleMessage(T t);

}

消息处理器可能要适配 4 种情况,分别是消费 message,flatMessage 和两种消息的同步与异步消费
消息处理的工作主要有两个

  1. 获取增删改的行数据,交给行处理器继续处理
  2. 在上下文对象中保存其他的数据,例如库名,表名,binlog 时间戳等等数据

首先我们封装一个抽象的 message 消息处理器,实现 MessageHandler 接口

public abstract class AbstractMessageHandler implements MessageHandler<Message> {


    @Override
    public void handleMessage(Message message) {List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                try {EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName());
                    if(entryHandler!=null){CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())
                                .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();
                        CanalContext.setModel(model);
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        for (CanalEntry.RowData rowData : rowDataList) {rowDataHandler.handlerRowData(rowData,entryHandler,eventType);
                        }
                    }
                } catch (Exception e) {throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }finally {CanalContext.removeModel();
                }

            }
        }
    }
}

分别定义两个实现类,同步与异步实现类, 继承 AbstractMessageHandler 抽象类

public class SyncMessageHandlerImpl extends AbstractMessageHandler {public SyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {super(entryHandlers, rowDataHandler);
    }

    @Override
    public void handleMessage(Message message) {super.handleMessage(message);
    }
}
public class AsyncMessageHandlerImpl extends AbstractMessageHandler {


    private ExecutorService executor;


    public AsyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler, ExecutorService executor) {super(entryHandlers, rowDataHandler);
        this.executor = executor;
    }

    @Override
    public void handleMessage(Message message) {executor.execute(() -> super.handleMessage(message));
    }
}

RowDataHandler 行消息处理器

消息处理器依赖的行消息处理器主要是将原始的 column list 转为 实体对象,并将相应的增删改消息交给相应的 hangler 对象方法,行消息处理器分别需要处理两种对象,一个是 message 的行数据 和 flatMessage 的行数据

public interface RowDataHandler<T> {void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception;
}

两个行处理器的实现为

public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {



    private IModelFactory<List<CanalEntry.Column>> modelFactory;




    public RowDataHandlerImpl(IModelFactory modelFactory) {this.modelFactory = modelFactory;}

    @Override
    public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception {if (entryHandler != null) {switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)
                            .map(CanalEntry.Column::getName).collect(Collectors.toSet());
                    Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);
                    Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
public class MapRowDataHandlerImpl implements RowDataHandler<List<Map<String, String>>> {



    private IModelFactory<Map<String,String>> modelFactory;


    public MapRowDataHandlerImpl(IModelFactory<Map<String, String>> modelFactory) {this.modelFactory = modelFactory;}

    @Override
    public void handlerRowData(List<Map<String, String>> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{if (entryHandler != null) {switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Object before = modelFactory.newInstance(entryHandler, list.get(1));
                    Object after = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}

IModelFactory bean 实例创建工厂

行消息处理的依赖的工厂 主要是是通过反射创建与表名称对应的 bean 实例

public interface IModelFactory<T> {Object newInstance(EntryHandler entryHandler, T t) throws Exception;


    default Object newInstance(EntryHandler entryHandler, T t, Set<String> updateColumn) throws Exception {return null;}
}

CanalContext canal 消息上下文

目前主要用于保存 bean 实例以外的其他数据,使用 threadLocal 实现

代码已在 github 开源 canal-client

退出移动版