本文首发于 Nebula Graph Community 公众号
上篇咱们讲述了 Query Engine Optimizer 局部的内容,在本文咱们解说下 Query Engine 剩下的 Scheduler 和 Executor 局部。
概述
在执行阶段,执行引擎通过 Scheduler(调度器)将 Planner 生成的物理执行打算转换为一系列 Executor,驱动 Executor 的执行。
Executor,即执行器,物理执行打算中的每个 PlanNode 都会对应一个 Executor。
源码定位
调度器的源码在 src/scheduler
目录下:
src/scheduler
├── AsyncMsgNotifyBasedScheduler.cpp
├── AsyncMsgNotifyBasedScheduler.h
├── CMakeLists.txt
├── Scheduler.cpp
└── Scheduler.h
Scheduler 抽象类定义了调度器的公共接口,能够继承该类实现多种调度器。
目前实现了 AsyncMsgNotifyBasedScheduler 调度器,它基于异步音讯通信与广度优先搜寻防止栈溢出。
执行器的源码在 src/executor
目录下:
src/executor
├── admin
├── algo
├── CMakeLists.txt
├── ExecutionError.h
├── Executor.cpp
├── Executor.h
├── logic
├── maintain
├── mutate
├── query
├── StorageAccessExecutor.cpp
├── StorageAccessExecutor.h
└── test
执行过程
首先,调度器从执行打算的根节点开始通过应用广度优先搜索算法遍历整个执行打算并依据节点间的执行依赖关系,构建它们的音讯告诉机制。
执行时,每个节点收到它的所依赖的节点全副执行结束的音讯后,会被调度执行。一旦本身执行实现,又会发送音讯给依赖本人的节点,直至整个打算执行结束。
void AsyncMsgNotifyBasedScheduler::runExecutor(
std::vector<folly::Future<Status>>&& futures,
Executor* exe,
folly::Executor* runner,
std::vector<folly::Promise<Status>>&& promises) const {folly::collect(futures).via(runner).thenTry([exe, pros = std::move(promises), this](auto&& t) mutable {if (t.hasException()) {return notifyError(pros, Status::Error(t.exception().what()));
}
auto status = std::move(t).value();
auto depStatus = checkStatus(std::move(status));
if (!depStatus.ok()) {return notifyError(pros, depStatus);
}
// Execute in current thread.
std::move(execute(exe)).thenTry([pros = std::move(pros), this](auto&& exeTry) mutable {if (exeTry.hasException()) {return notifyError(pros, Status::Error(exeTry.exception().what()));
}
auto exeStatus = std::move(exeTry).value();
if (!exeStatus.ok()) {return notifyError(pros, exeStatus);
}
return notifyOK(pros);
});
});
}
每个 Executor 会经验 create-open-execute-close 四个阶段:
create
依据节点类型生成对应的 Executor。
open
在 Executor 正式执行前做一些初始化操作,以及慢查问终止和内存水位的判断。
Nebula 反对手动 kill
掉某个查问语句的执行,因而每个 Executor 执行前须要查看下以后执行打算状态,若被标记为 killed
,则终止执行。
每个 Query 类型的 Executor 执行前,还须要查看以后零碎所占用内存是否达到内存水位。若达到内存水位,则终止执行,这能在肯定水平上防止 OOM。
Status Executor::open() {if (qctx_->isKilled()) {VLOG(1) << "Execution is being killed. session:" << qctx()->rctx()->session()->id()
<< "ep:" << qctx()->plan()->id()
<< "query:" << qctx()->rctx()->query();
return Status::Error("Execution had been killed");
}
auto status = MemInfo::make();
NG_RETURN_IF_ERROR(status);
auto mem = std::move(status).value();
if (node_->isQueryNode() && mem->hitsHighWatermark(FLAGS_system_memory_high_watermark_ratio)) {
return Status::Error("Used memory(%ldKB) hits the high watermark(%lf) of total system memory(%ldKB).",
mem->usedInKB(),
FLAGS_system_memory_high_watermark_ratio,
mem->totalInKB());
}
numRows_ = 0;
execTime_ = 0;
totalDuration_.reset();
return Status::OK();}
execute
Query 类型的 Executor 的输出和输入都是一张表(DataSet)。
Executor 的执行基于迭代器模型:每次计算时,调用输出表的迭代器的 next()
办法,获取一行数据,进行计算,直至输出表被遍历结束。
计算的后果形成一张新表,输入给后续的 Executor 作为输入。
folly::Future<Status> ProjectExecutor::execute() {SCOPED_TIMER(&execTime_);
auto* project = asNode<Project>(node());
auto columns = project->columns()->columns();
auto iter = ectx_->getResult(project->inputVar()).iter();
DCHECK(!!iter);
QueryExpressionContext ctx(ectx_);
VLOG(1) << "input:" << project->inputVar();
DataSet ds;
ds.colNames = project->colNames();
ds.rows.reserve(iter->size());
for (; iter->valid(); iter->next()) {
Row row;
for (auto& col : columns) {Value val = col->expr()->eval(ctx(iter.get()));
row.values.emplace_back(std::move(val));
}
ds.rows.emplace_back(std::move(row));
}
VLOG(1) << node()->outputVar() << ":" << ds;
return finish(ResultBuilder().value(Value(std::move(ds))).finish());
}
如果以后 Executor 的输出表不会被其余 Executor 作为输出时,这些输出表所用的内存会在执行阶段被 drop 掉,减小内存占用。
void Executor::drop() {for (const auto &inputVar : node()->inputVars()) {if (inputVar != nullptr) {
// Make sure use the variable happened-before decrement count
if (inputVar->userCount.fetch_sub(1, std::memory_order_release) == 1) {
// Make sure drop happened-after count decrement
CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0);
ectx_->dropResult(inputVar->name);
VLOG(1) << "Drop variable" << node()->outputVar();
}
}
}
}
close
Executor 执行结束后,将收集到的一些执行信息如执行工夫,输出表的行数等增加到 profiling stats 中。
用户能够在 profile 一个语句后显示的执行打算中查看这些统计信息。
Execution Plan (optimize time 141 us)
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| id | name | dependencies | profiling data | operator info |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| 2 | Project | 3 | ver: 0, rows: 56, execTime: 147us, totalTime: 160us | outputVar: [ |
| | | | | { |
| | | | | "colNames": [ |
| | | | | "VertexID", |
| | | | | "player.age" |
| | | | | ], |
| | | | | "name": "__Project_2", |
| | | | | "type": "DATASET" |
| | | | | } |
| | | | | ] |
| | | | | inputVar: __TagIndexFullScan_1 |
| | | | | columns: [ |
| | | | | "$-.VertexID AS VertexID", |
| | | | | "player.age" |
| | | | | ] |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| 3 | TagIndexFullScan | 0 | ver: 0, rows: 56, execTime: 0us, totalTime: 6863us | outputVar: [ |
| | | | | { |
| | | | | "colNames": [ |
| | | | | "VertexID", |
| | | | | "player.age" |
| | | | | ], |
| | | | | "name": "__TagIndexFullScan_1", |
| | | | | "type": "DATASET" |
| | | | | } |
| | | | | ] |
| | | | | inputVar: |
| | | | | space: 318 |
| | | | | dedup: false |
| | | | | limit: 9223372036854775807 |
| | | | | filter: |
| | | | | orderBy: [] |
| | | | | schemaId: 319 |
| | | | | isEdge: false |
| | | | | returnCols: [ |
| | | | | "_vid", |
| | | | | "age" |
| | | | | ] |
| | | | | indexCtx: [ |
| | | | | { |
| | | | | "columnHints": [], |
| | | | | "index_id": 325, |
| | | | | "filter": "" |
| | | | | } |
| | | | | ] |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| 0 | Start | | ver: 0, rows: 0, execTime: 1us, totalTime: 19us | outputVar: [ |
| | | | | { |
| | | | | "colNames": [], |
| | | | | "type": "DATASET", |
| | | | | "name": "__Start_0" |
| | | | | } |
| | | | | ] |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
以上,源码解析 Query Engine 相干的模块就解说结束了,后续将解说局部个性内容。
交换图数据库技术?退出 Nebula 交换群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~
【流动】Nebula Hackathon 2021 进行中,一起来摸索未知,支付 ¥ 150,000 奖金 →→ https://nebula-graph.com.cn/hackathon/