乐趣区

关于大数据:ByteHouse-实时导入技术演进

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

ByteHouse 是火山引擎上的一款云原生数据仓库,为用户带来极速剖析体验,可能撑持实时数据分析和海量离线数据分析;便捷的弹性扩缩容能力,极致的剖析性能和丰盛的企业级个性,助力客户数字化转型。

本文将从需要动机、技术实现及理论利用等角度,介绍基于不同架构的 ByteHouse 实时导入技术演进。

外部业务的实时导入需要

ByteHouse 实时导入技术的演进动机,起初于字节跳动外部业务的需要。

在字节外部,ByteHouse 次要还是以 Kafka 为实时导入的次要数据源(本文都以 Kafka 导入为例开展形容,下文不再赘述)。对于大部分外部用户而言,其数据体量偏大;所以用户更看重数据导入的性能、服务的稳定性以及导入能力的可扩展性。而对于数据延时性,大多数用户只有是秒级可见就能满足其需要。基于这样的场景,ByteHouse 进行了定制性的优化。

分布式架构下的高可用

社区原生分布式架构

ByteHouse 首先沿用了 Clickhouse 社区的分布式架构,但分布式架构有一些人造性架构层面的缺点,这些痛点次要体现在三个方面:

  • 节点故障:当集群机器数量达到肯定规模当前,根本每周都须要人工解决节点故障。对于单正本集群在某些极其 case 下,节点故障甚至会导致数据失落。
  • 读写抵触:因为分布式架构的读写耦合,当集群负载达到肯定水平当前,用户查问和实时导入就会呈现资源抵触——尤其是 CPU 和 IO,导入就会受到影响,呈现生产 lag。
  • 扩容老本:因为分布式架构数据根本都是本地存储,在扩容当前,数据无奈做 Reshuffle,新扩容的机器简直没有数据,而旧的机器上磁盘可能曾经快写满,造成集群负载不均的状态,导致扩容并不能起到无效的成果。

这些是分布式架构人造的痛点,然而因为其人造的并发个性,以及本地磁盘数据读写的极致性能优化,能够说有利有弊。

社区实时导入设计

  • High-Level 生产模式:依靠 Kafka 本身的 rebalance 机制做生产负载平衡。
  • 两级并发
    基于分布式架构的实时导入外围设计其实就是两级并发:
    一个 CH 集群通常有多个 Shard,每个 Shard 都会并发做生产导入,这就是第一级 Shard 间的多过程并发;每个 Shard 外部还能够应用多个线程并发生产,从而达到很高的性能吞吐。
  • 攒批写入
    就单个线程来说,根本生产模式是攒批写入——生产肯定的数据量,或者肯定工夫之后,再一次性写入。攒批写入能够更好地实现性能优化,查问性能晋升,并升高后盾 Merge 线程的压力。

无奈满足的需要

上述社区的设计与实现,还是无奈满足用户的一些高级需要:

  • 首先局部高级用户对数据的散布有着比拟严格的要求,比方他们对于一些特定的数据有特定的 Key,心愿雷同 key 的数据落盘到同一个 Shard(比方惟一键需要)。这种状况下,社区 High Level 的生产模式是无奈满足的。
  • 其次是 High level 的生产模式 rebalance 不可控,可能最终会导致 Clickhouse 集群中导入的数据在各个 Shard 之间调配不均。
  • 当然,生产工作的调配不可知,在一些生产异样情景下,想要排查问题也变得十分艰难;对于一个企业级利用,这是难以承受的。

自研分布式架构生产引擎 HaKafka

为了解决上述需要,ByteHouse 团队基于分布式架构自研了一种生产引擎——HaKafka。

高可用(Ha)

HaKafka 继承了社区原有 Kafka 表引擎的生产长处,再重点做了高可用的 Ha 优化。

就分布式架构来谈,其实每个 Shard 内可能都会有多个正本,在每个正本上都能够做 HaKafka 表的创立。然而 ByteHouse 只会通过 ZK 选一个 Leader,让 Leader 来真正地执行生产流程,其余节点位于 Stand by 状态。当 Leader 节点不可用了,ZK 能够在秒级将 Leader 切到 Stand by 节点持续生产,从而实现一种高可用。

Low—Level 生产模式

HaKafka 的生产模式从 High Level 调整到了 Low Level 模式。Low Level 模式能够保障 Topic Partition 有序和平均地调配到集群内各个 shard;与此同时,Shard 外部能够再一次用多线程,让每个线程来生产不同 Partition。从而齐全继承了社区 Kafka 表引擎两级并发的长处。

