关于javascript:ClickHouse-源码阅读-SQL的前世今生

注:以下剖析基于开源 v19.15.2.2-stable 版本进行,社区最新版本代码改变较大,然而总体思路是不变的。

用户提交一条查问SQL背地产生了什么?

在传统关系型数据库中,SQL处理器的组件次要包含以下几种:

• Query Parsing
负责进行词法和语法分析,把程序从人类高可读的格局(即SQL)转化成机器高可读的格局(AST,形象语法树)。

词法剖析指的是把SQL中的字符序列分解成一个个独立的词法单元——Token(<类型,值>)。
语法分析指的是从词法分析器输入的token中辨认各类短语,并结构出一颗形象语法树。而依照结构形象语法树的方向,又能够把语法分析分成自顶向下和自底向上剖析两种。而ClickHouse采纳的则是手写一个递归降落的语法分析器。

• Query Rewrite
即通常咱们说的”Logical Optimizer”或基于规定的优化器(Rule-Based Optimizer,即RBO)。

其负责利用一些启发式规定,负责简化和标准化查问,无需扭转查问的语义。

常见操作有:谓词和算子下推,视图开展,简化常量运算表达式,谓词逻辑的重写,语义的优化等。

• Query Optimizer
即通常咱们所说的”Physical Optimizer”,负责把外部查问表白转化成一个高效的查问打算,领导DBMS如何去取表,如何进行排序,如何Join。如下图所示,一个查问打算能够被认为是一个数据流图,在这个数据流图中,表数据会像在管道中传输一样,从一个查问操作符(operator)传递到另一个查问操作符。

一个查问打算

• Query Executor
查问执行器,负责执行具体的查问打算,从存储引擎中获取数据并且对数据利用查问打算失去后果。
执行引擎也分为很多种,如经典的火山模型(Volcano Model),还有ClickHouse采纳的向量化执行模型(Vectorization Model)。

(图来自经典论文 Architecture Of Database System)

但不论是传统的关系型数据库,还是非关系型数据库,SQL的解析和生成执行打算过程都是大同小异的,而纵览ClickHouse的源代码,能够把用户提交一条查问SQL背地的过程总结如下:

1.服务端接管客户端发来的SQL申请,具体模式是一个网络包,Server的协定层须要拆包把SQL解析进去

2.Server负责初始化上下文与Network Handler,而后 Parser 对Query做词法和语法分析,解析成AST

3.Interpreter的 SyntaxAnalyzer 会利用一些启发式规定对AST进行优化重写

4.Interpreter的 ExpressionAnalyzer 依据上下文信息以及优化重写后的AST生成物理执行打算

5.物理执行打算散发到本地或者分布式的executor,各自从存储引擎中获取数据,利用执行打算

6.Server把执行后的后果以Block流的模式输入到Socket缓冲区,Client从Socket中读取即可失去后果

接管客户端申请

咱们要以服务端的视角来登程,首先来看server.cpp大略做什么事件:

上面只筛选重要的逻辑:

• 初始化上下文

• 初始化Zookeeper(ClickHouse的正本复制机制须要依赖ZooKeeper)

• 惯例配置初始化

• 绑定服务端的端口,依据网络协议初始化Handler,对客户端提供服务

int Server::main()
{
    // 初始化上下文
    global_context = std::make_unique<Context>(Context::createGlobal());
    global_context->setApplicationType(Context::ApplicationType::SERVER);
     
    // zk初始化
    zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
    
    //其余config的初始化
    //...
    
    //绑定端口,对外提供服务
    auto address = make_socket_address(host, port);
    socket.bind(address, /* reuseAddress = */ true);

    //依据网络协议建设不同的server类型
    //当初反对的server类型有: HTTP,HTTPS,TCP,Interserver,mysql
    //以TCP版本为例:
    create_server("tcp_port", [&](UInt16 port)
    {
        Poco::Net::ServerSocket socket;
        auto address = socket_bind_listen(socket, listen_host, port);
        servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
            new TCPHandlerFactory(*this),
            server_pool,
            socket,
            new Poco::Net::TCPServerParams));
     });
    
    //启动server
    for (auto & server : servers)
            server->start();
   
}

