ClickHouse入门实践副本与分片

3次阅读

共计 23414 个字符,预计需要花费 59 分钟才能阅读完成。

概述

集群是副本和分片的基础,它将 ClickHouse 的服务拓扑由单节点延伸到多个节点,但它并不像 Hadoop 生态的某些系统那样,要求所有节点组成一个单一的大集群。ClickHouse 的集群配置非常灵活,用户既可以将所有节点组成一个单一集群,也可以按照业务的诉求,把节点划分为多个小的集群。在每个小的集群区域之间,它们的节点、分区和副本数量可以各不相同

从作用来看,ClickHouse 集群的工作更多是针对逻辑层面的。集群定义了多个节点的拓扑关系,这些节点在后续服务过程中可能会协同工作,而执行层面的具体工作则交给了副本和分片来执行。副本和分片这对双胞胎兄弟,有时候看起来泾渭分明,有时候又让人分辨不清。这里有两种区分的方法。一种是从数据层面区分,假设 ClickHouse 的 N 个节点组成了一个集群,在集群的各个节点上,都有一张结构相同的数据表 Y。如果 N1 的 Y 和 N2 的 Y 中的数据完全不同,则 N1 和 N2 互为分片;如果它们的数据完全相同,则它们互为副本。换言之,分片之间的数据是不同的,而副本之间的数据是完全相同的。所以抛开表引擎的不同,单纯从数据层面来看,副本和分片有时候只有一线之隔。

另一种是从功能作用层面区分,使用副本的主要目的是防止数据丢失,增加数据存储的冗余;而使用分片的主要目的是实现数据的水平切分


本章接下来会按照由易到难的方式介绍副本、分片和集群的使用方法。从数据表的初始形态 1 分片、0 副本开始介绍;接着介绍如何为它添加副本,从而形成 1 分片、1 副本的状态;再介绍如何引入分片,将其转换为多分片、1 副本的形态(多副本的形态以此类推)

这种形态的变化过程像极了企业内的业务发展过程。在业务初期,我们从单张数据表开始;在业务上线之后,可能会为它增加副本,以保证数据的安全,或者希望进行读写分离;随着业务量的发展,单张数据表可能会遇到瓶颈,此时会进一步为它增加分片,从而实现数据的水平切分。在接下来的示例中,也会遵循这样的演示路径进行说明。

数据副本

不知大家是否还记得,在介绍 MergeTree 的时候,曾经讲过它的命名规则。如果在 *MergeTree 的前面增加 Replicated 的前缀,则能够组合成一个新的变种引擎,即 Replicated-MergeTree 复制表。

换言之,只有使用了 ReplicatedMergeTree 复制表系列引擎,才能应用副本的能力(后面会介绍另一种副本的实现方式)。或者用一种更为直接的方式理解,即使用 ReplicatedMergeTree 的数据表就是副本。

ReplicatedMergeTree 是 MergeTree 的派生引擎,它在 MergeTree 的基础上加入了分布式协同的能力。

在 MergeTree 中,一个数据分区由开始创建到全部完成,会历经两类存储区域。
(1)内存:数据首先会被写入内存缓冲区。
(2)本地磁盘:数据接着会被写入 tmp 临时目录分区,待全部完成后再将临时目录重命名为正式分区。ReplicatedMergeTree 在上述基础之上增加了 ZooKeeper 的部分,它会进一步在 ZooKeeper 内创建一系列的监听节点,并以此实现多个实例之间的通信。在整个通信过程中,ZooKeeper 并不会涉及表数据的传输。

副本的特点

作为数据副本的主要实现载体,ReplicatedMergeTree 在设计上有一些显著特点。
❑ 依赖 ZooKeeper:在执行 INSERT 和 ALTER 查询的时候,ReplicatedMergeTree 需要借助 ZooKeeper 的分布式协同能力,以实现多个副本之间的同步。但是在查询副本的时候,并不需要使用 ZooKeeper。关于这方面的更多信息,会在稍后详细介绍。
❑ 表级别的副本:副本是在表级别定义的,所以每张表的副本配置都可以按照它的实际需求进行个性化定义,包括副本的数量,以及副本在集群内的分布位置等。
❑ 多主架构(Multi Master):可以在任意一个副本上执行 INSERT 和 ALTER 查询,它们的效果是相同的。这些操作会借助 ZooKeeper 的协同能力被分发至每个副本以本地形式执行。
❑ Block 数据块:在执行 INSERT 命令写入数据时,会依据 max_insert_block_size 的大小(默认 1048576 行)将数据切分成若干个 Block 数据块。所以 Block 数据块是数据写入的基本单元,并且具有写入的原子性和唯一性。
❑ 原子性:在数据写入时,一个 Block 块内的数据要么全部写入成功,要么全部失败。
❑ 唯一性:在写入一个 Block 数据块的时候,会按照当前 Block 数据块的数据顺序、数据行和数据大小等指标,计算 Hash 信息摘要并记录在案。在此之后,如果某个待写入的 Block 数据块与先前已被写入的 Block 数据块拥有相同的 Hash 摘要(Block 数据块内数据顺序、数据大小和数据行均相同),则该 Block 数据块会被忽略。这项设计可以预防由异常原因引起的 Block 数据块重复写入的问题。如果只是单纯地看这些特点的说明,可能不够直观。没关系,接下来会逐步展开,并附带一系列具体的示例。

ZooKeeper 的配置方式

ClickHouse 使用一组 zookeeper 标签定义相关配置,默认情况下,在全局配置 config. xml 中定义即可。但是各个副本所使用的 Zookeeper 配置通常是相同的,为了便于在多个节点之间复制配置文件,更常见的做法是将这一部分配置抽离出来,独立使用一个文件保存。

首先,在服务器的 /etc/clickhouse-server/config.d 目录下创建一个名为 metrika.xml 的配置文件:


接着,在全局配置 config.xml 中使用 <include_from> 标签导入刚才定义的配置:
并引用 ZooKeeper 配置的定义:

其中,incl 与 metrika.xml 配置文件内的节点名称要彼此对应。至此,整个配置过程就完成了。

ClickHouse 在它的系统表中,颇为贴心地提供了一张名为 zookeeper 的代理表。通过这张表,可以使用 SQL 查询的方式读取远端 ZooKeeper 内的数据。有一点需要注意,在用于查询的 SQL 语句中,必须指定 path 条件,例如查询根路径:

SELECT * FROM  system.zookeeper where path = '/';

SELECT name,value,czxid,mzxid FROM  system.zookeeper where path = '/clickhouse';

副本的定义形式

正如前文所言,使用副本的好处甚多。首先,由于增加了数据的冗余存储,所以降低了数据丢失的风险;其次,由于副本采用了多主架构,所以每个副本实例都可以作为数据读、写的入口,这无疑分摊了节点的负载。

