关于spark:SparkCeleborn更快更稳更弹性

4次阅读

共计 6144 个字符,预计需要花费 16 分钟才能阅读完成。

摘要:本文整顿自阿里云 EMR Spark 团队的周克勇(一锤),在 Spark&DS Meetup 的分享。本篇内容次要分为三个局部:

  1. 传统 Shuffle 的问题
  2. Apache Celeborn(Incubating)简介
  3. Celeborn 在性能、稳定性、弹性上的设计

一、传统 Shuffle 的问题

Apache Spark 是广为风行的大数据处理引擎,它有很多应用场景: Spark SQL、批处理、流解决、MLLIB、GraphX 等。在所有组件下是对立的 RDD 形象,RDD 血统通过两种依赖关系形容,窄依赖和宽依赖。其中宽依赖是撑持简单算子(Join, Agg 等)的要害,而宽依赖实现机制就是 Shuffle。

传统的 Shuffle 实现如上图两头局部所示,每个 Mapper 对 Shuffle Output 的数据,依据 Partition ID 做排序,而后把排序好的数据和索引写入本地盘。Shuffle Read 阶段,Reducer 从所有 Mapper 的 Shuffle 文件里读取属于本人的 Partition 数据。但这种实现有如下几个缺点:

  • 第一,依赖大容量的本地盘或云盘存储 Shuffle 数据,数据须要驻留直至生产实现。这就限度了存算拆散,因为存算拆散架构下,计算节点通常不心愿有大容量的本地盘,心愿计算完结就能够开释节点。
  • 第二,Mapper 做排序会占用较大内存,甚至触发堆外排序,引入额定的磁盘 IO。
  • 第三,Shuffle Read 有大量的网络连接,逻辑连接数是 m×n。
  • 第四,存在大量的随机读盘。假如一个 Mapper 的 Shuffle 数据是 128M,Reducer 的并发是 2000,那么每个文件将会被读 2000 次,每次只随机读 64k,这就很容易达到磁盘 IOPS 的瓶颈。
  • 第五,数据单正本,容错性不高。

以上五点缺点最终导致不够高效、不够稳固以及不够弹性。

二、Apache Celeborn (Incubating)

Apache Celeborn(Incubating)是咱们团队晚期为了解决上述问题开发的 Remote Shuffle Service,曾经于 2022 年 10 月捐献给了 Apache 基金会。Celeborn 的定位是大数据引擎对立两头数据服务,它是引擎无关的,并且除了反对 Shuffle,将来还会反对 Spilled data,这样计算节点就能真正解除对大容量本地盘的依赖。

在正式介绍 Celeborn 设计之前,简略介绍一些历史。Celeborn 最早诞生于 2020 年,过后的名字是 Remote Shuffle Service,次要为了满足客户需要,2021 年 12 月正式对外开源。开源之后咱们吸引了来自小米、Shopee、网易等开发者共建,其中很多人曾经成为了外围贡献者。

2022 年 10 月正式进入 Apache 孵化器,截至目前咱们积攒了 600+ 的 commits,32 个 contributor,330+ 的 star,也心愿更多感兴趣的开发者参加共建。

三、Celeborn 的性能、稳定性、弹性

Celeborn 针对性能晋升的设计,次要包含外围设计、如何对接 Spark AQE、列式 Shuffle、多层存储。

1. 性能

Celeborn 采纳了 Push Shuffle + Partition 数据聚合的外围设计。简略来讲,每个 Mapper 的外部保护一个 Buffer 来缓存 Shuffle 数据,当 Buffer 超过阈值之后会触发推送,Mapper 把属于同一个 Partition 的数据推给事后调配好的 Worker。

如上图所示,Partition1 和 Partition2 的数据推给 Worker1,Partition3 的数据推给 Worker2,每个 Partition 最终会生成一个文件。在 Shuffle Read 阶段 Reducer 只需从一个 Worker 上读取属于本人的数据。在这个设计下 Shuffle 数据不落盘,也不须要做排序。同时 Shuffle Read 从随机读转换成了程序读,网络的连接数也从乘数关系变成了线性关系。这就解决了传统 Shuffle 的次要缺点。

Partition 切分的设计动机是,对于大作业或者存在数据歪斜的数据,一个 Partition 的文件会变得十分大。咱们遇到单 Partition 超过百 G 的状况,很容易把磁盘打爆,也会导致磁盘负载不平衡。

针对这种状况,Celeborn 实现了 Partition 切分。具体来讲,Worker 会动静监测每个 Partition 文件的大小,当超过阈值的时候会返回给 Client 一个 Split 标记。Client 收到 Split 标记后,会异步申请新的 Worker,等新的 Worker Ready 后,Client 会往新的 Worker 推送数据。这样就能够保障单个 Partition 的 Split 文件不会过大,在 Shuffle Read 的时候会读取这两个 Split 文件。

接下来介绍 Celeborn 如何反对 Spark AQE。AQE 是近几年 Apache Spark 最重要的优化,它次要有三个场景,Partition 合并、Join Strategy 切换、Skew Join 优化。AQE 对 Shuffle 模块的要求是要可能按 Partition 的范畴和 Mapper 的范畴去读,按 Partition 的范畴读会比拟天然,如上图中右上角所示,Reducer1 间接读取 Partition2、3、4 的数据。

