关于java:Maxwell-mysqltojson初体验

39次阅读

共计 6913 个字符,预计需要花费 18 分钟才能阅读完成。

Maxwell(mysql-to-json)初体验

本篇次要解说一下 Maxwell,Maxwell 是一个读取 MySQL binlogs 日志而后转换成 json 输入到 Kafka,Redis,RabbitMQ 等等 中间件中

前言

以前写过一篇对于 阿里的 canal,它也是通过监听 mysql 的 binlogs 日志的工具,本公司目前就是应用这个,而我明天要说的是 maxwell 它是在外部本人转换为 json 格局 输入到 其余中间件

1. 下载和装置 Maxwell

间接下载

官网地址: http://maxwells-daemon.io/

或者 Docker 下载

我这里抉择的是应用 Docker 的形式进行装置

 docker pull zendesk/maxwell

2. 配置 Mysql 开起 binlogs

配置 my.cnf

我的门路在 /etc/my.cnf

$ vi my.cnf

[mysqld]
server_id=1
log-bin=master   
binlog_format=row

或者间接运行如下指令:

mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;

须要给 maxwell 用户 肯定的权限

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

# or for running maxwell locally:

mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';

3. Maxwell 输入到 stdout 规范输入模式启

在保障 mysql 能够拜访 并且失常启动的状况下 输出以下命令:

docker run -it --rm zendesk/maxwell bin/maxwell --user=maxwell \
    --password=maxwell --host=192.168.25.5 --producer=stdout

命令解析

–user = maxwell:是后面配置的 mysql 的用户

–password=maxwell : 是后面配置的 mysql 的 maxwell 的 明码

–host = 192.168.25.5 : 是本机的 mysql 地址

–producer:是指 规范的输入形式(控制台输入)

执行 sql:

 insert into user(id , userName , userAge , userAddress) values (200, "johnny" , 25 , "wuxi") 

能够看到 maxwell 控制台 对其进行输入了 这个插入操作

{"database":"test","table":"user","type":"insert","ts":1609312375,"xid":735,"commit":true,
 "data":{"id":200,"userName":"johnny","userAge":25,"userAddress":"wuxi"}}

4.Maxwell 输入到 kafka 模式

maxwell 官网比拟举荐的 形式就是 配合 kafka 进行应用 上面先来筹备 kafka 环境

4.1 启动 zookeeper

具体环境自行筹备

4.2 启动 kafka

4.3 启动 maxwell 指定 producer = kafka

producer = kafka

    docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='maxwell' --host='192.168.25.5' --producer=kafka \
    --kafka.bootstrap.servers=192.168.25.5:9092 --kafka_topic=maxwell

参数解析

–user = maxwell:是后面配置的 mysql 的用户

–password=maxwell : 是后面配置的 mysql 的 maxwell 的 明码

–host = 192.168.25.5 : 是本机的 mysql 地址

–producer = kafka : 自定输入 json 到 kafka 中

–kafka.bootstrap.servers=192.168.25.5:9092 : 指定 kafka 的 地址

–kafka_topic=maxwell:指定 kafka topic

maxwell 启动胜利

4.4 启动 kafka consumer 来监听音讯

间接应用 kafka 自带的 consumer 工具 进行监听 topic = maxwell

./kafka-console-consumer --bootstrap-server 192.168.25.5:9092 -topic maxwell

执行 sql:

 insert into user(id , userName , userAge , userAddress) values (201, "candy" , 26 , "wuxi") 

能够看到 maxwell 将这个插入操作 json 发送到 kafka 中 并且被 console-consumer 进行了生产

5.Maxwell 输入到 Redis 模式

maxwell 也能输入到 redis 中,能够通过

筹备一个能够拜访的 Redis 我的是在虚拟机上的 192.168.25.101 6379

producer = redis

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.25.5' \
--producer=redis --redis_host=47.98.250.186 --redis_port=6380     --redis_type=lpush

执行 sql:

insert into user(id , userName , userAge , userAddress) values (203, "jack" , 26 , "wuxi");
insert into user(id , userName , userAge , userAddress) values (204, "jack2" , 26 , "wuxi");
insert into user(id , userName , userAge , userAddress) values (205, "jack3" , 26 , "wuxi"); 

能够看到 maxwell 将这些插入操作 json 发送到 redis 中

127.0.0.1:6379> lrange maxwell 0 -1
1) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315062,\"xid\":4462,\"commit\":true,\"data\":{\"id\":205,\"userName\":\"jack3\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"
2) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315062,\"xid\":4461,\"commit\":true,\"data\":{\"id\":204,\"userName\":\"jack2\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"
3) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315012,\"xid\":4322,\"commit\":true,\"data\":{\"id\":203,\"userName\":\"jack\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"

6. 应用 Maxwell BootStrap 初始化表

