乐趣区

关于tidb:TiFlash-源码阅读九TiFlash-中常用算子的设计与实现

本文次要介绍了数据库系统中罕用的算子 Join 和 Aggregation 在 TiFlash 中的执行状况,包含查问打算生成、编译阶段与执行阶段,以冀望读者对 TiFlash 的算子有初步的理解。

视频

https://www.bilibili.com/video/BV1tt4y1875T

算子概要

在浏览本文之前,举荐浏览本系列的前作:计算层 overview,以对 TiFlash 计算层、MPP 框架有肯定理解。

在数据库系统中,算子是执行 SQL 次要逻辑的中央。一条 SQL 会被 parser 解析为一棵算子树(查问打算),而后通过 optimizer 的优化,再交给对应的 executor 执行,如下图所示。

本文的次要内容包含

  1. TiDB 如何生成与优化 MPP 算子与查问打算
  2. Join 算子在 TiFlash 中的编译(编译指的是将 TiDB-server 下发的执行打算片段生成可执行构造的过程,下同)与执行
  3. Aggregation 算子在 TiFlash 中的编译与执行

构建查问打算

一些背景常识:

  1. 逻辑打算与物理打算:能够简略了解为逻辑打算是指算子要做什么,物理打算是指算子怎么去做这件事。比方,“将数据从表 a 和表 b 中读取进去,而后做 join”形容的是逻辑打算;而“在 TiFlash 中做 shuffle hash join”形容的是物理打算。更多信息能够参阅:TiDB 源码浏览系列文章
  2. MPP:大规模并行计算,个别用来形容节点间能够替换数据的并行计算,在以后版本(6.1.0,下同)的 TiDB 中,MPP 运算都产生在 TiFlash 节点上。举荐观看:源码解读 – TiFlash 计算层 overview。MPP 是物理打算级别的概念。

MPP 打算

在 TiDB 中,能够在 SQL 前加上 explain 来查看这条 SQL 的查问打算,如下图所示,是一棵由物理算子组成的树,能够查看 TiDB 执行打算概览 来对其有更多的理解。

MPP 查问打算的独特之处在于查问打算中多出了用于进行数据交换的 ExchangeSender 和 ExchangeReceiver 算子。

执行打算中会有这样的 pattern,代表将会在此处进行数据传输与替换。

     ...
     |_ExchangeReceiver_xx
        |_ ExchangeSender_xx
             …

每个 ExchangeSender 都会有一个 ExchangeType,来标识本次数据传输的类别,包含:

  1. HashPartition,将数据按 Hash 值进行分区之后散发到上游节点。
  2. Broadcast,将本身数据拷贝若干份,播送到所有上游节点中。
  3. PassThrough,将本人的数据全副传给一个指定节点,此时接管方能够是 TiFlash 节点(ExchangeReceiver);也能够是 TiDB-server 节点(TableReader),代表 MPP 运算结束,向 TiDB-server 返回数据。

在下面的查问打算图中,一共有三个 ExchangeSender,id 别离是 19, 13 和 17。其中 ExchangeSender_13 和 ExchangeSender_17 都是将读入后的数据按哈希值 shuffle 到所有节点中,以便进行 join,而 ExchangeSender_19 则是将 join 实现后的数据返回到 TiDB-server 节点中。

增加 Exchange

在优化器的打算摸索过程中,会有两处为查问打算树插入 Exchange 算子:

  1. 一个是 MPP 打算在摸索结束后,接入 TiDB 的 tableReader 时。类型为 passThrough type. 源码在函数 func (t *mppTask) convertToRootTaskImpl
  2. 一个是 MPP 打算在摸索过程中,发现以后算子的 property(这里次要指分区属性)不满足下层要求时。例如下层要求须要按 a 列的 hash 值分区,然而上层算子不能满足这个要求,就会插入一组 Exchange.
func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {if !t.needEnforceExchanger(prop) {return t}
   return t.copy().(*mppTask).enforceExchangerImpl(prop)
}

