乐趣区

关于分布式:解析分布式应用框架Ray架构源码

摘要:Ray 的定位是分布式应用框架,次要指标是使能分布式应用的开发和运行。

Ray 是 UC Berkeley 大学 RISE lab(前 AMP lab)2017 年 12 月 开源的新一代分布式应用框架(刚公布的时候定位是高性能分布式计算框架,20 年中批改定位为分布式应用框架),通过一套引擎解决简单场景问题,通过动静计算及状态共享提高效率,实现研发、运行时、容灾一体化

Ray 架构解析

业务指标

Ray 的定位是分布式应用框架,次要指标是使能分布式应用的开发和运行。

业务场景

具体的粗粒度应用场景包含

  • 弹性负载,比方 Serverless Computing
  • 机器学习训练,Ray Tune, RLlib, RaySGD 提供的训练能力
  • 在线服务,例如 Ray Server 提供在线学习的案例
  • 数据处理,例如 Modin, Dask-On-Ray, MARS-on-Ray
  • 长期计算(例如,并行化 Python 应用程序,将不同的分布式框架粘合在一起)
    Ray 的 API 让开发者能够轻松的在单个分布式应用中组合多个 libraries,例如,Ray 的 tasks 和 Actors 可能会 call into 或 called from 在 Ray 上运行的分布式训练(e.g. torch.distributed)或者在线服务负载; 在这种场景下,Ray 是作为一个“分布式胶水”零碎,因为它提供通用 API 接口并且性能足以撑持许多不同工作负载类型。

零碎设计指标

  • Ray 架构设计的外围准则是 API 的简略性和通用性
  • Ray 的零碎的外围指标是性能(低开销和程度可伸缩性)和可靠性。为了达成外围指标,设计过程中须要就义一些其余现实的指标,例如简化的零碎架构。例如,Ray 应用了分布式参考计数和分布式内存之类的组件,这些组件减少了体系结构的复杂性,然而对于性能和可靠性而言却是必须的。
  • 为了进步性能,Ray 建设在 gRPC 之上,并且在许多状况下能够达到或超过 gRPC 的原始性能。与独自应用 gRPC 相比,Ray 使应用程序更容易利用并行和分布式执行以及分布式内存共享(通过共享内存对象存储)。
  • 为了进步可靠性,Ray 的外部协定旨在确保产生故障时的正确性,同时又缩小了常见状况的开销。Ray 施行了分布式参考计数协定以确保内存平安,并提供了各种从故障中复原的选项。
  • 因为 Ray 应用形象资源而不是机器来示意计算能力,因而 Ray 应用程序能够无缝的从便携机环境扩大到群集,而无需更改任何代码。Ray 通过分布式溢出调度程序和对象管理器实现了无缝扩大,而开销却很低。

相干零碎上下文

  • 集群管理系统:Ray 能够在 Kubernetes 或 SLURM 之类的集群管理系统之上运行,以提供更轻量的 task 和 Actor 而不是容器和服务。
  • 并行框架:与 Python 并行化框架(例如 multiprocessing 或 Celery)相比,Ray 提供了更通用,更高性能的 API。Ray 零碎还明确反对内存共享。
  • 数据处理框架: 与 Spark,Flink,MARS 或 Dask 等数据处理框架相比,Ray 提供了一个 low-level 且较简化的 API。这使 API 更加灵便,更适宜作为“分布式胶水”框架。另一方面,Ray 对数据模式,关系表或流数据流没有外在的反对。仅通过库(例如 Modin,Dask-on-Ray,MARS-on-Ray)提供此类性能。
  • Actor 框架:与诸如 Erlang 和 Akka 之类的专用 actor 框架不同,Ray 与现有的编程语言集成,从而反对跨语言操作和语言本机库的应用。Ray 零碎还通明地治理无状态计算的并行性,并明确反对参与者之间的内存共享。
  • HPC 零碎:HPC 零碎都反对 MPI 消息传递接口,MPI 是比 task 和 actor 更底层的接口。这能够使应用程序具备更大的灵活性,然而开发的复杂度加大了很多。这些零碎和库中的许多(例如 NCCL,MPI)也提供了优化的个体通信原语(例如 allreduce)。Ray 应用程序能够通过初始化各组 Ray Actor 之间的通信组来利用此类原语(例如,就像 RaySGD 的 torch distributed)。

零碎设计

逻辑架构:

