关于数据库:Push还是Pull这是个问题么

58次阅读

共计 8900 个字符,预计需要花费 23 分钟才能阅读完成。

Push 还是 Pull,这是个问题么?

数据库的 SQL 计算引擎负责解决和执行 SQL 申请。通常来说,查问优化器会输入物理执行打算,它通常由一系列 Operator 组成,为了确保执行效率的高效,须要将 Operator 组成流水线执行。

有两种流水线的构建形式:第一种是需要驱动的流水线,其中一个 Operator 一直从上级 Operator 反复拉取下一个数据 Tuple;第二种是数据驱动的流水线,由 Operator 将每个数据 Tuple 推送给下一个 Operator。那么,这两种流水线构建,哪种更好呢?这可能并不是一个容易答复的问题,在 Snowflake 的论文中提到:基于 Push 的执行进步了缓存效率,因为它将控制流逻辑从数据循环中移除。它还使 Snowflake 可能无效地解决流水线的 DAG 打算,为两头后果的共享和管道化发明了额定的机会。

下边这幅来自参考文献 [1] 的图最间接地阐明 Push 和 Pull 的区别:

简略地说,Pull 流水线基于迭代器模型,经典的火山模型正是基于 Pull 来构建。火山模型是数据库成熟的 SQL 执行计划,该设计模式将关系型代数中的每一种操作形象成一个 Operator,整个 SQL 语句在这种状况下造成一个 Operator 树 (执行打算树);通过自顶向下的调用 next 接口,火山模型可能以数据库行为单位解决数据,就是图中所示的 next() 办法。这种申请是递归调用的,直到查问打算树的叶子结点能够拜访数据自身。因而,对于 Pull 模型来说,这是非常容易了解和实现的:每个 Operator 都须要实现 next()办法,只有将查问打算树构建好,就递归调用即可。火山模型有如下特点:

  1. 以数据行为单位解决数据,每一行数据的解决都会调用 next 接口。
  2. next 接口的调用,须要通过虚函数机制,相比于间接调用函数,虚函数须要的 CPU 指令更多,因而更低廉。
  3. 以行为单位的数据处理,会导致 CPU 缓存应用效率低下和一些不必要的复杂性:数据库必须记住解决到哪一行,以便解决跳到下一行;其次解决完一行后须要将下一行加载到 CPU 缓存中,而实际上 CPU 缓存所能存储的数据行数远不止一行。
  4. 火山模型最大的益处是接口看起来洁净而且易懂。因为数据流和控制流在一起,每个 Operator 有良好的形象,比方 Filter 只须要关怀如何依据谓词过滤数据,Aggregates 只须要关怀如何聚合数据。

为了升高开销,Pull 模型能够引入向量化减速,就是实现 GetChunk()办法每次获取一批数据取代 next()获取一行数据,以 Projection 算子为例阐明:

 void Projection::GetChunk(DataChunk &result) {
        // get the next chunk from the child
        child->GetChunk(child_chunk);
        if (child_chunk.size() == 0) {return;}
        // execute expressions
        executor.Execute(child_chunk, result);
 }

这里边,存在一些跟控制流无关的代码,它跟 Operator 的解决逻辑耦合在一起,且每个 Operator 实现都要蕴含这些代码,例如这里须要判断 child_chunk 为空的状况,因为 child 在 GetChunk 时进行了过滤解决。因而,Pull 模型的接口外部实现比拟冗余和易错。

与 Pull 流水线的迭代器模型不同,在 Push 模型中,数据流和控制流是相同的,具体来说,不是目标 Operator 向源 Operator 申请数据,而是从源 Operator 向目标 Operator 推送数据,这是通过源 Operator 将数据作为参数传递给目标 Operator 的生产办法 (Consume) 实现的,因而,Push 流水线模型等价于访问者 (Visitor) 模型,每个 Operator 不再提供 next,而换之以 Produce/Consume。Push 模型是 Hyper 提出的[3],称之为 Pipeline Operator,它提出的初衷,是认为迭代器模型以 Operator 为核心,Operator 的边界过于清晰,因而导致数据在 Operator 之间传递(从 CPU 寄存器转移到内存)产生额定的内存带宽开销,无奈做到 Data Locality 最大化,所以执行须要从以 Operator 为核心切换到以数据为核心,尽量让数据在寄存器中保留更长时间,确保 Data Locality 最大化。进一步的,Hyper 将操作系统的 NUMA 调度框架引入了数据库的查问执行调度[2],为 Push 模型实现了 parallelism-aware(就是对并行更敌对):

  • 采纳 Pipeline 来组合算子,自底而上 Push 调度。当一个工作执行完结时,它会告诉调度器将后序工作退出到工作队列中,每个数据块的单位被称为 Morsel。一个 Morsel 大概蕴含 10000 行数据。查问工作的执行单位是解决一个 Morsel。
  • 优先将一个内核上的工作产生的后序任务调度在同一个内核上,防止了在内核间进行数据通信的开销。
  • 当一个内核闲暇时,它有能力从其余内核“偷取”一个工作来执行(Work Stealing),这尽管有时会减少一个数据传输的开销,然而却缓解了繁忙内核上工作的沉积,总体来说会放慢工作的执行。
  • 在内核闲暇并能够偷取工作时,调度器并非立刻满足闲暇内核的要求,而是让它稍稍期待一段时间。在这段时间里,如果繁忙内核能够实现本人的工作,那么跨内核调度就能够被防止。