// t.partTp 示意以后算子已有的 partition type,prop 示意父算子要求的 partition type
func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool {
   switch prop.MPPPartitionTp {
   case property.AnyType:
      return false
   case property.BroadcastType:
      return true
   case property.SinglePartitionType:
      return t.partTp != property.SinglePartitionType
   default:
      if t.partTp != property.HashType {return true}
      if len(prop.MPPPartitionCols) != len(t.hashCols) {return true}
      for i, col := range prop.MPPPartitionCols {if !col.Equal(t.hashCols[i]) {return true}
      }
      return false
   }
}

Property 对于分区属性的要求(MPPPartitionTp)有以下几种:

  1. AnyType,对上层算子没有要求,所以并不需要增加 exchange;
  2. BroadcastType,用于 broadcast join,要求上层节点复制数据并播送到所有节点中,此时肯定须要增加一个 broadcast exchange;
  3. SinglePartitionType,要求上层节点将数据汇总到同一台节点中,此时如果曾经在同一台节点上,则不必再进行 exchange。
  4. HashType,要求上层节点按特定列的哈希值进行分区,如果曾经按要求分好区了,则不必再进行 exchange.

在优化器的生成查问打算的摸索中,每个算子都会对上层有 property 要求,同时也须要满足下层传下来的 property;当高低两层的 property 无奈匹配时,就插入一个 exchange 算子替换数据。依附这些 property,能够不重不漏的插入 exchange 算子。

MPP 算法

是否抉择 MPP 算法是在 TiDB 优化器 生成物理打算 时决定,即 CBO(Cost-Based Optimization) 阶段。优化器会遍历所有可抉择的打算门路,包含含有 MPP 算法的打算与不含有 MPP 算法的打算,预计它们的代价,并抉择其中总代价最小的一个查问打算。

对于以后的 TiDB repo 代码,有四个地位能够触发 MPP 打算的生成,别离对应于 join、agg、window function、projection 四个算子:

  1. func (p *LogicalJoin) tryToGetMppHashJoin
  2. func (la *LogicalAggregation) tryToGetMppHashAggs
  3. func (lw *LogicalWindow) tryToGetMppWindows
  4. func (p *LogicalProjection) exhaustPhysicalPlans

这里只形容具备代表性的 join 和 agg 算子,其余算子同理。

Join

以后 TiDB 反对两种 MPP Join 算法,别离是:

  • Shuffle Hash Join,将两张表的数据各自按 hash key 分区后 shuffle 到各个节点上,而后做 hash join,如上一节中举出的查问打算图所示。
  • Broadcast Join,将小表播送到大表所在的每个节点,而后做 hash join,如下图所示。

tryToGetMppHashJoin 函数在构建 join 算子时给出了对子算子的 property 要求:

if useBCJ { // broadcastJoin
    …
    childrenProps[buildside] = {MPPPartitionTp: BroadcastType}
    childrenProps[1-buildside] = {MPPPartitionTp: AnyType}
    …
} else { // shuffle hash join
    …
    childrenProps[0] = {MPPPartitionTp: HashType, key: leftKeys}
    childrenProps[1] = {MPPPartitionTp: HashType, key: rightKeys}
    …
}

如代码所示,broadcast join 要求 buildside(这里指要播送的小表)具备一个 BroadcastType 的 property,对大表侧则没有要求。而 shuffle hash join 则要求两侧都具备 HashType 的分区属性,分区列别离是 left keys 和 right keys。

Aggregation

以后 tryToGetMppHashAggs 可能生成三种 MPP Aggregation 打算:

1.“一阶段 agg”,要求数据先按 group by key 分区,而后再进行聚合。

2.“两阶段 agg”,首先在本地节点进行第一阶段聚合,而后按 group by key 分区,再进行一次聚合(用 sum 汇总后果)。

3.“scalar agg”,没有分区列的特定状况,在本地节点进行第一阶段聚合,而后汇总到同一台节点上实现第二阶段聚合。

