关于clickhouse:ClickHouse-源码泛读

38次阅读

共计 5197 个字符,预计需要花费 13 分钟才能阅读完成。

ClickHouse 源码泛读

前言

首先从最整体的视角看下 ClickHouse 的解决流程:

入口函数
TCP/HTTP/RPCHandler::runImpl

  • 构建 pipeline
    state.io = executeQuery()
  • 调度执行 pipeline, reply to client
if(state.io.pipeline.pushing()) {processInsertQuery();
} else if (state.io.pipeline.pulling()) {processOrdinaryQueryWithProcessors();
} else if ... {...}

整体分为两大块:

  • 解析 sql,构建 pipeline。
  • 而后依据 pipeline 的特点(insert or other)抉择对应的调度器执行 pipeline,拿到后果返回给客户端。
    对于第二局部能够参考我之前写的文章:ClickHouse 之 Pipeline 执行引擎,这篇文章次要剖析第一局部。

executeQuery

地位:src/Interpreters/executeQuery.cpp 1073

转发到 executeQueryImpl。

executeQueryImpl

地位:src/Interpreters/executeQuery.cpp 358

解析 SQL,并依据 sql 类型结构对应的 Interpreter,调用 Interpreter 的 execute() 函数,取得 pipeline,本文以 Select 语句为例进行剖析。

InterpreterSelectQuery::execute

地位:src/Interpreters/InterpreterSelectQuery.cpp 684

  • 结构 QueryPlan
  • 依据 QueryPlan 结构 QueryPipelineBuilder
  • 依据 builder 结构 pipeline

其中第二局部和第三局部的逻辑比较简单,本文暂且略过不表,重点剖析第一局部。

InterpreterSelectQuery::buildQueryPlan

地位:src/Interpreters/InterpreterSelectQuery.cpp 656

次要工作转发到 executeImpl

InterpreterSelectQuery::executeImpl

地位:src/Interpreters/InterpreterSelectQuery.cpp 1105

/// Read the data from Storage. from_stage - to what stage the request was completed in Storage.
executeFetchColumns(from_stage, query_plan);

/// 依据解析后的 ast 以及其余信息向 query_plan 中一直增加各种类型的 QueryPlanStep,注:QueryPlan 实际上是一个树状构造,树节点类型为 QueryPlanStep。

这里将 executeFetchColumns 独自列出来,因为这里波及到构建从存储引擎读取数据的 QueryPlanStep,本文着重剖析这里。

InterpreterSelectQuery::executeFetchColumns

地位:src/Interpreters/InterpreterSelectQuery.cpp 1926

函数前半部分设计很多优化相干以及各种参数的获取,在刚开始浏览源码的时候这些内容能够暂且跳过,首先梳理分明整个我的项目的枝干,由粗到细缓缓剖析,否则很容易迷失在繁冗的细节中。关注 2159 行这里:

storage->read(query_plan, required_columns, storage_snapshot, query_info, context, processing_stage, max_block_size, max_streams);

StorageMergeTree::read

地位:src/Storages/StorageMergeTree.cpp 215

关注这里:

if (auto plan = reader.read(column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, nullptr, enable_parallel_reading))
    query_plan = std::move(*plan);

MergeTreeDataSelectExecutor::read

地位:src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp 135

这里对于查问是否应用 projection 进行分状况解决,咱们暂且关注不应用 projection 的分支。

MergeTreeDataSelectExecutor::readFromParts

地位:src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp 1282

关注这部分代码:

    auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(std::move(parts),
        real_column_names,
        virt_column_names,
        data,
        query_info,
        storage_snapshot,
        context,
        max_block_size,
        num_streams,
        sample_factor_column_queried,
        max_block_numbers_to_read,
        log,
        merge_tree_select_result_ptr,
        enable_parallel_reading
    );

    QueryPlanPtr plan = std::make_unique<QueryPlan>();
    plan->addStep(std::move(read_from_merge_tree));
    return plan;

剖析到这里可知,在结构 QueryPlan 阶段咱们实际上只往 QueryPlan 中增加了一个 QueryPlanStep,它的类型是 ReadFromMergeTree,读者能够看下这个类的继承关系验证,它的确是 QueryPlanStep 子类型。接下来的重点就是剖析 ReadFromMergeTree 这个类型。

在剖析之前咱们有必要晓得以下信息:
在 依据 QueryPlan 结构 QueryPipelineBuilder 阶段,咱们实际上依赖于 QueryPlanStep 的虚函数:

/// Add processors from current step to QueryPipeline.
/// Calling this method, we assume and don't check that:
///   * pipelines.size() == getInputStreams.size()
///   * header from each pipeline is the same as header from corresponding input_streams
/// Result pipeline must contain any number of streams with compatible output header is hasOutputStream(),
///   or pipeline should be completed otherwise.
virtual QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) = 0;

然而咱们发现 ReadFromMergeTree 并没有重写这个函数,起因如下:
ReadFromMergeTree 的继承链为 ReadFromMergeTree -> ISourceStep -> QueryPlanStep。
在 ISourceStep 中曾经重写了这个函数,因而咱们只须要关注 initializePipeline 这个虚函数即可。

QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{auto pipeline = std::make_unique<QueryPipelineBuilder>();
    QueryPipelineProcessorsCollector collector(*pipeline, this);
    initializePipeline(*pipeline, settings);
    auto added_processors = collector.detachProcessors();
    processors.insert(processors.end(), added_processors.begin(), added_processors.end());
    return pipeline;
}

ReadFromMergeTree::initializePipeline

关注:

auto result = getAnalysisResult();
...
pipe = spreadMarkRangesAmongStreams(std::move(result.parts_with_ranges),
    column_names_to_read);
...
pipeline.init(std::move(pipe));

能够看到,咱们是通过一个 pipe 初始化了 pipeline(type : QueryPipelineBuilder),而后在 ISourceStep::updatePipeline 中返回并参加构建 pipeline,因而咱们的重点转移到了如何构建这个 pipe。注:对于 QueryPipelineBuilder 和 Pipe 的关系,大家能够跳转看看,其实只是一层很浅的封装。

ReadFromMergeTree::spreadMarkRangesAmongStreams

地位:src/Processors/QueryPlan/ReadFromMergeTree.cpp 375

转发到 read 函数

ReadFromMergeTree::read

地位:src/Processors/QueryPlan/ReadFromMergeTree.cpp 287
代码如下:

Pipe ReadFromMergeTree::read(
    RangesInDataParts parts_with_range, Names required_columns, ReadType read_type,
    size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
{if (read_type == ReadType::Default && max_streams > 1)
        return readFromPool(parts_with_range, required_columns, max_streams,
                            min_marks_for_concurrent_read, use_uncompressed_cache);

    auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0);

    /// Use ConcatProcessor to concat sources together.
    /// It is needed to read in parts order (and so in PK order) if single thread is used.
    if (read_type == ReadType::Default && pipe.numOutputPorts() > 1)
        pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));

    return pipe;
}

如果 max_streams > 1,则转发到 readFromPool,并且在 pipe 中增加一个 ConcatProcessor,将多个 source 合并为一个。否则转发到 readInOrder。

todo

整个零碎的链路切实太长了,前面的内容有工夫再剖析吧,之后的内容能够看下这篇文章。

正文完
 0