关于java:Maxwell-mysqltojson初体验

Maxwell (mysql-to-json)初体验

本篇次要解说一下 Maxwell , Maxwell 是一个读取 MySQL binlogs 日志而后转换成json 输入到 Kafka ,Redis ,RabbitMQ 等等 中间件中

前言

以前写过一篇对于 阿里的 canal ,它也是通过监听 mysql 的 binlogs 日志的工具,本公司目前就是应用这个,而我明天要说的是 maxwell 它是在外部本人转换为 json 格局 输入到 其余中间件

1.下载和装置Maxwell

间接下载

官网地址: http://maxwells-daemon.io/

或者 Docker 下载

我这里抉择的是应用Docker的形式进行装置

 docker pull zendesk/maxwell

2.配置 Mysql 开起binlogs

配置 my.cnf

我的门路在 /etc/my.cnf

$ vi my.cnf

[mysqld]
server_id=1
log-bin=master   
binlog_format=row

或者间接运行如下指令:

mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;

须要给 maxwell 用户 肯定的权限

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

# or for running maxwell locally:

mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';

3. Maxwell 输入到 stdout 规范输入模式启

在保障 mysql 能够拜访 并且失常启动的状况下 输出以下命令:

docker run -it --rm zendesk/maxwell bin/maxwell --user=maxwell \
    --password=maxwell --host=192.168.25.5 --producer=stdout

命令解析

–user = maxwell : 是后面配置的 mysql 的用户

–password=maxwell : 是后面配置的mysql 的 maxwell的 明码

–host = 192.168.25.5 : 是本机的 mysql 地址

–producer : 是指 规范的输入形式(控制台输入)

执行sql :

 insert into user(id , userName , userAge , userAddress) values (200, "johnny" , 25 , "wuxi") 

能够看到 maxwell 控制台 对其进行输入了 这个插入操作

{"database":"test","table":"user","type":"insert","ts":1609312375,"xid":735,"commit":true,
 "data":{"id":200,"userName":"johnny","userAge":25,"userAddress":"wuxi"}}

4.Maxwell 输入到 kafka 模式

maxwell 官网比拟举荐的 形式就是 配合 kafka 进行应用 上面先来筹备 kafka 环境

4.1 启动 zookeeper

具体环境自行筹备

4.2 启动kafka

4.3 启动maxwell 指定producer = kafka

producer = kafka

    docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='maxwell' --host='192.168.25.5' --producer=kafka \
    --kafka.bootstrap.servers=192.168.25.5:9092 --kafka_topic=maxwell

参数解析

–user = maxwell : 是后面配置的 mysql 的用户

–password=maxwell : 是后面配置的mysql 的 maxwell的 明码

–host = 192.168.25.5 : 是本机的 mysql 地址

–producer = kafka : 自定输入json到 kafka中

–kafka.bootstrap.servers=192.168.25.5:9092 : 指定kafka 的 地址

–kafka_topic=maxwell : 指定 kafka topic

maxwell启动胜利

4.4 启动kafka consumer 来监听音讯

间接应用 kafka 自带的 consumer工具 进行监听 topic = maxwell

./kafka-console-consumer --bootstrap-server 192.168.25.5:9092 -topic maxwell

执行sql :

 insert into user(id , userName , userAge , userAddress) values (201, "candy" , 26 , "wuxi") 

能够看到 maxwell 将这个插入操作json发送到kafka中 并且被 console-consumer 进行了生产

5.Maxwell 输入到 Redis 模式

maxwell 也能输入到 redis中, 能够通过

筹备一个能够拜访的Redis 我的是在虚拟机上的 192.168.25.101 6379

producer = redis

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.25.5' \
--producer=redis --redis_host=47.98.250.186 --redis_port=6380     --redis_type=lpush

执行sql :

insert into user(id , userName , userAge , userAddress) values (203, "jack" , 26 , "wuxi");
insert into user(id , userName , userAge , userAddress) values (204, "jack2" , 26 , "wuxi");
insert into user(id , userName , userAge , userAddress) values (205, "jack3" , 26 , "wuxi"); 

能够看到 maxwell 将这些插入操作json发送到redis中

127.0.0.1:6379> lrange maxwell 0 -1
1) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315062,\"xid\":4462,\"commit\":true,\"data\":{\"id\":205,\"userName\":\"jack3\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"
2) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315062,\"xid\":4461,\"commit\":true,\"data\":{\"id\":204,\"userName\":\"jack2\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"
3) "{\"database\":\"test\",\"table\":\"user\",\"type\":\"insert\",\"ts\":1609315012,\"xid\":4322,\"commit\":true,\"data\":{\"id\":203,\"userName\":\"jack\",\"userAge\":26,\"userAddress\":\"wuxi\"}}"

