乐趣区

关于apache-doris:Apache-Doris-Binlog-Load使用方法及示例

Binlog Load 提供了一种使 Doris 增量同步用户在 Mysql 数据库的对数据更新操作的 CDC(Change Data Capture)性能,使用户更方面的实现 Mysql 数据的导入

留神:
该性能须要在 0.15 及当前的版本里应用

  1. 装置配置 Mysql
    装置 Mysql
    疾速应用 Docker 装置配置 Mysql,具体参照上面的连贯
    https://segmentfault.com/a/11…
    如果是在物理机上安装能够参考上面的连贯:
    在 CentOS 7 中装置 MySQL 8 的教程详解
    开启 Mysql binlog
    进入 Docker 容器或者物理机上批改 /etc/my.cnf 文件,在 [mysqld] 上面增加以下内容,

    log_bin=mysql_bin
    binlog-format=Row
    server-id=1

    而后重启 Mysql
    systemctl restart mysqld
    创立 Mysql 表

    create database demo;
    ​
    CREATE TABLE `test_cdc` (
    `id` int NOT NULL AUTO_INCREMENT,
    `sex` TINYINT(1) DEFAULT NULL,
    `name` varchar(20) DEFAULT NULL,
    `address` varchar(255) DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
  2. 装置配置 Canal
    下载 canal-1.1.5: https://github.com/alibaba/ca…

解压 Canal 到指定目录:
tar zxvf canal.deployer-1.1.5.tar.gz -C ./canal
在 conf 文件夹下新建目录并重命名,作为 instance 的根目录,目录名你能够本人命名便于辨认即可
例如我这里的命名是和我的数据库库名统一:demo
vi conf/demo/instance.properties
上面给出的是一个我的示例配置:
这外面的参数阐明请参考 Canal 官网文档

 #################################################
 ## mysql serverId , v1.0.26+ will autoGen
 canal.instance.mysql.slaveId=12115
 ​
 # enable gtid use true/false
 canal.instance.gtidon=false
 ​
 # position info
 canal.instance.master.address=10.220.146.11:3306
 canal.instance.master.journal.name=
 canal.instance.master.position=
 canal.instance.master.timestamp=
 canal.instance.master.gtid=
 ​
 # rds oss binlog
 canal.instance.rds.accesskey=
 canal.instance.rds.secretkey=
 canal.instance.rds.instanceId=
 ​
 # table meta tsdb info
 canal.instance.tsdb.enable=true
 #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
 #canal.instance.tsdb.dbUsername=canal
 #canal.instance.tsdb.dbPassword=canal
 ​
 #canal.instance.standby.address =
 #canal.instance.standby.journal.name =
 #canal.instance.standby.position =
 #canal.instance.standby.timestamp =
 #canal.instance.standby.gtid=
 ​
 # username/password
 canal.instance.dbUsername=zhangfeng
 canal.instance.dbPassword=zhangfeng800729)(*Q
 canal.instance.connectionCharset = UTF-8
 # enable druid Decrypt database password
 canal.instance.enableDruid=false
 #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
 ​
 # table regex
 canal.instance.filter.regex=demo\\..*
 # table black regex
 canal.instance.filter.black.regex=
 # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
 #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
 # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
 #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
 ​
 # mq config
 #canal.mq.topic=
 # dynamic topic route by schema or table regex
 #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
 #canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3
 #canal.mq.partitionHash=test.table:id^name,.*\\..*
 #################################################

启动 Canal
sh bin/startup.sh

留神:canal instance user/passwd
1.1.5 版本,在 canal.properties 里加上这两个配置
canal.user = canal canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
默认明码为 canal/canal,canal.passwd 的明码值能够通过 select password(“xxx”) 来获取

验证是否启动胜利
tail -200f logs/demo/demo.log

3. 开始同步数据
3.1 创立 Doris 指标表
用户须要先在 Doris 端创立好与 Mysql 端对应的指标表

Binlog Load 只能反对 Unique 类型的指标表,且必须激活指标表的 Batch Delete 性能。

开启 Batch Delete 的办法能够参考 help alter table 中的批量删除性能。

 CREATE TABLE `doris_mysql_binlog_demo` (
   `id` int NOT NULL,
   `sex` TINYINT(1),
   `name` varchar(20),
   `address` varchar(255) 
 ) ENGINE=OLAP
 UNIQUE KEY(`id`,sex)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`sex`) BUCKETS 1
 PROPERTIES (
 "replication_allocation" = "tag.location.default: 3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );
 ​
 -- enable batch delete
 ALTER TABLE test_2.doris_mysql_binlog_demo ENABLE FEATURE "BATCH_DELETE";

3.1 创立同步作业
3.1.1 Create Sync Job 语法阐明
Name: ‘CREATE SYNC JOB’ Description:

数据同步 (Sync Job) 性能,反对用户提交一个常驻的数据同步作业,通过从指定的远端地址读取 Binlog 日志,增量同步用户在 Mysql 数据库的对数据更新操作的 CDC(Change Data Capture)性能。目前数据同步作业只反对对接 Canal,从 Canal Server 上获取解析好的 Binlog 数据,导入到 Doris 内。用户可通过 SHOW SYNC JOB 查看数据同步作业状态。语法:

 CREATE SYNC [db.]job_name
  (
     channel_desc, 
     channel_desc
     ...
  )
 binlog_desc

job_name
同步作业名称,是作业在以后数据库内的惟一标识,雷同 job_name 的作业只能有一个在运行。
channel_desc
作业下的数据通道,用来形容 mysql 源表到 doris 指标表的映射关系。
语法:

FROM mysql_db.src_tbl INTO des_tbl
[partitions]
[columns_mapping]
mysql_db.src_tbl

指定 mysql 端的数据库和源表。
des_tbl
指定 doris 端的指标表,只反对 Unique 表,且需开启表的 batch delete 性能 (开启办法请看 help alter table 的 ’ 批量删除性能 ’)。
partitions
指定导入目标表的哪些 partition 中。如果不指定,则会主动导入到对应的 partition 中。
示例:
PARTITION(p1, p2, p3)
column_mapping
指定 mysql 源表和 doris 指标表的列之间的映射关系。如果不指定,FE 会默认源表和指标表的列按程序一一对应。
不反对 col_name = expr 的模式示意列。
示例:
假如指标表列为 (k1, k2, v1),

扭转列 k1 和 k2 的程序
COLUMNS(k2, k1, v1)

疏忽源数据的第四列
COLUMNS(k2, k1, v1, dummy_column)

binlog_desc
用来形容远端数据源,目前仅反对 canal 一种。
语法:
FROM BINLOG
(
“key1” = “value1”,
“key2” = “value2”
)
Canal 数据源对应的属性,以 canal. 为前缀
canal.server.ip: canal server 的地址
canal.server.port: canal server 的端口
canal.destination: instance 的标识
canal.batchSize: 获取的 batch 大小的最大值,默认 8192
canal.username: instance 的用户名
canal.password: instance 的明码
canal.debug: 可选,设置为 true 时,会将 batch 和每一行数据的详细信息都打印进去 Examples:

简略为 test_db 的 test_tbl 创立一个名为 job1 的数据同步作业,连贯本地的 Canal 服务器,对应 Mysql 源表 mysql_db1.tbl1。

CREATE SYNC `test_db`.`job1`
(FROM `mysql_db1`.`tbl1` INTO `test_tbl `)
FROM BINLOG
(
"type" = "canal",
"canal.server.ip" = "127.0.0.1",
"canal.server.port" = "11111",
"canal.destination" = "example",
"canal.username" = "","canal.password"=""
);

为 test_db 的多张表创立一个名为 job1 的数据同步作业,一一对应多张 Mysql 源表,并显式的指定列映射。

CREATE SYNC `test_db`.`job1`
(FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2),
FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1
)
FROM BINLOG
(
"type" = "canal",
"canal.server.ip" = "xx.xxx.xxx.xx",
"canal.server.port" = "12111",
"canal.destination" = "example",
"canal.username" = "username",
"canal.password" = "password"
);

