本篇论文是Facebook 2019年发表介绍Presto的综述类论文,本篇论文从Presto的应用示例、架构、零碎设计等几个方面零碎的介绍了Presto的内核和实现原理,对于通识性的理解Presto有肯定帮忙。
注:本篇论文中所介绍的Presto版本是0.211版本,过后Presto还没决裂出PrestoDB和PrestoSQL。
一、Presto介绍
Presto作为一个分布式查问引擎,于2013年开始就曾经在Facebook的生产环境中应用。并且现在曾经在Uber、Netflix、Airbnb、Bloomberg以及LinkedIn这样的大公司中应用。
Presto具备自适应、灵便以及可扩大等个性。Presto提供了规范的ANSI SQL接口来查问存储于各零碎中的数据,如Hadoop、RDBMS、NoSQL数据库中的数据,以及Kafka这样的流式组件中的数据(Presto中内置了十分多的connectors供用户应用)。Presto对外提供了开放式的HTTP API、提供对JDBC的反对并且反对商业规范的BI的查问工具(如Tableau)。其内置的Hive connector源生反对对HDFS或Amazon S3上的文件进行读写,并且反对多种风行的开源文件格式,包含ORC、Parquet以及Avro。
二、Presto在Facebook的应用示例
1.Interactive Analytics(交互式剖析)
Facebook内运行着一个宏大的多租户数据仓库,一些业务部门或个别团队会共享其中一小部分托管的集群。其数据存储在一个分布式文件系统之上,而元数据则存储在独自的服务中,这些零碎别离具备HDFS和Hive Metastore服务相似的API。
Facebook的工程师常常会检索大量的数据(50GB-3TB的压缩数据),用来验证假如,并构建可视化的数据展板。这些用户通常会应用查问工具、BI工具或Jupyter notebooks来进行查问操作。各个群集须要反对50-100的并发查问能力,并且对查问响应工夫十分敏感。而对于某些探索性的查问,用户可能并不需要获取所有的查问后果。通常在返回初始后果后,查问就会被立刻勾销或者用户会通过LIMIT来限度零碎返回的后果。
2.Batch ETL (批量ETL)
下面咱们介绍到的数据仓库会应用ETL查问工作定期填充新的数据。查问工作通常是通过一个工作流零碎顺次调度执行的。Presto反对用户从历史遗留的批处理零碎迁徙ETL工作,目前ETL查问工作在Facebook的Presto工作负载中占了很大一部分。这些查问通常是由数据工程师开发并优化的。绝对于Interactive Analytics中波及的查问,它们通常会占用更多的硬件资源,并且会波及大量的CPU转换和内存(通常是数TB的分布式内存)密集型的计算,例如大表之间的join及聚合。因而绝对于资源利用率以及集群吞吐量来说,查问提早不是首要关注的。
3.A/B Testing (A/B测试)
Facebook应用A/B测试,通过统计假设性的测试来评估产品变更带来的影响。在Facebook大量的A/B测试的基础架构是基于Presto构建的。用户冀望测试后果能够在数小时之内出现(而不是数天),并且后果应该是准确无误的。对于用户来说,可能在交互式提早的工夫内(5~30s),对后果数据进行任意切分来取得更深刻的见解同样重要。而通过预处理来聚合这些数据往往很难满足这一需要,因而必须得实时计算。生成这样的后果须要关联多个大型数据集,包含用户、设施、测试以及事件属性等数据。因为查问是通过编程形式实现的,所以查问须要被限度在较小的汇合内。
4.Developer/Advertiser Analytics(开发者/广告主剖析)
为内部开发者和广告客户提供的几种自定义报表工具也都是基于Presto构建的。Facebook Analytics就是其中一个理论案例,它为应用Facebook平台构建应用程序的开发人员提供了高级的剖析工具。这些工具通常对外开放一个Web界面,该界面能够生成一组受限的查问模型。查问须要聚合的数据量是十分大的,然而这些查问是有目的性的,因为用户只能拜访他们的应用程序或广告的数据。大部分的查问包含连贯、聚合以及窗口函数。因为这些工具是交互式的,因而有十分严格的查问提早限度(约50ms~5s)。鉴于用户的数量,集群须要达到99.999%的高可用,并且反对数百个并发查问。
三、Presto架构概览
一个Presto集群须要由一个Coordinator以及一个或多个Worker节点组成。Coordinator次要负责接管查问申请、解析语句、生成打算、优化查问以及查问调度。Worker节点次要负责查询处理。如下所示的即为Presto架构:
整体的执行流程能够简述如下:
客户端向Coordinator发送一个蕴含sql的http申请。Coordinator接管到这个申请,会通过评估队列策略,解析和剖析sql文本,创立和优化分布式执行打算来解决申请。
Coordinator将执行打算分发给Worker节点,接着Worker节点开始启动tasks并且开始枚举splits,而这些splits是对外部存储系统中可寻址的数据块的一种费解解决。Splits会被调配给那些负责读取数据的tasks。
Worker节点运行这些tasks来解决从内部存储系统获取的splits,以及来自于其余Worker节点解决过得两头数据。Worker节点之间通过多任务单干机制来并发解决来自不同查问的tasks。工作尽可能的以流水线的形式来执行,这使得数据能够在tasks之间进行流动。对于某些特定的查问,Presto可能在解决完所有数据之前就返回后果。两头数据以及状态会尽可能地存储在内存中。当在节点之间对数据进行shuffle时,Presto会调整缓冲区来达到最小化的提早。
Presto被设计成可扩大的,并提供通用的插件接口。插件能够提供自定义的数据类型、函数、访问控制、事件监听策略、排队策略以及属性配置。更重要的是插件还提供了connector,这使得Presto能够通过Connector API和内部的存储系统进行通信。Connector API次要蕴含如下四个局部:Metadata API、Data Location API、Data Source API以及Data Sink API。这些API能够帮忙在分布式查问引擎中实现高性能的connectors,开发者曾经为Presto社区提供了数十个connector,而且咱们也留神到了一些专有的connectors。
四、Presto零碎设计
1.SQL Dialect(SQL反对)
Presto采纳了规范的ANSI SQL标准 ,具备肯定的扩大能力,例如反对maps and arrays, 反对anonymous functions (lambda expressions) ,反对高阶函数 higher-order functions。
2.Client Interfaces, Parsing, and Planning(接口、语法解析和逻辑打算)
Presto的Coordinator基于RESTful HTTP提供命令行接口并反对JDBC。基于ANTLR的解析器把sql转为相应的语法树。logical planner把AST形象语法树变为逻辑执行打算,叶子节点是input。例如上面的SQL转化为如图的逻辑执行打算。
SELECT orders.orderkey, SUM(tax)FROM ordersLEFT JOIN lineitem ON orders.orderkey = lineitem.orderkeyWHERE discount = 0GROUP BY orders.orderkey
如上生成查问的逻辑打算如下所示:
3.Query Optimization(查问优化)
打算优化器会将逻辑打算转换成可能示意无效执行策略的物理构造。转换过程是通过RBO(Rule-Based Optimization:基于规定的优化器)做plan nodes的等价变换,较常见的规定包含predicate and limit pushdown(查问下推)、column pruning(列剪枝)以及decorrelation(去相关性)。基于CBO(Cost-Based Optimization:基于代价的优化器)也在加强,原理是利用Cascades framework在搜寻空间外面找到最优的打算,目前曾经实现的两类CBO是join办法的抉择(hash index,index join等)以及join reorder。
以下是对一些比拟重要的优化伎俩的列举:
3.1 Data Layouts(数据属性)
Connector提供了Data Layout API,优化器能够获取待扫描数据的地位信息以及分区、排序、分组和索引等信息。例如做工作散发的locality aware,partition pruning,sort pushdown,index join等。
3.2 Predicate Pushdown(查问下推)
范畴和等值查问的下推,能够更好的filter inputs。举例来说,在MySQL分片之上构建了专有的Connector。Connector将存储在Mysql实例上的数据划分成小的数据分片,并且依据范畴和点查,下推到指定的分片。
另外对于highly selective filters的查问非常适合查问下推,能够利用分区裁剪(partition pruning)和 file-format feature(比方min-max等粗糙集索引)缩小IO。
3.3 Inter-node Parallelism(节点间并行计算)
Plan node中蕴含了输入数据的各类信息(如分区、排序、分片及分组等数据个性),而后Plan node会在worker间并行执行Plan nodes组成的stages,stage物理上体现为多个tasks的并行执行,task就是雷同的算子,只是在不同的input data上。stage间通过插入exchange算子,利用buffer来替换数据,shuffle是计算和IO密集型的,因而怎么在stages间做shuffle十分重要。下图就是一个执行打算,靠shuffle串联起来。
3.4 Intra-node Parallelism(节点内并行执行)
单节点过程内同样能够并行,hash table和字典能够在线程外面shards分区并行,论文里提了两个use case,用节点内多线程并行能够减速计算,下图是一个例子,扫描数据和hash build都能够并行起来,靠local shuffle替换数据。
4.Scheduling(调度)
Coordinator通过散发可执行的task的形式,向Woker节点调配执行打算的各个stage,这些可执行的task能够被看作单个处理单元。接着,Coordinator将一个stage的task与其余stage的task通过shuffles相连接,从而造成一个树形的解决链路。只有各个stage都是可用的,数据就会在stage之间流转。
这里须要先了解Presto外面的stage、task、pipeline、driver的概念(能够参阅Presto官网文档)。Coordinator把plan stages分发给worker,stage只是形象的概念,worker内理论运行的被称作task,这是最小的执行单元。task有输出和输入,靠shuffle把上下游的task连接起来。比方在理论调试页面外面有0.x,1.x,0示意stage 0,x示意stage内task的并行度。
一个task能够蕴含1到多个pipeline,pipeline蕴含一系列operators,例如hash-join算子包含至多两个pipeline,build table pipeline,另外一个是流式的probe pipeline。优化器如果发现一个pipeline能够反对并行优化,那么就会把一个pipeline拆开,例如build pipeline能够拆成scan data和build partitions两个pipeline,节点内并行度能够不同。pipeline间接用local shuffle串联起来。driver在Query Execution中来介绍。
调度策略用于决定哪些stage该被执行,stage内多少个tasks并行调度起来。
4.1 Stage scheduling
在这一调度阶段,Presto反对两种调度策略:一次性调度(all-at-once)以及阶段调度(phased)。前者能够实现将所有的stage调度起来,实用于提早敏感的场景,例如上文中的A/B测试和广告主服务。后者的典型操作如hash-join,须要先build好,再启动probe端,比拟节约内存,实用于批处理场景。
4.2 Task Scheduling
Table scan stage的调度会思考网络拓扑和数据locality,比方share-nothing的部署模式下,须要有一部分worker和storage node co-located起来,Intermediate Stages能够执行在任何worker节点类型上。profling的结果表明table scan很多花在解压、decoding、filters、applying transformation,读数据到connector上,通过下面提到的节点内并行能够最小化wall time。在facebook,presto用share-storage模式部署,这时候网络往往最先成为瓶颈。为了补救计算存储拆散架构下的拜访提早高问题,倒退出了Raptor connector。
4.3 Split Scheduling
在table scan stage读取数据之前,须要先获取存储的split,比方file的门路和offset,分好split后才能够正式启动执行;Intermediate Stages则不同,随时能够执行。
5.Query Execution(查询处理)
5.1 Local Data Flow(本地数据流)
一旦data split调配给线程后,它会在driver中循环执行。Presto的driver中的循环比风行的Volcano递归迭代器模型更简单,但却提供了重要的性能。它更实用于多任务的合作解决,因为算子(operator)能够在线程生成之前就迅速进入已知的状态,而不是有限地阻塞上来。此外,driver能够在不须要额定的输出文件的状况下,通过挪动operator之间的数据使每个数据量子的执行效率达到最大化(如复原资源密集型的计算或爆炸性转换的计算)。循环的每次迭代都会在operators之间挪动数据驱使查问一直的执行。
drive循环中操作的数据单元称之为page,而一个page为某一列一串行值编码后的数据。Connector的Data Source API在传入一个data split之后会返回若干个page。drive中的循环会在operators之间挪动page数据,直到调度实现或operator无奈继续执行为止。
5.2 Shuffles
为了尽可能升高查问提早,最大化资源利用率,shuffle采纳了In-memory buffer shuffle,同时基于HTTP协定,上游worker通过long-polling向上游worker申请数据,实现了一种ACK(音讯确认)的机制,保证数据不丢不重,上游能够依照上游吞吐产出数据和清理数据。
对于shuffle的并发度Presto设计了一种监控机制,例如一旦output buffer过满,执行stall并且耗费内存,input buffer又较为闲暇,便会导致解决效率又有余。因而通过监控buffer的使用率,能够实现自行调控shuffle并行度,进而防止上述的极其景象,网络资源便能够在多个申请间进行均衡。
5.3 Writes
ETL是insert into select的工作模式,通常会写入其余表,connector data sink writer的并行度管制是自适应adaptive的,以此来实现最大化的写入吞吐。
6.Resource Management(资源管理)
Presto非常适合多租户应用,而要害的因素就在于它内置了一个细粒度资源管理零碎。这使得一个集群能够同时执行数百个查问,并最大水平地利用CPU,IO和内存资源。
6.1 CPU Scheduling(CPU调度)
上文中已经介绍过Presto要反对的特点是adaptive,最大化集群资源利用率,多个queries间要做偏心调度(fair sharing),这样能够让短查问也有机会执行。
一个worker外面有多个task须要执行,task执行分片粒度叫做quanta(maximum quanta of one second),一个task执行完quanta周期后,会放回queue外面期待再次被调度。如果output buffer已满,或者input buffer闲暇,那么会提前退出,不等一个周期的quanta。调度计划采纳多级反馈队列(multi-level feedback queue,5级),执行总工夫越长的task会放到更高级别的队列中。Presto应用一个外部的yield指令来做task间的切换(集体了解,即便看起来context switch,然而非os级别的,有点协程的轻量级个性,使得集群能够multi-tenant的服务,短查问也有工夫高优实现)。cpu运行工夫越少的task会被优先调度,这样能够确保短查问能更快被执行完,而长查问自身对于提早也不是十分敏感。
6.2 Memory Management(内存治理)
Presto的Memory pool(内存池)分两类,user or system memory(用户内存), 以及reserve memory(预留内存)。
reserve memory个别寄存shuffle buffers这些和用户查问无关的数据。这二者的内存都有管制,超过限度的阈值查问就会被kill掉。个别依据查问的pattern来布局集群规模和内存。对于超过内存限度的查问,Presto有两种应答办法,spilling:对于大join和agg计算,spill to disk。不过在Facebook生产环境不会应用spill to disk,因为太过影响性能,集群会被prevision成足够的规模,在内存计算和shuffle。
reserved pools:如果节点内存不足,并且集群没有配置为Spilling,或没有残余的可撤销内存,reserved pool机制能够保障集群不被阻塞。每个节点上的查问内存池会进一步被细分为两个池:general pool和reserved pool。当Worker节点上的general pool耗尽时,那么在worker节点上的占用内存最多的那查问会在整个集群中被晋升到reserved pool中。在这种状况下,该查问所耗费的是reserved pool中的内存,而不在是general pool中的内存。为了防止死锁,在整个集群中同时只有一个查问能够在reserved pool中执行。如果Worker节点上的general pool内存已用完,而reserved pool也曾经被占用,那么该Worker节点上其余task的所有内存相干的申请将被阻塞。运行在reserved pool中的查问会始终占用该pool直到其执行实现,这个时候,群集将进行阻塞之前所有未实现的内存申请。这在某种程度上看起来有点节约,因而必须正当的调整每个Worker节点上reserved pool的大小,以满足在本地内存限度下运行查问。当某个查问阻塞了大部分的Worker节点时,集群也能够配置成kill掉这个查问。
7.Fault Tolerance(容错)
Presto能够通过low-level(低级别)的retry(重试)从长期的谬误中复原。然而,一旦Coordinator或Worker节点产生了解体,没有任何内置的容灾措施能够解救。Coordinator的故障将导致集群的不可用,而Worker节点的解体将导致该节点上所有正在执行的查问失败。目前,对于这样的谬误,Presto须要依附Client端从新提交失败的查问来解决。
目前,对于上问题的容错解决在Facebook的生产环境中是通过额定的机制保障某些特定场景下的高可用。在Interactive Analytics 和Batch ETL 的案例中运行着一个备用的Coordinator,而在A/B Testing以及Developer/Advertiser Analytics 的案列中,运行着多个集群来保障高可用。内部监控零碎会辨认那些产生异样故障数量的节点并将它们从集群中剔除,而被修复的节点会重新加入集群。以上的措施都是在某种程度上升高服务不可用的工夫,然而无奈齐全屏蔽故障的产生。
罕用的check pointing 以及局部复原机制通常是十分耗费计算资源的,并且很难在即席查问零碎中实现。基于复制策略的容错机制通常也是相当耗资源的。思考到这样的老本开销,这种技术的预期价值还尚不明确,尤其是在思考到节点均匀故障工夫时,如在Batch ETL的案列中,集群的节点数达到了数千个并且大部分的查问都是在数小时之内实现的。在其余的钻研中也得出过相似的论断。
五、查问优化
1.Working with the JVM(应用JVM)
Presto是通过java实现的,并且运行在Hotspot Java虚拟机上,一些性能敏感的计算例如压缩,checksum能够用非凡的指令和优化。JIT(JVM即时编译器)能够将字节码runtime的优化为machine code,例如inlining, loop unrolling, and intrinsics(一些native代码的调用),同时也在摸索用GraalVM来优化。
Java的实现须要非常重视GC(垃圾收集),Presto采纳了G1 GC,同时为了加重GC的累赘,防止创立humongous大对象,flat memory arrays to reduce reference and object counts and make the job of the GC easier。同时因为G1须要保护对象汇合的构造(remembered sets tructures),所以大型和高度关联的对象图可能会存在一些问题。查问执行门路上的关键步骤的数据结构是通过扁平化的内存数组来实现的,目标就是为了缩小援用以及对象数量,从而使GC变得绝对轻松一点。例如,在HISTOGRAM聚合中,会在一个扁平的数组或哈希表中存储所有组中的bucket keys(桶键)以及对象计数,而不是为每一个histogram保护独立的对象。
2.Code Generation(代码生成)
引擎的次要性能特色之一,就是生产JVM字节码。这有如下两种表现形式:
Expression Evaluation(表达式求值):Presto所提供的表达式解释器能够将表达式计算编译成java代码,从而减速表达式的求值。
Targeting JIT Optimizer Heuristics(针对JIT的优化器):Presto会为一些要害算子(operators)和算子组合生成字节码。字节码生成器利用引擎在计算语义方面的劣势来生成更易于JIT优化器进行优化的字节码。防止quanta做task工夫片切换对JIT的影响,防止类型推导以及虚函数调用,JIT也会进一步适配数据的变动。
生成的字节码还得益于内联所带来的二次效应。JVM可能扩充优化范畴,主动向量化大部分的计算,并且能够利用基于频率的基本块布局来最大水平地缩小分支。这样使得CPU分支预测也变得更加无效。字节码生成进步了引擎将两头后果存储在CPU寄存器或CPU缓存中而不是内存中的能力。
3.File Format Features(文件格式的个性)
扫描算子(Scan operator)应用叶子阶段split的信息来调用Connector API,并以Pages的模式接管列数据。一个page由一个block列表组成,而每个block是具备扁平内存示意模式的列。应用扁平内存的数据结构对性能十分重要,尤其是对那些简单的数据类型。在紧凑的循环体内,指针跟踪,拆箱和虚构办法调用都减少了大量的开销。
而这类connectors会尽可能的应用特定的文件格式。Presto配有自定义的reader,能够通过应用文件页眉/页脚中的统计信息(如页眉中保留的最小-最大范畴和布隆过滤器),来高效地过滤数据。Reader能够间接将某些模式的压缩数据间接读成blocks,从而让引擎能够无效地对其进行解决。
下图显示了一个page中列的布局形式,其中每一列都有其对应的编码方式。字典编码的块(DictionaryBlock)在压缩数据的低基数局部十分高效,游程编码的块(run-length encoded block,RLEBlock)对反复的数据进行压缩。多个page能够共享一个字典,这能够大大提高内存利用率。ORC文件中的一列能够为整个“stripe”(最多数百万行),应用单个字典。
4.Lazy Data Loading(数据的懒加载)
Presto反对数据的惰性物化(lazy materialization)。此性能能够利用ORC,Parquet和RCFile等文件格式的列压缩个性。Connector能够生成惰性的blocks,仅当理论拜访时才读取,解压缩和解码数据。
5.Operating on Compressed Data(对压缩数据的解决)
Presto所采取的第一种解决办法是在压缩数据上间接进行计算。如上图中的构造,当page处理器在计算转换或过滤时遇到dictionary block,它将解决 dictionary 内的所有的值(或RLE的块中的单个值)。这容许引擎以疾速的无条件循环解决整个dictionary。在某些状况下,dictionary中存在的值多于block中的行。在这种场景下,page处理器揣测未援用的值将在后续的block中应用。Page处理器会继续跟踪产生的理论行数和dictionary的大小,这有利于比照解决索引和解决dictionary的效率。如果行数大于dictionary的大小,则解决dictionary的效率可能更高。当page处理器在block序列中遇到了新的dictionary时,它会应用这种启发式的办法确定是否持续揣测。
Presto所采取的第二种解决办法是在用字典值代替数据自身进行计算。在构建哈希表(如联接或聚合)时,Presto还会利用dictionary block构造。在解决索引时,operator会在数组中记录每个dictionary entry(字典条目)在哈希表中的地位。如果有条目在后续的索引中反复,会简略的重用该条目标的地位而不是从新对其进行计算。当间断的blocks共享同一dictionary 时,page处理器将保留该数组,来进一步缩小必要的计算。
六、工程相干
在这一部分中,提供了Presto在设计上所参考的一些工程哲学,这些对于Presto的设计和倒退有着重要意义,也十分具备借鉴意义。
- Adaptiveness over configurability(基于配置的自适应性)
自适应高于配置。例如cpu执行的quanta分片机制使得短查问能够疾速执行,ETL 写入的并行度,反压等个性。
- Effortless instrumentation(唾手可得的工具)
细粒度的性能统计信息。通过Presto的库(libraries)收集统计信息,并且针对每一个查问收集了算子(operator)级别的统计信息,并将这些信息合并到task以及stage级别的统计信息中。通过这些细粒度的监控伎俩,使得Presto在优化时能以数据为驱动。
- Static configuration(动态配置)
如Presto这样的简单零碎,诸多操作问题很难疾速地定位本源并予以解决。因而Presto抉择不动静调整配置,防止集群不稳固。
- Vertical integration(垂直集成)
Presto的各组件都应该能够很好的与Presto交互,比方发现gzip包慢,那么Presto就自行革新实现。在多线程下的debug和控制能力也十分重要。
最初,咱们简短的进行一个总结。在本篇论文中,咱们介绍了Presto,这是一个由Facebook开发的开源的MPP SQL查问引擎,能够疾速的解决大型的数据集,具备自适应、灵便以及可扩大等个性,值得咱们对其的一些实现原理进行深刻的钻研。