关于presto:探究Presto-SQL引擎2浅析Join

53次阅读

共计 9282 个字符,预计需要花费 24 分钟才能阅读完成。

作者:vivo 互联网技术 -Shuai Guangying

在《探索 Presto SQL 引擎(1)- 巧用 Antlr》中,咱们介绍了 Antlr 的根本用法以及如何应用 Antlr4 实现解析 SQL 查问 CSV 数据,更加深刻了解 Presto 查问引擎反对的 SQL 语法以及实现思路。

本次带来的是系列文章的第 2 篇,本文梳理了 Join 的原理,以及 Join 算法在 Presto 中的实现思路。通过实践和实际的联合,能够在了解原理的根底上,更加深刻了解 Join 算法在 OLAP 场景下的工程落地技巧,比方火山模型,列式存储,批量解决等思维的利用。

一、背景

在业务开发中应用数据库,通常会有标准不容许过多表的 Join。例如阿里巴巴开发手册中,有如下的规定:

【强制】超过三个表禁止 Join。须要 Join 的字段,数据类型必须相对统一;多表关联查问时,保障被关联的字段须要有索引。阐明:即便双表 Join 也要留神表索引、SQL 性能。

在大数据数仓的建设中,只管咱们有星型构造和雪花构造,然而最终交付业务应用的大多是宽表。

能够看出业务应用数据库中的一个矛盾点:咱们须要 Join 来提供灵便的关联操作,然而又要尽量避免多表和大表 Join 带来的性能问题。这是为什么呢?

二、Join 的基本原理

在数据库中 Join 提供的语义是十分丰盛的。简略总结如下:

通常了解 Join 的实现原理,从 Cross Join 是最好的切入点,也就是所谓的笛卡尔积。对于汇合进行笛卡尔积运算,了解非常简单,就是穷举两个汇合中元素所有的组合状况。在数据库中,汇合就对应到数据表中的所有行(tuples),汇合中的元素就对应到单行(tuple)。所以实现 Cross Join 的算法也就跃然纸上了。

实现的代码样例如下:

List<Tuple>  r = newArrayList(new Tuple(newArrayList(1,"a")),
        new Tuple(newArrayList(2,"b")));
 
List<Tuple>  s = newArrayList(new Tuple(newArrayList(3,"c")),
        new Tuple(newArrayList(4,"d")));
 
int cnt =0;
for(Tuple ri:r){for(Tuple si:s){Tuple c = new Tuple().merge(ri).merge(si);
        System.out.println(++cnt+":"+ c);
    }
}
/**
 * out:
 1: [1, a, 3, c]
 2: [1, a, 4, d]
 3: [2, b, 3, c]
 4: [2, b, 4, d]
 */

能够看出实现逻辑非常简单,就是两个 For 循环嵌套。

2.1 Nested Loop Join 算法

在这个根底上,实现 Inner Join 的第一个算法就顺其自然了。十分直白的名称:Nested Loop,实现关键点如下:

(起源:Join Processing in Relational Databases)

其中,θ 操作符能够是:=, !=, <, >, ≤, ≥。

相比笛卡尔积的实现思路,也就是增加了一层 if 条件的判断用于过滤满足条件的组合。

对于 Nested Loop 算法,最要害的点在于它的执行效率。如果参加 Join 的两张表一张量级为 1 万,一张量级为 10w,那么进行比拟的次数为 1w*10w=10 亿次。在大数据时代,通常一张表数据量都是以亿为单位,如果应用 Nested Loop Join 算法,那么 Join 操作的比拟次数间接就是天文数字了。所以 Nested Loop Join 基本上是作为万不得已的保底计划。Nested Loop 这个框架下,常见的优化措施如下:

  • 小表驱动大表,即数据量较大的集作为于 for 循环的外部循环。
  • 一次解决一个数据块,而不是一条记录。也就是所谓的 Block Nested Loop Join,通过分块升高 IO 次数,晋升缓存命中率。

值得一提的是 Nested Loop Join 的思维尽管十分奢侈,然而人造的具备分布式、并行的能力。这也是为什么各类 NoSQL 数据库中仍然保留 Nested Loop Join 实现的重要一点。尽管单机串行执行慢,然而能够并行化的话,那就是加机器能解决的问题了。

2.2 Sort Merge Join 算法

通过后面的剖析能够晓得,Nested Loop Join 算法的关键问题在于比拟次数过多,算法的复杂度为 O(m*n),那么突破口也得朝着这个点。如果汇合中的元素是有序的,比拟的次数会大幅度降低,防止很多无意义的比拟运算。对于有序的所以 Join 的第二种实现形式如下所形容:

(起源:Join Processing in Relational Databases)s)

通过将 JOIN 操作拆分成 Sort 和 Merge 两个阶段实现 Join 操作的减速。对于 Sort 阶段,是能够提前准备好能够复用的。这样的思维对于 MySQL 这类关系型数据库是十分敌对的,这也能解释阿里巴巴开发手册中要求关联的字段必须建设索引,因为索引保障了数据有序。该算法工夫复杂度为排序开销 O(m_log(m)+n_log(n))+ 合并开销 O(m+n)。然而通常因为索引保障了数据有序,索引其工夫复杂度为 O(m+n)。

2.3 Hash Join 算法

Sort Merge Join 的思维在落地中有肯定的限度。所谓成也萧何败萧何,对于基于 Hadoop 的数仓而言,保证数据存储的有序性这个点对于性能影响过大。在海量数据的背景下,保护索引老本是比拟大的。而且索引还依赖于应用场景,不可能每个字段都建一个索引。在数据表关联的场景是大表关联小表时,比方:用户表(大表)– 当日订单表(小表);事实表(大表)–维度表(小表),能够通过空间换工夫。回忆一下,在根底的数据结构中,tree 构造和 Hash 构造堪称数据处理的两大法宝:一个保证数据有序不便实现区间搜寻,一个通过 hash 函数实现精准命中点对点查问效率高。

在这样的背景下,通过将小表 Hash 化,实现 Join 的想法也就难能可贵了。

(起源:Join Processing in Relational Databases)

而且即便一张表在单机环境生成 Hash 内存耗费过大,还能够利用 Hash 将数据进行切分,实现分布式能力。所以,在 Presto 中 Join 算法通常会抉择 Hash Join,该算法的工夫复杂度为 O(m+n)。

通过相干材料的学习,能够发现 Join 算法的实现原理还是相当简略的,排序和 Hash 是数据结构最为根底的内容。理解了 Join 的根本思维,如何落地实际进去呢?毕竟 talk is cheap。在我的项目中实现 Join 之前,须要一些铺垫常识。通常来说外围算法是皇冠上的明珠,然而仅有明珠是不够的还须要皇冠作为底座。

三、Join 工程化前置条件

3.1 SQL 解决架构 - 火山模型

在将 Join 算法落地前,须要先理解一下数据库解决数据的根本架构。在了解架构的根底上,能力将 Join 算法搁置到适合的地位。在后面系列文章中探讨了基于 antlr 实现 SQL 语句的解析。能够发现 SQL 语法反对的操作类型十分丰盛:查问表 (TableScan),过滤数据(Filter),排序(Order),限度(Limit),字段进行运算(Project),聚合(Group),关联(Join) 等。为了实现上述的能力,须要一个具备并行化能力且可扩大的架构。

1994 年 Goetz Graefe 在论文《Volcano-An Extensible and Parallel Query Evaluation System》提出了一个架构设计思维,这就是赫赫有名的火山模型,也称为迭代模型。火山模型其实蕴含了文件系统和查询处理两个局部,这里咱们重点关注查询处理的设计思维。架构图如下:

(起源:《Balancing vectorized execution with bandwidth-optimized storage》)

简略解读一下:

职责拆散:将不同操作独立成一个的 Operator,Operator 采纳 open-next-close 的迭代器模式。

例如对于 SQL。

SELECT Id, Name, Age, (Age - 30) * 50 AS Bonus
FROM People
WHERE Age > 30

对应到 Scan,Select, Project 三个 Operator,数据交互通过 next()函数实现。上述的实践在 Presto 中能够对应起来,例如 Presto 中几个罕用的 Operator, 基本上是见名知意:

动静组装:Operator 基于 SQL 语句的解析实现动静组装,多个 Operator 造成一个管道(pipeline)。

例如:print 和 predicate 两个 operator 造成一个管道:

(起源:《Volcano-An Extensible and Parallel Query Evaluation System》)

在火山模型的根底上,Presto 排汇了数据库畛域的其余思维,对根底的火山模型进行了优化革新,次要体现在如下几点:

  1. Operator 数据处理优化成一次一个 Page,而不是一次行(也称为 tuple)。
  2. Page 的存储采纳列式构造。即雷同的列封装到一个 Block 中。

批量解决联合列式存储奠定了向量化计算的根底。这也是数据库畛域的优化方向。

3.2 批量解决和列式存储

在研读 Presto 源码时,简直到处都能够看到 Page/Block 的身影。所以了解 Page/Block 背地的思维是了解 Presto 实现机制的根底。有相干书籍和文档解说 Page/Block 的概念,然而因为这些概念是跟其余概念混在一起出现,导致一时间不容易了解。

