本文由社区用户 Milittle 供稿

LOOKUP 是图数据库 NebulaGraph 的一个查问语句。它依赖索引,能够查问点或者边的信息。在本文,我将着重从源码的角度解析一下 LOOKUP 语句的毕生是如何度过的。

本文源码浏览基于内核源码的 v3.3.0 版本,详见 GitHub https://github.com/vesoft-inc/nebula/releases/tag/v3.3.0

读源码之前

首先,咱们须要明确 NebulaGraph 中 LOOKUP 语句的语法:

LOOKUP ON {<vertex_tag> | <edge_type>}[WHERE <expression> [AND <expression> ...]]YIELD <return_list> [AS <alias>][<clause>];<return_list>    <prop_name> [AS <col_alias>] [, <prop_name> [AS <prop_alias>] ...];
  • <vertex_tag> 是 Tag 的类型,比方:数据集 basketballplayer 中的 player 和 team;
  • <edge_type> 是 EdgeType 的类型,比方:数据集 basketballplayer 中的 follow 和 serve;
  • <expression> 是表达式;
  • <return_list> 是返回的列表,比方:id(vertex),这部分内容具体参见 nGQL 的 Schema 函数 nGQL Schema 函数详解;
  • <clause> 是子句,能够是 ORDER BYLIMIT 等子句,子句详情参见 子句;

这里有个 LOOKUP 应用注意事项:

  1. 如果曾经存在点、边,然而没有索引。必须在新建索引后再通过 REBUILD INDEX 重建索引,能力使其失效;

读语句解析原理

为了便于大家了解这里放一张 NebulaGraph 计算层的服务架构:

咱们再来看下此次浏览的语句,是一个比较简单的 LOOKUP Sentence。用比较简单的语句来解析 LOOKUP 语句的基本原理,前面能够缓缓扩大条件语句和子句:

// 咱们须要剖析以下语句LOOKUP ON player YIELD id(vertex);

1. 从 Parser 开始

咱们先从 Parser 动手剖析 LOOKUP Sentence 的组成部分。这里不介绍 lex 词法剖析和 yacc 语法分析,感兴趣的小伙伴本人能够理解一下。上面,咱们间接上咱们关怀的局部:

咱们关上源码,找到文件 src/parser/parser.yy 文件,外面有所有语句的定义。咱们 定位到 LOOKUP Sentence,是这里 https://github.com/Milittle/nebula/blob/90a3107044ce1621c7834a0f36a4eef273ec2f31/src/parser/parser.yy#L2176。上面便是 LOOKUP 语句的定义,你也能够拷贝下面的链接拜访 GitHub 查看。来,咱们剖析剖析每个局部:

/// LOOKUP 语句的语法定义lookup_sentence    : KW_LOOKUP KW_ON name_label lookup_where_clause yield_clause {        $$ = new LookupSentence($3, $4, $5);    }    ;// KW_LOOKUP 是 LOOKUP 的关键字,大小写不敏感的// KW_ON 是 ON 的关键字,大小写不敏感的// name_label 是 LABEL 的定义,也是 strval,简略的说就是字符串// lookup_where_clause 是 WHERE 子句的定义,这个咱们前面有机会扩大介绍,也有一个对应的语义定义// yield_clause 这个是 YIELD 输入数据的要害语句,在 v3.x 版本当前,YIELD 子句是必须要指定的,不指定会报语法错误/// YIELD clause 的语法定义,其实 YIELD clause 用在了很多其余语句中,比方 GO、FIND PATH、GET SUBGRAPHyield_clause    : %empty { $$ = nullptr; }    | KW_YIELD yield_columns {        if ($2->hasAgg()) {            delete($2);            throw nebula::GraphParser::syntax_error(@2, "Invalid use of aggregating function in yield clause.");        }        $$ = new YieldClause($2);    }    | KW_YIELD KW_DISTINCT yield_columns {        if ($3->hasAgg()) {            delete($3);            throw nebula::GraphParser::syntax_error(@3, "Invalid use of aggregating function in yield clause.");        }        $$ = new YieldClause($3, true);    }    ;// 能够为 empty,然而前面 validator 会进行校验,不指定就会报 Error// KW_YIELD 是 YIELD 的关键字,大小写不敏感// yield_columns 是输入的列信息,也有对应的一个语法定义// KW_DISTINCT 是 distinct 关键字,示意是否去除反复数据的语义,大小写不敏感// LOOKUP Sentence 就是下面所有的信息组成,都会被结构在这个类外面,也就是 LOOKUP 语句的内容了

上面,咱们持续从 lookup_sentence 语句的定义往下规约看,能够看到它属于 src/parser/parser.yy:2917: traverse_sentence → src/parser/parser.yy:2936: piped_sentence → src/parser/parser.yy:2942: set_sentence → src/parser/parser.yy:3924: sentence → src/parser/parser.yy:3933: seq_sentence

其实,下面这些你能够临时疏忽,因为这些都是对 sentence 的规约形象,有些汇合语句和管道语句。这里,我想表白的是这些语句肯定会映射到 seq_sentence 上的,即,序列语句。你能够把它了解为用分号分隔的复合语句,只不过这外面只蕴含了一条 lookup_sentence 而已。这样子,就好了解为什么下文在 seq_sentence 寻找入口代码,而不是 lookup_sentence.

2. 从 nGQL 解析看 LOOKUP 语句

第二,从 nGQL 的解析过程持续看 LOOKUP Sentence。其实,方才曾经强调过了,这里解析进去的对象肯定是 seq_sentence

