作者: 张凯 @阿里云、陳韋廷 @Intel、周渊 @Intel
简介
Apache Celeborn(Incubating) 是阿里云捐献给 Apache 的通用 Remote Shuffle Service,旨在晋升大数据计算引擎的性能 / 稳定性 / 弹性,目前已广泛应用于生产场景。Gluten 是 Intel 开源的引擎减速我的项目,旨在通过把 Spark Java Engine 替换为 Native Engine(Velox, ClickHouse, Arrow 等)来减速 Spark 引擎。过来一段时间,Gluten 社区和 Celeborn 社区相互合作,胜利把 Celeborn 集成进 Gluten,本文将对此加以介绍。
Gluten: 给 Spark 换上 Native 引擎
Gluten 我的项目旨在解决基于 Apache Spark 的数据负载场景中的 CPU 计算瓶颈。随着 IO 技术的晋升,特地是 SSD 和万兆网卡的遍及,CPU 计算瓶颈逐步成为限度性能的次要因素。然而,基于 JVM 进行 CPU 指令优化绝对艰难,因为与其余本地语言(如 C ++)相比,JVM 提供的优化性能较少。
为了解决这个问题,开源社区曾经有一些成熟的本地引擎(如 ClickHouse、Velox)具备了优良的向量化执行能力,能够带来显著的性能劣势。然而,这些引擎通常与 Spark 生态系统脱离,对于那些曾经重大依赖 Spark 计算框架且无奈接受大量运维和迁徙老本的用户来说,不够敌对。Gluten 我的项目的指标是使 Spark 用户可能享受这些成熟的本地引擎带来的性能劣势,而无需迁徙。
Gluten 我的项目利用 Spark 插件机制,拦挡并将查问打算发送给本地引擎执行,从而跳过 Spark 自身不够高效的执行门路。该我的项目反对多个本地引擎作为后端,包含 Velox、ClickHouse 和 Apache Arrow。对于本地引擎无奈解决的操作,Gluten 会回退到 Spark 的失常执行门路。在线程模型方面,Gluten 应用 JNI 调用库的模式,在 Spark 执行器工作线程中间接调用本地代码,防止引入简单的线程模型。
在内存治理方面,Gluten 可能对立治理本地内存和 JVM 内存,通过本地内存池和工作内存管理器分配内存。当内存不足时,会触发溢出操作,开释内存。此外,Gluten 还提供了残缺的列式 Shuffle 机制及及对立 API 接口用于连接市场受欢迎的第三方 RemoteShuffleService 如 Celeborn,防止了数据转换开销及提供服务。
为了兼容不同的本地引擎,Gluten 定义了清晰的 JNI 接口,作为 Spark 框架和底层引擎之间的桥梁。这些接口用于申请传递、数据传输和能力检测等方面的需要。开发者只需实现这些接口,并满足相应的语义要求,即可利用 Gluten 实现 Spark 和本地引擎的整合工作。此外,Spark 的架构设计中还预留了 Shim Layer 来适配反对不同版本的 Spark。
Gluten Columnar Shuffle
Shuffle 自身是影响 Spark 性能的重要一环,这里会引入屡次序列化 / 反序列化,网络传输,磁盘读写,因而要想实现高性能才不至于成为瓶颈。因为 Native Engine 采纳列式(Columnar)数据结构暂存数据,如果简略的沿用 Spark 的基于行数据模型的 Shuffle,则会在 Shuffle Write 阶段引入数据列转行的环节,在 Shuffle Read 阶段引入数据行转列的环节,能力使数据能够晦涩周转。然而无论行转列,还是列转行的老本都不低。因而,Gluten 必须提供残缺的 Columnar Shuffle 机制以避开这里的转化开销。具体到 columnar shuffle 实现层,次要分成 shuffle 数据写入和 shuffle 数据读取两块。
Columnar Shuffle 写入
和原生 Spark 一样,Columnar Shuffle 指标是将以后 DAG 产生的长期数据写入磁盘,在下一个 stage 须要将数据读出,也须要反对内存不足时的 spill 操作,优先保障查问的健壮性。与 Spark 里不同的中央次要有以下几点:
- Spark 默认采纳 row based 格局存储数据,Gluten 里 shuffle 采纳了 columnar format 来保留数据。
<!—->
-
- 目前的实现是基于 Arrow format,来做序列化的工作,并且反对自定义压缩格局。采纳 columnar format 来实现能够不便的引入 SIMD 指令来做优化
<!—->
- Spark 默认采纳 sort-based shuffle,而 gluten 里默认采纳 hash based shuffle
<!—->
-
- Sort based shuffle 比,hash based 算法复杂度更低,但须要占用更多内存,并且引入很多小文件问题。gluten 里实现 hash based shuffle 时,也参考了 sort based shuffle 的局部设计,缩小了小文件过多带来的影响
在一个 TPC-H Like Scale Factor 6TB 的测试场景中,Columnar Shuffle Write 和原生 Spark 的 row based shuffle 相比,能够达到缩小约~12% 的 Shuffle Size 的成果。
Columnar Shuffle 读取
在实现 Columnar Shuffle 读取时,Gluten 复用了 Spark 里的 netty based shuffle transfer 机制,只须要提供对应的 de-serializer,将曾经写到磁盘上的 shuffle 文件读取上来,并反序列化交给 reducer。Spark 里引入了很多软件栈比方 netty, kryo,导致 reducer 读取时有反复的内存拷贝,Gluten 里也做了一些零拷贝优化来缩小这里的软件开销。
Celeborn:解决本地 Shuffle 的限度
Gluten 本地 Shuffle 限度
上图展现了 Gluten Columnar Shuffle 的主流程,其中 Hash-based Shuffle、Native Partitioner、零拷贝等设计是其取得高性能的要害。然而,Gluten 沿用了 Spark 的本地 Shuffle 框架,存在以下次要限度。
- 依赖大容量本地盘存储 shuffle 数据,一方面无奈利用存算拆散架构,另一方面计算节点“有状态”无奈及时缩容,从而导致难以兼容云原生架构,资源利用率低。
- Shuffle Write 内存缓和时 Spill 到磁盘,减少额定的磁盘 I/O。
- Shuffle Read 有大量的网络连接和大量磁盘随机读,导致较差的稳定性和性能。
Celeborn 简介
Apache Celeborn(Incubating) 是较成熟的通用 Remote Shuffle Service,能够很好的解决大数据引擎本地 Shuffle 存在的稳定性、性能、弹性的问题,详见文末索引 13。Apache Celeborn 社区和 Gluten 社区过来一段时间相互配合,胜利把 Celeborn 集成进 Gluten,使得 Native Spark 能更好的拥抱 Cloud Native。接下来将介绍 Gluten 如何集成 Celeborn。
Gluten + Celeborn
整体设计
Gluten 集成 Celeborn 的设计指标是同时保留 Gluten Columnar Shuffle 和 Celeborn Remote Shuffle 的外围设计,让两者的劣势叠加。
上图形容了 Gluten+Celeborn Columnar Shuffle 的整体设计:Shuffle Writer 复用 Native Partitioner,拦挡本地 IO 并改为推向 Celeborn 集群;Celeborn 集群做数据重组 (聚合雷同 Partition 的数据) 和多备份;Shuffle Reader 从特定 Celeborn Worker 上程序读取数据并反序列化为 Column Batch。这个设计不仅保留了 Gluten Columnar Shuffle 的高性能设计,又充分利用了 Celeborn 远端存储、数据重组和多正本的能力。
具体而言,Gluten 集成 Celeborn 次要在于实现对应的 ShuffleManager,ShuffleWriter 以及 ShuffleReader,接下来将别离介绍。
CelebornShuffleManager
CelebornShuffleManager 继承了 Spark ShuffleManager 接口,作为 Gluten 对接 Celeborn 的 ShuffleManager,次要做了以下工作:
- 向 Celeborn register shuffle,失败则回退到 Gluten 的本地 Columnar Shuffle。
- 与 Celeborn 集群建设连贯并初始化 Celeborn Shuffle Client。
- 提供 getWriter 办法获取 CelebornShuffleWriter。
- 提供 getReader 办法获取 CelebornShuffleReader。
CelebornShuffleWriter
CelebornShuffleWriter 与 Gluten Columnar Shuffle 统一,都采纳了 Hash-based Shuffle。其外围性能是复用 Gluten 中的 Native Partitioner,并将磁盘 IO 操作 (Spill,写 Shuffle 文件) 替换为推向 Celeborn 集群。次要流程如下:
- 通过 JNI 向 Native 模块传递 CelebornPartitionPusher,使得 Native 模块可向 Celeborn 集群推送数据。
- 复用 Native Partitioner 对列式数据进行 Partition,与 Gluten Columnar Shuffle 保持一致。
- 向 GlutenMemoryConsumer 注册 Spiller,保障在 Spark 监测到内存不足触发 Spill 时,能够通过 Celeborn SDK 把数据推送到 Celeborn 集群,从而防止额定的磁盘 IO。
- 在 Native 模块,当全副数据实现 Partition 后,将写文件操作替换成通过 Celeborn SDK 推送到 Celeborn 集群。
CelebornShuffleReader
CelebornShuffleReader 跟 Celeborn 集群建设连贯,读取 Shuffle 数据。在 Gluten 侧实现 CelebornColumnarBatchSerializer,通过 deserializeStream 办法定制 InputStream 的 deserialize 流程,最初将反序列化的 ColumnarBatch 交给 Gluten 持续解决。从上述两图比照可知,本地 Shuffle 的 Reducer 从多个文件读取数据,而 Celeborn Reducer 只需从一个 Worker 上读取,随机读转换成了程序读,网络的连接数也从乘数关系变成了线性关系,从而晋升了 Shuffle Read 的性能。
性能测试
Celeborn 在磁盘资源受限时有最好的性能体现。咱们测试了三组硬件环境:SDD 环境,充沛 HDD 环境,无限 HDD 环境。整体论断是:在 SDD 环境,Gluten + Celeborn Columnar Shuffle 性能跟 Gluten 本地 Columnar Shuffle 持平;在充沛 HDD 和无限 HDD 环境,Gluten + Celeborn Columnar Shuffle 性能比 Gluten 本地 Columnar Shuffle 别离晋升8% 和12%。
充沛 HDD 环境
部署形式:Celeborn 集群和 Yarn 集群混部。
硬件环境:1 x Master(64 vCPU, 256 GiB) 5 x worker(40 vCPU, 176 GiB, 15x7300GB HDD)
Spark 版本:3.3.1
Benchmark:3T TPCDS
下图是 Gluten+Celeborn 相比 Gluten 的 Top20 的减速比:
下图是残缺 TPCDS 的工夫比照,整体晋升 8%:
受限 HDD 环境
部署形式:Celeborn 集群和 Yarn 集群混部。
硬件环境:1 x Master(64 vCPU, 256 GiB) 5 x worker(40 vCPU, 176 GiB, 2x7300GB HDD)
Spark 版本:3.3.1
Benchmark:3T TPCDS
下图是 Gluten+Celeborn 相比 Gluten 的 Top20 的减速比:
下图是残缺 TPCDS 的工夫比照,整体晋升12% :
SSD 环境
最初把磁盘全副换成 SSD,Gluten+Celeborn 在不额定耗费机器资源的状况下,比 Gluten 性能晋升 1.2%,性能根本持平。
总结
本篇文章介绍了 Gluten 我的项目的背景和指标,以及它如何解决基于 Apache Spark 的数据负载场景中的 CPU 计算瓶颈。Gluten 利用 Spark 插件机制,将查问打算发送给本地引擎执行,从而跳过 Spark 自身不够高效的执行门路。该我的项目反对多个本地引擎作为后端,引入 Columnar Shuffle 设计,并对立治理本地内存和 JVM 内存。此外,Gluten 集成了 Celeborn 作为 Remote Shuffle Service,Celeborn 采纳了 Push Shuffle 的设计,通过远端存储、数据重组、内存缓存、多正本等设计,不仅进一步晋升 Gluten Shuffle 的性能和稳定性,还使得 Gluten 领有更好的弹性,从而更好的拥抱云原生。
欢送退出咱们的开源我的项目,并奉献你的代码!咱们的我的项目位于
Gluten: https://github.com/oap-project/gluten
Celeborn: https://github.com/apache/incubator-celeborn
Celeborn 用户交换钉群: 41594456
Reference
[1]https://developer.aliyun.com/article/779686
[2]https://developer.aliyun.com/article/857757
[3]https://developer.aliyun.com/article/891951
[4]https://developer.aliyun.com/article/1153123