而依据 Mapper 的范畴读,实现起来略微简单,能够分为以下三个步骤:

  • 第一步,Split 切分。Skew Join 意味着数据有歪斜,大概率会触发 Partition 切分,例如 Partition1 切分成了 Split1 和 Split2。
  • 第二步,Sort On Read。在首次 Read 某个 Partition Split 文件的时候,会触发 Sort On Read,Worker 会依据 Partition ID 对这个文件做排序。排序之后,Mapper 的范畴读就会从排序之前的随机读变成程序读。比方我要读 Mapper1 到 Mapper2 的数据,如果是排序之前的文件,我须要对这个文件 seek 四次,但如果是排序之后我只须要 seek 一次。
  • 第三步,Range Read。Sub Reducer 从这两个 Partition 里程序读取属于本人的 Mapper 范畴的数据。同时,Split 文件会记录本人的 Mapper 列表,这样就能够裁剪掉不必要的 Split 文件。

接下来介绍 Celeborn 的列式 Shuffle。家喻户晓,行存和列存是两种常见的数据布局形式。列存的益处是雷同类型的数据放在一起,易于编码,如字典编码、行程编码、Delta 编码、前缀编码等,能够十分大程度升高数据量。以往列存次要用于存储源表数据,而计算引擎算子内的两头数据大多用行存,因为以往算子的实现大多基于行存数据。

但近几年向量化引擎越来越风行,包含 Velox、ClickHouse、DuckDB 等,他们都应用了向量化的算子实现,因而算子的两头数据也应用了列存。尽管 Databricks 的 photon 引擎应用了向量化技术,但 Apache Spark 仍然是基于行存的引擎。

为了在 Apache Spark 中实现列式 Shuffle,Celeborn 引入了行列转换和代码生成,在 Shuffle Write 的时候把行存的数据转化成列存,在 Shuffle Read 的时候把列转化为行存。同时为了晋升行列转换的效率,Celeborn 引入了代码生成的技术来打消解释执行的开销。在 3T TPCDS 的测试中开启列式 Shuffle 后,整体的 Shuffle Size 能够缩小 40%,行列转换的开销低于 5%。

接下来介绍 Celeborn 的多层存储。多层存储的设计指标是让 Celeborn 可能灵便适配多种硬件环境,并尽可能让数据寄存在更快的存储层。Celeborn 定义了三种存储介质:内存、本地盘、分布式存储(OSS/HDFS)。用户能够任意抉择 1-3 种存储,比方能够只用本地盘,也能够只用内存和 OSS。

上图展现了同时抉择三种介质的存储机制,首先内存会被划分为两个逻辑区域,Push Data Region 和 Cache Region。Map 推送的数据会先落在 Push Data Region,当某个 Partition 的数据超过预设阈值会触发 Flush,这个时候 Celeborn 会去判断 Partition 的指标存储层,如果是本地盘(P3),这部分数据将被刷到本地;如果是内存 Cache(p4),这部分数据会被逻辑划分给 Cache Region(不会有真正的内存拷贝)。

当 Cache Region 满了时,Celeborn 会把最大的 Partition Evict 到下一层存储,例如 P4 会被刷到本地盘。一旦某个 Partition 的数据被刷盘,它后续的数据将不会被移到 Cache Region。

当本地盘满了时,咱们有两种策略,第一种是把本地文件 Evict 到 OSS。第二种不必动本地文件,数据间接从内存 Flush 到 OSS。

多层存储既能够通过内存晋升小 Shuffle 的性能,也能够利用 OSS 的海量存储空间,反对超大的 Shuffle,还还能够让 Celeborn 不依赖本地盘,比方只抉择内存和 OSS,那么 Celeborn 就没有本地盘,这样就能够更好的对 Celeborn 服务自身实现弹性。

2. 稳定性 Celeborn

针对服务自身稳定性的设计,次要包含绍原地降级、拥塞管制、负载平衡。

首先介绍原地降级。可用性是服务必须满足的要求,蓝绿切换的形式尽管能够满足大部分场景,但须要较多人工染指和长期资源扩张。Celeborn 通过协定向前兼容和优雅重启实现了利用无感的原地降级。向前兼容咱们通过协定的 PB 化实现,而优雅重启咱们利用了 Partition 被动切分的个性,上图展现了优雅重启的过程。

首先,内部零碎触发优雅重启,Worker 收到信号后,把本人标记为 graceful shutdown 状态,并上报给 Master,尔后 Master 将不会向 Worker 调配新的 slots。而后 Worker 给 PushData 的返回打上 HardSplit 的标,Client 收到这个标记后将不会持续向这个 Worker 推送数据,同时向该 Worker 会发动 CommitFile 的音讯,当 Worker 上所有缓存在内存中的 Partition 数据实现 CommitFile 后,Worker 会把内存的状态序列化并存到本地的 LevelDB,而后重启。之后从 LevelDB 里读取并复原状态,最初向 Master 从新注册。

