作者: 喻奎 阿里云智能 高级技术专家
本文次要从四局部介绍,阿里云云原生大数据计算服务MaxCompute湖仓一体近实时增量解决技术架构的外围设计和利用场景。
一、MaxCompute 湖仓一体倒退过程
MaxCompute作为阿里云自研的海量大数据处理平台曾经有十几年的倒退历史,在规模和扩展性方面始终体现比拟优良。其依靠阿里云飞天分布式操作系统,可能提供疾速,齐全托管的EB级数据仓库及数据湖解决方案,可经济高效的解决海量数据。目前,其承当着阿里团体绝大部分离线数据存储和计算力,是阿里云产品矩阵中最重要的自研外围平台之一。
MaxCompute倒退之初,次要聚焦数仓方面的大数据处理业务场景,并且解决的数据源次要为格式化数据。随着数据处理场景的多样化和业界数据湖架构的衰亡,加上阿里团体外部自身数据也十分多,反对多样化数据源也就成为了一个必选项。因而MaxCompute 设计了欠缺的表面机制,能够读取存储在内部多种格局的数据对象,例如Hadoop开源体系,OSS半结构化或非结构化数据,为此也尽可能设计开发对立的元数据处理架构,此阶段MaxCompute 在湖仓一体化解决方案中迈出了重要一步,极大的扩大了数据处理的业务场景,无效的突破数据孤岛,联动各方面的数据进行综合剖析来开掘整体数据价值。但时效性有余,通常是T+1离线场景。
随着用户数和数据规模一直减少,很多业务场景也越加简单,须要更加欠缺综合的整体解决方案。其中的关键环节之一就是数据须要更加高效的流转起来,为此MaxCompute 进一步设计欠缺凋谢存储和计算架构,更好的去交融生态,让数据可晦涩的进得来也出得去。此外,还有一个重要的业务场景是大规模批量解决和高时效高效率增量解决一体化解决方案,为简化用户数据处理链路,节俭不同零碎之间的数据迁徙老本以及冗余计算和存储老本,MaxCompute 团队设计开发了MaxCompute离线和近实时增量解决的一体化架构。总体来说,现阶段以及将来会基于对立的存储、对立的元数据、对立的计算引擎无效撑持湖仓一体的整体技术架构,让数据可能凋谢互通高效流转,并且计算和存储老本继续优化。
二、MaxCompute近实时增量解决技术架构简介
MaxCompte离线 &近实时增量解决业务零碎架构现状
随着以后数据处理的业务场景日趋简单,对于时效性要求低的大规模数据全量批处理的场景,间接应用MaxCompute 足以很好的满足业务需要,对于时效性要求很高的秒级实时数据处理或者流解决,则须要应用实时零碎或流零碎来满足需要。
但其实对于大部份业务场景,并不要求秒级数据更新可见,更多的是分钟级或者小时级的增量数据处理场景,并且叠加海量数据批处理场景。
对于这类业务场景的解决方案,如果应用繁多的MaxCompute 离线批量解决链路,为了计算的高效性,须要将用户各种简单的一些链路和解决逻辑转化成T+1的批次解决,链路复杂度减少,也可能产生冗余的计算和存储老本,且时效性也较差。但如果应用繁多的实时零碎,资源耗费的老本比拟高,性价比也较低,并且大规模数据批处理的稳定性也有余。因而以后比拟典型的解决方案是Lambda架构,全量批处理应用MaxCompute链路,时效性要求比拟高的增量解决应用实时零碎链路,但该架构也存在大家所熟知的一些固有缺点,比方多套解决和存储引擎引发的数据不统一问题,多份数据冗余存储和计算引入的额定老本,架构简单以及开发周期长等。
针对这些问题近几年大数据开源生态也推出了各种解决方案,最风行的就是Spark/Flink/Presto开源数据处理引擎,深度集成开源数据湖Hudi、Delta Lake和Iceberg三剑客,来综合提供解决方案,解决Lamdba架构带来的一系列问题,而MaxCompute近一年自研开发的离线近实时增量解决一体化架构,同样是为了解决这些问题而设计,不仅仅具备分钟级的增全量数据读写以及数据处理的业务需要,也能提供Upsert,Timetravel等一系列实用功能,可大幅扩大业务场景,并且无效的节俭数据计算,存储和迁徙老本,切实进步用户体验。下文就将介绍该技术架构的一些典型的性能和设计。
MaxCompute近实时增量解决技术架构
MaxCompute近实时增量解决整体架构的设计改变次要集中在五个模块:数据接入、计算引擎、数据优化服务,元数据管理,数据文件组织。其余部份间接复用MaxCompute 已有的架构和计算流程,比方数据的分布式存储间接集成了阿里云基础设施盘古服务。
- 数据接入次要反对各种数据源全量和近实时增量导入性能。MaxCompute 联结相干产品定制开发多种数据接入工具,例如MaxCompute 定制开发的Flink Connector,DataWorks的数据集成等,用来反对高效的近实时增量数据导入。这些工具会对接MaxCompute 的数据通道服务Tunnel Server,次要反对高并发分钟级增量数据写入。此外,也反对MaxCompute SQL,以及其它一些接口用于反对全量数据高效写入。
- 计算引擎次要蕴含MaxCompute 自研的SQL引擎,负责Timetravel和增量场景下的SQL DDL/DML/DQL的语法解析,优化和执行链路。此外,MaxCompute外部集成的Spark等引擎也在设计开发反对中。
- 数据优化服务次要由MaxCompute 的Storage Service来负责智能的主动治理增量数据文件,其中包含小文件合并Clustering,数据Compaction,数据排序等优化服务。对于其中局部操作,Storage Service会依据数据特色,时序等多个维度综合评估,主动执行数据优化工作,尽可能放弃衰弱高效的数据存储和计算状态。
- 元数据管理次要负责增量场景下数据版本治理,Timetravel治理,事务并发抵触治理,元数据更新和优化等。
- 数据文件组织次要蕴含对全量和增量数据文件格式的治理以及读写相干的模块。
三、外围设计解剖
对立的数据文件组织格局
要反对全量和增量解决一体化架构首先须要设计对立的表类型以及对应的数据组织格局,这里称为Transactional Table2.0,简称TT2,根本能够反对一般表的所有性能,同时反对增量解决链路的新场景,包含timetravel查问、upsert操作等。
TT2要失效只须要在创立一般表时额定设置主键primary key(PK),以及表属性transactional为true即可。PK列用于反对Upsert链路性能,PK值雷同的多行记录在查问或者Compaction会merge成一行数据,只保留最新状态。transactional属性则代表反对ACID事务机制,满足读写快照隔离,并且每行数据会绑定事务属性,比方事务timestamp,用来反对timetravel查问,过滤出正确数据版本的记录。此外TT2的tblproperties还能够设置其余的一些可选的表属性,比方write.bucket.num用来配置数据写入的并发度,acid.data.retain.hours用来配置历史数据的无效查问工夫范畴等。
TT2表数据文件存在多种组织格局用来反对丰盛的读写场景。其中base file数据文件不保留Update/Delete中间状态,用来撑持全量批处理的读写效率,delta file增量数据文件会保留每行数据的中间状态,用于满足近实时增量读写需要。
为了进一步优化读写效率,TT2反对依照BucketIndex对数据进行切分存储,BucketIndex数据列默认复用PK列,bucket数量可通过配置表属性write.bucket.num指定,数据写入的高并发可通过bucket数量程度扩大,并且查问时,如果过滤条件为PK列,也可无效的进行Bucket裁剪查问优化。数据文件也可依照PK列进行排序,可无效晋升MergeSort的效率,并有助于DataSkipping查问优化。数据文件会依照列式压缩存储,可无效缩小存储的数据量,节省成本,也可无效的晋升IO读写效率。
数据近实时流入
后面介绍了对立的数据组织格局,接下来须要思考数据如何高效写入TT2。
数据流入次要分成近实时增量写入和批量写入两种场景。这里先形容如何设计高并发的近实时增量写入场景。用户的数据源丰盛多样,可能存在数据库,日志零碎或者其余音讯队列等零碎中,为了不便用户迁徙数据写入TT2, MaxCompute定制开发了Flink Connector、Dataworks数据集成以及其它开源工具,并且针对TT2表做了很多专门的设计开发优化。这些工具外部会集成MaxCompute 数据通道服务Tunnel提供的客户端SDK,反对分钟级高并发写入数据到Tunnel Server,由它高并发把数据写入到每个Bucket的数据文件中。
写入并发度可通过后面提及的表属性write.bucket.num来配置,因而写入速度可程度扩大。对同一张表或分区的数据,写入数据会按pk值对数据进行切分,雷同pk值会落在同一个bucket桶中。此外,数据分桶的益处还有利于数据优化治理操作例如小文件clustering,compaction等都能够桶的粒度来并发计算,进步执行效率。分桶对于查问优化也十分有益处,可反对bucket裁剪、shuffle move等查问优化操作。
Tunnel SDK提供的数据写入接口目前反对upsert和delete两种数据格式,upsert蕴含insert / update两种隐含语义,如数据行不存在就代表insert,如已存在就代表update。commit接口代表原子提交这段时间写入的数据如返回胜利就代表写入数据查问可见,满足读写快照隔离级别,如返回失败,数据须要从新写入。
SQL批量写入
批量导入次要通过SQL进行操作。为了不便用户操作,实现了操作TT2所有的DDL / DML语法。SQL引擎内核模块包含Compiler、Optimizer、Runtime等都做了大量革新开发以反对相干性能,包含特定语法的解析,特定算子的Planner优化,针对pk列的去重逻辑,以及runtime结构Upsert格局数据写入等。数据计算写入实现之后,会由Meta Service来原子性更新Meta信息,此外,也做了大量革新来反对残缺的事务机制保障读写隔离、事务冲突检测等等。
小数据文件合并
因为TT2自身反对分钟级近实时增量数据导入,高流量场景下可能会导致增量小文件数量收缩,从而引发存储拜访压力大、老本高,并且大量的小文件还会引发meta更新以及剖析执行慢,数据读写IO效率低下等问题,因而须要设计正当的小文件合并服务, 即Clustering服务来主动优化此类场景。
Clustering服务次要由MaxCompute 外部的Storage Service来负责执行,专门解决小文件合并的问题,须要留神的是,它并不会扭转任何数据的历史中间状态,即不会打消数据的Update/Delete中间状态。
联合上图可大略理解Clustering服务的整体操作流程。Clustering策略制订次要依据一些典型的读写业务场景而设计,会周期性的依据数据文件大小,数量等多个维度来综合评估,进行分档次的合并。Level0到Level1次要针对原始写入的Delta小文件(图中蓝色数据文件)合并为中等大小的Delta文件(图中黄色数据文件),当中等大小的Delta文件达到肯定规模后,会进一步触发Level1到Level2的合并,生成更大的Delta文件(图中橙色数据文件)。
对于一些超过肯定大小的数据文件会进行专门的隔离解决,不会触发进一步合并,防止不必要的读写放大问题,如图中Bucket3的T8数据文件。超过肯定时间跨度的文件也不会合并,因为时间跨度太大的数据合并在一起的话,当TimeTravel或者增量查问时,可能会读取大量不属于此次查问工夫范畴的历史数据,造成不必要的读放大问题。
因为数据是依照BucketIndex来切分存储的,因而Clustering服务会以bucket粒度来并发执行,大幅缩短整体运行工夫。
Clustering服务须要和Meta Service进行交互,获取须要执行此操作的表或分区的列表,执行完结之后,会把新老数据文件的信息传入Meta Service,它负责Clustering操作的事务冲突检测,新老文件meta信息原子更新、老的数据文件回收等。
Clustering服务能够很好的解决大文件数量收缩引发的一系列效率低下的读写问题,但不是频率越高越好,执行一次也会耗费计算和IO资源,至多数据都要全副读写一遍,存在肯定的读写放大问题。因而执行策略的抉择尤其重要,所以目前临时不会凋谢给用户手动执行,而是引擎依据零碎状态智能主动触发执行,可保障Clustering服务执行的高效率。
数据文件Compaction
除了小文件收缩问题须要解决外,仍然还有一些典型场景存在其它问题。TT2反对update、delete格局的数据写入,如果存在大量此格局的数据写入,会造成中间状态的冗余记录太多,引发存储和计算成本减少,查问效率低下等问题。因而须要设计正当的数据文件compaction服务优化此类场景。
Compaction服务次要由MaxCompute 外部的Storage Service来负责执行,既反对用户手动执行SQL语句触发、也可通过配置表属性依照工夫频率、Commit次数等维度主动触发。此服务会把选中的数据文件,蕴含base file和delta file,一起进行Merge,打消数据的Update / Delete中间状态,PK值雷同的多行记录只保留最新状态的一行记录,最初生成新的只蕴含Insert格局的base file。
联合上图可大略理解Compaction服务的整体操作流程。t1到t3时间段,一些delta files写入进来,触发compaction操作,同样会以bucket粒度并发执行,把所有的delta files进行merge,而后生成新的base file。之后t4和t6时间段,又写入了一批新的delta files,再触发compaction操作,会把以后存在的base file和新增的delta files一起做merge操作,从新生成一个新的base file。
Compaction服务也须要和Meta Service进行交互,流程和Clustering相似,获取须要执行此操作的表或分区的列表,执行完结之后,会把新老数据文件的信息传入Meta Service,它负责Compaction操作的事务冲突检测,新老文件meta信息原子更新、老的数据文件回收等。
Compaction服务通过打消数据两头历史状态,可节俭计算和存储老本,极大减速全量快照查问场景的效率,但也不是频率越高越好,首先执行一次也要读取一遍全量数据进行Merge,极大耗费计算和IO资源,并且生成的新base file也会占据额定的存储老本,而老的delta file文件可能须要用于反对timetravel查问,因而不能很快删除,仍然会有存储老本,所以Compaction操作须要用户依据本人的业务场景和数据特色来正当抉择执行的频率,通常来说,对于Update / Delete格局的记录较多,并且全量查问次数也较多的场景,能够适当减少compaction的频率来减速查问。
事务管理
以上次要介绍了典型的数据更新操作,而它们的事务并发治理都会对立由Meta Service进行管制。
下面表格具体展现了各个具体操作并发执行的事物抵触规定。Meta服务采纳了经典的MVCC模型来满足读写快照隔离,采纳OCC模型进行乐观事务并发管制。对于一些高频的操作独自设计优化了事务冲突检测和重试机制,如clustering操作和insert into 并发执行,即便事务Start和Commit工夫呈现穿插也不会抵触失败,都能胜利执行,即便在原子提交Meta信息更新时呈现小概率失败也可在Meta层面进行事务重试,代价很低,不须要数据从新计算和读写。
此外,各种数据文件信息以及快照版本也须要无效的治理,其中蕴含数据版本、统计信息、历史数据、生命周期等等。对于TimeTravel和增量查问,Meta层面专门进行了设计开发优化,反对高效的查问历史版本和文件信息。
TimeTravel查问
基于TT2,计算引擎可高效反对典型的业务场景TimeTravel查问,即查问历史版本的数据,可用于回溯历史状态的业务数据,或数据出错时,用来复原历史状态数据进行数据纠正,当然也反对间接应用restore操作复原到指定的历史版本。
对于TimeTravel查问,会首先找到要查问的历史数据版本之前最近的base file,再查找前面的delta files,进行合并输入,其中base file能够用来减速查问读取效率。
这里联合上图进一步形容一些具体的数据查问场景。比方创立一TT2表,schema蕴含一个pk列和一个val列。右边图展现了数据变动过程,在t2和t4时刻别离执行了compaction操作,生成了两个base file: b1和b2。b1中曾经打消了历史中间状态记录(2,a),只保留最新状态的记录 (2,b)。
如查问t1时刻的历史数据,只需读取delta file (d1)进行输入; 如查问t2时刻,只需读取base file (b1) 输入其三条记录。如查问t3时刻,就会蕴含base file ( b1)加上delta file (d3)进行合并输入,可依此类推其余时刻的查问。
可见,base文件虽可用来减速查问,但须要触发较重的compaction操作,用户须要联合本人的业务场景抉择适合的触发策略。
TimeTravel可依据timestamp和version两种版本状态进行查问,除了间接指定一些常量和罕用函数外,咱们还额定开发了get_latest_timestamp和get_latest_version两个函数,第二个参数代表它是最近第几次commit,不便用户获取咱们外部的数据版本进行精准查问,晋升用户体验。
增量查问
此外,SQL增量查问也是重点设计开发的场景,次要用于一些业务的近实时增量解决链路,新增SQL语法采纳between and关键字,查问的工夫范畴是左开右闭,即begin是一个开区间,必须大于它,end是一个闭区间。
增量查问不会读取任何base file,只会读取指定工夫区间内的所有delta files,依照指定的策略进行Merge输入。
通过上诉表格可进一步理解细节,如begin是t1-1,end是t1,只读取t1时间段对应的delta file (d1)进行输入, 如果end是t2,会读取两个delta files (d1和d2);如果begin是t1,end是t2-1,即查问的工夫范畴为(t1, t2),这个时间段是没有任何增量数据插入的,会返回空行。
对于Clustering和Compaction操作也会产生新的数据文件,但并没有减少新的逻辑数据行,因而这些新文件都不会作为新增数据的语义,增量查问做了专门设计优化,会剔除掉这些文件,也比拟贴合用户应用场景。
历史版本数据回收
因为Timetravel和增量查问都会查问数据的历史状态,因而须要保留肯定的工夫,可通过表属性acid.data.retain.hours来配置保留的工夫范畴。如果历史状态数据存在的工夫早于配置值,零碎会开始主动回收清理,一旦清理实现,TimeTravel就查问不到对应的历史状态了。回收的数据次要蕴含操作日志和数据文件两局部。
同时,也会提供purge命令,用于非凡场景下手动触发强制革除历史数据。
数据接入生态集成现状
初期上线反对接入TT2的工具次要包含:
- DataWorks数据集成:反对数据库等丰盛的数据源表全量以及增量的同步业务。
- MaxCompute Flink Connector:反对近实时的upsert数据增量写入,这一块还在继续优化中,包含如何确保Exactly Once语义,如何保障大规模分区写入的稳定性等,都会做深度的设计优化。
- MaxCompute MMA:反对大规模批量 Hive数据迁徙。很多业务场景数据迁徙可能先把存在的全量表导入进来,之后再继续近实时导入增量数据,因而须要有一些批量导入的工具反对。
- 阿里云实时计算Flink版Connector:反对近实时Upsert数据增量写入,性能还在欠缺中。
- MaxCompute SDK:间接基于SDK开发反对近实时导入数据,不举荐
- MaxCompute SQL:通过SQL批量导入数据
对其它一些接入工具,比方Kafka等,后续也在陆续布局反对中。
特点
作为一个新设计的架构,MaxCompute 会尽量去笼罩开源数据湖(HUDI / Iceberg)的一些通用性能,有助于相似业务场景的用户进行数据和业务链路迁徙。此外,MaxCompute离线 & 近实时增量解决一体化架构还具备一些独特的亮点:
- 对立的存储、元数据、计算引擎一体化设计,做了十分深度和高效的集成,具备存储成本低,数据文件治理高效,查问效率高,并且Timetravel / 增量查问可复用MaxCompute批量查问的大量优化规定等劣势。
- 全套对立的SQL语法反对,十分便于用户应用。
- 深度定制优化的数据导入工具,反对一些简单的业务场景。
- 无缝连接MaxCompute现有的业务场景,能够缩小迁徙、存储、计算成本。
- 齐全自动化治理数据文件,保障更好的读写稳定性和性能,主动优化存储效率和老本。
- 基于MaxCompute平台齐全托管,用户能够开箱即用,没有额定的接入老本,性能失效只须要创立一张新类型的表即可。
- 作为齐全自研的架构,需要开发节奏齐全自主可控。
四、利用实际与将来布局
离线 & 近实时增量解决一体化业务架构实际
基于新架构,MaxCompute可从新构建离线 & 近实时增量解决一体化的业务架构,即能够解决大部分的Lambda架构的痛点,也能节俭应用繁多离线或者实时零碎架构带来的一些不可避免的计算和存储老本。各种数据源能够不便的通过丰盛的接入工具实现增量和离线批量导入,由对立的存储和数据管理服务主动优化数据编排,应用对立的计算引擎反对近实时增量解决链路和大规模离线批量解决链路,而且由对立的元数据服务反对事务和文件元数据管理。它带来的劣势十分显著,可无效防止纯离线零碎解决增量数据导致的冗余计算和存储,也能解决纯实时零碎昂扬的资源耗费老本,也可打消多套零碎的不统一问题和缩小冗余多份存储老本以及零碎间的数据迁徙老本,其余的劣势能够参考上图,就不一一列举了。总体而言,就是应用一套架构既能够满足增量解决链路的计算存储优化以及分钟级的时效性,又能保障批处理的整体高效性,还能无效节俭资源应用老本。
将来布局
最初再看一下将来一年内的布局:
- 继续欠缺SQL的整体性能反对,升高用户接入门槛;欠缺Schema Evolution反对。
- 更加丰盛的数据接入工具的开发反对,继续优化特定场景的数据写入效率。
- 开发增量查问小工作分钟级别的pipeline主动执行调度框架,极大的简化用户增量解决链路业务的开发难度,齐全主动依据工作执行状态触发pipeline任务调度,并主动读取增量数据进行计算。
- 继续持续优化SQL查问效率,以及数据文件主动优化治理。
- 扩大生态交融,反对更多的第三方引擎读写TT2。
新架构目前还没有在MaxCompute最新的对外版本推出,大略6-7月份咱们将对外公布邀测应用,大家能够通过关注MaxCompute官网理解相干停顿。也欢送大家退出MaxCompute开发者钉钉群,与咱们间接沟通。
五、Q&A
Q1:Bucket数量的设置与commit距离以及compaction距离设置的最佳举荐是什么?
A1:Bucket数量与导入的数据量相干,数据量越大,倡议设置的bucket数量多一些,在批量导入的场景,举荐每个bucket的数据量不要超过1G,在近实时增量导入场景,也要依据Tunnel的可用资源以及QPS流量状况来决定bucket数量。对于commit的距离尽管反对分钟级数据可见,但如果数据规模较大,bucket数量较多,咱们举荐距离最好在五分钟以上,也须要思考联合 Flink Connector的checkpoint机制来联动设置commit频率,以反对Exactly Once语义,流量不大的话,5~10分钟距离是推荐值。Compaction距离跟业务场景相干,它有很大的计算成本,也会引入额定的base file存储老本,如果对查问效率要求比拟高且比拟频繁,compaction须要思考设置正当的频率,如果不设置,随着delta files和update记录的一直减少,查问效率会越来越差。
Q2:会不会因为commit太快,compaction跟不上?
A2:Commit频率和Compaction频率没有间接关系,Compaction会读取全量数据,所以频率要低一些,至多小时或者天级别,而Commit写入增量数据的频率是比拟快的,通常是分钟级别。
Q3:是否须要专门的增量计算优化器?
A3:这个问题很好,的确须要有一些特定的优化规定,目前只是复用咱们现有的SQL优化器,后续会继续布局针对一些非凡的场景进行增量计算的设计优化。
Q4:刚刚说会在一两个月邀测MaxCompute新架构,让大家去征询。是全副替换为新的架构还是上线一部分的新架构去做些尝试,是要让用户去抉择吗?还是怎么?
A4:新技术架构对用户来说是通明的,用户能够通过MaxCompute无缝接入应用,只须要创立新类型的表即可。针对有这个需要的新业务或者之前解决链路性价比不高的老业务,能够思考缓缓切换到这条新链路尝试应用。
【 MaxCompute公布收费试用打算,为数仓建设提速 】新用户可0元支付5000CU*小时计算资源与100GB存储,有效期3个月。 立刻支付>>