在使用副本时,不需要依赖任何集群配置, ReplicatedMergeTree 结合 ZooKeeper 就能完成全部工作。

ReplicatedMergeTree 的定义方式如下:

ENGINE =ReplicatedMergeTree('zk_path','replica_name')

zk_path 用于指定在 ZooKeeper 中创建的数据表的路径,路径名称是自定义的,并没有固定规则,用户可以设置成自己希望的任何路径。即便如此,ClickHouse 还是提供了一些约定俗成的配置模板以供参考,例如:

/clickhouse/tables/{shard}/table_name

其中:
❑ /clickhouse/tables/ 是约定俗成的路径固定前缀,表示存放数据表的根路径。
❑ {shard}表示分片编号,通常用数值替代,例如 01、02、03。一张数据表可以有多个分片,而每个分片都拥有自己的副本。
❑ table_name 表示数据表的名称,为了方便维护,通常与物理表的名字相同(虽然 ClickHouse 并不强制要求路径中的表名称和物理表名相同);而 replica_name 的作用是定义在 ZooKeeper 中创建的副本名称,该名称是区分不同副本实例的唯一标识。一种约定成俗的命名方式是使用所在服务器的域名称。

对于 zk_path 而言,同一张数据表的同一个分片的不同副本,应该定义相同的路径;而对于 replica_name 而言,同一张数据表的同一个分片的不同副本,应该定义不同的名称。

ReplicatedMergeTree 原理解析

ReplicatedMergeTree 作为复制表系列的基础表引擎,涵盖了数据副本最为核心的逻辑,将它拿来作为副本的研究标本是最合适不过了。因为只要剖析了 ReplicatedMergeTree 的核心原理,就能掌握整个 ReplicatedMergeTree 系列表引擎的使用方法。

数据结构

在 ReplicatedMergeTree 的核心逻辑中,大量运用了 ZooKeeper 的能力,以实现多个 ReplicatedMergeTree 副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和 BlockID 去重判断等。在执行 INSERT 数据写入、MERGE 分区和 MUTATION 操作的时候,都会涉及与 ZooKeeper 的通信。但是在通信的过程中,并不会涉及任何表数据的传输,在查询数据的时候也不会访问 ZooKeeper,所以不必过于担心 ZooKeeper 的承载压力。

因为 ZooKeeper 对 ReplicatedMergeTree 非常重要,所以下面首先从它的数据结构开始介绍。

ZooKeeper 内的节点结构

ReplicatedMergeTree 需要依靠 ZooKeeper 的事件监听机制以实现各个副本之间的协同。所以,在每张 ReplicatedMergeTree 表的创建过程中,它会以 zk_path 为根路径,在 Zoo-Keeper 中为这张表创建一组监听节点。按照作用的不同,监听节点可以大致分成如下几类:

(1)元数据:
❑ /metadata:保存元数据信息,包括主键、分区键、采样表达式等。
❑ /columns:保存列字段信息,包括列名称和数据类型。
❑ /replicas:保存副本名称,对应设置参数中的 replica_name。

(2)判断标识:
❑ /leader_election:用于主副本的选举工作,主副本会主导 MERGE 和 MUTATION 操作(ALTER DELETE 和 ALTER UPDATE)。这些任务在主副本完成之后再借助 ZooKeeper 将消息事件分发至其他副本。
❑ /blocks:记录 Block 数据块的 Hash 信息摘要,以及对应的 partition_id。通过 Hash 摘要能够判断 Block 数据块是否重复;通过 partition_id,则能够找到需要同步的数据分区。
❑ /block_numbers:按照分区的写入顺序,以相同的顺序记录 partition_id。各个副本在本地进行 MERGE 时,都会依照相同的 block_numbers 顺序进行。
❑ /quorum:记录 quorum 的数量,当至少有 quorum 数量的副本写入成功后,整个写操作才算成功。quorum 的数量由 insert_quorum 参数控制,默认值为 0。

(3)操作日志:
❑ /log:常规操作日志节点(INSERT、MERGE 和 DROP PARTITION),它是整个工作机制中最为重要的一环,保存了副本需要执行的任务指令。log 使用了 ZooKeeper 的持久顺序型节点,每条指令的名称以 log- 为前缀递增,例如 log-0000000000、log-0000000001 等。每一个副本实例都会监听 /log 节点,当有新的指令加入时,它们会把指令加入副本各自的任务队列,并执行任务。关于这方面的执行逻辑,稍后会进一步展开。

❑ /mutations:MUTATION 操作日志节点,作用与 log 日志类似,当执行 ALERTDELETE 和 ALERT UPDATE 查询时,操作指令会被添加到这个节点。mutations 同样使用了 ZooKeeper 的持久顺序型节点,但是它的命名没有前缀,每条指令直接以递增数字的形式保存,例如 0000000000、0000000001 等。关于这方面的执行逻辑,同样稍后展开。