6.应用 Maxwell BootStrap 初始化表

肯定有 数据割接过程,或者表的 全量逻辑需要,这时候 maxwell 提供了 bootstrap 机制,能够将整个表的数据 全副发送到 producer 中

6.1 官网相干解释

摘取官网的 参数解释

option description
–log_level LOG_LEVEL log level (DEBUG, INFO, WARN or ERROR)
–user USER mysql username
–password PASSWORD mysql password
–host HOST mysql host
–port PORT mysql port
–database DATABASE mysql database containing the table to bootstrap
–table TABLE mysql table to bootstrap
–where WHERE_CLAUSE where clause to restrict the rows bootstrapped from the specified table
–client_id CLIENT_ID specify which maxwell instance should perform the bootstrap operation
–comment COMMENT arbitrary comment to be added to every bootstrap row record

摘取 官网 Starting a table bootstrap


You can start a bootstrap using:

bin/maxwell-bootstrap --database fooDB --table barTable

Optionally, you can include a where clause to replay part of the data.

bin/maxwell-bootstrap --database fooDB --table barTable --where "my_date >= '2017-01-07 00:00:00'"

Alternatively you can insert a row in the maxwell.bootstrap table to trigger a bootstrap.

mysql> insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');

Note that if a Maxwell client_id has been set you should specify the client id.

mysql> insert into maxwell.bootstrap (database_name, table_name, client_id) values ('fooDB', 'barTable', 'custom_maxwell_client_id');

You can schedule bootstrap tasks to be run in the future by setting the started_at column. Maxwell will wait until this time to start the bootstrap.

mysql> insert into maxwell.bootstrap (database_name, table_name, client_id, started_at) 
values ('fooDB', 'barTable', 'custom
6.2 演示 应用maxwell 同步全表到kafka 中
6.2.1 筹备 kafka 和 maxwell 和 consumer

保障 kafka 和 maxwell 曾经 连贯了 并且提供一个 kafka-console-consumer

kafka 启动

maxwell连贯kafka

Kafka-console-consumer

6.2.2 Docker 启动 maxwell-bootstrap 脚本

仍然应用Docker的形式 指定 /bin脚本为maxwell-bootstrap

docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell  \
    --password maxwell --host=192.168.25.5  --database test --table user --client_id maxwell

–database test :指定database

–table user : 指定table = user

当下面命令执行后 能够看到 kafka-console-consumer 就能收到 database = test 库 table = user全表的数据了

在 type = bootstrap-start 和 type = bootstrap-comlete 之间的就是 全量数据,而bootstrap-start和bootstrap-comlete 两条只是作为标记记录的,data对应是空 第一条则是 执行下面命令所插入bootstrap疏导表的 数据

执行 docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap 会主动在 maxwell 数据库的 bootstrap 表中 增加如下记录

{"database":"maxwell","table":"bootstrap","type":"insert","ts":1609315926,"xid":6719,"commit":true,"data":{"id":8,"database_name":"test","table_name":"user","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":35,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}}

6.2.3 直接插入 bootstrap表 触发

也能够通过 对maxwell 数据的 bootstrap表插入 也能触发 , maxwell 数据库是主动创立的

insert into maxwell.bootstrap (database_name, table_name) values (‘test’, ‘address’);

6.3 bootstrap过程中 maxwell解体

在进行bootstrap过程中,如果maxwell解体,重启时,bootstrap会齐全从新开始,不论之前进行到多少,若不心愿这样,能够到数据库中 maxwell 设置 is_complete 字段值为1(示意实现),或者删除该行

7. 扩大 Maxwell 过滤器配置

Maxwell 能够通过 --filter 配置项来指定过滤规定,通过 exclude 排除,通过 include 蕴含,值能够为具体的数据库、数据表、数据列,甚至用 Javascript 来定义简单的过滤规定;能够用正则表达式形容,有几个来自官网的例子

# 仅匹配foodb数据库的tbl表和所有table_数字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有库所有表,仅匹配db1数据库
--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值为reject的所有更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何蕴含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名单,齐全排除bad_db数据库,若要复原,必须删除maxwell库
--filter = 'blacklist: bad_db.*' 

总结:

本篇次要解说了 Maxwell 次要是干嘛的,并且介绍了 Maxwell 如何配合 Kafka 和 Redis 进行应用,最初还介绍了 Maxwell BootStrap 的操作形式,最初扩大了 Maxwell 的过滤器配置形式 。。 除了 Maxwell 还有 阿里的 Canal 你会更喜爱哪个呢 ,我比拟喜爱 Maxwell 不过公司 目前在用 Canal 。。

集体博客网站 https://www.askajohnny.com 欢送来拜访!

本文由博客一文多发平台 OpenWrite 公布!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理