作者:
刘腾飞 汇量后端开发工程师
阿里云开源 OLAP 研发团队
EMR-StarRocks 介绍
阿里云 EMR 在年初推出了 StarRocks 服务,StarRocks 是新一代极速全场景 MPP(Massively Parallel Processing)数据仓库,致力于构建极速和对立剖析体验。EMR StarRocks 具备如下特点:
- 兼容 MySQL 协定,可应用 MySQL 客户端和罕用 BI 工具对接 StarRocks 来剖析数据
-
采纳分布式架构:
- 对数据表进行程度划分并以多正本存储
- 集群规模能够灵便伸缩,反对 10 PB 级别的数据分析
- 反对 MPP 框架,并行减速计算
- 反对多正本,具备弹性容错能力
- 反对向量化引擎和 CBO
- 反对弹性扩缩容
- 反对明细模型、聚合模型、主键模型和更新模型
更多详细信息能够参考 https://help.aliyun.com/docum…
Flink-CDC 概念介绍
CDC 的全称是 Change Data Capture,面向的场景包含数据同步、数据散发、数据采集,Flink CDC 次要面向数据库的变更,能够将上游数据和 Schema 的变更同步到上游数据湖和数据仓库中。2020 年 7 月,Flink CDC 我的项目提交了第一个 Commit,去年 8 月,Flink 社区公布了 CDC2.0,通过两年工夫的打磨,在商业化应用上曾经十分成熟。本文次要以 Mysql CDC 为例,介绍 StarRocks+Flink CDC 实时入仓中用户遇到的痛点,以及在 Flink 和 StarRocks 层面进行的对应优化和解决方案。
应用 CDC 将一张 Mysql 表中的数据导入到 StarRocks 的表中,首先须要在 StarRocks 上建设用来承接 Mysql 数据的指标表,而后在 Flink 上别离创立 Mysql 表和 StarRocks 表在 Flink 中 Sink 和 Source 表的映射,而后执行一条 insert into sink_table from source_table 语句。执行完 Insert into 之后,会生成一个 CDC 工作,CDC 工作首先向指标表同步源表的全量数据,实现后持续基于 Binlog 进行增量数据的同步。通过一个工作,实现数据的全量 + 增量同步,对于用户来讲是十分敌对的。然而在应用的过程中,仍然发现了一些痛点。
实时写入场景的用户痛点
SQL 开发工作量大
对于一些还没有实现数仓建设的新业务,或是刚刚开始依靠 StarRocks 进行 OLAP 平台建设的用户而言,在 StarRocks 中建表以承载 Mysql 同步过去的数据是第一步。在一些简单的业务中,Mysql 中的表往往有几十上百张,每张表又有数十个字段,要把它们对应的 StarRocks 表的建表语句全副编写进去是一个很大的工作量。第一个痛点 StarRocks 建表的工作量大。
Flink 字段的数据类型映射关系简单易错
在 StarRocks 中建表是第一步,建表实现之后,为了启动 CDC 工作,还须要在 Flink 中建设 Mysql 对应的 Source 表,以及 StarRocks 对应的 Sink 表,其中 Flink 建表时,每个字段的字段类型与 Mysql、与 StarRocks 的映射关系须要严格留神,对于动辄几十上百个须要字段的表,每个字段都须要查找对应在 Flink 的类型映射关系,尤其令开发人员苦楚。因而,第二个痛点是上下游表与 Flink 字段的数据类型映射关系简单,容易出错。
Schema 变更操作繁琐
第三个痛点来自于业务数据 Schema 的变动,据 Fivetran 公司考察,约有 60% 的公司数据 Schema 每个月都会发生变化,30% 的公司数据 Schema 每周都会发生变化。对于 Mysql 表中字段的增删改,用户心愿在不影响 CDC 工作的状况下,将 Schema 变动同步到上游的 StarRocks。目前罕用的计划,是在手动进行工作后,更改 StarRocks 和 Mysql 的 Schema,更改 Flink 侧的 Sink 和 Source 表构造,通过指定 savepoints 的形式再次启动工作。Schema 变更的操作繁琐,无奈自动化是第三个痛点。
数据同步工作占用资源多
第四个痛点,是在表的数量多、实时增量数据量大的场景下,CDC 工作占用的内存和 cpu 资源较高,出于节省成本的思考,用户心愿尽可能的在资源利用方面进行优化。
接下来,咱们来看针对这些痛点,EMR-StarRocks 在与 Flink 深度联合方面做了哪些优化,提供了什么样的解决方案。
CTAS&CDAS
EMR-StarRocks 与 Flink 团队推出的 CTAS&CDAS 性能次要是针对前三个痛点研发的一个解决方案。通过 CTAS&CDAS,能够应用一条 SQL 语句,实现 StarRocks 建表、Flink-CDC 工作创立、实时同步 Schema 变更等本来须要多项繁冗操作的工作,令开发和运维的工作量大大降低。
CTAS 介绍
CTAS 的全称是 create table as,语法结构如下:
CREATE TABLE IF NOT EXISTS runoob_tbl1 with ('starrocks.create.table.properties'='engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
'database-name'='test_cdc',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:8030',
'table-name'='runoob_tbl_sr',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000'
)
as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl' )*/;
通过 CTAS 的语法结构能够看到,除了集群信息和 DataBase 信息外,还有一个非凡配置“starrocks.create.table.properties”,这是因为 Mysql 与 StarRocks 的表构造有一些不同,如 Key Type、分区、Bucket Number 等非凡配置,因而用它来承接 StarRocks 建表语句中字段定义前面的内容。
为了不便用户更快的建表,还设置了一个 Simple Mode,配置形式如下:
CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
'starrocks.create.table.properties'='buckets 8',
'starrocks.create.table.mode'='simple',
'database-name'='test_cdc',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:8030',
'table-name'='runoob_tbl_sr',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000'
)
as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl' )*/;
开启 Simple Mode 之后,将默认应用 Primary Key 模型,默认应用 Mysql 中的主键作为 Primary Key,默认应用哈希 (主键) 进行分桶,这样,用户在启动 Simple Mode 对表应用 CTAS 语句时,就齐全不须要关怀 Mysql 中原表有哪些字段,字段名称是什么,主键是什么,只须要晓得表名,就能够高效的实现 SQL 编写。
CTAS 的原理
如图所示,在执行了 CTAS 语句后,首先 Flink 会主动在 StarRocks 中创立一个与 Mysql 源表的 Schema 雷同的指标表,而后建设 Mysql 与 StarRocks 表在 Flink 中的 Sink 和 Source 映射,接下来启动一个 CDC 工作,该工作将同步源表数据到指标表,并在运行时监测 Mysql 源表发送过去的数据产生的 Schema 变更,主动将 Schema 变更同步到 StarRocks 指标表中。CTAS 性能实际上是用一个 SQL,实现了本来须要手动编写 SQL 和执行的多项操作。
接下来介绍 CTAS 的实现原理。CTAS 的实现次要依赖了 Flink CDC、Flink Catalog 和 Schema Evolution。Flink 的 CDC 性能后面曾经介绍过了。其中的 Catalog 性能,使 Flink 能够感知到 StarRocks 中所有的 DataBase 和所有 table 的 Schema,并对它们进行 DDL 操作。而 Schema Evolution 性能,是通过对数据的 Schema 变动进行检测和记录实现的,例如,当 Mysql 产生增列操作时,CTAS 工作并不会依据 Mysql 的 DDL 变动,立即对上游 StarRocks 进行增加列的操作,而是当第一条应用了新 Schema 的数据被解决时,才会通过比照新旧数据 Schema 的区别,生成对应的 Alter Table Add Column 语句,对 StarRocks 进行增列操作,在期待 StarRocks 的 Schema 变更实现之后,新的数据才会被推送到上游。
CDAS 介绍
CDAS 是 CTAS 的一个语法糖。通过 CDAS 语句,能够实现 Mysql 中的整库同步,即生成一个 Flink Job,Source 是 Mysql 中的 database,指标表是 StarRocks 中对应的多张表。
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'='buckets 8',
'starrocks.create.table.mode'='simple',
'database-name'='test_cdc',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:8030',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000'
)
as table mysql.test_cdc.runoob_tbl including table
'tabl1','tbl2','tbl3' /*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;
因为咱们冀望应用一条 SQL 生成多张表的 Schema 和 CDC 工作,因而须要对立应用 Simple 模式。在理论应用过程中,一个 DataBase 中可能有些表不须要同步、有些表须要自定义配置,因而咱们能够应用 Including Table 语法,只抉择一个 DataBase 中的局部表进行 CDAS 操作,对于须要自定义属性配置的表,则应用 CTAS 语句进行操作。
重要个性
CTAS&CDAS 的几个重要个性包含:
- 反对将多个 CDC 工作应用同一个 Job 执行,节俭了大量的内存和 CPU 资源。
- 反对 Source 合并,在应用 CDAS 进行数据同步时,会应用一个 Job 治理所有表的同步工作,并主动将所有表的 Source 合并为一个,缩小 Mysql 侧并发读取的压力。
- 反对的 Schema Change 类型包含减少列、删除列和批改列名。这里须要留神的是,以后所反对的删除列操作,是通过将对应字段的值置空来实现的,例如上游 Mysql 表删除了一个字段,在 Flink 检测到数据 Schema 变更后,并不会将 StarRocks 中对应的列删除,而是在将数据写入到 StarRocks 时,把对应的字段的值填为空值。而批改列名的操作,也是通过减少一个新列,并把新数据中原来的列的值置空来实现的。
Connector-V2 介绍
Connector-V2 是为了解决第四个痛点而研发的,能够帮忙用户升高通过 Flink 导入 StarRocks 时的内存耗费,晋升工作的稳定性。
如图所示,在 V1 版本中,为了保障 Exactly-Once,咱们须要将一次 Checkpoint 期间的所有数据都憋在 Flink 的 Sink 算子的内存中,因为 Checkpoint 工夫不能设置的太短,且无奈预测单位工夫内数据的流量,因而不仅造成了内存资源的重大耗费,还常常因 OOM 带来稳定性问题。
V2 版本通过两阶段提交的个性解决了这个问题,两阶段提交指的是,数据的提交分为两个阶段,第一阶段提交数据写入工作,在数据写入阶段数据都是不可见的,并且能够分批屡次写入,第二阶段是提交阶段,通过 Commit 申请将之前多批次写入的数据同时置为可见。StarRocks 侧提供了 Begin、Prepare、Commit 等接口,反对将屡次数据写入申请作为同一个事务提交,保障了同一事务内数据的一致性。
通过显示的调用 Transaction 接口的形式,能够由原来在 Flink 侧积攒少量数据、一次性发送数据的形式,改良为间断小批量提交数据,在保障 Exactly-Once 的同时,大大降低了 Flink 侧用于存储数据 Buffer 的内存耗费问题,也进步了 Flink 工作的稳定性。
StarRocks + Flink 在汇量的实际
在汇量的广告投放剖析业务中,应用了 CDAS 个性来实现 Mysql 到 Flink 数据的实时变更。
此前,该业务次要依靠某闭源数据仓库进行 OLAP 剖析,随着数据量的增长,在单表查问和多表 Join 场景都呈现了较大的瓶颈,查问耗时达到无奈容忍的分钟级,因而从新选型采纳了 StarRocks 进行数据分析,在对应场景下体现非常优异。
在汇量的业务场景下,StarRocks 中有几十张波及操作元数据的小表是应用 CDAS 进行实时同步的,另外几张数据量较大的明细表是以离线导入的模式按天更新的。应用 CDAS 的次要是数据更新和 Schema 变动较为频繁的小表和维度表,进行业务查问时,将这些实时更新的表与离线的数据表进行 Join,通过小表实时更新、大表离线更新、大小表联结查问的形式,实现了实时性、老本以及导入与查问性能的取舍平衡。因为业务对数据的准确性要求较高,因而应用了 Exactly-once 语义,通过 Flink 的 Checkpoint 机制来保证数据的不丢不重。