关于clickhouse:字节跳动数据平台技术揭秘基于-ClickHouse-的复杂查询实现与优化

16次阅读

共计 8291 个字符,预计需要花费 21 分钟才能阅读完成。

更多技术交换、求职机会、试用福利,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

ClickHouse 作为目前业内支流的列式存储数据库 (DBMS) 之一,领有着同类型 DBMS 难以企及的查问速度。作为该畛域中的后起之秀,ClickHouse 已凭借其性能劣势引领了业内新一轮剖析型数据库的热潮。但随着企业业务数据量的不断扩大,在简单 query 场景下,ClickHouse 容易存在查问异样问题,影响业务失常推动。

字节跳动作为国内最大规模的 ClickHouse 使用者,在对 ClickHouse 的利用与优化过程中积攒了大量技术教训。本文将分享字节跳动解决 ClickHouse 简单查问问题的优化思路与技术细节。

我的项目背景

ClickHouse 的执行模式与 Druid、ES 等大数据引擎相似,其根本的查问模式可分为两个阶段。第一阶段,Coordinator 在收到查问后,将申请发送给对应的 Worker 节点。第二阶段,Worker 节点实现计算,Coordinator 在收到各 Worker 节点的数据后进行汇聚和解决,并将解决后的后果返回。

两阶段的执行模式可能较为高效地反对目前许多常见的业务场景,例如各类大宽表单的查问,这也是 ClickHouse 最善于的场景。ClickHouse 的长处是简略、高效,通常来说,简略就意味着高效。但随着企业业务的继续倒退,更加简单的业务场景对 ClickHouse 提出了以下三类挑战。

第一类,当一阶段返回的数据较多,且二阶段计算较为简单时,Coordinator 会接受较大压力,容易成为 Query 的瓶颈。例如一些重计算的 Agg 算子,如 Count Distinct,若采纳哈希表的形式进行去重,第二阶段需在 Coordinator 单机下来合并各个 Worker 的哈希表。这个计算量会很重且无奈并行。

第二类,因为目前 ClickHouse 模式并不反对 Shuffle,因而对于 Join 而言,右表必须为全量数据。无论是一般 Join 还是 Global Join,当右表的数据量较大时,若将数据都放到内存中,会比拟容易 OOM。若将数据 spill 到磁盘,尽管能够解决内存问题,但因为有磁盘 IO 和数据序列化、反序列化的代价,因而查问的性能会受到影响。特地是当 Join 采纳 Hash Join 时,如果右表是一张大表,构建也会比较慢。针对构建问题,近期社区也进行了一些右表并行构建的优化,数据依照 Join key 进行 Split 来并行地构建多个 Hash Table,但额定的代价是左右表都须要减少一次 Split 操作。

第三类,则是对于简单查问(如多表 Join、嵌套多个子查问、window function 等),ClickHouse 对这类需要场景的反对并不是特地敌对,因为 ClickHouse 并不能通过 Shuffle 来扩散数据减少执行并行度,并且其生成的 Pipeline 在一些 case 下并不能充沛并行。因而在某些场景下,难以施展集群的全副资源。

随着企业业务复杂度的一直晋升,简单查问,特地是有多轮的分布式 Join,且有很多 agg 的计算的需要会越来越强烈。在这种状况下,业务并不心愿所有的 Query 都依照 ClickHouse 善于的模式进行,即通过上游数据 ETL 来产生大宽表。这样做对 ETL 的老本较大,并且可能会有一些数据冗余。

企业的集群资源是无限的,但整体的数据量会持续增长,因而在这种状况下,咱们心愿可能充沛地去利用机器的资源,来应答这种越来越简单的业务场景和 SQL。所以咱们的指标是基于 ClickHouse 可能高效反对简单查问。

技术计划

对于 ClickHouse 简单查问的实现,咱们采纳了分 Stage 的执行形式,来替换掉目前 ClickHouse 的两阶段执行形式。相似于其余的分布式数据库引擎,例如 Presto 等,会将一个简单的 Query 按数据交换状况切分成多个 Stage,各 Stage 之间则通过 Exchange 实现数据交换。Stage 之间的数据交换次要有以下三种模式。