/// src/graph/service/QueryInstance.cppvoid QueryInstance::execute() {  Status status = validateAndOptimize(); // 1. 负责 validate、执行打算生成、执行打算优化等工作  if (!status.ok()) {    onError(std::move(status));    return;  }  // Sentence is explain query, finish  if (!explainOrContinue()) {  // 6. 判断是否是 explain 语句。如果是,间接输入执行打算,不做理论物理算子执行    onFinish();    return;  }  // The execution engine converts the physical execution plan generated by the Planner into a  // series of Executors through the Scheduler to drive the execution of the Executors.  scheduler_->schedule()    // 7. 理论物理算子调度执行的局部,通过 DAG,对每一个 plan -> executor 的转换执行(后续步骤会进行详解)      .thenValue([this](Status s) {        if (s.ok()) {          this->onFinish(); // 8. 这里是干完了所有物理执行打算,而后开始解决客户端 resp 了        } else {          this->onError(std::move(s)); // 9. 这里是下面的过程出错了,须要解决 Error 信息        }      }) // 10. 上面是解决一些异常情况,也是走谬误分支      .thenError(folly::tag_t<ExecutionError>{},                 [this](const ExecutionError &e) { onError(e.status()); })      .thenError(folly::tag_t<std::exception>{},                 [this](const std::exception &e) { onError(Status::Error("%s", e.what())); });}// 这个函数执行的是正文 1 的内容Status QueryInstance::validateAndOptimize() {  auto *rctx = qctx()->rctx();  auto &spaceName = rctx->session()->space().name;  VLOG(1) << "Parsing query: " << rctx->query();  // Result of parsing, get the parsing tree  // 2. 第一步中的语法解析就是这里的解释,对 nGQL 进行词法语法解析,进去的 result 就是 Sentence*,通过咱们下面的剖析,这里吐出来的就是 seq_sentence 了  auto result = GQLParser(qctx()).parse(rctx->query());  NG_RETURN_IF_ERROR(result);  sentence_ = std::move(result).value();  // 3. 这里是做指标的统计。这个能够在 dashboard 外面展现  if (sentence_->kind() == Sentence::Kind::kSequential) {    size_t num = static_cast<const SequentialSentences *>(sentence_.get())->numSentences();    stats::StatsManager::addValue(kNumSentences, num);    if (FLAGS_enable_space_level_metrics && spaceName != "") {      stats::StatsManager::addValue(          stats::StatsManager::counterWithLabels(kNumSentences, {{"space", spaceName}}), num);    }  } else {    stats::StatsManager::addValue(kNumSentences);    if (FLAGS_enable_space_level_metrics && spaceName != "") {      stats::StatsManager::addValue(          stats::StatsManager::counterWithLabels(kNumSentences, {{"space", spaceName}}));    }  }  // Validate the query, if failed, return  // 4. 这个是源码校验 nGQL 解析进去的内容是否合乎咱们的预期,如果不合乎预期就报语法错误  // validate 过程还会波及到执行打算的生成,重点函数  NG_RETURN_IF_ERROR(Validator::validate(sentence_.get(), qctx()));  // Optimize the query, and get the execution plan  // 5. 对下面生成的执行打算进行 RBO 规定的优化,这个留在前面有机会再介绍  NG_RETURN_IF_ERROR(findBestPlan());  stats::StatsManager::addValue(kOptimizerLatencyUs, *(qctx_->plan()->optimizeTimeInUs()));  if (FLAGS_enable_space_level_metrics && spaceName != "") {    stats::StatsManager::addValue(        stats::StatsManager::histoWithLabels(kOptimizerLatencyUs, {{"space", spaceName}}));  }  return Status::OK();}

咱们依照下面的正文局部进行解说,有的比拟容易的局部,像正文 1、2、3、5。咱们上面重点介绍正文 4 的局部

// src/graph/validator/Validator.cpp// Entry of validating sentence.// Check session, switch space of validator context, create validators and validate.// static// 1. 参数 sentence 就是方才咱们从语法解析器中拿到的 seq_sentence// 2. 参数 qctx 是咱们查问上下文,一个语句进来对应一个查问上下文,这个是在 QueryEngine 外面生成的,感兴趣能够自行浏览一下Status Validator::validate(Sentence* sentence, QueryContext* qctx) {  DCHECK(sentence != nullptr);  DCHECK(qctx != nullptr);  // Check if space chosen from session. if chosen, add it to context.  auto session = qctx->rctx()->session();  if (session->space().id > kInvalidSpaceID) {    auto spaceInfo = session->space();    qctx->vctx()->switchToSpace(std::move(spaceInfo));  }  // 3. 既然咱们须要校验该 sentence 是否合乎咱们的预期,则须要依据 sentence 的类型,创立一个 validator,记住目前是 seq_sentence  // 所以生成的就是 SequentialValidator,能够间接看下 makeValidator 函数的 switch case  auto validator = makeValidator(sentence, qctx);  // 4. 调用 validator 进行校验,咱们切换到上面的函数中  NG_RETURN_IF_ERROR(validator->validate());  auto root = validator->root();  if (!root) {    return Status::SemanticError("Get null plan from sequential validator");  }  qctx->plan()->setRoot(root);  return Status::OK();}// 5. 所有子类 validator,调用 validate 办法,进行校验// Validate current sentence.// Check validator context, space, validate, duplicate reference columns,// check permission according to sentence kind and privilege of user.Status Validator::validate() {  if (!vctx_) {    VLOG(1) << "Validate context was not given.";    return Status::SemanticError("Validate context was not given.");  }  if (!sentence_) {    VLOG(1) << "Sentence was not given";    return Status::SemanticError("Sentence was not given");  }  if (!noSpaceRequired_ && !spaceChosen()) {    VLOG(1) << "Space was not chosen.";    return Status::SemanticError("Space was not chosen.");  }  if (!noSpaceRequired_) {    space_ = vctx_->whichSpace();    VLOG(1) << "Space chosen, name: " << space_.spaceDesc.space_name_ref().value()            << " id: " << space_.id;  }  auto vidType = space_.spaceDesc.vid_type_ref().value().type_ref().value();  vidType_ = SchemaUtil::propTypeToValueType(vidType);  // 6. 调用子类 validateImpl  NG_RETURN_IF_ERROR(validateImpl());  // Check for duplicate reference column names in pipe or var statement  NG_RETURN_IF_ERROR(checkDuplicateColName());  // Execute after validateImpl because need field from it  if (FLAGS_enable_authorize) {    NG_RETURN_IF_ERROR(checkPermission());  }  // 7. 这里是生成执行打算调用  NG_RETURN_IF_ERROR(toPlan());  return Status::OK();}