在 Low-Level 生产模式下,上游用户只有在写入 Topic 的时候,保障没有数据歪斜,那么通过 HaKafka 导入到 Clickhouse 里的数据必定也是均匀分布在各个 shard 的。

同时,对于有非凡数据分布需要——将雷同 Key 的数据写到雷同 Shard——的高级用户,只有在上游保障雷同 Key 的数据写入雷同 Partition,那么导入 ByteHouse 也就能齐全满足用户需要,很好地反对惟一键等场景。

场景一:
基于上图可见,假如有一个双正本的 Shard,每个正本都会有一张雷同的 HaKafka 表处于 Ready 的状态。然而只有通过 ZK 选主胜利的 leader 节点上,HaKafka 才会执行对应的生产流程。当这个 leader 节点宕机当前,正本 Replica 2 会主动再被选为一个新的 Leader,持续生产,从而保障高可用。

场景二:
在节点故障场景下,个别须要执行替换节点流程。对于分布式节点替换有一个很沉重的操作——拷贝数据。

如果是一个多正本的集群,一个正本故障,另一个正本是完整的。咱们很天然心愿在节点替换阶段,Kafka 生产放在完整的正本 Replica 2 上,因为其上旧数据是齐备的。这样 Replica 2 就始终是一个齐备的数据集,能够失常对外提供服务。这一点 HaKafka 是能够保障的。HaKafka 选主的时候,如果确定有某一个节点在替换节点流程当中,会防止将其选为 Leader。

导入性能优化:Memory Table

HaKafka 还做到了 Memory Table 的优化。

思考这样一个场景:业务有一个大宽表,可能有上百列的字段 或者上千的 Map-Key。因为 ClickHouse 每一个列都会对应落盘为一个具体的文件,列越多,每次导入写的文件也就越多。那么,雷同生产工夫内,就会频繁地写很多的碎文件,对于机器的 IO 是很惨重的累赘,同时给 MERGE 带来很大压力;重大时甚至导致集群不可用。为了解决这种场景,咱们设计了 Memory Table 实现导入性能优化。

Memory Table 的做法就是每一次导入数据不间接刷盘,而是存在内存中;当数据达到一定量当前,再集中刷盘,缩小 IO 操作。Memory Table 能够提供对外查问服务的,查问会路由到生产节点所在的正本去读 memory table 里边的数据,这样保障了不影响数据导入的延时性。从外部应用教训来看,Memory Table 不仅很好地解决了局部大宽表业务导入需要,而且导入性能最高能够晋升 3 倍左右。

云原生新架构

鉴于上文形容的分布式架构的人造缺点,ByteHouse 团队始终致力于对架构进行降级。咱们抉择了业务支流的云原生架构,新的架构在 2021 年初开始服务字节外部业务,并于 2023 年初进行了代码开源(ByConity)。

云原生架构自身有着很人造的主动容错能力以及轻量级的扩缩容能力。同时,因为它的数据是云存储的,既实现了存储计算拆散,数据的安全性和稳定性也失去了进步。当然,云原生架构也不是没有毛病,将原来的本地读写改为远端读写,必然会带来肯定的读写性能损耗。然而,以肯定的性能损耗来换取架构的合理性,升高运维老本,其实是利大于弊的。

上图是 ByteHouse 云原生架构的架构图,本文针对实时导入这块介绍几个重要的相干组件。

  • Cloud Service
    首先,总架构分为三层,第一层是 Cloud Service,次要蕴含 Server 和 Catlog 两个组件。这一层是服务入口,用户的所有申请包含查问导入都从 Server 进入。Server 只对申请做预处理,不具体执行;在 Catlog 查问元信息后,把预处理的申请和元信息下发到 Virtual Warehouse 执行。
  • Virtual Warehouse
    Virtual Warehouse 是执行层。不同的业务,能够有独立的 Virtual Warehouse,从而做到资源隔离。当初 Virtual Warehouse 次要分为两类,一类是 Default,一类是 Write,Default 次要做查问,Write 做导入,实现读写拆散。
  • VFS
    最底层是 VFS(数据存储),反对 HDFS、S3、aws 等云存储组件。

基于云原生架构的实时导入设计

在云原生架构下,Server 端不做具体的导入执行,只做工作治理。因而在 Server 端,每个生产表会有一个 Manager,用来治理所有的生产执行工作,并将其调度到 Virtual Warehouse 上执行。