3.1.2 开始同步 mysql 表里数据到 Doris
留神:
创立同步工作之前,首先要在 fe.conf 里配置 enable_create_sync_job=true,这个默认是 false 不启用,否则就不能创立同步工作

 CREATE SYNC test_2.doris_mysql_binlog_demo_job 
 (FROM demo.test_cdc INTO doris_mysql_binlog_demo) 
 FROM BINLOG 
 (
     "type" = "canal", 
     "canal.server.ip" = "10.220.146.10", 
     "canal.server.port" = "11111", 
     "canal.destination" = "demo",  
     "canal.username" = "canal", 
     "canal.password" = "canal"
 );

3.1.3 查看同步工作

 SHOW SYNC JOB from test_2;

3.1.4 查看表里的数据

 select * from doris_mysql_binlog_demo;

3.1.5 删除数据
咱们在 Mysql 数据表里删除数据,而后看 Doris 表里的变动

 delete from test_cdc where id in (12,13)

咱们在去看 Doris 表里,id 是 12,13 这两条数据曾经被删除

3.1.6 多表同步
多表同步只须要像上面这样写法就能够了

 CREATE SYNC test_2.doris_mysql_binlog_demo_job 
 (
     FROM demo.test_cdc INTO doris_mysql_binlog_demo,
     FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
     FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
     FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
 ) 
退出移动版