Maxwell(mysql-to-json)初体验
本篇次要解说一下 Maxwell,Maxwell 是一个读取 MySQL binlogs 日志而后转换成 json 输入到 Kafka,Redis,RabbitMQ 等等 中间件中
前言
以前写过一篇对于 阿里的 canal,它也是通过监听 mysql 的 binlogs 日志的工具,本公司目前就是应用这个,而我明天要说的是 maxwell 它是在外部本人转换为 json 格局 输入到 其余中间件
1. 下载和装置 Maxwell
间接下载
官网地址: http://maxwells-daemon.io/
或者 Docker 下载
我这里抉择的是应用 Docker 的形式进行装置
docker pull zendesk/maxwell
2. 配置 Mysql 开起 binlogs
配置 my.cnf
我的门路在 /etc/my.cnf
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
或者间接运行如下指令:
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;
须要给 maxwell 用户 肯定的权限
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
# or for running maxwell locally:
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
3. Maxwell 输入到 stdout 规范输入模式启
在保障 mysql 能够拜访 并且失常启动的状况下 输出以下命令:
docker run -it --rm zendesk/maxwell bin/maxwell --user=maxwell \
--password=maxwell --host=192.168.25.5 --producer=stdout
命令解析
–user = maxwell:是后面配置的 mysql 的用户
–password=maxwell : 是后面配置的 mysql 的 maxwell 的 明码
–host = 192.168.25.5 : 是本机的 mysql 地址
–producer:是指 规范的输入形式(控制台输入)
执行 sql:
insert into user(id , userName , userAge , userAddress) values (200, "johnny" , 25 , "wuxi")
能够看到 maxwell 控制台 对其进行输入了 这个插入操作
{"database":"test","table":"user","type":"insert","ts":1609312375,"xid":735,"commit":true,
"data":{"id":200,"userName":"johnny","userAge":25,"userAddress":"wuxi"}}
4.Maxwell 输入到 kafka 模式
maxwell 官网比拟举荐的 形式就是 配合 kafka 进行应用 上面先来筹备 kafka 环境
4.1 启动 zookeeper
具体环境自行筹备
4.2 启动 kafka
4.3 启动 maxwell 指定 producer = kafka
producer = kafka
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='maxwell' --host='192.168.25.5' --producer=kafka \
--kafka.bootstrap.servers=192.168.25.5:9092 --kafka_topic=maxwell
参数解析
–user = maxwell:是后面配置的 mysql 的用户
–password=maxwell : 是后面配置的 mysql 的 maxwell 的 明码
–host = 192.168.25.5 : 是本机的 mysql 地址
–producer = kafka : 自定输入 json 到 kafka 中
–kafka.bootstrap.servers=192.168.25.5:9092 : 指定 kafka 的 地址
–kafka_topic=maxwell:指定 kafka topic
maxwell 启动胜利
4.4 启动 kafka consumer 来监听音讯
间接应用 kafka 自带的 consumer 工具 进行监听 topic = maxwell
./kafka-console-consumer --bootstrap-server 192.168.25.5:9092 -topic maxwell
执行 sql:
insert into user(id , userName , userAge , userAddress) values (201, "candy" , 26 , "wuxi")
能够看到 maxwell 将这个插入操作 json 发送到 kafka 中 并且被 console-consumer 进行了生产
5.Maxwell 输入到 Redis 模式
maxwell 也能输入到 redis 中,能够通过
筹备一个能够拜访的 Redis 我的是在虚拟机上的 192.168.25.101 6379
producer = redis
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.25.5' \
--producer=redis --redis_host=47.98.250.186 --redis_port=6380 --redis_type=lpush
执行 sql:
insert into user(id , userName , userAge , userAddress) values (203, "jack" , 26 , "wuxi");
insert into user(id , userName , userAge , userAddress) values (204, "jack2" , 26 , "wuxi");
insert into user(id , userName , userAge , userAddress) values (205, "jack3" , 26 , "wuxi");
能够看到 maxwell 将这些插入操作 json 发送到 redis 中
127.0.0.1:6379> lrange maxwell 0 -1
1) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315062,\"xid\":4462,\"commit\":true,\"data\":{\"id\":205,\"userName\":\"jack3\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"
2) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315062,\"xid\":4461,\"commit\":true,\"data\":{\"id\":204,\"userName\":\"jack2\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"
3) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315012,\"xid\":4322,\"commit\":true,\"data\":{\"id\":203,\"userName\":\"jack\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"
6. 应用 Maxwell BootStrap 初始化表
肯定有 数据割接过程,或者表的 全量逻辑需要,这时候 maxwell 提供了 bootstrap 机制,能够将整个表的数据 全副发送到 producer 中
6.1 官网相干解释
摘取官网的 参数解释
option | description |
---|---|
–log_level LOG_LEVEL | log level (DEBUG, INFO, WARN or ERROR) |
–user USER | mysql username |
–password PASSWORD | mysql password |
–host HOST | mysql host |
–port PORT | mysql port |
–database DATABASE | mysql database containing the table to bootstrap |
–table TABLE | mysql table to bootstrap |
–where WHERE_CLAUSE | where clause to restrict the rows bootstrapped from the specified table |
–client_id CLIENT_ID | specify which maxwell instance should perform the bootstrap operation |
–comment COMMENT | arbitrary comment to be added to every bootstrap row record |
摘取 官网 Starting a table bootstrap
You can start a bootstrap using:
bin/maxwell-bootstrap --database fooDB --table barTable
Optionally, you can include a where clause to replay part of the data.
bin/maxwell-bootstrap --database fooDB --table barTable --where "my_date >='2017-01-07 00:00:00'"
Alternatively you can insert a row in the maxwell.bootstrap
table to trigger a bootstrap.
mysql> insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
Note that if a Maxwell client_id has been set you should specify the client id.
mysql> insert into maxwell.bootstrap (database_name, table_name, client_id) values ('fooDB', 'barTable', 'custom_maxwell_client_id');
You can schedule bootstrap tasks to be run in the future by setting the started_at column. Maxwell will wait until this time to start the bootstrap.
mysql> insert into maxwell.bootstrap (database_name, table_name, client_id, started_at)
values ('fooDB', 'barTable', 'custom
6.2 演示 应用 maxwell 同步全表到 kafka 中
6.2.1 筹备 kafka 和 maxwell 和 consumer
保障 kafka 和 maxwell 曾经 连贯了 并且提供一个 kafka-console-consumer
kafka 启动
maxwell 连贯 kafka
Kafka-console-consumer
6.2.2 Docker 启动 maxwell-bootstrap 脚本
仍然应用 Docker 的形式 指定 /bin 脚本为 maxwell-bootstrap
docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell \
--password maxwell --host=192.168.25.5 --database test --table user --client_id maxwell
–database test : 指定 database
–table user : 指定 table = user
当下面命令执行后 能够看到 kafka-console-consumer 就能收到 database = test 库 table = user 全表的数据了
在 type = bootstrap-start 和 type = bootstrap-comlete 之间的就是 全量数据, 而 bootstrap-start 和 bootstrap-comlete 两条只是作为标记记录的,data 对应是空 第一条则是 执行下面命令所插入 bootstrap 疏导表的 数据
执行 docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap 会主动在 maxwell 数据库的 bootstrap 表中 增加如下记录
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1609315926,"xid":6719,"commit":true,"data":{"id":8,"database_name":"test","table_name":"user","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":35,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}}
6.2.3 直接插入 bootstrap 表 触发
也能够通过 对 maxwell 数据的 bootstrap 表插入 也能触发,maxwell 数据库是主动创立的
insert into maxwell.bootstrap (database_name, table_name) values (‘test’, ‘address’);
6.3 bootstrap 过程中 maxwell 解体
在进行 bootstrap 过程中,如果 maxwell 解体,重启时,bootstrap 会齐全从新开始,不论之前进行到多少,若不心愿这样,能够到数据库中 maxwell 设置 is_complete
字段值为 1(示意实现),或者删除该行
7. 扩大 Maxwell 过滤器配置
Maxwell 能够通过 --filter
配置项来指定过滤规定,通过 exclude
排除,通过 include
蕴含,值能够为具体的数据库、数据表、数据列,甚至用 Javascript 来定义简单的过滤规定;能够用正则表达式形容,有几个来自官网的例子
# 仅匹配 foodb 数据库的 tbl 表和所有 table_数字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有库所有表,仅匹配 db1 数据库
--filter = 'exclude: *.*, include: db1.*'
# 排除含 db.tbl.col 列值为 reject 的所有更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何蕴含 col_a 列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名单,齐全排除 bad_db 数据库,若要复原,必须删除 maxwell 库
--filter = 'blacklist: bad_db.*'
总结:
本篇次要解说了 Maxwell 次要是干嘛的,并且介绍了 Maxwell 如何配合 Kafka 和 Redis 进行应用,最初还介绍了 Maxwell BootStrap 的操作形式,最初扩大了 Maxwell 的过滤器配置形式。。除了 Maxwell 还有 阿里的 Canal 你会更喜爱哪个呢,我比拟喜爱 Maxwell 不过公司 目前在用 Canal。。
集体博客网站 https://www.askajohnny.com 欢送来拜访!
本文由博客一文多发平台 OpenWrite 公布!