关于canal:Canal-adapter-同步-ElasticSearch-记录

之前写过一篇介绍 canal 的文章《mysql增量同步 - canal》,在数据同步的局部,次要着重演示了在代码中通过 canal.client 来同步。过后也有提到 canal adapter,但并未详述。最近愈多地接触 elasticsearch 我的项目的开发,趁着假期试着做了个Demo,顺便记下笔记。 1. canal 介绍1.1. 三兄弟简介canal 对应包的下载和装置的教程,都间接看 canal官网github,安装包目前有三兄弟: canal deployer:又称 canal server,是真正监听 mysql 日志的服务端。canal adapter:顾名思义“适配器”,搭配 canal server,目前能实现mysql 数据到 hbase、rdb、es的增量同步,妥妥的 ETL 工具。canal admin:也是为 canal server 服务的,为canal提供整体配置管理、节点运维等面向运维的性能,提供绝对敌对的WebUI操作界面。如果 canal server 要搭建集群环境,必少不了 canal admin 这样业余的运维工具。对于不太逛github的人,把文档也贴上: wiki 文档release下载包1.2. canal adapter它既然是适配器,那么就得介绍“源头”和“指标”这两个部位数据的对接: 源头:(1)canal adapter 能够直连 canal server ,生产 instance的数据;(2)也能够在让 canal server 将数据投递到 MQ,而后 cancal adapter 生产 MQ 中的数据。指标:目前反对 hbase、rdb、es,后续将反对 mongodb、redis等。本文实现的较简略,数据流向包含:mysql -> canal server -> canal adapter -> es 。 ...

May 3, 2022 · 3 min · jiezi

关于canal:How-old-are-you-尚硅谷大数据之Canal视频教程发布

摘要:谷粉掉的每一根头发,都被刻上了字:汪公子到此一游。 他来了,他又带着礼物走来了。 他眼神中有着梁朝伟同款的忧郁, 稠密的胡茬一看就是有故事的人, 洒脱美少年,皎如玉树临风前。 他就是生产队里的劳模:汪公子。 看看汪公子这一年都干了些什么: Flume新版视频教程Hadoop3.x高可用集群视频教程Flink CDC视频教程Flink实时数仓视频教程Hive源码解析及优化视频教程How old are you,怎么老是你? 学大数据的小伙伴们, 你们掉的每一根头发, 都被刻上了字:汪公子到此一游。 发如不系之舟,渐随雨打风吹去…… 大家伙儿纷纷表示:汪公子啊, 你把手放屁股上,拍拍你的良心吧。 可有头发临时茂密者不知死活, 在垒哥发了Maxwell视频教程之后, 嚷嚷American Chinese not enough, 美中不足,我要招呼神龙! 集齐Canal、Flink CDC、Maxwell。 汪公子这暴脾气挠一下就上来了, 你们不要挑战我的底线! 否则……我还得换底线。 You give me stop,Today—— Let your heart flower angry open! (猜成语,有请评论区大神现身) 不毛之地制造者汪公子又发飙啦: 尚硅谷Canal视频教程公布!Canal是由阿里开发并开源的轻量级MySQL数据抓取软件,在大数据畛域有着十分宽泛的利用,是大数据工程师的必备技能包之一。 Canal能够实时读取MySQL二进制日志Binlog,并生成JSON格局的音讯,作为生产者发送给Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序,还能够应用TCP模式自定义数据格式以及发送的上游。 本套教程以Canal的底层原理开展解说,粗疏地介绍了Canal的装置部署及常见利用,具体解说了如何实现MySQL数据的采集,并将数据别离发送至Kafka,同时应用TCP模式深层解析封装的数据,并实现自定义数据格式。 教程含全套视频、教辅文档、代码、软件,一站式搞定数据实时同步神器Canal! Canal教程简介https://www.bilibili.com/vide... 关注尚硅谷官网账号,一手最新视频教程领先看! 教程目录:01.课程介绍02.什么是Canal03.MySQL Binlog介绍04.Canal工作原理05.Canal应用场景06.MySQL环境筹备07.Canal下载与装置08.TCP模式:创立我的项目&Canal封装数据格局剖析09.TCP模式:代码编写 创立连贯&拉取数据10.TCP模式:代码编写 解析后果数据并打印11.TCP模式:代码测试12.Kafka模式:配置文件批改13.Kafka模式:案例测试 大数据视频精选数据仓库4.0视频教程 Atlas视频教程 Spark性能调优视频教程 Zookeeper新版视频教程 Kylin视频教程 DataX视频教程 ClickHouse视频教程 汪公子最近情绪不大漂亮, 他喜爱的歌星和主播塌房了。 这个世界,富人更容易败坏, 但富人更容易腐化。 他在花田里犯了错, 他在瓜田里吃到撑。 “边远的西方有一条龙……” 汪公子始终认为他唱的是文化, ...

December 30, 2021 · 1 min · jiezi

关于canal:Canal-Server发送binlog消息到Kafka消息队列中

