乐趣区

关于边缘计算:eKuiper-源码解读从一条-SQL-到流处理任务的旅程

概述

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘剖析、流式解决开源软件,能够运行在各类资源受限的边缘设施上。eKuiper 的次要指标是在边缘端提供一个流媒体软件框架。其 规定引擎 容许用户提供基于 SQL 或基于图形(相似于 Node-RED)的规定,在几分钟内创立物联网边缘剖析利用。

本文中,咱们将以源码为脉络,论述一条 SQL 从被 eKuiper 接管后,是如何从一条文本变成一个可执行的处理过程。通过本文,你能够理解到以下内容:

  1. 一个 SQL 计算引擎根本的解决流程
  2. eKuiper 在每个解决流程中的具体代码节点

筹备

为了更加直观地理解到 eKuiper 外部的代码运行逻辑,在解说 eKuiper 规定引擎的处理过程中,咱们会波及到 eKuiper 中的一部分代码,并对其中的要害局部进行较为具体的解说。

为了更好地了解之后的内容,你须要理解:

  1. eKuiper 我的项目:https://github.com/lf-edge/ekuiper
  2. Golang 的根底用法

框架

从 eKuiper 接管到 SQL 的文本,到最终依据这个 SQL 的语义去做出相应的读取与计算工作。eKuiper 外部的 SQL 计算引擎在其中承当了解析、结构、优化与运行这总共 4 局部工作,即咱们之后将重点关注 SQL 处理过程中的以下几个环节:

  1. SQL Parser 将 SQL 文本转换为 AST 对象
  2. 基于 AST 对象生成逻辑打算
  3. 优化逻辑打算并生成执行算子
  4. 运行执行算子,开始读取数据与计算并最终将后果写入到上游

从 SQL 文本到执行算子树

从这一节开始,咱们将开始依据 eKuiper 中的代码节点,来了解一条 SQL 文本是如何一步步被最终转换为一个能够被理论执行的算子树。

以下代码理论展现了 eKuiper 代码中解析文本、优化打算、结构执行算子这几个解决流程,咱们将一一进行开展理解。

func PlanSQLWithSourcesAndSinks(rule *api.Rule, sources []*node.SourceNode, sinks []*node.SinkNode) (*topo.Topo, error) {
    sql := rule.Sql
    conf.Log.Infof("Init rule with options %+v", rule.Options)
    stmt, err := xsql.GetStatementFromSql(sql)
    if err != nil {return nil, err}

    ......
    // Create logical plan and optimize. Logical plans are a linked list
    lp, err := createLogicalPlan(stmt, rule.Options, store)
    if err != nil {return nil, err}
    tp, err := createTopo(rule, lp, sources, sinks, streamsFromStmt)
    if err != nil {return nil, err}
    return tp, nil
}

解析 SQL 文本

通过以下函数,咱们将一个 SQL 文本解析为了 AST 对象

