作者:
刘腾飞 汇量后端开发工程师
阿里云开源OLAP研发团队
EMR-StarRocks介绍
阿里云EMR在年初推出了StarRocks服务,StarRocks是新一代极速全场景MPP(Massively Parallel Processing)数据仓库,致力于构建极速和对立剖析体验。EMR StarRocks具备如下特点:
- 兼容MySQL协定,可应用MySQL客户端和罕用BI工具对接StarRocks来剖析数据
采纳分布式架构:
- 对数据表进行程度划分并以多正本存储
- 集群规模能够灵便伸缩,反对10 PB级别的数据分析
- 反对MPP框架,并行减速计算
- 反对多正本,具备弹性容错能力
- 反对向量化引擎和CBO
- 反对弹性扩缩容
- 反对明细模型、聚合模型、主键模型和更新模型
更多详细信息能够参考https://help.aliyun.com/docum...
Flink-CDC概念介绍
CDC的全称是Change Data Capture,面向的场景包含数据同步、数据散发、数据采集,Flink CDC 次要面向数据库的变更,能够将上游数据和Schema的变更同步到上游数据湖和数据仓库中。2020年7月,Flink CDC我的项目提交了第一个Commit,去年8月,Flink社区公布了CDC2.0,通过两年工夫的打磨,在商业化应用上曾经十分成熟。本文次要以Mysql CDC为例,介绍StarRocks+Flink CDC实时入仓中用户遇到的痛点,以及在Flink和StarRocks层面进行的对应优化和解决方案。
应用CDC将一张Mysql表中的数据导入到StarRocks的表中,首先须要在StarRocks上建设用来承接Mysql数据的指标表,而后在Flink上别离创立Mysql表和StarRocks表在Flink中Sink和Source表的映射,而后执行一条insert into sink_table from source_table语句。执行完Insert into之后,会生成一个CDC工作,CDC工作首先向指标表同步源表的全量数据,实现后持续基于Binlog进行增量数据的同步。通过一个工作,实现数据的全量+增量同步,对于用户来讲是十分敌对的。然而在应用的过程中,仍然发现了一些痛点。
实时写入场景的用户痛点
SQL开发工作量大
对于一些还没有实现数仓建设的新业务,或是刚刚开始依靠StarRocks进行OLAP平台建设的用户而言,在StarRocks中建表以承载Mysql同步过去的数据是第一步。在一些简单的业务中,Mysql中的表往往有几十上百张,每张表又有数十个字段,要把它们对应的StarRocks表的建表语句全副编写进去是一个很大的工作量。第一个痛点StarRocks建表的工作量大。
Flink字段的数据类型映射关系简单易错
在StarRocks中建表是第一步,建表实现之后,为了启动CDC工作,还须要在Flink中建设Mysql对应的Source表,以及StarRocks对应的Sink表,其中Flink建表时,每个字段的字段类型与Mysql、与StarRocks的映射关系须要严格留神,对于动辄几十上百个须要字段的表,每个字段都须要查找对应在Flink的类型映射关系,尤其令开发人员苦楚。因而,第二个痛点是上下游表与Flink字段的数据类型映射关系简单,容易出错。
Schema变更操作繁琐
第三个痛点来自于业务数据Schema的变动,据Fivetran公司考察,约有60%的公司数据Schema每个月都会发生变化,30%的公司数据Schema每周都会发生变化。对于Mysql表中字段的增删改,用户心愿在不影响CDC工作的状况下,将Schema变动同步到上游的StarRocks。目前罕用的计划,是在手动进行工作后,更改StarRocks和Mysql的Schema,更改Flink侧的Sink和Source表构造,通过指定savepoints的形式再次启动工作。Schema变更的操作繁琐,无奈自动化是第三个痛点。
数据同步工作占用资源多
第四个痛点,是在表的数量多、实时增量数据量大的场景下,CDC工作占用的内存和cpu资源较高,出于节省成本的思考,用户心愿尽可能的在资源利用方面进行优化。
接下来,咱们来看针对这些痛点,EMR-StarRocks在与Flink深度联合方面做了哪些优化,提供了什么样的解决方案。
CTAS&CDAS
EMR-StarRocks与Flink团队推出的CTAS&CDAS性能次要是针对前三个痛点研发的一个解决方案。通过CTAS&CDAS,能够应用一条SQL语句,实现StarRocks建表、Flink-CDC工作创立、实时同步Schema变更等本来须要多项繁冗操作的工作,令开发和运维的工作量大大降低。
CTAS介绍
CTAS的全称是create table as,语法结构如下:
CREATE TABLE IF NOT EXISTS runoob_tbl1 with ('starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8','database-name'='test_cdc','jdbc-url'='jdbc:mysql://172.16.**.**:9030','load-url'='172.16.**.**:8030','table-name'='runoob_tbl_sr','username'='test','password' = '123456','sink.buffer-flush.interval-ms' = '5000') as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
通过CTAS的语法结构能够看到,除了集群信息和DataBase信息外,还有一个非凡配置“starrocks.create.table.properties”,这是因为Mysql与StarRocks的表构造有一些不同,如Key Type、分区、Bucket Number等非凡配置,因而用它来承接StarRocks建表语句中字段定义前面的内容。
为了不便用户更快的建表,还设置了一个Simple Mode,配置形式如下:
CREATE TABLE IF NOT EXISTS runoob_tbl1 with ('starrocks.create.table.properties'=' buckets 8','starrocks.create.table.mode'='simple','database-name'='test_cdc','jdbc-url'='jdbc:mysql://172.16.**.**:9030','load-url'='172.16.**.**:8030','table-name'='runoob_tbl_sr','username'='test','password' = '123456','sink.buffer-flush.interval-ms' = '5000') as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
开启Simple Mode之后,将默认应用Primary Key模型,默认应用Mysql中的主键作为Primary Key,默认应用哈希(主键)进行分桶,这样,用户在启动Simple Mode对表应用CTAS语句时,就齐全不须要关怀Mysql中原表有哪些字段,字段名称是什么,主键是什么,只须要晓得表名,就能够高效的实现SQL编写。
CTAS的原理
如图所示,在执行了CTAS语句后,首先Flink会主动在StarRocks中创立一个与Mysql源表的Schema雷同的指标表,而后建设Mysql与StarRocks表在Flink中的Sink和Source映射,接下来启动一个CDC工作,该工作将同步源表数据到指标表,并在运行时监测Mysql源表发送过去的数据产生的Schema变更,主动将Schema变更同步到StarRocks指标表中。CTAS性能实际上是用一个SQL,实现了本来须要手动编写SQL和执行的多项操作。
接下来介绍CTAS的实现原理。CTAS的实现次要依赖了Flink CDC、Flink Catalog和Schema Evolution。Flink的CDC性能后面曾经介绍过了。其中的Catalog性能,使Flink能够感知到StarRocks中所有的DataBase和所有table的Schema,并对它们进行DDL操作。而Schema Evolution性能,是通过对数据的Schema变动进行检测和记录实现的,例如,当Mysql产生增列操作时,CTAS工作并不会依据Mysql的DDL变动,立即对上游StarRocks进行增加列的操作,而是当第一条应用了新Schema的数据被解决时,才会通过比照新旧数据Schema的区别,生成对应的Alter Table Add Column语句,对StarRocks进行增列操作,在期待StarRocks的Schema变更实现之后,新的数据才会被推送到上游。
CDAS介绍
CDAS是CTAS的一个语法糖。通过CDAS语句,能够实现Mysql中的整库同步,即生成一个Flink Job,Source是Mysql中的database,指标表是StarRocks中对应的多张表。
CREATE DATABASE IF NOT EXISTS sr_db with ('starrocks.create.table.properties'=' buckets 8','starrocks.create.table.mode'='simple','database-name'='test_cdc','jdbc-url'='jdbc:mysql://172.16.**.**:9030','load-url'='172.16.**.**:8030','username'='test','password' = '123456','sink.buffer-flush.interval-ms' = '5000') as table mysql.test_cdc.runoob_tbl including table 'tabl1','tbl2','tbl3' /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc' )*/;
因为咱们冀望应用一条SQL生成多张表的Schema和CDC工作,因而须要对立应用Simple模式。在理论应用过程中,一个DataBase中可能有些表不须要同步、有些表须要自定义配置,因而咱们能够应用Including Table语法,只抉择一个DataBase中的局部表进行CDAS操作,对于须要自定义属性配置的表,则应用CTAS语句进行操作。
重要个性
CTAS&CDAS的几个重要个性包含:
- 反对将多个CDC工作应用同一个Job执行,节俭了大量的内存和CPU资源。
- 反对Source合并,在应用CDAS进行数据同步时,会应用一个Job治理所有表的同步工作,并主动将所有表的Source合并为一个,缩小Mysql侧并发读取的压力。
- 反对的Schema Change类型包含减少列、删除列和批改列名。这里须要留神的是,以后所反对的删除列操作,是通过将对应字段的值置空来实现的,例如上游Mysql表删除了一个字段,在Flink检测到数据Schema变更后,并不会将StarRocks中对应的列删除,而是在将数据写入到StarRocks时,把对应的字段的值填为空值。而批改列名的操作,也是通过减少一个新列,并把新数据中原来的列的值置空来实现的。
Connector-V2介绍
Connector-V2是为了解决第四个痛点而研发的,能够帮忙用户升高通过Flink导入StarRocks时的内存耗费,晋升工作的稳定性。
如图所示,在V1版本中,为了保障Exactly-Once,咱们须要将一次Checkpoint期间的所有数据都憋在Flink的Sink算子的内存中,因为Checkpoint工夫不能设置的太短,且无奈预测单位工夫内数据的流量,因而不仅造成了内存资源的重大耗费,还常常因OOM带来稳定性问题。
V2版本通过两阶段提交的个性解决了这个问题,两阶段提交指的是,数据的提交分为两个阶段,第一阶段提交数据写入工作,在数据写入阶段数据都是不可见的,并且能够分批屡次写入,第二阶段是提交阶段,通过Commit申请将之前多批次写入的数据同时置为可见。StarRocks侧提供了Begin、Prepare、Commit等接口,反对将屡次数据写入申请作为同一个事务提交,保障了同一事务内数据的一致性。
通过显示的调用Transaction接口的形式,能够由原来在Flink侧积攒少量数据、一次性发送数据的形式,改良为间断小批量提交数据,在保障Exactly-Once的同时,大大降低了Flink侧用于存储数据Buffer的内存耗费问题,也进步了Flink工作的稳定性。
StarRocks + Flink在汇量的实际
在汇量的广告投放剖析业务中,应用了CDAS个性来实现Mysql到Flink数据的实时变更。
此前,该业务次要依靠某闭源数据仓库进行OLAP剖析,随着数据量的增长,在单表查问和多表Join场景都呈现了较大的瓶颈,查问耗时达到无奈容忍的分钟级,因而从新选型采纳了StarRocks进行数据分析,在对应场景下体现非常优异。
在汇量的业务场景下,StarRocks中有几十张波及操作元数据的小表是应用CDAS进行实时同步的,另外几张数据量较大的明细表是以离线导入的模式按天更新的。应用CDAS的次要是数据更新和Schema变动较为频繁的小表和维度表,进行业务查问时,将这些实时更新的表与离线的数据表进行Join,通过小表实时更新、大表离线更新、大小表联结查问的形式,实现了实时性、老本以及导入与查问性能的取舍平衡。因为业务对数据的准确性要求较高,因而应用了Exactly-once语义,通过Flink的Checkpoint机制来保证数据的不丢不重。