客户端发来的申请是由各自网络协议所对应的 Handler 来进行的,server在启动的时候 Handler 会被初始化并绑定在指定端口中。咱们以TCPHandler为例,看看服务端是如何解决客户端发来的申请的,重点关注 TCPHandler::runImpl 的函数实现:

• 初始化输出和输入流的缓冲区

• 承受申请报文,拆包

• 执行Query(包含整个词法语法分析,Query重写,物理打算生成和生成后果)

• 把Query后果保留到输入流,而后发送到Socket的缓冲区,期待发送回客户端

void TCPHandler::runImpl()
{
    //实例化套接字对应的输出和输入流缓冲区
    in = std::make_shared<ReadBufferFromPocoSocket>(socket());
    out = std::make_shared<WriteBufferFromPocoSocket>(socket());
    
    while (1){
        // 接管申请报文
        receivePacket();
        
        // 执行Query    
        state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
    
        //依据Query品种来解决不同的Query
        //解决insert Query
        processInsertQuery();
        //并发解决一般Query
        processOrdinaryQueryWithProcessors();
        //单线程解决一般Query
        processOrdinaryQuery();
    }
    
}

那CK解决客户端发送过去的Query的具体逻辑是怎么的呢?

咱们能够在dbms/src/Interpreters/executeQuery.cpp 中一探到底:

具体逻辑在 executeQueryImpl 函数中,筛选外围的逻辑进行解说:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl()
{
    //结构Parser
    ParserQuery parser(end, settings.enable_debug_queries);
    ASTPtr ast;

    //把Query转化为形象语法树
    ast = parseQuery(parser, begin, end, "", max_query_size);

    //生成interpreter实例
    auto interpreter = InterpreterFactory::get(ast, context, stage);

    // interpreter解析AST,后果是BlockIO
    res = interpreter->execute();

    //返回后果是形象语法树和解析后的后果组成的二元组
    return std::make_tuple(ast, res);
}

该函数所做的事件:

• 构建Parser,把Query解析成AST(形象语法树)

• InterpreterFactory依据AST生成对应的Interpreter实例

• AST是由Interpreter来解析的,执行后果是一个BlockIO,BlockIO是对 BlockInputStreamBlockOutputStream的一个封装。

总结:
• 服务端调用 executeQuery 来解决client发送的Query,执行后的后果保留在state这个构造体的io成员中。

每一条Query都会对应一个state构造体,记录了这条Query的id,解决状态,压缩算法,Query的文本和Query所解决数据对应的IO流等元信息。

• 而后服务端调用 processOrdinaryQuery 等办法把输入流后果封装成异步的IO流,发送到回client。

解析申请(Parser)

CK抉择采纳手写一个递归降落的Parser来对SQL进行解析,生成的后果是这个SQL对应的形象语法树(AST),形象语法树由示意各个操作的节点(IAST)示意。而本节次要介绍Parser背地的外围逻辑:

词法剖析和语法分析的外围逻辑能够在parseQuery.cpp的 tryParseQuery 中和盘托出。

该函数利用lexer将扫描Query字符流,将其宰割为一个个的Token, token_iterator 即一个Token流迭代器,而后parser再对Token流进行解析生成AST形象语法树。

ASTPtr tryParseQuery()
{
    //Token为lexer词法剖析后的根本单位,词法剖析后生成的是Token流
    Tokens tokens(pos, end, max_query_size);
    IParser::Pos token_iterator(tokens);
    ASTPtr res;
    //Token流通过语法分析生成AST形象语法树
    bool parse_res = parser.parse(token_iterator, res, expected);
    return res;

}

咱们能够看到,语法分析的外围就在于parser执行的parse办法。parse 办法具体的实现在 ParserQuery.cppparseImpl 中。

bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserQueryWithOutput query_with_output_p(enable_explain);
    ParserInsertQuery insert_p(end);
    ParserUseQuery use_p;
    ParserSetQuery set_p;
    ParserSystemQuery system_p;

    bool res = query_with_output_p.parse(pos, node, expected)
        || insert_p.parse(pos, node, expected)
        || use_p.parse(pos, node, expected)
        || set_p.parse(pos, node, expected)
        || system_p.parse(pos, node, expected);

    return res;
}