讲了这么久了,啥时候到 LOOKUP。只能说快了,因为第一次讲源码,一些上下文信息须要讲清楚,不然大家一看就看得云里雾里了。

3. 深刻到 validator

上面,咱们要进入 SequentialValidator.cppvalidateImpl() 去一探到底。

// src/graph/validator/SequentialValidator.cpp// Validator of sequential sentences which combine multiple sentences, e.g. GO ...; GO ...;// Call validator of sub-sentences.Status SequentialValidator::validateImpl() {  Status status;  if (sentence_->kind() != Sentence::Kind::kSequential) {    return Status::SemanticError(        "Sequential validator validates a SequentialSentences, but %ld is "        "given.",        static_cast<int64_t>(sentence_->kind()));  }  auto seqSentence = static_cast<SequentialSentences*>(sentence_);  auto sentences = seqSentence->sentences();  if (sentences.size() > static_cast<size_t>(FLAGS_max_allowed_statements)) {    return Status::SemanticError("The maximum number of statements allowed has been exceeded");  }  DCHECK(!sentences.empty());  // 咱们的 StartNode 就是这里创立进去的  seqAstCtx_->startNode = StartNode::make(seqAstCtx_->qctx);  // 个别序列语句中会放很多语句,也就是分号分隔的语句,这里咱们只有一条语句就是 lookup_sentence  // LOOKUP 语句创立进去 LookupValidator,终于看到曙光了  for (auto* sentence : sentences) {    auto validator = makeValidator(sentence, qctx_);    NG_RETURN_IF_ERROR(validator->validate());    seqAstCtx_->validators.emplace_back(std::move(validator));  }  return Status::OK();}

4. 读一读 LookupValidator

终于,看到点 LOOKUP 的影子了,LookupValidator 驾到:

// src/graph/validator/LookupValidator.cpp// LOOKUP 的 validateImpl 比拟简洁,间接对 From Where Yield e别离进行校验Status LookupValidator::validateImpl() {  lookupCtx_ = getContext<LookupContext>();  // 详情请见上面的子函数剖析  NG_RETURN_IF_ERROR(validateFrom());  // 此次不波及,咱们先不做剖析  NG_RETURN_IF_ERROR(validateWhere());  // 详情请见上面的子函数剖析  NG_RETURN_IF_ERROR(validateYield());  return Status::OK();}// Validate specified schema(tag or edge) from sentenceStatus LookupValidator::validateFrom() {  auto spaceId = lookupCtx_->space.id;  auto from = sentence()->from();  // 依据 spaceId 和指定的 label_name 查问 Schema  auto ret = qctx_->schemaMng()->getSchemaIDByName(spaceId, from);  NG_RETURN_IF_ERROR(ret);  // 指定的是不是边类型  lookupCtx_->isEdge = ret.value().first;  // 指定的 schemaId  lookupCtx_->schemaId = ret.value().second;  schemaIds_.emplace_back(ret.value().second);  return Status::OK();}// Validate yield clause.Status LookupValidator::validateYield() {  auto yieldClause = sentence()->yieldClause();  if (yieldClause == nullptr) {    return Status::SemanticError("Missing yield clause.");  }  // 这个是判断是否指定了 distinct 关键字,用于后续生成 dedup  lookupCtx_->dedup = yieldClause->isDistinct();  lookupCtx_->yieldExpr = qctx_->objPool()->makeAndAdd<YieldColumns>();  // 如果是边类型,返回的列中,有 src、dst、rank、type  if (lookupCtx_->isEdge) {    idxReturnCols_.emplace_back(nebula::kSrc);    idxReturnCols_.emplace_back(nebula::kDst);    idxReturnCols_.emplace_back(nebula::kRank);    idxReturnCols_.emplace_back(nebula::kType);    // 校验边类型    NG_RETURN_IF_ERROR(validateYieldEdge());  } else { // 如果点类型、返回的列中有 vid    idxReturnCols_.emplace_back(nebula::kVid);    // 校验点类型,这次咱们介绍点类型的校验    NG_RETURN_IF_ERROR(validateYieldTag());  }  if (exprProps_.hasInputVarProperty()) {    return Status::SemanticError("unsupport input/variable property expression in yield.");  }  if (exprProps_.hasSrcDstTagProperty()) {    return Status::SemanticError("unsupport src/dst property expression in yield.");  }  extractExprProps();  return Status::OK();}// Validate yield clause when lookup on tag.// Disable invalid expressions, check schema name, rewrites expression to fit semantic,// check type and collect properties.Status LookupValidator::validateYieldTag() {  auto yield = sentence()->yieldClause();  auto yieldExpr = lookupCtx_->yieldExpr;  // yield 子句外面的每一个逗号分隔的就是一个 col、咱们的示例语句是 id(vertex)  // src/parser/parser.yy:1559 对 col 进行了定义  for (auto col : yield->columns()) {    // 如果发现表达式有 Edge 类型的,则间接把语义谬误    if (ExpressionUtils::hasAny(col->expr(), {Expression::Kind::kEdge})) {      return Status::SemanticError("illegal yield clauses `%s'", col->toString().c_str());    }    // 如果是 label 属性,则进行表达式名字的校验,比方 yield player.name 这种语句    if (col->expr()->kind() == Expression::Kind::kLabelAttribute) {      const auto& schemaName = static_cast<LabelAttributeExpression*>(col->expr())->left()->name();      if (schemaName != sentence()->from()) {        return Status::SemanticError("Schema name error: %s", schemaName.c_str());      }    }    // 这块应该是重写表达式,有 label 属性转换为 Tag 的 prop,这里不是特地分明,后续精读一下    col->setExpr(ExpressionUtils::rewriteLabelAttr2TagProp(col->expr()));    NG_RETURN_IF_ERROR(ValidateUtil::invalidLabelIdentifiers(col->expr()));    auto colExpr = col->expr();    // 揣测表达式的类型    auto typeStatus = deduceExprType(colExpr);    NG_RETURN_IF_ERROR(typeStatus);    // 组织输入,由名字和类型组成的汇合对象    outputs_.emplace_back(col->name(), typeStatus.value());    yieldExpr->addColumn(col->clone().release());    NG_RETURN_IF_ERROR(deduceProps(colExpr, exprProps_, &schemaIds_));  }  return Status::OK();}

