乐趣区

关于数据库:TinySQL学习笔记之SelectionExec

本文以中的代码对应的 tinysql 我的项目版本号为 df75611

SelectionExec是 tinysql 第五章第一节中要补全的执行器组件,心愿可能通过本文的总结来阐明 tinysql 执行器中的一些概念。

接口

作为执行器组件,SelectionExec实现了 Executor 接口, 其定义位于 tinysql/executor/executor.go L136

// Executor is the physical implementation of a algebra operator.
//
// In TiDB, all algebra operators are implemented as iterators, i.e., they
// support a simple Open-Next-Close protocol. See this paper for more details:
//
// "Volcano-An Extensible and Parallel Query Evaluation System"
//
// Different from Volcano's execution model, a"Next" function call in TiDB will
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {base() *baseExecutor
    Open(context.Context) error
    Next(ctx context.Context, req *chunk.Chunk) error
    Close() error
    Schema() *expression.Schema}

每一个 Executor 的实现都是从 baseExecutor 扩大进去的,个别都会继承其中的 base/Schema 办法,前者是将 `baseExecutor 返回,而后者返回的是表构造,设计了哪些列和键。

在火山模型中,次要须要应用 Open/Next/Close 三个办法。代码上最根本的逻辑是,当下层 ExecutorNext办法被调用时,被调用的 Executor 通过调用上层 ExecutorNext办法返回的 Chunk,通过肯定的解决来构建本层的返回。简略的这么说比拟宽泛,所以心愿通过分享我本人在写 proj 时浏览代码的集体了解,帮忙大家了解Executor 的串联。

构造体

SelectionExec的定义位于 tinysql/executor/executor.go L345

// SelectionExec represents a filter executor.
type SelectionExec struct {
    baseExecutor                           // 根底构造

    batched     bool                       // 是否以批处理的模式返回后果
    filters     []expression.Expression    // 过滤器表达式列表
    selected    []bool                     // 过滤后果 buffer 
    inputIter   *chunk.Iterator4Chunk      // 迭代器
    inputRow    chunk.Row                  // 迭代以后行
    childResult *chunk.Chunk               // 上层 Executor 返回的后果 buffer
}

其中 baseExecutor 的定义位于 tinysql/executor/executor.go L55

type baseExecutor struct {
    ctx           sessionctx.Context        // 执行上下文
    id            fmt.Stringer              // 标识
    schema        *expression.Schema        // 表构造
    initCap       int                       // Chunk 初始容量
    maxChunkSize  int                       // 返回 Chunk 的最大尺寸
    children      []Executor                // 上层 Executor
    retFieldTypes []*types.FieldType        // 返回的列信息}

办法

上面依据 SelectionExecExecutor接口的实现,来阐明,次要阐明的是 Next 办法

base

实现非常简单,就是间接继承 baseExecutor 的 base 办法

// base returns the baseExecutor of an executor, don't override this method!
func (e *baseExecutor) base() *baseExecutor {return e}

Schema

这里实现也是间接继承了 baseExecutor 的 Schema 办法

// Schema returns the current baseExecutor's schema. If it is nil, then create and return a new one.
func (e *baseExecutor) Schema() *expression.Schema {
    if e.schema == nil {return expression.NewSchema()
    }
    return e.schema
}

Open

SelectionExec对 Open 办法进行了重写, 实质上 Open 办法是进行了初始化操作

// Open implements the Executor Open interface.
func (e *SelectionExec) Open(ctx context.Context) error {
  // 调用 baseExecutor 的初始化,相似 super
    if err := e.baseExecutor.Open(ctx); err != nil {return err}
  // 留神这里这 newFirstChunk 的名字具备肯定的迷惑性,实际上依据上层 Executor 的属性来构建 chunk
    e.childResult = newFirstChunk(e.children[0]) 
  // 判断是否能够 filters 是否能够向量化执行
  // 其实就是查看是否所有的 filter 都能够向量化, 只有所有 filter 都能够向量化,才能够进行批执行
    e.batched = expression.Vectorizable(e.filters)
    if e.batched {
    // 如果能够进行批执行的话,构建一个 bool 切片作为 buffer,来保留过滤器的抉择状况
    // 在这里初始化好了这块空间,只有之后没有产生切片的 resize,那么始终应用的是这块空间
    // 加重内存调配和 GC 的压力
        e.selected = make([]bool, 0, chunk.InitialCapacity)
    }
  // 留神这里仅仅是实现了 iterator 和 chunk 的绑定,此时 chunk 中没有数据,iterator 也没有意义
    e.inputIter = chunk.NewIterator4Chunk(e.childResult)
  // 这里就是指向了一个空 Row
    e.inputRow = e.inputIter.End()
    return nil
}

baseExecutor的实现如下

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *baseExecutor) Open(ctx context.Context) error {
  // 实质上就是遍历所有位于上层的 Executor 调用一遍 Open
  // 留神一点就是,位于上层的 Executor 会先于以后 Executor 被初始化
    for _, child := range e.children {err := child.Open(ctx)
        if err != nil {return err}
    }
    return nil
}

Close

SelectionExec对 Close 办法进行了重写, 实质上 Close 办法是进行了资源开释的作用

// Close implements plannercore.Plan Close interface.
func (e *SelectionExec) Close() error {
  // 清空两个 buffer
    e.childResult = nil
    e.selected = nil
    return e.baseExecutor.Close()}

baseExecutor的实现如下

// Close closes all executors and release all resources.
func (e *baseExecutor) Close() error {
    var firstErr error
  // 与 Open 时类似,就是间接调用一遍上层 Executor,for _, src := range e.children {if err := src.Close(); err != nil && firstErr == nil {firstErr = err}
    }
    return firstErr
}

Next

SelectionExec的 Next 办法同样进行了重写,这里也是须要咱们进行填充的局部,然而这段代码内外两层循环乍一看下来有一些令人费解。

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {req.GrowAndReset(e.maxChunkSize)

    if !e.batched {return e.unBatchedNext(ctx, req)
    }

    /*
        Exit the loop when:
            1. the `req` chunk` is full.
            2. there is no further results from child.
            3. meets any error.
     */
    for {
        // Fill in the `req` util it is full or the `inputIter` is fully processed.
        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {// Your code here.}
        err := Next(ctx, e.children[0], e.childResult)
        if err != nil {return err}
        // no more data.
        if e.childResult.NumRows() == 0 {return nil}
        /* Your code here.
           Process and filter the child result using `expression.VectorizedFilter`.
         */
    }
}

