关于canal:Canal-adapter-同步-ElasticSearch-记录

45次阅读

共计 5564 个字符,预计需要花费 14 分钟才能阅读完成。

之前写过一篇介绍 canal 的文章《mysql 增量同步 – canal》,在数据同步的局部,次要着重演示了在代码中通过 canal.client 来同步。过后也有提到 canal adapter,但并未详述。最近愈多地接触 elasticsearch 我的项目的开发,趁着假期试着做了个 Demo,顺便记下笔记。

1. canal 介绍

1.1. 三兄弟简介

canal 对应包的下载和装置的教程,都间接看 canal 官网 github,安装包目前有三兄弟:

  • canal deployer:又称 canal server,是真正监听 mysql 日志的服务端。
  • canal adapter:顾名思义“适配器”,搭配 canal server,目前能实现 mysql 数据到 hbase、rdb、es 的增量同步,妥妥的 ETL 工具。
  • canal admin:也是为 canal server 服务的,为 canal 提供整体配置管理、节点运维等面向运维的性能,提供绝对敌对的 WebUI 操作界面。如果 canal server 要搭建集群环境,必少不了 canal admin 这样业余的运维工具。

对于不太逛 github 的人,把文档也贴上:

  • wiki 文档
  • release 下载包

1.2. canal adapter

它既然是适配器,那么就得介绍“源头”和“指标”这两个部位数据的对接:

  • 源头 :(1)canal adapter 能够直连 canal server,生产 instance 的数据;(2)也能够在让 canal server 将数据投递到 MQ,而后 cancal adapter 生产 MQ 中的数据。
  • 指标 :目前反对 hbase、rdb、es,后续将反对 mongodb、redis 等。

本文实现的较简略,数据流向包含:mysql -> canal server -> canal adapter -> es

2. 数据筹备

2.1. mysql 建表

开启 binlog 日志的局部查看之前的文章。筹备两张表:

-- 员工表
CREATE TABLE `hr_user` (`id` char(32) NOT NULL COMMENT '主键',
  `username` varchar(50) DEFAULT NULL COMMENT '账号',
  `fullname` varchar(50) DEFAULT NULL COMMENT '姓名',
  `sex` tinyint DEFAULT NULL COMMENT '性别 0- 男 /1- 女',
  `birthday` date DEFAULT NULL COMMENT '生日',
  `dept_id` char(32) DEFAULT NULL COMMENT '所属部门 ID',
  `deleted` tinyint DEFAULT NULL COMMENT '是否已删除 0- 否 /1- 是',
  `created_by` char(32) DEFAULT NULL COMMENT '创建人 ID',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  `updated_by` char(32) DEFAULT NULL COMMENT '更新人 ID',
  `updated_time` datetime DEFAULT NULL COMMENT '更新工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 部门表
CREATE TABLE `hr_dept` (`id` char(32) NOT NULL COMMENT '主键',
  `dept_name` varchar(50) DEFAULT NULL COMMENT '部门名称',
  `manager_name` varchar(50) DEFAULT NULL COMMENT '部门经理姓名',
  `parent_id` char(32) DEFAULT NULL COMMENT '父级部门 ID',
  `dept_path` varchar(1000) DEFAULT NULL COMMENT '部门门路',
  `deleted` tinyint DEFAULT NULL COMMENT '是否已删除 0- 否 /1- 是',
  `created_by` char(32) DEFAULT NULL COMMENT '创建人 ID',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  `updated_by` char(32) DEFAULT NULL COMMENT '更新人 ID',
  `updated_time` datetime DEFAULT NULL COMMENT '更新工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.2. 装置 es、kibana

es docker shell

docker run -d \
 --name elasticsearch \
 --restart=on-failure:3 \
 -p 9200:9200 \
 -p 9300:9300 \
 -e "discovery.type=single-node" \
 -v /Volumes/elasticsearch/data/:/usr/share/elasticsearch/data/ \
 -v /Volumes/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
 -v /Volumes/elasticsearch/plugins/:/usr/share/elasticsearch/plugins/ \
 elasticsearch:7.9.3

kibana docker shell

docker run -d \
 --name kibana \
 --link elasticsearch:es \
 -p 5601:5601 \
 -e ELASTICSEARCH_URL=es:9200 \
 kibana:7.9.3

2.3. 创立索引

kibana -> Management -> Dev Tools 执行创立索引 user:

PUT user

{
    "mappings":{
        "properties":{
            "birthday":{
                "type":"date",
                "format":"yyyy-MM-dd"
            },
            "dept_id":{"type":"keyword"},
            "dept_name":{
                "type":"text",
                "analyzer":"ik_max_word"
            },
            "dept_updated_time":{"type":"date"},
            "fullname":{
                "type":"text",
                "analyzer":"ik_max_word"
            },
            "sex":{"type":"byte"},
            "user_id":{"type":"keyword"},
            "user_updated_time":{"type":"date"},
            "username":{"type":"text"}
        }
    }
}

3. canal 配置

目前最新的 release 版本是 1.1.6-alpha-1,这里都只下载该版本的 canal-deployer、canal-adapter 两个压缩包,在本地解压下来各自对应一个目录。canal-admin 就不装置了,后面两个临时就够用了。

3.1. canal server

canal server 的装置配置其实《mysql 增量同步 – canal》文章中就有了,还是简略列一下。

因为不做 canal server 将数据投递到 MQ,所以关注 conf/example/instance.properties 的上面参数即可:

canal.instance.master.address = 127.0.0.1:3306 
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal

配置外面默认启动的是叫 example 的 instance,所以启动脚本和查看日志对应上面:

# 启动
sh bin/startup.sh
# 敞开
sh bin/stop.sh
# 查看具体实例日志
tail -500f logs/example/example.log

3.2. canal adapter

所有 adapter 的配置参考 adapter 同步 es 的 wiki

1. 批改 conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/es?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7
        hosts: http://127.0.0.1:9200
        properties:
          mode: rest
          cluster.name: docker-cluster

因为是直连 canal server,所以 mode: tcp,没有抉择其余 mq。其余的就是配置 canal server、mysql、es 的连贯信息。

2. 新增 conf/es7/user.yml

因为须要做 mysql 往 es 中 user 索引的同步,就在 es7 中增加一个 user.yml 文件,因为后面 conf/application.yml 中配置了适配器加载门路 es7,所以默认会加载这个目录下所有 yml 文件。

user.yml

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: user
  _id: user_id

  sql: "SELECT 
    u.id AS user_id,
    u.username,
    u.fullname,
    u.sex,
    u.birthday,
    u.dept_id,
    d.dept_name,
    u.updated_time as user_updated_time,
    d.updated_time as dept_updated_time
FROM
    hr_user u
        LEFT JOIN
    hr_dept d ON u.dept_id = d.id"etlCondition:"where u.deleted = 0 AND d.deleted = 0"
  commitBatch: 3000

还是比拟高深莫测的,配置了往 es 中 user 索引同步的数据起源 sql。不过是有肯定标准要求的,具体标准要求,还是要看后面发的官网 wiki 文档。

3. 启动

# 启动
sh bin/startup.sh
# 敞开
sh bin/stop.sh
# 查看适配器日志
tail -500f logs/adapter/adapter.log

如果能看到 canal server、canal adapter 的日志都没报错信息,那就能够了。

4. 验证

为了不便查看 es 中的数据,在 kibana 中将 user 索引增加到 Discover 中。在 Kibana -> Management -> Stack Management -> Kibana -> Index patterns -> Create index pattern,增加 user 索引。而后回到 Discover 就能看到对应索引中的数据了。

在 mysql 中对应表中各自新增一条数据:

-- hr_dept
INSERT INTO hr_dept (id,dept_name,manager_name,parent_id,dept_path,deleted,created_by,create_time,updated_by,updated_time) 
VALUES ('9ef57211ca3311ec8fe00242ac110004','中台研发部','罗永浩','66ab59dbcabf11ec8fe00242ac110004','研发核心 > 平台架构部 >TPaaS 研发部',0,NULL,now(),NULL,now());

-- hr_user
INSERT INTO hr_user (id,username,fullname,sex,birthday,dept_id,deleted,created_by,create_time,updated_by,updated_time) 
VALUES ('b7205315cac811ec8fe00242ac110004','zhangsan','张三',0,'1995-02-18','9ef57211ca3311ec8fe00242ac110004',0,NULL,now(),NULL,now());

在 kibana 中就能看到对应 es 索引中也新增了一条数据,对应的日志在 canal adapter adapter.log 日志中也能看到。

而后无论咱们独自批改 hr_user 表,还是只是批改了 hr_dept 表中的 dept_name 字段,es 中对应那条的文档也会随之批改。

正文完
 0