关于javascript:ClickHouse-源码阅读-SQL的前世今生

37次阅读

共计 13944 个字符,预计需要花费 35 分钟才能阅读完成。

注:以下剖析基于开源 v19.15.2.2-stable 版本进行,社区最新版本代码改变较大,然而总体思路是不变的。

用户提交一条查问 SQL 背地产生了什么?

在传统关系型数据库中,SQL 处理器的组件次要包含以下几种:

• Query Parsing
负责进行词法和语法分析, 把程序从人类高可读的格局 (即 SQL) 转化成机器高可读的格局(AST, 形象语法树)。

词法剖析指的是把 SQL 中的字符序列分解成一个个独立的词法单元——Token(< 类型,值 >)。
语法分析指的是从词法分析器输入的 token 中辨认各类短语,并结构出一颗形象语法树。而依照结构形象语法树的方向,又能够把语法分析分成自顶向下和自底向上剖析两种。而 ClickHouse 采纳的则是手写一个递归降落的语法分析器。

• Query Rewrite
即通常咱们说的 ”Logical Optimizer” 或基于规定的优化器(Rule-Based Optimizer, 即 RBO)。

其负责利用一些启发式规定,负责简化和标准化查问,无需扭转查问的语义。

常见操作有: 谓词和算子下推,视图开展,简化常量运算表达式,谓词逻辑的重写,语义的优化等。

• Query Optimizer
即通常咱们所说的 ”Physical Optimizer”,负责把外部查问表白转化成一个高效的查问打算,领导 DBMS 如何去取表,如何进行排序,如何 Join。如下图所示,一个查问打算能够被认为是一个数据流图,在这个数据流图中,表数据会像在管道中传输一样,从一个查问操作符 (operator) 传递到另一个查问操作符。

一个查问打算

• Query Executor
查问执行器,负责执行具体的查问打算,从存储引擎中获取数据并且对数据利用查问打算失去后果。
执行引擎也分为很多种,如经典的火山模型(Volcano Model),还有 ClickHouse 采纳的向量化执行模型(Vectorization Model)。

(图来自经典论文 Architecture Of Database System)

但不论是传统的关系型数据库,还是非关系型数据库,SQL 的解析和生成执行打算过程都是大同小异的,而纵览 ClickHouse 的源代码,能够把用户提交一条查问 SQL 背地的过程总结如下:

1. 服务端接管客户端发来的 SQL 申请,具体模式是一个网络包,Server 的协定层须要拆包把 SQL 解析进去

2.Server 负责初始化上下文与 Network Handler,而后 Parser 对 Query 做词法和语法分析,解析成 AST

3.Interpreter 的 SyntaxAnalyzer 会利用一些启发式规定对 AST 进行优化重写

4.Interpreter 的 ExpressionAnalyzer 依据上下文信息以及优化重写后的 AST 生成物理执行打算

5. 物理执行打算散发到本地或者分布式的 executor, 各自从存储引擎中获取数据, 利用执行打算

6.Server 把执行后的后果以 Block 流的模式输入到 Socket 缓冲区,Client 从 Socket 中读取即可失去后果

接管客户端申请

咱们要以服务端的视角来登程,首先来看 server.cpp 大略做什么事件:

上面只筛选重要的逻辑:

• 初始化上下文

• 初始化 Zookeeper(ClickHouse 的正本复制机制须要依赖 ZooKeeper)

• 惯例配置初始化

• 绑定服务端的端口,依据网络协议初始化 Handler,对客户端提供服务

int Server::main()
{
    // 初始化上下文
    global_context = std::make_unique<Context>(Context::createGlobal());
    global_context->setApplicationType(Context::ApplicationType::SERVER);
     
    // zk 初始化
    zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] {return global_context->getZooKeeper(); });
    
    // 其余 config 的初始化
    //...
    
    // 绑定端口, 对外提供服务
    auto address = make_socket_address(host, port);
    socket.bind(address, /* reuseAddress = */ true);

    // 依据网络协议建设不同的 server 类型
    // 当初反对的 server 类型有:HTTP,HTTPS,TCP,Interserver,mysql
    // 以 TCP 版本为例:
    create_server("tcp_port", [&](UInt16 port)
    {
        Poco::Net::ServerSocket socket;
        auto address = socket_bind_listen(socket, listen_host, port);
        servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(new TCPHandlerFactory(*this),
            server_pool,
            socket,
            new Poco::Net::TCPServerParams));
     });
    
    // 启动 server
    for (auto & server : servers)
            server->start();}