肯定有 数据割接过程,或者表的 全量逻辑需要,这时候 maxwell 提供了 bootstrap 机制,能够将整个表的数据 全副发送到 producer 中

6.1 官网相干解释

摘取官网的 参数解释

option description
–log_level LOG_LEVEL log level (DEBUG, INFO, WARN or ERROR)
–user USER mysql username
–password PASSWORD mysql password
–host HOST mysql host
–port PORT mysql port
–database DATABASE mysql database containing the table to bootstrap
–table TABLE mysql table to bootstrap
–where WHERE_CLAUSE where clause to restrict the rows bootstrapped from the specified table
–client_id CLIENT_ID specify which maxwell instance should perform the bootstrap operation
–comment COMMENT arbitrary comment to be added to every bootstrap row record

摘取 官网 Starting a table bootstrap


You can start a bootstrap using:

bin/maxwell-bootstrap --database fooDB --table barTable

Optionally, you can include a where clause to replay part of the data.

bin/maxwell-bootstrap --database fooDB --table barTable --where "my_date >='2017-01-07 00:00:00'"

Alternatively you can insert a row in the maxwell.bootstrap table to trigger a bootstrap.

mysql> insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');

Note that if a Maxwell client_id has been set you should specify the client id.

mysql> insert into maxwell.bootstrap (database_name, table_name, client_id) values ('fooDB', 'barTable', 'custom_maxwell_client_id');

You can schedule bootstrap tasks to be run in the future by setting the started_at column. Maxwell will wait until this time to start the bootstrap.

mysql> insert into maxwell.bootstrap (database_name, table_name, client_id, started_at) 
values ('fooDB', 'barTable', 'custom
6.2 演示 应用 maxwell 同步全表到 kafka 中
6.2.1 筹备 kafka 和 maxwell 和 consumer

保障 kafka 和 maxwell 曾经 连贯了 并且提供一个 kafka-console-consumer

kafka 启动

maxwell 连贯 kafka

Kafka-console-consumer

6.2.2 Docker 启动 maxwell-bootstrap 脚本

仍然应用 Docker 的形式 指定 /bin 脚本为 maxwell-bootstrap

docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell  \
    --password maxwell --host=192.168.25.5  --database test --table user --client_id maxwell

–database test : 指定 database

–table user : 指定 table = user

当下面命令执行后 能够看到 kafka-console-consumer 就能收到 database = test 库 table = user 全表的数据了

在 type = bootstrap-start 和 type = bootstrap-comlete 之间的就是 全量数据, 而 bootstrap-start 和 bootstrap-comlete 两条只是作为标记记录的,data 对应是空 第一条则是 执行下面命令所插入 bootstrap 疏导表的 数据

执行 docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap 会主动在 maxwell 数据库的 bootstrap 表中 增加如下记录

{"database":"maxwell","table":"bootstrap","type":"insert","ts":1609315926,"xid":6719,"commit":true,"data":{"id":8,"database_name":"test","table_name":"user","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":35,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}}

6.2.3 直接插入 bootstrap 表 触发

也能够通过 对 maxwell 数据的 bootstrap 表插入 也能触发,maxwell 数据库是主动创立的

insert into maxwell.bootstrap (database_name, table_name) values (‘test’, ‘address’);

6.3 bootstrap 过程中 maxwell 解体

在进行 bootstrap 过程中,如果 maxwell 解体,重启时,bootstrap 会齐全从新开始,不论之前进行到多少,若不心愿这样,能够到数据库中 maxwell 设置 is_complete 字段值为 1(示意实现),或者删除该行

7. 扩大 Maxwell 过滤器配置

Maxwell 能够通过 --filter 配置项来指定过滤规定,通过 exclude 排除,通过 include 蕴含,值能够为具体的数据库、数据表、数据列,甚至用 Javascript 来定义简单的过滤规定;能够用正则表达式形容,有几个来自官网的例子

# 仅匹配 foodb 数据库的 tbl 表和所有 table_数字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有库所有表,仅匹配 db1 数据库
--filter = 'exclude: *.*, include: db1.*'
# 排除含 db.tbl.col 列值为 reject 的所有更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何蕴含 col_a 列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名单,齐全排除 bad_db 数据库,若要复原,必须删除 maxwell 库
--filter = 'blacklist: bad_db.*' 

总结:

本篇次要解说了 Maxwell 次要是干嘛的,并且介绍了 Maxwell 如何配合 Kafka 和 Redis 进行应用,最初还介绍了 Maxwell BootStrap 的操作形式,最初扩大了 Maxwell 的过滤器配置形式。。除了 Maxwell 还有 阿里的 Canal 你会更喜爱哪个呢,我比拟喜爱 Maxwell 不过公司 目前在用 Canal。。

集体博客网站 https://www.askajohnny.com 欢送来拜访!

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0