关于数据库:深度解读-KaiwuDB-的排序操作

45次阅读

共计 5454 个字符,预计需要花费 14 分钟才能阅读完成。

一、单节点执行

在单节点环境执行一条简略的 SQL 语句 SELECT * FROM NATION ORDER BY N_NAME。NATION 是一张小表,只有 25 条记录;对第 2 列 N_NAME 进行升序排列。

1. 形象语法树

上述示例中的 SQL 语句通过分析器解析后失去 AST,如下图所示:

2. 逻辑打算

将 AST 转换成一个树状构造的 Plan,称之为逻辑查问打算。形象语法树中的每一个语法元素都被转换成一个查问逻辑单元,例如 scanNode, sortNode, joinNode 等。

逻辑打算能够通过一系列规定进行优化,称之为 RBO(Rule Base Optimization)。

举一个简略的例子,SQL 语句 SELECT FROM t WHERE a + 1 > 4 通过规定改写能够转换为 SELECT FROM t WHERE a > 3。从数据库的打算角度,两者有很大差异。

前者只能扫描全表,每次读取一条记录并计算表达式判断是否合乎过滤条件;后者能够利用 a 列索引信息缩小扫描范畴,即便没有索引也不须要每次进行表达式计算。

例子中的逻辑打算很简略,就是扫描节点 Scan 和排序节点 Sort。命令 Explain SELECT * FROM NATION ORDER BY N_NAME 显示如下:

3. 物理打算

(DistSQLPlanner).PlanAndRun 办法把逻辑打算转换为物理打算,其中递归调用 createPlanForNode 办法生成各个物理算子,交给执行器具体执行。

生成物理打算也是一个优化过程,称之为 CBO(Cost Base Optimization)。例如逻辑打算的表连贯,在具体实现时能够替换程序,也能够有不同的实现办法(Nest Loop Join,Sort Merge Join 和 Hash Join)。

此时就须要通过代价模型(Cost Model)在微小的搜寻空间中寻找一个正当的物理执行打算。这是一个 NP 问题,所以个别只能找到一个绝对最优解。

此外,KaiwuDB 依据底层 KV 数据的散布和预估返回数据集的大小,决定是否须要生成分布式执行打算;这个例子是一个本地执行的物理打算。

逻辑打算节点和物理打算节点并不是一一对应关系,然而这个例子中,逻辑打算中的 Scan 和 Sort 别离对应物理打算中的 TableReader 算子和 Sorter 算子。

4. 执行引擎

最初调用 (DistSQLPlanner).Run 办法执行物理打算。执行引擎采纳火山模型(Volcano),每一层执行算子通过调用下一层的 Next 办法获取一条记录。执行伪代码如下:

5. 排序剖析

上面具体分析排序算子操作,首先理解一下数据在各个阶段的存储格局。
(1)数据格式

在 KV 存储引擎中,数据的存储格局如下:

* 注:Column ID Diff 示意跟前一个列 ID 的差值

TableReader 通过 KV 存储接口,读取一条数据后返回一个 EncDatumRow 对象,外面增加了 KaiwuDB 的编码信息。
::: hljs-center

:::
Sorter 算子的 fill 办法将 EncDatumRow 对象中的数据解码后退出 MemRowContainer 对象的 chunks 里,用于后续的排序。

chunks 是一个二维数组 [][]Datum,Datum 是一个接口,代表 SQL 中的值,具体的实现类包含 DBool, DInt, DString 等。KaiwuDB 中尽量应用原生数据类型,比方 DBool/DInt/DString 别离被定义成 bool/int64/string,并实现了 Datum 接口办法;而 DDecimal 就是一个自定义构造体。

func (s *sortAllProcessor) fill() (ok bool, _ error) {ctx := s.EvalCtx.Ctx()       
    for {        
        //input 是一个 RowChannel 对象,Next 获取下一条数据        
        row, meta := s.input.Next()        
        ......        

        // rows 是一个 MemRowContainer 对象        
        if err := s.rows.AddRow(ctx, row); err != nil {return false, err}    
    }    
    s.rows.Sort(ctx)    
    
    s.i = s.rows.NewFinalIterator(ctx)    
    s.i.Rewind()    
    return true, nil
}

直觉上 []Datum 应该对应表里的一行数据,然而为了缩小 golang 里切片扩容带来的影响,KaiwuDB 将 64 行数据打包放在了一个 []Datum 里。Nation 表的数据在 chunks 中打印显示如下:
::: hljs-center

:::

内存中的 chunks 构造:

UML 类图

(2)排序办法

最罕用的排序执行算子叫做 sortAllProcessorSorter,将全副待排序后果读入后(内存或磁盘),进行一次排序输入最终后果。调用时序图如下:

MemRowContainer 的 Sort 办法实现了内存中对 chunks 数组的排序:

  • 数组长度小于等于 12 时,采纳插入排序;
  • 疾速排序,递归最大深度 2*ceil(lg(n+1));
  • 递归到了最大深度,采纳堆排序;
// At accesses a row at a specific index.
func (c *RowContainer) At(i int) tree.Datums {
    // This is a hot-path: do not add additional checks here. 
    chunk, pos := c.getChunkAndPos(i)    
    return c.chunks[chunk][pos : pos+c.numCols : pos+c.numCols]
}

// Sort sorts data.
// It makes one call to data.Len to determine n, and O(n*log(n)) calls to
// data.Less and data.Swap. The sort is not guaranteed to be stable.
func Sort(data sort.Interface, cancelChecker *CancelChecker) {n := data.Len()   
    quickSort(data, 0, n, maxDepth(n), cancelChecker)
}