❑ /replicas/{replica_name}/*:每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令,其中较为重要的节点有如下几个:
❍ /queue:任务队列节点,用于执行具体的操作任务。当副本从 /log 或 /mutations 节点监听到操作指令时,会将执行任务添加至该节点下,并基于队列执行。
❍ /log_pointer:log 日志指针节点,记录了最后一次执行的 log 日志下标信息,例如 log_pointer:4 对应了 /log/log-0000000003(从 0 开始计数)。
❍ /mutation_pointer:mutations 日志指针节点,记录了最后一次执行的 mutations 日志名称,例如 mutation_pointer:0000000000 对应了 /mutations/000000000。

Entry 日志对象的数据结构

ReplicatedMergeTree 在 ZooKeeper 中有两组非常重要的父节点,那就是 /log 和 /mutations。它们的作用犹如一座通信塔,是分发操作指令的信息通道,而发送指令的方式,则是为这些父节点添加子节点。所有的副本实例,都会监听父节点的变化,当有子节点被添加时,它们能实时感知。

这些被添加的子节点在 ClickHouse 中被统一抽象为 Entry 对象,而具体实现则由 Log-Entry 和 MutationEntry 对象承载,分别对应 /log 和 /mutations 节点。

1)LogEntryLogEntry 用于封装 /log 的子节点信息,它拥有如下几个核心属性:
❑ source replica:发送这条 Log 指令的副本来源,对应 replica_name。
❑ type:操作指令类型,主要有 get、merge 和 mutate 三种,分别对应从远程副本下载分区、合并分区和 MUTATION 操作。
❑ block_id:当前分区的 BlockID,对应 /blocks 路径下子节点的名称。❑ partition_name:当前分区目录的名称。

2)MutationEntryMutationEntry 用于封装 /mutations 的子节点信息,它同样拥有如下几个核心属性:
❑ source replica:发送这条 MUTATION 指令的副本来源,对应 replica_name。
❑ commands:操作指令,主要有 ALTER DELETE 和 ALTER UPDATE。
❑ mutation_id:MUTATION 操作的版本号。
❑ partition_id:当前分区目录的 ID。以上就是 Entry 日志对象的数据结构信息,在接下来将要介绍的核心流程中,将会看到它们的身影。

以上就是 Entry 日志对象的数据结构信息,在接下来将要介绍的核心流程中,将会看到它们的身影。

副本协同的核心流程

副本协同的核心流程主要有 INSERT、MERGE、MUTATION 和 ALTER 四种,分别对应了数据写入、分区合并、数据修改和元数据修改。INSERT 和 ALTER 查询是分布式执行的。借助 ZooKeeper 的事件通知机制,多个副本之间会自动进行有效协同,但是它们不会使用 ZooKeeper 存储任何分区数据。而其他查询并不支持分布式执行,包括 SELECT、CREATE、DROP、RENAME 和 ATTACH。例如,为了创建多个副本,我们需要分别登录每个 ClickHouse 节点,在它们本地执行各自的 CREATE 语句(后面将会介绍如何利用集群配置简化这一操作)。接下来,会依次介绍上述流程的工作机理。为了便于理解,我先来整体认识一下各个流程的介绍方法。

首先,拟定一个演示场景,即使用 ReplicatedMergeTree 实现一张拥有 1 分片、1 副本的数据表,并以此来贯穿整个讲解过程(对于大于 1 个副本的场景,流程以此类推)。

接着,通过对 ReplicatedMergeTree 分别执行 INSERT、MERGE、MUTATION 和 ALTER 操作,以此来讲解相应的工作原理。与此同时,通过实际案例,论证工作原理。

INSERT 的核心执行流程

当需要在 ReplicatedMergeTree 中执行 INSERT 查询以写入数据时,即会进入 INSERT 核心流程

创建第一个副本实例
假设首先从 CH5 节点开始,对 CH5 节点执行下面的语句后,会创建第一个副本实例:

CREATE TABLE replicated_sales_1 (
    id String,
    price Float64,
    create_time DateTime
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch5.nauu.com')
partition by toYYYYMM(create_time)
ORDER BY id ;

在创建的过程中,ReplicatedMergeTree 会进行一些初始化操作,例如:❑ 根据 zk_path 初始化所有的 ZooKeeper 节点。
❑ 在 /replicas/ 节点下注册自己的副本实例 ch5.nauu.com。
❑ 启动监听任务,监听 /log 日志节点。
❑ 参与副本选举,选举出主副本,选举的方式是向 /leader_election/ 插入子节点,第一个插入成功的副本就是主副本。

接着,在 CH6 节点执行下面的语句,创建第二个副本实例。表结构和 zk_path 需要与第一个副本相同,而 replica_name 则需要设置成 CH6 的域名:

CREATE TABLE replicated_sales_1 (
    id String,
    price Float64,
    create_time DateTime
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch6.nauu.com')
partition by toYYYYMM(create_time)
ORDER BY id ;

在创建过程中,第二个 ReplicatedMergeTree 同样会进行一些初始化操作,例如:
❑ 在 /replicas/ 节点下注册自己的副本实例 ch6.nauu.com。
❑ 启动监听任务,监听 /log 日志节点。
❑ 参与副本选举,选举出主副本。在这个例子中,CH5 副本成为主副本。

现在尝试向第一个副本 CH5 写入数据。执行如下命令:

上述命令执行之后,首先会在本地完成分区目录的写入:

接着向 /blocks 节点写入该数据分区的 block_id:

该 block_id 将作为后续去重操作的判断依据。如果此时再次执行刚才的 INSERT 语句,试图写入重复数据,则会出现如下提示:

即副本会自动忽略 block_id 重复的待写入数据。
此外,如果设置了 insert_quorum 参数(默认为 0),并且 insert_quorum>=2,则 CH5 会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于 insert_quorum 时,整个写入操作才算成功。

由第一个副本实例推送 Log 日志

在 3 步骤完成之后,会继续由执行了 INSERT 的副本向 /log 节点推送操作日志。在这个例子中,会由第一个副本 CH5 担此重任。日志的编号是 /log/log-0000000000,而 LogEntry 的核心属性如下:

从日志内容中可以看出,操作类型为 get 下载,而需要下载的分区是 201905_0_0_0。其余所有副本都会基于 Log 日志以相同的顺序执行命令。

第二个副本实例拉取 Log 日志
CH6 副本会一直监听 /log 节点变化,当 CH5 推送了 /log/log-0000000000 之后,CH6 便会触发日志的拉取任务并更新 log_pointer,将其指向最新日志下标:

在拉取了 LogEntry 之后,它并不会直接执行,而是将其转为任务对象放至队列:

这是因为在复杂的情况下,考虑到在同一时段内,会连续收到许多个 LogEntry,所以使用队列的形式消化任务是一种更为合理的设计。注意,拉取的 LogEntry 是一个区间,这同样也是因为可能会连续收到多个 LogEntry。

第二个副本实例向其他副本发起下载请求

CH6 基于 /queue 队列开始执行任务。当看到 type 类型为 get 的时候,ReplicatedMerge-Tree 会明白此时在远端的其他副本中已经成功写入了数据分区,而自己需要同步这些数据。

CH6 上的第二个副本实例会开始选择一个远端的其他副本作为数据的下载来源。远端副本的选择算法大致是这样的:

(1)从 /replicas 节点拿到所有的副本节点。

(2)遍历这些副本,选取其中一个。选取的副本需要拥有最大的 log_pointer 下标,并且 /queue 子节点数量最少。log_pointer 下标最大,意味着该副本执行的日志最多,数据应该更加完整;而 /queue 最小,则意味着该副本目前的任务执行负担较小。

在这个例子中,算法选择的远端副本是 CH5。于是,CH6 副本向 CH5 发起了 HTTP 请求,希望下载分区 201905_0_0_0:

如果第一次下载请求失败,在默认情况下,CH6 再尝试请求 4 次,一共会尝试 5 次(由 max_fetch_partition_retries_count 参数控制,默认为 5)。

CH5 的 DataPartsExchange 端口服务接收到调用请求,在得知对方来意之后,根据参数做出响应,将本地分区 201905_0_0_0 基于 DataPartsExchang 的服务响应发送回 CH6:

CH6 副本在收到 CH5 的分区数据后,首先将其写至临时目录:

待全部数据接收完成之后,重命名该目录:

至此,整个写入流程结束。

可以看到,在 INSERT 的写入过程中,ZooKeeper 不会进行任何实质性的数据传输。本着谁执行谁负责的原则,在这个案例中由 CH5 首先在本地写入了分区数据。之后,也由这个副本负责发送 Log 日志,通知其他副本下载数据。如果设置了 insert_quorum 并且 insert_quorum>=2,则还会由该副本监控完成写入的副本数量。其他副本在接收到 Log 日志之后,会选择一个最合适的远端副本,点对点地下载分区数据。

MERGE 的核心执行流程

当 ReplicatedMergeTree 触发分区合并动作时,即会进入这个部分的流程,它的核心流程如图所示。


无论 MERGE 操作从哪个副本发起,其合并计划都会交由主副本来制定。在 INSERT 的例子中,CH5 节点已经成功竞选为主副本,所以为了方便论证,这个案例就从 CH6 节点开始。整个流程从上至下按照时间顺序进行,其大致分成 5 个步骤。现在,根据图 1 中所示编号讲解整个过程。

创建远程连接,尝试与主副本通信

首先在 CH6 节点执行 OPTIMIZE,强制触发 MERGE 合并。这个时候,CH6 通过 /replicas 找到主副本 CH5,并尝试建立与它的远程连接。

主副本接收通信
主副本 CH5 接收并建立来自远端副本 CH6 的连接。

主副本接收通信

主副本 CH5 接收并建立来自远端副本 CH6 的连接。

由主副本制定 MERGE 计划并推送 Log 日志
由主副本 CH5 制定 MERGE 计划,并判断哪些分区需要被合并。在选定之后,CH5 将合并计划转换为 Log 日志对象并推送 Log 日志,以通知所有副本开始合并。日志的核心信息如下:

从日志内容中可以看出,操作类型为 Merge 合并,而这次需要合并的分区目录是 201905_0_0_0 和 201905_1_1_0。
与此同时,主副本还会锁住执行线程,对日志的接收情况进行监听:

其监听行为由 replication_alter_partitions_sync 参数控制,默认值为 1。当此参数为 0 时,不做任何等待;为 1 时,只等待主副本自身完成;为 2 时,会等待所有副本拉取完成。

各个副本分别拉取 Log 日志
CH5 和 CH6 两个副本实例将分别监听 /log/log-0000000002 日志的推送,它们也会分别拉取日志到本地,并推送到各自的 /queue 任务队列:

各个副本分别在本地执行 MERGE
CH5 和 CH6 基于各自的 /queue 队列开始执行任务:

各个副本开始在本地执行 MERGE:

至此,整个合并流程结束。

可以看到,在 MERGE 的合并过程中,ZooKeeper 也不会进行任何实质性的数据传输,所有的合并操作,最终都是由各个副本在本地完成的。而无论合并动作在哪个副本被触发,都会首先被转交至主副本,再由主副本负责合并计划的制定、消息日志的推送以及对日志接收情况的监控。

MUTATION 的核心执行流程

当对 ReplicatedMergeTree 执行 ALTER DELETE 或者 ALTER UPDATE 操作的时候,即会进入 MUTATION 部分的逻辑,它的核心流程如图所示。



与 MERGE 类似,无论 MUTATION 操作从哪个副本发起,首先都会由主副本进行响应。所以为了方便论证,这个案例还是继续从 CH6 节点开始(因为 CH6 不是主副本)。整个流程从上至下按照时间顺序进行,其大致分成 5 个步骤。现在根据图中所示编号讲解整个过程。

推送 MUTATION 日志
在 CH6 节点尝试通过 DELETE 来删除数据(执行 UPDATE 的效果与此相同),执行如下命令:

执行之后,该副本会接着进行两个重要事项:
创建 MUTATION ID:

将 MUTATION 操作转换为 MutationEntry 日志,并推送到 /mutations/0000000000。MutationEntry 的核心属性如下:

由此也能知晓,MUTATION 的操作日志是经由 /mutations 节点分发至各个副本的。

所有副本实例各自监听 MUTATION 日志
CH5 和 CH6 都会监听 /mutations 节点,所以一旦有新的日志子节点加入,它们都能实时感知:

当监听到有新的 MUTATION 日志加入时,并不是所有副本都会直接做出响应,它们首先会判断自己是否为主副本。

由主副本实例响应 MUTATION 日志并推送 Log 日志

只有主副本才会响应 MUTATION 日志,在这个例子中主副本为 CH5,所以 CH5 将 MUTATION 日志转换为 LogEntry 日志并推送至 /log 节点,以通知各个副本执行具体的操作。日志的核心信息如下:


从日志内容中可以看出,上述操作的类型为 mutate,而这次需要将 201905_0_1_1 分区修改为 201905_0_1_1_2(201905_0_1_1 +”_”+ mutation_id)。

各个副本实例分别拉取 Log 日志
CH5 和 CH6 两个副本分别监听 /log/log-0000000003 日志的推送,它们也会分别拉取日志到本地,并推送到各自的 /queue 任务队列:

各个副本实例分别在本地执行 MUTATION
CH5 和 CH6 基于各自的 /queue 队列开始执行任务:

各个副本,开始在本地执行 MUTATION:

至此,整个 MUTATION 流程结束。

可以看到,在 MUTATION 的整个执行过程中,ZooKeeper 同样不会进行任何实质性的数据传输。所有的 MUTATION 操作,最终都是由各个副本在本地完成的。而 MUTATION 操作是经过 /mutations 节点实现分发的。本着谁执行谁负责的原则,在这个案例中由 CH6 负责了消息的推送。但是无论 MUTATION 动作从哪个副本被触发,之后都会被转交至主副本,再由主副本负责推送 Log 日志,以通知各个副本执行最终的 MUTATION 逻辑。同时也由主副本对日志接收的情况实行监控。

ALTER 的核心执行流程

当对 ReplicatedMergeTree 执行 ALTER 操作进行元数据修改的时候,即会进入 ALTER 部分的逻辑,例如增加、删除表字段等。而 ALTER 的核心流程如图所示。

与之前的几个流程相比,ALTET 的流程会简单很多,其执行过程中并不会涉及 /log 日志的分发。整个流程从上至下按照时间顺序进行,其大致分成 3 个步骤。现在根据图所示编号讲解整个过程。

修改共享元数据
在 CH6 节点尝试增加一个列字段,执行如下语句:

执行之后,CH6 会修改 ZooKeeper 内的共享元数据节点:

数据修改后,节点的版本号也会同时提升:

与此同时,CH6 还会负责监听所有副本的修改完成情况:

监听共享元数据变更并各自执行本地修改

CH5 和 CH6 两个副本分别监听共享元数据的变更。之后,它们会分别对本地的元数据版本号与共享版本号进行对比。在这个案例中,它们会发现本地版本号低于共享版本号,于是它们开始在各自的本地执行更新操作:

确认所有副本完成修改
CH6 确认所有副本均已完成修改:

至此,整个 ALTER 流程结束。
可以看到,在 ALTER 整个的执行过程中,ZooKeeper 不会进行任何实质性的数据传输。所有的 ALTER 操作,最终都是由各个副本在本地完成的。本着谁执行谁负责的原则,在这个案例中由 CH6 负责对共享元数据的修改以及对各个副本修改进度的监控。

数据分片

通过引入数据副本,虽然能够有效降低数据的丢失风险(多份存储),并提升查询的性能(分摊查询、读写分离),但是仍然有一个问题没有解决,那就是数据表的容量问题。到目前为止,每个副本自身,仍然保存了数据表的全量数据。所以在业务量十分庞大的场景中,依靠副本并不能解决单表的性能瓶颈。想要从根本上解决这类问题,需要借助另外一种手段,即进一步将数据水平切分,也就是我们将要介绍的数据分片。

ClickHouse 中的每个服务节点都可称为一个 shard(分片)。从理论上来讲,假设有 N(N >= 1)张数据表 A,分布在 N 个 ClickHouse 服务节点,而这些数据表彼此之间没有重复数据,那么就可以说数据表 A 拥有 N 个分片。然而在工程实践中,如果只有这些分片表,那么整个 Sharding(分片)方案基本是不可用的。对于一个完整的方案来说,还需要考虑数据在写入时,如何被均匀地写至各个 shard,以及数据在查询时,如何路由到每个 shard,并组合成结果集。所以,ClickHouse 的数据分片需要结合 Distributed 表引擎一同使用,如图所示。

Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

集群的配置方式
在 ClickHouse 中,集群配置用 shard 代表分片、用 replica 代表副本。那么在逻辑层面,表示 1 分片、0 副本语义的配置如下所示:


可以看到,这样的配置似乎有些反直觉,shard 更像是逻辑层面的分组,而无论是副本还是分片,它们的载体都是 replica,所以从某种角度来看,副本也是分片。

关于这方面的详细介绍会在后续展开,现在先回到之前的话题。由于 Distributed 表引擎需要读取集群的信息,所以首先必须为 ClickHouse 添加集群的配置。找到前面在介绍 ZooKeeper 配置时增加的 metrika.xml 配置文件,将其加入集群的配置信息。

集群有两种配置形式,下面分别介绍。

不包含副本的分片
如果直接使用 node 标签定义分片节点,那么该集群将只包含分片,不包含副本。以下面的配置为例:

3 分片、0 副本

该配置定义了一个名为 shard_2 的集群,其包含了 2 个分片节点,它们分别指向了是 CH5 和 CH6 服务器。现在分别对配置项进行说明:
❑ shard_2 表示自定义的集群名称,全局唯一,是后续引用集群配置的唯一标识。在一个配置文件内,可以定义任意组集群。
❑ node 用于定义分片节点,不包含副本。
❑ host 指定部署了 ClickHouse 节点的服务器地址。
❑ port 指定 ClickHouse 服务的 TCP 端口。接下来介绍选填参数:
❑ weight 分片权重默认为 1,在后续小节中会对其详细介绍。
❑ user 为 ClickHouse 用户,默认为 default。
❑ password 为 ClickHouse 的用户密码,默认为空字符串。
❑ secure 为 SSL 连接的端口,默认为 9440。
❑ compression 表示是否开启数据压缩功能,默认为 true。

自定义分片与副本
集群配置支持自定义分片和副本的数量,这种形式需要使用 shard 标签代替先前的 node,除此之外的配置完全相同。在这种自定义配置的方式下,分片和副本的数量完全交由配置者掌控。其中,shard 表示逻辑上的数据分片,而物理上的分片则用 replica 表示。如果在 1 个 shard 标签下定义 N(N >= 1)组 replica,则该 shard 的语义表示 1 个分片和 N - 1 个副本。接下来用几组配置示例进行说明。

不包含副本的分片
下面所示的这组集群配置的效果与先前介绍的 shard_2 集群相同:

sharding_simple 集群的语义为 2 分片、0 副本(1 分片、0 副本,再加上 1 分片、0 副本)。

N 个分片和 N 个副本
这种形式可以按照实际需求自由组合,例如下面的这组配置,集群 sharding_simple_1 拥有 1 个分片和 1 个副本:

下面所示集群 sharding_ha 拥有 2 个分片,而每个分片拥有 1 个副本:

从上面的配置信息中能够得出结论,集群中 replica 数量的上限是由 ClickHouse 节点的数量决定的,例如为了部署集群 sharding_ha,需要 4 个 ClickHouse 服务节点作为支撑。
在完成上述配置之后,可以查询系统表验证集群配置是否已被加载:

SELECT cluster,host_name FROM  system.clusters;


不用重启集群,自动加载

基于集群实现分布式 DDL

不知道大家是否还记得,在前面介绍数据副本时为了创建多张副本表,我们需要分别登录到每个 ClickHouse 节点,在它们本地执行各自的 CREATE 语句。这是因为在默认的情况下,CREATE、DROP、RENAME 和 ALTER 等 DDL 语句并不支持分布式执行。而在加入集群配置后,就可以使用新的语法实现分布式 DDL 执行了,其语法形式如下:

其中,cluster_name 对应了配置文件中的集群名称,ClickHouse 会根据集群的配置信息顺藤摸瓜,分别去各个节点执行 DDL 语句。

在执行了上述语句之后,ClickHouse 会根据集群 shard_2 的配置信息,分别在 CH5 和 CH6 节点本地创建 test_1_local。
如果要删除 test_1_local,则执行下面的分布式 DROP:

值得注意的是,在改写的 CREATE 语句中,用 {shard} 和{replica}两个动态宏变量代替了先前的硬编码方式。执行下面的语句查询系统表,能够看到当前 ClickHouse 节点中已存在的宏变量:

这些宏变量是通过配置文件的形式预先定义在各个节点的配置文件中的,配置文件如下所示。
在 CH5 节点的 config.xml 配置中预先定义了分区 01 的宏变量:

在 CH6 节点的 config.xml 配置中预先定义了分区 02 的宏变量:

数据结构
与 ReplicatedMergeTree 类似,分布式 DDL 语句在执行的过程中也需要借助 ZooKeeper 的协同能力,以实现日志分发。

ZooKeeper 内的节点结构
在默认情况下,分布式 DDL 在 ZooKeeper 内使用的根路径为:

该路径由 config.xml 内的 distributed_ddl 配置指定:

在此根路径之下,还有一些其他的监听节点,其中包括 /query-[seq],其是 DDL 操作日志,每执行一次分布式 DDL 查询,在该节点下就会新增一条操作日志,以记录相应的操作指令。当各个节点监听到有新日志加入的时候,便会响应执行。DDL 操作日志使用 ZooKeeper 的持久顺序型节点,每条指令的名称以 query- 为前缀,后面的序号递增,例如 query-0000000000、query-0000000001 等。在每条 query-[seq]操作日志之下,还有两个状态节点:
(1)/query-[seq]/active:用于状态监控等用途,在任务的执行过程中,在该节点下会临时保存当前集群内状态为 active 的节点。(2)/query-[seq]/finished:用于检查任务完成情况,在任务的执行过程中,每当集群内的某个 host 节点执行完毕之后,便会在该节点下写入记录。例如下面的语句。

上述语句表示集群内的 CH5 和 CH6 两个节点已完成任务。

DDLLogEntry 日志对象的数据结构
在 /query-[seq]下记录的日志信息由 DDLLogEntry 承载,它拥有如下几个核心属性:
query 记录了 DDL 查询的执行语句,例如:

hosts 记录了指定集群的 hosts 主机列表,集群由分布式 DDL 语句中的 ONCLUSTER 指定,例如:

在分布式 DDL 的执行过程中,会根据 hosts 列表逐个判断它们的执行状态。

initiator 记录初始化 host 主机的名称,hosts 主机列表的取值来自于初始化 host 节点上的集群,例如:

hosts 主机列表的取值来源等同于下面的查询:

分布式 DDL 的核心执行流程

与副本协同的核心流程类似,接下来,就以 10.4.2 节中介绍的创建 test_1_local 的过程为例,解释分布式 DDL 的核心执行流程。整个流程如图所示。

整个流程从上至下按照时间顺序进行,其大致分成 3 个步骤。现在,根据图所示编号讲解整个过程。

(1)推送 DDL 日志:首先在 CH5 节点执行 CREATE TABLE ON CLUSTER,本着谁执行谁负责的原则,在这个案例中将会由 CH5 节点负责创建 DDLLogEntry 日志并将日志推送到 ZooKeeper,同时也会由这个节点负责监控任务的执行进度。

(2)拉取日志并执行:CH5 和 CH6 两个节点分别监听 /ddl/query-0000000064 日志的推送,于是它们分别拉取日志到本地。首先,它们会判断各自的 host 是否被包含在 DDLLog-Entry 的 hosts 列表中。如果包含在内,则进入执行流程,执行完毕后将状态写入 finished 节点;如果不包含,则忽略这次日志的推送。

(3)确认执行进度:在步骤 1 执行 DDL 语句之后,客户端会阻塞等待 180 秒,以期望所有 host 执行完毕。如果等待时间大于 180 秒,则会转入后台线程继续等待(等待时间由 distributed_ddl_task_timeout 参数指定,默认为 180 秒)。

Distributed 原理解析

Distributed 表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以 Distributed 表引擎需要和其他数据表引擎一起协同工作。

从实体表层面来看,一张分片表由两部分组成:

❑ 本地表:通常以_local 为后缀进行命名。本地表是承接数据的载体,可以使用非 Distributed 的任意表引擎,一张本地表对应了一个数据分片。
❑ 分布式表:通常以_all 为后缀进行命名。分布式表只能使用 Distributed 表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。

对于分布式表与本地表之间表结构的一致性检查,Distributed 表引擎采用了读时检查的机制,这意味着如果它们的表结构不兼容,只有在查询时才会抛出错误,而在创建表时并不会进行检查。不同 ClickHouse 节点上的本地表之间,使用不同的表引擎也是可行的,但是通常不建议这么做,保持它们的结构一致,有利于后期的维护并避免造成不可预计的错误。

定义形式

Distributed 表引擎的定义形式如下所示:

ENGINE = Distributed(cluster,database,table,[,sharding_key])

其中,各个参数的含义分别如下:
❑ cluster:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,它会使用集群的配置信息来找到相应的 host 节点。
❑ database 和 table:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。
❑ sharding_key:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个 host 节点的本地表。

现在用示例说明 Distributed 表的声明方式,建表语句如下所示:

CREATE TABLE test_shard_2_all ON CLUSTER shard_2_0 (id UInt64) ENGINE = Distributed(shard_2_0,default,test_shard_2_local,intHash64(id));

上述表引擎参数的语义可以理解为,代理的本地表为 default.test_shard_2_local,它们分布在集群 sharding_simple 的各个 shard,在数据写入时会根据 rand()随机函数的取值决定数据写入哪个分片。值得注意的是,此时此刻本地表还未创建,所以从这里也能看出,Distributed 表运用的是读时检查的机制,对创建分布式表和本地表的顺序并没有强制要求。同样值得注意的是,在上面的语句中使用了 ONCLUSTER 分布式 DDL,这意味着在集群的每个分片节点上,都会创建一张 Distributed 表,如此一来便可以从其中任意一端发起对所有分片的读、写请求。

接着需要创建本地表,一张本地表代表着一个数据分片。这里同样可以利用先前已经配置好的集群配置,使用分布式 DDL 语句迅速的在各个节点创建相应的本地表:

CREATE TABLE test_shard_2_local ON CLUSTER  shard_2_0(id UInt64) ENGINE = MergeTree()
ORDER BY id 
PARTITION BY id ;

至此,拥有两个数据分片的分布式表 test_shard_2 就建好了。

查询的分类

Distributed 表的查询操作可以分为如下几类:

❑ 会作用于本地表的查询:对于 INSERT 和 SELECT 查询,Distributed 将会以分布式的方式作用于 local 本地表。而对于这些查询的具体执行逻辑,将会在后续小节介绍。

❑ 只会影响 Distributed 自身,不会作用于本地表的查询:Distributed 支持部分元数据操作,包括 CREATE、DROP、RENAME 和 ALTER,其中 ALTER 并不包括分区的操作(ATTACH PARTITION、REPLACE PARTITION 等)。这些查询只会修改 Distributed 表自身,并不会修改 local 本地表。例如要彻底删除一张分布式表,则需要分别删除分布式表和本地表,示例如下。

DROP TABLE test_shard_2_all ON CLUSTER shard_2_0;
DROP TABLE test_shard_2_local ON CLUSTER shard_2_0;

❑ 不支持的查询:Distributed 表不支持任何 MUTATION 类型的操作,包括 ALTERDELETE 和 ALTER UPDATE。

分片规则
关于分片的规则这里将做进一步的展开说明。分片键要求返回一个整型类型的取值,包括 Int 系列和 UInt 系列。例如分片键可以是一个具体的整型列字段:

Distributed(cluster,database,table,userid)

也可以是一个返回整型的表达式:

-- 按照随机数划分
Distributed(cluster,database,table,rand());
-- 按照用户 id 的散列值划分
Distributed(cluster,database,table,intHash64(userid));

如果不声明分片键,那么分布式表只能包含一个分片,这意味着只能映射一张本地表,否则,在写入数据时将会得到如下异常:

如果一张分布式表只包含一个分片,那就意味着其失去了使用的意义了。所以虽然分片键是选填参数,但是通常都会按照业务规则进行设置。那么数据具体是如何被划分的呢?想要讲清楚这部分逻辑,首先需要明确几个概念。

分片权重(weight)
在集群的配置中,有一项 weight(分片权重)的设置:

weight 默认为 1,虽然可以将它设置成任意整数,但官方建议应该尽可能设置成较小的值。分片权重会影响数据在分片中的倾斜程度,一个分片权重值越大,那么它被写入的数据就会越多。

slot(槽)
slot 可以理解成许多小的水槽,如果把数据比作是水的话,那么数据之水会顺着这些水槽流进每个数据分片。slot 的数量等于所有分片的权重之和,假设集群 sharding_simple 有两个 Shard 分片,第一个分片的 weight 为 10,第二个分片的 weight 为 20,那么 slot 的数量则等于 30。slot 按照权重元素的取值区间,与对应的分片形成映射关系。在这个示例中,如果 slot 值落在 [0, 10) 区间,则对应第一个分片;如果 slot 值落在 [10, 20] 区间,则对应第二个分片。

选择函数

选择函数用于判断一行待写入的数据应该被写入哪个分片,整个判断过程大致分成两个步骤:

(1)它会找出 slot 的取值,其计算公式如下:

slot = shard_value % sum_weight 

其中,shard_value 是分片键的取值;sum_weight 是所有分片的权重之和;slot 等于 shard_value 和 sum_weight 的余数。假设某一行数据的 shard_value 是 10,sum_weight 是 30(两个分片,第一个分片权重为 10,第二个分片权重为 20),那么 slot 值等于 10(10%30 = 10)。

(2)基于 slot 值找到对应的数据分片。当 slot 值等于 10 的时候,它属于 [10, 20) 区间,所以这行数据会对应到第二个 Shard 分片。

整个过程的示意如图所示。

分布式写入的核心流程
在向集群内的分片写入数据时,通常有两种思路:一种是借助外部计算系统,事先将数据均匀分片,再借由计算系统直接将数据写入 ClickHouse 集群的各个本地表,如图所示。

上述这种方案通常拥有更好的写入性能,因为分片数据是被并行点对点写入的。但是这种方案的实现主要依赖于外部系统,而不在于 ClickHouse 自身,所以这里主要会介绍第二种思路。第二种思路是通过 Distributed 表引擎代理写入分片数据的,接下来开始介绍数据写入的核心流程。为了便于理解整个过程,这里会将分片写入、副本复制拆分成两个部分进行讲解。在讲解过程中,会使用两个特殊的集群分别进行演示:
第一个集群拥有 2 个分片和 0 个副本,通过这个示例向大家讲解分片写入的核心流程;
第二个集群拥有 1 个分片和 1 个副本,通过这个示例向大家讲解副本复制的核心流程。

将数据写入分片的核心流程

在对 Distributed 表执行 INSERT 查询的时候,会进入数据写入分片的执行逻辑,它的核心流程如图所示。



在这个流程中,继续使用集群 sharding_simple 的示例,该集群由 2 个分片和 0 个副本组成。整个流程从上至下按照时间顺序进行,其大致分成 5 个步骤。现在根据图所示编号讲解整个过程。

在第一个分片节点写入本地分片数据
首先在 CH5 节点,对分布式表 test_shard_2_all 执行 INSERT 查询,尝试写入 10、30、200 和 55 四行数据。执行之后分布式表主要会做两件事情:第一,根据分片规则划分数据,在这个示例中,30 会归至分片 1,而 10、200 和 55 则会归至分片 2;第二,将属于当前分片的数据直接写入本地表 test_shard_2_local。

第一个分片建立远端连接,准备发送远端分片数据
将归至远端分片的数据以分区为单位,分别写入 test_shard_2_all 存储目录下的临时 bin 文件,数据文件的命名规则如下:

由于在这个示例中只有一个远端分片 CH6,所以它的临时数据文件如下所示:

10、200 和 55 三行数据会被写入上述这个临时数据文件。接着,会尝试与远端 CH6 分片建立连接:

第一个分片向远端分片发送数据
此时,会有另一组监听任务负责监听 /test_shard_2_all 目录下的文件变化,这些任务负责将目录数据发送至远端分片:


其中,每份目录将会由独立的线程负责发送,数据在传输之前会被压缩。

第二个分片接收数据并写入本地
CH6 分片节点确认建立与 CH5 的连接:

在接收到来自 CH5 发送的数据后,将它们写入本地表:

由第一个分片确认完成写入
最后,还是由 CH5 分片确认所有的数据发送完毕:

至此,整个流程结束。
可以看到,在整个过程中,Distributed 表负责所有分片的写入工作。本着谁执行谁负责的原则,在这个示例中,由 CH5 节点的分布式表负责切分数据,并向所有其他分片节点发送数据。

在由 Distributed 表负责向远端分片发送数据时,有异步写和同步写两种模式:如果是异步写,则在 Distributed 表写完本地分片之后,INSERT 查询就会返回成功写入的信息;如果是同步写,则在执行 INSERT 查询之后,会等待所有分片完成写入。使用何种模式由 insert_distributed_sync 参数控制,默认为 false,即异步写。如果将其设置为 true,则可以一进步通过 insert_distributed_timeout 参数控制同步等待的超时时间。

副本复制数据的核心流程

如果在集群的配置中包含了副本,那么除了刚才的分片写入流程之外,还会触发副本数据的复制流程。数据在多个副本之间,有两种复制实现方式:一种是继续借助 Distributed 表引擎,由它将数据写入副本;另一种则是借助 ReplicatedMergeTree 表引擎实现副本数据的分发。两种方式的区别如图所示。

通过 Distributed 复制数据

在这种实现方式下,即使本地表不使用 ReplicatedMergeTree 表引擎,也能实现数据副本的功能。Distributed 会同时负责分片和副本的数据写入工作,而副本数据的写入流程与分片逻辑相同。现在用一个简单示例说明。首先让我们再重温一下集群 sharding_simple_1 的配置,它的配置如下:

现在,尝试在这个集群内创建数据表,首先创建本地表:

接着创建 Distributed 分布式表:

之后,向 Distributed 表写入数据,它会负责将数据写入集群内的每个 replica。

细心的朋友应该能够发现,在这种实现方案下,Distributed 节点需要同时负责分片和副本的数据写入工作,它很有可能会成为写入的单点瓶颈,所以就有了接下来将要说明的第二种方案。

通过 ReplicatedMergeTree 复制数据

如果在集群的 shard 配置中增加 internal_replication 参数并将其设置为 true(默认为 false),那么 Distributed 表在该 shard 中只会选择一个合适的 replica 并对其写入数据。此时,如果使用 ReplicatedMergeTree 作为本地表的引擎,则在该 shard 内,多个 replica 副本之间的数据复制会交由 ReplicatedMergeTree 自己处理,不再由 Distributed 负责,从而为其减负。

在 shard 中选择 replica 的算法大致如下:首选,在 ClickHouse 的服务节点中,拥有一个全局计数器 errors_count,当服务出现任何异常时,该计数累积加 1;接着,当一个 shard 内拥有多个 replica 时,选择 errors_count 错误最少的那个。

加入 internal_replication 配置后示例如下所示:

关于 Distributed 表引擎如何将数据写入分片;而关于 Replicated-MergeTree 表引擎如何复制分发数据。

分布式查询的核心流程

与数据写入有所不同,在面向集群查询数据的时候,只能通过 Distributed 表引擎实现。当 Distributed 表接收到 SELECT 查询的时候,它会依次查询每个分片的数据,再合并汇总返回。接下来将对数据查询时的重点逻辑进行介绍。

多副本的路由规则

在查询数据的时候,如果集群中的一个 shard,拥有多个 replica,那么 Distributed 表引擎需要面临副本选择的问题。它会使用负载均衡算法从众多 replica 中选择一个,而具体使用何种负载均衡算法,则由 load_balancing 参数控制:

有如下四种负载均衡算法:

1)randomrandom
是默认的负载均衡算法,正如前文所述,在 ClickHouse 的服务节点中,拥有一个全局计数器 errors_count,当服务发生任何异常时,该计数累积加 1。而 random 算法会选择 errors_count 错误数量最少的 replica,如果多个 replica 的 errors_count 计数相同,则在它们之中随机选择一个。

2)nearest_hostnamenearest_hostname
可以看作 random 算法的变种,首先它会选择 errors_count 错误数量最少的 replica,如果多个 replica 的 errors_count 计数相同,则选择集群配置中 host 名称与当前 host 最相似的一个。而相似的规则是以当前 host 名称为基准按字节逐位比较,找出不同字节数最少的一个,例如 CH5-1- 1 和 CH5-1-2.nauu.com 有一个字节不同:

而 CH5-1- 1 和 CH5-2- 2 则有两个字节不同:

3)in_orderin_order 同样可以看作 random 算法的变种,首先它会选择 errors_count 错误数量最少的 replica,如果多个 replica 的 errors_count 计数相同,则按照集群配置中 replica 的定义顺序逐个选择。

4)first_or_randomfirst_or_random 可以看作 in_order 算法的变种,首先它会选择 errors_count 错误数量最少的 replica,如果多个 replica 的 errors_count 计数相同,它首先会选择集群配置中第一个定义的 replica,如果该 replica 不可用,则进一步随机选择一个其他的 replica。

多分片查询的核心流程

分布式查询与分布式写入类似,同样本着谁执行谁负责的原则,它会由接收 SELECT 查询的 Distributed 表,并负责串联起整个过程。首先它会将针对分布式表的 SQL 语句,按照分片数量将查询拆分成若干个针对本地表的子查询,然后向各个分片发起查询,最后再汇总各个分片的返回结果。如果对分布式表按如下方式发起查询:

那么它会将其转为如下形式之后,再发送到远端分片节点来执行:

以 sharding_simple 集群的 test_shard_2_all 为例,假设在 CH5 节点对分布式表发起查询:

那么,Distributed 表引擎会将查询计划转换为多个分片的 UNION 联合查询

整个执行计划从下至上大致分成两个步骤:

查询各个分片数据
在图所示执行计划中,One 和 Remote 步骤是并行执行的,它们分别负责了本地和远端分片的查询动作。其中,在 One 步骤会将 SQL 转换成对本地表的查询:

而在 Remote 步骤中,会建立与 CH6 节点的连接,并向其发起远程查询:

CH6 节点在接收到来自 CH5 的查询请求后,开始在本地执行。同样,SQL 会转换成对本地表的查询:

2)合并返回结果
多个分片数据均查询返回后,按如下方法在 CH5 节点将它们合并:

使用 Global 优化分布式子查询

如果在分布式查询中使用子查询,可能会面临两难的局面。下面来看一个示例。假设有这样一张分布式表 test_query_all,它拥有两个分片,而表内的数据如下所示:

其中,id 代表用户的编号,repo 代表仓库的编号。如果现在有一项查询需求,要求找到同时拥有两个仓库的用户,应该如何实现?对于这类交集查询的需求,可以使用 IN 子查询,此时你会面临两难的选择:IN 查询的子句应该使用本地表还是分布式表?(使用 JOIN 面临的情形与 IN 类似)。

1)使用本地表的问题
如果在 IN 查询中使用本地表,例如下面的语句:

那么你会发现返回的结果是错误的。这是为什么呢?这是因为分布式表在接收到查询之后,会将上述 SQL 替换成本地表的形式,再发送到每个分片进行执行:

注意,IN 查询的子句使用的是本地表:

由于在单个分片上只保存了部分的数据,所以该 SQL 语句没有匹配到任何数据

从上图中可以看到,单独在分片 1 或分片 2 内均无法找到 repo 同时等于 100 和 200 的数据。

使用分布式表的问题
为了解决返回结果错误的问题,现在尝试在 IN 查询子句中使用分布式表:


这次返回了正确的查询结果。那是否意味着使用这种方案就万无一失了呢?通过进一步观察执行日志会发现,情况并非如此,该查询的请求被放大了两倍。

这是由于在 IN 查询子句中,同样也使用了分布式表查询:

所以在 CH6 节点接收到这条 SQL 之后,它将再次向其他分片发起远程查询

因此可以得出结论,在 IN 查询子句使用分布式表的时候,查询请求会被放大 N 的平方倍,其中 N 等于集群内分片节点的数量,假如集群内有 10 个分片节点,则在一次查询的过程中,会最终导致 100 次的查询请求,这显然是不可接受的。

使用 GLOBAL 优化查询

为了解决查询放大的问题,可以使用 GLOBAL IN 或 JOIN 进行优化。现在对刚才的 SQL 进行改造,为其增加 GLOBAL 修饰符:

再次分析查询的核心过程,如图所示。

整个过程由上至下大致分成 5 个步骤:
(1)将 IN 子句单独提出,发起了一次分布式查询。
(2)将分布式表转 local 本地表后,分别在本地和远端分片执行查询。(3)将 IN 子句查询的结果进行汇总,并放入一张临时的内存表进行保存。(4)将内存表发送到远端分片节点。
(5)将分布式表转为本地表后,开始执行完整的 SQL 语句,IN 子句直接使用临时内存表的数据。

至此,整个核心流程结束。可以看到,在使用 GLOBAL 修饰符之后,ClickHouse 使用内存表临时保存了 IN 子句查询到的数据,并将其发送到远端分片节点,以此到达了数据共享的目的,从而避免了查询放大的问题。由于数据会在网络间分发,所以需要特别注意临时表的大小,IN 或者 JOIN 子句返回的数据不宜过大。如果表内存在重复数据,也可以事先在子句 SQL 中增加 DISTINCT 以实现去重。

正文完
 0