简介: 本文将介绍数仓建设过程中面对三种计算模式,较低的研发效率、不可控的数据品质,以及臃肿数据接口服务的窘境的解决方案。背景在漫长的数仓建设过程中,实时数仓与离线数仓别离由不同的团队进行独立建设,有大而广的离线数仓体系,也有须要谋求业务时效,须要建设实时数仓。然而,业务数据需要和数据产品需要中,往往须要把实时数据与离线数据联合在一起进行比对和剖析,然而这两个人造不一样的数据存储和计算构造,须要同时开发两套数据模型。在数据处理过程中,实时数仓须要应用Blink/Flink 解决,离线须要写ODPS SQL解决,还有在线计算模型,须要开发java代码解决。

如上图所示,实时数据与离线数据在存储层、计算层、服务层,都是割裂拆散、独立建设的。实时数据,有着增量式计算的个性,须要疾速的流转与计算,它次要以DataHub、Flink、Hbase等异构零碎做为撑持,串联造成一个残缺实时计算全链路。而离线数据,是定时、批量计算的个性,由存储计算对立的ODPS零碎做为撑持。它们除了计算链路的差别,还有着数据处理的逻辑差别:流批处理不能复用,ODPS和Blink/Flink的SQL规范不统一,他们的底层调度和数据处理逻辑也有基本的差异,一个是以MR作为外围的批处理形式,一个是以Flink/Blink为外围的流解决。有些流批处理场景须要调用HSF接口,调用HSF接口,在Java Spring环境里,是信手拈来的事件,但到了ODPS/Flink环境里,就变得额定的一个大挑战,甚至不可实现,因为在ODPS/Flink是无奈加载或者极难应用Spring容器的,这就会让开发在面对简单流解决场景,更偏向应用本人相熟的Java环境,但同时也象征失去ODPS/Flink那种贴近业务表白的SQL表白。计算解决除了流批处理,还更宽泛存在在线交互计算,这个和流解决异步解决还不太一样,是须要同步计算并返回后果,这通常是在Java环境开发HSF接口,但如果你要对外同时提供三种能力:流计算、批计算和在线计算的时候,就面临须要三端开发,流计算和批计算尚且还有SQL,尽管不太统一,但至多大同小异,但在线交互计算就是须要纯Java开发了,将SQL翻译成Java代码可是个不小的挑战。 Unify SQL VS 流批一体面对三种计算模式,较低的研发效率、不可控的数据品质,以及臃肿数据接口服务的窘境,三端(流计算、批计算、在线计算)一体计算的想法天然油然而生,在20年做价格力业务时候,我就始终思考有什么解法,其实,这个也是大数据架构常常面临的问题,业界达成共识,能够演绎两种计划:▐  流批一体计算 同一个引擎承载流、批两种计算模式,在流计算模式下进行实时数据计算,在批计算模式下进行离线数据计算。

流批一体计算的典型架构是:Flink + Kappa架构。Flink能够实现基于SQL的流批一体的计算表白,简单计算通过Java利用承接,价格力计算架构就是典型这种架构,然而这种架构存在以下问题:没有解决在线交互计算Flink解决了流计算、批计算一体的能力,这两种都是异步解决,只是时效不同而已,但没有解决在线计算的能力,如果要提供在线计算能力,不得不在以下两个计划抉择:通过Java利用提供同步计算接口,这样就存在两套逻辑:一个是Flink实现的流批处理,另一个是Java实现的在线解决。提供两个接口,一个接口是发动计算申请,将计算申请交到Flink解决后,再提供一个轮训查问接口,查问计算好后的数据,这个计划至多在计算上做到一套代码,但这种同步转异步解决的计划势必会影响产品的设计。 Flink的批处理吞吐量 Flink实现批处理,其实是有点两厢情愿,为啥这么说,因为其吞吐规模,跟MR批计算(ODPS)齐全不是一个量级,如果Flink真能实现和ODPS齐全对等的吞吐规模和资源老本,那齐全不须要ODPS什么事了,但事实是,对于一些只有批量解决场景的(比方特色预处理、统计计算),ODPS依然是第一优先选择,只有当面临流批同时存在的场景时候,并且对批处理规模要求不大时候,Flink确实提供十分不错的一体化解决方案。▐  Unify SQL 同一SQL代码通过自动化本义,翻译到流计算引擎和批计算引擎上进行流、批计算,也包含翻译到HSF接口代码,提供在线交互计算能力。