到这里,LOOKUP 的 validator 工作差不多完事了。

5. 语句如何变成执行打算

介绍得不够粗疏,我还在相熟过程,接下来就是介绍将 sentence 转换成执行打算的过程了。

执行打算生成

执行打算的生成,像是一些简略的语句,就通过子类的 validatortoPlan 间接生成了,比方:SHOW HOSTS 这个语句,就是间接在 ShowHostsValidator::toPlan 办法中间接生成执行打算。然而,对于一些比较复杂的语句来说,子类 validator 都没有实现 toPlan 办法,也就是须要借助父类的 toPlan 办法来生成执行打算。比方,本文在读的 LOOKUP 语句也属于简单语句:

// src/graph/validator/Validator.cpp// 这里就是简单语句生成执行打算的入口// 须要配合 AstContext 来生成,对于 LOOKUP 语句来说,就是 LookupContext// Call planner to get final execution plan.Status Validator::toPlan() {  // **去子类 LookupValidator 的 getAstContext() 办法看下,是不是返回的是 LookupContext**  auto* astCtx = getAstContext();  if (astCtx != nullptr) {    astCtx->space = space_;  }  // 利用形象语法树上下文,借用 Planner 的 toPlan 生成具体的执行打算  auto subPlanStatus = Planner::toPlan(astCtx);  NG_RETURN_IF_ERROR(subPlanStatus);  auto subPlan = std::move(subPlanStatus).value();  // 将返回的 subPlan 对 root 和 tail 进行填充  root_ = subPlan.root;  tail_ = subPlan.tail;  VLOG(1) << "root: " << root_->kind() << " tail: " << tail_->kind();  return Status::OK();}

6. 进入 toPlan() 一探到底

从章节 5. 下面获知,须要进入 Planner 的 toPlan 办法一探到底

// src/graph/planner/Planner.cppStatusOr<SubPlan> Planner::toPlan(AstContext* astCtx) {  if (astCtx == nullptr) {    return Status::Error("AstContext nullptr.");  }  const auto* sentence = astCtx->sentence;  DCHECK(sentence != nullptr);  // 从形象语法树的执行上下文取到咱们的 sentence  // 上面的 plannerMap 是咱们在 src/graph/planner/PlannersRegister.cpp 注册好的,一些简单的语句都在这里注册好了  auto planners = plannersMap().find(sentence->kind());  if (planners == plannersMap().end()) {    return Status::Error("No planners for sentence: %s", sentence->toString().c_str());  }  for (auto& planner : planners->second) { // second 是语句具体对应的 planner 的实例化对象: MatchAndInstantiate    if (planner.match(astCtx)) { // match 办法是具体 planner 的 match 办法,对应到 LookupPlaner,就是 match      // 这里的 instantiate 是 LookupPlanner 的 make 办法      // 这里的 transform 是拿着 lookupcontext 生成执行打算的函数      return planner.instantiate()->transform(astCtx);    }  }  return Status::Error("No planner matches sentence: %s", sentence->toString().c_str());}

7. 打算中的 transform()

咱们剖析到这里,应用了 Planner 的 toPlan 办法生成一些简单语句的执行打算。接下来,就是进去 LookupPlanner 的 transform 办法从 LookupContext 转换到执行打算的过程了。咱们间接定位到 LookupPlanner 的 transform 办法上:

// src/graph/planner/ngql/LookupPlanner.cppStatusOr<SubPlan> LookupPlanner::transform(AstContext* astCtx) {  // 是不是咱们下面提到的 lookupContext  auto lookupCtx = static_cast<LookupContext*>(astCtx);  auto qctx = lookupCtx->qctx;  // ON 前面的 name_label  auto from = static_cast<const LookupSentence*>(lookupCtx->sentence)->from();  SubPlan plan;    // 如果是边的话,生成的是 EdgeIndexFullScan  if (lookupCtx->isEdge) {    auto* edgeIndexFullScan = EdgeIndexFullScan::make(qctx,                                                      nullptr,                                                      from,                                                      lookupCtx->space.id,                                                      {},                                                      lookupCtx->idxReturnCols,                                                      lookupCtx->schemaId,                                                      lookupCtx->isEmptyResultSet);    edgeIndexFullScan->setYieldColumns(lookupCtx->yieldExpr);    plan.tail = edgeIndexFullScan;    plan.root = edgeIndexFullScan;  } else { // 如果是点的话,生成的是 TagIndexFullScan    auto* tagIndexFullScan = TagIndexFullScan::make(qctx,                                                    nullptr,                                                    from,                                                    lookupCtx->space.id,                                                    {},                                                    lookupCtx->idxReturnCols,                                                    lookupCtx->schemaId,                                                    lookupCtx->isEmptyResultSet);    tagIndexFullScan->setYieldColumns(lookupCtx->yieldExpr);    plan.tail = tagIndexFullScan;    plan.root = tagIndexFullScan;  }  plan.tail->setColNames(lookupCtx->idxColNames);  // 咱们没有指定 where 语句,所以不会有 filter 算子  if (lookupCtx->filter) {    plan.root = Filter::make(qctx, plan.root, lookupCtx->filter);  }  // 会有 Project 算子生成:对输入列做一个映射  plan.root = Project::make(qctx, plan.root, lookupCtx->yieldExpr);  // 这里是 distinct 关键字,咱们没有指定,默认是没有这个算子的  if (lookupCtx->dedup) {    plan.root = Dedup::make(qctx, plan.root);  }  return plan;}

8. explain 验证生成的执行打算

通过咱们上述的介绍,执行打算曾经生成了。那么,咱们是不是能够通过 explain 或者 profile 来验证咱们剖析生成的执行打算就是 Project→TagIndexFullScan→Start 呢。上面是咱们通过 explain 生成的执行打算,它验证了咱们剖析的源码和生成的执行打算是统一的。 大喜

(root@nebula) [basketballplayer]> explain lookup on player yield id(vertex)Execution succeeded (time spent 615µs/1.057064ms)Execution Plan (optimize time 42 us)-----+------------------+--------------+----------------+-----------------------------------| id | name             | dependencies | profiling data | operator info                    |-----+------------------+--------------+----------------+-----------------------------------|  2 | Project          | 3            |                | outputVar: {                     ||    |                  |              |                |   "colNames": [                  ||    |                  |              |                |     "id(VERTEX)"                 ||    |                  |              |                |   ],                             ||    |                  |              |                |   "type": "DATASET",             ||    |                  |              |                |   "name": "__Project_2"          ||    |                  |              |                | }                                ||    |                  |              |                | inputVar: __TagIndexFullScan_1   ||    |                  |              |                | columns: [                       ||    |                  |              |                |   "id(VERTEX)"                   ||    |                  |              |                | ]                                |-----+------------------+--------------+----------------+-----------------------------------|  3 | TagIndexFullScan | 0            |                | outputVar: {                     ||    |                  |              |                |   "colNames": [                  ||    |                  |              |                |     "_vid",                      ||    |                  |              |                |     "player._tag",               ||    |                  |              |                |     "player.age",                ||    |                  |              |                |     "player.name"                ||    |                  |              |                |   ],                             ||    |                  |              |                |   "type": "DATASET",             ||    |                  |              |                |   "name": "__TagIndexFullScan_1" ||    |                  |              |                | }                                ||    |                  |              |                | inputVar:                        ||    |                  |              |                | space: 6                         ||    |                  |              |                | dedup: false                     ||    |                  |              |                | limit: 9223372036854775807       ||    |                  |              |                | filter:                          ||    |                  |              |                | orderBy: []                      ||    |                  |              |                | schemaId: 7                      ||    |                  |              |                | isEdge: false                    ||    |                  |              |                | returnCols: [                    ||    |                  |              |                |   "_vid",                        ||    |                  |              |                |   "_tag",                        ||    |                  |              |                |   "age",                         ||    |                  |              |                |   "name"                         ||    |                  |              |                | ]                                ||    |                  |              |                | indexCtx: [                      ||    |                  |              |                |   {                              ||    |                  |              |                |     "columnHints": [],           ||    |                  |              |                |     "filter": "",                ||    |                  |              |                |     "index_id": 11               ||    |                  |              |                |   }                              ||    |                  |              |                | ]                                |-----+------------------+--------------+----------------+-----------------------------------|  0 | Start            |              |                | outputVar: {                     ||    |                  |              |                |   "colNames": [],                ||    |                  |              |                |   "type": "DATASET",             ||    |                  |              |                |   "name": "__Start_0"            ||    |                  |              |                | }                                |-----+------------------+--------------+----------------+-----------------------------------

阶段小结

源码浏览到这里,咱们晓得 Graph 层从一个 nGQL 语句,到生成执行打算的所有过程。当中可能有一些细节没有八面玲珑,然而,咱们应该整体对代码有了初步理解。

9. 调度执行打算

接下来,咱们要理解执行打算是如何被物理执行、Executor 是如何调度执行打算的。目前,咱们只波及到三个物理算子的执行,而且 Start 节点是一个没有理论语义的算子。这里咱们仔细分析一下 TagIndexScan 和 Project 算子。

咱们须要先回到第二章节的正文 7 那里了。正文 5 咱们就不讲了,那里是内核语句 RBO 规定对执行打算进行优化的子模块,咱们的简略语句的执行打算不波及这块,留下后续扩大介绍吧。

