Push还是Pull,这是个问题么?
数据库的SQL计算引擎负责解决和执行SQL申请。通常来说,查问优化器会输入物理执行打算,它通常由一系列Operator组成,为了确保执行效率的高效,须要将Operator组成流水线执行。
有两种流水线的构建形式:第一种是需要驱动的流水线,其中一个Operator一直从上级Operator反复拉取下一个数据Tuple;第二种是数据驱动的流水线,由Operator将每个数据Tuple推送给下一个Operator。那么,这两种流水线构建,哪种更好呢? 这可能并不是一个容易答复的问题,在Snowflake的论文中提到:基于Push的执行进步了缓存效率,因为它将控制流逻辑从数据循环中移除。它还使 Snowflake 可能无效地解决流水线的DAG 打算,为两头后果的共享和管道化发明了额定的机会。
下边这幅来自参考文献[1]的图最间接地阐明Push和Pull的区别:
简略地说,Pull流水线基于迭代器模型,经典的火山模型正是基于Pull来构建。火山模型是数据库成熟的 SQL执行计划,该设计模式将关系型代数中的每一种操作形象成一个Operator,整个 SQL 语句在这种状况下造成一个 Operator树(执行打算树);通过自顶向下的调用 next 接口,火山模型可能以数据库行为单位解决数据,就是图中所示的next()办法。这种申请是递归调用的,直到查问打算树的叶子结点能够拜访数据自身。因而,对于Pull模型来说,这是非常容易了解和实现的:每个Operator都须要实现next()办法,只有将查问打算树构建好,就递归调用即可。火山模型有如下特点:
- 以数据行为单位解决数据,每一行数据的解决都会调用 next 接口。
- next 接口的调用,须要通过虚函数机制,相比于间接调用函数,虚函数须要的CPU 指令更多,因而更低廉。
- 以行为单位的数据处理,会导致 CPU 缓存应用效率低下和一些不必要的复杂性:数据库必须记住解决到哪一行,以便解决跳到下一行;其次解决完一行后须要将下一行加载到 CPU 缓存中,而实际上 CPU 缓存所能存储的数据行数远不止一行。
- 火山模型最大的益处是接口看起来洁净而且易懂。因为数据流和控制流在一起,每个Operator有良好的形象,比方Filter只须要关怀如何依据谓词过滤数据,Aggregates只须要关怀如何聚合数据。
为了升高开销,Pull模型能够引入向量化减速,就是实现GetChunk()办法每次获取一批数据取代next()获取一行数据,以Projection算子为例阐明:
void Projection::GetChunk(DataChunk &result) { // get the next chunk from the child child->GetChunk(child_chunk); if (child_chunk.size() == 0) { return; } // execute expressions executor.Execute(child_chunk, result); }
这里边,存在一些跟控制流无关的代码,它跟Operator的解决逻辑耦合在一起,且每个Operator实现都要蕴含这些代码,例如这里须要判断child_chunk为空的状况,因为child在GetChunk时进行了过滤解决。因而,Pull模型的接口外部实现比拟冗余和易错。
与Pull流水线的迭代器模型不同,在Push模型中,数据流和控制流是相同的,具体来说,不是目标Operator向源Operator申请数据,而是从源Operator向目标Operator推送数据,这是通过源Operator将数据作为参数传递给目标Operator的生产办法(Consume)实现的,因而,Push流水线模型等价于访问者(Visitor)模型,每个Operator不再提供next,而换之以Produce/Consume。Push模型是Hyper提出的[3],称之为Pipeline Operator,它提出的初衷,是认为迭代器模型以Operator为核心,Operator的边界过于清晰,因而导致数据在Operator之间传递(从CPU寄存器转移到内存)产生额定的内存带宽开销,无奈做到Data Locality最大化,所以执行须要从以Operator为核心切换到以数据为核心,尽量让数据在寄存器中保留更长时间,确保Data Locality最大化。进一步的,Hyper将操作系统的NUMA调度框架引入了数据库的查问执行调度[2],为Push模型实现了parallelism-aware(就是对并行更敌对):
- 采纳Pipeline来组合算子,自底而上Push调度。当一个工作执行完结时,它会告诉调度器将后序工作退出到工作队列中,每个数据块的单位被称为 Morsel。一个Morsel 大概蕴含10000行数据。查问工作的执行单位是解决一个 Morsel。
- 优先将一个内核上的工作产生的后序任务调度在同一个内核上,防止了在内核间进行数据通信的开销。
- 当一个内核闲暇时,它有能力从其余内核“偷取”一个工作来执行(Work Stealing),这尽管有时会减少一个数据传输的开销,然而却缓解了繁忙内核上工作的沉积,总体来说会放慢工作的执行。
- 在内核闲暇并能够偷取工作时,调度器并非立刻满足闲暇内核的要求,而是让它稍稍期待一段时间。在这段时间里,如果繁忙内核能够实现本人的工作,那么跨内核调度就能够被防止。
以多表Join为例:
SELECT ... FROM SJOIN R USING AJOIN T USING B;
该查问由多个Pipeline组合而成,Pipeline之间须要并行,Pipeline外部也要并行。理论中并行的管制只须要在Pipeline的端点即可,例如上图中,两头的过滤等算子,自身无需思考并行,因为源头的TableScan扫描会Push数据给它,而Pipeline的Sink是Hash Join,它的Hashtable Build阶段须要parallelism-aware,但Probe阶段无需这样。以Push为根底管制Pipeline的parallelism-aware,从技术上更加容易做到。
Push模型实现parallelism-aware绝对容易,那么为什么Pull模型实现parallelism-aware就不太容易呢?因为是自顶而下来调度而非数据驱动,因而一个间接的想法是划定分区,而后由优化器依据分区制订物理打算,不同分区的物理打算并行执行。这样容易导致一个问题就是让查问打算更加简单(引入更多分区),而且并不容易做到负载的主动平衡,具体来说:对输出数据进行分区时,通过一些Operator(如Filter)后,不同分区保留下来的数据量区别很大,因而后续的算子执行就会面临数据歪斜问题。此外,不同的CPU解决同样数据量所破费的工夫也并不一定雷同,它会受到环境烦扰、任务调度、阻塞、谬误等起因减慢甚至停止相应,因而也会拖慢整体执行的效率。
Hyper的Push模型是在2011年提出的,在这之前的SQL引擎,大都采纳基于火山的Pull模型。已知基于Push构建的有Presto,Snowflake,Hyper,QuickStep,HANA,DuckDB(在2021年10月从Pull模型切换到了Push模型(详见参考文献[4]),Starrocks等。ClickHouse是个异类,在它本人的Meetup资料中,声称本人是Pull和Push的组合,其中查问是采纳了Pull模型。并且,在它的代码中,也一样是采纳Pull字眼,作为查问调度外围驱动——PullingAsyncPipelineExecutor。在通过AST生成QueryPlan(逻辑打算)后,通过一些RBO优化,ClickHouse将QueryPlan依照后序遍历的形式将其转化为Pipeline,这种形式生成的Pipeline,跟Push模型是很像的,因为Pipeline的每个Operator(ClickHouse定义叫Processor),都有输出和输入,Operator从输出将数据Pull过去,实现解决后,再Push给Pipeline的下一级Operator。因而,ClickHouse并不是传统的火山Pull模型实现,而是从查问打算树生成Pipeline执行打算。从火山Pull模型的Plan Tree生成Pipeline的办法是后序遍历,从没有Children 的Node开始构建第一个Pipeline,这是Push模型中生成Pipeline Operator的规范做法:
QueryPipelinePtr QueryPlan::buildQueryPipeline(...){ struct Frame { Node * node = {}; QueryPipelines pipelines = {}; }; QueryPipelinePtr last_pipeline; std::stack<Frame> stack; stack.push(Frame{.node = root}); while (!stack.empty()) { auto & frame = stack.top(); if (last_pipeline) { frame.pipelines.emplace_back(std::move(last_pipeline)); last_pipeline = nullptr; } size_t next_child = frame.pipelines.size(); if (next_child == frame.node->children.size()) { last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings); stack.pop(); } else stack.push(Frame{.node = frame.node->children[next_child]}); } return last_pipeline;}
接下来是Pipeline调度,首先PullingAsyncPipelineExecutor::pull从Pipeline中拉取数据:
PullingAsyncPipelineExecutor executor(pipeline); Block block; while (executor.pull(block, ...)) { if (isQueryCancelled()) { executor.cancel(); break; } if (block) { if (!state.io.null_format) sendData(block); } sendData({}); }
pull调用的时候从thread_group抉择线程,而后data.executor->execute(num_threads)执行PipelineExecutor,num_threads示意并行线程数。接下来PipelineExecutor将Pipeline转化为执行图ExecutingGraph。Pipeline是逻辑构造,并不关怀如何执行,ExecutingGraph则是物理调度执行的参照。ExecutingGraph通过Pipeline Operator的InputPort和OutputPort转换为Edge,用Edge把2个Operator连接起来,Operator就是图的Node。随后就是PipelineExecutor::execute通过ExecutingGraph对Pipeline调度,这个函数次要作用是通过task_queue中pop出执行打算的ExecutingGraph::Node来调度工作。调度时,线程会不停遍历ExecutingGraph,依据Operator的执行状态进行调度执行,直到所有的Operator都达到Finished
状态。调度器初始化,是筛选ExecutingGraph中所有没有OutPort的Node启动的,因而,控制流是从Pipeline的Sink Node收回的,递归调用prepareProcessor,这区别于Push模型的控制流从Source Node开始逐级向上。除了管制流向的不同,这个Pipeline Operator跟Push齐全一样,因而也有人将ClickHouse纳入Push模型中,毕竟,在很多文献的上下文,Push等同于Pipeline Operator,Pull等同于火山。Pipeline和ExecutingGraph的对应如图所示(在ClickHouse中,Operator=Processor=Transformer):
因而,Push模型是parallelism-aware的,实质上须要设计工作良好的调度器,来管制数据流和并行度。除了上述的长处,奢侈的Push模型也存在一些毛病:解决Limit和Merge Join有一些艰难(详见参考文献[1]),对于前者来说,Operator不容易控制数据何时不再由源Operator产生,这样就会产生一些永远不会被应用的元素。对于后者来说,因为Merge Join Operator无奈以后由哪个源Operator产生下一个数据Tuple,所以Merge Join无奈进行流水线解决,因而至多对其中一个源Operator要突破流水线(Pipeline Breaker),须要对其进行物化操作。这两个问题,其本质仍然是Push模型下的Pipeline调度问题:消费者如何管制生产者,除了Limit和Merge Join之外,其余的操作,例如终止正在进行的查问,也是一样的状况。正如通过拆散查问打算树和Pipeline使得Pull模型能够parallelism-aware之外,Push模型在工程实现上也并没有必要齐全如同论文所形容,只能管制Pipeline的源头。通过引入ClickHouse task_queue相似的机制,Push模型同样能够做到对源Operator的逐级管制。
MatrixOne基于Golang开发,因而间接利用Go语言个性实现了Push模型:利用channel作为阻塞音讯队列,告诉生产者。查问打算由多个Operator形成,pipeline是蕴含多个Operator的执行序列。Operator代表一个具体的操作,比方典型的过滤,投影,hash build和hash probe都能够。对于一个查问打算来说,首先须要确定应用多少个pipeline,应用多少个cpu,每个cpu跑哪些pipeline。具体实现中,借助于Golang语言的个性:一个pipeline对应一个goroutine,pipeline之间通过channel(无Buffer)传递数据,pipeline的调度也是通过channel来驱动。举例如下:
Connector Operatorfunc Call(proc *process.Process, arg interface{}) (bool, error) { ... if inputBatch == nil { select { case <-reg.Ctx.Done(): process.FreeRegisters(proc) return true, nil case reg.Ch <- inputBatch: return false, nil } }}
因为是Push模型,因而一个查问打算是通过Producer Pipeline触发整个流程的,非生产者的Pipeline,没有接管到数据,是不会运行。Producer Pipeline在启动后,就会尝试读取数据,而后通过channel将数据发送给另一个Pipeline,Producer Pipeline在启动后就会不停的读取数据,只存在两种状况会退出:
- 数据读取完
- 产生谬误
当非生产者的pipeline没有从channel读取Producer Pipeline推送的数据时,Producer Pipeline会阻塞。非生产者的Pipeline在启动后并不会立即执行,除非Producer Pipeline在channel中搁置了数据。Pipeline在启动后会在以下两种状况退出:
- 从channel中承受到了退出信息
- 产生谬误
MatrixOne会依据数据的散布将Producer Pipeline调配到具体的节点。在特定的节点接管到Producer Pipeline后会依据以后机器和查问打算的状况(目前是获取机器的核数)来派生多个Producer pipeline。其余Pipeline的并行度,则是在承受数据的时候确定其并行度。
下边先看一个简略查问:
select * from R where a > 1 limit 10
这个查问存在Limit Operator,意味着存在上文所述的Cancel,Limit,Merge Join等Pipeline的终止条件。该查问的Pipeline如下所示,它在2个Core上并行执行。
graph LRbatch0-->restrict0-->limit0-->connector0-->merge-->limitbatch1-->restrict1-->limit1-->connector1-->merge
因为Limit的存在,Pipeline引入了Merge Operator,与此同时跟调度相干的问题是:
- Merge不能无限度地承受多个Pipeline的数据,Merge须要依据内存大小通过Connector向上游发送channel进行数据读取。
- Pipeline数目依据CPU数量动静决定,当Pipeline不再推送数据时,查问天然终止,因而Merge须要标记是否曾经传输完结。
再看一个简单一些的例子,tpch-q3:
select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriorityfrom customer, orders, lineitemwhere c_mktsegment = 'HOUSEHOLD' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date '1995-03-29' and l_shipdate > date '1995-03-29'group by l_orderkey, o_orderdate, o_shippriorityorder by revenue desc, o_orderdatelimit 10
假如查问打算如下:
graph TDA[limit 10] --> B[order]B --> C[group with agg]C --> A0[filter]A0 --> D[join]D --> E[join]D --> F[scan customer]E --> G[scan orders]E --> H[scan lineitem]
假如三张表的数据均匀分布在两个节点node0和node1上,那么对应的Pipeline如下:
采纳Push模型的还有一个潜在长处在于,它跟流计算的Data Flow范式(如Flink)容易保持一致。FlinkSQL会把查问打算的每个Operator转为流式Operator,流式Operator会将每个Operator的计算结果的更新传给下一个Operator,这从逻辑上跟Push模型是统一的。对于打算在数据库外部实现流引擎的MatrixOne来说,这是一个逻辑复用的中央,当然,流引擎远不是只依附Push模型就能够解决的,这超出了本文探讨的领域。采纳Push模型最初一个潜在长处是,它和查问编译Codegen是人造组合,目前MatrixOne并没有实现Codegen,故而这也超出了本文探讨的领域。
以后的MatrixOne,实现了根本的基于Push模型的计算并行调度,在将来,还会在多方面上进行改良,比方针对多查问的混合并发与并行的任务调度,比方当算子因为内存不足须要进行Spill解决时,也须要Pipeline的调度可能感知并无效解决,既能实现工作,还能最小化IO开销,这里边会有很多十分有意思的工作。也欢送对这方面感兴趣的同学跟咱们一起探讨这些层面上的翻新。所以,Push还是Pull,这是个问题么?如同是,如同也不是,所有以实际效果为着眼点,并非简略地非黑既白,这代表了一种计算并行调度的思维形式。
参考文献
[1] Shaikhha, Amir and Dashti, Mohammad and Koch, Christoph, Push versus pull-based loop fusion in query engines, Journal of Functional Programming, Cambridge University Press, 2018
[2] Leis, Viktor and Boncz, Peter and Kemper, Alfons and Neumann, Thomas, Morsel-driven parallelism: A NUMA-aware query evaluation framework for the many-core age, SIGMOD 2014
[3] Thomas Neumann, Efficiently compiling efficient query plans for modern hardware, VLDB 2011
[4] https://github.com/duckdb/duc...
[5] https://presentations.clickho...
欢送退出MatrixOne社区
官网:matrixorigin.cn
源码:github.com/matrixorigin/matrixone
Slack:matrixoneworkspace.slack.com
知乎 | CSDN | 墨天轮 | OSCHINA | InfoQ:MatrixOrigin