关于clickhouse:浅谈-ByteHouse-Projection-优化实践

10次阅读

共计 9856 个字符,预计需要花费 25 分钟才能阅读完成。

预聚合是 OLAP 零碎中罕用的一种优化伎俩,在通过在加载数据时就进行局部聚合计算,生成聚合后的两头表或视图,从而在查问时间接应用这些事后计算好的聚合后果,进步查问性能,实现这种预聚合办法大多都应用物化视图来实现。

Clickhouse 社区实现的 Projection 性能相似于物化视图,原始的概念来源于 Vertica,在原始表数据加载时,依据聚合 SQL 定义的表达式,计算写入数据的聚合数据与原始数据同步写入存储。在数据查问的过程中,如果查问 SQL 通过匹配剖析能够通过聚合数据计算失去,间接查问聚合数据缩小计算开销,大幅晋升查问性能。

Clickhouse Projection 是针对物化视图现有问题,在查问匹配,数据一致性上扩大了应用场景:

  • 反对 normal projection,依照不同列进行数据重排,对于不同条件疾速过滤数据
  • 反对 aggregate projection, 应用聚合查问在源表上间接定义出预聚合模型
  • 查问剖析能依据查问代价,主动抉择最优 Projection 进行查问优化,无需改写查问
  • projeciton 数据存储于原始 part 目录下,在任一时刻针对任一数据变换操作均提供一致性保障
  • 保护简略,不需另外定义新表,在原始表增加 projection 属性

ByteHouse 是火山引擎基于 ClickHouse 研发的一款剖析型数据库产品,是同时反对实时和离线导入的自助数据分析平台,可能对 PB 级海量数据进行高效剖析。具备实在时剖析、存储 - 计算拆散、多级资源隔离、云上全托管服务四大特点,为了更好的兼容社区的 projection 性能,扩大 projection 应用场景,ByteHouse 对 Projection 进行了匹配场景和架构上进行了优化。在 ByteHouse 商业客户性能测试 projection 的性能测试,在 1.2 亿条的理论生产数据集中进行测试,查问并发能力晋升 10~20 倍,上面从 projeciton 在优化器查问改写和基于 ByteHouse 框架改良两个方面谈一谈目前的优化工作。

Projection 应用

为了进步 ByteHouse 对社区有很好的兼容性,ByteHouse 保留了原有语法的反对,projection 操作分为创立,删除,物化,删除数据几个操作。为了便于了解前面的优化应用行为剖析零碎例子作为剖析的对象。

语法

-- 新增 projection 定义
ALTER TABLE [db].table ADD PROJECTION name (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY] )

-- 删除 projection 定义并且删除 projection 数据 
ALTER TABLE [db].table DROP PROJECTION name 

-- 物化原表的某个 partition 数据
ALTER TABLE [db.]table MATERIALIZE PROJECTION name IN PARTITION partition_name

-- 删除 projection 数据但不删除 projection 定义
ALTER TABLE [db.]table CLEAR PROJECTION name IN PARTITION partition_name 

实例

CREATE DATABASE IF NOT EXISTS tea_data;

创立原始数据表
CREATE TABLE tea_data.events(
  app_id UInt32,
  user_id UInt64,
  event_type UInt64,
  cost UInt64,
  action_duration UInt64,
  display_time UInt64,
  event_date Date
) ENGINE = CnchMergeTree PARTITION BY toDate(event_date)
ORDER BY
  (app_id, user_id, event_type);

创立 projection 前写入 2023-05-28 分区测试数据
INSERT INTO tea_data.events
SELECT
    number / 100,
    number % 10,
    number % 3357,
    number % 166,
    number % 5,
    number % 40,
    '2023-05-28 05:11:55'
FROM system.numbers LIMIT 100000;

创立聚合 projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_1
(
SELECT
    app_id,
    user_id,
    event_date,
    sum(action_duration)
    GROUP BY app_id,
    user_id, event_date
);

创立 projection 后写入 2023-05-29 分区测试数据
INSERT INTO tea_data.events
SELECT
    number / 100,
    number % 10,
    number % 3357,
    number % 166,
    number % 5,
    number % 40,
    '2023-05-29 05:11:55'
FROM system.numbers LIMIT 100000;

Note:CnchMergeTree 是 Bytehouse 特有的引擎

Query Optimizer 扩大 Projection 改写

Bytehouse 优化器