一、背景在上一篇文章中,咱们应用 Canal Admin 搭建了Canal Server 集群,在这篇文章中,咱们应用上篇文章的根底,将音讯发送到kafka音讯队列中。 二、须要批改的中央以下 配置文件的批改,都是在 Canal Admin 上批改的。1、canal.properties 配置文件批改1、批改canal.serverMode的值 2、批改kafka配置 2、批改 instance.propertios 配置文件 3、canal发消息到mq性能优化影响性能的几个参数: canal.instance.memory.rawEntry = true (示意是否须要提前做序列化,非flatMessage场景须要设置为true)canal.mq.flatMessage = false (false代表二进制协定,true代表应用json格局,二进制协定有更好的性能)canal.mq.dynamicTopic (动静topic配置定义,能够针对不同表设置不同的topic,在flatMessage模式下能够晋升并行效率)canal.mq.partitionsNum/canal.mq.partitionHash (分区配置,对写入性能有副作用,不过能够晋升生产端的吞吐)参考链接:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance 三、kafka接管音讯1、canal 发送过去的音讯/** * canal 发送过去的音讯 * * @author huan.fu 2021/9/2 - 下午4:06 */@Getter@Setter@ToStringpublic class CanalMessage { /** * 测试得出 同一个事物下产生多个批改,这个id的值是一样的。 */ private Integer id; /** * 数据库或schema */ private String database; /** * 表名 */ private String table; /** * 主键字段名 */ private List<String> pkNames; /** * 是否是ddl语句 */ private Boolean isDdl; /** * 类型:INSERT/UPDATE/DELETE */ private String type; /** * binlog executeTime, 执行耗时 */ private Long es; /** * dml build timeStamp, 同步工夫 */ private Long ts; /** * 执行的sql,dml sql为空 */ private String sql; /** * 数据列表 */ private List<Map<String, Object>> data; /** * 旧数据列表,用于update,size和data的size一一对应 */ private List<Map<String, Object>> old;}2、监听音讯@Component@Slf4jpublic class KafkaConsumer { @KafkaListener(topics = "customer", groupId = "canal-kafka-springboot-001", concurrency = "5") public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException { log.info(Thread.currentThread().getName() + ":" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接管到kafka音讯,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value()); CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class); log.info("\r================================="); log.info("接管到的原始 canal message为: {}", record.value()); log.info("转换成Java对象后转换成Json为 : {}", JSON.toJSONString(canalMessage)); ack.acknowledge(); }}3、获取音讯 ...

September 3, 2021 · 2 min · jiezi

关于canal:CanalAdmin搭建Canal-Server集群

一、背景应用CanalAdmin来搭建一个canal集群。 二、机器状况服务名机器ip备注canal admin127.0.0.1:8089canal admin 机器canal server 1127.0.0.1:11111本地canal server机器canal server 2127.0.0.1:11113本地canal server机器zk127.0.0.1:2181,<br/>127.0.0.1:3181,<br/>127.0.0.1:4181本地zk集群mysql127.0.0.1:3306本地mysql三、实现步骤1、下载canal admin # 下载 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz # 解压,解压会产生多个目录 bin、conf等,最好新建一个文件夹,而后在解压 tar -zxvf canal.admin-1.1.5.tar.gz2、配置canalAdmin 3、初始化canal admin数据库 4、启动canal adminbin/startup.sh5、登录canal admin 6、新建集群 7、集群配置多个canal server同一个 canal 配置。(此处次要配置的是 canal.properties) # 须要批改的配置项# zk 的地址须要指定canal.zkServers = 127.0.0.1:2181,127.0.0.1:3181,127.0.0.1:4181# tcp, kafka, rocketMQ, rabbitMQcanal.serverMode = tcp# 此配置须要批改成 default-instancecanal.instance.global.spring.xml = classpath:spring/default-instance.xml# 这个不须要指定,在admin上手动增加canal.destinations = 8、canal server端配置下载canal server $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz1、第一台canal server的配置vim canal_local.properties # register ip# canal server注册到内部zookeeper、admin的ip信息canal.register.ip = 127.0.0.1# canal server 的端口canal.port = 11111# canal server 的metrics 端口canal.metrics.pull.port = 11112# canal admin configcanal.admin.manager = 127.0.0.1:8089# admin端口,canal 1.1.4版本新增的能力,会在canal-server上提供远程管理操作,默认值11110canal.admin.port = 11110# canal admin 利用下 canal.adminUser 的值canal.admin.user = admin# canal admin 利用下 canal.adminPasswd 下的值,然而须要通过 select password('${canal.adminPasswd}')获取,须要去掉后面的*号canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441# admin auto register# 是否开启主动注册模式canal.admin.register.auto = true# 能够指定默认注册的集群名,如果不指定,默认注册为单机模式,集群的名字须要在 canal admin上存在canal.admin.register.cluster = canal_local# 注册到 canal admin 上server的名字,惟一有意义即可canal.admin.register.name = canal_server_012、第二台canal server的配置vim canal_local.properties ...

June 5, 2021 · 2 min · jiezi

关于canal:Canal的简单使用

一、背景工作中有个需要,当数据库的数据变更时,另外一个零碎中的数据要能及时感应到,通过调研晓得,监听数据库的binlog能够做到一个准实时的告诉,而canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产,正好满足需要,此处记录一下canal的简略应用。 二、canal的工作原理 步骤: canal模仿mysql slave的交互协定,假装本人为mysql slave,向mysql master发送dump协定mysql master收到dump申请,开始推送binary log给slave(也就是canal)canal解析binary log对象(原始为byte流)三、装置canal1、mysql配置相干1、检测binlog是否开启mysql> show variables like 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin | ON |+---------------+-------+1 row in set (0.00 sec)log_bin的值为ON阐明关上了。 2、mysql开启binlog[mysqld]#binlog日志的根本文件名,须要留神的是启动mysql的用户须要对这个目录(/usr/local/var/mysql/binlog)有写入的权限log_bin=/usr/local/var/mysql/binlog/mysql-bin# 配置binlog日志的格局binlog_format = ROW# 配置 MySQL replaction 须要定义,不能和 canal 的 slaveId 反复server-id=1# 设置中继日志的门路relay_log=/usr/local/var/mysql/relaylog/mysql-relay3、创立canal用户CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;2、canal配置相干1、下载canal # 1.下载 deployer $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz # 下载适配器,不是必须的 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz # 下载治理台,不是必须的 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz # 下载示例程序 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.example-1.1.5.tar.gz # 2.解压 deployer (解压后的目录要存在) tar -zxvf canal.deployer-1.1.5.tar.gz -C /Users/huan/soft/canal/deployer/ # 3. 查看 conf 目录构造 $ tree conf conf ├── canal.properties ├── canal_local.properties ├── example │   └── instance.properties ├── logback.xml ├── metrics │   └── Canal_instances_tmpl.json └── spring ├── base-instance.xml ├── default-instance.xml ├── file-instance.xml ├── group-instance.xml ├── memory-instance.xml └── tsdb ├── h2-tsdb.xml ├── mysql-tsdb.xml ├── sql │   └── create_table.sql └── sql-map ├── sqlmap-config.xml ├── sqlmap_history.xml └── sqlmap_snapshot.xml2、配置一个instanceinstance:一个instance就是一个音讯队列,每个instance通道都有各自的一份配置,因为每个mysql的ip,帐号,明码等信息各不相同。 ...