一阶段 agg 和两阶段 agg 的区别是是否先在本地节点做一次预聚合,优化器会依据 SQL 与代价估算来抉择执行哪种形式。对于反复值很多的状况,两阶段 agg 能够在网络传输前缩小很多数据量,从而缩小大量的网络耗费;而如果反复值很少的状况下,这次预聚合并不会缩小很多数据量,反而白白增大了 cpu 与内存耗费,此时就不如应用一阶段 agg。

这里留一个小思考题,这三种 agg 各自对下方有什么 property 要求?在聚合做完之后又满足了怎么的 property?

答案是:

一阶段 agg 要求 hash,做完满足 hash;二阶段 agg 无要求,做完满足 hash;scalar agg 无要求,做完满足 singlePartition.

编译与执行

执行打算构建好之后,TiDB-server 会将 dag(执行打算的片段)下发给对应的 TiFlash 节点。在 TiFlash 节点中,须要首先解析这些执行打算,这个过程咱们称作“编译”,编译的后果是 BlockInputStream,它是 TiFlash 中的可执行构造;而最初一步就是在 TiFlash 中执行这些 BlockInputStream.

下图是一个 BlockInputStream DAG 的例子,每个 BlockInputStream 都有三个办法:readPrefix, read 和 readSuffix;相似于其余火山模型调用 open、next 和 close。

下图的起源是 TiFlash 执行器线程模型 – 知乎专栏 (zhihu.com),对于执行模型更多的内容,能够参考这篇文章或者 TiFlash Overview,这里不再赘述。

Join 的编译与执行

TiDB-server 节点会将查问打算按 Exchange 会作为分界,将查问切分为不同的打算片段(task),作为 dag 发给 TiFlash 节点。比方对于下图中所示的查问打算,会切分为这三个红框。

TiFlash 节点在编译实现后生成的 BlockInputStream 如下,能够在 debug 日志中看到:

task 1
ExchangeSender
 Expression: <final projection>
  Expression: <projection after push down filter>
   Filter: <push down filter>
    DeltaMergeSegmentThread
 
task 2
ExchangeSender
 Expression: <final projection>
  Expression: <projection after push down filter>
   Filter: <push down filter>
    DeltaMergeSegmentThread
 
task 3
CreatingSets
 Union: <for join>
  HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_15>, join_kind = Inner
   Expression: <append join key and join filters for build side>
    Expression: <final projection>
     Squashing: <squashing after exchange receiver>
      TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
 Union: <for mpp>
  ExchangeSender x 20
   Expression: <final projection>
    Expression: <remove useless column after join>
     HashJoinProbe: <join probe, join_executor_id = HashJoin_34>
      Expression: <final projection>
       Squashing: <squashing after exchange receiver>
        TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
  

其中 task1 和 task2 是将数据从存储层读出,通过简略的解决之后,发给 ExchangeSender. 在 task3 中,有三个 BlockInpuStream 值得关注,别离是:CreatingSets, HashJoinBuild, HashJoinProbe.

CreatingSetsBlockInputStream

承受一个数据 BlockInputStream 示意 joinProbe,还有若干个代表 JoinBuild 的 Subquery。CreatingSets 会并发启动这些 Subquery, 期待他们执行完结后在开始启动数据 InputStream. 上面两张图别离是 CreatingSets 的 readPrefix 和 read 函数的调用栈。

为什么 CreatingSets 可能同时创立多张哈希表?因为在一个多表 join 中,同一个打算片段可能紧接着做屡次 join porbe,如下图所示:

task:4
CreatingSets
 Union x 2: <for join>
  HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_22>, join_kind = Left
   Expression: <append join key and join filters for build side>
    Expression: <final projection>
     Squashing: <squashing after exchange receiver>
      TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
 Union: <for mpp>
  ExchangeSender x 20
   Expression: <final projection>
    Expression: <remove useless column after join>
     HashJoinProbe: <join probe, join_executor_id = HashJoin_50>
      Expression: <final projection>
       Expression: <remove useless column after join>
        HashJoinProbe: <join probe, join_executor_id = HashJoin_14>
         Expression: <final projection>
          Squashing: <squashing after exchange receiver>
           TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
 

Join Build