因为继承了 HaKafka 的 Low Level 生产模式,Manager 会依据配置的生产工作数量,将 Topic Partition 平均调配给各个工作;生产工作的数量是可配置的,下限是 Topic Partition 数目。

基于上图,大家能够看到右边是 Manager,从 catalog 拿到对应的 Offset,而后依据指定的生产工作数目,来调配对应的生产 Partition、并调度到 Virtual Warehouse 的不同节点来执行。

新的生产执行流程

因为云原生新架构下是有事务 Transaction 保障的,所有操作都心愿在一个事务内实现,也更加的合理化。依靠云原生新架构下的 Transaction 实现,每个生产工作的生产流程次要包含以下步骤:

  • 生产开始前,Worker 端的工作会先通过 RPC 申请,向 Server 端申请创立一个事务;
  • 执行 rdkafka::poll(),生产肯定工夫(默认 8s)或者足够大的 block;
  • 将 block 转化为 Part 并 Dump 到 VFS(此时数据不可见);
  • 通过 RPC 申请向 Server 发动事务 Commit 申请(事务中 Commit 的数据包含:dump 实现的 part 元数据以及对应 Kafka offset)
  • 事务提交胜利(数据可见)

容错保障

从上述生产流程里能够看到,云原生新架构下的生产,容错保障次要是基于 Manager 和 Task 的双向心跳以及疾速失败策略:

  • Manager 自身会有一个定期的探活,通过 RPC 查看调度的 Task 是否在失常执行;
  • 同时每个 Task 会在生产中借助事务 RPC 申请来校验本人的有效性,一旦校验失败,它能够主动 kill;
  • 而 Manager 一旦探活失败,则会立刻拉起一个新的生产工作,实现秒级的容错保障。

生产能力

对于生产能力的话,上文提到它是一个可扩展性的,生产工作数量能够由用户来配置,最高能够达到 Topic 的 Partition 数目。如果 Virtual Warehouse 中节点负载高的话,也能够很轻量地扩节点。
当然,Manager 调度工作实现了根本的负载平衡保障——用 Resource Manager 来做工作的治理和调度。

语义加强:Exactly—Once

最初,云原生新架构下的生产语义也有一个加强——从散布书架构的 At-Least-Once 降级到 Exactly—Once。

因为分布式架构是没有事务的,只能做到一个 At-Least-Once,就是任何状况下,保障不丢数据,然而一些极其状况可能会有反复生产产生。到了云原生架构,得益于 Transaction 的实现,每一次生产都能够通过事务让 Part 和 Offset 实现原子性提交,从而达到 Exactly—Once 的语义加强。

Memory buffer

对应 HaKafka 的 memory table,云原生架构同样实现了导入内存缓存 Memory Buffer。

与 Memory Table 不同的是,Memory Buffer 不再绑定到 Kafka 的生产工作上,而是实现为存储表的一层缓存。这样 Memory Buffer 就更具备通用性,不仅是 Kafka 导入能够应用,像 Flink 小批量导入的时候也能够应用。

同时,咱们引入了一个新的组件 WAL。数据导入的时候先写 WAL,只有写胜利了,就能够认为数据导入胜利了——当服务启动后,能够先从 WAL 复原未刷盘的数据;之后再写 Memory buffer,写胜利数据就可见了——因为 Memory Buffer 是能够由用户来查问的。Memory Buffer 的数据同样定期刷盘,刷盘后即可从 WAL 中革除。

业务利用及将来思考

最初简略介绍实时导入在字节外部的应用现状,以及下一代实时导入技术的可能优化方向。

ByteHouse 的实时导入技术是以 Kafka 为主,每天的数据吞吐是在 PB 级,导入的单个线程或者说单个消费者吞吐的经验值在 10-20MiB/s。(这里之所以强调是经验值,因为这个值不是一个固定值,也不是一个峰值;生产吞吐很大水平上取决于用户表的复杂程度,随着表列数减少,导入性能可能会显著升高,无奈应用一个精确的计算公式。因而,这里的经验值更多的是字节外部大部分表的导入性能经验值。)

除了 Kafka,字节外部其实还反对一些其余数据源的实时导入,包含 RocketMQ、Pulsar、MySQL(MaterializedMySQL)、Flink 直写等。

对于下一代实时导入技术的简略思考:

  • 更通用的实时导入技术,可能让用户反对更多的导入数据源。
  • 数据可见延时和性能的一个折衷。

点击跳转 ByteHouse 云原生数据仓库 理解更多

退出移动版