畛域模型

  • Task:在与调用者不同的过程上执行的单个函数调用。工作能够是无状态的(@ ray.remote 函数)或有状态的(@ ray.remote 类的办法 - 请参见上面的 Actor)。工作与调用者异步执行:.remote()调用立刻返回一个 ObjectRef,可用于检索返回值。
  • Object:应用程序值。这能够由工作返回,也能够通过 ray.put 创立。对象是不可变的:创立后就无奈批改。工人能够应用 ObjectRef 援用对象。
  • Actor:有状态的工作过程(@ ray.remote 类的实例)。Actor 工作必须应用句柄或对 Actor 的特定实例的 Python 援用来提交。
  • Driver:程序根目录。这是运行 ray.init()的代码。
  • Job:源自同一驱动程序的(递归)工作,对象和参与者的汇合

集群设计

如上图所示,Ray 集群包含一组同类的 worker 节点和一个集中的全局管制存储(GCS)实例。
局部零碎元数据由 GCS 治理,GCS 是基于可插拔数据存储的服务,这些元数据也由 worker 本地缓存,例如 Actor 的地址。GCS 治理的元数据拜访频率较低,但可能被群集中的大多数或所有 worker 应用,例如,群集的以后节点成员身份。这是为了确保 GCS 性能对于应用程序性能影响不大。

Ownership

  • 大部分零碎元数据是依据去中心化理念(ownership)进行治理的:每个工作过程都治理和领有它提交的工作以及这些工作返回的“ObjectRef”。Owner 负责确保工作的执行并促成将 ObjectRef 解析为其根底值。相似地,worker 领有通过“ray.put”调用创立的任何对象。
  • OwnerShip 的设计具备以下长处(与 Ray 版本 <0.8 中应用的更集中的设计相比):
  1. 低工作提早(〜1 RTT,<200us)。常常拜访的零碎元数据对于必须对其进行更新的过程而言是本地的。
  2. 高吞吐量(每个客户端约 10k 工作 / 秒;线性扩大到集群中数百万个工作 / 秒),因为零碎元数据通过嵌套的近程函数调用天然散布在多个 worker 过程中。
  3. 简化的架构。owner 集中了平安垃圾收集对象和零碎元数据所需的逻辑。
  4. 进步了可靠性。能够依据应用程序构造将工作程序故障彼此隔离,例如,一个近程调用的故障不会影响另一个。
  • OwnerShip 附带的一些衡量取舍是:
  1. 要解析“ObjectRef”,对象的 owner 必须是可及的。这意味着对象必须与其 owner 绑定。无关对象复原和持久性的更多信息,请参见 object 故障和 object 溢出。
  2. 目前无奈转让 ownership。

外围组件

  • Ray 实例由一个或多个工作节点组成,每个工作节点由以下物理过程组成:
  1. 一个或多个工作过程,负责工作的提交和执行。工作过程要么是无状态的(能够执行任何 @ray.remote 函数),要么是 Actor(只能依据其 @ray.remote 类执行办法)。每个 worker 过程都与特定的作业关联。初始工作线程的默认数量等于计算机上的 CPU 数量。每个 worker 存储 ownership 表和小对象:
    a. Ownership 表。工作线程具备援用的对象的零碎元数据,例如,用于存储援用计数。
    b. in-process store,用于存储小对象。
  2. Raylet。raylet 在同一群集上的所有作业之间共享。raylet 有两个主线程:
    a. 调度器。负责资源管理和满足存储在分布式对象存储中的工作参数。群集中的单个调度程序包含 Ray 散布式调度程序。
    b. 共享内存对象存储(也称为 Plasma Object Store)。负责存储和传输大型对象。集群中的单个对象存储包含 Ray 分布式对象存储。

每个工作过程和 raylet 都被调配了一个惟一的 20 字节标识符以及一个 IP 地址和端口。雷同的地址和端口能够被后续组件重用(例如,如果以前的工作过程死亡),但惟一 ID 永远不会被重用(即,它们在过程死亡时被标记为墓碑)。工作过程与其本地 raylet 过程共享命运。

  • 其中一个工作节点被指定为 Head 节点。除了上述过程外,Head 节点还托管:
  1. 全局管制存储 (GCS)。GCS 是一个键值服务器,蕴含零碎级元数据,如对象和参与者的地位。GCS 目前还不反对高可用,后续版本中 GCS 能够在任何和多个节点上运行,而不是指定的头节点上运行。
  2. Driver 过程 (es)。Driver 是一个非凡的工作过程,它执行顶级应用程序(例如,Python 中的__main__)。它能够提交工作,但不能执行任何工作自身。Driver 过程能够在任何节点上运行。

交互设计

