乐趣区

ClickHouse内核分析MergeTree的Merge和Mutation机制

注:以下分析基于开源 v19.15.2.2-stable 版本进行

引言

ClickHouse 内核分析系列文章,继上一篇文章 MergeTree 查询链路 之后,这次我将为大家介绍 MergeTree 存储引擎的异步 Merge 和 Mutation 机制。建议读者先补充上一篇文章的基础知识,这样会比较容易理解。

MergeTree Mutation 功能介绍

在上一篇系列文章中,我已经介绍过 ClickHouse 内核中的 MergeTree 存储一旦生成一个 Data Part,这个 Data Part 就不可再更改了。所以从 MergeTree 存储内核层面,ClickHouse 就不擅长做数据更新删除操作。但是绝大部分用户场景中,难免会出现需要手动订正、修复数据的场景。所以 ClickHouse 为用户设计了一套离线异步机制来支持低频的 Mutation(改、删)操作。

Mutation 命令执行

ALTER TABLE [db.]table DELETE WHERE filter_expr;
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;

ClickHouse 的方言把 Delete 和 Update 操作也加入到了 Alter Table 的范畴中,它并不支持裸的 Delete 或者 Update 操作。当用户执行一个如上的 Mutation 操作获得返回时,ClickHouse 内核其实只做了两件事情:

  1. 检查 Mutation 操作是否合法;
  2. 保存 Mutation 命令到存储文件中,唤醒一个异步处理 merge 和 mutation 的工作线程;

两者的主体逻辑分别在 MutationsInterpreter::validate 函数和 StorageMergeTree::mutate 函数中。

MutationsInterpreter::validate 函数 dry run 一个异步 Mutation 执行的全过程,其中涉及到检查 Mutation 是否合法的判断原则是列值更新后记录的分区键和排序键不能有变化。因为分区键和排序键一旦发生变化,就会导致多个 Data Part 之间之间 Merge 逻辑的复杂化。剩余的 Mutation 执行过程可以看做是打开一个 Data Part 的 BlockInputStream,在这个 BlockStream 的基础上封装删除操作的 FilterBlockInputStream,再加上更新操作的 ExpressionBlockInputStream,最后把数据通过 BlockOutputStream 写回到新的 Data Part 中。这里简单介绍一下 ClickHouse 的计算层实现,整体上它是一个火山模型的计算引擎,数据的各种 filer、投影、join、agg 都是通过 BlockStrem 抽象实现,在 BlockStream 中数据是按照 Block 进行传输处理的,而 Block 中的数据又是按照列模式组织,这使得 ClickHouse 在单列的计算上可以批量化并使用一些 SIMD 指令加速。BlockOutputStream 承担了 MergeTree Data Part 列存写入和索引构建的全部工作,我会在后续的文章中会详细展开介绍 ClickHouse 计算层中各类功能的 BlockStream,以及 BlockOutputStream 中构建索引的实现细节。

在 Mutation 命令的执行过程中,我们可以看到 MergeTree 会把整条 Alter 命令保存到存储文件夹下,然后创建一个 MergeTreeMutationEntry 对象保存到表的待修改状态中,最后唤醒一个异步处理 merge 和 mutation 的工作线程。这里有一个关键的问题,因为 Mutation 的实际操作是异步发生的,在用户的 Alter 命令返回之后仍然会有数据写入,系统如何在异步订正的过程中排除掉 Alter 命令之后写入的数据呢?下一节中我会介绍 MergeTree 中 Data Part 的 Version 机制,它可以在 Data Part 级别解决上面的问题。但是因为 ClickHouse 写入链路的异步性,ClickHouse 仍然无法保证 Alter 命令前 Insert 的每条纪录都被更新,只能确保 Alter 命令前已经存在的 Data Part 都会被订正,推荐用户只用来订正 T + 1 场景的离线数据。

异步 Merge&Mutation

Batch Insert 和 Mutation 的数据一致性

struct MergeTreePartInfo
{
    String partition_id;
    Int64 min_block = 0;
    Int64 max_block = 0;
    UInt32 level = 0;
    Int64 mutation = 0;   /// If the part has been mutated or contains mutated parts, is equal to mutation version number.
    ...
    /// Get block number that can be used to determine which mutations we still need to apply to this part
    /// (all mutations with version greater than this block number).
    Int64 getDataVersion() const { return mutation ? mutation : min_block;}
    ...    
    bool operator<(const MergeTreePartInfo & rhs) const
    {return std::forward_as_tuple(partition_id, min_block, max_block, level, mutation)
            < std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation);
    }
}