依据课程文档的倡议是要求咱们通过浏览 unBatchedNext 办法来学习函数性能,该函数就是单行解决咱们要实现的逻辑的单行解决版本,并且该函数与咱们要实现的局部的代码构造相似,通过了解这部分代码,也能帮忙咱们晓得批处理时大略的解决流程。

// unBatchedNext filters input rows one by one and returns once an input row is selected.
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
  // 外循环, 实质上是更新上层后果集
    for {
    // 内循环,实质是一直迭代遍历上层后果集,直到返回一个被选中的行时,插入要返回的 chunk,并
    //
    // 留神第一次调用 Next 的时,因为 `e.inputRow` 是一个空 Row,`e.inputIter.End()` 同样也会返回一个空 Row
    // 因而第一次调用 Next 的时候并不会先进入内循环的逻辑
        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
            if err != nil {return err}
            if selected {chk.AppendRow(e.inputRow)
                e.inputRow = e.inputIter.Next()
                return nil
            }
        }
    
    /* 这里才是第一次执行时的理论开始地位 */
    
    // Next 函数的实质是将调用上层 children[0]的 Next 办法,并将调用后果更新到 childResult 当中
        err := Next(ctx, e.children[0], e.childResult)
        if err != nil {return err}
    // 将 inputRow 定位到更新后的 chunk 第一行
        e.inputRow = e.inputIter.Begin()
        // 如果曾经没有数据了,那就间接返回 nil
        if e.childResult.NumRows() == 0 {return nil}
    }
}