利用的 Driver 能够通过以下形式之一连贯到 Ray:

  1. 调用 `ray.init()’,没有参数。这将启动一个嵌入式单节点 Ray 实例,利用能够立刻应用该实例。
  2. 通过指定 ray.init(地址 =<GCS addr>)连贯到现有的 Ray 集群。在后端,Driver 将以指定的地址连贯到 GCS,并查找群集其余组件的地址,例如其本地 raylet 地址。Driver 必须与 Ray 群集的现有节点之一合部。这是因为 Ray 的共享内存性能,所以合部是必要的前提。
  3. 应用 Ray 客户端 `ray.util.connect()’ 从近程计算机(例如笔记本电脑)连贯。默认状况下,每个 Ray 群集都会在能够接管近程客户端连贯的头节点上启动一个 Ray Client Server,用来接管近程 client 连贯。然而因为网络提早,间接从客户端运行的某些操作可能会更慢。

Runtime

  • 所有 Ray 外围组件都是用 C ++ 实现的。Ray 通过一个名为“core worker”的通用嵌入式 C ++ 库反对 Python 和 Java。此库实现 ownership 表、过程内存储,并治理与其余工作器和 Raylet 的 gRPC 通信。因为库是用 C ++ 实现的,所有语言运行时都共享 Ray 工作协定的通用高性能实现。

Task 的 lifetime

Owner 负责确保提交的 Task 的执行,并促成将返回的 ObjectRef 解析为其根底值。如下图,提交 Task 的过程被视为后果的 Owner,并负责从 raylet 获取资源以执行 Task,Driver 领有 A 的后果,Worker 1 领有 B 的后果。

  • 提交 Task 时,Owner 会期待所有依赖项就绪,即作为参数传递给 Task 的 ObjectRefs(请参见 Object 的 lifetime)变得可用。依赖项不须要是本地的;Owner 一旦认为依赖项在群集中的任何中央可用,就会立刻就绪。当依赖关系就绪时,Owner 从散布式调度程序申请资源以执行工作,一旦资源可用,调度程序就会授予申请,并应用调配给 owner 的 worker 的地址进行响应。
  • Owner 将 task spec 通过 gRPC 发送给租用的 worker 来调度工作。执行工作后,worker 必须存储返回值。如果返回值较小,则工作线程将值间接 inline 返回给 Owner,Owner 将其复制到其过程中对象存储区。如果返回值很大,则 worker 将对象存储在其本地共享内存存储中,并向所有者返回分布式内存中的 ref。让 owner 能够援用对象,不用将对象提取到其本地节点。
  • 当 Task 以 ObjectRef 作为其参数提交时,必须在 worker 开始执行之前解析对象值。如果该值较小,则它将间接从所有者的过程中对象存储复制到任务说明中,在任务说明中,执行 worker 线程能够援用它。如果该值较大,则必须从分布式内存中提取对象,以便 worker 在其本地共享内存存储中具备正本。scheduler 通过查找对象的地位并从其余节点申请副原本协调此对象传输。
  • 容错:工作可能会以谬误完结。Ray 辨别了两种类型的工作谬误:
  1. 应用程序级。这是工作过程处于活动状态,但工作以谬误完结的任何场景。例如,在 Python 中抛出 IndexError 的工作。
  2. 零碎级。这是工作过程意外死亡的任何场景。例如,隔离故障的过程,或者如果工作程序的本地 raylet 死亡。
  • 因为应用程序级谬误而失败的工作永远不会重试。异样被捕捉并存储为工作的返回值。因为零碎级谬误而失败的工作能够主动重试到指定的尝试次数。
  • 代码参考:
  1. src/ray/core_worker/core_worker.cc
  2. src/ray/common/task/task_spec.h
  3. src/ray/core_worker/transport/direct_task_transport.cc
  4. src/ray/core_worker/transport/ 依赖关系_解析器.cc
  5. src/ray/core_worker/task_manager.cc
  6. src/ray/protobuf/common.proto

Object 的 lifetime