从这个流程咱们能够看到,因为有被动 Split 机制的存在,Celeborn 的优雅重启比起其余零碎要更加高效,基本上能够在秒级别实现,且齐全不影响作业运行。

接下来介绍 Celeborn 在 Shuffle Write 阶段的拥塞管制。为了防止刹时的大作业把 Worker 内存打爆,Celeborn 参考了 TCP 的拥塞管制机制,包含慢启动、拥塞防止、拥塞管制三个环节。

Pusher 初始的时候处于慢启动状态,推送数据的速率很慢,但这个速率会以指数级上涨,当它达到某个阈值后会进入拥塞防止阶段。这时推送速率的上涨速度会变慢,变成固定的斜率。而这时如果 Worker 内存达到警戒线,会触发拥塞管制,给每个 Client 发一条标记。Client 收到之后会回到一开始的慢启动状态,Pusher 的速率也相应降到非常低。

流量管制的另一种常见设计是 Credit Based 的流控,简略来说就是每当我推送数据之前,要先向 Worker 拿到肯定的 Credit,这意味着 Worker 会为我预留一部分内存,我只能推送不超过我手里的 Credit 的数据。这种机制能够保障对内存的精准管制,但它的 Tradeoff 是减少了控制流,对性能有肯定的影响。

Celeborn 在 Shuffle Write 阶段采纳的类 TCP 的拥塞管制,能同时兼顾刹时流量的顶峰和稳固态的性能。同时,Celeborn 在反对 Flink 的 Shuffle Read 阶段采纳了 Credit Based 的设计。

接下来介绍 Celeborn 的负载平衡设计。以后 Celeborn 关注的负载平衡次要集中在磁盘,设计指标是隔离坏盘,并尽量把负载调配给更快的、空间更足的盘。具体来说,Worker 会监控本地每块可用盘的状态,包含衰弱度、刷盘速率、预测将来的用量,这些状态信息随心跳发给 Master。Master 保护了整个集群所有可用盘的状态信息,并依据某个算法模型对磁盘进行分组。级别高的组会调配更多的工作负载,如果属于同一个组,会尽量调配给可用容量更大的盘。Celeborn 这种负载平衡的设计在异构环境下有更稳固的体现。

3. 弹性 Celeborn

针对弹性的设计,次要包含 Spark on K8s + Celeborn 计划。

在 Yarn 的场景,External Shuffle Service 是 Spark 开启动静资源伸缩的前提,Shuffle 数据托管给 ESS 后,Executor 就能够开释。但 Spark on K8s 场景不存在 ESS,为了服务后续的 Shuffle Read,Pod 即便处于闲暇状态也无奈开释。开源计划为了优化这个场景,加了一个参数 spark.dynamicAllocation.shuffleTracking.enabled,通过跟踪 Shuffle 文件是否被读取来决定是否开释。

但依据咱们的测试,这个参数的成果无限。集成 Celeborn 之后,Shuffle 数据托管给 Celeborn 集群,Pod 就能够在闲暇后立刻开释,从而做到真正的弹性。

4. 典型场景

Celeborn 有以下三种典型的场景。

  • 第一种是齐全混部。也就是 HDFS、Yarn、Celeborn 散布在同一个集群,它的次要收益是能够晋升性能和稳定性。
  • 第二种是 Celeborn 独立部署,HDFS 和 Yarn 混部。它除了能晋升性能和稳定性,还能隔离源表数据的 IO 和 Shuffle 数据的 IO 对磁盘的抢占,提供了肯定的资源隔离,以及 Celeborn 集群的局部弹性。
  • 第三种是存算拆散。源表的数据存在对象存储,计算节点运行在 K8s 或者 Yarn 集群,Celeborn 的集群也独立部署,这种场景下计算集群和 Celeborn 集群都能够享受残缺的弹性。

5.Evaluation

接下来分享两个案例,第一个是混部的案例。一位用户把 Celeborn 混部在计算集群中,Celeborn 部署的整体规模达到 1000 台以上,但每个 Worker 给的资源比拟无限。

这位用户每天的 Shuffle 数据量在通过压缩后能够达到 4PB,对大数据稳定性的晋升也十分的显著。从上图能够看到,存在 8 万多并发,单个 Shuffle 有 16T 规模的作业,在 HDD 环境下也能够稳固的运行,在上 Celeborn 之前这个作业是跑不过的。

第二个是一个存算拆散的案例。一位用户采纳了齐全存算拆散的架构,它的计算节点跑在 K8s 上,源表数据存在 OSS,Celeborn 集群独立部署。他们的计算节点每天 Pod 的数量有好几万,默认开启 Spark 的动静资源伸缩性能,有十分好的弹性,除此之外,性能和稳定性也有显著晋升。

上图是咱们在规范测试集 TPCDS 3T 的混部环境的测试后果。Celeborn 在不额定耗费机器资源的状况下,单正本比 External Shuffle Service 性能晋升 20%,双正本有 13% 的晋升。

原文链接

本文为阿里云原创内容,未经容许不得转载。

正文完
 0