简介:本文将介绍数仓建设过程中面对三种计算模式,较低的研发效率、不可控的数据品质,以及臃肿数据接口服务的窘境的解决方案。背景在漫长的数仓建设过程中,实时数仓与离线数仓别离由不同的团队进行独立建设,有大而广的离线数仓体系,也有须要谋求业务时效,须要建设实时数仓。然而,业务数据需要和数据产品需要中,往往须要把实时数据与离线数据联合在一起进行比对和剖析,然而这两个人造不一样的数据存储和计算构造,须要同时开发两套数据模型。在数据处理过程中,实时数仓须要应用 Blink/Flink 解决,离线须要写 ODPS SQL 解决,还有在线计算模型,须要开发 java 代码解决。
如上图所示,实时数据与离线数据在存储层、计算层、服务层,都是割裂拆散、独立建设的。实时数据,有着增量式计算的个性,须要疾速的流转与计算,它次要以 DataHub、Flink、Hbase 等异构零碎做为撑持,串联造成一个残缺实时计算全链路。而离线数据,是定时、批量计算的个性,由存储计算对立的 ODPS 零碎做为撑持。它们除了计算链路的差别,还有着数据处理的逻辑差别:流批处理不能复用,ODPS 和 Blink/Flink 的 SQL 规范不统一,他们的底层调度和数据处理逻辑也有基本的差异,一个是以 MR 作为外围的批处理形式,一个是以 Flink/Blink 为外围的流解决。有些流批处理场景须要调用 HSF 接口,调用 HSF 接口,在 Java Spring 环境里,是信手拈来的事件,但到了 ODPS/Flink 环境里,就变得额定的一个大挑战,甚至不可实现,因为在 ODPS/Flink 是无奈加载或者极难应用 Spring 容器的,这就会让开发在面对简单流解决场景,更偏向应用本人相熟的 Java 环境,但同时也象征失去 ODPS/Flink 那种贴近业务表白的 SQL 表白。计算解决除了流批处理,还更宽泛存在在线交互计算,这个和流解决异步解决还不太一样,是须要同步计算并返回后果,这通常是在 Java 环境开发 HSF 接口,但如果你要对外同时提供三种能力:流计算、批计算和在线计算的时候,就面临须要三端开发,流计算和批计算尚且还有 SQL,尽管不太统一,但至多大同小异,但在线交互计算就是须要纯 Java 开发了,将 SQL 翻译成 Java 代码可是个不小的挑战。Unify SQL VS 流批一体面对三种计算模式,较低的研发效率、不可控的数据品质,以及臃肿数据接口服务的窘境,三端(流计算、批计算、在线计算)一体计算的想法天然油然而生,在 20 年做价格力业务时候,我就始终思考有什么解法,其实,这个也是大数据架构常常面临的问题,业界达成共识,能够演绎两种计划:▐ 流批一体计算 同一个引擎承载流、批两种计算模式,在流计算模式下进行实时数据计算,在批计算模式下进行离线数据计算。
流批一体计算的典型架构是:Flink + Kappa 架构。Flink 能够实现基于 SQL 的流批一体的计算表白,简单计算通过 Java 利用承接,价格力计算架构就是典型这种架构,然而这种架构存在以下问题:没有解决在线交互计算 Flink 解决了流计算、批计算一体的能力,这两种都是异步解决,只是时效不同而已,但没有解决在线计算的能力,如果要提供在线计算能力,不得不在以下两个计划抉择:通过 Java 利用提供同步计算接口,这样就存在两套逻辑:一个是 Flink 实现的流批处理,另一个是 Java 实现的在线解决。提供两个接口,一个接口是发动计算申请,将计算申请交到 Flink 解决后,再提供一个轮训查问接口,查问计算好后的数据,这个计划至多在计算上做到一套代码,但这种同步转异步解决的计划势必会影响产品的设计。Flink 的批处理吞吐量 Flink 实现批处理,其实是有点两厢情愿,为啥这么说,因为其吞吐规模,跟 MR 批计算(ODPS)齐全不是一个量级,如果 Flink 真能实现和 ODPS 齐全对等的吞吐规模和资源老本,那齐全不须要 ODPS 什么事了,但事实是,对于一些只有批量解决场景的(比方特色预处理、统计计算),ODPS 依然是第一优先选择,只有当面临流批同时存在的场景时候,并且对批处理规模要求不大时候,Flink 确实提供十分不错的一体化解决方案。▐ Unify SQL 同一 SQL 代码通过自动化本义,翻译到流计算引擎和批计算引擎上进行流、批计算,也包含翻译到 HSF 接口代码,提供在线交互计算能力。
Flink 的流批一体架构十分优良,能解决 90% 的流批一体问题,但可怜的是,咱们有些业务场景(典型的价格计算场景),远不是 Flink 写写 SQL 能够解决的:整个电商是个非常复杂的业务体系,就以我所处在的营销域里,就要面对招商模型、投放模型、流动模型、权利等等,这些都远不是一个繁多零碎能够承接的,也不是一个繁多团队能够承接的,阿里针对这样简单的业务,设计了 HSF 这样微服务电商架构,然而 Flink 和电商这样的 Java 技术栈显著是割裂的,怎么联合两种体系架构,一方面施展电商的微服务架构的红利,一方面又能利用到 Flink 的 SQL 流批处理能力,思考到 Flink 自身的局限性,与其让 Flink 反对 HSF,不如让 Java 环境反对 Flink SQL,换句话说,设计一个 SQL 引擎,它能通过 sql 的流解决形式,解决 Java 对象,将流引擎嵌入到 Java 里,随调随用。Flink 的批引擎,在面对 T 级离线数据批量解决,是十分耗资源,简直不可用,正如下面所讲的,Flink 批处理的吞吐量远不迭 ODPS 的 MR 批处理,那么,咱们为何不让这样计算依然交接给 ODPS 解决,然而,ODPS 和 Flink 的 SQL 规范不统一,须要两端开发,当初问题变成:怎么对立 ODPS 和 Flink 的开发,说的再艰深点,咱们可不可以在 ODPS 和 Flink 下面架设一层对立 Unify SQL,这个 SQL 引擎能够翻译成 ODPS 或者 Flink 能了解的解决(ODPS 翻译成 MR 程序,Flink 翻译成 Stream Operator)形式,抹平 ODPS 和 Flink 的 SQL 语义差异。如果仅仅是抹平 ODPS 和 Flink 的 SQL 差别,带来的收益其实并不大,然而其对立 SQL 表白计算的设计,是能够进一步扩宽其利用范畴,比方在线交互计算,或者说,咱们能够进一步打造对立计算引擎,包含编排不同模式的计算能力,比方:有些场景对时效要求比拟高,咱们能够调度 Flink 计算,对时效没有要求,但数据量微小,能够只调度离线计算,有些须要提供 HSF 接口,就调度利用启动 spring 接口。Unify SQL Engine 淘系价格计算引擎,以 Flink + Kappa 为外围的数据架构,对于这种数据架构演进,能够参考我其余文章,三种计算模式的叠加是价格服务计算引擎的常态模式,他们都在各自外围计算施展本人最大的劣势:ODPS:离线批量计算引擎,外围劣势,十分高的计算吞吐量,但时效差,有面向 MR 和 SQL 编程模式,业务和 BI 敌对,次要用在数据预处理、离线特色加工、常见维表 ETL 等。Flink:流式解决引擎,外围劣势,低提早计算,时效好,极高的容错和高可靠性,但吞吐量相比 ODPS 个别,有面向 Stream API 和 SQL API 的编程模式,业务和 BI 敌对,次要用在实时数据加工(优惠、订单等)、音讯预处理等 Java 计算:外围劣势,丰盛的电商 Java HSF 接口,简单的畛域模型,面向对象设计,开发敌对,然而业务和 BI 不敌对,容错和可靠性依赖开发设计,提早和吞吐量也高度依赖开发设计。那么如何整合这 3 个不同计算架构,Flink 提出一个引擎承接所有计算模式,也就是 Flink 的流批一体引擎,但这带来的问题就是,不同计算模式,底层的引擎自身就很难齐全周全到,与其去对立计算引擎,为何不对立表白和调度,而把真正的计算下放到各自计算引擎,这就是 Unify Engine 的核心思想。▐ SQL 引擎技术 在实现三端一体化时候,有个核心技术难点,就是 SQL 引擎,很多数据产品都自带本人的 SQL 引擎,Flink 外部有 SQL 引擎,ODPS 外部有 C ++ 实现的 SQL 引擎,Hive 也有,Mysql 外部也有 SQL 解析引擎,这些 SQL 引擎都高度集成到各自的存储和计算里,如果你说要找个独立的可用在 Java 环境的 SQL 引擎,市面上有是有,不过要么是非常复杂的 calcite sql 引擎,要么是非常简单的 select * 繁难 sql 引擎,能做的事件非常少,开箱即用的简直没有。但 Unify SQL 引擎又是实现三端一体化的外围组件,没有它,其余什么事件都无从谈起。从无设计一个 SQL 引擎老本是十分高的,其中不说简单的语法解析,生成 AST 语法树,就单单 SQL 逻辑打算优化,就是非常复杂,侥幸的是,业界是存在一个能够二次开发的 SQL 引擎,就是 calcite SQL 引擎,其实,很多 SQL 引擎都是基于 calcite 二次开发的,比方 Flink、Spark 外部的 SQL 解析引擎就是基于 calcite 二次开发的,咱们设计的 SQL 引擎也是基于 calcite 的。Calcite 应用了基于关系代数的查问引擎,聚焦在关系代数的语法分析和查问逻辑的布局,通过 calcite 提供的 SQL API(解析、验证等)将它们转换成关系代数的形象语法树,并依据肯定的规定或老本预计对 AST 关系进行优化,最初进一步生成 ODPS/Flink/Java 环境能够了解的执行代码。calcite 的次要性能:SQL 解析:Calcite 的 SQL 解析是通过 JavaCC 实现,应用 JavaCC 变成 SQL 语法形容文件,将 SQL 解析成未经校验(unvalided AST)的 AST 语法树。SQL 校验:无状态校验,即验证 SQL 语句是否符合规范;有状态校验,通过和元数据验证 SQL 的 schema,字段,UDF 是否存在,以及类型是否匹配等。这一步生成的是未经优化的 RelNode(逻辑打算树)SQL 查问优化:对下面步骤的输入(RelNode),进行优化,这一过程会循环应用优化器(RBO 规定优化器和 CBO 老本优化器),在放弃语义等价的根底上,生成执行老本最低的 SQL 逻辑树(Lo)至于 calcite 的比拟具体的原理,能够详解:Apache Calcite 解决流程详解(地址:https://xie.infoq.cn/article/…),这里不详解了。有了 calcite,解决了 SQL-> 逻辑树,然而真正执行 SQL 计算的,还须要进一步将逻辑数转换成物理执行树(Physical Exec DAG),在这个 DAG,是蕴含可执行的 Java 代码(JavaCode)片段,最初下发到不同执行环境,会被进一步串联可被环境执行的链路,比方在 ODPS 环境,会生成 MR 代码,在 Flink 环境,会被转换成 Stream Operator,在 Java 环境,会被转换成 CollectorChain,在 Spring 环境,会被转换成 Bean 组件。
PS:如果你们看过 Flink 源码,对下面流程会十分眼生,是的,Unify SQL Engine 不是从头设计的,是基于 Flink 1.12 源码魔改的,其中 Parse 和上面要说的 Codegen 技术都是间接参考了 Flink 设计,当然说是魔改的,就是还有大量代码须要基于下面做二次开发,比方从执行 DAG 到各个环境真正可执行的 MR/Bean/Stream。▐ Codegen 技术 在 SQL 解析后,通过逻辑优化器和物理优化器,产生的 PhyscialRel 物理打算树,蕴含大量的简单数据逻辑解决,比方 SQL 常见的 CASE WHEN 语句,常见的做法是给所有符号运算定义个父类(比方 ExecNode),理论运行时,委派给实在的子类运行,这波及到大量虚构函数表的搜查,最终这种分支指令肯定水平阻止指令的管道化和并行执行,导致这种搜查老本比函数自身执行老本还高。Codegen 技术就是专门针对这样的场景孕育而生,行业做的比拟杰出的 Codegen 技术,有 LLVM 和 Janino,LLVM 次要针对编译器,而 Java 的代码 codegen 通常应用 Janino,Janino 做为一种玲珑疾速的 Java 编译器,不仅能像 Javac 将一组 java 文件编译成 Class 文件,也能够将 Java 表达式、语句块、类定义块或者 Java 文件进行编译,间接加载成 ByteCode,并在同一个 JVM 里进行运行。Unify SQL Engine 也应用 Janino 用来做 CodeGen 技术,并无效地晋升代码的执行效率。对于 Janino 更多内容,能够参考这篇文章:Java CodeGen 编译器 Janino(地址:https://zhuanlan.zhihu.com/p/…)。这里有采纳 Codegen 和不采纳 Codegen 的技术性能比照:表达式 100*x+20/2(x+y)(xx+y)/(x-y)100/(xy)Node 树遍历执行 10ms88msJanino 生成代码执行 6ms9ms 能够看出当表达式越简单时,应用 Janino 的成果就会体现越显著。▐ 有状态计算 通常计算分为无状态计算和有状态计算,无状态计算个别是过滤、project 映射,其每次计算依赖以后数据上下文,互相独立的,不依赖前后数据,因而,不须要有额定的存储保留两头计算结果或者缓存数据,但还有一类是有状态计算,除了以后数据上下文,还须要依赖之前计算的两头态数据,典型的比方:sum 求和:须要有存储保留以后求和的后果,当有新的数据过去,联合以后两头后果根底上累加去重:去掉之前反复呈现的数据,须要保留之前曾经解决过哪些数据,而后有新的数据须要计算,要和保留的数据比拟是否反复排序:须要有存储保留之前排好的数据,当有新的数据过去,会变更之前的排序后果,并 diff 后,将从新排序后有变动的数据从新发到上游
可见,当须要进行有状态计算,须要有后背存储来承载中间状态后果,Unify SQL Engine 是反对 3 种后背存储:内存、Redis 和 Hbase:内存 State 是只保留到内存,一旦重新启动,就失落历史数据,内存 State 通常用在单机有状态计算,并且容忍数据失落。个别用在 ODPS 的 MR 程序里,因为一次 MR 调用状态计算,只须要以后执行上下文的累计后果,不须要放在全局缓存,不同批次之间的累计是通过 MR API 之间传输,内存 State 齐全够用。Redis:对于须要跨多机状态计算,就会用到 Redis 作为后背存储,Unify SQL Engine 在 Java 环境里默认是应用这个作为后背存储。Redis 后备存储个别用在 Java 计算环境,数据会流通过不同生产机器,计算的两头后果须要全局可见。Hbase:如果状态数据超过 100G,能够抉择 Hbase 做为后背存储,性能尽管比不上 Redis,但状态能够保留很长时间,对于长周期的状态计算十分有用。▐ JOIN 语义 Flink 是能够反对双流 Join,然而 Flink 的双流 Join 的语义齐全照搬了 SQL 的 JOIN 语义,就是一边的数据会和另一边的所有数据 JOIN,这个对于离线剖析没有任何问题,然而对于实时计算是会存在反复计算,在有些场景还有损业务逻辑,比方:当订单流去双流 JOIN 优惠表的时候,就会呈现这个问题,优惠表的数据是会不停变动的,然而咱们心愿以快照数据做为 JOIN 的根据,而不是把优惠变更的数据都复现一遍,Unify SQL Engine 是做到后者语义的,也就是 SNAPSHOT JOIN,也是业务场景常见的语义:
一些想法 ▐ 对立调度 Unify SQL Engine 当初曾经能够做到将 SQL 翻译成不同执行环境可运行的工作,通过 Unify SQL 对立表白了不同环境的逻辑计算,然而离最终咱们冀望的还很远,其中一点就是要做到对立调度和调配,当初不同环境的协调是须要开发者本人去调配和调度,比方哪些计算须要下发到 ODPS MR 计算,哪些是在 Java 环境运行,将来咱们心愿这些调配也是能够做到对立调度和运行,包含全量和增量计算的主动协同,离线和在线数据协同等 ▐ 资源老本 通过 Unify SQL Engine,开发者能够本人抉择底层的计算引擎,对于数据量较大但对时效要求不高的场景,能够抉择在 ODPS 计算,对于时效有要求同时数据规模可承受内,能够抉择在 Flink 调度,对于计算逻辑简单,须要大量依赖 HSF 接口,能够抉择在 Java 环境启动,抉择本人最容易接受的资源和老本,承接其计算语义。同时,也是心愿通过 Unify SQL Engine 最大化的利用计算资源,比方 Java 利用,很多状况下是闲暇状态的,CPU 利用率是比拟低下的,比方一些流计算能够下发到这些闲暇的利用,并占用十分小的 CPU(比方 5% 以内),整体的资源利用率就晋升了,还比方,Flink 计算资源是比拟难申请,那么能够抉择在 Java 环境里计算(Java 相比 Flink 环境不足一些个性,比方 Exactly once 语义)等等。原文链接:https://click.aliyun.com/m/10… 本文为阿里云原创内容,未经容许不得转载。