// src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp// 咱们回到了正文 7 那里,对 scheduler_ 的 shcedule 办法解读一下// 而后咱们再看 LOOKUP 语句的两个物理算子在这里是怎么执行的// 目前内核只实现了基于音讯的异步调度器folly::Future<Status> AsyncMsgNotifyBasedScheduler::schedule() {  // 拿到执行打算的 root 节点,在这次的语句中,就是 Project  auto root = qctx_->plan()->root();  // 这块还没有深刻解读过,后续再扩大吧  if (FLAGS_enable_lifetime_optimize) {    // special for root    root->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),                                          std::memory_order_relaxed);    analyzeLifetime(root);  }  // 递归将执行打算 convert 到物理执行打算 Executor,也就是 Project->ProjectExecutor, TagindexFullScan->IndexScanExecutor  // 把物理 Executor 的拓扑构造创立进去  //    ProjectExecutor 依赖 IndexScanExecutor IndexScanExecutor 的后继是 ProjectExecutor  //    IndexScanExecutor 依赖 StartExecutor StartExecutor 的后继是 IndexScanExecutor  auto executor = Executor::create(root, qctx_);  // 这里开始 DAG 的物理打算执行  // 调度是基于 folly 的 Promise 和 Future 异步调用开展的  return doSchedule(executor);}folly::Future<Status> AsyncMsgNotifyBasedScheduler::doSchedule(Executor* root) const {  // 这个是依照算子的 id,承诺给别的算子的 promise(你能够了解为谁依赖这个算子,那么就给谁一个 promise)  std::unordered_map<int64_t, std::vector<folly::Promise<Status>>> promiseMap;  // 这个是以后算子,被谁允诺过的 future,是从 promise 那里或者的后果值。也就是说,如果这个算子依赖了某些算子,只有它们的允诺兑现了(promise set value),这里的 future 能力失去解决  std::unordered_map<int64_t, std::vector<folly::Future<Status>>> futureMap;  // 这个 queue 是为了辅助算子生成 promiseMap 和 futureMap 的  std::queue<Executor*> queue;  // 这个 queue2 是为联合方才生成的 promiseMap 和 futureMap 理论进行调度运行的  std::queue<Executor*> queue2;  // 算子节点拜访标记,防止反复遍历  std::unordered_set<Executor*> visited;  auto* runner = qctx_->rctx()->runner();  // 首先把 root 的 promise 进去,这个对于咱们的执行打算中的算子就是 Project  folly::Promise<Status> promiseForRoot;  auto resultFuture = promiseForRoot.getFuture();  promiseMap[root->id()].emplace_back(std::move(promiseForRoot));  queue.push(root);  visited.emplace(root);  // 开始 DAG 拜访图计算节点,生成每一个节点的 promise 和 future  while (!queue.empty()) {    auto* exe = queue.front();    queue.pop();    queue2.push(exe);    std::vector<folly::Future<Status>>& futures = futureMap[exe->id()];    if (exe->node()->kind() == PlanNode::Kind::kArgument) {      auto nodeInputVar = exe->node()->inputVar();      const auto& writtenBy = qctx_->symTable()->getVar(nodeInputVar)->writtenBy;      for (auto& node : writtenBy) {        folly::Promise<Status> p;        futures.emplace_back(p.getFuture());        auto& promises = promiseMap[node->id()];        promises.emplace_back(std::move(p));      }    } else {      for (auto* dep : exe->depends()) {        auto notVisited = visited.emplace(dep).second;        if (notVisited) {          queue.push(dep);        }        folly::Promise<Status> p;        futures.emplace_back(p.getFuture());        auto& promises = promiseMap[dep->id()];        promises.emplace_back(std::move(p));      }    }  }  // 开始调度执行,上面的 scheduleExecutor 这个办法是要害  // 这个办法是纯异步运行的,比方运行 ProjectExecutor,它的依赖是 IndexScanExecutor  // 那么 ProjectExecutor 的 future 就来自于 IndexScanExecutor 的 promise  // ProjectExecutor 须要在 folly::collect 出期待 IndexScanExecutor 的执行完结  // 这样 ProjectExecutor 才能够失去执行的机会  while (!queue2.empty()) {    auto* exe = queue2.front();    queue2.pop();    auto currentFuturesFound = futureMap.find(exe->id());    DCHECK(currentFuturesFound != futureMap.end());    auto currentExeFutures = std::move(currentFuturesFound->second);    auto currentPromisesFound = promiseMap.find(exe->id());    DCHECK(currentPromisesFound != promiseMap.end());    auto currentExePromises = std::move(currentPromisesFound->second);    scheduleExecutor(std::move(currentExeFutures), exe, runner)        .thenTry([this, pros = std::move(currentExePromises)](auto&& t) mutable {          if (t.hasException()) {            notifyError(pros, Status::Error(std::move(t).exception().what()));          } else {            auto v = std::move(t).value();            if (v.ok()) {              notifyOK(pros); // **Promise填充:胜利当前具体填充promise的中央**            } else {              notifyError(pros, v);            }          }        });  }  return resultFuture;}// 你能够把这个函数了解为异步调度器,下面把所有的算子通过这个函数进行了调度// 第一个参数蕴含了该算子所有的 futures,也就是这个算子依赖算子的 promise 须要执行完结,这里的 futures 才能够获取到后果// 第二个参数是该算子的 Executor// 第三个参数是执行器,你能够了解为线程池// 依据不同的算子类型,实现不同的分支运行,咱们下面的语句是走 default 分支// lookup on player yield id(vertex);语句整体的调度过程// ProjectExecutor(P)->IndexScanExecutor(I)->Start(S)执行打算。上面咱们用简写来示意三个算子// 首先 P 算子调度当前,它到了 default 分支,depends 不为空,那么走 runExecutor// P 算子的 future 就来自于 I 算子的 promise,所以须要期待 I 算子的执行完结// I 算子调度到这个函数当前,它到了 default 分支,depends 不为空,那么走 runExecutor// I 算子的 future 就来自于 S 算子的 promise,所以须要期待 S 算子的执行完结// S 算子调度到这个函数当前,它到了 default 分支,depends 为空,那么走 runLeafExecutor// S 算子就开始 execute 的逻辑了,能够去看看 StartExecutor 的 executor 办法,啥也没干,所以之前说 start 算子没啥语义// S 算子完结当前,它的 promise 被填充,其实是下面那个函数的回调填充的,具体看我下面的正文 **Promise 填充**// 那么 I 算子的 future 就失去了响应,去 runExecutor 看看,是不是也是有一个回调,立马发动了 I 算子的调用// 当 I 算子的 promise 也被下面的函数填充// 那么 P 算子的 executor 也失去了执行,这下就算执行完folly::Future<Status> AsyncMsgNotifyBasedScheduler::scheduleExecutor(    std::vector<folly::Future<Status>>&& futures, Executor* exe, folly::Executor* runner) const {  switch (exe->node()->kind()) {    case PlanNode::Kind::kSelect: {      auto select = static_cast<SelectExecutor*>(exe);      return runSelect(std::move(futures), select, runner);    }    case PlanNode::Kind::kLoop: {      auto loop = static_cast<LoopExecutor*>(exe);      return runLoop(std::move(futures), loop, runner);    }    case PlanNode::Kind::kArgument: {      return runExecutor(std::move(futures), exe, runner);    }    default: {      if (exe->depends().empty()) {        return runLeafExecutor(exe, runner);      } else {        return runExecutor(std::move(futures), exe, runner);      }    }  }}

