ClickHouse Source Code Walkthrough
Preface
Let's first look at ClickHouse's query-processing flow from the most global viewpoint:
Entry function: TCP/HTTP/RPCHandler::runImpl
- Build the pipeline:

```cpp
state.io = executeQuery()
```

- Dispatch execution of the pipeline and reply to the client:

```cpp
if (state.io.pipeline.pushing())
{
    processInsertQuery();
}
else if (state.io.pipeline.pulling())
{
    processOrdinaryQueryWithProcessors();
}
else if (...)
{
    ...
}
```
The whole flow breaks into two major parts:
- Parse the SQL and build the pipeline.
- Then, based on the pipeline's kind (insert or other), pick the corresponding executor to run the pipeline, and return the result to the client.
For the second part you can refer to my earlier article "ClickHouse之Pipeline执行引擎"; this article mainly analyzes the first part.
executeQuery
Location: src/Interpreters/executeQuery.cpp, line 1073
Forwards to executeQueryImpl.
executeQueryImpl
Location: src/Interpreters/executeQuery.cpp, line 358
Parses the SQL, constructs the Interpreter corresponding to the query type, and calls the Interpreter's execute() function to obtain the pipeline. This article takes a SELECT statement as the running example.
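As a rough mental model, the sequence inside executeQueryImpl looks like the sketch below. parseQuery, InterpreterFactory::get, and IInterpreter::execute are real ClickHouse APIs, but the glue code and argument lists here are simplified and version-dependent:

```cpp
/// Simplified sketch of executeQueryImpl; option handling and error paths omitted.
ParserQuery parser(end);
ASTPtr ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);

/// Pick an interpreter by AST type: InterpreterSelectQuery, InterpreterInsertQuery, ...
auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage));

/// For a SELECT, this builds the QueryPlan and lowers it into a pipeline.
BlockIO res = interpreter->execute();
```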
InterpreterSelectQuery::execute
Location: src/Interpreters/InterpreterSelectQuery.cpp, line 684
- Build the QueryPlan
- Build a QueryPipelineBuilder from the QueryPlan
- Build the pipeline from the builder
The logic of the second and third stages is fairly simple (a condensed sketch of all three follows below), so this article skips them for now and focuses on the first.
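For orientation, here is a condensed sketch of execute() showing the three stages; it is simplified from the real function, and the exact argument lists vary across versions:

```cpp
/// Condensed sketch of InterpreterSelectQuery::execute(); error handling omitted.
BlockIO InterpreterSelectQuery::execute()
{
    BlockIO res;
    QueryPlan query_plan;

    buildQueryPlan(query_plan);                       /// stage 1: build the QueryPlan

    auto builder = query_plan.buildQueryPipeline(     /// stage 2: plan -> QueryPipelineBuilder
        QueryPlanOptimizationSettings::fromContext(context),
        BuildQueryPipelineSettings::fromContext(context));

    res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));  /// stage 3
    return res;
}
```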
InterpreterSelectQuery::buildQueryPlan
Location: src/Interpreters/InterpreterSelectQuery.cpp, line 656
The main work is forwarded to executeImpl.
InterpreterSelectQuery::executeImpl
Location: src/Interpreters/InterpreterSelectQuery.cpp, line 1105

```cpp
/// Read the data from Storage. from_stage - to what stage the request was completed in Storage.
executeFetchColumns(from_stage, query_plan);
```

Based on the parsed AST and other information, executeImpl keeps adding QueryPlanSteps of various kinds to query_plan. Note: a QueryPlan is actually a tree structure whose nodes hold steps of type IQueryPlanStep.
executeFetchColumns is singled out here because it involves building the step that reads data from the storage engine, which is the focus of this article.
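To make the tree-of-steps picture concrete, here is an illustrative outline (not the literal source) of how executeImpl layers steps onto the plan; the helper names below do exist in InterpreterSelectQuery, but the conditions and arguments are elided:

```cpp
/// Illustrative outline only: each helper appends one step to query_plan,
/// wrapping the current root of the tree with a new node.
executeFetchColumns(from_stage, query_plan);  /// source step, e.g. ReadFromMergeTree
executeWhere(/*...*/);        /// FilterStep
executeAggregation(/*...*/);  /// AggregatingStep
executeOrder(/*...*/);        /// SortingStep
executeProjection(/*...*/);   /// ExpressionStep computing the final SELECT expressions
```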
InterpreterSelectQuery::executeFetchColumns
Location: src/Interpreters/InterpreterSelectQuery.cpp, line 1926
The first half of the function involves a lot of optimization-related logic and the fetching of various parameters. When first reading the source you can skip all of this: sort out the project's trunk first and work from coarse to fine, otherwise it is easy to get lost in the tangle of details. Focus on line 2159:

```cpp
storage->read(query_plan, required_columns, storage_snapshot, query_info,
              context, processing_stage, max_block_size, max_streams);
```
StorageMergeTree::read
Location: src/Storages/StorageMergeTree.cpp, line 215
Focus on this:

```cpp
if (auto plan = reader.read(column_names, storage_snapshot, query_info, local_context,
                            max_block_size, num_streams, processed_stage,
                            nullptr, enable_parallel_reading))
    query_plan = std::move(*plan);
```
MergeTreeDataSelectExecutor::read
Location: src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp, line 135
Here the code branches on whether the query uses a projection; for now we follow the branch that does not use a projection.
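Roughly, the dispatch looks like the sketch below. This is a simplification that assumes query_info.projection carries the projection candidate chosen during analysis, as in this era of the codebase; argument lists are abbreviated:

```cpp
/// Sketch of the projection dispatch; fields and flow differ across versions.
if (!query_info.projection)
{
    /// The branch this article follows: read directly from the table's data parts.
    return readFromParts(parts, column_names_to_return, storage_snapshot, query_info,
                         context, max_block_size, num_streams /*, ... */);
}
/// Otherwise: build a plan that reads from the pre-computed projection parts,
/// plus a plan for the parts where the projection is missing, and merge them.
```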
MergeTreeDataSelectExecutor::readFromParts
Location: src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp, line 1282
Focus on this piece of code:
```cpp
auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
    std::move(parts),
    real_column_names,
    virt_column_names,
    data,
    query_info,
    storage_snapshot,
    context,
    max_block_size,
    num_streams,
    sample_factor_column_queried,
    max_block_numbers_to_read,
    log,
    merge_tree_select_result_ptr,
    enable_parallel_reading);

QueryPlanPtr plan = std::make_unique<QueryPlan>();
plan->addStep(std::move(read_from_merge_tree));
return plan;
```
From the analysis so far we can see that during QueryPlan construction we actually add just one step to the QueryPlan, of type ReadFromMergeTree. Readers can check this class's inheritance chain to verify that it is indeed a subtype of IQueryPlanStep. The next focus is the ReadFromMergeTree class itself.
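A quick compile-time way to verify that subtyping claim (a standalone check assuming the usual header locations; it is not part of the source being walked through):

```cpp
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <type_traits>

/// ReadFromMergeTree derives from ISourceStep, which derives from IQueryPlanStep.
static_assert(std::is_base_of_v<DB::ISourceStep, DB::ReadFromMergeTree>);
static_assert(std::is_base_of_v<DB::IQueryPlanStep, DB::ReadFromMergeTree>);
```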
Before that analysis, we need to know the following:
During the "build a QueryPipelineBuilder from the QueryPlan" stage, we actually rely on this virtual function of IQueryPlanStep:
```cpp
/// Add processors from current step to QueryPipeline.
/// Calling this method, we assume and don't check that:
///   * pipelines.size() == getInputStreams.size()
///   * header from each pipeline is the same as header from corresponding input_streams
/// Result pipeline must contain any number of streams with compatible output header if hasOutputStream(),
/// or pipeline should be completed otherwise.
virtual QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) = 0;
```
However, ReadFromMergeTree does not override this function. The reason:
ReadFromMergeTree's inheritance chain is ReadFromMergeTree -> ISourceStep -> IQueryPlanStep.
ISourceStep already overrides updatePipeline, so we only need to look at the initializePipeline virtual function:
```cpp
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{
    auto pipeline = std::make_unique<QueryPipelineBuilder>();
    QueryPipelineProcessorsCollector collector(*pipeline, this);
    initializePipeline(*pipeline, settings);
    auto added_processors = collector.detachProcessors();
    processors.insert(processors.end(), added_processors.begin(), added_processors.end());
    return pipeline;
}
```
ReadFromMergeTree::initializePipeline
Focus on:

```cpp
auto result = getAnalysisResult();
...
pipe = spreadMarkRangesAmongStreams(
    std::move(result.parts_with_ranges),
    column_names_to_read);
...
pipeline.init(std::move(pipe));
```

As we can see, the pipeline (of type QueryPipelineBuilder) is initialized from a Pipe, which ISourceStep::updatePipeline then returns so that it can take part in building the final pipeline. Our focus therefore shifts to how this Pipe is built. Note: as for the relationship between QueryPipelineBuilder and Pipe, you can jump through the definitions yourself; the former is really just a thin wrapper over the latter.
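A minimal sketch of that wrapping (illustrative only; SomeSource is a hypothetical source processor standing in for the real ones):

```cpp
/// A Pipe is a set of processors with open output ports; init() hands it to the builder.
Pipe pipe(std::make_shared<SomeSource>(header));  /// SomeSource: hypothetical source
QueryPipelineBuilder builder;
builder.init(std::move(pipe));  /// builder now owns the pipe's processors and ports
```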
ReadFromMergeTree::spreadMarkRangesAmongStreams
Location: src/Processors/QueryPlan/ReadFromMergeTree.cpp, line 375
Forwards to the read function.
ReadFromMergeTree::read
Location: src/Processors/QueryPlan/ReadFromMergeTree.cpp, line 287
The code:
```cpp
Pipe ReadFromMergeTree::read(
    RangesInDataParts parts_with_range, Names required_columns, ReadType read_type,
    size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
{
    if (read_type == ReadType::Default && max_streams > 1)
        return readFromPool(parts_with_range, required_columns, max_streams,
                            min_marks_for_concurrent_read, use_uncompressed_cache);

    auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0);

    /// Use ConcatProcessor to concat sources together.
    /// It is needed to read in parts order (and so in PK order) if single thread is used.
    if (read_type == ReadType::Default && pipe.numOutputPorts() > 1)
        pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));

    return pipe;
}
```
If read_type is Default and max_streams > 1, the call is forwarded to readFromPool. Otherwise it is forwarded to readInOrder; in that case, on the Default read type, a ConcatProcessor is appended to the pipe to merge the multiple sources into one, so that parts (and hence the primary key) are read in order when a single thread is used.
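For intuition: readFromPool builds max_streams parallel sources that pull mark ranges from a shared read pool (dynamic load balancing), while readInOrder builds one source per part; the ConcatProcessor then funnels those per-part sources into a single output. A behavioral sketch, not the real implementation:

```cpp
/// Behavioral sketch of the ConcatProcessor step on the readInOrder path:
/// it drains its inputs strictly one after another, preserving part order.
auto concat = std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts());
pipe.addTransform(std::move(concat));  /// N output ports -> 1 output port
assert(pipe.numOutputPorts() == 1);    /// chunks now arrive part-by-part, in PK order
```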
todo
The call chain through this system is really long; I'll analyze the rest when I have time. For what comes after this point, you can take a look at this article.