依照单个或者多个 key 进行 Shuffle

将单个或者多个节点的数据汇聚到一个节点上,称为 Gather

将同一份数据复制到多个节点上,称为 Broadcast 或播送

对于单个 Stage 执行,持续复用 ClickHouse 目前底层的执行形式。开发上依照不同性能切分不同模块。各个模块预约接口,缩小彼此的依赖与耦合。即便模块产生变动或外部逻辑调整,也不会影响其余模块。其次,对模块采纳插件架构,容许模块依照灵便配置反对不同的策略。这样便可能依据不同业务场景实现不同的策略。

首先,当 Coordinator 承受简单的查问当前,它会在以后的语法树的根底上,依据节点类型和数据分布状况,插入 Exchange 节点,并生成一个分布式 Plan。其次,Coordinator 节点会依据 ExchangeNode 类型切分 Plan,并生成每个 Stage 执行打算片段。

接着,Coordinator 节点会调用 SegmentScheduler 调度器,将各 Stage 的 PlanSegment 发送给 Worker 节点。当 Worker 接管到 PlanSegment 后,InterpreterPlanSegment 会实现数据的读取和执行,通过 ExchangeManager 实现数据的交互。最初,Coordinator 从最初一轮 Stage 所对应的 ExchangeManager 中去读取数据,并返回给 Client。

查问片段调度器 SegmentScheduler 负责调度查问不同的 PlanSegment,依据上下游依赖关系和数据分布,以及 Stage 并行度和 worker 散布和状态信息,依照肯定的调度策略,将 PlanSemgent 发给不同的 Worker 节点。

目前而言,咱们在进行打算下发和调度时,次要实现了两种策略。

第一种是依赖调度,依据 Stage 依赖关系定义拓扑构造,产生 DAG 图,并依据 DAG 图调度 Stage。依赖调度要等到依赖 Stage 启动当前,才会调度对应的 Stage。例如两表 Join,会先调度左右表读取 Stage,之后再调度 Join 这个 Stage,因为 Join 的 Stage 依赖于左右表的 Stage。

第二种是 AllAtOnce 策略,先计算每个 Stage 的相干信息,后一次性调度所有 Stage。

相比而言,这两种策略是在容错、资源应用和延时下来做取舍。第一种策略依赖调度,能够实现更好的容错。因为 ClickHouse 数据能够有多个正本,读数据时,如局部节点连贯失败,能够尝试它的正本节点。对后续依赖的节点的 Stage 来说,并不需要感知到后面 Stage 的执行状况。非 Source Stage,自身没有对数据的依赖,所以容错能力会更强,只有保障 Stage 并行度的节点存活即可。甚至极其状况下,如需保障 Query 失常执行,也能够升高 Stage 的并行度。但调度存在依赖关系,并不能齐全并行,会减少调度的时长。Stage 较多的状况下,调度延时可能会占据 SQL 整体不小的比例。针对上述问题的可做如下优化:对于一些没有依赖关系的,尽可能反对并行。例如同一个 Stage 的不同节点,能够并行。没有依赖关系的 Stage,也能够并行。

第二种调度策略是 AllAtOnce,通过并行能够极大升高调度延时。为防止出现大量网络 IO 线程,能够通过异步化伎俩控制线程数目。AllAtOnce 策略的毛病是容错性没有依赖调度好,每一个 Stage 的 Worker 在调度前就曾经确定了,调度过程中有一个 Worker 呈现连贯异样,则整个 Query 都会失败。另一类状况,Stage 在上游数据还没有 ready,就被调度起来了,则须要较长时间等数据。例如 Final 的 agg Stage,要等 Partial agg 实现当前才可能拿到对应的数据。尽管咱们也对此进行了一些优化,并不会长时间空跑,节约 CPU 资源。然而其实也耗费了一部分资源,例如须要去创立这些执行的线程。

