本文以中的代码对应的 tinysql 我的项目版本号为 df75611
SelectionExec
是 tinysql 第五章第一节中要补全的执行器组件,心愿可能通过本文的总结来阐明 tinysql 执行器中的一些概念。
接口
作为执行器组件,SelectionExec
实现了 Executor
接口, 其定义位于 tinysql/executor/executor.go L136
// Executor is the physical implementation of a algebra operator.
//
// In TiDB, all algebra operators are implemented as iterators, i.e., they
// support a simple Open-Next-Close protocol. See this paper for more details:
//
// "Volcano-An Extensible and Parallel Query Evaluation System"
//
// Different from Volcano's execution model, a"Next" function call in TiDB will
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {base() *baseExecutor
Open(context.Context) error
Next(ctx context.Context, req *chunk.Chunk) error
Close() error
Schema() *expression.Schema}
每一个 Executor
的实现都是从 baseExecutor
扩大进去的,个别都会继承其中的 base
/Schema
办法,前者是将 `baseExecutor
返回,而后者返回的是表构造,设计了哪些列和键。
在火山模型中,次要须要应用 Open
/Next
/Close
三个办法。代码上最根本的逻辑是,当下层 Executor
的Next
办法被调用时,被调用的 Executor
通过调用上层 Executor
的Next
办法返回的 Chunk
,通过肯定的解决来构建本层的返回。简略的这么说比拟宽泛,所以心愿通过分享我本人在写 proj 时浏览代码的集体了解,帮忙大家了解Executor
的串联。
构造体
SelectionExec
的定义位于 tinysql/executor/executor.go L345
// SelectionExec represents a filter executor.
type SelectionExec struct {
baseExecutor // 根底构造
batched bool // 是否以批处理的模式返回后果
filters []expression.Expression // 过滤器表达式列表
selected []bool // 过滤后果 buffer
inputIter *chunk.Iterator4Chunk // 迭代器
inputRow chunk.Row // 迭代以后行
childResult *chunk.Chunk // 上层 Executor 返回的后果 buffer
}
其中 baseExecutor
的定义位于 tinysql/executor/executor.go L55
type baseExecutor struct {
ctx sessionctx.Context // 执行上下文
id fmt.Stringer // 标识
schema *expression.Schema // 表构造
initCap int // Chunk 初始容量
maxChunkSize int // 返回 Chunk 的最大尺寸
children []Executor // 上层 Executor
retFieldTypes []*types.FieldType // 返回的列信息}
办法
上面依据 SelectionExec
对Executor
接口的实现,来阐明,次要阐明的是 Next 办法
base
实现非常简单,就是间接继承 baseExecutor
的 base 办法
// base returns the baseExecutor of an executor, don't override this method!
func (e *baseExecutor) base() *baseExecutor {return e}
Schema
这里实现也是间接继承了 baseExecutor
的 Schema 办法
// Schema returns the current baseExecutor's schema. If it is nil, then create and return a new one.
func (e *baseExecutor) Schema() *expression.Schema {
if e.schema == nil {return expression.NewSchema()
}
return e.schema
}
Open
SelectionExec
对 Open 办法进行了重写, 实质上 Open 办法是进行了初始化操作
// Open implements the Executor Open interface.
func (e *SelectionExec) Open(ctx context.Context) error {
// 调用 baseExecutor 的初始化,相似 super
if err := e.baseExecutor.Open(ctx); err != nil {return err}
// 留神这里这 newFirstChunk 的名字具备肯定的迷惑性,实际上依据上层 Executor 的属性来构建 chunk
e.childResult = newFirstChunk(e.children[0])
// 判断是否能够 filters 是否能够向量化执行
// 其实就是查看是否所有的 filter 都能够向量化, 只有所有 filter 都能够向量化,才能够进行批执行
e.batched = expression.Vectorizable(e.filters)
if e.batched {
// 如果能够进行批执行的话,构建一个 bool 切片作为 buffer,来保留过滤器的抉择状况
// 在这里初始化好了这块空间,只有之后没有产生切片的 resize,那么始终应用的是这块空间
// 加重内存调配和 GC 的压力
e.selected = make([]bool, 0, chunk.InitialCapacity)
}
// 留神这里仅仅是实现了 iterator 和 chunk 的绑定,此时 chunk 中没有数据,iterator 也没有意义
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
// 这里就是指向了一个空 Row
e.inputRow = e.inputIter.End()
return nil
}
baseExecutor
的实现如下
// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *baseExecutor) Open(ctx context.Context) error {
// 实质上就是遍历所有位于上层的 Executor 调用一遍 Open
// 留神一点就是,位于上层的 Executor 会先于以后 Executor 被初始化
for _, child := range e.children {err := child.Open(ctx)
if err != nil {return err}
}
return nil
}
Close
SelectionExec
对 Close 办法进行了重写, 实质上 Close 办法是进行了资源开释的作用
// Close implements plannercore.Plan Close interface.
func (e *SelectionExec) Close() error {
// 清空两个 buffer
e.childResult = nil
e.selected = nil
return e.baseExecutor.Close()}
baseExecutor
的实现如下
// Close closes all executors and release all resources.
func (e *baseExecutor) Close() error {
var firstErr error
// 与 Open 时类似,就是间接调用一遍上层 Executor,for _, src := range e.children {if err := src.Close(); err != nil && firstErr == nil {firstErr = err}
}
return firstErr
}
Next
SelectionExec
的 Next 办法同样进行了重写,这里也是须要咱们进行填充的局部,然而这段代码内外两层循环乍一看下来有一些令人费解。
// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {req.GrowAndReset(e.maxChunkSize)
if !e.batched {return e.unBatchedNext(ctx, req)
}
/*
Exit the loop when:
1. the `req` chunk` is full.
2. there is no further results from child.
3. meets any error.
*/
for {
// Fill in the `req` util it is full or the `inputIter` is fully processed.
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {// Your code here.}
err := Next(ctx, e.children[0], e.childResult)
if err != nil {return err}
// no more data.
if e.childResult.NumRows() == 0 {return nil}
/* Your code here.
Process and filter the child result using `expression.VectorizedFilter`.
*/
}
}
依据课程文档的倡议是要求咱们通过浏览 unBatchedNext 办法来学习函数性能,该函数就是单行解决咱们要实现的逻辑的单行解决版本,并且该函数与咱们要实现的局部的代码构造相似,通过了解这部分代码,也能帮忙咱们晓得批处理时大略的解决流程。
// unBatchedNext filters input rows one by one and returns once an input row is selected.
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
// 外循环, 实质上是更新上层后果集
for {
// 内循环,实质是一直迭代遍历上层后果集,直到返回一个被选中的行时,插入要返回的 chunk,并
//
// 留神第一次调用 Next 的时,因为 `e.inputRow` 是一个空 Row,`e.inputIter.End()` 同样也会返回一个空 Row
// 因而第一次调用 Next 的时候并不会先进入内循环的逻辑
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
if err != nil {return err}
if selected {chk.AppendRow(e.inputRow)
e.inputRow = e.inputIter.Next()
return nil
}
}
/* 这里才是第一次执行时的理论开始地位 */
// Next 函数的实质是将调用上层 children[0]的 Next 办法,并将调用后果更新到 childResult 当中
err := Next(ctx, e.children[0], e.childResult)
if err != nil {return err}
// 将 inputRow 定位到更新后的 chunk 第一行
e.inputRow = e.inputIter.Begin()
// 如果曾经没有数据了,那就间接返回 nil
if e.childResult.NumRows() == 0 {return nil}
}
}
那么依据这块流程的了解,那么咱们就能够大抵补充出代码
// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
// 这里由 Callee 对后果集 chunk 进行重置
// 在批处理时,会返回 maxChunkSize 限定大小的后果集
req.GrowAndReset(e.maxChunkSize)
if !e.batched {return e.unBatchedNext(ctx, req)
}
/*
Exit the loop when:
1. the `req` chunk` is full.
2. there is no further results from child.
3. meets any error.
*/
for {
// Fill in the `req` util it is full or the `inputIter` is fully processed.
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
/* Your code here. */
// 依据过滤后果 buffer 中的数据判断以后行是否被选中,如果被选中了则增加到后果集中
if e.selected[e.inputRow.Idx()] {req.AppendRow(e.inputRow)
if req.IsFull() { // 如果后果集被填满了,那么须要将 inputRow 未被检索的第一行,并返回
e.inputRow = e.inputIter.Next()
return nil
}
}
}
// 这里是调用 volcano 模型处在上层的子语句的 Next 办法, 并赋值到以后的 childResult 中,更新上层后果集内容
err := Next(ctx, e.children[0], e.childResult)
if err != nil {return err}
// no more data.
if e.childResult.NumRows() == 0 {return nil}
/* Your code here.
Process and filter the child result using `expression.VectorizedFilter`.
*/
// 这里次要是反复利用 selected 所申请的空间, 留神肯定要赋值 e.selected, 进行同步扭转
e.inputRow = e.inputIter.Begin()
// selectd 保留应用向量 filters 过滤后的后果
e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected)
if err != nil {return nil}
}
}
那么大家可能有一个疑难就是为什么须要应用内外两层循环来实现,这里依照我的集体了解做肯定的解释
其实这里代码结构设计的十分精彩, 整个构造看起来就像是一个变速器。
外循环的作用是充分利用以后在 e.inputRow
/e.inputIter
/e.selected
/e.childResult
中的缓存, 因为上上层的 chunk 可能是有肯定尺寸差距的
内循环的作用是进行后果集的附加, 会有两种退出内层循环的状况
- 一旦后果集被填充斥, 就退出函数, 实现了以后的解决, 以后还没解决的信息依然保留在
e.inputRow
/e.inputIter
/e.selected
/e.childResult
中, 下一次调用 Next 的时候就能够接着从e.inputRow
开始 - 如果遍历完了上层的后果集, 以后层的后果集依然还没有被填满, 那么就会从调用
Next
更新上层的后果集, 而后在依据 filter 进行筛选, 将上层后果集中 filter 过滤后的后果保留到e.selected
外面, 而后从头开始
上面残缺的形容一下调用 Next 的理论执行流程, 因为理论的执行流程会和代码自上而下的程序不一样, 这也是这段代码构造精妙的中央
- 首先要阐明的是
SelectionExec
在调用Next
之前是须要调用Open
进行初始化的, 那么Open
须要关注的是,Open
中进行的仅仅是状态的初始化, 并没有执行本质的计算, (e.childResult
应用了newFirstChunk
的时候只是进行了字段 / 容量 / 大小的初始化, 并没有进行内容填充),e.childResult
是空的,e.inputIter
和e.inputRow
应该也是空的, 须要在后续步骤中进行初始化. -
第 1 次调用 SelectionExec.Next
- 留神的循环条件
e.inputRow != e.inputIter.End()
此时是不成立的, 二者都是空的 Row 构造体, 所以第一次调用 Next 的时候齐全不进入内循环 - 调用 Next 将上层数据加载到
e.childResult
当中, 进行一些查看 - 更新
e.inputRow
使之对应e.inputIter
的第一个数据 - 应用
expression.VectorizedFilter
依据e.filters
的条件将上层后果集数据的依据过滤器的过滤后果寄存到e.selected
- 回到外循环结尾往下执行, 那么咱们就间接进入了内循环, 在
e.inputRow != e.inputIter.End()
此时曾经成立了, 所以能够进入内循环 -
在内循环中, 须要判断后果集是否曾经被填满
- 如果没有被填满, 那么就依据筛选后果, 思考是否将遍历到的行放到后果集中, 当遍历完结时, 步骤 b 开始持续往下执行
- 如果曾经被填满, 那么就间接返回. 在上层后果集中遍历的状态保留在
e.inputRow
/e.inputIter
中, filter 过滤的后果放在e.selected
中, 期待下一次 Next 调用的时候再调用
- 留神的循环条件
-
第 n 次调用
SelectionExec.Next
- 如果上一次调用
Next
时还有上层后果集的数据没有遍历完,那么过后的遍历状态依然保留在e.inputRow
/e.inputIter
/e.selected
/e.childResult
中, 那么能够间接从 2.5 开始,进入内循环 - 如果上一次调用
Next
时刚好上层后果集的数据也遍历完了,那么e.inputRow
就会是一个空 Row, 从 2.1 开始往下执行,也就是从新加载上层数据。
- 如果上一次调用