June 2, 2021 · 3 min · jiezi

关于canal:Flink-最佳实践之使用-Canal-同步-MySQL-数据至-TiDB

简介:本文将介绍如何将 MySQL 中的数据,通过 Binlog + Canal 的模式导入到 Kafka 中,继而被 Flink 生产的案例。一. 背景介绍本文将介绍如何将 MySQL 中的数据,通过 Binlog + Canal 的模式导入到 Kafka 中,继而被 Flink 生产的案例。 为了可能疾速的验证整套流程的功能性,所有的组件都以单机的模式部署。如果手上的物理资源有余,能够将本文中的所有组件一台 4G 1U 的虚拟机环境中。 如果须要在生产环境中部署,倡议将每一个组件替换成高可用的集群部署计划。 其中,咱们独自创立了一套 Zookeeper 单节点环境,Flink、Kafka、Canal 等组件共用这个 Zookeeper 环境。 针对于所有须要 JRE 的组件,如 Flink,Kafka,Canal,Zookeeper,思考到降级 JRE 可能会影响到其余的利用,咱们抉择每个组件独立应用本人的 JRE 环境。 本文分为两个局部,其中,前七大节次要介绍根底环境的搭建,最初一个大节介绍了数据是如何在各个组件中流通的。 数据的流动通过以下组件: MySQL 数据源生成 Binlog。Canal 读取 Binlog,生成 Canal json,推送到 Kafka 指定的 Topic 中。Flink 应用 flink-sql-connector-kafka API,生产 Kafka Topic 中的数据。Flink 在通过 flink-connector-jdbc,将数据写入到 TiDB 中。TiDB + Flink 的构造,反对开发与运行多种不同品种的应用程序。 目前次要的个性次要包含: ...

May 20, 2021 · 10 min · jiezi

关于canal:基于canal实现mysql的数据同步