10. LOOKUP 语句的算子在执行什么?

下面我介绍了物理算子通过 folly 三方库的 Promise 和 Future 异步编程模型来实现调度执行。接下来,重点介绍一下咱们本次 LOOKUP 语句中两个算子执行了什么。源码走起:下面的语句次要介绍了三个物理算子:ProjectExecutorIndexScanExecutorStartExecutor。这里多说一句,因为和 IndexScan 无关的算子都会映射到 IndexScanExecutor

// StartExecutor:啥也没干// IndexScanExecutor:是次要干活的,须要 graph 和 storage 的 rpc,拉取数据// ProjectExecutor:这个物理执行算子不须要和 storage 交互,间接在 graph 层闭环计算// 这三个算子,咱们只剖析后两个算子的源码:// src/graph/executor/query/IndexScanExecutor.cppfolly::Future<Status> IndexScanExecutor::execute() {  return indexScan();}folly::Future<Status> IndexScanExecutor::indexScan() {  // 拿到和 storage 交互的 storageClient  StorageClient *storageClient = qctx_->getStorageClient();  auto *lookup = asNode<IndexScan>(node());  if (lookup->isEmptyResultSet()) {    DataSet dataSet({"dummy"});    return finish(ResultBuilder().value(Value(std::move(dataSet))).build());  }  const auto &ictxs = lookup->queryContext();  auto iter = std::find_if(      ictxs.begin(), ictxs.end(), [](auto &ictx) { return !ictx.index_id_ref().is_set(); });  if (ictxs.empty() || iter != ictxs.end()) {    return Status::Error("There is no index to use at runtime");  }  // Req 的公共申请参数  StorageClient::CommonRequestParam param(lookup->space(),                                          qctx()->rctx()->session()->id(),                                          qctx()->plan()->id(),                                          qctx()->plan()->isProfileEnabled());  return storageClient      ->lookupIndex(param,                    ictxs,                    lookup->isEdge(), // 是不是边类型                    lookup->schemaId(), // schemaId                    lookup->returnColumns(), // resp 返回的列数据                    lookup->orderBy(), // 是否带有 orderBy,为了下推 TopN 算子                    lookup->limit(qctx_)) // 是否带有 limit,为了下推 limit 算子      .via(runner())      .thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {        addStats(rpcResp, otherStats_);        return handleResp(std::move(rpcResp));      });}// TODO(shylock) merge the handler with GetProptemplate <typename Resp>Status IndexScanExecutor::handleResp(storage::StorageRpcResponse<Resp> &&rpcResp) {  auto completeness = handleCompleteness(rpcResp, FLAGS_accept_partial_success);  if (!completeness.ok()) {    return std::move(completeness).status();  }  auto state = std::move(completeness).value();  nebula::DataSet v;  // 把每一个 resp 拉进去解决,因为咱们 storage 是能够分布式部署的  // 这里有一个问题重点提出一下,后果集会保护在 ectx_ 中,供 ProjectExecutor 一会取  for (auto &resp : rpcResp.responses()) {    if (resp.data_ref().has_value()) {      nebula::DataSet &data = *resp.data_ref();      // TODO: convert the column name to alias.      if (v.colNames.empty()) {        v.colNames = data.colNames;      }      v.rows.insert(v.rows.end(), data.rows.begin(), data.rows.end());    } else {      state = Result::State::kPartialSuccess;    }  }  if (!node()->colNames().empty()) {    DCHECK_EQ(node()->colNames().size(), v.colNames.size());    v.colNames = node()->colNames();  }  return finish(      ResultBuilder().value(std::move(v)).iter(Iterator::Kind::kProp).state(state).build());}// src/graph/executor/query/ProjectExecutor.cppfolly::Future<Status> ProjectExecutor::execute() {  SCOPED_TIMER(&execTime_);  auto *project = asNode<Project>(node());  // 方才说从 storage 获取的后果数据都放在 ectx_ 外面了  auto iter = ectx_->getResult(project->inputVar()).iter();  DCHECK(!!iter);  QueryExpressionContext ctx(ectx_);  // 默认 max_job_size 是 1,咱们先看 if 分支,看 handleJob 到底干了啥  if (FLAGS_max_job_size <= 1) {    auto ds = handleJob(0, iter->size(), iter.get());    return finish(ResultBuilder().value(Value(std::move(ds))).build());  } else {    DataSet ds;    ds.colNames = project->colNames();    ds.rows.reserve(iter->size());    auto scatter = [this](size_t begin, size_t end, Iterator *tmpIter) -> StatusOr<DataSet> {      return handleJob(begin, end, tmpIter);    };    auto gather = [this, result = std::move(ds)](auto &&results) mutable {      for (auto &r : results) {        auto &&rows = std::move(r).value();        result.rows.insert(result.rows.end(),                           std::make_move_iterator(rows.begin()),                           std::make_move_iterator(rows.end()));      }      finish(ResultBuilder().value(Value(std::move(result))).build());      return Status::OK();    };    return runMultiJobs(std::move(scatter), std::move(gather), iter.get());  }}DataSet ProjectExecutor::handleJob(size_t begin, size_t end, Iterator *iter) {  auto *project = asNode<Project>(node());  auto columns = project->columns()->clone();  DataSet ds;  ds.colNames = project->colNames();  QueryExpressionContext ctx(qctx()->ectx());  ds.rows.reserve(end - begin);  // 从头到尾遍历数据,去除关怀的数据  for (; iter->valid() && begin++ < end; iter->next()) {    Row row;    for (auto &col : columns->columns()) {      Value val = col->expr()->eval(ctx(iter)); // 这个是表达式的 eval 执行,对于咱们 id(vertex) 对应的是:src/common/function/FunctionManager.cpp:1832 auto &attr = functions_["id"];      row.values.emplace_back(std::move(val)); // 这个对于 id(vertex) 的 val 来说,就是 vertex.id    ds.rows.emplace_back(std::move(row));  }  return ds;}