Flink的流批一体架构十分优良,能解决90%的流批一体问题,但可怜的是,咱们有些业务场景(典型的价格计算场景),远不是Flink写写SQL能够解决的:整个电商是个非常复杂的业务体系,就以我所处在的营销域里,就要面对招商模型、投放模型、流动模型、权利等等,这些都远不是一个繁多零碎能够承接的,也不是一个繁多团队能够承接的,阿里针对这样简单的业务,设计了HSF这样微服务电商架构,然而Flink和电商这样的Java技术栈显著是割裂的,怎么联合两种体系架构,一方面施展电商的微服务架构的红利,一方面又能利用到Flink的SQL流批处理能力,思考到Flink自身的局限性,与其让Flink反对HSF,不如让Java环境反对Flink SQL,换句话说,设计一个SQL引擎,它能通过sql的流解决形式,解决Java对象,将流引擎嵌入到Java里,随调随用。Flink的批引擎,在面对T级离线数据批量解决,是十分耗资源,简直不可用,正如下面所讲的,Flink批处理的吞吐量远不迭ODPS的MR批处理,那么,咱们为何不让这样计算依然交接给ODPS解决,然而,ODPS和Flink的SQL规范不统一,须要两端开发,当初问题变成:怎么对立ODPS和Flink的开发,说的再艰深点,咱们可不可以在ODPS和Flink下面架设一层对立Unify SQL,这个SQL引擎能够翻译成ODPS或者Flink能了解的解决(ODPS翻译成MR程序,Flink翻译成Stream Operator)形式,抹平ODPS和Flink的SQL语义差异。如果仅仅是抹平ODPS和Flink的SQL差别,带来的收益其实并不大,然而其对立SQL表白计算的设计,是能够进一步扩宽其利用范畴,比方在线交互计算,或者说,咱们能够进一步打造对立计算引擎,包含编排不同模式的计算能力,比方:有些场景对时效要求比拟高,咱们能够调度Flink计算,对时效没有要求,但数据量微小,能够只调度离线计算,有些须要提供HSF接口,就调度利用启动spring接口。Unify SQL Engine 淘系价格计算引擎,以Flink + Kappa为外围的数据架构,对于这种数据架构演进,能够参考我其余文章,三种计算模式的叠加是价格服务计算引擎的常态模式,他们都在各自外围计算施展本人最大的劣势:ODPS:离线批量计算引擎,外围劣势,十分高的计算吞吐量,但时效差,有面向MR和SQL编程模式,业务和BI敌对,次要用在数据预处理、离线特色加工、常见维表ETL等。Flink:流式解决引擎,外围劣势,低提早计算,时效好,极高的容错和高可靠性,但吞吐量相比ODPS个别,有面向Stream API和SQL API的编程模式,业务和BI敌对,次要用在实时数据加工(优惠、订单等)、音讯预处理等Java计算:外围劣势,丰盛的电商Java HSF接口,简单的畛域模型,面向对象设计,开发敌对,然而业务和BI不敌对,容错和可靠性依赖开发设计,提早和吞吐量也高度依赖开发设计。 那么如何整合这3个不同计算架构,Flink提出一个引擎承接所有计算模式,也就是Flink的流批一体引擎,但这带来的问题就是,不同计算模式,底层的引擎自身就很难齐全周全到,与其去对立计算引擎,为何不对立表白和调度,而把真正的计算下放到各自计算引擎,这就是Unify Engine的核心思想。 ▐  SQL引擎技术 在实现三端一体化时候,有个核心技术难点,就是SQL引擎,很多数据产品都自带本人的SQL引擎,Flink外部有SQL引擎,ODPS外部有C++实现的SQL引擎,Hive也有,Mysql外部也有SQL解析引擎,这些SQL引擎都高度集成到各自的存储和计算里,如果你说要找个独立的可用在Java环境的SQL引擎,市面上有是有,不过要么是非常复杂的calcite sql引擎,要么是非常简单的select * 繁难sql引擎,能做的事件非常少,开箱即用的简直没有。但Unify SQL引擎又是实现三端一体化的外围组件,没有它,其余什么事件都无从谈起。从无设计一个SQL引擎老本是十分高的,其中不说简单的语法解析,生成AST语法树,就单单SQL逻辑打算优化,就是非常复杂,侥幸的是,业界是存在一个能够二次开发的SQL引擎,就是calcite SQL引擎,其实,很多SQL引擎都是基于calcite二次开发的,比方Flink、Spark外部的SQL解析引擎就是基于calcite二次开发的,咱们设计的SQL引擎也是基于calcite的。Calcite 应用了基于关系代数的查问引擎,聚焦在关系代数的语法分析和查问逻辑的布局,通过calcite提供的SQL API(解析、验证等)将它们转换成关系代数的形象语法树,并依据肯定的规定或老本预计对AST关系进行优化,最初进一步生成ODPS/Flink/Java环境能够了解的执行代码。calcite的次要性能:SQL解析:Calcite的SQL解析是通过JavaCC实现,应用JavaCC变成SQL语法形容文件,将SQL解析成未经校验(unvalided AST)的AST语法树。SQL校验:无状态校验,即验证SQL语句是否符合规范;有状态校验,通过和元数据验证SQL的schema,字段,UDF是否存在,以及类型是否匹配等。这一步生成的是未经优化的RelNode(逻辑打算树)SQL查问优化:对下面步骤的输入(RelNode),进行优化,这一过程会循环应用优化器(RBO规定优化器和CBO老本优化器),在放弃语义等价的根底上,生成执行老本最低的SQL逻辑树(Lo)至于calcite的比拟具体的原理,能够详解:Apache Calcite 解决流程详解(地址:https://xie.infoq.cn/article/...),这里不详解了。有了calcite,解决了SQL->逻辑树,然而真正执行SQL计算的,还须要进一步将逻辑数转换成物理执行树(Physical Exec DAG),在这个DAG,是蕴含可执行的Java代码(JavaCode)片段,最初下发到不同执行环境,会被进一步串联可被环境执行的链路,比方在ODPS环境,会生成MR代码,在Flink环境,会被转换成Stream Operator,在Java环境,会被转换成CollectorChain,在Spring环境,会被转换成Bean组件。

PS:如果你们看过Flink源码,对下面流程会十分眼生,是的,Unify SQL Engine不是从头设计的,是基于Flink 1.12源码魔改的,其中Parse和上面要说的Codegen技术都是间接参考了Flink设计,当然说是魔改的,就是还有大量代码须要基于下面做二次开发,比方从执行DAG到各个环境真正可执行的MR/Bean/Stream。▐  Codegen技术 在SQL解析后,通过逻辑优化器和物理优化器,产生的PhyscialRel物理打算树,蕴含大量的简单数据逻辑解决,比方SQL常见的CASE WHEN语句,常见的做法是给所有符号运算定义个父类(比方ExecNode),理论运行时,委派给实在的子类运行,这波及到大量虚构函数表的搜查,最终这种分支指令肯定水平阻止指令的管道化和并行执行,导致这种搜查老本比函数自身执行老本还高。 Codegen技术就是专门针对这样的场景孕育而生,行业做的比拟杰出的Codegen技术,有LLVM和Janino,LLVM次要针对编译器,而Java的代码codegen通常应用Janino,Janino做为一种玲珑疾速的Java编译器,不仅能像Javac将一组java文件编译成Class文件,也能够将Java表达式、语句块、类定义块或者Java文件进行编译,间接加载成ByteCode,并在同一个JVM里进行运行。 Unify SQL Engine也应用Janino用来做CodeGen技术,并无效地晋升代码的执行效率。对于Janino更多内容,能够参考这篇文章:Java CodeGen编译器Janino(地址:https://zhuanlan.zhihu.com/p/...)。这里有采纳Codegen和不采纳Codegen的技术性能比照: 表达式100*x+20/2(x+y)(xx+y)/(x-y)100/(xy)Node树遍历执行10ms88msJanino生成代码执行6ms9ms 能够看出当表达式越简单时,应用Janino的成果就会体现越显著。 ▐  有状态计算 通常计算分为无状态计算和有状态计算,无状态计算个别是过滤、project映射,其每次计算依赖以后数据上下文,互相独立的,不依赖前后数据,因而,不须要有额定的存储保留两头计算结果或者缓存数据,但还有一类是有状态计算,除了以后数据上下文,还须要依赖之前计算的两头态数据,典型的比方:sum求和:须要有存储保留以后求和的后果,当有新的数据过去,联合以后两头后果根底上累加去重:去掉之前反复呈现的数据,须要保留之前曾经解决过哪些数据,而后有新的数据须要计算,要和保留的数据比拟是否反复排序:须要有存储保留之前排好的数据,当有新的数据过去,会变更之前的排序后果,并diff后,将从新排序后有变动的数据从新发到上游

可见,当须要进行有状态计算,须要有后背存储来承载中间状态后果,Unify SQL Engine是反对3种后背存储:内存、Redis和Hbase:内存State是只保留到内存,一旦重新启动,就失落历史数据,内存State通常用在单机有状态计算,并且容忍数据失落。个别用在ODPS的MR程序里,因为一次MR调用状态计算,只须要以后执行上下文的累计后果,不须要放在全局缓存,不同批次之间的累计是通过MR API之间传输,内存State齐全够用。Redis:对于须要跨多机状态计算,就会用到Redis作为后背存储,Unify SQL Engine在Java环境里默认是应用这个作为后背存储。Redis后备存储个别用在Java计算环境,数据会流通过不同生产机器,计算的两头后果须要全局可见。Hbase:如果状态数据超过100G,能够抉择Hbase做为后背存储,性能尽管比不上Redis,但状态能够保留很长时间,对于长周期的状态计算十分有用。 ▐  JOIN语义  Flink是能够反对双流Join,然而Flink的双流Join的语义齐全照搬了SQL的JOIN语义,就是一边的数据会和另一边的所有数据JOIN,这个对于离线剖析没有任何问题,然而对于实时计算是会存在反复计算,在有些场景还有损业务逻辑,比方:当订单流去双流JOIN优惠表的时候,就会呈现这个问题,优惠表的数据是会不停变动的,然而咱们心愿以快照数据做为JOIN的根据,而不是把优惠变更的数据都复现一遍,Unify SQL Engine是做到后者语义的,也就是SNAPSHOT JOIN,也是业务场景常见的语义:

一些想法 ▐  对立调度  Unify SQL Engine当初曾经能够做到将SQL翻译成不同执行环境可运行的工作,通过Unify SQL对立表白了不同环境的逻辑计算,然而离最终咱们冀望的还很远,其中一点就是要做到对立调度和调配,当初不同环境的协调是须要开发者本人去调配和调度,比方哪些计算须要下发到ODPS MR计算,哪些是在Java环境运行,将来咱们心愿这些调配也是能够做到对立调度和运行,包含全量和增量计算的主动协同,离线和在线数据协同等 ▐  资源老本  通过Unify SQL Engine,开发者能够本人抉择底层的计算引擎,对于数据量较大但对时效要求不高的场景,能够抉择在ODPS计算,对于时效有要求同时数据规模可承受内,能够抉择在Flink调度,对于计算逻辑简单,须要大量依赖HSF接口,能够抉择在Java环境启动,抉择本人最容易接受的资源和老本,承接其计算语义。 同时,也是心愿通过Unify SQL Engine最大化的利用计算资源,比方Java利用,很多状况下是闲暇状态的,CPU利用率是比拟低下的,比方一些流计算能够下发到这些闲暇的利用,并占用十分小的CPU(比方5%以内),整体的资源利用率就晋升了,还比方,Flink计算资源是比拟难申请,那么能够抉择在Java环境里计算(Java相比Flink环境不足一些个性,比方Exactly once语义)等等。原文链接:https://click.aliyun.com/m/10...本文为阿里云原创内容,未经容许不得转载。