Push还是Pull,这是个问题么?

数据库的SQL计算引擎负责解决和执行SQL申请。通常来说,查问优化器会输入物理执行打算,它通常由一系列Operator组成,为了确保执行效率的高效,须要将Operator组成流水线执行。

有两种流水线的构建形式:第一种是需要驱动的流水线,其中一个Operator一直从上级Operator反复拉取下一个数据Tuple;第二种是数据驱动的流水线,由Operator将每个数据Tuple推送给下一个Operator。那么,这两种流水线构建,哪种更好呢? 这可能并不是一个容易答复的问题,在Snowflake的论文中提到:基于Push的执行进步了缓存效率,因为它将控制流逻辑从数据循环中移除。它还使 Snowflake 可能无效地解决流水线的DAG 打算,为两头后果的共享和管道化发明了额定的机会。

下边这幅来自参考文献[1]的图最间接地阐明Push和Pull的区别:

简略地说,Pull流水线基于迭代器模型,经典的火山模型正是基于Pull来构建。火山模型是数据库成熟的 SQL执行计划,该设计模式将关系型代数中的每一种操作形象成一个Operator,整个 SQL 语句在这种状况下造成一个 Operator树(执行打算树);通过自顶向下的调用 next 接口,火山模型可能以数据库行为单位解决数据,就是图中所示的next()办法。这种申请是递归调用的,直到查问打算树的叶子结点能够拜访数据自身。因而,对于Pull模型来说,这是非常容易了解和实现的:每个Operator都须要实现next()办法,只有将查问打算树构建好,就递归调用即可。火山模型有如下特点:

  1. 以数据行为单位解决数据,每一行数据的解决都会调用 next 接口。
  2. next 接口的调用,须要通过虚函数机制,相比于间接调用函数,虚函数须要的CPU 指令更多,因而更低廉。
  3. 以行为单位的数据处理,会导致 CPU 缓存应用效率低下和一些不必要的复杂性:数据库必须记住解决到哪一行,以便解决跳到下一行;其次解决完一行后须要将下一行加载到 CPU 缓存中,而实际上 CPU 缓存所能存储的数据行数远不止一行。
  4. 火山模型最大的益处是接口看起来洁净而且易懂。因为数据流和控制流在一起,每个Operator有良好的形象,比方Filter只须要关怀如何依据谓词过滤数据,Aggregates只须要关怀如何聚合数据。