canal是什么?canal [k'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产 基于日志增量订阅和生产的业务包含 数据库镜像数据库实时备份索引构建和实时保护(拆分异构索引、倒排索引等)业务 cache 刷新带业务逻辑的增量数据处理以后的 canal 反对源端 MySQL 版本包含 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 工作原理 [](https://github.com/alibaba/ca...基于下面的解说,咱们在实现canal之前,先简略做一个主从复制。一主 一从首先下载mysql 镜像,并启动docker pull mysql:latestdocker run -itd --name mysql-1 -p 23306:3306 -e MYSQL_ROOT_PASSWORD=root mysqldocker run -itd --name mysql-2 -p 23307:3306 -e MYSQL_ROOT_PASSWORD=root mysql相干命令再解释一下:name xxx :xxx为容器名p 111:222 其中111是宿主机端口,222是容器端口MYSQL_ROOT_PASSWORD=root 设置root账户明码为root 进入容器测试一下,一切正常 设置 mysql-1为主,mysql-2为从库批改一下 mysql的配置,装置vim编辑器apt-get updateapt-get install vim在主库 创立一个mysql账户给从库应用CREATE USER 'slave'@'%' IDENTIFIED BY '123456';GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'slave'@'%';FLUSH PRIVILEGES; ...

November 2, 2020 · 2 min · jiezi

关于canal:canal探究

后面的文章应用canal订阅mysql数据变动进而同步数据,这里钻研canal的外部个性,进而更好地应用canal,大部分内容来自官网,还有一部分来自我的了解。canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产。 利用场景:异构数据同步数据库实时备份业务cache刷新原理canal模仿成mysql slave向master发送dump申请,收到binlog数据进行解析slave同步master原理: master将扭转记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,能够通过show binlog events进行查看)slave将binary log events拷贝到它的中继日志(relay log)slave重做中继日志中的事件,将扭转反映它本人的数据 架构server代表一个canal运行实例,对应于一个jvminstance对应一个数据队列,一个destination,相当于一个数据库实例变更的监听,1个server对应1-n个instanceinstance模块: eventParser (数据源接入,模仿slave协定和master进行交互,协定解析)eventSink (Parser和Store链接器,进行数据过滤,加工,散发的工作)eventStore (数据存储)metaManager (增量订阅&生产信息管理器)EventParser Connection获取上一次解析胜利的地位 (如果第一次启动,则获取初始指定的地位或者是以后数据库的binlog位点)Connection建设链接,发送BINLOG_DUMP指令Mysql开始推送Binaly Log接管到的Binaly Log的通过Binlog parser进行协定解析,补充一些特定信息传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储胜利存储胜利后,定时记录Binaly Log地位EventSink 数据过滤:反对通配符的过滤模式,表名,字段内容等数据路由/散发:解决1:n (1个parser对应多个store的模式)数据归并:解决n:1 (多个parser对应1个store)数据加工:在进入store之前进行额定的解决,比方joinEventStore Instance设计instance代表了一个理论运行的数据队列,包含了EventPaser,EventSink,EventStore等组件。形象了CanalInstanceGenerator,次要是思考配置的治理形式: manager形式:提供http形式,能够和公司外部web console/manager零碎进行对接。spring形式:基于spring xml + properties进行定义,通过spring配置Spring配置spring配置的原理是将整个配置形象为两局部: xxxx-instance.xml (canal组件的配置定义,能够在多个instance配置中共享,在canal.properties中配置)xxxx.properties (每个instance通道都有各自一份定义,因为每个mysql的ip,帐号,明码等信息不会雷同)通过spring的PropertyPlaceholderConfigurer通过机制将其交融,生成一份instance实例对象,每个instance对应的组件都是互相独立的,互不影响 properties配置文件properties配置分为两局部: canal.properties (零碎根配置文件,配置destinations,注册IP,启动端口)instance.properties (instance级别的配置文件,每个instance一份,配置数据库信息,监听的表)canal.properties介绍:canal配置次要分为两局部定义: instance列表定义 (列出以后server上有多少个instance,每个instance的加载形式是spring/manager等)common参数定义,比方能够将instance.properties的专用参数,抽取搁置到这里,这样每个instance启动的时候就能够共享. 【instance.properties配置定义优先级高于canal.properties】canal如何保护一份增量订阅&生产的关系信息:解析位点 (parse模块会记录,上一次解析binlog到了什么地位,对应组件为:CanalLogPositionManager)生产位点 (canal server在接管了客户端的ack后,就会记录客户端提交的最初位点,对应的组件为:CanalMetaManager)对应的两个位点组件,目前都有几种实现: memory memory-instance.xml中应用,所有的组件(parser , sink , store)都抉择了内存版模式,记录位点的都抉择了memory模式,重启后又会回到初始位点进行解析速度最快,依赖起码(不须要zookeeper)场景:个别利用在quickstart,或者是呈现问题后,进行数据分析的场景,不应该将其利用于生产环境zookeepermixedperiod: default-instance.xml中应用,汇合了zookeeper+memory模式,store抉择了内存模式,其余的parser/sink依赖的位点治理抉择了长久化模式,目前长久化的形式次要是写入zookeeper,保证数据集群共享反对HA,可用于生产环境,集群化部署一份 instance.xml 中有一份或者多份 instance 定义,优先以 destination 名字查找对应的 instance bean 定义,如果没有,则按默认的名字 “instance” 查找 instance 对象,例如 xxxx-instance.xml 中定义 id 别离为 instance-1, instance-2 的两个 bean. 这两个 bean 将为同名的 instance 提供自定义的 eventParser , evnetSink , evnetStore , metaManager,alarmHandler.如果没有自定义这些 bean, 就应用 id="instance" 的 bean 来配置 canal instance.一份 instance bean 定义,须要蕴含 eventParser , evnetSink , evnetStore , metaManager,alarmHandler 的5个模块定义,( alarmHandler 次要是一些报警机制解决,因为简略没开展,可扩大)instance.xml设计初衷:容许进行自定义扩大,比方实现了基于数据库的位点治理后,能够自定义一份本人的instance.xml,整个canal设计中最大的灵活性在于此 ...

October 14, 2020 · 1 min · jiezi

关于canal:canal动态监控Mysql将binlog日志解析后把采集到的数据发送到Kafka

生产者要将发送的数据转化为字节数组能力通过网络动员给Kafka,对于一些简略的数据,Kafka自带了一些序列化工具。 //创立生产者实例private static Producer<String , String> createProducer(){ Properties properties = new Properties(); properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap); properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper); properties.put("serializer.class" , StringEncoder.class.getName()); return new Producer<String, String>(new ProducerConfig(properties));}在通常的微服务中,服务之间须要频繁的传递各种负责的数据结构,然而kafka仅仅反对简略的类型如String,Integer。于是咱们在服务之间应用JSONObject,因为JSON能够很容易的转化为String,而String的序列化和反序列化曾经被反对。 JSONObject jsonObject = new JSONObject();jsonObject.put("logFileName", logFileName);jsonObject.put("logFileOffset", logFileOffset);jsonObject.put("dbName", dbName);jsonObject.put("tableName", tableName);jsonObject.put("eventType", eventType);jsonObject.put("columnValueList", columnValueList);jsonObject.put("emptyCount", emptyCount);jsonObject.put("timestamp", timestamp);//拼接所有binlog解析的字段String data = JSON.toJSONString(jsonObject);// 解析后的数据发送到kafkaKafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);ResourceBundle类是用来读取propertise资源文件的,能够在初始化时把配置项全部一次读入,并保留在动态成员变量中。防止每次须要的时候才去读取相干配置文件的class,I/O速度慢,容易造成性能上的瓶颈。 //读取application.properties文件private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");public static String canalHost= resourceBundle.getString("canal.host");public static String canalPort = resourceBundle.getString("canal.port");public static String canalInstance = resourceBundle.getString("canal.instance");public static String mysqlUsername = resourceBundle.getString("mysql.username");public static String mysqlPassword= resourceBundle.getString("mysql.password");public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");public static String kafkaInput = resourceBundle.getString("kafka.input.topic");残缺代码 ...

September 13, 2020 · 5 min · jiezi

关于canal:canal的安装和配置本地客户端解析mysql-binlog