ClickHouse 的查问节点执行次要是以 SQL 模式在节点间相互交互。在切分 Stage 后,咱们须要反对可能执行一个独自的 PlanSegment 的执行打算。因而,InterpreterPlanSegment 次要的作用就是承受一个序列化后的 PlanSegment,可能在 Worker 节点下来运行整个 PlanSegment 的逻辑。此外,咱们也进行了性能和性能上的加强,例如反对一个 Stage 解决多个 Join,这样便能够缩小 Stage 的数目和一些不必要的传输,用一个 Stage 就能够实现整个 Join 的过程。InterpreterPlanSegment 的执行会上报对应的状态信息,如呈现执行异样,会将异样信息报告给查问片段调度器,调度器会勾销 Query 其余的 Stage 的 Worker 执行。

ExchangeManager 是 PlanSegment 数据交换的媒介,能均衡数据上下游解决的能力。整体而言,咱们的设计采纳 Push 与队列的形式,当上游的数据 ready 时,被动推送给上游,并在这个根底上反对了反压的能力。

在整个流程中,上下游都会通过队列来优化发送和读取,上游与上游会有一个本人的队列。当队列饱和的时候,会通过相似反压的机制来管制上游这个执行速度,若上游计算快,上游解决能力比较慢,呈现上游解决不过去的状况,则会通过反压的形式来管制上游执行的速度。

因为采纳 push 和队列,因而要思考一个绝对比拟非凡的场景,在某些 case 的状况下,上游的 Stage 并不需要读取全副的上游的数据。例如 Limit100,上游只需读取 100 条数据,而上游可能会产生十分大规模的数据。因而在这种状况下,当上游的 Stage 读取到足够的数据后,它须要可能被动勾销上游 Stage 的执行,并且清空队列。

ExchangeManager 思考的优化点较多,例如细粒度的内存管制,可能依照实例、Query、Segment 等多个档次进行内存管制,防止 OOM。更长期的思考是在一些对提早要求不高、数据量大的场景,通过将数据 Spill 到磁盘,升高内存的应用

第二,为了晋升传输效率,小数据要做 Merge,大数据要做 Split。同时,在网络传输和解决某些场景的时候,须要做一种有序性的保障。例如在 Sort 的场景,Partial Sort 和 Merge Sort 的网络传输过程必须要保障是有序的,传输数据不能呈现乱序的状况,否则进行 Merge Sort 时数据就会出问题,并影响最终后果。

第三,连贯的复用和网络的优化,包含上下游在同一个节点,尽可能走内存替换,而不走网络。这样能够缩小网络开销以及数据的序列化和反序列化的代价。此外,ClickHouse 在计算上做了十分短缺的优化,因而其在某些场景中,内存带宽会成为瓶颈,在 ExchangeManager 的一些场景中,能够用一些零拷贝和其余优化,尽量减少内存的拷贝。

第四,异样解决和监控。相比于单机,分布式状况下异常情况会更加简单,且更加难以感知。通过重试可能防止一些节点短时性的高负载或者异样对查问的影响。做好监控,在出问题的时候,能疾速感知,并进行排查,也可能针对性地去做优化。

优化与诊断

首先是 Join 的多种实现和优化。依据数据的规模和散布,能够依据不同的场景去抉择适合的 Join 的实现形式:

Shuffle Join,是目前应用形式最多,也是最常见的。

Broadcast Join,大表 Join 小表场景,将右表播送到左表的所有 Worker 节点下面,这样能够防止左表大表的数据传输。

Colocate Join,如果左右表都已依照 Join key 散布,并且它们是相通的散布的话,其实不须要去做数据的 exchange,能够将数据的传输减到最小。

网络连接的优化,外围实质是缩小连贯的建设和应用,特地是在数据须要 Shuffle 时,下一轮 Stage 中的每一个节点都要从上游的 Stage 中的每个节点去拉取数据。若集群整体的节点数较多,且存在很多较简单的 Query,就会建设十分多的连贯。

目前在字节外部,ClickHouse 集群的规模十分大,在以后 ClickHouse 二阶段执行的高并发状况下,单机最大可能会建设几万个连贯。因而必须要进行网络连接的优化,特地是反对连贯的复用,每个连贯上能够跑多个 Stage 查问。通过尽可能去复用连贯,在不同的节点之间,可能建设固定数目的连贯,不同的 Query、Stage 都会复用这些连贯,连接数并不会随着 Query 和 Stage 的规模的增长而增长。

