作者:vivo互联网技术-Shuai Guangying
在《探索Presto SQL引擎(1)-巧用Antlr》中,咱们介绍了Antlr的根本用法以及如何应用Antlr4实现解析SQL查问CSV数据,更加深刻了解Presto查问引擎反对的SQL语法以及实现思路。
本次带来的是系列文章的第2篇,本文梳理了Join的原理,以及Join算法在Presto中的实现思路。通过实践和实际的联合,能够在了解原理的根底上,更加深刻了解Join算法在OLAP场景下的工程落地技巧,比方火山模型,列式存储,批量解决等思维的利用。
一、背景
在业务开发中应用数据库,通常会有标准不容许过多表的Join。例如阿里巴巴开发手册中,有如下的规定:
【强制】超过三个表禁止Join。须要Join的字段,数据类型必须相对统一;多表关联查问时,保障被关联的字段须要有索引。阐明:即便双表Join也要留神表索引、SQL性能。
在大数据数仓的建设中,只管咱们有星型构造和雪花构造,然而最终交付业务应用的大多是宽表。
能够看出业务应用数据库中的一个矛盾点:咱们须要Join来提供灵便的关联操作,然而又要尽量避免多表和大表Join带来的性能问题。这是为什么呢?
二、Join的基本原理
在数据库中Join提供的语义是十分丰盛的。简略总结如下:
通常了解Join的实现原理,从Cross Join是最好的切入点,也就是所谓的笛卡尔积。对于汇合进行笛卡尔积运算,了解非常简单,就是穷举两个汇合中元素所有的组合状况。在数据库中,汇合就对应到数据表中的所有行(tuples),汇合中的元素就对应到单行(tuple)。所以实现Cross Join的算法也就跃然纸上了。
实现的代码样例如下:
List<Tuple> r = newArrayList( new Tuple(newArrayList(1,"a")), new Tuple(newArrayList(2,"b"))); List<Tuple> s = newArrayList( new Tuple(newArrayList(3,"c")), new Tuple(newArrayList(4,"d"))); int cnt =0;for(Tuple ri:r){ for(Tuple si:s){ Tuple c = new Tuple().merge(ri).merge(si); System.out.println(++cnt+": "+ c); }}/** * out: 1: [1, a, 3, c] 2: [1, a, 4, d] 3: [2, b, 3, c] 4: [2, b, 4, d] */
能够看出实现逻辑非常简单,就是两个For循环嵌套。
2.1 Nested Loop Join算法
在这个根底上,实现Inner Join的第一个算法就顺其自然了。十分直白的名称:Nested Loop,实现关键点如下:
(起源:Join Processing in Relational Databases)
其中,操作符能够是:=, !=, <, >, ≤, ≥。
相比笛卡尔积的实现思路,也就是增加了一层if条件的判断用于过滤满足条件的组合。
对于Nested Loop算法,最要害的点在于它的执行效率。如果参加Join的两张表一张量级为1万,一张量级为10w,那么进行比拟的次数为1w*10w=10亿次。在大数据时代,通常一张表数据量都是以亿为单位,如果应用Nested Loop Join算法,那么Join操作的比拟次数间接就是天文数字了。所以Nested Loop Join基本上是作为万不得已的保底计划。Nested Loop这个框架下,常见的优化措施如下:
- 小表驱动大表,即数据量较大的集作为于for循环的外部循环。
- 一次解决一个数据块,而不是一条记录。也就是所谓的Block Nested Loop Join,通过分块升高IO次数,晋升缓存命中率。
值得一提的是Nested Loop Join的思维尽管十分奢侈,然而人造的具备分布式、并行的能力。这也是为什么各类NoSQL数据库中仍然保留Nested Loop Join实现的重要一点。尽管单机串行执行慢,然而能够并行化的话,那就是加机器能解决的问题了。
2.2 Sort Merge Join算法
通过后面的剖析能够晓得,Nested Loop Join算法的关键问题在于比拟次数过多,算法的复杂度为O(m*n),那么突破口也得朝着这个点。如果汇合中的元素是有序的,比拟的次数会大幅度降低,防止很多无意义的比拟运算。对于有序的所以Join的第二种实现形式如下所形容:
(起源:Join Processing in Relational Databases)s)
通过将JOIN操作拆分成Sort和Merge两个阶段实现Join操作的减速。对于Sort阶段,是能够提前准备好能够复用的。这样的思维对于MySQL这类关系型数据库是十分敌对的,这也能解释阿里巴巴开发手册中要求关联的字段必须建设索引,因为索引保障了数据有序。该算法工夫复杂度为排序开销O(m_log(m)+n_log(n))+合并开销O(m+n)。然而通常因为索引保障了数据有序,索引其工夫复杂度为O(m+n)。
2.3 Hash Join算法
Sort Merge Join的思维在落地中有肯定的限度。所谓成也萧何败萧何,对于基于Hadoop的数仓而言,保证数据存储的有序性这个点对于性能影响过大。在海量数据的背景下,保护索引老本是比拟大的。而且索引还依赖于应用场景,不可能每个字段都建一个索引。在数据表关联的场景是大表关联小表时,比方:用户表(大表)--当日订单表(小表);事实表(大表)–维度表(小表),能够通过空间换工夫。回忆一下,在根底的数据结构中,tree构造和Hash构造堪称数据处理的两大法宝:一个保证数据有序不便实现区间搜寻,一个通过hash函数实现精准命中点对点查问效率高。
在这样的背景下,通过将小表Hash化,实现Join的想法也就难能可贵了。
(起源:Join Processing in Relational Databases)
而且即便一张表在单机环境生成Hash内存耗费过大,还能够利用Hash将数据进行切分,实现分布式能力。所以,在Presto中Join算法通常会抉择Hash Join,该算法的工夫复杂度为O(m+n)。
通过相干材料的学习,能够发现Join算法的实现原理还是相当简略的,排序和Hash是数据结构最为根底的内容。理解了Join的根本思维,如何落地实际进去呢?毕竟talk is cheap。在我的项目中实现Join之前,须要一些铺垫常识。通常来说外围算法是皇冠上的明珠,然而仅有明珠是不够的还须要皇冠作为底座。
三、Join工程化前置条件
3.1 SQL解决架构-火山模型
在将Join算法落地前,须要先理解一下数据库解决数据的根本架构。在了解架构的根底上,能力将Join算法搁置到适合的地位。在后面系列文章中探讨了基于antlr实现SQL语句的解析。能够发现SQL语法反对的操作类型十分丰盛:查问表(TableScan),过滤数据(Filter),排序(Order),限度(Limit),字段进行运算(Project), 聚合(Group),关联(Join)等。为了实现上述的能力,须要一个具备并行化能力且可扩大的架构。
1994年Goetz Graefe在论文《Volcano-An Extensible and Parallel Query Evaluation System》提出了一个架构设计思维,这就是赫赫有名的火山模型,也称为迭代模型。火山模型其实蕴含了文件系统和查询处理两个局部,这里咱们重点关注查询处理的设计思维。架构图如下:
(起源:《Balancing vectorized execution with bandwidth-optimized storage》)
简略解读一下:
职责拆散:将不同操作独立成一个的Operator,Operator采纳open-next-close的迭代器模式。
例如对于SQL 。
SELECT Id, Name, Age, (Age - 30) * 50 AS BonusFROM PeopleWHERE Age > 30
对应到Scan, Select, Project三个Operator,数据交互通过next()函数实现。上述的实践在Presto中能够对应起来,例如Presto中几个罕用的Operator, 基本上是见名知意:
动静组装:Operator基于SQL语句的解析实现动静组装,多个Operator造成一个管道(pipeline)。
例如:print和predicate两个operator造成一个管道:
(起源: 《Volcano-An Extensible and Parallel Query Evaluation System》)
在火山模型的根底上,Presto排汇了数据库畛域的其余思维,对根底的火山模型进行了优化革新,次要体现在如下几点:
- Operator数据处理优化成一次一个Page,而不是一次行(也称为tuple)。
- Page的存储采纳列式构造。即雷同的列封装到一个Block中。
批量解决联合列式存储奠定了向量化计算的根底。这也是数据库畛域的优化方向。
3.2 批量解决和列式存储
在研读Presto源码时,简直到处都能够看到Page/Block的身影。所以了解Page/Block背地的思维是了解Presto实现机制的根底。有相干书籍和文档解说Page/Block的概念,然而因为这些概念是跟其余概念混在一起出现,导致一时间不容易了解。
笔者认为Type-Block-Page三者放在一起,更容易了解。咱们应用数据库,通常须要定义表,字段名称,字段类型。在传统的DBMS中,通常是按行存储数据,通常构造如下:
(起源:《数据库系统实现》)
然而通常OLAP场景不须要读取所有的字段,基于这样的场景,就衍生进去了列式存储。就是咱们看到的如下构造:
(起源:《Presto技术底细》)
即每个字段对应一个Block, 多个Block的切面才是一条记录,也就是所谓的行,在一些论文中称为tuple。通过比照能够分明看出Presto中,Page就是典型了列式存储的实现。所以在Presto中,每个Type必然会关联到一种Block。例如:bigint类型就对应着LongArrayBlockBuilder,varchar类型对应着VariableWidthBlock。
了解了原理,操作Page/Block就变得非常简单了,简略的demo代码如下:
import com.facebook.presto.common.Page;import com.facebook.presto.common.PageBuilder;import com.facebook.presto.common.block.Block;import com.facebook.presto.common.block.BlockBuilder;import com.facebook.presto.common.type.BigintType;import com.facebook.presto.common.type.Type;import com.facebook.presto.common.type.VarcharType;import com.google.common.collect.Lists;import io.airlift.slice.Slice; import java.util.List; import static io.airlift.slice.Slices.utf8Slice; /** * PageBlockDemo * * @version 1.0 * @since 2021/6/22 19:26 */public class PageBlockDemo { private static Page buildPage(List<Type> types,List<Object[]> dataSet){ PageBuilder pageBuilder = new PageBuilder(types); // 封装成Page for(Object[] row:dataSet){ // 实现一行 pageBuilder.declarePosition(); for (int column = 0; column < types.size(); column++) { BlockBuilder out = pageBuilder.getBlockBuilder(column); Object colVal = row[column]; if(colVal == null){ out.appendNull(); }else{ Type type = types.get(column); Class<?> javaType = type.getJavaType(); if(javaType == long.class){ type.writeLong(out,(long)colVal); }else if(javaType == Slice.class){ type.writeSlice(out, utf8Slice((String)colVal)); }else{ throw new UnsupportedOperationException("not implemented"); } } } } // 生成Page Page page = pageBuilder.build(); pageBuilder.reset(); return page; } private static void readColumn(List<Type> types,Page page){ // 从Page中读取列 for(int column=0;column<types.size();column++){ Block block = page.getBlock(column); Type type = types.get(column); Class<?> javaType = type.getJavaType(); System.out.print("column["+type.getDisplayName()+"]>>"); List<Object> colList = Lists.newArrayList(); for(int pos=0;pos<block.getPositionCount();pos++){ if(javaType == long.class){ colList.add(block.getLong(pos)); }else if(javaType == Slice.class){ colList.add(block.getSlice(pos,0,block.getSliceLength(pos)).toStringUtf8()); }else{ throw new UnsupportedOperationException("not implemented"); } } System.out.println(colList); } } public static void main(String[] args) { /** * 假如有两个字段,一个字段类型为int, 一个字段类型为varchar */ List<Type> types = Lists.newArrayList(BigintType.BIGINT, VarcharType.VARCHAR); // 按行存储 List<Object[]> dataSet = Lists.newArrayList( new Object[]{1L,"aa"}, new Object[]{2L,"ba"}, new Object[]{3L,"cc"}, new Object[]{4L,"dd"}); Page page = buildPage(types, dataSet); readColumn(types,page); }}// 运行后果://column[bigint]>>[1, 2, 3, 4]//column[varchar]>>[aa, ba, cc, dd]
将数据封装成Page在各个Operator中流转,一方面防止了对象的序列化和反序列化老本,另一方面相比tuple的形式升高了函数调用的开销。这跟集装箱运货升高运输成本的思维是相似的。
四、Join算法的工程实际
了解了Join的外围算法和基础架构,联合前文中对antlr实现SQL表达式的解析以及实现where条件过滤,咱们曾经具备了实现Join的根底条件。接下来简略讲述一下Join算法的落地流程。首先在语法层面须要反对Join的语法,因为本文目标在于钻研算法实现流程,而不在于实现残缺的Join性能,因而咱们暂且先思考反对两张表单字段的等值Join语法。
首先在语法上须要反对Join, 基于antlr语法的定义关键点如下:
querySpecification : SELECT selectItem (',' selectItem)* (FROM relation (',' relation)*)? (WHERE where=booleanExpression)? ; selectItem : expression #selectSingle ; relation : left=relation ( joinType JOIN rightRelation=relation joinCriteria ) #joinRelation | sampledRelation #relationDefault ; joinType : INNER? ; joinCriteria : ON booleanExpression ;
上述的语法定义将Join的要害因素拆解得十分清晰:Join的左表, Join的类型,Join关键词, Join的右表, Join的关联条件。例如,通常咱们最简略的Join语句用例如下(借用presto的tpch数据源):
select t2.custkey, t2.phone, t1.orderkey from orders t1 inner join customer t2 on t1.custkey=t2.custkey limit 10;
对应着语法和SQL语句用例,能够看到在将Join算法落地,还须要思考如下细节点:
- 检测SQL语句,确保SQL语句合乎语法要求。
- 梳理表的别名和字段的对应关系,确保查问的字段和表可能对应起来,Join条件的字段类型可能匹配。
- Join算法的选取,是HashJoin还是NestedLoopJoin还是SortMergeJoin?
- 哪个表是build表,哪个表是probe表?
- Join条件的判断如何实现?
- 整个查问波及到Operator如何组装,以实现最终后果的输入?
咱们回顾一下SQL执行的要害流程:
(起源: Query Execution Flow Architecture (SQL Server))
基于下面的流程,问题其实曾经有了答案。
- Parser:借助antlr的能力即可实现SQL语法的检测。
- Binding:基于SQL语句生成AST,利用元数据检测字段和表的映射关系以及Join条件的字段类型。
- Planner:基于AST生成查问打算。
- Executor:基于查问打算生成对应的Operator并执行。
以NestedLoop Join算法为例,理解一下Presto的实现思路。对于NestedLoopJoin Join算法的落地,在Presto中其实是拆解为两个阶段:组合阶段和过滤阶段。在实现JoinOperator时,只需负责两个表数据的笛卡尔积组合即可。外围代码如下:
// NestedLoopPageBuilder中实现两个Page计算笛卡尔积的解决逻辑,这里RunLengthEncodedBlock用于一个元素复制,典型地笛卡尔积计算中须要将一列元素从1行复制成多行。@Overridepublic Page next(){ if (!hasNext()) { throw new NoSuchElementException(); } if (noColumnShortcutResult >= 0) { rowIndex = maxRowIndex; return new Page(noColumnShortcutResult); } rowIndex++; // Create an array of blocks for all columns in both pages. Block[] blocks = new Block[numberOfProbeColumns + numberOfBuildColumns]; // Make sure we always put the probe data on the left and build data on the right. int indexForRleBlocks = buildPageLarger ? 0 : numberOfProbeColumns; int indexForPageBlocks = buildPageLarger ? numberOfProbeColumns : 0; // For the page with less rows, create RLE blocks and add them to the blocks array for (int i = 0; i < smallPage.getChannelCount(); i++) { Block block = smallPage.getBlock(i).getSingleValueBlock(rowIndex); blocks[indexForRleBlocks] = new RunLengthEncodedBlock(block, largePage.getPositionCount()); indexForRleBlocks++; } // Put the page with more rows in the blocks array for (int i = 0; i < largePage.getChannelCount(); i++) { blocks[indexForPageBlocks + i] = largePage.getBlock(i); } return new Page(largePage.getPositionCount(), blocks);}
五、小结
本文简略梳理了Join的根本算法以及在Presto中实现的根本框架,并以NestedLoop Join算法为例,演示了在Presto中的实现外围点。能够看出相比原始的算法形容,Presto的工程落地是截然不同: 不仅反对了所有的Join语义,而且实现了分布式能力。这其中有架构层面的思考,也有性能层面的思考,十分值得摸索跟钻研。就Join算法,能够摸索的点还有很多,比方多表Join的程序选取,大表跟小表Join的算法优化,Semi Join的算法优化,Join算法数据歪斜的问题等等,堪称路漫漫其修远兮,将在后续系列文章中持续剖析摸索。
六、参考资料
- Presto源码
- Join Processing in Relational Databases
- Volcano-An Extensible and Parallel Query Evaluation System