关于java:MySQL如何实时同步数据到ES试试这款阿里开源的神器

41次阅读

共计 12332 个字符,预计需要花费 31 分钟才能阅读完成。

mall我的项目中的商品搜寻性能,始终都没有做实时数据同步。最近发现阿里巴巴开源的 canal 能够把 MySQL 中的数据实时同步到 Elasticsearch 中,能很好地解决数据同步问题。明天咱们来讲讲 canal 的应用,心愿对大家有所帮忙!

SpringBoot 实战电商我的项目 mall(40k+star)地址:https://github.com/macrozheng/mall

canal 简介

canal 主要用途是对 MySQL 数据库增量日志进行解析,提供增量数据的订阅和生产,简略说就是能够对 MySQL 的增量数据进行实时同步,反对同步到 MySQL、Elasticsearch、HBase 等数据存储中去。

canal 工作原理

canal 会模仿 MySQL 主库和从库的交互协定,从而伪装成 MySQL 的从库,而后向 MySQL 主库发送 dump 协定,MySQL 主库收到 dump 申请会向 canal 推送 binlog,canal 通过解析 binlog 将数据同步到其余存储中去。

canal 应用

接下来咱们来学习下 canal 的应用,以 MySQL 实时同步数据到 Elasticsearch 为例。

组件下载

  • 首先咱们须要下载 canal 的各个组件canal-servercanal-adaptercanal-admin,下载地址:https://github.com/alibaba/ca…

  • canal 的各个组件的用处各不相同,上面别离介绍下:

    • canal-server(canal-deploy):能够间接监听 MySQL 的 binlog,把本人伪装成 MySQL 的从库,只负责接收数据,并不做解决。
    • canal-adapter:相当于 canal 的客户端,会从 canal-server 中获取数据,而后对数据进行同步,能够同步到 MySQL、Elasticsearch 和 HBase 等存储中去。
    • canal-admin:为 canal 提供整体配置管理、节点运维等面向运维的性能,提供绝对敌对的 WebUI 操作界面,不便更多用户疾速和平安的操作。
  • 因为不同版本的 MySQL、Elasticsearch 和 canal 会有兼容性问题,所以咱们先对其应用版本做个约定。
利用 端口 版本
MySQL 3306 5.7
Elasticsearch 9200 7.6.2
Kibanba 5601 7.6.2
canal-server 11111 1.1.15
canal-adapter 8081 1.1.15
canal-admin 8089 1.1.15

MySQL 配置

  • 因为 canal 是通过订阅 MySQL 的 binlog 来实现数据同步的,所以咱们须要开启 MySQL 的 binlog 写入性能,并设置 binlog-format 为 ROW 模式,我的配置文件为/mydata/mysql/conf/my.cnf,改为如下内容即可;
[mysqld]
## 设置 server_id,同一局域网中须要惟一
server_id=101 
## 指定不须要同步的数据库名称
binlog-ignore-db=mysql  
## 开启二进制日志性能
log-bin=mall-mysql-bin  
## 设置二进制日志应用内存大小(事务)binlog_cache_size=1M  
## 设置应用的二进制日志格局(mixed,statement,row)binlog_format=row  
## 二进制日志过期清理工夫。默认值为 0,示意不主动清理。expire_logs_days=7  
## 跳过主从复制中遇到的所有谬误或指定类型的谬误,防止 slave 端复制中断。## 如:1062 谬误是指一些主键反复,1032 谬误是因为主从数据库数据不统一
slave_skip_errors=1062  
  • 配置实现后须要重新启动 MySQL,重启胜利后通过如下命令查看 binlog 是否启用;
show variables like '%log_bin%'
+---------------------------------+-------------------------------------+
| Variable_name                   | Value                               |
+---------------------------------+-------------------------------------+
| log_bin                         | ON                                  |
| log_bin_basename                | /var/lib/mysql/mall-mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
+---------------------------------+-------------------------------------+
  • 再查看下 MySQL 的 binlog 模式;