网络传输优化,在数据中心内,近程的间接的内存拜访,通常指 RDMA,是一种可能超过近程主机操作系统的内核,去拜访内存里的数据的技术。因为这种技术不须要通过操作系统,所以不仅节俭了大量的 CPU 资源,同样也晋升了零碎吞吐量,升高了零碎的网络通信提早,尤其适宜大规模并行的计算机集群。因为 ClickHouse 在计算层面做了很多优化,而网络带宽相比于内存带宽要小不少,在一些数据量传输特地大的场景,网络传输会成为肯定的瓶颈。为了晋升网络传输的效率和晋升数据 exchange 的吞吐,一方面能够引入压缩来升高传输数据量,另一方面能够引入 RDMA 来缩小肯定的开销。通过测试,在一些数据传输量大的场景,有不小的收益。

利用 Runtime Filter 的优化在不少数据库也有应用。Join 的算子通常是 OLAP 引擎里最耗时的算子,优化 Join 算子有两种思路。一种思路是能够晋升 Join 算子的性能。比方对于 HashJoin,能够优化 HashTable 实现,也能够实现更好的哈希算法,包含做一些更好的并行的形式。

另一种思路是,如果自身算子耗时比拟重,能够缩小参加算子计算的数据。Runtime Filter 是在一些场景下特地是事实表 Join 多张维度表的星型模型场景有比拟好的成果。在此类场景下,通常事实表的规模会十分大,而大部分的过滤条件都是在维度表下面。

Runtime Filter 的作用,是通过在 Join 的 Probe 端,提前过滤掉并不会命中 Join 条件的输出数据,从而大幅缩小 Join 中的数据传输和计算。通过这种形式,可能缩小整体的执行工夫。因而咱们在简单查问上也反对了 Runtime Filter,目前次要反对 Min Max 和 Bloom Filter。

如果 runtime filter 的列(join column)构建了索引(主键、skip index…),是须要从新生成 pipeline 的。因为命中索引后,可能会缩小数据的读取,pipeline 并行度和对应数据的解决 range 都可能发生变化。如果 runtime filter 的列跟索引无关,能够在打算生成的时候事后带上过滤条件,一开始为空,只是占位,runtime filter 下发的时候把占位信息改成真正的过滤条件即可。这样即便 runtime filter 下发超时了,查问片段曾经开始执行,只有查问片段没有执行完,之后的数据依然能够进行过滤。

但须要留神的是,Runtime Filter 是一种非凡场景下的优化,针对场景是右表数据量不大,并且构建的 Runtime Filter 对左表有比拟好的过滤成果。若右表数据量较大,构建的 Runtime Filter 的工夫比拟久,或对左表的数据过滤没有成果。Runtime Filter 反而会减少查问的耗时和计算的开销。因而要依据数据的特色和规模来决定是否开启优化。

性能诊断和剖析对简单查问很要害,因为引入了简单查问的多 Stage 模型,SQL 执行的模式会变得复杂。对此的优化首先是尽可能欠缺各类 Metrics,包含 Query 执行工夫、不同 Stage 执行工夫、起始工夫、完结工夫、解决的 IO 数据量、算子解决的数据、执行状况,以及各类的算子 Metrics 和一些 Profile Events(例如 Runtime Filter 会有构建工夫、过滤数据量等 Metrics)。

其次,咱们记录了反压信息与上下游的队列长度,以此推断 Stage 的执行状况和瓶颈。

通常能够有如下判断:

输出和输入队列数目同为低或同为高别离表明以后 stage 解决失常或处于被上游反压,此时能够通过反压信息来进一步判断

当输出和输入队列数目不一样,这可能是出于反压传导的中间状态或者该 stage 就是反压的本源

如果一个 stage 的输入队列数目很多,且常常被反压,通常是被上游 stage 所影响,所以能够排除它自身是反压本源的可能性,更多关注它的上游

如果一个 stage 的输入队列数目很少,但其输出队列的数目很高,则表明它有可能是反压的本源。优化指标是晋升这个 stage 的解决能力。