// Sort is part of the SortableRowContainer interface.
func (mc *MemRowContainer) Sort(ctx context.Context) {
    mc.invertSorting = false    
    cancelChecker := sqlbase.NewCancelChecker(ctx)    
    sqlbase.Sort(mc, cancelChecker)}

func quickSort(data sort.Interface, a, b, maxDepth int, cancelChecker 
*CancelChecker) {
    for b-a > 12 { // Use ShellSort for slices <= 12 elements
        if maxDepth == 0 {heapSort(data, a, b, cancelChecker)            
            return        
        }        
        maxDepth--        
        // Short-circuit sort if necessary.        
        if cancelChecker.Check() != nil {return}       
        mlo, mhi := doPivot(data, a, b)        
        // Avoiding recursion on the larger subproblem guarantees        
        // a stack depth of at most lg(b-a).        
        if mlo-a < b-mhi {quickSort(data, a, mlo, maxDepth, cancelChecker)            
            a = mhi // i.e., quickSort(data, mhi, b)        
        } else {quickSort(data, mhi, b, maxDepth, cancelChecker)            
            b = mlo // i.e., quickSort(data, a, mlo)        
        }    
    }    
    if b-a > 1 {        
        // Do ShellSort pass with gap 6        
        // It could be written in this simplified form cause b-a <= 12        
        for i := a + 6; i < b; i++ {if data.Less(i, i-6) {data.Swap(i, i-6)            
            }        
        }        
        insertionSort(data, a, b)    
    }
}// At accesses a row at a specific index.
func (c *RowContainer) At(i int) tree.Datums {
    // This is a hot-path: do not add additional checks here. 
    chunk, pos := c.getChunkAndPos(i)    
    return c.chunks[chunk][pos : pos+c.numCols : pos+c.numCols]
}

// Sort sorts data.
// It makes one call to data.Len to determine n, and O(n*log(n)) calls to
// data.Less and data.Swap. The sort is not guaranteed to be stable.
func Sort(data sort.Interface, cancelChecker *CancelChecker) {n := data.Len()   
    quickSort(data, 0, n, maxDepth(n), cancelChecker)
}

// Sort is part of the SortableRowContainer interface.
func (mc *MemRowContainer) Sort(ctx context.Context) {
    mc.invertSorting = false    
    cancelChecker := sqlbase.NewCancelChecker(ctx)    
    sqlbase.Sort(mc, cancelChecker)}

func quickSort(data sort.Interface, a, b, maxDepth int, cancelChecker 
*CancelChecker) {
    for b-a > 12 { // Use ShellSort for slices <= 12 elements
        if maxDepth == 0 {heapSort(data, a, b, cancelChecker)            
            return        
        }        
        maxDepth--        
        // Short-circuit sort if necessary.        
        if cancelChecker.Check() != nil {return}       
        mlo, mhi := doPivot(data, a, b)        
        // Avoiding recursion on the larger subproblem guarantees        
        // a stack depth of at most lg(b-a).        
        if mlo-a < b-mhi {quickSort(data, a, mlo, maxDepth, cancelChecker)            
            a = mhi // i.e., quickSort(data, mhi, b)        
        } else {quickSort(data, mhi, b, maxDepth, cancelChecker)            
            b = mlo // i.e., quickSort(data, a, mlo)        
        }    
    }    
    if b-a > 1 {        
        // Do ShellSort pass with gap 6        
        // It could be written in this simplified form cause b-a <= 12        
        for i := a + 6; i < b; i++ {if data.Less(i, i-6) {data.Swap(i, i-6)            
            }        
        }        
        insertionSort(data, a, b)    
    }
}

如果须要排序的数据超出了阈值(WorkMemBytes,默认 64MB),会调用 spillToDisk 办法将 MemRowContainer 中的数据写入 DiskRowContainer 中,后者将 OrderBy 列的信息作为 Key 值写入 KV 存储引擎。

此外为了进步 KV 写入速度,DiskRowContainer 不会每次只写一条数据,而是有一个 buffer 负责累积一批键值对,而后一起写入。

其余的两种排序执行算子包含:

  • sortTopkProcessor: Limit 下推到 Sorter 时,算子只调配 N 行的排序空间,而后进行堆排序;
  • sortChunksProcessor: 多列排序时,如果前 i 列 (0 < i < N) 曾经有序,算子逐行读入输出数据,直到前 i 列呈现不同值;反复对读入的批次进行排序。解决完数据集后前 i+1 列曾经有序,迭代后面的步骤对后续列进行排序,直到后果集多列排序实现。

二、多节点执行

在三节点环境上执行 SQL 语句 SELECT * FROM LINEITEM ORDER BY L_SHIPDATE。LINEITEM 是一张大表,有约 6000 万条数据;对第 11 列 L_SHIPDATE 进行升序排列。

1. 逻辑打算

逻辑打算和单节点环境的打算类似。

2. 物理打算

KaiwuDB 在多节点环境中将数据分片放在不同的节点中,每个分片数据有多个备份。示意图如下:

分布式执行打算依据 Span 信息分为多个 tableReader 算子在多个节点上执行;AddNoGroupingStage 办法将 sorter 算子退出到所有的 tableReader 算子之后,对各个节点上的分片数据进行排序。

最初 FinalizePlan 办法会减少一个 No-op 算子,归并汇总最终的后果集。生成的分布式物理打算如下:

正文完
 0