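Below is how I built incremental synchronization of MySQL data to Elasticsearch from scratch on a single machine, using canal adapter (binlog-based).
Implementation steps
(1) Install MySQL
(2) Enable MySQL binlog in ROW mode and start MySQL
(3) Install the JDK
(4) Install and start Elasticsearch (I installed 6.4.0, mainly because canal adapter 1.1.3 does not yet support 7.0.0)
(5) Install and start Kibana
(6) Install and start canal-server
(7) Install and start canal-adapter
My operating system is CentOS 7.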
1. Install MySQL with yum
(1) Check the official site for the latest package
https://dev.mysql.com/downloa…
(2) Download the MySQL repository package
wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
Newer releases are available, but I used 5.7.
Install the MySQL repository
yum -y install mysql57-community-release-el7-11.noarch.rpm
Verify the result:
yum repolist enabled | grep "mysql.*"
(3) Install the MySQL server
yum install mysql-community-server
(4) Start the MySQL service
systemctl start mysqld.service
Check the status of the MySQL service:
systemctl status mysqld.service
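(5) Look up the initial password
grep "password" /var/log/mysqld.log
Log in:
mysql -u root -p
Note: MySQL 5.7 requires you to change this temporary password (ALTER USER ... IDENTIFIED BY ...) before it accepts other statements, and its default validate_password policy may reject a simple password such as 123456 unless that policy is relaxed.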
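(6) Grant database access (do not skip this step; for convenience I use the root account everywhere below instead of creating a dedicated canal account)
Without this grant, the database only accepts connections from localhost.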
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;
Username: root
Password: 123456
Host: % matches every IP address; you can also put a specific IP here to restrict access to it
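The grant above hands full privileges to root for convenience. A minimal sketch of the alternative, a dedicated account limited to the replication privileges that canal reads the binlog with, might look like the following. The helper name canal_grant_sql, the account name and the password are assumptions made for illustration; none of them come from the setup described here.

```shell
# Hypothetical helper: print the SQL that creates a restricted replication account.
# Arguments: account name, password (both placeholders chosen by the caller).
# The password must not contain a single quote, since it is embedded in SQL as-is.
canal_grant_sql() {
  local user="$1" password="$2"
  cat <<SQL
CREATE USER '${user}'@'%' IDENTIFIED BY '${password}';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '${user}'@'%';
FLUSH PRIVILEGES;
SQL
}

# Usage (assumes the mysql client is installed and root can log in):
#   canal_grant_sql canal 'Canal@123456' | mysql -u root -p
```

If you go this way, the same account name and password would then replace root in canal's instance.properties.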
2. Enable MySQL binlog
Locate the my.cnf file; on my machine it is /etc/my.cnf
Add the following lines under the [mysqld] section:
log-bin=mysql-bin
binlog-format=ROW
server-id=1
Then restart MySQL and check that binlog is enabled (log_bin should be ON):
show variables like '%log_bin%';
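The query above asks the running server. As a complementary sketch, the helper below only inspects the option file, which can be handy before restarting MySQL. The function name check_binlog_cnf is an assumption for illustration; it does a plain text match and does not verify that the settings sit under the [mysqld] section.

```shell
# Hypothetical helper: return 0 only if the given my.cnf-style file contains
# active (uncommented) lines for log-bin, binlog-format and server-id.
# MySQL accepts dashes and underscores interchangeably in option names,
# so both spellings are matched.
check_binlog_cnf() {
  local cnf="$1" pattern
  for pattern in 'log[-_]bin' 'binlog[-_]format' 'server[-_]id'; do
    if ! grep -Eq "^[[:space:]]*${pattern}[[:space:]]*=" "$cnf"; then
      echo "missing setting matching: ${pattern}" >&2
      return 1
    fi
  done
  return 0
}

# Usage:
#   check_binlog_cnf /etc/my.cnf && echo "binlog settings present"
```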
3. Install the JDK
I installed JDK 1.8.0_202.
Download page:
https://www.oracle.com/techne…
(1) Put jdk-8u202-linux-x64.tar.gz in the /usr/local directory
(2) Extract it and tidy up
tar -xzvf jdk-8u202-linux-x64.tar.gz
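mv jdk1.8.0_202 jdk
rm -rf jdk-8u202-linux-x64.tar.gz
The archive unpacks into a directory named jdk1.8.0_202; after these commands finish, /usr/local contains a jdk directory.
(3) Configure environment variables
vi /etc/profile
Add:
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH
Then reload the profile so the variables take effect in the current shell:
source /etc/profile
(4) Check that the JDK is installed correctly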
java -version
4. Install and start Elasticsearch
Official site: https://www.elastic.co/downlo…
Run the following commands; you can also download the package manually and upload it
cd /usr/local
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.0.tar.gz
tar -xzvf elasticsearch-6.4.0.tar.gz
mv elasticsearch-6.4.0 elasticsearch
rm -rf elasticsearch-6.4.0.tar.gz
After these commands finish, /usr/local contains an elasticsearch directory.
Elasticsearch refuses to start as the root user,
so run the following commands:
useradd elasticsearch
chown -R elasticsearch /usr/local/elasticsearch
su elasticsearch
Start ES as the elasticsearch user.
(1) Adjust Linux parameters (these files must be edited as root)
vim /etc/security/limits.conf
Add:
* soft nofile 65536
* hard nofile 65536
* soft nproc 2048
* hard nproc 4096
# Lock memory to prevent swapping; this needs two more lines in the same file
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
vim /etc/sysctl.conf
Add:
vm.max_map_count=655360
fs.file-max=655360
Note: afterwards, run sysctl -p as root so the kernel settings take effect.
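To confirm that the new kernel value is live, a small comparison helper can be enough. This is only a sketch: the name at_least is made up for illustration, and the threshold 262144 in the usage line is the minimum that Elasticsearch's bootstrap check expects for vm.max_map_count, which the 655360 set above comfortably exceeds.

```shell
# Hypothetical helper: succeed when an integer setting is at least the required minimum.
# Arguments: actual value, minimum value.
at_least() {
  local actual="$1" minimum="$2"
  [ "$actual" -ge "$minimum" ]
}

# Usage on the target host (sysctl -n prints only the value):
#   at_least "$(sysctl -n vm.max_map_count)" 262144 || echo "vm.max_map_count is too low"
```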
(2) Edit the ES configuration file (my IP is 192.168.254.131; substitute your own)
vim /usr/local/elasticsearch/config/elasticsearch.yml
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /usr/local/elasticsearch/data
#
# Path to log files:
#
path.logs: /usr/local/elasticsearch/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 192.168.254.131
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["192.168.254.131"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of master-eligible nodes / 2 + 1):
#
#discovery.zen.minimum_master_nodes:
#
# For more information, consult the zen discovery module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
transport.tcp.port: 9300
transport.tcp.compress: true
http.cors.enabled: true
http.cors.allow-origin: "*"
(3) Start Elasticsearch
cd /usr/local/elasticsearch
./bin/elasticsearch -d
Check that it started successfully:
curl http://192.168.254.131:9200
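Because -d starts Elasticsearch in the background, the HTTP port may need a little while before it answers, so a single curl right after startup can fail. A rough sketch of a retry loop is shown below; the function name wait_for and the attempt and delay values in the usage line are assumptions chosen for illustration.

```shell
# Hypothetical helper: run a command repeatedly until it succeeds or attempts run out.
# Arguments: number of attempts, delay in seconds between attempts, then the command.
wait_for() {
  local attempts="$1" delay="$2"
  shift 2
  local i=1
  while [ "$i" -le "$attempts" ]; do
    # Discard the command's output; only its exit status matters here.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  return 1
}

# Usage: poll the HTTP port for up to about 60 seconds.
#   wait_for 30 2 curl -sf http://192.168.254.131:9200 && echo "Elasticsearch is up"
```

The same helper works for any of the later services, since the command to retry is passed in rather than hard-coded.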
5. Install and start Kibana
Official site: https://www.elastic.co/downlo…
Run the following commands; you can also download the package manually and upload it
cd /usr/local
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.0-linux-x86_64.tar.gz
tar -xzvf kibana-6.4.0-linux-x86_64.tar.gz
mv kibana-6.4.0-linux-x86_64 kibana
rm -rf kibana-6.4.0-linux-x86_64.tar.gz
After these commands finish, /usr/local contains a kibana directory.
Edit the Kibana configuration file
vim /usr/local/kibana/config/kibana.yml
# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601
# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "192.168.254.131"
# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""
# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false
# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576
# The Kibana server's name. This is used for display purposes.
#server.name: "your-hostname"
# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.254.131:9200"
# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true
# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana6"
# The default application to load.
#kibana.defaultAppId: "home"
# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "user"
#elasticsearch.password: "pass"
# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key
# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files validate that your Elasticsearch backend uses the same key files.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key
# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: ["/path/to/your/CA.pem"]
# To disregard the validity of SSL certificates, change this setting's value to'none'.
#elasticsearch.ssl.verificationMode: full
# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500
# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000
# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [authorization]
# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}
# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 30000
# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000
# Logs queries sent to Elasticsearch. Requires logging.verbose set to true.
#elasticsearch.logQueries: false
# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid
# Enables you specify a file where Kibana stores log output.
#logging.dest: stdout
# Set the value of this setting to true to suppress all logging output.
#logging.silent: false
# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false
# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false
# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000
# The default locale. This locale can be used in certain circumstances to substitute any missing
# translations.
#i18n.defaultLocale: "en"
Start Kibana
cd /usr/local/kibana
nohup ./bin/kibana &
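Check that it started successfully
Open http://192.168.254.131:5601 in a browser
6. Install and start canal-server
For details, see the official documentation:
https://github.com/alibaba/ca…
(1) Download canal
Download a release directly
Visit https://github.com/alibaba/canal/releases, which lists every published release package. For example, to download version 1.1.3:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
or
Build it yourself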
git clone git@github.com:alibaba/canal.git
cd canal;
mvn clean install -Dmaven.test.skip -Denv=release
When the build finishes, target/canal.deployer-$version.tar.gz is produced in the project root
(2) Extract
mkdir /usr/local/canal
tar zxvf canal.deployer-$version.tar.gz -C /usr/local/canal
(3) Edit the configuration
cd /usr/local/canal
vim conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.254.131:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
(4) Start canal-server
cd /usr/local/canal
./bin/startup.sh
cat logs/canal/canal.log
2019-05-03 10:58:31.938 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-05-03 10:58:32.106 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-05-03 10:58:32.120 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-05-03 10:58:32.143 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-05-03 10:58:32.277 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.254.131:11111]
2019-05-03 10:58:34.235 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-05-03 10:58:35.470 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-05-03 10:58:36.317 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-05-03 10:58:36.317 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-05-03 10:58:37.106 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2019-05-03 10:58:37.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-05-03 10:58:37.241 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position by switch ::1556597413000
2019-05-03 10:58:39.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4450,serverId=1,gtid=,timestamp=1556596874000] cost : 1915ms , the next step is binlog dump
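Rather than reading the whole log by eye, you can look for the "running now" line that appears in the excerpt above. The helper below is a minimal sketch of that check; the function name canal_server_started is hypothetical.

```shell
# Hypothetical helper: succeed when the given canal-server log contains the
# "the canal server is running now" line printed after a successful start.
canal_server_started() {
  local log_file="$1"
  grep -q 'the canal server is running now' "$log_file"
}

# Usage from /usr/local/canal:
#   canal_server_started logs/canal/canal.log && echo "canal-server is up"
```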
7. Install and start canal-adapter
(1) Download canal-adapter
Visit https://github.com/alibaba/canal/releases, which lists every published release package. For example, to download version 1.1.3:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
(2) Extract
mkdir /usr/local/canal-adapter
tar zxvf canal.adapter-1.1.3.tar.gz -C /usr/local/canal-adapter
(3) Edit the configuration
cd /usr/local/canal-adapter
vim conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 192.168.254.131:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.254.131:3306/mytest?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: 192.168.254.131:9300
        properties:
          cluster.name: my-application
vim conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest_user
  _type: _doc
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name, a.role_id, a.c_time from user a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 3000
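(4) Create the MySQL table user and the index mytest_user first; otherwise canal-adapter reports errors on startup. Run the SQL in the mysql client and the PUT request against Elasticsearch (for example in the Kibana Dev Tools console).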
create database mytest;
use mytest;
create table user (`id` int(10) NOT NULL,
`name` varchar(100) DEFAULT NULL,
`role_id` int(10) NOT NULL,
`c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
);
PUT /mytest_user
{
"mappings": {
"_doc": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {"type": "keyword"}
}
},
"role_id": {"type": "long"},
"c_time": {"type": "date"}
}
}
}
}
(5) Start canal-adapter
cd /usr/local/canal-adapter
./bin/startup.sh
Check the log:
cat logs/adapter/adapter.log
(6) Test that incremental synchronization works
Before any data changes:
GET /mytest_user/_search
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []}
}
Insert a row:
insert user(id, name, role_id) values(7, "test", 7);
GET /mytest_user/_doc/7
{
"_index": "mytest_user",
"_type": "_doc",
"_id": "7",
"_version": 1,
"found": true,
"_source": {
"name": "test",
"role_id": 7,
"c_time": "2019-05-04T06:11:31-05:00"
}
}
Update the row:
update user set name = 'zhengguo' where id = 7;
GET /mytest_user/_doc/7
{
"_index": "mytest_user",
"_type": "_doc",
"_id": "7",
"_version": 2,
"found": true,
"_source": {
"name": "zhengguo",
"role_id": 7,
"c_time": "2019-05-04T06:11:31-05:00"
}
}
Delete the row:
delete from user where id = 7;
GET /mytest_user/_doc/7
{
"_index": "mytest_user",
"_type": "_doc",
"_id": "7",
"found": false
}
As the responses show, all three operations were synchronized successfully.
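The same checks can be run from the command line instead of Kibana. The sketch below reads a GET response on stdin and succeeds when the document exists; the function name doc_found is hypothetical, and the match is a plain text search for the "found" field rather than real JSON parsing.

```shell
# Hypothetical helper: read an Elasticsearch GET-document response on stdin
# and succeed when it contains "found": true (with or without spaces).
doc_found() {
  grep -Eq '"found"[[:space:]]*:[[:space:]]*true'
}

# Usage (assumes curl is installed and ES listens on the address used in this guide):
#   curl -s http://192.168.254.131:9200/mytest_user/_doc/7 | doc_found && echo "row 7 is in ES"
```

After the insert and update steps the check should succeed, and after the delete step it should fail, matching the responses shown above.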
A pitfall I ran into
canal may fix this in a later release.
For now, if you run incremental sync with adapter 1.1.3 against Elasticsearch 7.X.X, the adapter cannot connect to ES and logs the following error when it tries to sync a change:
ESSyncService - sync error, es index: mytest_user, DML : Dml{destination='example', database='mytest', table='user', type='INSERT', es=1556597413000, ts=1556597414139, sql='', data=[{id=4, name=junge, role_id=4, c_time=2019-04-30 00:10:13.0, c_utime=2019-04-30 00:10:13.0}], old=null}
ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{lTIHs6ZsTe-PqHs9CToQYQ}{192.168.254.131}{192.168.254.131:9300}]]
In other words, incremental sync to version 7 is not supported yet. Switching to 6.X.X fixes it.