在具体展开 MergeTree 的异步 merge 和 mutation 机制之前,先需要详细介绍一下 MergeTree 中对 Data Part 的管理方式。每个 Data Part 都有一个 MergeTreePartInfo 对象来保存它的 meta 信息,MergeTreePartInfo 类的结构如上方代码所示。

  1. partition_id:表示所属的数据分区 id。
  2. min_block、max_block:blockNumber 是数据写入的一个版本信息,在上一篇系列文章中讲过,用户每次批量写入的数据都会生成一个 Data Part。同一批写入的数据会被 assign 一个唯一的 blockNumber,而这个 blockNumber 是在 MergeTree 表级别自增的。以及 MergeTree 在 merge 多个 Data Part 的时候会准守一个原则:在同一个数据分区下选择 blockNumber 区间相邻的若干个 Data Parts 进行合并,不会出现在同一个数据分区下 Data Parts 之间的 blockNumber 区间出现重合。所以 Data Part 中的 min_block 和 max_block 可以表示当前 Data Part 中数据的版本范围。
  3. level:表示 Data Part 所在的层级,新写入的 Data Part 都属于 level 0。异步 merge 多个 Data Part 的过程中,系统会选择其中最大的 level + 1 作为新 Data Part 的 level。这个信息可以一定程度反映出当前的 Data Part 是经历了多少次 merge,但是不能准确表示,核心原因是 MergeTree 允许多个 Data Part 跨 level 进行 merge 的,为了最终一个数据分区内的数据 merge 成一个 Data Part。
  4. mutation:和批量写入数据的版本号机制类似,MergeTree 表的 mutation 命令也会被 assign 一个唯一的 blockNumber 作为版本号,这个版本号信息会保存在 MergeTreeMutationEntry 中,所以通过版本号信息我们可以看出数据写入和 mutation 命令之间的先后关系。Data Part 中的这个 mutation 表示的则是当前这个 Data Part 已经完成的 mutation 操作,对每个 Data Part 来说它是按照 mutation 的 blockNumber 顺序依次完成所有的 mutation。

解释了 MergeTreePartInfo 类中的信息含义,我们就可以理解上一节中遗留的异步 Mutation 如何选择哪些 Data Parts 需要订正的问题。系统可以通过 MergeTreePartInfo::getDataVersion() { return mutation ? mutation : min_block}函数来判断当前 Data Part 是否需要进行某个 mutation 订正,比较两者 version 即可。

Merge&Mutation 工作任务

ClickHouse 内核中异步 merge、mutation 工作由统一的工作线程池来完成,这个线程池的大小用户可以通过参数 background_pool_size 进行设置。线程池中的线程 Task 总体逻辑如下,可以看出这个异步 Task 主要做三块工作:清理残留文件,merge Data Parts 和 mutate Data Part。

BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
    ....
    try
    {
        /// Clear old parts. It is unnecessary to do it more than once a second.
        if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
        {
            {
                /// TODO: Implement tryLockStructureForShare.
                auto lock_structure = lockStructureForShare(false, "");
                clearOldPartsFromFilesystem();
                clearOldTemporaryDirectories();}
            clearOldMutations();}
        ///TODO: read deduplicate option from table config
        if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
            return BackgroundProcessingPoolTaskResult::SUCCESS;
        if (tryMutatePart())
            return BackgroundProcessingPoolTaskResult::SUCCESS;
        return BackgroundProcessingPoolTaskResult::ERROR;
    }
   ...
}

需要清理的残留文件分为三部分:过期的 Data Part,临时文件夹,过期的 Mutation 命令文件。如下方代码所示,MergeTree Data Part 的生命周期包含多个阶段,创建一个 Data Part 的时候分两阶段执行 Temporary->Precommitted->Commited,淘汰一个 Data Part 的时候也可能会先经过一个 Outdated 状态,再到 Deleting 状态。在 Outdated 状态下的 Data Part 仍然是可查的。异步 Task 在收集 Outdated Data Part 的时候会根据它的 shared_ptr 计数来判断当前是否有查询 Context 引用它,没有的话才进行删除。清理临时文件的逻辑较为简单,在数据文件夹中遍历搜索 ”tmp_” 开头的文件夹,并判断创建时长是否超过 temporary_directories_lifetime。临时文件夹主要在 ClickHouse 的两阶段提交过程可能造成残留。最后是清理数据已经全部订正完成的过期 Mutation 命令文件。