客户端发来的申请是由各自网络协议所对应的 Handler 来进行的,server 在启动的时候 Handler 会被初始化并绑定在指定端口中。咱们以 TCPHandler 为例,看看服务端是如何解决客户端发来的申请的,重点关注 TCPHandler::runImpl 的函数实现:

• 初始化输出和输入流的缓冲区

• 承受申请报文,拆包

• 执行 Query(包含整个词法语法分析,Query 重写,物理打算生成和生成后果)

• 把 Query 后果保留到输入流,而后发送到 Socket 的缓冲区,期待发送回客户端

void TCPHandler::runImpl()
{
    // 实例化套接字对应的输出和输入流缓冲区
    in = std::make_shared<ReadBufferFromPocoSocket>(socket());
    out = std::make_shared<WriteBufferFromPocoSocket>(socket());
    
    while (1){
        // 接管申请报文
        receivePacket();
        
        // 执行 Query    
        state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
    
        // 依据 Query 品种来解决不同的 Query
        // 解决 insert Query
        processInsertQuery();
        // 并发解决一般 Query
        processOrdinaryQueryWithProcessors();
        // 单线程解决一般 Query
        processOrdinaryQuery();}
    
}

那 CK 解决客户端发送过去的 Query 的具体逻辑是怎么的呢?

咱们能够在dbms/src/Interpreters/executeQuery.cpp 中一探到底:

具体逻辑在 executeQueryImpl 函数中, 筛选外围的逻辑进行解说:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl()
{
    // 结构 Parser
    ParserQuery parser(end, settings.enable_debug_queries);
    ASTPtr ast;

    // 把 Query 转化为形象语法树
    ast = parseQuery(parser, begin, end, "", max_query_size);

    // 生成 interpreter 实例
    auto interpreter = InterpreterFactory::get(ast, context, stage);

    // interpreter 解析 AST, 后果是 BlockIO
    res = interpreter->execute();

    // 返回后果是形象语法树和解析后的后果组成的二元组
    return std::make_tuple(ast, res);
}

该函数所做的事件:

• 构建 Parser, 把 Query 解析成 AST(形象语法树)

• InterpreterFactory 依据 AST 生成对应的 Interpreter 实例

• AST 是由 Interpreter 来解析的,执行后果是一个 BlockIO,BlockIO 是对 BlockInputStreamBlockOutputStream的一个封装。

总结:
• 服务端调用 executeQuery 来解决 client 发送的 Query,执行后的后果保留在 state 这个构造体的 io 成员中。

每一条 Query 都会对应一个 state 构造体,记录了这条 Query 的 id,解决状态,压缩算法,Query 的文本和 Query 所解决数据对应的 IO 流等元信息。

• 而后服务端调用 processOrdinaryQuery 等办法把输入流后果封装成异步的 IO 流,发送到回 client。

解析申请(Parser)

CK 抉择采纳手写一个递归降落的 Parser 来对 SQL 进行解析,生成的后果是这个 SQL 对应的形象语法树 (AST), 形象语法树由示意各个操作的节点(IAST) 示意。而本节次要介绍 Parser 背地的外围逻辑:

词法剖析和语法分析的外围逻辑能够在 parseQuery.cpp 的 tryParseQuery 中和盘托出。

该函数利用 lexer 将扫描 Query 字符流,将其宰割为一个个的 Token,token_iterator 即一个 Token 流迭代器,而后 parser 再对 Token 流进行解析生成 AST 形象语法树。

ASTPtr tryParseQuery()
{
    //Token 为 lexer 词法剖析后的根本单位, 词法剖析后生成的是 Token 流
    Tokens tokens(pos, end, max_query_size);
    IParser::Pos token_iterator(tokens);
    ASTPtr res;
    //Token 流通过语法分析生成 AST 形象语法树
    bool parse_res = parser.parse(token_iterator, res, expected);
    return res;

}

咱们能够看到, 语法分析的外围就在于 parser 执行的 parse 办法。parse 办法具体的实现在 ParserQuery.cppparseImpl 中。

bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ParserQueryWithOutput query_with_output_p(enable_explain);
    ParserInsertQuery insert_p(end);
    ParserUseQuery use_p;
    ParserSetQuery set_p;
    ParserSystemQuery system_p;

    bool res = query_with_output_p.parse(pos, node, expected)
        || insert_p.parse(pos, node, expected)
        || use_p.parse(pos, node, expected)
        || set_p.parse(pos, node, expected)
        || system_p.parse(pos, node, expected);

    return res;
}

咱们能够看到, 这个办法粗略地把 Query 分为了五种, 然而实质上能够演绎为两种(第一种为有后果输入, 对应 show,select,create 等语句; 第二种为无后果输入, 对应 insert,use,set 和与零碎相干的语句(如 exit))

• QueryWithOutput
• InsertQuery
• UseQuery
• SetQuery
• SystemQuery

每一种 Query 都自定义了其专属的 Parser, 所以代码逻辑是当接管到一个 Query 输出的时候,会尝试各种 Query 的 Parser,直到胜利为止。

咱们能够 select 语句对应的 parser 进行剖析:

外围逻辑能够总结为:

1. 先给出 select 语句中可能呈现的关键词

2. 在词法剖析生成的 Token 流中爬取这些关键词

3. 如果胜利爬取,则 setExpression 函数会组装该关键字对应的 AST 节点

每一种 SQL 语句 (如 select,drop,insert,create) 都有对应的 AST 类,并且别离蕴含了这些语句中特有的关键字。

bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    // 创立 AST 树节点
    auto select_query = std::make_shared<ASTSelectQuery>();
    node = select_query;
    
    //select 语句中会呈现的关键词
    ParserKeyword s_select("SELECT");
    ParserKeyword s_distinct("DISTINCT");
    ParserKeyword s_from("FROM");
    ParserKeyword s_prewhere("PREWHERE");
    ParserKeyword s_where("WHERE");
    ParserKeyword s_group_by("GROUP BY");
    ParserKeyword s_with("WITH");
    ParserKeyword s_totals("TOTALS");
    ParserKeyword s_having("HAVING");
    ParserKeyword s_order_by("ORDER BY");
    ParserKeyword s_limit("LIMIT");
    ParserKeyword s_settings("SETTINGS");
    ParserKeyword s_by("BY");
    ParserKeyword s_rollup("ROLLUP");
    ParserKeyword s_cube("CUBE");
    ParserKeyword s_top("TOP");
    ParserKeyword s_with_ties("WITH TIES");
    ParserKeyword s_offset("OFFSET");
 
    //...
    // 顺次对 Token 流爬取上述关键字
    ParserTablesInSelectQuery().parse(pos, tables, expected)
   
    // 依据语法分析后果设置 AST 的 Expression 属性, 能够了解为如果 SQL 存在该关键字, 这个关键字都会转化为 AST 上的一个节点
    select_query->setExpression(ASTSelectQuery::Expression::WITH, std::move(with_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables));
    select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_expression));
    select_query->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
    select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::HAVING, std::move(having_expression));
    select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(limit_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(limit_length));
    select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings));
        
}

整个 Parser 的流程图:

执行申请(Interpreter)

解释器 (Interpreter) 负责从形象语法树中创立查问执行的流水线,整条流水线以 BlockInputStreamBlockOutputStream 进行组织。比方说 ”select” 是基于 ”from” 的 Block 输入流来进行抉择的,抉择后的后果也会以 Block 输入流的模式输入到后果。首先咱们来看:

dbms/src/Interpreters/InterpreterFactory.cpp

每一种 Query 都会有对应的 Interpreter,这个工厂办法就是依据 AST 的品种来实例化其对应的 Interpreter, 由其来具体执行对应 AST 的执行打算:

std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
    // 举个例子, 如果该 AST 是由 select 语句转化过去,
    if (query->as<ASTSelectQuery>())
    {
        /// This is internal part of ASTSelectWithUnionQuery.
        /// Even if there is SELECT without union, it is represented by ASTSelectWithUnionQuery with single ASTSelectQuery as a child.
        return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage));
    }
}

咱们就以 InterpreterSelectQuery 为例,理解其实例化的外围逻辑:

InterpreterSelectQuery::InterpreterSelectQuery()
 {
     // 获取 AST
    auto & query = getSelectQuery();
    
    // 对 AST 做进一步语法分析,对语法树做优化重写
    syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
    
    // 每一种 Query 都会对应一个特有的表达式分析器, 用于爬取 AST 生成执行打算(操作链)
    query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
        query_ptr, syntax_analyzer_result, context,
        NameSet(required_result_column_names.begin(), required_result_column_names.end()),
        options.subquery_depth, !options.only_analyze); 
 }

语法分析间接生成的 AST 转化成执行打算可能性能上并不是最优的,因而须要 SyntaxAnalyzer 对其进行优化重写,在其源码中能够看到其波及到十分多 基规定优化(rule based optimization) 的 trick。
SyntaxAnalyzer 会一一针对这些规定对查问进行查看,确定其是否满足转换规则,一旦满足就会对其进行转换。

SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze()
{
     // 剔除冗余列
     removeDuplicateColumns(result.source_columns);
     
     // 依据 settings 中 enable_optimize_predicate_expression 配置判断是否进行谓词下移
     replaceJoinedTable(node);
     
     // 依据 settings 中 distributed_product_mode 配置重写 IN 与 JOIN 表达式
     InJoinSubqueriesPreprocessor(context).visit(query);
     
     // 优化 Query 外部的布尔表达式
     LogicalExpressionsOptimizer().perform();

     // 创立一个从别名到 AST 节点的映射字典 
     QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);
    
     // 公共子表达式的打消
     QueryNormalizer(normalizer_data).visit(query);
    
     // 打消 select 从句后的冗余列
     removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
     
     // 执行标量子查问,并且用常量代替标量子查问后果
     executeScalarSubqueries(query, context, subquery_depth);

     // 如果是 select 语句还会做下列优化:
    
     // 谓词下移优化
     PredicateExpressionsOptimizer(select_query, settings, context).optimize();
     
     /// GROUP BY 从句的优化
     optimizeGroupBy(select_query, source_columns_set, context);
     
     /// ORDER BY 从句的冗余项剔除
     optimizeOrderBy(select_query);
     
     /// LIMIT BY 从句的冗余列剔除
     optimizeLimitBy(select_query);
     
     /// USING 语句的冗余列剔除
     optimizeUsing(select_query);
   
}

这里筛选几个简略介绍一下:
• 公共子表达式打消 (Common Subexpression Elimination)
如果表达式 x op y 先前被计算过,并且从先前的计算到当初其计算表达式对应的值没有扭转,那么 x op y 就称为公共子表达式。公共子表达式打消会搜寻所有雷同计算表达式的实例,并剖析是否值得用保留计算值的单个变量来替换它们,以缩小计算的开销。

• 标量子查问 (Scala Subquery) 的常量替换
标量子查问就是返回繁多值的子查问,和公共子表达式打消类似,能够用常量来替换 SQL 中所有的标量子查问后果以缩小计算开销。

• 谓词下移 (Predicate Pushdown)
把外层查问块中的 WHERE 子句的谓词下移到较低层查问块如视图,以尽可能把过滤数据的操作挪动到凑近数据源的地位。提前进行数据过滤可能大幅缩小网络传输或者内存读取拜访的数据量,以进步查问效率。
query_analyzer 的作用能够了解为解析优化重写后的 AST,而后对所要进行的操作组成一条操作链,即物理执行打算,如:

ExpressionActionsChain chain;
analyzer.appendWhere(chain);
chain.addStep();
analyzer.appendSelect(chain);
analyzer.appendOrderBy(chain);
chain.finalize();

上述代码把 where,select,orderby 操作都退出到操作链中,接下来就能够从 Storage 层读取 Block,对 Block 数据利用上述操作链的操作。而执行的外围逻辑,就在对应 Interpreter 的 executeImpl 办法实现中, 这里以 select 语句的 Interpreter 来理解下读取 Block 数据并且对 block 数据进行相应操作的流程。

void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input)
{
     // 对应 Query 的 AST
     auto & query = getSelectQuery();
    
     AnalysisResult expressions;
     // 物理打算,判断表达式是否有 where,aggregate,having,order_by,litmit_by 等字段
     expressions = analyzeExpressions(getSelectQuery(),
                *query_analyzer,
                QueryProcessingStage::FetchColumns,
                options.to_stage,
                context,
                storage,
                true,
                filter_info);
    
     // 从 Storage 读取数据
     executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);

     // eg: 依据 SQL 的关键字在 BlockStream 流水线中执行相应的操作, 如 where,aggregate,distinct 都别离由一个函数负责执行
     executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
     
     executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
     
     executeDistinct(pipeline, true, expressions.selected_columns);    
    
}