咱们能够看到,这个办法粗略地把Query分为了五种,然而实质上能够演绎为两种(第一种为有后果输入,对应show,select,create等语句;第二种为无后果输入,对应insert,use,set和与零碎相干的语句(如exit))

• QueryWithOutput
• InsertQuery
• UseQuery
• SetQuery
• SystemQuery

每一种Query都自定义了其专属的Parser,所以代码逻辑是当接管到一个Query输出的时候,会尝试各种Query的Parser,直到胜利为止。

咱们能够select语句对应的parser进行剖析:

外围逻辑能够总结为:

1.先给出select语句中可能呈现的关键词

2.在词法剖析生成的Token流中爬取这些关键词

3.如果胜利爬取,则 setExpression 函数会组装该关键字对应的AST节点

每一种SQL语句(如select,drop,insert,create)都有对应的AST类,并且别离蕴含了这些语句中特有的关键字。

bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    //创立AST树节点
    auto select_query = std::make_shared<ASTSelectQuery>();
    node = select_query;
    
    //select语句中会呈现的关键词
    ParserKeyword s_select("SELECT");
    ParserKeyword s_distinct("DISTINCT");
    ParserKeyword s_from("FROM");
    ParserKeyword s_prewhere("PREWHERE");
    ParserKeyword s_where("WHERE");
    ParserKeyword s_group_by("GROUP BY");
    ParserKeyword s_with("WITH");
    ParserKeyword s_totals("TOTALS");
    ParserKeyword s_having("HAVING");
    ParserKeyword s_order_by("ORDER BY");
    ParserKeyword s_limit("LIMIT");
    ParserKeyword s_settings("SETTINGS");
    ParserKeyword s_by("BY");
    ParserKeyword s_rollup("ROLLUP");
    ParserKeyword s_cube("CUBE");
    ParserKeyword s_top("TOP");
    ParserKeyword s_with_ties("WITH TIES");
    ParserKeyword s_offset("OFFSET");
 
    //...
    //顺次对Token流爬取上述关键字
    ParserTablesInSelectQuery().parse(pos, tables, expected)
   
    //依据语法分析后果设置AST的Expression属性,能够了解为如果SQL存在该关键字,这个关键字都会转化为AST上的一个节点
    select_query->setExpression(ASTSelectQuery::Expression::WITH, std::move(with_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables));
    select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_expression));
    select_query->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
    select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::HAVING, std::move(having_expression));
    select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(limit_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(limit_length));
    select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings));
        
}

整个Parser的流程图:

执行申请(Interpreter)

解释器(Interpreter)负责从形象语法树中创立查问执行的流水线,整条流水线以 BlockInputStreamBlockOutputStream 进行组织。比方说”select”是基于”from”的Block输入流来进行抉择的,抉择后的后果也会以Block输入流的模式输入到后果。 首先咱们来看:

dbms/src/Interpreters/InterpreterFactory.cpp

每一种Query都会有对应的Interpreter,这个工厂办法就是依据AST的品种来实例化其对应的Interpreter,由其来具体执行对应AST的执行打算:

std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
    //举个例子,如果该AST是由select语句转化过去,
    if (query->as<ASTSelectQuery>())
    {
        /// This is internal part of ASTSelectWithUnionQuery.
        /// Even if there is SELECT without union, it is represented by ASTSelectWithUnionQuery with single ASTSelectQuery as a child.
        return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage));
    }
}

咱们就以 InterpreterSelectQuery 为例,理解其实例化的外围逻辑:

InterpreterSelectQuery::InterpreterSelectQuery()
 {
     //获取AST
    auto & query = getSelectQuery();
    
    //对AST做进一步语法分析,对语法树做优化重写
    syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
        query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
    
    //每一种Query都会对应一个特有的表达式分析器,用于爬取AST生成执行打算(操作链)
    query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
        query_ptr, syntax_analyzer_result, context,
        NameSet(required_result_column_names.begin(), required_result_column_names.end()),
        options.subquery_depth, !options.only_analyze); 
 }

