共计 3592 个字符,预计需要花费 9 分钟才能阅读完成。
一、背景
在上一篇文章中,咱们应用 Canal Admin 搭建了 Canal Server 集群,在这篇文章中,咱们应用上篇文章的根底,将音讯发送到 kafka 音讯队列中。
二、须要批改的中央
以下 配置文件的批改,都是在 Canal Admin 上批改的。
1、canal.properties 配置文件批改
1、批改 canal.serverMode 的值
2、批改 kafka 配置
2、批改 instance.propertios 配置文件
3、canal 发消息到 mq 性能优化
影响性能的几个参数:
canal.instance.memory.rawEntry = true
(示意是否须要提前做序列化,非 flatMessage 场景须要设置为 true)canal.mq.flatMessage = false
(false 代表二进制协定,true 代表应用 json 格局,二进制协定有更好的性能)canal.mq.dynamicTopic
(动静 topic 配置定义,能够针对不同表设置不同的 topic,在 flatMessage 模式下能够晋升并行效率)canal.mq.partitionsNum/canal.mq.partitionHash
(分区配置,对写入性能有副作用,不过能够晋升生产端的吞吐)
参考链接 :https://github.com/alibaba/canal/wiki/Canal-MQ-Performance
三、kafka 接管音讯
1、canal 发送过去的音讯
/**
* canal 发送过去的音讯
*
* @author huan.fu 2021/9/2 - 下午 4:06
*/
@Getter
@Setter
@ToString
public class CanalMessage {
/**
* 测试得出 同一个事物下产生多个批改,这个 id 的值是一样的。*/
private Integer id;
/**
* 数据库或 schema
*/
private String database;
/**
* 表名
*/
private String table;
/**
* 主键字段名
*/
private List<String> pkNames;
/**
* 是否是 ddl 语句
*/
private Boolean isDdl;
/**
* 类型:INSERT/UPDATE/DELETE
*/
private String type;
/**
* binlog executeTime, 执行耗时
*/
private Long es;
/**
* dml build timeStamp, 同步工夫
*/
private Long ts;
/**
* 执行的 sql,dml sql 为空
*/
private String sql;
/**
* 数据列表
*/
private List<Map<String, Object>> data;
/**
* 旧数据列表, 用于 update,size 和 data 的 size 一一对应
*/
private List<Map<String, Object>> old;
}
2、监听音讯
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "customer", groupId = "canal-kafka-springboot-001", concurrency = "5")
public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {log.info(Thread.currentThread().getName() + ":" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接管到 kafka 音讯,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());
CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class);
log.info("\r=================================");
log.info("接管到的原始 canal message 为: {}", record.value());
log.info("转换成 Java 对象后转换成 Json 为 : {}", JSON.toJSONString(canalMessage));
ack.acknowledge();}
}
3、获取音讯
四、MQ 配置相干的参数
参数名 | 参数阐明 | 默认值 |
---|---|---|
canal.mq.servers | kafka 为 bootstrap.servers rocketMQ 中为 nameserver 列表 | 127.0.0.1:6667 |
canal.mq.retries | 发送失败重试次数 | 0 |
canal.mq.batchSize | kafka 为 ProducerConfig.BATCH_SIZE_CONFIG rocketMQ 无意义 |
16384 |
canal.mq.maxRequestSize | kafka 为 ProducerConfig.MAX_REQUEST_SIZE_CONFIG rocketMQ 无意义 |
1048576 |
canal.mq.lingerMs | kafka 为 ProducerConfig.LINGER_MS_CONFIG , 如果是 flatMessage 格局倡议将该值调大, 如: 200 rocketMQ 无意义 |
1 |
canal.mq.bufferMemory | kafka 为 ProducerConfig.BUFFER_MEMORY_CONFIG rocketMQ 无意义 |
33554432 |
canal.mq.acks | kafka 为 ProducerConfig.ACKS_CONFIG rocketMQ 无意义 |
all |
canal.mq.kafka.kerberos.enable | kafka 为 ProducerConfig.ACKS_CONFIG rocketMQ 无意义 |
false |
canal.mq.kafka.kerberos.krb5FilePath | kafka kerberos 认证 rocketMQ 无意义 | ../conf/kerberos/krb5.conf |
canal.mq.kafka.kerberos.jaasFilePath | kafka kerberos 认证 rocketMQ 无意义 | ../conf/kerberos/jaas.conf |
canal.mq.producerGroup | kafka 无意义 rocketMQ 为 ProducerGroup 名 | Canal-Producer |
canal.mq.accessChannel | kafka 无意义 rocketMQ 为 channel 模式,如果为 aliyun 则配置为 cloud | local |
canal.mq.vhost= | rabbitMQ 配置 | 无 |
canal.mq.exchange= | rabbitMQ 配置 | 无 |
canal.mq.username= | rabbitMQ 配置 | 无 |
canal.mq.password= | rabbitMQ 配置 | 无 |
canal.mq.aliyunuid= | rabbitMQ 配置 | 无 |
canal.mq.canalBatchSize | 获取 canal 数据的批次大小 | 50 |
canal.mq.canalGetTimeout | 获取 canal 数据的超时工夫 | 100 |
canal.mq.parallelThreadSize | mq 数据转换并行处理的并发度 | 8 |
canal.mq.flatMessage | 是否为 json 格局 如果设置为 false, 对应 MQ 收到的音讯为 protobuf 格局 须要通过 CanalMessageDeserializer 进行解码 | false |
canal.mq.topic | mq 里的 topic 名 | 无 |
canal.mq.dynamicTopic | mq 里的动静 topic 规定, 1.1.3 版本反对 | 无 |
canal.mq.partition | 单队列模式的分区下标, | 1 |
canal.mq.partitionsNum | 散列模式的分区数 | 无 |
canal.mq.partitionHash | 散列规定定义 库名. 表名 : 惟一主键,比方 mytest.person: id 1.1.3 版本反对新语法,见下文 |
参考文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
五、MQ 接管 binlog 代码
https://gitee.com/huan1993/spring-cloud-parent/tree/master/canal/canal-kafka-consumer
六、参考文章
1、canal 发送 binlog 到 mq 中性能测试.
2、canal 发送音讯到 kafka 中
正文完