留神,join 在此处仅代表 hash join,曾经与网络通信和 MPP 级别的算法无关。

对于 join 的代码都在 dbms/src/Interpreters/Join.cpp 中;咱们以上面两张表进行 join 为例来阐明:

left_table l join right_table r 
on l.join_key=r.join_key
where l.b>=r.c 

默认右表做 build 端,左表做 probe 端。哈希表的值应用链式存储:

Join Probe

这里次要形容的是 JoinBlockImpl 这个函数的流程:

1.block 蕴含了左表的内容;创立 added_columns, 即要增加到 block 中的右表的列;而后创立相应的过滤器 replicate_offsets:示意以后共匹配了几行,之后能够用于筛选未匹配上的行,或复制匹配了多行的行。

2. 顺次查找哈希表,依据查找后果调用相应的 addFound 或 addNotFound 函数,填充 added_columns 和过滤器。

从填充的过程中也能够看到,replicate_offsets 左表示意到以后行为止,一共能匹配上的右表的行数。并且 replicate_offsets[i] – replicate_offsets[i-1] 就示意左表第 i 行匹配到的右表的行数。

3. 将 added_column 间接拼接到 block 上,此时会有短暂的 block 行数不统一。

4. 依据过滤器的内容,复制或过滤掉原先左表中的行。

5. 最初在 block 上解决 other condition,则失去了 join 的后果。

上文中形容的是对于失常的“all”join 的状况,须要返回左右表的数据。与之绝对的则是“any”join,示意半连贯,无需返回右表,只需返回左表的数据,则无需应用 replicate_offsets 这个辅助数组,读者能够自行浏览代码。依然在 dbms/src/intepreters/Join.cpp 中。

Aggregation 的编译与执行

还是以一个查问打算以及对应的 BlockInputStream 为例:

task:1
ExchangeSender
 Expression: <final projection>
  Expression: <before order and select>
   Aggregating
    Concat
     Expression: <before aggregation>
      Expression: <projection>
       Expression: <before projection>
        Expression: <final projection>
         DeltaMergeSegmentThread
 
task:2
Union: <for mpp>
 ExchangeSender x 20
  Expression: <final projection>
   Expression: <projection>
    Expression: <before projection>
     Expression: <final projection>
      SharedQuery: <restore concurrency>
       ParallelAggregating, max_threads: 20, final: true
        Expression x 20: <before aggregation>
         Squashing: <squashing after exchange receiver>
          TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Int64>, <exchange_receiver_1, Nullable(Int64)>}

从查问打算中能够看到这是一个两阶段 agg,第一阶段对应 task1,执行聚合的 BlockInputStream 是 Aggregating。第二阶段对应 task2,执行聚合的 BlockInputStream 是 ParallelAggragating。两个 task 通过 Exchange 进行网络数据传输。

在 aggregation 的编译期,会查看以后 pipeline 可能提供的并行度,如果只有 1,则应用 AggregatingBlockInputStream 单线程执行,如果大于 1 则应用 ParallelAggragating 并行执行。

DAGQueryBlockInterpreter::executeAggregation(){if (pipeline.streams.size() > 1){ParallelAggregatingBlockInputStream}else {AggregatingBlockInputStream}
}

AggregatingBlockInputStream 的调用栈如下:

ParallelAggregatingBlockInputStream 外部会分两阶段操作(这里的两阶段是外部执行中的概念,产生在同一台节点上,和查问打算中的两阶段不是一个概念)。partial 阶段别离在 N 个线程构建 HashTable,merge 阶段则将 N 个 HashTable 合并起来,对外输入一个流。调用栈如下:

如果 result 是空,那么会独自调用一次 executeOnBlock 办法,来生成一个默认数据,相似于 count() 没有输出时,会返回一个 0.

两种执行形式都用到了 Aggregator 的 executeOnBlock 办法和 mergeAndConvertToBlocks 办法,他们的调用栈如图所示。前者是理论执行聚合函数的中央,会调用聚合函数的 add 办法,将数据值退出;后者的次要目标是将 ParallelAggregating 并行生成的哈希表合并。

退出移动版