语法分析间接生成的AST转化成执行打算可能性能上并不是最优的,因而须要SyntaxAnalyzer 对其进行优化重写,在其源码中能够看到其波及到十分多 基规定优化(rule based optimization) 的trick。
SyntaxAnalyzer 会一一针对这些规定对查问进行查看,确定其是否满足转换规则,一旦满足就会对其进行转换。

SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze()
{
     // 剔除冗余列
     removeDuplicateColumns(result.source_columns);
     
     // 依据settings中enable_optimize_predicate_expression配置判断是否进行谓词下移
     replaceJoinedTable(node);
     
     // 依据settings中distributed_product_mode配置重写IN 与 JOIN 表达式
     InJoinSubqueriesPreprocessor(context).visit(query);
     
     // 优化Query外部的布尔表达式
     LogicalExpressionsOptimizer().perform();

     // 创立一个从别名到AST节点的映射字典 
     QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);
    
     // 公共子表达式的打消
     QueryNormalizer(normalizer_data).visit(query);
    
     // 打消select从句后的冗余列
     removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
     
     // 执行标量子查问,并且用常量代替标量子查问后果
     executeScalarSubqueries(query, context, subquery_depth);

     // 如果是select语句还会做下列优化:
    
     // 谓词下移优化
     PredicateExpressionsOptimizer(select_query, settings, context).optimize();
     
     /// GROUP BY 从句的优化
     optimizeGroupBy(select_query, source_columns_set, context);
     
     /// ORDER BY 从句的冗余项剔除
     optimizeOrderBy(select_query);
     
     /// LIMIT BY 从句的冗余列剔除
     optimizeLimitBy(select_query);
     
     /// USING语句的冗余列剔除
     optimizeUsing(select_query);
   
}

这里筛选几个简略介绍一下:
• 公共子表达式打消(Common Subexpression Elimination)
如果表达式 x op y 先前被计算过,并且从先前的计算到当初其计算表达式对应的值没有扭转,那么 x op y 就称为公共子表达式。公共子表达式打消会搜寻所有雷同计算表达式的实例,并剖析是否值得用保留计算值的单个变量来替换它们,以缩小计算的开销。

• 标量子查问(Scala Subquery)的常量替换
标量子查问就是返回繁多值的子查问,和公共子表达式打消类似,能够用常量来替换SQL中所有的标量子查问后果以缩小计算开销。

• 谓词下移(Predicate Pushdown)
把外层查问块中的WHERE子句的谓词下移到较低层查问块如视图,以尽可能把过滤数据的操作挪动到凑近数据源的地位。提前进行数据过滤可能大幅缩小网络传输或者内存读取拜访的数据量,以进步查问效率。
query_analyzer 的作用能够了解为解析优化重写后的AST,而后对所要进行的操作组成一条操作链,即物理执行打算,如:

ExpressionActionsChain chain;
analyzer.appendWhere(chain);
chain.addStep();
analyzer.appendSelect(chain);
analyzer.appendOrderBy(chain);
chain.finalize();

上述代码把where,select,orderby操作都退出到操作链中,接下来就能够从Storage层读取Block,对Block数据利用上述操作链的操作。而执行的外围逻辑,就在对应Interpreter的 executeImpl 办法实现中,这里以select语句的Interpreter来理解下读取Block数据并且对block数据进行相应操作的流程。

void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input)
{
     // 对应Query的AST
     auto & query = getSelectQuery();
    
     AnalysisResult expressions;
     // 物理打算,判断表达式是否有where,aggregate,having,order_by,litmit_by等字段
     expressions = analyzeExpressions(
                getSelectQuery(),
                *query_analyzer,
                QueryProcessingStage::FetchColumns,
                options.to_stage,
                context,
                storage,
                true,
                filter_info);
    
     // 从Storage读取数据
     executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);

     // eg:依据SQL的关键字在BlockStream流水线中执行相应的操作, 如where,aggregate,distinct都别离由一个函数负责执行
     executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
     
     executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
     
     executeDistinct(pipeline, true, expressions.selected_columns);    
    
}