以多表 Join 为例:

SELECT ... 
FROM S
JOIN R USING A
JOIN T USING B;

该查问由多个 Pipeline 组合而成,Pipeline 之间须要并行,Pipeline 外部也要并行。理论中并行的管制只须要在 Pipeline 的端点即可,例如上图中,两头的过滤等算子,自身无需思考并行,因为源头的 TableScan 扫描会 Push 数据给它,而 Pipeline 的 Sink 是 Hash Join,它的 Hashtable Build 阶段须要 parallelism-aware,但 Probe 阶段无需这样。以 Push 为根底管制 Pipeline 的 parallelism-aware,从技术上更加容易做到。

Push 模型实现 parallelism-aware 绝对容易,那么为什么 Pull 模型实现 parallelism-aware 就不太容易呢?因为是自顶而下来调度而非数据驱动,因而一个间接的想法是划定分区,而后由优化器依据分区制订物理打算,不同分区的物理打算并行执行。这样容易导致一个问题就是让查问打算更加简单 (引入更多分区),而且并不容易做到负载的主动平衡,具体来说:对输出数据进行分区时,通过一些 Operator(如 Filter) 后,不同分区保留下来的数据量区别很大,因而后续的算子执行就会面临数据歪斜问题。此外,不同的 CPU 解决同样数据量所破费的工夫也并不一定雷同,它会受到环境烦扰、任务调度、阻塞、谬误等起因减慢甚至停止相应,因而也会拖慢整体执行的效率。