ByteHouse 优化器为业界目前惟一的 ClickHouse 优化器计划。ByteHouse 优化器的能力简略总结如下:

  • RBO:反对:列裁剪、分区裁剪、表达式简化、子查问解关联、谓词下推、冗余算子打消、Outer-JOIN 转 INNER-JOIN、算子下推存储、分布式算子拆分等常见的启发式优化能力。
  • CBO:基于 Cascade 搜寻框架,实现了高效的 Join 枚举算法,以及基于 Histogram 的代价估算,对 10 表全连贯级别规模的 Join Reorder 问题,可能全量枚举并寻求最优解,同时针对大于 10 表规模的 Join Reorder 反对启发式枚举并寻求最优解。CBO 反对基于规定扩大搜寻空间,除了常见的 Join Reorder 问题以外,还反对 Outer-Join/Join Reorder,Magic Set Placement 等相干优化能力。
  • 分布式打算优化:面向分布式 MPP 数据库,生成分布式查问打算,并且和 CBO 联合在一起。绝对业界支流实现:分为两个阶段,首先寻求最优的单机版打算,而后将其分布式化。咱们的计划则是将这两个阶段交融在一起,在整个 CBO 寻求最优解的过程中,会联合分布式打算的诉求,从代价的角度抉择最优的分布式打算。对于 Join/Aggregate 的还反对 Partition 属性开展。
  • 高阶优化能力:实现了 Dynamic Filter pushdown、单表物化视图改写、基于代价的 CTE(公共表达式共享)。

借助 bytehouse 优化器弱小的能力,针对 projection 原有实现的几点局限性做了优化,上面咱们先来看一下社区在 projection 改写的具体实现。

社区 Projection

改写实现在非优化器执行模式下,对原始表的聚合查问可通过 aggregate projection 减速,即读取 projection 中的预聚合数据而不是原始数据。计算反对了 normal partition 和 projection partition 的混合查问,如果一个 partition 的 projection 还没物化,能够应用原始数据进行计算。
具体改写执行逻辑:

  • 打算阶段
  • 将原查问打算和已有 projection 进行匹配筛选能满足查问要求的 projection candidates;
  • 基于最小的 mark 读取数抉择最优的 projection candidate;
  • 对原查问打算中的 ActionDAG 进行改写和折叠,之后用于 projection part 数据的后续计算;
  • 将以后数据处理阶段晋升到 WithMergeableState;
  • 执行阶段
  • MergeTreeDataSelectExecutor 会将 aggregate 之前的计算进行拆分:对于 normal part,应用原查问打算进行计算;对于 projection part,应用改写后 ActionDAG 结构 QueryPipeline;
  • 将两份数据合并,用于 aggregate 之后的计算。

    Bytehouse 优化器改写实现

    优化器会将查问切分为不同的 plan segment 散发到 worker 节点并行执行,segment 之间通过 exchange 替换数据,在 plan segment 外部依据 query plan 构建 pipeline 执行,以上面简略聚合查问为例,阐明优化器如何匹配 projection。

Q1: 
SELECT   
app_id,    
user_id,    
sum(action_duration)
FROM tea_data.eventsWHERE event_date = '2023-05-29'
GROUP BY    
app_id,    
user_id

在执行打算阶段优化器尽量的将 TableScan 下层的 Partial Aggregation Step,Projection 和 Filter 下推到 TableScan 中,在将 plan segment 发送到 worker 节点后,在依据查问代价抉择适合 projection 进行匹配改写,从上面的执行打算上看,命中 projection 会在 table scan 中间接读取 AggregateFunction(sum, UInt64)的 state 数据,相比于没有命中 projection 的执行打算缩小了 AggregaingNode 的聚合运算。

  • Q1 查问打算(optimizer_projection_support=0)
  • Q1 查问打算(optimizer_projection_support=1)
混合读取 Projection

Projection 在创立之后不反对更新 schema,只能创立新的 projection,然而在一些对于 projection schema 变更需要频繁业务场景下,须要同一个查问既可能读取旧 projection 也能读取新 projection,所以在匹配时须要从 partition 维度进行匹配而不是从 projection 定义的维度进行匹配,混合读取不同 projection 的数据,这样会使查问更加灵便,更好的适应业务场景,上面举个具体的实例:

创立新的 projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_2
(
SELECT
    app_id,
    sum(action_duration),
    sum(cost)
    GROUP BY app_id
);

写入 2023-05-30 的数据
INSERT INTO tea_data.events
SELECT
    number / 10,
    number % 100,
    number % 23,
    number % 3434,
    number % 23,
    number % 55,
    '2023-05-30 04:12:43'
FROM system.numbers LIMIT 100000;