enum class State
    {
        Temporary,       /// the part is generating now, it is not in data_parts list
        PreCommitted,    /// the part is in data_parts, but not used for SELECTs
        Committed,       /// active data part, used by current and upcoming SELECTs
        Outdated,        /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
        Deleting,        /// not active data part with identity refcounter, it is deleting right now by a cleaner
        DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
    };

Merge 逻辑

StorageMergeTree::merge 函数是 MergeTree 异步 Merge 的核心逻辑,Data Part Merge 的工作除了通过后台工作线程自动完成,用户还可以通过 Optimize 命令来手动触发。自动触发的场景中,系统会根据后台空闲线程的数据来启发式地决定本次 Merge 最大可以处理的数据量大小,max_bytes_to_merge_at_min_space_in_pool 和 max_bytes_to_merge_at_max_space_in_pool 参数分别决定当空闲线程数最大时可处理的数据量上限以及只剩下一个空闲线程时可处理的数据量上限。当用户的写入量非常大的时候,应该适当调整工作线程池的大小和这两个参数。当用户手动触发 merge 时,系统则是根据 disk 剩余容量来决定可处理的最大数据量。

接下来介绍 merge 过程中最核心的逻辑:如何选择 Data Parts 进行 merge?为了方便理解,这里先介绍一下 Data Parts 在 MergeTree 表引擎中的管理组织方式。上一节中提到的 MergeTreePartInfo 类中定义了比较操作符,MergeTree 中的 Data Parts 就是按照这个比较操作符进行排序管理,排序键是 (partition_id, min_block, max_block, level, mutation),索引管理结构如下图所示:

自动 Merge 的处理逻辑,首先是通过 MergeTreeDataMergerMutator::selectPartsToMerge 函数筛选出本次 merge 要合并的 Data Parts,这个筛选过程需要准守三个原则:

  1. 跨数据分区的 Data Part 之间不能合并;
  2. 合并的 Data Parts 之间必须是相邻(在上图的有序组织关系中相邻),只能在排序链表中按段合并,不能跳跃;
  3. 合并的 Data Parts 之间的 mutation 状态必须是一致的,如果 Data Part A 后续还需要完成 mutation-23 而 Data Part B 后续不需要完成 mutation-23(数据全部是在 mutation 命令之后写入或者已经完成 mutation-23),则 A 和 B 不能进行合并;

所以我们上面的 Data Parts 组织关系逻辑示意图中,相同颜色的 Data Parts 是可以合并的。虽然图中三个不同颜色的 Data Parts 序列都是可以合并的,但是合并工作线程每次只会挑选其中某个序列的一小段进行合并(如前文所述,系统会限定每次合并的 Data Parts 的数据量)。对于如何从这些序列中挑选出最佳的一段区间,ClickHouse 抽象出了 IMergeSelector 类来实现不同的逻辑。当前主要有两种不同的 merge 策略:TTL 数据淘汰策略和常规策略。

  • TTL 数据淘汰策略:TTL 数据淘汰策略启用的条件比较苛刻,只有当某个 Data Part 中存在数据生命周期超时需要淘汰,并且距离上次使用 TTL 策略达到一定时间间隔(默认 1 小时)。TTL 策略也非常简单,首先挑选出 TTL 超时最严重 Data Part,把这个 Data Part 所在的数据分区作为要进行数据合并的分区,最后会把这个 TTL 超时最严重的 Data Part 前后连续的所有存在 TTL 过期的 Data Part 都纳入到 merge 的范围中。这个策略简单直接,每次保证优先合并掉最老的存在过期数据的 Data Part。
  • 常规策略:这里的选举策略就比较复杂,基本逻辑是枚举每个可能合并的 Data Parts 区间,通过启发式规则判断是否满足合并条件,再有启发式规则进行算分,选取分数最好的区间。启发式判断是否满足合并条件的算法在 SimpleMergeSelector.cpp::allow 函数中,其中的主要思想分为以下几点:系统默认对合并的区间有一个 Data Parts 数量的限制要求(每 5 个 Data Parts 才能合并);如果当前数据分区中的 Data Parts 出现了膨胀,则适量放宽合并数量限制要求(最低可以两两 merge);如果参与合并的 Data Parts 中有很久之前写入的 Data Part,也适量放宽合并数量限制要求,放宽的程度还取决于要合并的数据量。第一条规则是为了提升写入性能,避免在高速写入时两两 merge 这种低效的合并方式。最后一条规则则是为了保证随着数据分区中的 Data Part 老化,老龄化的数据分区内数据全部合并到一个 Data Part。中间的规则更多是一种保护手段,防止因为写入和频繁 mutation 的极端情况下,Data Parts 出现膨胀。启发式算法的策略则是优先选择 IO 开销最小的 Data Parts 区间完成合并,尽快合并掉小数据量的 Data Parts 是对在线查询最有利的方式,数据量很大的 Data Parts 已经有了很较好的数据压缩和索引效率,合并操作对查询带来的性价比较低。