func GetStatementFromSql(sql string) (*ast.SelectStatement, error) {

本文中咱们临时先不波及 SQL 解析器中的具体实现细节,相干内容将在之后的 eKuiper 源码浏览中进行解说。感兴趣的敌人能够通过以下函数作为入口进行理解:

func (p *Parser) Parse() (*ast.SelectStatement, error) {

值得一提的是,在 SQL Parser 的具体实现上,也有一些其余 well-known 的数据库实现应用了 yacc 的计划来间接生成 SQL Parser。eKuiper 之所以抉择本人实现 SQL Parser,一个十分重要的起因是对于一个运行在边缘端的利用而言,binary size 是一个十分重要的指标。本人实现 SQL Parser 而非应用 yacc 这类的 Parser Generator 的技术,有助于管制和升高 eKuiper 编译后整体的 binary size 的大小。

结构与优化逻辑打算

当 SQL 文本还解析为 AST 对象后,咱们须要将该 AST 对象转换为一个能够用来形容该 SQL 该当被计算引擎如何执行的逻辑打算。这一步骤被封装在了以下代码函数入口中:

func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {

在 createLogicalPlan 函数中,它接管一个 AST 树对象,并返还一个逻辑打算树对象,在整个函数过程中,它总共做了以下 3 件事件:

  1. 抽取 SQL 中的各类信息,并将其与理论的表达式或者是 schema 信息进行绑定。
  2. 依据 AST 对象结构最后的逻辑打算。
  3. 依据最后的逻辑打算进行逻辑优化。

在一条 SQL 中,它所带的信息里蕴含了一些本来注册计算引擎中的信息,比方流、表的定义,也蕴含了一些长期申明的信息,比方列或者表达式的 alias name。在以下代码函数入口中,eKuiper 会从 AST 树对象中抽取出以下信息,并进行响应的绑定:

func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]streamInfo, []*ast.Call, error) {
  1. 从 AST 树对象中抽取出流与表的 AST 对象,并从 eKuiper 的存储中取出事后设置好的流、表的定义,并将这些 schema 信息绑定到 ast 对象中。
  2. 从 AST 对象中将查问中的 filed 与各个流、表进行绑定

当咱们解决好 AST 树对象中的各个节点的信息绑定后,咱们就能够依据 AST 树对象来结构一个最后的逻辑打算。以下代码显示了在 eKuiper 中是如何依据自底向上的构建逻辑打算。从最底层的 DataSource 算子,一路向上 build 逻辑算子,直至整个逻辑算子树结构结束。

func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
// 1. build Datasource
// 2. build Window
// 3. Buld JoinAlign / Join
// 4. Build Filter
// 5. Build Agg
// 6. Build Having
// 7. Build Sort
// 8. Build Proj
}

当咱们取得了最原始的逻辑打算树当前,咱们须要对逻辑打算进行逻辑优化。逻辑优化阶段会对本来的打算进行优化。逻辑优化阶段,简略来说就是对一个逻辑算子树进行等价的变换,这个变换并不会影响最终的计算结果,然而能够让计算过程缩小更多不必要的计算量。

举一个简略的例子,对于 select * from t1 join t2 on [t1.](http://t1.id)a = t2.a where t1.b > 10 这条 SQL 来说,其本来的逻辑打算如下:

而后在逻辑优化阶段,咱们能够将 Filter 算子进行下推至 Join 算子之下,从而让参加 Join 算子的数据量被提前过滤一部分,来缩小整个计算过程中所波及到的计算量。

以下代码展现了 eKuiper 中是如何进行逻辑优化的:

var optRuleList = []logicalOptRule{&columnPruner{},
    &predicatePushDown{},}

func optimize(p LogicalPlan) (LogicalPlan, error) {
    var err error
    for _, rule := range optRuleList {p, err = rule.optimize(p)
        if err != nil {return nil, err}
    }
    return p, err
}

在随后的系列当中咱们会比拟具体地介绍目前 eKuiper 中的逻辑优化环节中的代码细节。

当咱们的逻辑打算优化结束当前,咱们须要依据逻辑打算来结构具体的执行算子。在 eKuiper 中,咱们通过 Topo 构造来保护整个执行算子的上下文环境。

以下代码展现了构建执行算子的函数入口:

func createTopo(rule *api.Rule, lp LogicalPlan, sources []*node.SourceNode, sinks []*node.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
type Topo struct {
   ......
   sources            []node.DataSourceNode
   sinks              []*node.SinkNode
   ops                []node.OperatorNode
   ......
}

Topo 作为执行算子 Context,会将逻辑打算中的 DataSource 算子放在 sources 中,将其余算子放在 ops 中,而最终的 SQL 后果会汇总到 sinks 中。在这里咱们重点关注算子是如何结构的:

以下代码展现了 eKuiper 中是如何依据逻辑算子结构执行算子的:

func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []*node.SourceNode, streamsFromStmt []string, index int) (api.Emitter, int, error) {var inputs []api.Emitter
    newIndex := index
    for _, c := range lp.Children() {input, ni, err := buildOps(c, tp, options, sources, streamsFromStmt, newIndex)
       .......
    }
    ......
    switch t := lp.(type) {
    case *DataSourcePlan:
       isSchemaless := t.isSchemaless
       switch t.streamStmt.StreamType {
       case ast.TypeStream:
          ......
          op = srcNode
    ......
    case *ProjectPlan:
       op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
    default:
       return nil, 0, fmt.Errorf("unknown logical plan %v", t)
    }
    ......
    if onode, ok := op.(node.OperatorNode); ok {tp.AddOperator(inputs, onode)
    }
    return op, newIndex, nil
}

在结构算子的过程中,咱们次要关注 2 个问题:

  1. buildOps 是如何遍历整个逻辑算子树,将每个逻辑算子转换为执行算子
  2. buildOps 是如何串联起整个执行算子的树形构造,将上层算子的 Ouput 后果传递给下层算子的 Input 起源。

在 buildOps 过程中,通过递归的形式,以自底向上的形式遍历整个逻辑算子树来结构执行算子。当下层算子结构结束当前,咱们在以下代码中会将上层算子的 Ouput 作为后果参数传递给下层算子的结构过程中,将上层算子的 Output 和下层算子的 Input 连接起来

if onode, ok := op.(node.OperatorNode); ok {tp.AddOperator(inputs, onode)
}

当执行算子树被创立结束当前,咱们会将顶层算子的 Output 和这条 SQL 的 sink 连接起来,从而使得 eKuiper 会将 SQL 计算的后果写入到上游的 sink 中。

func createTopo(rule *api.Rule, lp LogicalPlan, sources []*node.SourceNode, sinks []*node.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
    ......
    input, _, err := buildOps(lp, tp, rule.Options, sources, streamsFromStmt, 0)
    if err != nil {return nil, err}
    inputs := []api.Emitter{input}
    ......
    for _, sink := range sinks {tp.AddSink(inputs, sink)
    }
    ......
    return tp, nil
}

启动执行算子树

当执行算子树被结构结束后,咱们就须要启动执行算子树来真正执行这条 SQL,在以下的代码中展现了 eKuiper 启动执行算子的代码入口:

func (s *Topo) Open() <-chan error {
    ......
    for _, snk := range s.sinks {snk.Open(s.ctx.WithMeta(s.name, snk.GetName(), s.store), s.drain)
    }

    //apply operators, if err bail
    for _, op := range s.ops {op.Exec(s.ctx.WithMeta(s.name, op.GetName(), s.store), s.drain)
    }

    // open source, if err bail
    for _, source := range s.sources {source.Open(s.ctx.WithMeta(s.name, source.GetName(), s.store), s.drain)
    }
    .......
}

咱们会以 sink / 执行算子 / source 的程序,开始启动每个环节的算子。在这里,咱们以单个算子运行为例,来理解执行算子的运行过程中的大抵逻辑。

在以下的代码中展现了,对于单个算子而言,是如何读取上层算子的数据,进行计算,而后交付给下层算子进行解决。

func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
    ......   
    for {
       select {
       // process incoming item
       case item := <-o.input:
          ......
          result := o.op.Apply(exeCtx, item, fv, afv)
          switch val := result.(type) {
          default:
             .......
             o.Broadcast(val)
          }
       // is cancelling
       case <-ctx.Done():
          return
       }
    }
}

每个执行算子会从本人的 input channel 中取出上层算子交付的数据,对于 UnaryOperator 而言,会通过 Apply 行为来将数据进行计算,将计算后的后果通过 Broadcast 转交给下层算子进行解决。

总结

在本篇文章中,咱们以梳理要害代码节点的形式理解了 eKuiper 的 SQL 计算引擎中是如何解析、解决,并最终执行这条 SQL 失去相应的后果。对于整个计算引擎要害解决节点里,咱们理解了每个环节的代码大抵是如何运行的。

在后续的分享中,咱们将以具体 SQL 为例,深刻到各个环节、算子的外部执行的代码逻辑,从而让大家更好地了解 eKuiper 是如何在边缘端承受数据、解决计算并最终写入上游的整体流程。敬请期待。

版权申明:本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/ekuiper-source-code-interpretation

退出移动版