执行查问
Q2:
SELECT
    app_id,
    sum(action_duration)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id
  • Q2 执行打算
  • 依照 partition 来匹配 projection
    查问过滤条件 WHERE event_date >= ‘2023-05-28’ 会读取是三个分区的数据,并且 agg_sum_proj_1,agg_sum_proj_2 都满足 Q2 的查问条件,所以 table scan 会读取 2023-05-28 的原始数据,2023-05-29 会读取 agg_sum_proj_1 的数据,2023-05-30 因为 agg_sum_proj_2 绝对于 agg_sum_proj_1 的数据聚合度更高,读取代价较小,抉择读取 agg_sum_proj_2 的数据,混合读取不同 projection 的数据。

    原始表 Schema 更新

    当对原始表增加新字段(维度或指标 ),对应 projection 不蕴含这些字段,这时候为了利用 projection 个别状况下须要删除 projection 从新做物化,比拟浪费资源,如果优化器匹配算法能正确处理不存在缺省字段,并应用缺省值参加计算就能够解决这个问题。


ALTER TABLE tea_data.events ADD COLUMN device_id String after event_type;
ALTER TABLE tea_data.events ADD COLUMN stay_time UInt64 after device_id;

执行查问
Q3:
SELECT
    app_id,
    device_id,
    sum(action_duration),
    max(stay_time)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id,device_id
  • Q3 执行打算
  • 默认值参加计算
    从查问打算能够看出,即便 agg_sum_proj_1 和 agg_sum_proj_2 并不蕴含新增的维度字段 device_id,指标字段 stay_time, 依然能够命中原始的 partiton 的 projection,并且应用默认值来参加计算,这样能够利用旧的 projection 进行查问减速。

    Bytehouse Projection 实现

    Projection 是依照 Bytehouse 的存算拆散架构进行设计的,Projecton 数据由分布式存储对立进行治理,而针对 projection 的查问和计算则在无状态的计算节点上进行。相比于社区版,Bytehouse Projection 实现了以下劣势:

  • 对于 Projection 数据的存储节点和计算节点能够独立扩大,即能够依据不同业务对于 Projection 的应用需要,减少存储或者计算节点。
  • 当进行 Projection 查问时,能够依据不同 Projection 的数据查问量来调配计算节点的资源,从而实现资源的隔离和优化,进步查问效率。
  • Projection 的元数据存储非常轻量,在业务数据急剧变动的时候,计算节点能够做到业务无感知扩缩容,无需额定的 Projection 数据迁徙。

    Projection 数据存储

    在 Bytehouse 中,多个 projections 数据与 data 数据存储在一个共享存储文件中。文件的内部数据对 projections 外部的内容没有感知,相当于一个黑盒。当须要读取某个 projection 时,通过 checksums 外面存储的 projection 指针,定位到特定 projection 地位,实现 projection 数据解析与加载。

    Write 操作

    Projection 写入分为两局部,先在本地做数据写入,产生 part 文件存储在 worker 节点本地,而后通过 dumpAndCommitCnchParts 将数据 dump 到近程共享存储。

  • 写入本地  
    通过 writeTempPart()将 block 写入本地,当写完原始 part 后,循环通过办法 addProjectionPart()将每一个 projection 写入 part 文件夹,并增加到 new_part 中进行治理。
  • dump 到近程存储  
    dumpCnchParts()的时候,依照上述的存储格局,写入完原始 part 中的 bin 和 mark 数据后,循环将每一个 projection 文件夹中的数据写入到共享存储文件中,并记录地位和大小到 checksums,如下:
    写入 header
    写入 data
    写入 projections
    写入 Primary index
    写入 Checksums
    写入 Metainfo
    写入 Unique Key Index
    写入 data footger

    Merge 操作

    随着工夫的推移,针对同一个 partition 会存在越来越多的 parts,而 parts 越多查问过滤时的代价就会越大。因而,Bytehouse 在后盾过程中会 merge 同一个 partition 的 parts 组成更大的 part,从而缩小 part 的数量进步查问的效率。

  • 对于每一个要 merge 的 part
    对于 part 中的每一列,缓存对应的 segments 到本地
    创立 MergeTreeReaderStreamWithSegmentCache,通过近程文件 buffer 或者本地 segments 的 buffer 初始化
  • 通过 MergingSortedTransform 或 AggregatingSortedTransform 等将 sources 交融成 PipelineExecutingBlockInputStream
  • 创立 MergedBlockOutputStream
  • 对于 projection,进行如下操作
  • 建设每一个 projection 的读取流,本地缓存 buffer 或者近程文件 buffer
  • 原始表 merge 过程,对 parts 中的 projections 进行 merge
  • 通过 dumper 将新的残缺 part 存储到远端

Mutate 操作