为了升高开销,Pull模型能够引入向量化减速,就是实现GetChunk()办法每次获取一批数据取代next()获取一行数据,以Projection算子为例阐明:

 void Projection::GetChunk(DataChunk &result) {        // get the next chunk from the child        child->GetChunk(child_chunk);        if (child_chunk.size() == 0) {            return;        }        // execute expressions        executor.Execute(child_chunk, result); }

这里边,存在一些跟控制流无关的代码,它跟Operator的解决逻辑耦合在一起,且每个Operator实现都要蕴含这些代码,例如这里须要判断child_chunk为空的状况,因为child在GetChunk时进行了过滤解决。因而,Pull模型的接口外部实现比拟冗余和易错。

与Pull流水线的迭代器模型不同,在Push模型中,数据流和控制流是相同的,具体来说,不是目标Operator向源Operator申请数据,而是从源Operator向目标Operator推送数据,这是通过源Operator将数据作为参数传递给目标Operator的生产办法(Consume)实现的,因而,Push流水线模型等价于访问者(Visitor)模型,每个Operator不再提供next,而换之以Produce/Consume。Push模型是Hyper提出的[3],称之为Pipeline Operator,它提出的初衷,是认为迭代器模型以Operator为核心,Operator的边界过于清晰,因而导致数据在Operator之间传递(从CPU寄存器转移到内存)产生额定的内存带宽开销,无奈做到Data Locality最大化,所以执行须要从以Operator为核心切换到以数据为核心,尽量让数据在寄存器中保留更长时间,确保Data Locality最大化。进一步的,Hyper将操作系统的NUMA调度框架引入了数据库的查问执行调度[2],为Push模型实现了parallelism-aware(就是对并行更敌对):

  • 采纳Pipeline来组合算子,自底而上Push调度。当一个工作执行完结时,它会告诉调度器将后序工作退出到工作队列中,每个数据块的单位被称为 Morsel。一个Morsel 大概蕴含10000行数据。查问工作的执行单位是解决一个 Morsel。
  • 优先将一个内核上的工作产生的后序任务调度在同一个内核上,防止了在内核间进行数据通信的开销。
  • 当一个内核闲暇时,它有能力从其余内核“偷取”一个工作来执行(Work Stealing),这尽管有时会减少一个数据传输的开销,然而却缓解了繁忙内核上工作的沉积,总体来说会放慢工作的执行。
  • 在内核闲暇并能够偷取工作时,调度器并非立刻满足闲暇内核的要求,而是让它稍稍期待一段时间。在这段时间里,如果繁忙内核能够实现本人的工作,那么跨内核调度就能够被防止。

以多表Join为例:

SELECT ... FROM SJOIN R USING AJOIN T USING B;

该查问由多个Pipeline组合而成,Pipeline之间须要并行,Pipeline外部也要并行。理论中并行的管制只须要在Pipeline的端点即可,例如上图中,两头的过滤等算子,自身无需思考并行,因为源头的TableScan扫描会Push数据给它,而Pipeline的Sink是Hash Join,它的Hashtable Build阶段须要parallelism-aware,但Probe阶段无需这样。以Push为根底管制Pipeline的parallelism-aware,从技术上更加容易做到。

Push模型实现parallelism-aware绝对容易,那么为什么Pull模型实现parallelism-aware就不太容易呢?因为是自顶而下来调度而非数据驱动,因而一个间接的想法是划定分区,而后由优化器依据分区制订物理打算,不同分区的物理打算并行执行。这样容易导致一个问题就是让查问打算更加简单(引入更多分区),而且并不容易做到负载的主动平衡,具体来说:对输出数据进行分区时,通过一些Operator(如Filter)后,不同分区保留下来的数据量区别很大,因而后续的算子执行就会面临数据歪斜问题。此外,不同的CPU解决同样数据量所破费的工夫也并不一定雷同,它会受到环境烦扰、任务调度、阻塞、谬误等起因减慢甚至停止相应,因而也会拖慢整体执行的效率。

Hyper的Push模型是在2011年提出的,在这之前的SQL引擎,大都采纳基于火山的Pull模型。已知基于Push构建的有Presto,Snowflake,Hyper,QuickStep,HANA,DuckDB(在2021年10月从Pull模型切换到了Push模型(详见参考文献[4]),Starrocks等。ClickHouse是个异类,在它本人的Meetup资料中,声称本人是Pull和Push的组合,其中查问是采纳了Pull模型。并且,在它的代码中,也一样是采纳Pull字眼,作为查问调度外围驱动——PullingAsyncPipelineExecutor。在通过AST生成QueryPlan(逻辑打算)后,通过一些RBO优化,ClickHouse将QueryPlan依照后序遍历的形式将其转化为Pipeline,这种形式生成的Pipeline,跟Push模型是很像的,因为Pipeline的每个Operator(ClickHouse定义叫Processor),都有输出和输入,Operator从输出将数据Pull过去,实现解决后,再Push给Pipeline的下一级Operator。因而,ClickHouse并不是传统的火山Pull模型实现,而是从查问打算树生成Pipeline执行打算。从火山Pull模型的Plan Tree生成Pipeline的办法是后序遍历,从没有Children 的Node开始构建第一个Pipeline,这是Push模型中生成Pipeline Operator的规范做法:

QueryPipelinePtr QueryPlan::buildQueryPipeline(...){    struct Frame    {        Node * node = {};        QueryPipelines pipelines = {};    };    QueryPipelinePtr last_pipeline;    std::stack<Frame> stack;    stack.push(Frame{.node = root});    while (!stack.empty())    {        auto & frame = stack.top();        if (last_pipeline)        {            frame.pipelines.emplace_back(std::move(last_pipeline));            last_pipeline = nullptr;        }        size_t next_child = frame.pipelines.size();        if (next_child == frame.node->children.size())        {            last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);            stack.pop();        }        else            stack.push(Frame{.node = frame.node->children[next_child]});    }    return last_pipeline;}

接下来是Pipeline调度,首先PullingAsyncPipelineExecutor::pull从Pipeline中拉取数据:

    PullingAsyncPipelineExecutor executor(pipeline);    Block block;    while (executor.pull(block, ...))    {        if (isQueryCancelled())        {            executor.cancel();            break;        }        if (block)        {            if (!state.io.null_format)                sendData(block);        }        sendData({});    }

pull调用的时候从thread_group抉择线程,而后data.executor->execute(num_threads)执行PipelineExecutor,num_threads示意并行线程数。接下来PipelineExecutor将Pipeline转化为执行图ExecutingGraph。Pipeline是逻辑构造,并不关怀如何执行,ExecutingGraph则是物理调度执行的参照。ExecutingGraph通过Pipeline Operator的InputPort和OutputPort转换为Edge,用Edge把2个Operator连接起来,Operator就是图的Node。随后就是PipelineExecutor::execute通过ExecutingGraph对Pipeline调度,这个函数次要作用是通过task_queue中pop出执行打算的ExecutingGraph::Node来调度工作。调度时,线程会不停遍历ExecutingGraph,依据Operator的执行状态进行调度执行,直到所有的Operator都达到Finished状态。调度器初始化,是筛选ExecutingGraph中所有没有OutPort的Node启动的,因而,控制流是从Pipeline的Sink Node收回的,递归调用prepareProcessor,这区别于Push模型的控制流从Source Node开始逐级向上。除了管制流向的不同,这个Pipeline Operator跟Push齐全一样,因而也有人将ClickHouse纳入Push模型中,毕竟,在很多文献的上下文,Push等同于Pipeline Operator,Pull等同于火山。Pipeline和ExecutingGraph的对应如图所示(在ClickHouse中,Operator=Processor=Transformer):

因而,Push模型是parallelism-aware的,实质上须要设计工作良好的调度器,来管制数据流和并行度。除了上述的长处,奢侈的Push模型也存在一些毛病:解决Limit和Merge Join有一些艰难(详见参考文献[1]),对于前者来说,Operator不容易控制数据何时不再由源Operator产生,这样就会产生一些永远不会被应用的元素。对于后者来说,因为Merge Join Operator无奈以后由哪个源Operator产生下一个数据Tuple,所以Merge Join无奈进行流水线解决,因而至多对其中一个源Operator要突破流水线(Pipeline Breaker),须要对其进行物化操作。这两个问题,其本质仍然是Push模型下的Pipeline调度问题:消费者如何管制生产者,除了Limit和Merge Join之外,其余的操作,例如终止正在进行的查问,也是一样的状况。正如通过拆散查问打算树和Pipeline使得Pull模型能够parallelism-aware之外,Push模型在工程实现上也并没有必要齐全如同论文所形容,只能管制Pipeline的源头。通过引入ClickHouse task_queue相似的机制,Push模型同样能够做到对源Operator的逐级管制。

MatrixOne基于Golang开发,因而间接利用Go语言个性实现了Push模型:利用channel作为阻塞音讯队列,告诉生产者。查问打算由多个Operator形成,pipeline是蕴含多个Operator的执行序列。Operator代表一个具体的操作,比方典型的过滤,投影,hash build和hash probe都能够。对于一个查问打算来说,首先须要确定应用多少个pipeline,应用多少个cpu,每个cpu跑哪些pipeline。具体实现中,借助于Golang语言的个性:一个pipeline对应一个goroutine,pipeline之间通过channel(无Buffer)传递数据,pipeline的调度也是通过channel来驱动。举例如下:

Connector Operatorfunc Call(proc *process.Process, arg interface{}) (bool, error) {    ...    if inputBatch == nil {        select {        case <-reg.Ctx.Done():            process.FreeRegisters(proc)            return true, nil        case reg.Ch <- inputBatch:            return false, nil        }    }}

因为是Push模型,因而一个查问打算是通过Producer Pipeline触发整个流程的,非生产者的Pipeline,没有接管到数据,是不会运行。Producer Pipeline在启动后,就会尝试读取数据,而后通过channel将数据发送给另一个Pipeline,Producer Pipeline在启动后就会不停的读取数据,只存在两种状况会退出:

  • 数据读取完
  • 产生谬误

当非生产者的pipeline没有从channel读取Producer Pipeline推送的数据时,Producer Pipeline会阻塞。非生产者的Pipeline在启动后并不会立即执行,除非Producer Pipeline在channel中搁置了数据。Pipeline在启动后会在以下两种状况退出:

  • 从channel中承受到了退出信息
  • 产生谬误

MatrixOne会依据数据的散布将Producer Pipeline调配到具体的节点。在特定的节点接管到Producer Pipeline后会依据以后机器和查问打算的状况(目前是获取机器的核数)来派生多个Producer pipeline。其余Pipeline的并行度,则是在承受数据的时候确定其并行度。

下边先看一个简略查问:

select * from R where a > 1 limit 10

这个查问存在Limit Operator,意味着存在上文所述的Cancel,Limit,Merge Join等Pipeline的终止条件。该查问的Pipeline如下所示,它在2个Core上并行执行。

graph LRbatch0-->restrict0-->limit0-->connector0-->merge-->limitbatch1-->restrict1-->limit1-->connector1-->merge

因为Limit的存在,Pipeline引入了Merge Operator,与此同时跟调度相干的问题是:

  • Merge不能无限度地承受多个Pipeline的数据,Merge须要依据内存大小通过Connector向上游发送channel进行数据读取。
  • Pipeline数目依据CPU数量动静决定,当Pipeline不再推送数据时,查问天然终止,因而Merge须要标记是否曾经传输完结。

再看一个简单一些的例子,tpch-q3:

select    l_orderkey,    sum(l_extendedprice * (1 - l_discount)) as revenue,    o_orderdate,    o_shippriorityfrom    customer,    orders,    lineitemwhere    c_mktsegment = 'HOUSEHOLD'    and c_custkey = o_custkey    and l_orderkey = o_orderkey    and o_orderdate < date '1995-03-29'    and l_shipdate > date '1995-03-29'group by    l_orderkey,    o_orderdate,    o_shippriorityorder by    revenue desc,    o_orderdatelimit 10

假如查问打算如下:

graph TDA[limit 10] --> B[order]B --> C[group with agg]C --> A0[filter]A0 --> D[join]D --> E[join]D --> F[scan customer]E --> G[scan orders]E --> H[scan lineitem]

假如三张表的数据均匀分布在两个节点node0和node1上,那么对应的Pipeline如下:

采纳Push模型的还有一个潜在长处在于,它跟流计算的Data Flow范式(如Flink)容易保持一致。FlinkSQL会把查问打算的每个Operator转为流式Operator,流式Operator会将每个Operator的计算结果的更新传给下一个Operator,这从逻辑上跟Push模型是统一的。对于打算在数据库外部实现流引擎的MatrixOne来说,这是一个逻辑复用的中央,当然,流引擎远不是只依附Push模型就能够解决的,这超出了本文探讨的领域。采纳Push模型最初一个潜在长处是,它和查问编译Codegen是人造组合,目前MatrixOne并没有实现Codegen,故而这也超出了本文探讨的领域。

以后的MatrixOne,实现了根本的基于Push模型的计算并行调度,在将来,还会在多方面上进行改良,比方针对多查问的混合并发与并行的任务调度,比方当算子因为内存不足须要进行Spill解决时,也须要Pipeline的调度可能感知并无效解决,既能实现工作,还能最小化IO开销,这里边会有很多十分有意思的工作。也欢送对这方面感兴趣的同学跟咱们一起探讨这些层面上的翻新。所以,Push还是Pull,这是个问题么?如同是,如同也不是,所有以实际效果为着眼点,并非简略地非黑既白,这代表了一种计算并行调度的思维形式。

参考文献

[1] Shaikhha, Amir and Dashti, Mohammad and Koch, Christoph, Push versus pull-based loop fusion in query engines, Journal of Functional Programming, Cambridge University Press, 2018

[2] Leis, Viktor and Boncz, Peter and Kemper, Alfons and Neumann, Thomas, Morsel-driven parallelism: A NUMA-aware query evaluation framework for the many-core age, SIGMOD 2014

[3] Thomas Neumann, Efficiently compiling efficient query plans for modern hardware, VLDB 2011

[4] https://github.com/duckdb/duc...

[5] https://presentations.clickho...

欢送退出MatrixOne社区

官网:matrixorigin.cn
源码:github.com/matrixorigin/matrixone
Slack:matrixoneworkspace.slack.com
知乎 | CSDN | 墨天轮 | OSCHINA | InfoQ:MatrixOrigin