笔者认为 Type-Block-Page 三者放在一起,更容易了解。咱们应用数据库,通常须要定义表,字段名称,字段类型。在传统的 DBMS 中,通常是按行存储数据,通常构造如下:

(起源:《数据库系统实现》)

然而通常 OLAP 场景不须要读取所有的字段,基于这样的场景,就衍生进去了列式存储。就是咱们看到的如下构造:

(起源:《Presto 技术底细》)

即每个字段对应一个 Block,多个 Block 的切面才是一条记录,也就是所谓的行,在一些论文中称为 tuple。通过比照能够分明看出 Presto 中,Page 就是典型了列式存储的实现。所以在 Presto 中,每个 Type 必然会关联到一种 Block。例如:bigint 类型就对应着 LongArrayBlockBuilder,varchar 类型对应着 VariableWidthBlock。

了解了原理,操作 Page/Block 就变得非常简单了,简略的 demo 代码如下:

import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
import com.google.common.collect.Lists;
import io.airlift.slice.Slice;
 
import java.util.List;
 
import static io.airlift.slice.Slices.utf8Slice;
 
/**
 * PageBlockDemo
 *
 * @version 1.0
 * @since 2021/6/22 19:26
 */
public class PageBlockDemo {private static Page buildPage(List<Type> types,List<Object[]> dataSet){PageBuilder pageBuilder = new PageBuilder(types);
        // 封装成 Page
        for(Object[] row:dataSet){
            // 实现一行
            pageBuilder.declarePosition();
            for (int column = 0; column < types.size(); column++) {BlockBuilder out =  pageBuilder.getBlockBuilder(column);
 
                Object colVal = row[column];
                if(colVal == null){out.appendNull();
                }else{Type type = types.get(column);
                    Class<?> javaType = type.getJavaType();
                    if(javaType == long.class){type.writeLong(out,(long)colVal);
                    }else if(javaType == Slice.class){type.writeSlice(out, utf8Slice((String)colVal));
                    }else{throw new UnsupportedOperationException("not implemented");
                    }
                }
            }
        }
        // 生成 Page
        Page page = pageBuilder.build();
        pageBuilder.reset();
        return page;
    }
 
    private static void readColumn(List<Type> types,Page page){
        // 从 Page 中读取列
        for(int column=0;column<types.size();column++){Block block = page.getBlock(column);
            Type type = types.get(column);
            Class<?> javaType = type.getJavaType();
 
            System.out.print("column["+type.getDisplayName()+"]>>");
            List<Object> colList = Lists.newArrayList();
            for(int pos=0;pos<block.getPositionCount();pos++){if(javaType == long.class){colList.add(block.getLong(pos));
                }else if(javaType == Slice.class){colList.add(block.getSlice(pos,0,block.getSliceLength(pos)).toStringUtf8());
                }else{throw new UnsupportedOperationException("not implemented");
                }
            }
            System.out.println(colList);
        }
    }
 
    public static void main(String[] args) {
        /**
         * 假如有两个字段,一个字段类型为 int, 一个字段类型为 varchar
         */
        List<Type> types = Lists.newArrayList(BigintType.BIGINT, VarcharType.VARCHAR);
 
        // 按行存储
        List<Object[]> dataSet = Lists.newArrayList(new Object[]{1L,"aa"},
                new Object[]{2L,"ba"},
                new Object[]{3L,"cc"},
                new Object[]{4L,"dd"});
 
        Page page = buildPage(types, dataSet);
 
        readColumn(types,page);
 
    }
}
// 运行后果://column[bigint]>>[1, 2, 3, 4]
//column[varchar]>>[aa, ba, cc, dd]

将数据封装成 Page 在各个 Operator 中流转,一方面防止了对象的序列化和反序列化老本,另一方面相比 tuple 的形式升高了函数调用的开销。这跟集装箱运货升高运输成本的思维是相似的。

四、Join 算法的工程实际

了解了 Join 的外围算法和基础架构,联合前文中对 antlr 实现 SQL 表达式的解析以及实现 where 条件过滤,咱们曾经具备了实现 Join 的根底条件。接下来简略讲述一下 Join 算法的落地流程。首先在语法层面须要反对 Join 的语法,因为本文目标在于钻研算法实现流程,而不在于实现残缺的 Join 性能,因而咱们暂且先思考反对两张表单字段的等值 Join 语法。

首先在语法上须要反对 Join, 基于 antlr 语法的定义关键点如下:

querySpecification
    : SELECT  selectItem (',' selectItem)*
      (FROM relation (',' relation)*)?
      (WHERE where=booleanExpression)?
    ;
 
