本文以中的代码对应的tinysql我的项目版本号为df75611

SelectionExec是tinysql第五章第一节中要补全的执行器组件,心愿可能通过本文的总结来阐明tinysql执行器中的一些概念。

接口

作为执行器组件,SelectionExec实现了Executor接口, 其定义位于tinysql/executor/executor.go L136

// Executor is the physical implementation of a algebra operator.//// In TiDB, all algebra operators are implemented as iterators, i.e., they// support a simple Open-Next-Close protocol. See this paper for more details://// "Volcano-An Extensible and Parallel Query Evaluation System"//// Different from Volcano's execution model, a "Next" function call in TiDB will// return a batch of rows, other than a single row in Volcano.// NOTE: Executors must call "chk.Reset()" before appending their results to it.type Executor interface {    base() *baseExecutor    Open(context.Context) error    Next(ctx context.Context, req *chunk.Chunk) error    Close() error    Schema() *expression.Schema}

每一个Executor的实现都是从baseExecutor扩大进去的,个别都会继承其中的base/Schema办法,前者是将`baseExecutor返回,而后者返回的是表构造,设计了哪些列和键。

在火山模型中,次要须要应用Open/Next/Close三个办法。代码上最根本的逻辑是,当下层ExecutorNext办法被调用时,被调用的Executor通过调用上层ExecutorNext办法返回的Chunk,通过肯定的解决来构建本层的返回。简略的这么说比拟宽泛,所以心愿通过分享我本人在写proj时浏览代码的集体了解,帮忙大家了解Executor的串联。

构造体

SelectionExec的定义位于tinysql/executor/executor.go L345

// SelectionExec represents a filter executor.type SelectionExec struct {    baseExecutor                           // 根底构造    batched     bool                       // 是否以批处理的模式返回后果    filters     []expression.Expression    // 过滤器表达式列表    selected    []bool                     // 过滤后果buffer     inputIter   *chunk.Iterator4Chunk      // 迭代器    inputRow    chunk.Row                  // 迭代以后行    childResult *chunk.Chunk               // 上层Executor返回的后果buffer}

其中baseExecutor的定义位于tinysql/executor/executor.go L55

type baseExecutor struct {    ctx           sessionctx.Context        // 执行上下文    id            fmt.Stringer              // 标识    schema        *expression.Schema        // 表构造    initCap       int                       // Chunk初始容量    maxChunkSize  int                       // 返回Chunk的最大尺寸    children      []Executor                // 上层Executor    retFieldTypes []*types.FieldType        // 返回的列信息}

办法

上面依据SelectionExecExecutor接口的实现,来阐明,次要阐明的是Next办法

base

实现非常简单,就是间接继承baseExecutor的base办法

// base returns the baseExecutor of an executor, don't override this method!func (e *baseExecutor) base() *baseExecutor {    return e}

Schema

这里实现也是间接继承了baseExecutor的Schema办法

// Schema returns the current baseExecutor's schema. If it is nil, then create and return a new one.func (e *baseExecutor) Schema() *expression.Schema {    if e.schema == nil {        return expression.NewSchema()    }    return e.schema}

Open

SelectionExec对Open办法进行了重写, 实质上Open办法是进行了初始化操作

// Open implements the Executor Open interface.func (e *SelectionExec) Open(ctx context.Context) error {  // 调用baseExecutor的初始化,相似super    if err := e.baseExecutor.Open(ctx); err != nil {         return err    }  // 留神这里这newFirstChunk的名字具备肯定的迷惑性,实际上依据上层Executor的属性来构建chunk    e.childResult = newFirstChunk(e.children[0])   // 判断是否能够filters是否能够向量化执行  // 其实就是查看是否所有的filter都能够向量化, 只有所有filter都能够向量化,才能够进行批执行    e.batched = expression.Vectorizable(e.filters)    if e.batched {    // 如果能够进行批执行的话,构建一个bool切片作为buffer,来保留过滤器的抉择状况    // 在这里初始化好了这块空间,只有之后没有产生切片的resize,那么始终应用的是这块空间    // 加重内存调配和GC的压力        e.selected = make([]bool, 0, chunk.InitialCapacity)    }  // 留神这里仅仅是实现了iterator和chunk的绑定,此时chunk中没有数据,iterator也没有意义    e.inputIter = chunk.NewIterator4Chunk(e.childResult)  // 这里就是指向了一个空Row    e.inputRow = e.inputIter.End()    return nil}

baseExecutor的实现如下

// Open initializes children recursively and "childrenResults" according to children's schemas.func (e *baseExecutor) Open(ctx context.Context) error {  // 实质上就是遍历所有位于上层的Executor调用一遍Open  // 留神一点就是,位于上层的Executor会先于以后Executor被初始化    for _, child := range e.children {        err := child.Open(ctx)        if err != nil {            return err        }    }    return nil}

Close

SelectionExec对Close办法进行了重写, 实质上Close办法是进行了资源开释的作用

// Close implements plannercore.Plan Close interface.func (e *SelectionExec) Close() error {  // 清空两个buffer    e.childResult = nil    e.selected = nil    return e.baseExecutor.Close()}

baseExecutor的实现如下

// Close closes all executors and release all resources.func (e *baseExecutor) Close() error {    var firstErr error  // 与Open时类似,就是间接调用一遍上层Executor,    for _, src := range e.children {        if err := src.Close(); err != nil && firstErr == nil {            firstErr = err        }    }    return firstErr}

Next

SelectionExec的Next办法同样进行了重写,这里也是须要咱们进行填充的局部,然而这段代码内外两层循环乍一看下来有一些令人费解。

// Next implements the Executor Next interface.func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {    req.GrowAndReset(e.maxChunkSize)    if !e.batched {        return e.unBatchedNext(ctx, req)    }    /*        Exit the loop when:            1. the `req` chunk` is full.            2. there is no further results from child.            3. meets any error.     */    for {        // Fill in the `req` util it is full or the `inputIter` is fully processed.        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {            // Your code here.        }        err := Next(ctx, e.children[0], e.childResult)        if err != nil {            return err        }        // no more data.        if e.childResult.NumRows() == 0 {            return nil        }        /* Your code here.           Process and filter the child result using `expression.VectorizedFilter`.         */    }}

依据课程文档的倡议是要求咱们通过浏览unBatchedNext办法来学习函数性能,该函数就是单行解决咱们要实现的逻辑的单行解决版本,并且该函数与咱们要实现的局部的代码构造相似,通过了解这部分代码,也能帮忙咱们晓得批处理时大略的解决流程。

// unBatchedNext filters input rows one by one and returns once an input row is selected.// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",// we have to set batch size to 1 to do the evaluation of filter and projection.func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {  // 外循环, 实质上是更新上层后果集    for {    // 内循环,实质是一直迭代遍历上层后果集,直到返回一个被选中的行时,插入要返回的chunk,并    //    // 留神第一次调用Next的时,因为`e.inputRow`是一个空Row,`e.inputIter.End()`同样也会返回一个空Row    // 因而第一次调用Next的时候并不会先进入内循环的逻辑        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {            selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)            if err != nil {                return err            }            if selected {                chk.AppendRow(e.inputRow)                e.inputRow = e.inputIter.Next()                return nil            }        }        /* 这里才是第一次执行时的理论开始地位 */        // Next函数的实质是将调用上层children[0]的Next办法,并将调用后果更新到childResult当中        err := Next(ctx, e.children[0], e.childResult)        if err != nil {            return err        }    // 将inputRow定位到更新后的chunk第一行        e.inputRow = e.inputIter.Begin()        // 如果曾经没有数据了,那就间接返回nil        if e.childResult.NumRows() == 0 {            return nil        }    }}

那么依据这块流程的了解,那么咱们就能够大抵补充出代码

// Next implements the Executor Next interface.func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {  // 这里由Callee对后果集chunk进行重置  // 在批处理时,会返回maxChunkSize限定大小的后果集    req.GrowAndReset(e.maxChunkSize)    if !e.batched {        return e.unBatchedNext(ctx, req)    }    /*        Exit the loop when:            1. the `req` chunk` is full.            2. there is no further results from child.            3. meets any error.    */    for {        // Fill in the `req` util it is full or the `inputIter` is fully processed.        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {            /* Your code here. */      // 依据过滤后果buffer中的数据判断以后行是否被选中,如果被选中了则增加到后果集中            if e.selected[e.inputRow.Idx()] {                req.AppendRow(e.inputRow)        if req.IsFull() { // 如果后果集被填满了,那么须要将inputRow未被检索的第一行,并返回               e.inputRow = e.inputIter.Next()                    return nil                }            }              }    // 这里是调用volcano模型处在上层的子语句的Next办法, 并赋值到以后的childResult中,更新上层后果集内容        err := Next(ctx, e.children[0], e.childResult)         if err != nil {            return err        }        // no more data.        if e.childResult.NumRows() == 0 {            return nil        }        /* Your code here.           Process and filter the child result using `expression.VectorizedFilter`.        */        // 这里次要是反复利用selected所申请的空间, 留神肯定要赋值e.selected, 进行同步扭转        e.inputRow = e.inputIter.Begin()    // selectd保留应用向量filters过滤后的后果        e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected)         if err != nil {            return nil        }    }}

那么大家可能有一个疑难就是为什么须要应用内外两层循环来实现,这里依照我的集体了解做肯定的解释

其实这里代码结构设计的十分精彩, 整个构造看起来就像是一个变速器。

外循环的作用是充分利用以后在e.inputRow/e.inputIter/e.selected/e.childResult中的缓存, 因为上上层的chunk可能是有肯定尺寸差距的

内循环的作用是进行后果集的附加, 会有两种退出内层循环的状况

  • 一旦后果集被填充斥, 就退出函数, 实现了以后的解决,以后还没解决的信息依然保留在e.inputRow/e.inputIter/e.selected/e.childResult中, 下一次调用Next的时候就能够接着从e.inputRow开始
  • 如果遍历完了上层的后果集, 以后层的后果集依然还没有被填满, 那么就会从调用Next更新上层的后果集, 而后在依据filter进行筛选, 将上层后果集中filter过滤后的后果保留到e.selected外面, 而后从头开始

上面残缺的形容一下调用Next的理论执行流程, 因为理论的执行流程会和代码自上而下的程序不一样, 这也是这段代码构造精妙的中央

  1. 首先要阐明的是SelectionExec在调用Next之前是须要调用Open进行初始化的, 那么Open须要关注的是, Open中进行的仅仅是状态的初始化, 并没有执行本质的计算, (e.childResult应用了newFirstChunk的时候只是进行了字段/容量/大小的初始化, 并没有进行内容填充), e.childResult是空的, e.inputItere.inputRow应该也是空的, 须要在后续步骤中进行初始化.
  2. 第1次调用SelectionExec.Next

    1. 留神的循环条件e.inputRow != e.inputIter.End()此时是不成立的, 二者都是空的Row构造体, 所以第一次调用Next的时候齐全不进入内循环
    2. 调用Next将上层数据加载到e.childResult当中, 进行一些查看
    3. 更新e.inputRow使之对应e.inputIter的第一个数据
    4. 应用expression.VectorizedFilter依据e.filters的条件将上层后果集数据的依据过滤器的过滤后果寄存到e.selected
    5. 回到外循环结尾往下执行, 那么咱们就间接进入了内循环, 在e.inputRow != e.inputIter.End()此时曾经成立了, 所以能够进入内循环
    6. 在内循环中, 须要判断后果集是否曾经被填满

      • 如果没有被填满, 那么就依据筛选后果, 思考是否将遍历到的行放到后果集中, 当遍历完结时, 步骤b开始持续往下执行
      • 如果曾经被填满, 那么就间接返回. 在上层后果集中遍历的状态保留在e.inputRow/e.inputIter中, filter过滤的后果放在e.selected中, 期待下一次Next调用的时候再调用
  3. 第n次调用SelectionExec.Next

    1. 如果上一次调用Next时还有上层后果集的数据没有遍历完,那么过后的遍历状态依然保留在e.inputRow/e.inputIter/e.selected/e.childResult中, 那么能够间接从2.5开始,进入内循环
    2. 如果上一次调用Next时刚好上层后果集的数据也遍历完了,那么e.inputRow就会是一个空Row, 从2.1开始往下执行,也就是从新加载上层数据。