Mutation 逻辑

StorageMergeTree::tryMutatePart 函数是 MergeTree 异步 mutation 的核心逻辑,主体逻辑如下。系统每次都只会订正一个 Data Part,但是会聚合多个 mutation 任务批量完成,这点实现非常的棒。因为在用户真实业务场景中一次数据订正逻辑中可能会包含多个 Mutation 命令,把这多个 mutation 操作聚合到一起订正效率上就非常高。系统每次选择一个排序键最小的并且需要订正 Data Part 进行操作,本意上就是把数据从前往后进行依次订正。

Mutation 功能是 MergeTree 表引擎最新推出一大功能,从我个人的角度看在实现完备度上还有一下两点需要去优化:

  1. mutation 没有实时可见能力。我这里的实时可见并不是指在存储上立即原地更新,而是给用户提供一种途径可以立即看到数据订正后的最终视图确保订正无误。类比在使用 CollapsingMergeTree、SummingMergeTree 等高级 MergeTree 引擎时,数据还没有完全 merge 到一个 Data Part 之前,存储层并没有一个数据的最终视图。但是用户可以通过 Final 查询模式,在计算引擎层实时聚合出数据的最终视图。这个原理对 mutation 实时可见也同样适用,在实时查询中通过 FilterBlockInputStream 和 ExpressionBlockInputStream 完成用户的 mutation 操作,给用户提供一个最终视图。
  2. mutation 和 merge 相互独立执行。看完本文前面的分析,大家应该也注意到了目前 Data Part 的 merge 和 mutation 是相互独立执行的,Data Part 在同一时刻只能是在 merge 或者 mutation 操作中。对于 MergeTree 这种存储彻底 Immutable 的设计,数据频繁 merge、mutation 会引入巨大的 IO 负载。实时上 merge 和 mutation 操作是可以合并到一起去考虑的,这样可以省去数据一次读写盘的开销。对数据写入压力很大又有频繁 mutation 的场景,会有很大帮助。
for (const auto & part : getDataPartsVector())
        {
            ...
            size_t current_ast_elements = 0;
            for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
            {MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context);
                size_t commands_size = interpreter.evaluateCommandsSize();
                if (current_ast_elements + commands_size >= max_ast_elements)
                    break;
                current_ast_elements += commands_size;
                commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
            }
            auto new_part_info = part->info;
            new_part_info.mutation = current_mutations_by_version.rbegin()->first;
            future_part.parts.push_back(part);
            future_part.part_info = new_part_info;
            future_part.name = part->getNewName(new_part_info);
            tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
            break;
        }

最后在经过后台工作线程一轮 merge 和 mutation 操作之后,上一节中展示的 MergeTree 表引擎中的 Data Parts 可能发生的变化如下图所示,2020-05-10 数据分区下的头两个 Data Parts 被 merge 到了一起,并且完成了 Mutation 37 和 Mutation 39 的数据订正,新产生的 Data Part 如红色所示:

Clickhouse 产品链接:https://www.aliyun.com/product/clickhouse

ClickHouse 内核分析系列文章:

MergeTree 查询链路
希望通过内核分析系列文章,让大家更好地了解这款世界领先的列式存储分析型数据库。

退出移动版