什么是canalcanal是阿里巴巴旗下的一款开源我的项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&生产,目前次要反对了Mysql(也反对mariaDB)。 先理解下Mysql主备复制的工作原理 master将扭转记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,能够通过show binlog events进行查看);slave将master的binary log events拷贝到它的中继日志(relay log);slave重做中继日志中的事件,将扭转反映它本人的数据。canal的工作原理 canal模仿mysql slave的交互协定,假装本人为mysql slave,向mysql master发送dump协定mysql master收到dump申请,开始推送binary log给slave(也就是canal)canal解析binary log对象(原始为byte流)canal架构设计 server代表一个canal运行实例,对应于一个jvminstance对应于一个数据队列 (1个server对应1…n个instance) canal的instance模块 eventParser (数据源接入,模仿slave协定和master进行交互,协定解析)eventSink (Parser和Store链接器,进行数据过滤,加工,散发的工作)eventStore (数据存储)metaManager (增量订阅&生产信息管理器)mysql开启binlog步骤登录Mysql后应用show variables like 'log_%'; 查看是否开启binlog 编辑配置文件vim /etc/my.cnf server_id=1 #配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 反复log-bin=mysql-bin #开启 binlogbinlog-format=ROW #抉择 ROW模式expire_logs_days=30 重启Mysql服务 systemctl restart mysqld,而后再次应用命令show variables like 'log_%';进行查看,为 ON表明binlog已胜利开启 受权 canal 链接 MySQL 账号具备作为 MySQL slave 的权限 CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;其中, /usr/local/mysql/data 为binlog日志文件寄存门路,mysql-bin是日志文件系统的前缀名 ...

September 13, 2020 · 2 min · jiezi

关于canal:CanalCanal集群实践

