乐趣区

关于大数据:大数据技术之Maxwell

第 1 章 Maxwell 概述

1.1 Maxwell 定义

Maxwell 是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件。实时读取

MySQL 二进制日志 Binlog,并生成 JSON 格局的音讯,作为生产者发送给 Kafka,Kinesis、

RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。

官网地址:http://maxwells-daemon.io/

1.2 Maxwell 工作原理

1.2.1 MySQL 主从复制过程

➢ Master 主库将扭转记录,写到二进制日志 (binary log) 中

➢ Slave 从库向 mysql master 发送 dump 协定,将 master 主库的 binary log events 拷贝

到它的中继日志(relay log);

➢ Slave 从库读取并重做中继日志中的事件,将扭转的数据同步到本人的数据库。

1.2.2 Maxwell 的工作原理

Maxwell 的工作原理很简略,就是把本人伪装成 MySQL 的一个 slave,而后以 slave 的身份伪装从 MySQL(master)复制数据。

1.2.3 MySQL 的 binlog

(1) 什么是 binlog

MySQL 的二进制日志能够说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查问语句)语句,以事件模式记录,还蕴含语句所执行的耗费的工夫,MySQL 的二进制日志是事务平安型的。

一般来说开启二进制日志大略会有 1% 的性能损耗。二进制有两个最重要的应用场景:

➢ 其一:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递

给 slaves 来达到 master-slave 数据统一的目标。

➢ 其二:天然就是数据恢复了,通过应用 mysqlbinlog 工具来使复原数据。

二进制日志包含两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查问语句)语句事件。

(2) binlog 的开启

➢ 找到 MySQL 配置文件的地位

➢ Linux: /etc/my.cnf

如果 /etc 目录下没有,能够通过 locate my.cnf 查找地位

➢ Windows: \my.ini

➢ 在 mysql 的配置文件下, 批改配置

在[mysqld] 区块,设置 / 增加 log-bin=mysql-bin

这个示意 binlog 日志的前缀是 mysql-bin,当前生成的日志文件就是 mysql-bin.000001 的文件前面的数字按程序生成,每次 mysql 重启或者达到单个文件大小的阈值时,新生一个文件,按程序编号。

(3) binlog 的分类设置

mysql binlog 的格局有三种,别离是 STATEMENT,MIXED,ROW。

在配置文件中能够抉择配置 binlog_format= statement|mixed|row

➢ 三种格局的区别:

◼ statement

语句级,binlog 会记录每次一执行写操作的语句。

绝对 row 模式节俭空间,然而可能产生不一致性,比方

update test set create_date=now();

如果用 binlog 日志进行复原,因为执行工夫不同可能产生的数据就不同。

长处:节俭空间

毛病:有可能造成数据不统一。

◼ row

行级,binlog 会记录每次操作后每行记录的变动。

长处:保持数据的相对一致性。因为不论 sql 是什么,援用了什么函数,他只记录

执行后的成果。

毛病:占用较大空间。

◼ mixed

混合级别,statement 的升级版,肯定水平上解决了 statement 模式因为一些状况

而造成的数据不统一问题。

默认还是 statement,在某些状况下,譬如:

当函数中蕴含 UUID() 时;

蕴含 AUTO_INCREMENT 字段的表被更新时;

执行 INSERT DELAYED 语句时;

用 UDF 时;

会依照 ROW 的形式进行解决

长处:节俭空间,同时兼顾了肯定的一致性。

毛病:还有些极个别状况依旧会造成不统一,另外 statement 和 mixed 对于须要对

binlog 监控的状况都不不便。

综合下面比照,Maxwell 想做监控剖析,抉择 row 格局比拟适合

第 2 章 Maxwell 应用

2.1 Maxwell 装置部署

2.1.1 装置地址

(1)Maxwell 官网地址:http://maxwells-daemon.io/

(2)文档查看地址:http://maxwells-daemon.io/qui…

2.1.2 装置部署

(1)软件根底,读者须要提前装置好 kafka 和 MySQL,此文档不再赘述。

(2)上传 maxwell-1.29.2.tar.gz 到 /opt/software 下

(3)解压 maxwell-1.29.2.tar.gz 的安装包到 /opt/module 下

[atguigu@hadoop102 software]$ tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/