下图 Ray 中的分布式内存治理。worker 能够创立和获取对象。owner 负责确定对象何时平安开释。

  • 对象的 owner 就是通过提交创立 task 或调用 ray.put 创立初始 ObjectRef 的 worker。owner 治理对象的生存期。Ray 保障,如果 owner 是活的,对象最终可能会被解析为其值(或者在 worker 失败的状况下引发谬误)。如果 owner 已死亡,则获取对象值的尝试永远不会 hang,但可能会引发异样,即便对象仍有物理正本。
  • 每个 worker 存储其领有的对象的援用计数。无关如何跟踪援用的详细信息,请参阅援用计数。Reference 仅在上面两种操作期间计算:
    1. 将 ObjectRef 或蕴含 ObjectRef 的对象作为参数传递给 Task。
    2. 从 Task 中返回 ObjectRef 或蕴含 ObjectRef 的对象。
  • 对象能够存储在 owner 的过程内内存存储中,也能够存储在分布式对象存储中。此决定旨在缩小每个对象的内存占用空间和解析工夫。
  • 当没有故障时,owner 保障,只有对象仍在作用域中(非零援用计数),对象的至多一个正本最终将可用。。
  • 有两种办法能够将 ObjectRef 解析为其值:
    1. 在 ObjectRef 上调用 ray.get。
    2. 将 ObjectRef 作为参数传递给工作。执行工作程序将解析 ObjectRefs,并将工作参数替换为解析的值。
  • 当对象较小时,能够通过间接从 owner 的过程内存储中检索它来解析。大对象存储在分布式对象存储中,必须应用分布式协定解析。
  • 当没有故障时,解析将保障最终胜利(但可能会引发应用程序级异样,例如 worker segfault)。如果存在故障,解析可能会引发零碎级异样,但永远不会挂起。如果对象存储在分布式内存中,并且对象的所有正本都因 raylet 故障而失落,则该对象可能会失败。Ray 还提供了一个选项,能够通过重建主动复原此类失落的对象。如果对象的所有者过程死亡,对象也可能失败。
  • 代码参考:
  1. src/ray/core_worker/store_Provider/memory_store/memory_store.cc
  2. src/ray/core_worker/store_Provider/plasma_store_provider.cc
  3. src/ray/core_worker/reference_count.cc
  4. src/ray/object_manager/object_manager.cc

Actor 的 lifetime

Actor 的 lifetimes 和 metadata (如 IP 和端口) 是由 GCS service 治理的. 每一个 Actor 的 Client 都会在本地缓存 metadata,应用 metadata 通过 gRPC 将 task 发送给 Actor.

如上图,与 Task 提交不同,Task 提交齐全扩散并由 Task Owner 治理,Actor lifetime 由 GCS 服务集中管理。

  • 在 Python 中创立 Actor 时,worker 首先同步向 GCS 注册 Actor。这确保了在创立 Actor 之前, 如果创立 worker 失败的状况下的正确性。一旦 GCS 响应,Actor 创立过程的其余部分将是异步的。Worker 过程在创立一个称为 Actor 创立 Task 的非凡 Task 队列。这与一般的非 Actor 工作相似,只是其指定的资源是在 actor 过程的生存期内获取的。创建者异步解析 actor 创立 task 的依赖关系,而后将其发送到要调度的 GCS 服务。同时,创立 actor 的 Python 调用立刻返回一个“actor 句柄”,即便 actor 创立工作尚未调度,也能够应用该句柄。
  • Actor 的工作执行与一般 Task 相似:它们返回 futures,通过 gRPC 间接提交给 actor 过程,在解析所有 ObjectRef 依赖关系之前,不会运行。和一般 Task 次要有两个区别:
  1. 执行 Actor 工作不须要从调度器获取资源。这是因为在打算其创立工作时,参与者已在其生命周期内取得资源。
  2. 对于 Actor 的每个调用者,工作的执行程序与提交程序雷同。
  • 当 Actor 的创建者退出时,或者群集中的作用域中没有更多挂起的工作或句柄时,将被清理。不过对于 detached Actor 来说不是这样的,因为 detached actor 被设计为能够通过名称援用的长 Actor,必须应用 ray.kill(no_restart=True) 显式清理。
  • Ray 还反对 async actor,这些 Actor 能够应用 asyncio event loop 并发运行工作。从调用者的角度来看,向这些 actor 提交工作与向惯例 actor 提交工作雷同。惟一的区别是,当 task 在 actor 上运行时,它将公布到在后盾线程或线程池中运行的异步事件循环中,而不是间接在主线程上运行。
  • 代码参考:
  1. Core worker 源码: src/ray/core_worker/core_worker.h. 此代码是任务调度、Actor 任务调度、过程内存储和内存治理中波及的各种协定的骨干。
  2. Python: python/ray/includes/libcoreworker.pxd
  3. Java: src/ray/core_worker/lib/java
  4. src/ray/core_worker/core_worker.cc
  5. src/ray/core_worker/transport/direct_actor_transport.cc
  6. src/ray/gcs/gcs_server/gcs_actor_manager.cc
  7. src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
  8. src/ray/protobuf/core_worker.proto

本文分享自华为云社区《分布式应用框架 Ray 架构源码解析》,原文作者:Leo Xiao。

点击关注,第一工夫理解华为云陈腐技术~

退出移动版