MariaDB ColumnStore 专为大数据扩大而设计,可解决 PB 级数据,线性可伸缩性和杰出的性能,并能对剖析查问进行实时响应。它利用列式存储,压缩,即时投影以及程度和垂直分区的 I / O 劣势在剖析大型数据集时提供了杰出的性能。(这是官网的介绍,https://mariadb.com/kb/en/mariadb-columnstore/)
自己自研发了一个 mysql -> mariadb columnStore 同步工具,同时反对全量同步和增量同步模式,github 地址:https://github.com/yutianyong125/mcs_etl
接下来演示一下同步过程
下载该我的项目
git clone https://github.com/yutianyong125/mcs_etl.git
cd mcs_etl
启动 mysql,不便起见,这里应用 docker,仅测试,不思考数据挂载问题,这里我把 secure-file-priv 配置的目录挂载进去,是为了导入数据的时候用到,另须要指定应用自定义的 mysql 配置文件
vim /tmp/conf.d/my.cnf
# 保留为以下配置
[mysqld]
log_bin = mysql_bin # 开启 binlog 日志并指定 binlog 日志命名前缀
binlog_format=ROW # 指定 binlog 格局
server_id = 1 # 指定 master server_id
secure_file_priv=/tmp/mysql-files # SELECT INTO OUTFILE 语句须要开启这个配置
mkdir /tmp/mysql-files # 先创立好寄存导出数据的文件夹
docker run --rm -d --name mysql -v /tmp/conf.d:/etc/mysql/conf.d -v /tmp/mysql-files:/tmp/mysql-files -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --secure-file-priv=/tmp/mysql-files
应用存储过程增加测试数据
-- 创立 test 库
create database test;
use test;
-- 创立 user 表
CREATE TABLE user(
id INT NOT NULL AUTO_INCREMENT,
uname VARCHAR(20) NOT NULL,
sex VARCHAR(5) NOT NULL,
score INT NOT NULL,
copy_id INT NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB CHARSET=utf8;
-- 存储过程插入 10 万条数据
DROP PROCEDURE IF EXISTS add_user;
DELIMITER //
create PROCEDURE add_user(in num INT)
BEGIN
DECLARE rowid INT DEFAULT 0;
DECLARE firstname CHAR(1);
DECLARE name1 CHAR(1);
DECLARE name2 CHAR(1);
DECLARE sex CHAR(1);
DECLARE score CHAR(2);
DECLARE uname CHAR(3);
WHILE rowid < num DO
SET firstname = SUBSTRING('赵钱孙李周吴郑王林杨柳刘孙陈江阮侯邹高彭徐',FLOOR(1+21*RAND()),1);
SET name1 = SUBSTRING('一二三四五六七八九十甲乙丙丁静景京晶名明铭敏闵民军君俊骏天田甜兲恬益依成城诚立莉力黎励',ROUND(1+43*RAND()),1);
SET name2 = SUBSTRING('一二三四五六七八九十甲乙丙丁静景京晶名明铭敏闵民军君俊骏天田甜兲恬益依成城诚立莉力黎励',ROUND(1+43*RAND()),1);
SET sex=FLOOR(0 + (RAND() * 2));
SET score= FLOOR(40 + (RAND() *60));
SET rowid = rowid + 1;
SET uname = CONCAT(firstname,name1,name2);
insert INTO user (uname,sex,score,copy_id) VALUES (uname,sex,score,rowid);
END WHILE;
END //
DELIMITER ;
call add_user(100000);
-- 创立 test1 库
create database test1;
use test1;
-- 创立表 t1
CREATE TABLE `t` (`id` int(11) NOT NULL,
`a` int(11) DEFAULT NULL,
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `a` (`a`),
KEY `b` (`b`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 存储过程插入数据
delimiter //
create procedure idata()
begin
declare i int;
set i=1;
while(i<=100000)do
insert into t values(i, i, i);
set i=i+1;
end while;
end //
delimiter ;
call idata();
启动 mariadb columnStore
docker run --rm -d --name mcs -eMARIADB_ROOT_PASSWORD=123456 -p 3307:3306 mariadb/columnstore:1.2.3
批改我的项目配置文件, conf/etl.toml
# 增量 ETL 配置段
[IncrementEtl]
StartFile = "" # 开始同步的 binlog 文件
StartPosition = 0 # 开始同步的 binlog 地位
ServerId = 100 # 同步 slave 的 serverId,只有不与 master serverId 雷同即可
# 全量 ETL 配置段
[FullEtl]
# mysql 数据导出寄存文件夹
OutFileDir = "/tmp/mysql-files/"
# 全量 ETL 规定,Schema: 数据库,Tables: table 数组
# eg: 导出整个库的表 tables=["*"],导出特定的表 tables=["test1", "test2"]
[[Rule]]
Schema = "test"
Tables = ["user"]
[[Rule]]
Schema = "test1"
Tables = ["*"]
# mysql 数据库配置
[Source]
Host = "127.0.0.1"
Port = 3306
User = "root"
Pwd = "123456"
# mariadb columnStore 数据库配置
[Target]
Host = "127.0.0.1"
Port = 3307
User = "root"
Pwd = "123456"
执行二进制文件,指定全量同步模式
./mcs_etl -model full
胜利输入如下,因为是多协程异步执行工作的,所以总耗时可能与 etl 时长最大的表相差无几
同步 `test1`.`t` 耗时 2.025725559s
同步 `test`.`user` 耗时 4.358782152s
fullEtl 耗时 4.373473788s
增量形式进行同步步骤如下:
mysql 运行 show master status 查看以后 binlog 日志文件和 Position,批改 conf/etl.toml 配置文件中的 IncrementEtl 配置段
# 增量 ETL 配置段
[IncrementEtl]
StartFile = "mysql_bin.000004" # 开始同步的 binlog 文件
StartPosition = 154 # 开始同步的 binlog 地位
ServerId = 100 # 同步 slave 的 serverId,只有不与 master serverId 雷同即可
执行二进制文件,指定增量同步模式,此时会进入循环监听期待生产状态
./mcs_etl -model increment
接下来在 mysql 做一些增删改操作
update `test`.`user` set sex = 0 where id = 1;
insert into `test1`.`t` values (100001, 100001, 100001);
delete from `test1`.`t` where id = 100001;
控制台输入日志如下
binlog 复原的语句:update `test`.`user` set `id` = '1',`uname` = '邹二君',`sex` = '0',`score` = '59',`copy_id` = '1' where `id` = '1' and `uname` = '邹二君' and `sex` = '1' and `score` = '59' and `copy_id` = '1'
==>
兼容解决转换后的语句:update `test`.`user` set `id` = '1',`uname` = '邹二君',`sex` = '0',`score` = '59',`copy_id` = '1' where `id` = '1' and `uname` = '邹二君' and `sex` = '1' and `score` = '59' and `copy_id` = '1'
执行后果:执行胜利
binlog 复原的语句:insert into `test1`.`t` (`id`,`a`,`b`) values ('100001','100001','100001')
==>
兼容解决转换后的语句:insert into `test1`.`t` (`id`,`a`,`b`) values ('100001','100001','100001')
执行后果:执行胜利
binlog 复原的语句:delete from `test1`.`t` where `id` = '100001' and `a` = '100001' and `b` = '100001'
==>
兼容解决转换后的语句:delete from `test1`.`t` where `id` = '100001' and `a` = '100001' and `b` = '100001'
执行后果:执行胜利