本文首发于 2020-06-12 20:57:10
《ClickHouse和他的敌人们》系列文章转载自圈内好友 BohuTANG 的博客,原文链接:
https://bohutang.me/2020/06/1...
以下为注释。
最初更新: 2020-08-15
本文谈下 ClickHouse 外围科技:处理器 Processor 和有向无环调度器 DAG Scheduler。
这些概念并不是 ClickHouse 独创,感兴趣的同学能够关注下 materialize 的 timely-dataflow,虎哥用 golang 也写过一个原型。
拼的是实现细节,正是这些模块的精良设计,才有了 ClickHous e整体的高性能。
Pipeline问题
在传统数据库系统中,一个 Query 解决流程大体是:
其中在 Plan 阶段,往往会减少一个 Pipeline 组装(一个 transformer 代表一次数据处理):
所有 transformer 被编排成一个流水线(pipeline),而后交给 executor 串行式执行,每执行一个 transformer 数据集就会被加工并输入,始终到上游的 sinker。
能够看到,这种模型的长处是简略,毛病是性能低,无奈施展 CPU 的并行能力,通常叫火山模型(volcano-style),对于 OLTP 低提早来说足够,对于计算密集的 OLAP 来说是远远不够的,CPU 不到 100% 就是立功!
对于下面的例子,如果 transformer1 和 transformer2 没有交加,那么它们就能够并行处理:
这样就波及到一些比拟灵魂的问题:
- 如何实现 transformer 的灵便编排?
- 如何实现 transformer 间的数据同步?
- 如何实现 transformer 间的并行调度?
Processor 和 DAG Scheduler
1. Transformer 编排
ClickHouse 实现了一系列根底 transformer 模块,见 src/Processors/Transforms,比方:
- FilterTransform – WHERE 条件过滤
- SortingTransform – ORDER BY 排序
- LimitByTransform – LIMIT 裁剪
当咱们执行:
SELECT * FROM t1 WHERE id=1 ORDER BY time DESC LIMIT 10
对于 ClickHouse 的 QueryPipeline 来说,它会依照以下形式进行编排组装:
QueryPipeline::addSimpleTransform(Source)QueryPipeline::addSimpleTransform(FilterTransform)QueryPipeline::addSimpleTransform(SortingTransform)QueryPipeline::addSimpleTransform(LimitByTransform)QueryPipeline::addSimpleTransform(Sinker)
这样就实现了 Transformer 的编排,然而执行时数据如何进行同步呢?
2. Transformer 数据同步
当 QueryPipeline 进行 transformer 编排时,咱们还须要进行更加底层的 DAG 连通构建。
connect(Source.OutPort, FilterTransform.InPort)connect(FilterTransform.OutPort, SortingTransform.InPort)connect(SortingTransform.OutPort, LimitByTransform.InPort)connect(LimitByTransform.OutPort, Sinker.InPort)
这样就实现了数据的流向关系,一个 transformer 的 OutPort 对接另外一个的 InPort,就像咱们事实中的水管管道一样,接口有 3 通甚至多通。
3. Transformer 执行调度
当初管道组装起来了,那么管道内的水如何进行解决和给压流动呢?
ClickHouse 定义了一套 transform 状态,processor 依据这些状态来实现调度。
enum class Status{ NeedData // 期待数据流进入 PortFull, // 管道流出端阻塞 Finished, // 实现状态,退出 Ready, // 切换到 work 函数,进行逻辑解决 Async, // 切换到 schedule 函数,进行异步解决 Wait, // 期待异步解决 ExpandPipeline, // Pipeline 须要裂变};
当 source 生成数据后,它的状态会设置为 PortFull,意思是等着流入其余 transformer 的 InPort,processor 会开始调度 FilterTransformer(NeedData) 的 Prepare,进行 PullData,而后它的状态设置为 Ready,期待 processor 调度 Work 办法进行数据Filter解决,大家就这样靠状态让 processor 去感知,来调度和做状态迁徙,直到 Finished 状态。
这里值得一提的是 ExpandPipeline 状态,它会依据 transformer 的实现,能够把一个 transformer 裂变出更多个 transformer 并行执行,达到一个爆炸成果。
Example
SELECT number + 1 FROM t1;
为了更加深刻了解 ClickHouse 的 processor 和 scheduler 机制,咱们来一个原生态的 example:
- 一个 Source:{0,1,2,3,4}
- AdderTransformer 对每个数字做加1操作
- 一个 Sinker,输入后果
1. Source
class MySource : public ISource{public: String getName() const override { return "MySource"; } MySource(UInt64 end_) : ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})), end(end_) { }private: UInt64 end; bool done = false; Chunk generate() override { if (done) { return Chunk(); } MutableColumns columns; columns.emplace_back(ColumnUInt64::create()); for (auto i = 0U; i < end; i++) columns[0]->insert(i); done = true; return Chunk(std::move(columns), end); }};
2. MyAddTransform
class MyAddTransformer : public IProcessor{public: String getName() const override { return "MyAddTransformer"; } MyAddTransformer() : IProcessor( {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})}, {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})}) , input(inputs.front()) , output(outputs.front()) { } Status prepare() override { if (output.isFinished()) { input.close(); return Status::Finished; } if (!output.canPush()) { input.setNotNeeded(); return Status::PortFull; } if (has_process_data) { output.push(std::move(current_chunk)); has_process_data = false; } if (input.isFinished()) { output.finish(); return Status::Finished; } if (!input.hasData()) { input.setNeeded(); return Status::NeedData; } current_chunk = input.pull(false); return Status::Ready; } void work() override { auto num_rows = current_chunk.getNumRows(); auto result_columns = current_chunk.cloneEmptyColumns(); auto columns = current_chunk.detachColumns(); for (auto i = 0U; i < num_rows; i++) { auto val = columns[0]->getUInt(i); result_columns[0]->insert(val+1); } current_chunk.setColumns(std::move(result_columns), num_rows); has_process_data = true; } InputPort & getInputPort() { return input; } OutputPort & getOutputPort() { return output; }protected: bool has_input = false; bool has_process_data = false; Chunk current_chunk; InputPort & input; OutputPort & output;};
3. MySink
class MySink : public ISink{public: String getName() const override { return "MySinker"; } MySink() : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})) { }private: WriteBufferFromFileDescriptor out{STDOUT_FILENO}; FormatSettings settings; void consume(Chunk chunk) override { size_t rows = chunk.getNumRows(); size_t columns = chunk.getNumColumns(); for (size_t row_num = 0; row_num < rows; ++row_num) { writeString("prefix-", out); for (size_t column_num = 0; column_num < columns; ++column_num) { if (column_num != 0) writeChar('\t', out); getPort() .getHeader() .getByPosition(column_num) .type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings); } writeChar('\n', out); } out.next(); }};
4. DAG Scheduler
int main(int, char **){ auto source0 = std::make_shared<MySource>(5); auto add0 = std::make_shared<MyAddTransformer>(); auto sinker0 = std::make_shared<MySink>(); /// Connect. connect(source0->getPort(), add0->getInputPort()); connect(add0->getOutputPort(), sinker0->getPort()); std::vector<ProcessorPtr> processors = {source0, add0, sinker0}; PipelineExecutor executor(processors); executor.execute(1);}
总结
从开发者角度看还是比较复杂,状态迁徙还须要开发者本人管制,不过 upstream 曾经做了大量的根底工作,比方对 source的封装 ISource,对 sink 的封装 ISink,还有一个根底的 ISimpleTransform,让开发者在下层应用 processor 时更加容易,能够积木式搭建出本人想要的 pipeline。
ClickHouse 的 transformer 数据单元是 Chunk,transformer 对上游 OutPort 流过来的 Chunk 进行加工,而后输入给上游的 InPort,图连通式的流水线并行工作,让 CPU 尽量满负荷工作。
当一个 SQL 被解析成 AST 后,ClickHouse 依据 AST 构建 Query Plan,而后依据 QueryPlan 构建出 pipeline,最初由 processor 负责调度和执行。
目前,ClickHouse 新版本曾经默认开启 QueryPipeline,同时这块代码也在不停的迭代。
欢送关注我的微信公众号【数据库内核】:分享支流开源数据库和存储引擎相干技术。
题目 | 网址 |
---|---|
GitHub | https://dbkernel.github.io |
知乎 | https://www.zhihu.com/people/... |
思否(SegmentFault) | https://segmentfault.com/u/db... |
掘金 | https://juejin.im/user/5e9d3e... |
开源中国(oschina) | https://my.oschina.net/dbkernel |
博客园(cnblogs) | https://www.cnblogs.com/dbkernel |