共计 11104 个字符,预计需要花费 28 分钟才能阅读完成。
本文首发于 2020-06-07 17:17:10
《ClickHouse 和他的敌人们》系列文章转载自圈内好友 BohuTANG 的博客,原文链接:
https://bohutang.me/2020/06/0…
以下为注释。
作为一个 OLAP 的 DBMS 来说,有 2 个端十分重要:
-
用户如何不便的链进来,这是入口端
- ClickHouse 除了本人的 client 外,还提供了 MySQL/PG/GRPC/HTTP 等接入形式
-
数据如何不便的挂上去,这是数据源端
- ClickHouse 除了本人的引擎外,还能够挂载 MySQL/Kafka 等内部数据源
这样内外互通,多条敌人多条路,以实现“数据”级的编排能力。
明天谈的是入口端的 MySQL 协定,也是本系列 ClickHouse 的第一个好敌人,用户可通过 MySQL 客户端或相干 Driver 间接链接到 ClickHouse,进行数据读写等操作。
本文通过 MySQL 的 Query 申请,借用调用栈来理解下 ClickHouse 的数据读取全过程。
如何实现?
入口文件在:
MySQLHandler.cpp
握手协定
- MySQLClient 发送 Greeting 数据报文到 MySQLHandler
- MySQLHandler 回复一个 Greeting-Response 报文
- MySQLClient 发送认证报文
- MySQLHandler 对认证报文进行鉴权,并返回鉴权后果
MySQL Protocol 实现在: Core/MySQLProtocol.h
最近的代码中调整为了 Core/MySQL/PacketsProtocolText.h
Query 申请
当认证通过后,就能够进行失常的数据交互了。
-
当 MySQLClient 发送申请:
mysql> SELECT * FROM system.numbers LIMIT 5;
-
MySQLHandler 的调用栈:
->MySQLHandler::comQuery -> executeQuery -> pipeline->execute -> MySQLOutputFormat::consume
- MySQLClient 接管到后果
在步骤 2 里,executeQuery(executeQuery.cpp) 十分重要。
它是所有前端 Server 和 ClickHouse 内核的接入口,第一个参数是 SQL 文本 (‘select 1’),第二个参数是后果集要发送到哪里去 (socket net)。
调用栈剖析
SELECT * FROM system.numbers LIMIT 5
1. 获取数据源
StorageSystemNumbers 数据源:
DB::StorageSystemNumbers::read(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) StorageSystemNumbers.cpp:135
DB::ReadFromStorageStep::ReadFromStorageStep(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions,
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<DB::PrewhereInfo> const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&) memory:3028
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<DB::PrewhereInfo> const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&) InterpreterSelectQuery.cpp:1361
DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan&, std::__1::shared_ptr<DB::IBlockInputStream> const&, std::__1::optional<DB::Pipe>) InterpreterSelectQuery.cpp:791
DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectQuery.cpp:472
DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectWithUnionQuery.cpp:183
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:198
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&,
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
这里最次要的是 ReadFromStorageStep 函数,从不同 storage 里获取数据源 pipe:
Pipes pipes = storage->read(required_columns, metadata_snapshot, query_info, *context, processing_stage, max_block_size, max_streams);
2. Pipeline 结构
DB::LimitTransform::LimitTransform(DB::Block const&, unsigned long, unsigned long, unsigned long, bool, bool, std::__1::vector<DB::SortColumnDescription, std::__1::allocator<DB::SortColumnDescription> >) LimitTransform.cpp:21
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2214
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2299
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:3570
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:4400
DB::LimitStep::transformPipeline(DB::QueryPipeline&) LimitStep.cpp:33
DB::ITransformingStep::updatePipeline(std::__1::vector<std::__1::unique_ptr<DB::QueryPipeline, std::__1::default_delete<DB::QueryPipeline> >, std::__1::allocator<std::__1::unique_ptr<DB::QueryPipeline, std::__1::default_delete<DB::QueryPipeline> > > >) ITransformingStep.cpp:21
DB::QueryPlan::buildQueryPipeline() QueryPlan.cpp:154
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:200
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:722
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
3. Pipeline 执行
DB::LimitTransform::prepare(std::__1::vector<unsigned long, std::__1::allocator<unsigned long> > const&, std::__1::vector<unsigned long, std::__1::allocator<unsigned long> > const&) LimitTransform.cpp:67
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:291
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::PipelineExecutor::ExecutionState*, std::__1::deque<DB::PipelineExecutor::ExecutionState*, std::__1::allocator<DB::PipelineExecutor::ExecutionState*> > >&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::initializeExecution(unsigned long) PipelineExecutor.cpp:747
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:764
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:833
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
4. Output 执行发送
DB::MySQLOutputFormat::consume(DB::Chunk) MySQLOutputFormat.cpp:53
DB::IOutputFormat::work() IOutputFormat.cpp:62
DB::executeJob(DB::IProcessor *) PipelineExecutor.cpp:155
operator() PipelineExecutor.cpp:172
DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic<bool>*) PipelineExecutor.cpp:630
DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) PipelineExecutor.cpp:546
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:812
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:800
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
总结
ClickHouse 的模块化比拟清晰,像乐高积木一样能够组合拼装,当咱们执行:
SELECT * FROM system.numbers LIMIT 5
首先内核解析 SQL 语句生成 AST,而后依据 AST 获取数据源 Source,pipeline.Add(Source)。
其次依据 AST 信息生成 QueryPlan,依据 QueryPlan 再生成相应的 Transform,pipeline.Add(LimitTransform)。
而后增加 Output Sink 作为数据发送对象,pipeline.Add(OutputSink)。
执行 pipeline, 各个 Transformer 开始工作。
ClickHouse 的 Transformer 调度零碎叫做 Processor,也是决定性能的重要模块,详情见 Pipeline 处理器和调度器。
ClickHouse 是一辆手动挡的奢华跑车,收费领有,海啸们!
欢送关注我的微信公众号【数据库内核】:分享支流开源数据库和存储引擎相干技术。
题目 | 网址 |
---|---|
GitHub | https://dbkernel.github.io |
知乎 | https://www.zhihu.com/people/… |
思否(SegmentFault) | https://segmentfault.com/u/db… |
掘金 | https://juejin.im/user/5e9d3e… |
开源中国(oschina) | https://my.oschina.net/dbkernel |
博客园(cnblogs) | https://www.cnblogs.com/dbkernel |