selectItem
    : expression  #selectSingle
    ;
 
relation
    : left=relation
      (joinType JOIN rightRelation=relation joinCriteria)                                           #joinRelation
    | sampledRelation                             #relationDefault
    ;
 
joinType
    : INNER?
    ;
 
joinCriteria
    : ON booleanExpression
    ;

上述的语法定义将 Join 的要害因素拆解得十分清晰:Join 的左表,Join 的类型,Join 关键词,Join 的右表,Join 的关联条件。例如,通常咱们最简略的 Join 语句用例如下(借用 presto 的 tpch 数据源):

select t2.custkey, t2.phone, t1.orderkey from orders t1 inner join customer t2 on t1.custkey=t2.custkey limit 10;

对应着语法和 SQL 语句用例,能够看到在将 Join 算法落地,还须要思考如下细节点:

  • 检测 SQL 语句,确保 SQL 语句合乎语法要求。
  • 梳理表的别名和字段的对应关系,确保查问的字段和表可能对应起来,Join 条件的字段类型可能匹配。
  • Join 算法的选取,是 HashJoin 还是 NestedLoopJoin 还是 SortMergeJoin?
  • 哪个表是 build 表,哪个表是 probe 表?
  • Join 条件的判断如何实现?
  • 整个查问波及到 Operator 如何组装,以实现最终后果的输入?

咱们回顾一下 SQL 执行的要害流程:

(起源: Query Execution Flow Architecture (SQL Server))

基于下面的流程,问题其实曾经有了答案。

  • Parser:借助 antlr 的能力即可实现 SQL 语法的检测。
  • Binding:基于 SQL 语句生成 AST,利用元数据检测字段和表的映射关系以及 Join 条件的字段类型。
  • Planner:基于 AST 生成查问打算。
  • Executor:基于查问打算生成对应的 Operator 并执行。

以 NestedLoop Join 算法为例,理解一下 Presto 的实现思路。对于 NestedLoopJoin Join 算法的落地,在 Presto 中其实是拆解为两个阶段:组合阶段和过滤阶段。在实现 JoinOperator 时,只需负责两个表数据的笛卡尔积组合即可。外围代码如下:

// NestedLoopPageBuilder 中实现两个 Page 计算笛卡尔积的解决逻辑,这里 RunLengthEncodedBlock 用于一个元素复制,典型地笛卡尔积计算中须要将一列元素从 1 行复制成多行。@Override
public Page next()
{if (!hasNext()) {throw new NoSuchElementException();
    }
 
    if (noColumnShortcutResult >= 0) {
        rowIndex = maxRowIndex;
        return new Page(noColumnShortcutResult);
    }
 
    rowIndex++;
 
    // Create an array of blocks for all columns in both pages.
    Block[] blocks = new Block[numberOfProbeColumns + numberOfBuildColumns];
 
    // Make sure we always put the probe data on the left and build data on the right.
    int indexForRleBlocks = buildPageLarger ? 0 : numberOfProbeColumns;
    int indexForPageBlocks = buildPageLarger ? numberOfProbeColumns : 0;
 
    // For the page with less rows, create RLE blocks and add them to the blocks array
    for (int i = 0; i < smallPage.getChannelCount(); i++) {Block block = smallPage.getBlock(i).getSingleValueBlock(rowIndex);
        blocks[indexForRleBlocks] = new RunLengthEncodedBlock(block, largePage.getPositionCount());
        indexForRleBlocks++;
    }
 
    // Put the page with more rows in the blocks array
    for (int i = 0; i < largePage.getChannelCount(); i++) {blocks[indexForPageBlocks + i] = largePage.getBlock(i);
    }
 
    return new Page(largePage.getPositionCount(), blocks);
}

五、小结

本文简略梳理了 Join 的根本算法以及在 Presto 中实现的根本框架,并以 NestedLoop Join 算法为例,演示了在 Presto 中的实现外围点。能够看出相比原始的算法形容,Presto 的工程落地是截然不同: 不仅反对了所有的 Join 语义,而且实现了分布式能力。这其中有架构层面的思考,也有性能层面的思考,十分值得摸索跟钻研。就 Join 算法,能够摸索的点还有很多,比方多表 Join 的程序选取,大表跟小表 Join 的算法优化,Semi Join 的算法优化,Join 算法数据歪斜的问题等等,堪称路漫漫其修远兮,将在后续系列文章中持续剖析摸索。

六、参考资料

  1. Presto 源码
  2. Join Processing in Relational Databases
  3. Volcano-An Extensible and Parallel Query Evaluation System

正文完
 0