总的来说,SQL 的场景无所不包,非常复杂的场景有时还是须要对引擎有肯定理解的同学去诊断和剖析,给出优化倡议。字节目前也在不断完善这些教训,心愿可能通过不断完善 Metrics 和剖析的门路,继续加重 Oncall 的累赘,在某些场景下可能更加精确地给出优化倡议。

成果与瞻望

根据上述所提,目前执行模型存在三个毛病,咱们进行了简单查问的优化,因而须要验证这种新的模式是否可能解决发现的问题,测试场景如下:

第二阶段计算较简单,且第一阶段数据较多

Hash Join 右表是大表

多表 Join,模仿简单 Query

以 SSB 1T 数据作为数据集,环境则是构建了 8 个节点的集群。

Case1——二阶段计算简单。咱们看到有一个比拟重的计算算子 UniqExact,就是 count distinct 的计算形式,通过 Hash 表做去重。count distinct 默认采纳这种算法,当咱们应用简单查问后,Query 的执行工夫从 8.5 秒缩小到 2.198 秒。第二阶段 agg uniqExact 算子的合并本来由 coordinator 单点合并,当初通过依照 group by key shuffle 后能够由多个节点并行实现。因而通过 shuffle 加重了 coordinator 的 merge agg 压力。

Case2——右表为大表。因为 ClickHouse 对多表的优化做的还不是很到位。这里采纳子查问来下推过滤的条件。在这个 case 中,Lineorder 是一张大表,采纳简单查问的模式当前,Query 执行工夫从 17 秒优化到了 1.7 秒。因为 Lineorder 是一张大表,通过 Shuffle 能够将数据依照 Join key Shuffle 到各 Worker 节点上,这样就缩小了右表构建的压力。

Case3——多表 Join。开启简单查问后,Query 的执行工夫从 8.58 秒优化到 4.464 秒,所有的右表都能够同时开始数据的解决和构建。为了和现有模式做比照,简单查问这里并没有开启 runtime filter,开启 runtime filter 后成果会更好。

事实上,优化器对简单查问的性能晋升也十分大,通过一些 RBO 的规定,例如常见的谓词下推、相干子查问的解决等,能够极大晋升 SQL 的执行效率。在简单查问的模式下,因为有优化器的存在,用户甚至不须要写得非常复杂,优化器主动去实现这些下推和 RBO 规定优化。

此外,抉择用哪一种 Join 的实现,也会对 Join 的性能影响较大。若可能满足 Join Key 散布,应用 Colocate Join 能够缩小左右表 Shuffle 的传输代价。在多表 Join 的状况下,Join 的程序和 Join 的实现形式对执行的时长影响,会比两表 Join 更大。借助这种数据的统计信息,通过一些 CBO 的优化,能够失去一个比拟好的执行模式。

有了优化器,业务同学能够依照业务逻辑来写任何的 SQL,引擎主动计算出绝对最优的 SQL 打算并执行,减速查问的执行。

总结一下,ClickHouse 目前的执行模式在很多单表的场景下体现十分优异,咱们次要针对简单场景做优化,通过实现多 Stage 的模式,实现了 Stage 之间的数据的传输,从工程实际上做了较多尝试和优化,去晋升执行和网络传输的性能。并心愿通过欠缺 Metrics 和智能诊断来升高 SQL 剖析和调优的门槛。目前曾经实现了第一步,将来字节仍有很多致力的方向。

首先,是要持续去晋升执行和 Exchange 的性能。这里不议论引擎执行通用的优化,比方更好的索引或者算子的优化,次要是跟简单查问模式无关。举一个例子,比方 Stage 复用,在 SQL 呈现子查问后果被重复应用的场景,比方一些多表 join 和 CTE 场景可能有帮忙。通过 Stage 复用能够缩小雷同数据的屡次读取。Stage 复用咱们之前就曾经反对,然而用的场景比拟少,将来筹备更灵便和通用。其次,Metrics 和智能诊断增强。SQL 的灵便度很高,因而一些简单查问如果没有 Metrics 其实简直很难去做诊断和调优。以上都是字节跳动数据平台在将来会长期的继续去发力的方向。

立刻跳转火山引擎 ByteHouse 官网理解详情!

正文完
 0