一、前言 Canal是alibaba开源的中间件,纯java开发,是一个用来数据同步的数据管道,它将本人伪装成mysql的slaver,具备解析bin log的能力,为上游增量同步业务提供服务。Canal能够多个节点联合zookeeper组成高可用集群,Canal集群中同时只有一个active状态的节点用来解析(多线程)bin log,如果有节点退出,之前节点曾经解析实现的解析位点和生产位点会同步到zookeeper,另外的节点就会顶上去持续解析bin log(从zookeeper读取解析位点和生产位点),为上游客户端提供服务。以下将搭建一个Canal高可用集群并将解析的bin log间接投递音讯到阿里云RocketMQ,供业务消费者生产。实现mysql数据增量同步到Elasticsearch的工作。 参考官网wiki二、集群搭建资源筹备1、mysql筹备须要筹备mysql服务标的服务地址库名称用户明文明码mysql.test.yiyaowang.com:3306b2cb2cd41d8cd98f00b204mysql2.test.yiyaowang.com:3306yc_orderyc_orderd41d8cd98f00b204阐明:存在两个数据库实例,以下实际Canal须要同时监听多个库。 数据库明文明码采纳druid加密D:\yyw_mvn_repo\repository\com\alibaba\druid\1.1.21>java -cp druid-1.1.21.jar com.alibaba.druid.filter.config.ConfigTools d41d8cd98f00b204privateKey:MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75htpublicKey:MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ==password:KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==druid加解密测试 /** * druid加解密测试 * @throws Exception */ @Test public void druidDecryptTest() throws Exception { //私钥 String privateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht"; //公钥 String publicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ=="; //密文明码 String password = "KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg=="; //druid解密 log.info("ConfigTools.decrypt:{}", ConfigTools.decrypt(publicKey, password)); //druid加密 log.info("ConfigTools.encrypt:{}", ConfigTools.encrypt(privateKey, "d41d8cd98f00b204")); }mysql服务必须开启bin log反对批改mysql配置文件my.cnf[mysqld] #开启bin log log-bin=mysql-bin #抉择row模式 binlog-format=ROW #配置mysql replaction须要定义,不能和canal的slaveId反复 server_id=1 阐明:Canal的原理是基于mysql binlog技术,所以这里肯定须要开启mysql的binlog写入性能,并且配置binlog模式为row。 验证mysql配置文件my.cnfmysql> show variables like 'binlog_format';+---------------+-------+| Variable_name | Value |+---------------+-------+| binlog_format | ROW |+---------------+-------+mysql> show variables like 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin | ON |+---------------+-------+筹备一个具备复制相干权限的mysql用户#创立用户CREATE USER b2c IDENTIFIED BY 'd41d8cd98f00b204'; #授予slaver复制所需的相干权限GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'b2c'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'b2c'@'%' ; #刷新权限,使得创立的用户权限失效FLUSH PRIVILEGES; #查看用户被授予的权限show grants for 'b2c'2、zookeeper筹备Canal高可用集群依赖于zookeeper作为对立协调者,各个Canal server信息、Canal client信息、解析位点、生产位点会写入zookeeper;Canal客户端间接从zookeeper获取Canal服务端信息来生产,zk对立协调,保障只有一个Canal server处于激活状态。 ...

September 4, 2020 · 5 min · jiezi

关于canal:移山数据迁移平台实时数据同步服务是如何保证消息的顺序性

上一篇介绍了移山(数据迁徙平台)实时数据同步的整体架构; 本文次要介绍移山(数据迁徙平台)实时数据同步是如何保障音讯的程序性。能够拜访 这里 查看更多对于大数据平台建设的原创文章。一. 什么是音讯的程序性?音讯生产端将音讯发送给同一个MQ服务器的同一个分区,并且按程序发送;生产生产端依照音讯发送的程序进行生产。二. 为什么要保障音讯的程序性?在某些业务性能场景下须要保障音讯的发送和接管程序是统一的,否则会影响数据的应用。 须要保障音讯有序的场景移山的实时数据同步应用 canal 组件订阅MySQL数据库的日志,并将其投递至 kafka 中(想理解移山实时同步服务架构设计的能够点这里); kafka 生产端再依据具体的数据应用场景去解决数据(存入 HBase、MySQL 或间接做实时剖析); 因为binlog 自身是有序的,因而写入到mq之后也须要保障程序。如果当初移山创立了一个实时同步工作,而后订阅了一个业务数据库的订单表;上游业务,向订单表里插入了一个订单,而后对该订单又做了一个更新操作,则 binlog 里会主动写入插入操作和更新操作的数据,这些数据会被 canal server 投递至 kafka broker 外面;如果 kafka 生产端先生产到了更新日志,后生产到插入日志,则在往指标表里做操作时就会因为数据缺失导致产生异样。三. 移山实时同步服务是怎么保障音讯的程序性实时同步服务音讯解决整体流程如下: 咱们次要通过以下两个方面去保障保障音讯的程序性。 1. 将须要保障程序的音讯发送到同一个partition1.1 kafka的同一个partition内的音讯是有序的kafka 的同一个 partition 用一个write ahead log组织, 是一个有序的队列,所以能够保障FIFO的程序;因而生产者依照肯定的程序发送音讯,broker 就会依照这个程序把音讯写入 partition,消费者也会依照雷同的程序去读取音讯;kafka 的每一个 partition 不会同时被两个消费者实例生产,由此能够保障音讯生产的程序性。1.2 管制同一key散发到同一partition要保障同一个订单的屡次批改达到 kafka 里的程序不能乱,能够在Producer 往 kafka 插入数据时,管制同一个key (能够采纳订单主键key-hash算法来实现)发送到同一 partition,这样就能保障同一笔订单都会落到同一个 partition 内。 1.3 canal 须要做的配置canal 目前反对的mq有kafka/rocketmq,实质上都是基于本地文件的形式来反对了分区级的程序音讯的能力。咱们只需在配置 instance 的时候开启如下配置即可: 1> canal.properties # leader节点会期待所有同步中的正本确认之后再确认这条记录是否发送实现canal.mq.acks = all备注: ...

August 14, 2020 · 2 min · jiezi

关于canal:移山数据迁移平台实时数据同步服务是如何保证消息的顺序性

上一篇介绍了移山(数据迁徙平台)实时数据同步的整体架构; 本文次要介绍移山(数据迁徙平台)实时数据同步是如何保障音讯的程序性。能够拜访 这里 查看更多对于大数据平台建设的原创文章。一. 什么是音讯的程序性?音讯生产端将音讯发送给同一个MQ服务器的同一个分区,并且按程序发送;生产生产端依照音讯发送的程序进行生产。二. 为什么要保障音讯的程序性?在某些业务性能场景下须要保障音讯的发送和接管程序是统一的,否则会影响数据的应用。 须要保障音讯有序的场景移山的实时数据同步应用 canal 组件订阅MySQL数据库的日志,并将其投递至 kafka 中(想理解移山实时同步服务架构设计的能够点这里); kafka 生产端再依据具体的数据应用场景去解决数据(存入 HBase、MySQL 或间接做实时剖析); 因为binlog 自身是有序的,因而写入到mq之后也须要保障程序。如果当初移山创立了一个实时同步工作,而后订阅了一个业务数据库的订单表;上游业务,向订单表里插入了一个订单,而后对该订单又做了一个更新操作,则 binlog 里会主动写入插入操作和更新操作的数据,这些数据会被 canal server 投递至 kafka broker 外面;如果 kafka 生产端先生产到了更新日志,后生产到插入日志,则在往指标表里做操作时就会因为数据缺失导致产生异样。三. 移山实时同步服务是怎么保障音讯的程序性实时同步服务音讯解决整体流程如下: 咱们次要通过以下两个方面去保障保障音讯的程序性。 1. 将须要保障程序的音讯发送到同一个partition1.1 kafka的同一个partition内的音讯是有序的kafka 的同一个 partition 用一个write ahead log组织, 是一个有序的队列,所以能够保障FIFO的程序;因而生产者依照肯定的程序发送音讯,broker 就会依照这个程序把音讯写入 partition,消费者也会依照雷同的程序去读取音讯;kafka 的每一个 partition 不会同时被两个消费者实例生产,由此能够保障音讯生产的程序性。1.2 管制同一key散发到同一partition要保障同一个订单的屡次批改达到 kafka 里的程序不能乱,能够在Producer 往 kafka 插入数据时,管制同一个key (能够采纳订单主键key-hash算法来实现)发送到同一 partition,这样就能保障同一笔订单都会落到同一个 partition 内。 1.3 canal 须要做的配置canal 目前反对的mq有kafka/rocketmq,实质上都是基于本地文件的形式来反对了分区级的程序音讯的能力。咱们只需在配置 instance 的时候开启如下配置即可: 1> canal.properties # leader节点会期待所有同步中的正本确认之后再确认这条记录是否发送实现canal.mq.acks = all备注: ...

August 14, 2020 · 2 min · jiezi

关于canal:移山数据迁移平台实时数据同步服务的架构设计

移山是禧云自研的数据迁徙平台,蕴含异构数据源的迁徙、实时数据同步等服务。有趣味的能够看这里理解 在移山中怎么实现异构数据源的迁徙;本文次要介绍移山实时数据同步服务产生的背景以及整体架构设计。能够拜访 这里 查看更多对于大数据平台建设的原创文章。一. 移山实时数据同步服务产生背景禧云各个子公司业务零碎根本都是以 MySQL 为主;做为数据反对部门,须要订阅这些业务数据做为数据仓库的数据源,来进行上游的数据分析。比方: 各种离线数据 T+1 报表展现;实时数据大屏展现等。微信小程序实时数据指标展现 像这种常见的实时数据指标大屏展现,背地可能就用到实时数据同步服务技术栈。 二. 移山实时数据同步服务应用canal中间件1. 应用场景合乎它能够对 MySQL 数据库增量日志解析,提供增量数据订阅和生产,完全符合咱们的应用场景。 2. 反对将订阅到的数据投递到kafkacanal 1.1.1版本之后,server端能够通过简略的配置就能将订阅到的数据投递到MQ中,目前反对的MQ有kafka、RocketMQ,代替老版本中必须通过手动编码投递的形式。 移山的实时数据同步服务应用的MQ为kafka,以下为次要配置: 批改canal.properties中配置# 这里写上以后canal server所在机器的ipcanal.ip = 10.200.*.109# register ip to zookeeper(这里写上以后canal server所在机器的ip)canal.register.ip = 10.200.*.109# 指定注册的zk集群地址canal.zkServers =10.200.*.109:2181,10.200.*.110:2181# tcp, kafka, RocketMQ(设置serverMode模式,这个配置十分要害,咱们设置为kafka)canal.serverMode = kafka# 这个demo就是conf目录里的实例canal.destinations = demo# HA模式必须应用该xml,须要将相干数据写入zookeeper,保证数据集群共享canal.instance.global.spring.xml = classpath:spring/default-instance.xml# 这里设置 kafka集群地址(其它对于mq的配置参数能够依据理论状况设置)canal.mq.servers = 10.200.*.108:9092,10.200.*.111:9092批改demo.properties中配置# canal假装的MySQL slave的编号,不能与MySQL数据库和其余的slave反复# canal.instance.MySQL.slaveId=1003# 按需批改成本人的数据库信息# position info(须要订阅的MySQL数据库地址)canal.instance.master.address=10.200.*.109:3306# 这里配置要订阅的数据库,数据库的用户名和明码canal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.defaultDatabaseName =# 设置要订阅的topic名称canal.mq.topic=demo# 设置订阅散列模式的分区数canal.mq.partitionsNum=3备注 更多对于mq的配置参数解释,能够拜访这里:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart多个 canal server 除了 ip 和 MySQL.slaveId 设置不同外,其它都应该放弃雷同的配置。3.反对带cluster模式的客户端链接,保障服务高可用客户端能够间接指定zookeeper地址、instance name,canal client 会主动从zookeeper中的running节点,获取以后canal server服务的工作节点,而后与其建设链接;其它canal server节点则做为Standby状态,如果以后active节点产生故障,能够主动实现failover切换。对canal 的高可用(HA机制)想理解更多,能够查看这篇文章。 ...

August 14, 2020 · 1 min · jiezi

关于canal:移山数据迁移平台实时数据同步服务的架构设计

移山是禧云自研的数据迁徙平台,蕴含异构数据源的迁徙、实时数据同步等服务。有趣味的能够看这里理解在移山中怎么实现异构数据源的迁徙;本文次要介绍移山实时数据同步服务产生的背景以及整体架构设计。一. 移山实时数据同步服务产生背景禧云各个子公司业务零碎根本都是以 MySQL 为主;做为数据反对部门,须要订阅这些业务数据做为数据仓库的数据源,来进行上游的数据分析。比方: 各种离线数据 T+1 报表展现;实时数据大屏展现等。微信小程序实时数据指标展现 像这种常见的实时数据指标大屏展现,背地可能就用到实时数据同步服务技术栈。 二. 移山实时数据同步服务应用canal中间件1. 应用场景合乎它能够对 MySQL 数据库增量日志解析,提供增量数据订阅和生产,完全符合咱们的应用场景。 2. 反对将订阅到的数据投递到kafkacanal 1.1.1版本之后,server端能够通过简略的配置就能将订阅到的数据投递到MQ中,目前反对的MQ有kafka、RocketMQ,代替老版本中必须通过手动编码投递的形式。 移山的实时数据同步服务应用的MQ为kafka,以下为次要配置: 批改canal.properties中配置# 这里写上以后canal server所在机器的ipcanal.ip = 10.200.*.109# register ip to zookeeper(这里写上以后canal server所在机器的ip)canal.register.ip = 10.200.*.109# 指定注册的zk集群地址canal.zkServers =10.200.*.109:2181,10.200.*.110:2181# tcp, kafka, RocketMQ(设置serverMode模式,这个配置十分要害,咱们设置为kafka)canal.serverMode = kafka# 这个demo就是conf目录里的实例canal.destinations = demo# HA模式必须应用该xml,须要将相干数据写入zookeeper,保证数据集群共享canal.instance.global.spring.xml = classpath:spring/default-instance.xml# 这里设置 kafka集群地址(其它对于mq的配置参数能够依据理论状况设置)canal.mq.servers = 10.200.*.108:9092,10.200.*.111:9092批改demo.properties中配置# canal假装的MySQL slave的编号,不能与MySQL数据库和其余的slave反复# canal.instance.MySQL.slaveId=1003# 按需批改成本人的数据库信息# position info(须要订阅的MySQL数据库地址)canal.instance.master.address=10.200.*.109:3306# 这里配置要订阅的数据库,数据库的用户名和明码canal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.defaultDatabaseName =# 设置要订阅的topic名称canal.mq.topic=demo# 设置订阅散列模式的分区数canal.mq.partitionsNum=3备注 更多对于mq的配置参数解释,能够拜访这里:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart多个 canal server 除了 ip 和 MySQL.slaveId 设置不同外,其它都应该放弃雷同的配置。3.反对带cluster模式的客户端链接,保障服务高可用客户端能够间接指定zookeeper地址、instance name,canal client 会主动从zookeeper中的running节点,获取以后canal server服务的工作节点,而后与其建设链接;其它canal server节点则做为Standby状态,如果以后active节点产生故障,能够主动实现failover切换。对canal 的高可用(HA机制)想理解更多,能够查看这篇文章。 ...

August 14, 2020 · 1 min · jiezi

关于canal:阿里canal是怎么通过zookeeper实现HA机制的

一. 阿里canal工作原理canal 是阿里的一款开源我的项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&生产,目前次要反对了MySQL(也反对mariaDB)。 MySQL主备复制原理 Master 将变更写入binlog日志;Slave 的 I/O thread 会去申请 Master 的binlog,并将失去的binlog写到本地的relay-log(中继日志)文件中;Slave 的 SQL thread 会从中继日志读取binlog,而后执行binlog日志中的内容,也就是在本人本地再次执行一遍SQL语句,从而使从服务器和主服务器的数据保持一致。更多MySQL 的 Binary Log 介绍 http://dev.mysql.com/doc/refman/5.5/en/binary-log.htmlhttp://www.taobaodba.com/html/474_mysqls-binary-log_details.htmlcanal的工作原理 canal 模仿 mysql slave 的交互协定,假装本人为 mysql slave,向 mysql master发送 dump 协定;mysql master 收到 dump 申请,开始推送binary log给 slave(也就是canal)canal 解析 binary log对象(原始为byte流)。更多对于 canal的具体介绍,能够拜访官网:https://github.com/alibaba/canal 二. 阿里canal的HA机制1. 什么是HA机制所谓HA(High Available),即高可用(7*24小时不中断服务)。2. 单点故障实现高可用最要害的策略是打消单点故障。比方Hadoop2.0之前,在HDFS集群中NameNode存在单点故障: NameNode机器发生意外,如宕机,集群将无奈应用;NameNode机器须要降级,包含软件、硬件降级,此时集群也将无奈应用。3. Hadoop2.0引入HA机制通过配置Active/Standby两个NameNodes实现在集群中对NameNode的热备来解决上述问题。 有一台节点是Active模式,也就是工作模式,其它的节点是 Standby(备用模式);干活的(Active模式的节点)如果挂了,就从备用模式的节点中选出一台顶上去。更多 对于 Hadoop 的HA 机制的具体介绍,能够拜访:https://blog.csdn.net/pengjunlee/article/details/81583052 4. zookeeper的watcher和EPHEMERAL节点zookeeper的watcherwatcher 机制波及到客户端与服务器(留神,不止一个机器,个别是集群)的两者数据通信与音讯通信: 更多对于watcher 的具体介绍,能够拜访:https://www.jianshu.com/p/4c071e963f18 zookeeper的节点类型EPHEMERAL节点是 zookeeper的长期节点,长期节点与session生命周期绑定,客户端会话生效后长期节点会主动革除。 更多对于 zookeeper EPHEMERAL节点的具体介绍,能够拜访: https://blog.csdn.net/randompeople/article/details/70500076 ...

July 20, 2020 · 2 min · jiezi

聊聊canalgo的position

序本文主要研究一下canal-go的position Positioncanal-go-v1.0.7/protocol/Position/Position.go package positiontype Position interface {}Position.go定义了一个Position接口MetaqPositioncanal-go-v1.0.7/protocol/Position/metaq_position.go package positiontype MetaqPosition struct { Topic string MsgNewId string Offset int64}MetaqPosition定义了Topic、MsgNewId、Offset属性TimePositioncanal-go-v1.0.7/protocol/Position/time_position.go package positiontype TimePosition struct { Timestamp int64}func NewTimePosition(timestamp int64) *TimePosition { tstamp := &TimePosition{Timestamp: timestamp} return tstamp}TimePosition定义了Timestamp属性,NewTimePosition方法实例化TimePositionEntryPositioncanal-go-v1.0.7/protocol/Position/entry_position.go package positionconst ( EVENTIDENTITY_SEGMENT = 3 EVENTIDENTITY_SPLIT = 5)type EntryPosition struct { TimePosition Included bool JournalName string Position int64 ServerId int64}func NewEntryPosition(journalName string, position int64, timestamp int64, serverId int64, Included bool) *EntryPosition { entryPosition := &EntryPosition{TimePosition{timestamp}, false, journalName, position, serverId} return entryPosition}EntryPosition定义了TimePosition、Included、JournalName、Position、ServerId属性;NewEntryPosition方法实例一个EntryPositionLogPositiongo-projects/canal-go-v1.0.7/protocol/Position/log_position.go ...

July 2, 2020 · 1 min · jiezi

聊聊canalgo的SimpleCanalConnector

序本文主要研究一下canal-go的SimpleCanalConnector SimpleCanalConnectorcanal-go-v1.0.7/client/simple_canal_connector.go type SimpleCanalConnector struct { Address string Port int UserName string PassWord string SoTime int32 IdleTimeOut int32 ClientIdentity pb.ClientIdentity Connected bool Running bool Filter string RollbackOnConnect bool LazyParseEntry bool}SimpleCanalConnector定义了Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry属性NewSimpleCanalConnectorcanal-go-v1.0.7/client/simple_canal_connector.go //NewSimpleCanalConnector 创建SimpleCanalConnector实例func NewSimpleCanalConnector(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector { s := &SimpleCanalConnector{ Address: address, Port: port, UserName: username, PassWord: password, ClientIdentity: pb.ClientIdentity{Destination: destination, ClientId: 1001}, SoTime: soTimeOut, IdleTimeOut: idleTimeOut, RollbackOnConnect: true, } return s}NewSimpleCanalConnector方法创建了SimpleCanalConnector实例Connectcanal-go-v1.0.7/client/simple_canal_connector.go ...

July 1, 2020 · 5 min · jiezi