一、背景在上一篇文章中,咱们应用 Canal Admin 搭建了Canal Server 集群,在这篇文章中,咱们应用上篇文章的根底,将音讯发送到kafka音讯队列中。
二、须要批改的中央以下 配置文件的批改,都是在 Canal Admin 上批改的。1、canal.properties 配置文件批改1、批改canal.serverMode的值
2、批改kafka配置
2、批改 instance.propertios 配置文件
3、canal发消息到mq性能优化影响性能的几个参数:
canal.instance.memory.rawEntry = true (示意是否须要提前做序列化,非flatMessage场景须要设置为true)canal.mq.flatMessage = false (false代表二进制协定,true代表应用json格局,二进制协定有更好的性能)canal.mq.dynamicTopic (动静topic配置定义,能够针对不同表设置不同的topic,在flatMessage模式下能够晋升并行效率)canal.mq.partitionsNum/canal.mq.partitionHash (分区配置,对写入性能有副作用,不过能够晋升生产端的吞吐)参考链接:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance
三、kafka接管音讯1、canal 发送过去的音讯/** * canal 发送过去的音讯 * * @author huan.fu 2021/9/2 - 下午4:06 */@Getter@Setter@ToStringpublic class CanalMessage { /** * 测试得出 同一个事物下产生多个批改,这个id的值是一样的。 */ private Integer id; /** * 数据库或schema */ private String database; /** * 表名 */ private String table; /** * 主键字段名 */ private List<String> pkNames; /** * 是否是ddl语句 */ private Boolean isDdl; /** * 类型:INSERT/UPDATE/DELETE */ private String type; /** * binlog executeTime, 执行耗时 */ private Long es; /** * dml build timeStamp, 同步工夫 */ private Long ts; /** * 执行的sql,dml sql为空 */ private String sql; /** * 数据列表 */ private List<Map<String, Object>> data; /** * 旧数据列表,用于update,size和data的size一一对应 */ private List<Map<String, Object>> old;}2、监听音讯@Component@Slf4jpublic class KafkaConsumer { @KafkaListener(topics = "customer", groupId = "canal-kafka-springboot-001", concurrency = "5") public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException { log.info(Thread.currentThread().getName() + ":" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接管到kafka音讯,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value()); CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class); log.info("\r================================="); log.info("接管到的原始 canal message为: {}", record.value()); log.info("转换成Java对象后转换成Json为 : {}", JSON.toJSONString(canalMessage)); ack.acknowledge(); }}3、获取音讯
...