既然咱们晓得了执行打算 AnalysisResult(即物理执行打算),接下来就须要从 storage 层中读取数据来执行对应的操作,外围逻辑在 executeFetchColumns 中: 外围操作就是从 storage 层读取所要解决列的 Block,并组织成 BlockStream。

void InterpreterSelectQuery::executeFetchColumns(
        QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
        const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
{   
    // 实例化 Block Stream
    auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams)
    // 读取列对应的 Block, 并且组织成 Block Stream
    streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
    streams.back() = std::make_shared<ExpressionBlockInputStream>(streams.back(), query_info.prewhere_info->remove_columns_actions); 
}

读取完 Block Stream 之后就是对其执行各种 execute 操作如 executeAggregation , executeWhere 操作,详见 InterpreterSelectQuery::executeImpl 的代码。

因而 Interpreter 的处理过程能够总结为:

• 对 AST 进行优化重写
• 解析重写后的 AST 并生成操作链(执行打算)
• 从存储引擎中读取要解决的 Block 数据
• 对读取的 Block 数据利用操作链上的操作

那咱们读取 Block Stream 并进行解决后,生成的后果如何写回到 storage 层呢? 咱们这里以 insert 语句的 Interpreter 来理解下:

BlockIO InterpreterInsertQuery::execute()
{
     // table 为存储引擎接口
    StoragePtr table = getTable(query);
    BlockOutputStreamPtr out;
    
    // 从存储引擎读取 Block Stream
    auto query_sample_block = getSampleBlock(query, table);
    out = std::make_shared<AddingDefaultBlockOutputStream>(out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
    
    // 执行后果封装成 BlockIO
    BlockIO res;
    res.out = std::move(out);    
}

下面代码中的 StoragePtr 实际上就是 IStorage 这个存储引擎的接口

using StoragePtr = std::shared_ptr<IStorage>;

无论是写入还是读取操作都是依附底层存储引擎 (如 MergeTree) 的 write 和 read 接口来实现的,对于存储引擎的细节实现这里临时不赘述,这里咱们只须要晓得咱们从存储引擎接口中以流形式读取 Block 数据,而后果组织成 BlockIO 流输入。Interpreter 的流程总结如下:

返回申请后果

TCPHandler::runImpl 中,执行完 executeQuery 之后须要调用各种 processQuery 的办法来给 client 返回执行 SQL 后的后果。
咱们以 TCPHandler::processOrdinaryQuery 为例做简略剖析:

void TCPHandler::processOrdinaryQuery()
{
    // 把 BlockStream 封装成异步的 Stream, 那么从流中读取数据将会是异步操作
    AsynchronousBlockInputStream async_in(state.io.in);
    
    while(true){
         Block block;
         // 从 IO 流读取 block 数据
         block = async_in.read();
         // 发送 block 数据
         sendData(block);
    }
}

Server 负责在 sendData 函数中把输入后果写入到套接字输入缓冲区中,client 只有从这个输入缓冲区读取就可能失去后果。

void TCPHandler::sendData(const Block & block)
{
    // 初始化 OutputStream 的参数
    initBlockOutput(block);

    // 调用 BlockOutputStream 的 write 函数, 把 Block 写到输入流
    state.block_out->write(block);
    state.maybe_compressed_out->next();
    out->next();}

结语

理解 ClickHouse 背地 SQL 的查问整个流程,不仅能让数据库使用者更清晰地意识到如何编写最优化的 SQL,也可能让数据库内核开发者加深对数据库体系结构的了解,进步开发效率。本文并没有波及到太深刻的技术细节,诸如向量化执行引擎,SIMD,基于 llvm 的动静代码生成,类 MergeTree 存储引擎等 CK 的技术细节也没有提及,只是从宏观角度给读者介绍了执行 SQL 背地内核到底产生了什么。后续咱们会推出更多内核源码解读文章,敬请关注。

写在最初

阿里云曾经率先推出了 ClickHouse 的云托管产品,产品首页地址:云数据库 ClickHouse,目前正在收费公测中,欢送大家点击链接申请收费试用。

原文链接
本文为阿里云原创内容,未经容许不得转载。

正文完
 0