Hyper 的 Push 模型是在 2011 年提出的,在这之前的 SQL 引擎,大都采纳基于火山的 Pull 模型。已知基于 Push 构建的有 Presto,Snowflake,Hyper,QuickStep,HANA,DuckDB(在 2021 年 10 月从 Pull 模型切换到了 Push 模型 (详见参考文献[4]),Starrocks 等。ClickHouse 是个异类,在它本人的 Meetup 资料中,声称本人是 Pull 和 Push 的组合,其中查问是采纳了 Pull 模型。并且,在它的代码中,也一样是采纳 Pull 字眼,作为查问调度外围驱动——PullingAsyncPipelineExecutor。在通过 AST 生成 QueryPlan(逻辑打算) 后,通过一些 RBO 优化,ClickHouse 将 QueryPlan 依照后序遍历的形式将其转化为 Pipeline,这种形式生成的 Pipeline,跟 Push 模型是很像的,因为 Pipeline 的每个 Operator(ClickHouse 定义叫 Processor),都有输出和输入,Operator 从输出将数据 Pull 过去,实现解决后,再 Push 给 Pipeline 的下一级 Operator。因而,ClickHouse 并不是传统的火山 Pull 模型实现,而是从查问打算树生成 Pipeline 执行打算。从火山 Pull 模型的 Plan Tree 生成 Pipeline 的办法是后序遍历,从没有 Children 的 Node 开始构建第一个 Pipeline,这是 Push 模型中生成 Pipeline Operator 的规范做法:

QueryPipelinePtr QueryPlan::buildQueryPipeline(...)
{
    struct Frame
    {Node * node = {};
        QueryPipelines pipelines = {};};
    QueryPipelinePtr last_pipeline;
    std::stack<Frame> stack;
    stack.push(Frame{.node = root});
    while (!stack.empty())
    {auto & frame = stack.top();
        if (last_pipeline)
        {frame.pipelines.emplace_back(std::move(last_pipeline));
            last_pipeline = nullptr;
        }
        size_t next_child = frame.pipelines.size();
        if (next_child == frame.node->children.size())
        {last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
            stack.pop();}
        else
            stack.push(Frame{.node = frame.node->children[next_child]});
    }
    return last_pipeline;
}

接下来是 Pipeline 调度,首先 PullingAsyncPipelineExecutor::pull 从 Pipeline 中拉取数据:

    PullingAsyncPipelineExecutor executor(pipeline);
    Block block;
    while (executor.pull(block, ...))
    {if (isQueryCancelled())
        {executor.cancel();
            break;
        }
        if (block)
        {if (!state.io.null_format)
                sendData(block);
        }
        sendData({});
    }

pull 调用的时候从 thread_group 抉择线程,而后 data.executor->execute(num_threads)执行 PipelineExecutor,num_threads 示意并行线程数。接下来 PipelineExecutor 将 Pipeline 转化为执行图 ExecutingGraph。Pipeline 是逻辑构造,并不关怀如何执行,ExecutingGraph 则是物理调度执行的参照。ExecutingGraph 通过 Pipeline Operator 的 InputPort 和 OutputPort 转换为 Edge,用 Edge 把 2 个 Operator 连接起来,Operator 就是图的 Node。随后就是 PipelineExecutor::execute 通过 ExecutingGraph 对 Pipeline 调度,这个函数次要作用是通过 task_queue 中 pop 出执行打算的 ExecutingGraph::Node 来调度工作。调度时,线程会不停遍历 ExecutingGraph,依据 Operator 的执行状态进行调度执行,直到所有的 Operator 都达到 Finished 状态。调度器初始化,是筛选 ExecutingGraph 中所有没有 OutPort 的 Node 启动的,因而,控制流是从 Pipeline 的 Sink Node 收回的,递归调用 prepareProcessor,这区别于 Push 模型的控制流从 Source Node 开始逐级向上。除了管制流向的不同,这个 Pipeline Operator 跟 Push 齐全一样,因而也有人将 ClickHouse 纳入 Push 模型中,毕竟,在很多文献的上下文,Push 等同于 Pipeline Operator,Pull 等同于火山。Pipeline 和 ExecutingGraph 的对应如图所示(在 ClickHouse 中,Operator=Processor=Transformer):

因而,Push 模型是 parallelism-aware 的,实质上须要设计工作良好的调度器,来管制数据流和并行度。除了上述的长处,奢侈的 Push 模型也存在一些毛病:解决 Limit 和 Merge Join 有一些艰难(详见参考文献[1]),对于前者来说,Operator 不容易控制数据何时不再由源 Operator 产生,这样就会产生一些永远不会被应用的元素。对于后者来说,因为 Merge Join Operator 无奈以后由哪个源 Operator 产生下一个数据 Tuple,所以 Merge Join 无奈进行流水线解决,因而至多对其中一个源 Operator 要突破流水线(Pipeline Breaker),须要对其进行物化操作。这两个问题,其本质仍然是 Push 模型下的 Pipeline 调度问题:消费者如何管制生产者,除了 Limit 和 Merge Join 之外,其余的操作,例如终止正在进行的查问,也是一样的状况。正如通过拆散查问打算树和 Pipeline 使得 Pull 模型能够 parallelism-aware 之外,Push 模型在工程实现上也并没有必要齐全如同论文所形容,只能管制 Pipeline 的源头。通过引入 ClickHouse task_queue 相似的机制,Push 模型同样能够做到对源 Operator 的逐级管制。

MatrixOne 基于 Golang 开发,因而间接利用 Go 语言个性实现了 Push 模型:利用 channel 作为阻塞音讯队列,告诉生产者。查问打算由多个 Operator 形成,pipeline 是蕴含多个 Operator 的执行序列。Operator 代表一个具体的操作,比方典型的过滤,投影,hash build 和 hash probe 都能够。对于一个查问打算来说,首先须要确定应用多少个 pipeline,应用多少个 cpu,每个 cpu 跑哪些 pipeline。具体实现中,借助于 Golang 语言的个性:一个 pipeline 对应一个 goroutine,pipeline 之间通过 channel(无 Buffer)传递数据,pipeline 的调度也是通过 channel 来驱动。举例如下:

Connector Operator
func Call(proc *process.Process, arg interface{}) (bool, error) {
    ...
    if inputBatch == nil {
        select {case <-reg.Ctx.Done():
            process.FreeRegisters(proc)
            return true, nil
        case reg.Ch <- inputBatch:
            return false, nil
        }
    }
}

因为是 Push 模型,因而一个查问打算是通过 Producer Pipeline 触发整个流程的,非生产者的 Pipeline,没有接管到数据,是不会运行。Producer Pipeline 在启动后,就会尝试读取数据,而后通过 channel 将数据发送给另一个 Pipeline,Producer Pipeline 在启动后就会不停的读取数据,只存在两种状况会退出:

  • 数据读取完
  • 产生谬误

当非生产者的 pipeline 没有从 channel 读取 Producer Pipeline 推送的数据时,Producer Pipeline 会阻塞。非生产者的 Pipeline 在启动后并不会立即执行,除非 Producer Pipeline 在 channel 中搁置了数据。Pipeline 在启动后会在以下两种状况退出:

  • 从 channel 中承受到了退出信息
  • 产生谬误

MatrixOne 会依据数据的散布将 Producer Pipeline 调配到具体的节点。在特定的节点接管到 Producer Pipeline 后会依据以后机器和查问打算的状况 (目前是获取机器的核数) 来派生多个 Producer pipeline。其余 Pipeline 的并行度,则是在承受数据的时候确定其并行度。

下边先看一个简略查问:

select * from R where a > 1 limit 10

这个查问存在 Limit Operator,意味着存在上文所述的 Cancel,Limit,Merge Join 等 Pipeline 的终止条件。该查问的 Pipeline 如下所示,它在 2 个 Core 上并行执行。

graph LR
batch0-->restrict0-->limit0-->connector0-->merge-->limit
batch1-->restrict1-->limit1-->connector1-->merge

因为 Limit 的存在,Pipeline 引入了 Merge Operator,与此同时跟调度相干的问题是:

  • Merge 不能无限度地承受多个 Pipeline 的数据,Merge 须要依据内存大小通过 Connector 向上游发送 channel 进行数据读取。
  • Pipeline 数目依据 CPU 数量动静决定,当 Pipeline 不再推送数据时,查问天然终止,因而 Merge 须要标记是否曾经传输完结。

再看一个简单一些的例子,tpch-q3:

select
    l_orderkey,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    o_orderdate,
    o_shippriority
from
    customer,
    orders,
    lineitem
where
    c_mktsegment = 'HOUSEHOLD'
    and c_custkey = o_custkey
    and l_orderkey = o_orderkey
    and o_orderdate < date '1995-03-29'
    and l_shipdate > date '1995-03-29'
group by
    l_orderkey,
    o_orderdate,
    o_shippriority
order by
    revenue desc,
    o_orderdate
limit 10

假如查问打算如下:

graph TD
A[limit 10] --> B[order]
B --> C[group with agg]
C --> A0[filter]
A0 --> D[join]
D --> E[join]
D --> F[scan customer]
E --> G[scan orders]
E --> H[scan lineitem]

假如三张表的数据均匀分布在两个节点 node0 和 node1 上,那么对应的 Pipeline 如下:

采纳 Push 模型的还有一个潜在长处在于,它跟流计算的 Data Flow 范式 (如 Flink) 容易保持一致。FlinkSQL 会把查问打算的每个 Operator 转为流式 Operator,流式 Operator 会将每个 Operator 的计算结果的更新传给下一个 Operator,这从逻辑上跟 Push 模型是统一的。对于打算在数据库外部实现流引擎的 MatrixOne 来说,这是一个逻辑复用的中央,当然,流引擎远不是只依附 Push 模型就能够解决的,这超出了本文探讨的领域。采纳 Push 模型最初一个潜在长处是,它和查问编译 Codegen 是人造组合,目前 MatrixOne 并没有实现 Codegen,故而这也超出了本文探讨的领域。

以后的 MatrixOne,实现了根本的基于 Push 模型的计算并行调度,在将来,还会在多方面上进行改良,比方针对多查问的混合并发与并行的任务调度,比方当算子因为内存不足须要进行 Spill 解决时,也须要 Pipeline 的调度可能感知并无效解决,既能实现工作,还能最小化 IO 开销,这里边会有很多十分有意思的工作。也欢送对这方面感兴趣的同学跟咱们一起探讨这些层面上的翻新。所以,Push 还是 Pull,这是个问题么?如同是,如同也不是,所有以实际效果为着眼点,并非简略地非黑既白,这代表了一种计算并行调度的思维形式。

参考文献

[1] Shaikhha, Amir and Dashti, Mohammad and Koch, Christoph, Push versus pull-based loop fusion in query engines, Journal of Functional Programming, Cambridge University Press, 2018

[2] Leis, Viktor and Boncz, Peter and Kemper, Alfons and Neumann, Thomas, Morsel-driven parallelism: A NUMA-aware query evaluation framework for the many-core age, SIGMOD 2014

[3] Thomas Neumann, Efficiently compiling efficient query plans for modern hardware, VLDB 2011

[4] https://github.com/duckdb/duc…

[5] https://presentations.clickho…

欢送退出 MatrixOne 社区

官网:matrixorigin.cn
源码:github.com/matrixorigin/matrixone
Slack:matrixoneworkspace.slack.com
知乎 | CSDN | 墨天轮 | OSCHINA | InfoQ:MatrixOrigin

正文完
 0