那么依据这块流程的了解,那么咱们就能够大抵补充出代码

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
  // 这里由 Callee 对后果集 chunk 进行重置
  // 在批处理时,会返回 maxChunkSize 限定大小的后果集
    req.GrowAndReset(e.maxChunkSize)

    if !e.batched {return e.unBatchedNext(ctx, req)
    }

    /*
        Exit the loop when:
            1. the `req` chunk` is full.
            2. there is no further results from child.
            3. meets any error.
    */
    for {
        // Fill in the `req` util it is full or the `inputIter` is fully processed.
        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
            /* Your code here. */
      // 依据过滤后果 buffer 中的数据判断以后行是否被选中,如果被选中了则增加到后果集中
            if e.selected[e.inputRow.Idx()] {req.AppendRow(e.inputRow)
        if req.IsFull() { // 如果后果集被填满了,那么须要将 inputRow 未被检索的第一行,并返回
               e.inputRow = e.inputIter.Next()
                    return nil
                }
            }
      
        }
    // 这里是调用 volcano 模型处在上层的子语句的 Next 办法, 并赋值到以后的 childResult 中,更新上层后果集内容
        err := Next(ctx, e.children[0], e.childResult) 
        if err != nil {return err}
        // no more data.
        if e.childResult.NumRows() == 0 {return nil}
        /* Your code here.
           Process and filter the child result using `expression.VectorizedFilter`.
        */
        // 这里次要是反复利用 selected 所申请的空间, 留神肯定要赋值 e.selected, 进行同步扭转
        e.inputRow = e.inputIter.Begin()
    // selectd 保留应用向量 filters 过滤后的后果
        e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected) 
        if err != nil {return nil}
    }
}

那么大家可能有一个疑难就是为什么须要应用内外两层循环来实现,这里依照我的集体了解做肯定的解释

其实这里代码结构设计的十分精彩, 整个构造看起来就像是一个变速器。

外循环的作用是充分利用以后在 e.inputRow/e.inputIter/e.selected/e.childResult 中的缓存, 因为上上层的 chunk 可能是有肯定尺寸差距的

内循环的作用是进行后果集的附加, 会有两种退出内层循环的状况

  • 一旦后果集被填充斥, 就退出函数, 实现了以后的解决, 以后还没解决的信息依然保留在 e.inputRow/e.inputIter/e.selected/e.childResult 中, 下一次调用 Next 的时候就能够接着从 e.inputRow 开始
  • 如果遍历完了上层的后果集, 以后层的后果集依然还没有被填满, 那么就会从调用 Next 更新上层的后果集, 而后在依据 filter 进行筛选, 将上层后果集中 filter 过滤后的后果保留到 e.selected 外面, 而后从头开始

上面残缺的形容一下调用 Next 的理论执行流程, 因为理论的执行流程会和代码自上而下的程序不一样, 这也是这段代码构造精妙的中央

  1. 首先要阐明的是 SelectionExec 在调用 Next 之前是须要调用 Open 进行初始化的, 那么 Open 须要关注的是, Open中进行的仅仅是状态的初始化, 并没有执行本质的计算, (e.childResult应用了 newFirstChunk 的时候只是进行了字段 / 容量 / 大小的初始化, 并没有进行内容填充), e.childResult是空的, e.inputItere.inputRow 应该也是空的, 须要在后续步骤中进行初始化.
  2. 第 1 次调用 SelectionExec.Next

    1. 留神的循环条件 e.inputRow != e.inputIter.End() 此时是不成立的, 二者都是空的 Row 构造体, 所以第一次调用 Next 的时候齐全不进入内循环
    2. 调用 Next 将上层数据加载到 e.childResult 当中, 进行一些查看
    3. 更新 e.inputRow 使之对应 e.inputIter 的第一个数据
    4. 应用 expression.VectorizedFilter 依据 e.filters 的条件将上层后果集数据的依据过滤器的过滤后果寄存到e.selected
    5. 回到外循环结尾往下执行, 那么咱们就间接进入了内循环, 在 e.inputRow != e.inputIter.End() 此时曾经成立了, 所以能够进入内循环
    6. 在内循环中, 须要判断后果集是否曾经被填满

      • 如果没有被填满, 那么就依据筛选后果, 思考是否将遍历到的行放到后果集中, 当遍历完结时, 步骤 b 开始持续往下执行
      • 如果曾经被填满, 那么就间接返回. 在上层后果集中遍历的状态保留在 e.inputRow/e.inputIter 中, filter 过滤的后果放在 e.selected 中, 期待下一次 Next 调用的时候再调用
  3. 第 n 次调用SelectionExec.Next

    1. 如果上一次调用 Next 时还有上层后果集的数据没有遍历完,那么过后的遍历状态依然保留在 e.inputRow/e.inputIter/e.selected/e.childResult 中, 那么能够间接从 2.5 开始,进入内循环
    2. 如果上一次调用 Next 时刚好上层后果集的数据也遍历完了,那么 e.inputRow 就会是一个空 Row, 从 2.1 开始往下执行,也就是从新加载上层数据。
退出移动版