show variables like 'binlog_format%';  
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
  • 接下来须要创立一个领有从库权限的账号,用于订阅 binlog,这里创立的账号为canal:canal
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  • 创立好测试用的数据库canal-test,之后创立一张商品表product,建表语句如下。
CREATE TABLE `product`  (`id` bigint(20) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `price` decimal(10, 2) NULL DEFAULT NULL,
  `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

canal-server 应用

  • 将咱们下载好的压缩包 canal.deployer-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,而后解压到指定目录/mydata/canal-server,可应用如下命令解压;
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz
  • 解压实现后目录构造如下;
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── canal_local.properties
│   ├── canal.properties
│   └── example
│       └── instance.properties
├── lib
├── logs
│   ├── canal
│   │   └── canal.log
│   └── example
│       ├── example.log
│       └── example.log
└── plugin
  • 批改配置文件conf/example/instance.properties,按如下配置即可,次要是批改数据库相干配置;
# 须要同步数据的 MySQL 地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库明码
canal.instance.dbPassword=canal
# 数据库连贯编码
canal.instance.connectionCharset = UTF-8
# 须要订阅 binlog 的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
  • 应用 startup.sh 脚本启动 canal-server 服务;
sh bin/startup.sh
  • 启动胜利后可应用如下命令查看服务日志信息;
tail -f logs/canal/canal.log
2020-10-26 16:18:13.354 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
2020-10-26 16:18:19.978 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  • 启动胜利后可应用如下命令查看 instance 日志信息;
tail -f logs/example/example.log 
2020-10-26 16:18:16.056 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-26 16:18:16.061 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-10-26 16:18:18.259 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-10-26 16:18:19.543 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-10-26 16:18:19.578 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-10-26 16:18:19.912 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mall-mysql-bin.000006","position":2271,"serverId":101,"timestamp":1603682664000}}
2020-10-26 16:18:22.435 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mall-mysql-bin.000006,position=2271,serverId=101,gtid=,timestamp=1603682664000] cost : 2768ms , the next step is binlog dump
  • 如果想要进行 canal-server 服务能够应用如下命令。
sh bin/stop.sh

canal-adapter 应用

  • 将咱们下载好的压缩包 canal.adapter-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,而后解压到指定目录/mydata/canal-adpter,解压实现后目录构造如下;
├── bin
│   ├── adapter.pid
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── es6
│   ├── es7
│   │   ├── biz_order.yml
│   │   ├── customer.yml
│   │   └── product.yml
│   ├── hbase
│   ├── kudu
│   ├── logback.xml
│   ├── META-INF
│   │   └── spring.factories
│   └── rdb
├── lib
├── logs
│   └── adapter
│       └── adapter.log
└── plugin
  • 批改配置文件conf/application.yml,按如下配置即可,次要是批改 canal-server 配置、数据源配置和客户端适配器配置;
canal.conf:
  mode: tcp # 客户端的模式,可选 tcp kafka rocketMQ
  flatMessage: true # 扁平 message 开关, 是否以 json 字符串模式投递数据, 仅在 kafka/rocketMQ 模式下无效
  zookeeperHosts:    # 对应集群模式下的 zk 地址
  syncBatchSize: 1000 # 每次同步的批数量
  retries: 0 # 重试次数, - 1 为有限重试
  timeout: # 同步超时工夫, 单位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #设置 canal-server 的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources: # 源数据库配置
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters: # 适配器列表
  - instance: example # canal 实例名或者 MQ topic 名
    groups: # 分组列表
    - groupId: g1 # 分组 id, 如果是 MQ 模式将用到该值
      outerAdapters:
      - name: logger # 日志打印适配器
      - name: es7 # ES 同步适配器
        hosts: 127.0.0.1:9200 # ES 连贯地址
        properties:
          mode: rest # 模式可选 transport(9300) 或者 rest(9200)
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch # ES 集群名称
  • 增加配置文件canal-adapter/conf/es7/product.yml,用于配置 MySQL 中的表与 Elasticsearch 中索引的映射关系;
dataSourceKey: defaultDS # 源数据源的 key, 对应下面配置的 srcDataSources 中的值
destination: example  # canal 的 instance 或者 MQ 的 topic
groupId: g1 # 对应 MQ 模式下的 groupId, 只会同步对应 groupId 的数据
esMapping:
  _index: canal_product # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置上面的 pk 项_id 则会由 es 主动调配
  sql: "SELECT
            p.id AS _id,
            p.title,
            p.sub_title,
            p.price,
            p.pic
        FROM
            product p"        # sql 映射
  etlCondition: "where a.c_time>={}"   #etl 的条件参数
  commitBatch: 3000   # 提交批大小
  • 应用 startup.sh 脚本启动 canal-adapter 服务;
sh bin/startup.sh
  • 启动胜利后可应用如下命令查看服务日志信息;
tail -f logs/adapter/adapter.log
20-10-26 16:52:55.148 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2020-10-26 16:52:57.005 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2020-10-26 16:52:57.376 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2020-10-26 16:52:58.615 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2020-10-26 16:52:58.651 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /mydata/canal-adapter/plugin
2020-10-26 16:52:59.043 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2020-10-26 16:52:59.044 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2020-10-26 16:52:59.057 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2020-10-26 16:52:59.100 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2020-10-26 16:52:59.153 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-26 16:52:59.590 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2020-10-26 16:52:59.626 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 31.278 seconds (JVM running for 33.99)
2020-10-26 16:52:59.930 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
  • 如果须要进行 canal-adapter 服务能够应用如下命令。
sh bin/stop.sh

数据同步演示

通过下面的一系列步骤,canal 的数据同步性能曾经根本能够应用了,上面咱们来演示下数据同步性能。

  • 首先咱们须要在 Elasticsearch 中创立索引,和 MySQL 中的 product 表绝对应,间接在 Kibana 的 Dev Tools 中应用如下命令创立即可;
PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {"type": "text"},
      "sub_title": {"type": "text"},
      "pic": {"type": "text"},
      "price": {"type": "double"}
    }
  }
}

  • 创立实现后能够查看下索引的构造;
GET canal_product/_mapping

  • 之后应用如下 SQL 语句在数据库中创立一条记录;
INSERT INTO product (id, title, sub_title, price, pic) VALUES (5, '小米 8', '全面屏游戏智能手机 6GB+64GB', 1999.00, NULL);
  • 创立胜利后,在 Elasticsearch 中搜寻下,发现数据曾经同步了;
GET canal_product/_search

  • 再应用如下 SQL 对数据进行批改;
UPDATE product SET title='小米 10' WHERE id=5
  • 批改胜利后,在 Elasticsearch 中搜寻下,发现数据曾经批改了;

  • 再应用如下 SQL 对数据进行删除操作;
DELETE FROM product WHERE id=5
  • 删除胜利后,在 Elasticsearch 中搜寻下,发现数据曾经删除了,至此 MySQL 同步到 Elasticsearch 的性能实现了!

canal-admin 应用

  • 将咱们下载好的压缩包 canal.admin-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,而后解压到指定目录/mydata/canal-admin,解压实现后目录构造如下;
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── canal_manager.sql
│   ├── canal-template.properties
│   ├── instance-template.properties
│   ├── logback.xml
│   └── public
│       ├── avatar.gif
│       ├── index.html
│       ├── logo.png
│       └── static
├── lib
└── logs
  • 创立 canal-admin 须要应用的数据库canal_manager,创立 SQL 脚本为/mydata/canal-admin/conf/canal_manager.sql,会创立如下表;

  • 批改配置文件 conf/application.yml,按如下配置即可,次要是批改数据源配置和canal-admin 的治理账号配置,留神须要用一个有读写权限的数据库账号,比方治理账号root:root
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
  • 接下来对之前搭建的 canal-serverconf/canal_local.properties文件进行配置,次要是批改 canal-admin 的配置,批改实现后应用 sh bin/startup.sh local 重启canal-server
# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = 
  • 应用 startup.sh 脚本启动 canal-admin 服务;
sh bin/startup.sh
  • 启动胜利后可应用如下命令查看服务日志信息;
tail -f logs/admin.log
020-10-27 10:15:04.210 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2020-10-27 10:15:04.308 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-27 10:15:04.534 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2020-10-27 10:15:04.573 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 31.203 seconds (JVM running for 34.865)
  • 拜访 canal-admin 的 Web 界面,输出账号密码 admin:123456 即可登录,拜访地址:http://192.168.3.101:8089

  • 登录胜利后即可应用 Web 界面操作 canal-server。

参考资料

canal 官网文档:https://github.com/alibaba/ca…

配置文件地址

https://github.com/macrozheng…

本文 GitHub https://github.com/macrozheng/mall-learning 曾经收录,欢送大家 Star!

正文完
 0