Bytehouse 采纳 MVCC 的形式,针对 mutate 波及的列,新增一个 delta part 版本存储此次 mutate 波及到的列。相应地,咱们在 mutate 的时候,结构 projection 的 mutate 操作的 inputstream,将 mutate 后的 projection 和原始表数据一起写到同一个 delta part 中。

  • 在 MutationsInterpreter 外面,通过 InterpreterSelectQuery(mutation_ast)获取 BlockInputStream
  • projection 通过 block 和 InterpreterSelectQuery(projection.ast)从新构建

    Materialize 物化操作

    如下图所示,依据 Bytehouse 的 part 治理形式,针对 mutate 操作或新增物化操作,咱们为 part 生成新的 delta part,在下图 part 中,它所治理的三个 projections 由 base part 中的 proj2,delta part#1 中的 proj1’,以及 delta part#2 中的 proj3 独特形成。当 parts 加载实现后,delta part#2 会存储 base part 中的 proj2 的指针和 delta part#1 中的 proj1’ 指针,以及本身的 proj3 指针,对下层提供对立的拜访服务。

    Worker 端磁盘缓存

    目前,CNCH 中针对不同数据设计了不同的缓存类型

  • DiskCacheSegment:治理 bin 和 mark 数据
  • ChecksumsDiskCacheSegment:治理 checksums 数据
  • PrimaryIndexDiskCacheSegment:治理主键索引数据 BitMapIndexDiskCacheSegment:治理 bitmap 索引数据
    针对 Projection 中的数据,别离通过上述的 DiskCache,ChecksumsDiskCache 和 PrimaryIndexDiskCache 对 bin,mark,checksums 以及索引进行缓存。

另外,为了放慢 Projection 数据的加载过程,咱们新增了 MetaInfoDiskCacheSegment 用于缓存 Projection 相干的元数据信息。

理论案例剖析

某实在用户场景的数据集,咱们利用它对 Projection 性能进行了测试。
该数据粗放 1.2 亿条,蕴含 projection 约 240G 大小,测试机器 80CPU(s) / 376G Mem,配置如下:

  • SET allow_experimental_projection_optimization = 1
  • use_uncompressed_cache = true
  • max_threads = 1
  • log_level = error
  • 开启 Projection 查问并发度 80,敞开 Projection 查问并发度为 30

    测试后果

    开启 Projection 后,针对 1.2 亿条的数据集,查问性能晋升 10~20 倍。

    表构造

                                                 
CREATE TABLE user.trades(                                     
 `type` UInt8,`status` UInt64,`block_hash` String,`sequence_number` UInt64,`block_timestamp` DateTime,`transaction_hash` String,`transaction_index` UInt32,`from_address` String,`to_address` String,`value` String,`input` String,`nonce` UInt64,`contract_address` String,`gas` UInt64,`gas_price` UInt64,`gas_used` UInt64,`effective_gas_price` UInt64,`cumulative_gas_used` UInt64,`max_fee_per_gas` UInt64,`max_priority_fee_per_gas` UInt64,`r` String,`s` String,`v` UInt64,`logs_count` UInt32,PROJECTION tx_from_address_hit
  (                                         
    SELECT *                                                
    ORDER BY from_address
  ),PROJECTION tx_to_address_hit (                                           
    SELECT *                                                 
    ORDER BY to_address 
 ),PROJECTION tx_sequence_number_hit (                                      
    SELECT *                                              
    ORDER BY sequence_number 
 ),PROJECTION tx_transaction_hash_hit (                                         
    SELECT *                                                  
    ORDER BY transaction_hash 
 )                         
)
ENGINE=CnchMergeTree()
PRIMARY KEY (transaction_hash,from_address,to_address) 
ORDER BY (transaction_hash,from_address,to_address) 
PARTITION BY toDate(toStartOfMonth(`block_timestamp`));                            

开启 Projection

Q1

WITH tx AS (SELECT * FROM user.trades WHERE from_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC,transaction_index DESC UNION ALL SELECT * FROM user.trades WHERE to_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC,transaction_index DESC) SELECT * FROM tx LIMIT 100;

Q2

with tx as (select sequence_number,transaction_index,transaction_hash,input from user.trades where from_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc,transaction_index desc UNION ALL select sequence_number,transaction_index,transaction_hash,input from user.trades where to_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc,transaction_index desc) select sequence_number,transaction_hash,substring(input,1,8) as func_sign from tx order by sequence_number desc,transaction_index desc limit 100 settings max_threads = 1,allow_experimental_projection_optimization = 1,use_uncompressed_cache = true;

敞开 Projection

Q1

Q2

正文完
 0