11. 数据结果显示

咱们通过物理执行算子,把数据放在最初一个算子的 ProjectExecutor 的 ectx_(ExecutionContext) 外面了。咱们接下来就是要晓得,哪个流程把这个执行上下文的数据取走了:给客户端的 resp 填充这些数据,最终显示到咱们的 nebula-console,或者其余客户端中。Its time to go back to 章节 2. 的正文 8:

// 请看第二步的正文 8:this->onFinish(); // 8. 这里是干完了所有物理执行打算,而后开始解决客户端 resp 了// 咱们进到 onFinish 函数看下:void QueryInstance::onFinish() {  auto rctx = qctx()->rctx();  VLOG(1) << "Finish query: " << rctx->query();  auto &spaceName = rctx->session()->space().name;  rctx->resp().spaceName = std::make_unique<std::string>(spaceName);  // 这个函数做了填充后果数据到 resp 中  fillRespData(&rctx->resp());  auto latency = rctx->duration().elapsedInUSec();  rctx->resp().latencyInUs = latency;  addSlowQueryStats(latency, spaceName);  rctx->finish();  rctx->session()->deleteQuery(qctx_.get());  // The `QueryInstance' is the root node holding all resources during the  // execution. When the whole query process is done, it's safe to release this  // object, as long as no other contexts have chances to access these resources  // later on, e.g. previously launched uncompleted async sub-tasks, EVEN on  // failures.  delete this;} // 把执行的数据从 ectx 中取出,而后填充到执行 resp 中,这次语句执行就完结了// Get result from query context and fill the responsevoid QueryInstance::fillRespData(ExecutionResponse *resp) {  auto ectx = DCHECK_NOTNULL(qctx_->ectx());  auto plan = DCHECK_NOTNULL(qctx_->plan());  const auto &name = plan->root()->outputVar();  if (!ectx->exist(name)) return;  auto &&value = ectx->moveValue(name);  if (!value.isDataSet()) return;  // Fill dataset  auto result = value.moveDataSet();  if (!result.colNames.empty()) {    // 后果填充    resp->data = std::make_unique<DataSet>(std::move(result));  } else {    // 如果有谬误,错误码和错误信息    resp->errorCode = ErrorCode::E_EXECUTION_ERROR;    resp->errorMsg = std::make_unique<std::string>("Internal error: empty column name list");    LOG(ERROR) << "Empty column name list";  }}

小结

目前为止,咱们把 LOOKUP 是怎么在内核中执行的毕生的源码解读就做完了。有很多细节没有开展,后续的文章中咱们将一直开展。其实,对于任意一个语句,根本执行的流程和 LOOKUP 的毕生都相似,其中有不同的中央就是额定的算子不同,算子之间解决的逻辑不同。而且,这次咱们没有关上 Storage 服务的代码,能够作为一个遗留项。

祝大家都能够在 NebulaGraph 图数据库的源码世界外面飞翔,欢送大家和我来进行交换,学习 Wey Gu 的形式,给大家留一个微信联系方式:echo TWlsaXR0bGVUaW1l | base64 -d Call me.


谢谢你读完本文 (///▽///)

要来近距离体验一把图数据库吗?当初能够用用 NebulaGraph Cloud 来搭建本人的图数据系统哟,快来节俭大量的部署安装时间来搞定业务吧~ NebulaGraph 阿里云计算巢现 30 天收费应用中,点击链接来用用图数据库吧~

想看源码的小伙伴能够返回 GitHub 浏览、应用、(^^)-☆ star 它 -> GitHub;和其余的 NebulaGraph 用户一起交换图数据库技术和利用技能,留下「你的名片」一起游玩呢~