2.1.3 MySQL 环境筹备

(1)批改 mysql 的配置文件,开启 MySQL Binlog 设置
atguigu@hadoop102 software]$ sudo vim /etc/my.cnf
在 [mysqld] 模块下增加一下内容
[mysqld]
server_id=1
log-bin=mysql-bin
binlog_format=row
#binlog-do-db=test_maxwell
并重启 Mysql 服务
[atguigu@hadoop102 software]$ sudo systemctl restart mysqld
登录 mysql 并查看是否批改实现
[atguigu@hadoop102 ~]$ mysql -uroot -p123456
mysql> show variables like '%binlog%';
查看下列属性
binlog_format | ROW 
(2)进入 /var/lib/mysql 目录,查看 MySQL 生成的 binlog 文件
[atguigu@hadoop102 ~]$ cd /var/lib/mysql
[atguigu@hadoop102 mysql]$ sudo ls -l
总用量 188500
-rw-r-----. 1 mysql mysql 154 11 月 17 16:30 mysqlbin.000001
-rw-r-----. 1 mysql mysql 19 11 月 17 16:30 mysqlbin.index

注:MySQL 生成的 binlog 文件初始大小肯定是 154 字节,而后前缀是 log-bin 参数配置的,后缀是默认从.000001,而后顺次递增。除了 binlog 文件文件以外,MySQL 还会额定生产一个.index 索引文件用来记录以后应用的 binlog 文件。

2.1.4 初始化 Maxwell 元数据库

(1)在 MySQL 中建设一个 maxwell 库用于存储 Maxwell 的元数据
[atguigu@hadoop102 module]$ mysql -uroot -p123456
mysql> CREATE DATABASE maxwell;
(2)设置 mysql 用户明码安全级别
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
(3)调配一个账号能够操作该数据库
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
(4)调配这个账号能够监控其余数据库的权限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
(5)刷新 mysql 表权限
mysql> flush privileges;

2.1.5 Maxwell 过程启动

Maxwell 过程启动形式有如下两种:

(1)应用命令行参数启动 Maxwell 过程
[atguigu@hadoop102 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

–user 连贯 mysql 的用户

–password 连贯 mysql 的用户的明码

–host mysql 装置的主机名

–producer 生产者模式(stdout:控制台 kafka:kafka 集群)

(2)批改配置文件,定制化启动 Maxwell 过程
[atguigu@hadoop102 maxwell-1.29.2]$ cp 
config.properties.example config.properties
[atguigu@hadoop102 maxwell-1.29.2]$ vim config.properties
[atguigu@hadoop102 maxwell-1.29.2]$ bin/maxwell --
config ./config.properties

2.2 Maxwell 入门案例

2.2.1 监控 Mysql 数据并在控制台打印

(1)运行 maxwell 来监控 mysql 数据更新
[atguigu@hadoop102 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
(2)向 mysql 的 test_maxwell 库的 test 表插入一条数据,查看 maxwell 的控制台输入
mysql> insert into test values(1,'aaa');
{
 "database": "test_maxwell", -- 库名
 "table": "test", -- 表名
 "type": "insert", -- 数据更新类型
 "ts": 1637244821, -- 操作工夫
 "xid": 8714, -- 操作 id
 "commit": true, -- 提交胜利
 "data": { -- 数据
 "id": 1,
 "name": "aaa"
 }
}
(3)向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据,控制台呈现了 3 条 json

日志,阐明 maxwell 是以数据行为单位进行日志的采集的。

mysql> INSERT INTO test VALUES(2,'bbb'),(3,'ccc'),(4,'ddd');
{"database":"test_maxwell","table":"test","type":"insert","ts":1637245127,"xid":9129,"xoffset":0,"data":{"id":2,"name":"bbb"}}
{"database":"test_maxwell","table":"test","type":"insert","ts":1637245127,"xid":9129,"xoffset":1,"data":{"id":3,"name":"ccc"}}
{"database":"test_maxwell","table":"test","type":"insert","ts":1637245127,"xid":9129,"commit":true,"data":{"id":4,"name":"ddd"}}

mysql> update test set name='zaijian' where id =1;
{"database":"test_maxwell","table":"test","type":"update","ts"
:1631618614,"xid":535,"commit":true,"data":{"id":1,"name":"zai
jian"},"old":{"name":"nihao"}}
(4)批改 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输入
mysql> update test set name='abc' where id =1;
{
 "database": "test_maxwell",
 "table": "test",
 "type": "update",
 "ts": 1637245338,
 "xid": 9418,
 "commit": true,
 "data": { -- 批改后的数据
 "id": 1,
 "name": "abc"
 },
 "old": { -- 批改前的数据
 "name": "aaa"
 }
}
(5)删除 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输入
mysql> DELETE FROM test WHERE id =1;

{
 "database": "test_maxwell",
 "table": "test",
 "type": "delete",
 "ts": 1637245630,
 "xid": 9816,
 "commit": true,
 "data": {
 "id": 1,
 "name": "abc"
 } 
 }

2.2.2 监控 Mysql 数据输入到 kafka

1)实现步骤:
(1)启动 zookeeper 和 kafka
[atguigu@hadoop102 bin]$ jpsall
=============== hadoop102 ===============
3511 QuorumPeerMain
4127 Kafka
=============== hadoop103 ===============
1885 Kafka
1342 QuorumPeerMain
=============== hadoop104 ===============
1345 QuorumPeerMain
1886 Kafka
(2)启动 Maxwell 监控 binlog
atguigu@hadoop102 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell
(3)关上 kafka 的控制台的消费者生产 maxwell 主题
[atguigu@hadoop102 ~]$ kafka-console-consumer.sh --bootstrapserver hadoop102:9092 --topic maxwell
(4)向 test_maxwell 库的 test 表再次插入一条数据
mysql> insert into test values (5,'eee');
(5)通过 kafka 消费者来查看到了数据,阐明数据胜利传入 kafka
{"database":"test_maxwell","table":"test","type":"insert","ts":1637245889,"xid":10155,"commit":true,"data":{"id":5,"name":"eee"}}
2)kafka 主题数据的分区管制 在公司生产环境中,

咱们个别都会用 maxwell 监控多个 mysql 库的数据,而后将这些数据发往 kafka 的一个主题 Topic,并且这个主题也必定是多分区的,为了进步并发度。那么如何管制这些数据的分区问题,就变得至关重要,实现步骤如下:

(1)批改 maxwell 的配置文件,定制化启动 maxwell 过程
[atguigu@hadoop102 maxwell-1.29.2]$ vim config.properties
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092
# mysql login info
host=hadoop102
user=maxwell
password=123456
# *** kafka ***
# list of kafka brokers
#kafka.bootstrap.servers=hosta:9092,hostb:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. 
namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced 
with the values for the row being processed
kafka_topic=maxwell3
# *** partitioning ***
# What part of the data do we partition by?
#producer_partition_by=database # [database, table, 
primary_key, transaction_id, column]
producer_partition_by=database
控制数据分区模式,可选模式有 库名,表名,主键,列名
# specify what fields to partition by when using 
producer_partition_by=column
# column separated list.
#producer_partition_columns=name
# when using producer_partition_by=column, partition by this 
when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database
(2)手动创立一个 3 个分区的 topic,名字就叫做 maxwell3
[atguigu@hadoop102 maxwell-1.29.2]$ kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --
replication-factor 2 --partitions 3 --topic maxwell3
(3)利用配置文件启动 Maxwell 过程
[atguigu@hadoop102 maxwell-1.29.2]$ bin/maxwell --

config ./config.properties
(4)向 test_maxwell 库的 test 表再次插入一条数据
mysql> insert into test_maxwell.test values (6,'fff');
(5)通过 kafka tool 工具查看,此条数据进入了 maxwell3 主题的 1 号分区
(6)向 test 库的 aaa 表插入一条数据
mysql> insert into test_maxwell2.test values (23,'dd');
(7)通过 kafka tool 工具查看,此条数据进入了 maxwell3 主题的 0 号分区, 阐明库名

会对数据进入的分区造成影响。

2.2.3 监控 Mysql 指定表数据输入控制台

(1)运行 maxwell 来监控 mysql 指定表数据更新

[atguigu@hadoop102 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --filter 'exclude: *.*, include:test_maxwell.test' --producer=stdout

(2)向 test_maxwell.test 表插入一条数据,查看 maxwell 的监控

mysql> insert into test_maxwell.test values(7,'ggg');{"database":"test_maxwell","table":"test","type":"insert","ts":1637247760,"xid":11818,"commit":true,"data":{"id":7,"name":"ggg"}}

(3)向 test_maxwell.test2 表插入一条数据,查看 maxwell 的监控

mysql> insert into test1 values(1,'nihao');

本次没有收到任何信息

阐明 include 参数失效,只能监控指定的 mysql 表的信息

注:还能够设置 include:test_maxwell.*,通过此种形式来监控 mysql 某个库的所有表,也就是说过滤整个库。读者能够自行测试。

2.2.4 监控 Mysql 指定表全量数据输入控制台,数据初始化

Maxwell 过程默认只能监控 mysql 的 binlog 日志的新增及变动的数据,然而 Maxwell 是反对数据初始化的,能够通过批改 Maxwell 的元数据,来对 MySQL 的某张表进行数据初始化,也就是咱们常说的全量同步。具体操作步骤如下:

需要:将 test_maxwell 库下的 test2 表的四条数据,全量导入到 maxwell 控制台

进行打印。

(1)批改 Maxwell 的元数据,触发数据初始化机制,在 mysql 的 maxwell 库中 bootstrap

表中插入一条数据,写明须要全量数据的库名和表名。

mysql> insert into maxwell.bootstrap(database_name,table_name) values('test_maxwell','test2');
(2)启动 maxwell 过程,此时初始化程序会间接打印 test2 表的所有数据
[atguigu@hadoop102 maxwell-1.29.2]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
Using kafka version: 1.0.0
23:15:38,841 WARN MaxwellMetrics - Metrics will not be 
exposed: metricsReportingType not configured.
23:15:39,110 INFO Maxwell - Maxwell v1.22.0 is booting 
(StdoutProducer), starting at Position[BinlogPosition[mysqlbin.000004:611096], lastHeartbeat=1637248429242]
23:15:39,194 INFO MysqlSavedSchema - Restoring schema id 6 
(last modified at Position[BinlogPosition[mysqlbin.000004:517625], lastHeartbeat=1637246435111])
23:15:39,299 INFO MysqlSavedSchema - Restoring schema id 1 
(last modified at Position[BinlogPosition[mysqlbin.000004:158612], lastHeartbeat=0])
23:15:39,342 INFO MysqlSavedSchema - beginning to play 
deltas...
23:15:39,343 INFO MysqlSavedSchema - played 5 deltas in 1ms
{"database":"test_maxwell","table":"test2","type":"bootstrapstart","ts":1637248539,"data":{}}
23:15:39,367 INFO SynchronousBootstrapper - bootstrapping 
started for test_maxwell.test2
23:15:39,369 INFO BinlogConnectorReplicator - Setting initial 
binlog pos to: mysql-bin.000004:611096
{"database":"test_maxwell","table":"test2","type":"bootstrapinsert","ts":1637248539,"data":{"id":1,"name":"aa"}}
{"database":"test_maxwell","table":"test2","type":"bootstrapinsert","ts":1637248539,"data":{"id":2,"name":"bb"}}
{"database":"test_maxwell","table":"test2","type":"bootstrapinsert","ts":1637248539,"data":{"id":3,"name":"cc"}}
{"database":"test_maxwell","table":"test2","type":"bootstrapinsert","ts":1637248539,"data":{"id":4,"name":"dd"}}
{"database":"test_maxwell","table":"test2","type":"bootstrapcomplete","ts":1637248539,"data":{}}
23:15:39,387 INFO SynchronousBootstrapper - bootstrapping 
ended for #8 test_maxwell.test2
23:15:39,465 INFO BinaryLogClient - Connected to 
hadoop102:3306 at mysql-bin.000004/611096 (sid:6379, cid:108)
23:15:39,465 INFO BinlogConnectorLifecycleListener - Binlog 
connected
(3)当数据全副初始化实现当前,Maxwell 的元数据会变动

is_complete 字段从 0 变为 1

start_at 字段从 null 变为具体工夫(数据同步开始工夫)

complete_at 字段从 null 变为具体工夫(数据同步完结工夫)

退出移动版