既然咱们晓得了执行打算AnalysisResult(即物理执行打算),接下来就须要从storage层中读取数据来执行对应的操作,外围逻辑在 executeFetchColumns 中: 外围操作就是从storage层读取所要解决列的Block,并组织成BlockStream。

void InterpreterSelectQuery::executeFetchColumns(
        QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
        const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
{   
    // 实例化Block Stream
    auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams)
    // 读取列对应的Block,并且组织成Block Stream
    streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
    streams.back() = std::make_shared<ExpressionBlockInputStream>(streams.back(), query_info.prewhere_info->remove_columns_actions); 
}

读取完Block Stream之后就是对其执行各种execute操作如 executeAggregation , executeWhere 操作,详见 InterpreterSelectQuery::executeImpl 的代码。

因而Interpreter的处理过程能够总结为:

• 对AST进行优化重写
• 解析重写后的AST并生成操作链(执行打算)
• 从存储引擎中读取要解决的Block数据
• 对读取的Block数据利用操作链上的操作

那咱们读取Block Stream并进行解决后,生成的后果如何写回到storage层呢? 咱们这里以insert语句的Interpreter来理解下:

BlockIO InterpreterInsertQuery::execute()
{
     // table为存储引擎接口
    StoragePtr table = getTable(query);
    BlockOutputStreamPtr out;
    
    // 从存储引擎读取Block Stream
    auto query_sample_block = getSampleBlock(query, table);
    out = std::make_shared<AddingDefaultBlockOutputStream>(
        out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
    
    //执行后果封装成BlockIO
    BlockIO res;
    res.out = std::move(out);    
}

下面代码中的StoragePtr实际上就是IStorage这个存储引擎的接口

using StoragePtr = std::shared_ptr<IStorage>;

无论是写入还是读取操作都是依附底层存储引擎(如MergeTree)的write和read接口来实现的,对于存储引擎的细节实现这里临时不赘述,这里咱们只须要晓得咱们从存储引擎接口中以流形式读取Block数据,而后果组织成BlockIO流输入。Interpreter的流程总结如下:

返回申请后果

TCPHandler::runImpl 中,执行完 executeQuery 之后须要调用各种processQuery的办法来给client返回执行SQL后的后果。
咱们以 TCPHandler::processOrdinaryQuery 为例做简略剖析:

void TCPHandler::processOrdinaryQuery()
{
    //把BlockStream封装成异步的Stream,那么从流中读取数据将会是异步操作
    AsynchronousBlockInputStream async_in(state.io.in);
    
    while(true){
         Block block;
         //从IO流读取block数据
         block = async_in.read();
         //发送block数据
         sendData(block);
    }
}

Server负责在 sendData 函数中把输入后果写入到套接字输入缓冲区中,client只有从这个输入缓冲区读取就可能失去后果。

void TCPHandler::sendData(const Block & block)
{
    //初始化OutputStream的参数
    initBlockOutput(block);

    // 调用BlockOutputStream的write函数,把Block写到输入流
    state.block_out->write(block);
    state.maybe_compressed_out->next();
    out->next();
}

结语

理解ClickHouse背地SQL的查问整个流程,不仅能让数据库使用者更清晰地意识到如何编写最优化的SQL,也可能让数据库内核开发者加深对数据库体系结构的了解,进步开发效率。本文并没有波及到太深刻的技术细节,诸如向量化执行引擎,SIMD,基于llvm的动静代码生成,类MergeTree存储引擎等CK的技术细节也没有提及,只是从宏观角度给读者介绍了执行SQL背地内核到底产生了什么。后续咱们会推出更多内核源码解读文章,敬请关注。

写在最初

阿里云曾经率先推出了ClickHouse的云托管产品,产品首页地址:云数据库ClickHouse,目前正在收费公测中,欢送大家点击链接申请收费试用。

原文链接
本文为阿里云原创内容,未经容许不得转载。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理