预聚合是 OLAP 零碎中罕用的一种优化伎俩,在通过在加载数据时就进行局部聚合计算,生成聚合后的两头表或视图,从而在查问时间接应用这些事后计算好的聚合后果,进步查问性能,实现这种预聚合办法大多都应用物化视图来实现。
Clickhouse 社区实现的 Projection 性能相似于物化视图,原始的概念来源于 Vertica,在原始表数据加载时,依据聚合 SQL 定义的表达式,计算写入数据的聚合数据与原始数据同步写入存储。在数据查问的过程中,如果查问 SQL 通过匹配剖析能够通过聚合数据计算失去,间接查问聚合数据缩小计算开销,大幅晋升查问性能。
Clickhouse Projection 是针对物化视图现有问题,在查问匹配,数据一致性上扩大了应用场景:
- 反对 normal projection,依照不同列进行数据重排,对于不同条件疾速过滤数据
- 反对 aggregate projection, 应用聚合查问在源表上间接定义出预聚合模型
- 查问剖析能依据查问代价,主动抉择最优 Projection 进行查问优化,无需改写查问
- projeciton 数据存储于原始 part 目录下,在任一时刻针对任一数据变换操作均提供一致性保障
- 保护简略,不需另外定义新表,在原始表增加 projection 属性
ByteHouse 是火山引擎基于 ClickHouse 研发的一款剖析型数据库产品,是同时反对实时和离线导入的自助数据分析平台,可能对 PB 级海量数据进行高效剖析。具备实在时剖析、存储 - 计算拆散、多级资源隔离、云上全托管服务四大特点,为了更好的兼容社区的 projection 性能,扩大 projection 应用场景,ByteHouse 对 Projection 进行了匹配场景和架构上进行了优化。在 ByteHouse 商业客户性能测试 projection 的性能测试,在 1.2 亿条的理论生产数据集中进行测试,查问并发能力晋升 10~20 倍,上面从 projeciton 在优化器查问改写和基于 ByteHouse 框架改良两个方面谈一谈目前的优化工作。
Projection 应用
为了进步 ByteHouse 对社区有很好的兼容性,ByteHouse 保留了原有语法的反对,projection 操作分为创立,删除,物化,删除数据几个操作。为了便于了解前面的优化应用行为剖析零碎例子作为剖析的对象。
语法
-- 新增 projection 定义
ALTER TABLE [db].table ADD PROJECTION name (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY] )
-- 删除 projection 定义并且删除 projection 数据
ALTER TABLE [db].table DROP PROJECTION name
-- 物化原表的某个 partition 数据
ALTER TABLE [db.]table MATERIALIZE PROJECTION name IN PARTITION partition_name
-- 删除 projection 数据但不删除 projection 定义
ALTER TABLE [db.]table CLEAR PROJECTION name IN PARTITION partition_name
实例
CREATE DATABASE IF NOT EXISTS tea_data;
创立原始数据表
CREATE TABLE tea_data.events(
app_id UInt32,
user_id UInt64,
event_type UInt64,
cost UInt64,
action_duration UInt64,
display_time UInt64,
event_date Date
) ENGINE = CnchMergeTree PARTITION BY toDate(event_date)
ORDER BY
(app_id, user_id, event_type);
创立 projection 前写入 2023-05-28 分区测试数据
INSERT INTO tea_data.events
SELECT
number / 100,
number % 10,
number % 3357,
number % 166,
number % 5,
number % 40,
'2023-05-28 05:11:55'
FROM system.numbers LIMIT 100000;
创立聚合 projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_1
(
SELECT
app_id,
user_id,
event_date,
sum(action_duration)
GROUP BY app_id,
user_id, event_date
);
创立 projection 后写入 2023-05-29 分区测试数据
INSERT INTO tea_data.events
SELECT
number / 100,
number % 10,
number % 3357,
number % 166,
number % 5,
number % 40,
'2023-05-29 05:11:55'
FROM system.numbers LIMIT 100000;
Note:CnchMergeTree 是 Bytehouse 特有的引擎
Query Optimizer 扩大 Projection 改写
Bytehouse 优化器
ByteHouse 优化器为业界目前惟一的 ClickHouse 优化器计划。ByteHouse 优化器的能力简略总结如下:
- RBO:反对:列裁剪、分区裁剪、表达式简化、子查问解关联、谓词下推、冗余算子打消、Outer-JOIN 转 INNER-JOIN、算子下推存储、分布式算子拆分等常见的启发式优化能力。
- CBO:基于 Cascade 搜寻框架,实现了高效的 Join 枚举算法,以及基于 Histogram 的代价估算,对 10 表全连贯级别规模的 Join Reorder 问题,可能全量枚举并寻求最优解,同时针对大于 10 表规模的 Join Reorder 反对启发式枚举并寻求最优解。CBO 反对基于规定扩大搜寻空间,除了常见的 Join Reorder 问题以外,还反对 Outer-Join/Join Reorder,Magic Set Placement 等相干优化能力。
- 分布式打算优化:面向分布式 MPP 数据库,生成分布式查问打算,并且和 CBO 联合在一起。绝对业界支流实现:分为两个阶段,首先寻求最优的单机版打算,而后将其分布式化。咱们的计划则是将这两个阶段交融在一起,在整个 CBO 寻求最优解的过程中,会联合分布式打算的诉求,从代价的角度抉择最优的分布式打算。对于 Join/Aggregate 的还反对 Partition 属性开展。
- 高阶优化能力:实现了 Dynamic Filter pushdown、单表物化视图改写、基于代价的 CTE(公共表达式共享)。
借助 bytehouse 优化器弱小的能力,针对 projection 原有实现的几点局限性做了优化,上面咱们先来看一下社区在 projection 改写的具体实现。
社区 Projection
改写实现在非优化器执行模式下,对原始表的聚合查问可通过 aggregate projection 减速,即读取 projection 中的预聚合数据而不是原始数据。计算反对了 normal partition 和 projection partition 的混合查问,如果一个 partition 的 projection 还没物化,能够应用原始数据进行计算。
具体改写执行逻辑:
- 打算阶段
- 将原查问打算和已有 projection 进行匹配筛选能满足查问要求的 projection candidates;
- 基于最小的 mark 读取数抉择最优的 projection candidate;
- 对原查问打算中的 ActionDAG 进行改写和折叠,之后用于 projection part 数据的后续计算;
- 将以后数据处理阶段晋升到 WithMergeableState;
- 执行阶段
- MergeTreeDataSelectExecutor 会将 aggregate 之前的计算进行拆分:对于 normal part,应用原查问打算进行计算;对于 projection part,应用改写后 ActionDAG 结构 QueryPipeline;
-
将两份数据合并,用于 aggregate 之后的计算。
Bytehouse 优化器改写实现
优化器会将查问切分为不同的 plan segment 散发到 worker 节点并行执行,segment 之间通过 exchange 替换数据,在 plan segment 外部依据 query plan 构建 pipeline 执行,以上面简略聚合查问为例,阐明优化器如何匹配 projection。
Q1:
SELECT
app_id,
user_id,
sum(action_duration)
FROM tea_data.eventsWHERE event_date = '2023-05-29'
GROUP BY
app_id,
user_id
在执行打算阶段优化器尽量的将 TableScan 下层的 Partial Aggregation Step,Projection 和 Filter 下推到 TableScan 中,在将 plan segment 发送到 worker 节点后,在依据查问代价抉择适合 projection 进行匹配改写,从上面的执行打算上看,命中 projection 会在 table scan 中间接读取 AggregateFunction(sum, UInt64)的 state 数据,相比于没有命中 projection 的执行打算缩小了 AggregaingNode 的聚合运算。
- Q1 查问打算(optimizer_projection_support=0)
- Q1 查问打算(optimizer_projection_support=1)
混合读取 Projection
Projection 在创立之后不反对更新 schema,只能创立新的 projection,然而在一些对于 projection schema 变更需要频繁业务场景下,须要同一个查问既可能读取旧 projection 也能读取新 projection,所以在匹配时须要从 partition 维度进行匹配而不是从 projection 定义的维度进行匹配,混合读取不同 projection 的数据,这样会使查问更加灵便,更好的适应业务场景,上面举个具体的实例:
创立新的 projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_2
(
SELECT
app_id,
sum(action_duration),
sum(cost)
GROUP BY app_id
);
写入 2023-05-30 的数据
INSERT INTO tea_data.events
SELECT
number / 10,
number % 100,
number % 23,
number % 3434,
number % 23,
number % 55,
'2023-05-30 04:12:43'
FROM system.numbers LIMIT 100000;
执行查问
Q2:
SELECT
app_id,
sum(action_duration)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id
- Q2 执行打算
-
依照 partition 来匹配 projection
查问过滤条件 WHERE event_date >= ‘2023-05-28’ 会读取是三个分区的数据,并且 agg_sum_proj_1,agg_sum_proj_2 都满足 Q2 的查问条件,所以 table scan 会读取 2023-05-28 的原始数据,2023-05-29 会读取 agg_sum_proj_1 的数据,2023-05-30 因为 agg_sum_proj_2 绝对于 agg_sum_proj_1 的数据聚合度更高,读取代价较小,抉择读取 agg_sum_proj_2 的数据,混合读取不同 projection 的数据。原始表 Schema 更新
当对原始表增加新字段(维度或指标 ),对应 projection 不蕴含这些字段,这时候为了利用 projection 个别状况下须要删除 projection 从新做物化,比拟浪费资源,如果优化器匹配算法能正确处理不存在缺省字段,并应用缺省值参加计算就能够解决这个问题。
ALTER TABLE tea_data.events ADD COLUMN device_id String after event_type;
ALTER TABLE tea_data.events ADD COLUMN stay_time UInt64 after device_id;
执行查问
Q3:
SELECT
app_id,
device_id,
sum(action_duration),
max(stay_time)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id,device_id
- Q3 执行打算
-
默认值参加计算
从查问打算能够看出,即便 agg_sum_proj_1 和 agg_sum_proj_2 并不蕴含新增的维度字段 device_id,指标字段 stay_time, 依然能够命中原始的 partiton 的 projection,并且应用默认值来参加计算,这样能够利用旧的 projection 进行查问减速。Bytehouse Projection 实现
Projection 是依照 Bytehouse 的存算拆散架构进行设计的,Projecton 数据由分布式存储对立进行治理,而针对 projection 的查问和计算则在无状态的计算节点上进行。相比于社区版,Bytehouse Projection 实现了以下劣势:
- 对于 Projection 数据的存储节点和计算节点能够独立扩大,即能够依据不同业务对于 Projection 的应用需要,减少存储或者计算节点。
- 当进行 Projection 查问时,能够依据不同 Projection 的数据查问量来调配计算节点的资源,从而实现资源的隔离和优化,进步查问效率。
-
Projection 的元数据存储非常轻量,在业务数据急剧变动的时候,计算节点能够做到业务无感知扩缩容,无需额定的 Projection 数据迁徙。
Projection 数据存储
在 Bytehouse 中,多个 projections 数据与 data 数据存储在一个共享存储文件中。文件的内部数据对 projections 外部的内容没有感知,相当于一个黑盒。当须要读取某个 projection 时,通过 checksums 外面存储的 projection 指针,定位到特定 projection 地位,实现 projection 数据解析与加载。
Write 操作
Projection 写入分为两局部,先在本地做数据写入,产生 part 文件存储在 worker 节点本地,而后通过 dumpAndCommitCnchParts 将数据 dump 到近程共享存储。
- 写入本地
通过 writeTempPart()将 block 写入本地,当写完原始 part 后,循环通过办法 addProjectionPart()将每一个 projection 写入 part 文件夹,并增加到 new_part 中进行治理。 -
dump 到近程存储
dumpCnchParts()的时候,依照上述的存储格局,写入完原始 part 中的 bin 和 mark 数据后,循环将每一个 projection 文件夹中的数据写入到共享存储文件中,并记录地位和大小到 checksums,如下:
写入 header
写入 data
写入 projections
写入 Primary index
写入 Checksums
写入 Metainfo
写入 Unique Key Index
写入 data footgerMerge 操作
随着工夫的推移,针对同一个 partition 会存在越来越多的 parts,而 parts 越多查问过滤时的代价就会越大。因而,Bytehouse 在后盾过程中会 merge 同一个 partition 的 parts 组成更大的 part,从而缩小 part 的数量进步查问的效率。
- 对于每一个要 merge 的 part
对于 part 中的每一列,缓存对应的 segments 到本地
创立 MergeTreeReaderStreamWithSegmentCache,通过近程文件 buffer 或者本地 segments 的 buffer 初始化 - 通过 MergingSortedTransform 或 AggregatingSortedTransform 等将 sources 交融成 PipelineExecutingBlockInputStream
- 创立 MergedBlockOutputStream
- 对于 projection,进行如下操作
- 建设每一个 projection 的读取流,本地缓存 buffer 或者近程文件 buffer
- 原始表 merge 过程,对 parts 中的 projections 进行 merge
- 通过 dumper 将新的残缺 part 存储到远端
Mutate 操作
Bytehouse 采纳 MVCC 的形式,针对 mutate 波及的列,新增一个 delta part 版本存储此次 mutate 波及到的列。相应地,咱们在 mutate 的时候,结构 projection 的 mutate 操作的 inputstream,将 mutate 后的 projection 和原始表数据一起写到同一个 delta part 中。
- 在 MutationsInterpreter 外面,通过 InterpreterSelectQuery(mutation_ast)获取 BlockInputStream
-
projection 通过 block 和 InterpreterSelectQuery(projection.ast)从新构建
Materialize 物化操作
如下图所示,依据 Bytehouse 的 part 治理形式,针对 mutate 操作或新增物化操作,咱们为 part 生成新的 delta part,在下图 part 中,它所治理的三个 projections 由 base part 中的 proj2,delta part#1 中的 proj1’,以及 delta part#2 中的 proj3 独特形成。当 parts 加载实现后,delta part#2 会存储 base part 中的 proj2 的指针和 delta part#1 中的 proj1’ 指针,以及本身的 proj3 指针,对下层提供对立的拜访服务。
Worker 端磁盘缓存
目前,CNCH 中针对不同数据设计了不同的缓存类型
- DiskCacheSegment:治理 bin 和 mark 数据
- ChecksumsDiskCacheSegment:治理 checksums 数据
- PrimaryIndexDiskCacheSegment:治理主键索引数据 BitMapIndexDiskCacheSegment:治理 bitmap 索引数据
针对 Projection 中的数据,别离通过上述的 DiskCache,ChecksumsDiskCache 和 PrimaryIndexDiskCache 对 bin,mark,checksums 以及索引进行缓存。
另外,为了放慢 Projection 数据的加载过程,咱们新增了 MetaInfoDiskCacheSegment 用于缓存 Projection 相干的元数据信息。
理论案例剖析
某实在用户场景的数据集,咱们利用它对 Projection 性能进行了测试。
该数据粗放 1.2 亿条,蕴含 projection 约 240G 大小,测试机器 80CPU(s) / 376G Mem,配置如下:
- SET allow_experimental_projection_optimization = 1
- use_uncompressed_cache = true
- max_threads = 1
- log_level = error
-
开启 Projection 查问并发度 80,敞开 Projection 查问并发度为 30
测试后果
开启 Projection 后,针对 1.2 亿条的数据集,查问性能晋升 10~20 倍。
表构造
CREATE TABLE user.trades(
`type` UInt8,`status` UInt64,`block_hash` String,`sequence_number` UInt64,`block_timestamp` DateTime,`transaction_hash` String,`transaction_index` UInt32,`from_address` String,`to_address` String,`value` String,`input` String,`nonce` UInt64,`contract_address` String,`gas` UInt64,`gas_price` UInt64,`gas_used` UInt64,`effective_gas_price` UInt64,`cumulative_gas_used` UInt64,`max_fee_per_gas` UInt64,`max_priority_fee_per_gas` UInt64,`r` String,`s` String,`v` UInt64,`logs_count` UInt32,PROJECTION tx_from_address_hit
(
SELECT *
ORDER BY from_address
),PROJECTION tx_to_address_hit (
SELECT *
ORDER BY to_address
),PROJECTION tx_sequence_number_hit (
SELECT *
ORDER BY sequence_number
),PROJECTION tx_transaction_hash_hit (
SELECT *
ORDER BY transaction_hash
)
)
ENGINE=CnchMergeTree()
PRIMARY KEY (transaction_hash,from_address,to_address)
ORDER BY (transaction_hash,from_address,to_address)
PARTITION BY toDate(toStartOfMonth(`block_timestamp`));
开启 Projection
Q1
WITH tx AS (SELECT * FROM user.trades WHERE from_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC,transaction_index DESC UNION ALL SELECT * FROM user.trades WHERE to_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC,transaction_index DESC) SELECT * FROM tx LIMIT 100;
Q2
with tx as (select sequence_number,transaction_index,transaction_hash,input from user.trades where from_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc,transaction_index desc UNION ALL select sequence_number,transaction_index,transaction_hash,input from user.trades where to_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc,transaction_index desc) select sequence_number,transaction_hash,substring(input,1,8) as func_sign from tx order by sequence_number desc,transaction_index desc limit 100 settings max_threads = 1,allow_experimental_projection_optimization = 1,use_uncompressed_cache = true;
敞开 Projection
Q1
Q2