乐趣区

关于canal:Canal-Server发送binlog消息到Kafka消息队列中

一、背景

在上一篇文章中,咱们应用 Canal Admin 搭建了 Canal Server 集群,在这篇文章中,咱们应用上篇文章的根底,将音讯发送到 kafka 音讯队列中。

二、须要批改的中央

以下 配置文件的批改,都是在 Canal Admin 上批改的。

1、canal.properties 配置文件批改

1、批改 canal.serverMode 的值

2、批改 kafka 配置

2、批改 instance.propertios 配置文件

3、canal 发消息到 mq 性能优化

影响性能的几个参数:

  1. canal.instance.memory.rawEntry = true (示意是否须要提前做序列化,非 flatMessage 场景须要设置为 true)
  2. canal.mq.flatMessage = false (false 代表二进制协定,true 代表应用 json 格局,二进制协定有更好的性能)
  3. canal.mq.dynamicTopic (动静 topic 配置定义,能够针对不同表设置不同的 topic,在 flatMessage 模式下能够晋升并行效率)
  4. canal.mq.partitionsNum/canal.mq.partitionHash (分区配置,对写入性能有副作用,不过能够晋升生产端的吞吐)

参考链接 :https://github.com/alibaba/canal/wiki/Canal-MQ-Performance

三、kafka 接管音讯

1、canal 发送过去的音讯

/**
 * canal 发送过去的音讯
 *
 * @author huan.fu 2021/9/2 - 下午 4:06
 */
@Getter
@Setter
@ToString
public class CanalMessage {

    /**
     * 测试得出 同一个事物下产生多个批改,这个 id 的值是一样的。*/
    private Integer id;

    /**
     * 数据库或 schema
     */
    private String database;
    /**
     * 表名
     */
    private String table;
    /**
     * 主键字段名
     */
    private List<String> pkNames;
    /**
     * 是否是 ddl 语句
     */
    private Boolean isDdl;
    /**
     * 类型:INSERT/UPDATE/DELETE
     */
    private String type;
    /**
     * binlog executeTime, 执行耗时
     */
    private Long es;
    /**
     * dml build timeStamp, 同步工夫
     */
    private Long ts;
    /**
     * 执行的 sql,dml sql 为空
     */
    private String sql;
    /**
     * 数据列表
     */
    private List<Map<String, Object>> data;
    /**
     * 旧数据列表, 用于 update,size 和 data 的 size 一一对应
     */
    private List<Map<String, Object>> old;
}

2、监听音讯

@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "customer", groupId = "canal-kafka-springboot-001", concurrency = "5")
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {log.info(Thread.currentThread().getName() + ":" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接管到 kafka 音讯,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());

        CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class);

        log.info("\r=================================");
        log.info("接管到的原始 canal message 为: {}", record.value());
        log.info("转换成 Java 对象后转换成 Json 为 : {}", JSON.toJSONString(canalMessage));

        ack.acknowledge();}
}

3、获取音讯

四、MQ 配置相干的参数

参数名 参数阐明 默认值
canal.mq.servers kafka 为 bootstrap.servers rocketMQ 中为 nameserver 列表 127.0.0.1:6667
canal.mq.retries 发送失败重试次数 0
canal.mq.batchSize kafka 为 ProducerConfig.BATCH_SIZE_CONFIG rocketMQ 无意义 16384
canal.mq.maxRequestSize kafka 为 ProducerConfig.MAX_REQUEST_SIZE_CONFIG rocketMQ 无意义 1048576
canal.mq.lingerMs kafka 为 ProducerConfig.LINGER_MS_CONFIG , 如果是 flatMessage 格局倡议将该值调大, 如: 200 rocketMQ 无意义 1
canal.mq.bufferMemory kafka 为 ProducerConfig.BUFFER_MEMORY_CONFIG rocketMQ 无意义 33554432
canal.mq.acks kafka 为 ProducerConfig.ACKS_CONFIG rocketMQ 无意义 all
canal.mq.kafka.kerberos.enable kafka 为 ProducerConfig.ACKS_CONFIG rocketMQ 无意义 false
canal.mq.kafka.kerberos.krb5FilePath kafka kerberos 认证 rocketMQ 无意义 ../conf/kerberos/krb5.conf
canal.mq.kafka.kerberos.jaasFilePath kafka kerberos 认证 rocketMQ 无意义 ../conf/kerberos/jaas.conf
canal.mq.producerGroup kafka 无意义 rocketMQ 为 ProducerGroup 名 Canal-Producer
canal.mq.accessChannel kafka 无意义 rocketMQ 为 channel 模式,如果为 aliyun 则配置为 cloud local
canal.mq.vhost= rabbitMQ 配置
canal.mq.exchange= rabbitMQ 配置
canal.mq.username= rabbitMQ 配置
canal.mq.password= rabbitMQ 配置
canal.mq.aliyunuid= rabbitMQ 配置
canal.mq.canalBatchSize 获取 canal 数据的批次大小 50
canal.mq.canalGetTimeout 获取 canal 数据的超时工夫 100
canal.mq.parallelThreadSize mq 数据转换并行处理的并发度 8
canal.mq.flatMessage 是否为 json 格局 如果设置为 false, 对应 MQ 收到的音讯为 protobuf 格局 须要通过 CanalMessageDeserializer 进行解码 false
canal.mq.topic mq 里的 topic 名
canal.mq.dynamicTopic mq 里的动静 topic 规定, 1.1.3 版本反对
canal.mq.partition 单队列模式的分区下标, 1
canal.mq.partitionsNum 散列模式的分区数
canal.mq.partitionHash 散列规定定义 库名. 表名 : 惟一主键,比方 mytest.person: id 1.1.3 版本反对新语法,见下文

参考文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

五、MQ 接管 binlog 代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/canal/canal-kafka-consumer

六、参考文章

1、canal 发送 binlog 到 mq 中性能测试.
2、canal 发送音讯到 kafka 中

退出移动版