摘要:Ray的定位是分布式应用框架,次要指标是使能分布式应用的开发和运行。
Ray是UC Berkeley大学 RISE lab(前AMP lab) 2017年12月 开源的新一代分布式应用框架(刚公布的时候定位是高性能分布式计算框架,20年中批改定位为分布式应用框架),通过一套引擎解决简单场景问题,通过动静计算及状态共享提高效率,实现研发、运行时、容灾一体化
Ray架构解析
业务指标
Ray的定位是分布式应用框架,次要指标是使能分布式应用的开发和运行。
业务场景
具体的粗粒度应用场景包含
- 弹性负载,比方Serverless Computing
- 机器学习训练,Ray Tune, RLlib, RaySGD提供的训练能力
- 在线服务, 例如Ray Server提供在线学习的案例
- 数据处理, 例如Modin, Dask-On-Ray, MARS-on-Ray
- 长期计算(例如,并行化Python应用程序,将不同的分布式框架粘合在一起)
Ray的API让开发者能够轻松的在单个分布式应用中组合多个libraries,例如,Ray的tasks和Actors可能会call into 或called from在Ray上运行的分布式训练(e.g. torch.distributed)或者在线服务负载; 在这种场景下,Ray是作为一个“分布式胶水”零碎,因为它提供通用API接口并且性能足以撑持许多不同工作负载类型。
零碎设计指标
- Ray架构设计的外围准则是API的简略性和通用性
- Ray的零碎的外围指标是性能(低开销和程度可伸缩性)和可靠性。为了达成外围指标,设计过程中须要就义一些其余现实的指标,例如简化的零碎架构。例如,Ray应用了分布式参考计数和分布式内存之类的组件,这些组件减少了体系结构的复杂性,然而对于性能和可靠性而言却是必须的。
- 为了进步性能,Ray建设在gRPC之上,并且在许多状况下能够达到或超过gRPC的原始性能。与独自应用gRPC相比,Ray使应用程序更容易利用并行和分布式执行以及分布式内存共享(通过共享内存对象存储)。
- 为了进步可靠性,Ray的外部协定旨在确保产生故障时的正确性,同时又缩小了常见状况的开销。 Ray施行了分布式参考计数协定以确保内存平安,并提供了各种从故障中复原的选项。
- 因为Ray应用形象资源而不是机器来示意计算能力,因而Ray应用程序能够无缝的从便携机环境扩大到群集,而无需更改任何代码。 Ray通过分布式溢出调度程序和对象管理器实现了无缝扩大,而开销却很低。
相干零碎上下文
- 集群管理系统:Ray能够在Kubernetes或SLURM之类的集群管理系统之上运行,以提供更轻量的task和Actor而不是容器和服务。
- 并行框架:与Python并行化框架(例如multiprocessing或Celery)相比,Ray提供了更通用,更高性能的API。 Ray零碎还明确反对内存共享。
- 数据处理框架: 与Spark,Flink,MARS或Dask等数据处理框架相比,Ray提供了一个low-level且较简化的API。这使API更加灵便,更适宜作为“分布式胶水”框架。另一方面,Ray对数据模式,关系表或流数据流没有外在的反对。仅通过库(例如Modin,Dask-on-Ray,MARS-on-Ray)提供此类性能。
- Actor框架:与诸如Erlang和Akka之类的专用actor框架不同,Ray与现有的编程语言集成,从而反对跨语言操作和语言本机库的应用。 Ray零碎还通明地治理无状态计算的并行性,并明确反对参与者之间的内存共享。
- HPC零碎:HPC零碎都反对MPI消息传递接口,MPI是比task和actor更底层的接口。这能够使应用程序具备更大的灵活性,然而开发的复杂度加大了很多。这些零碎和库中的许多(例如NCCL,MPI)也提供了优化的个体通信原语(例如allreduce)。 Ray应用程序能够通过初始化各组Ray Actor之间的通信组来利用此类原语(例如,就像RaySGD的torch distributed)。
零碎设计
逻辑架构:
畛域模型
- Task:在与调用者不同的过程上执行的单个函数调用。工作能够是无状态的(@ ray.remote函数)或有状态的(@ ray.remote类的办法-请参见上面的Actor)。工作与调用者异步执行:.remote()调用立刻返回一个ObjectRef,可用于检索返回值。
- Object:应用程序值。这能够由工作返回,也能够通过ray.put创立。对象是不可变的:创立后就无奈批改。工人能够应用ObjectRef援用对象。
- Actor:有状态的工作过程(@ ray.remote类的实例)。 Actor工作必须应用句柄或对Actor的特定实例的Python援用来提交。
- Driver: 程序根目录。这是运行ray.init()的代码。
- Job:源自同一驱动程序的(递归)工作,对象和参与者的汇合
集群设计
如上图所示,Ray集群包含一组同类的worker节点和一个集中的全局管制存储(GCS)实例。
局部零碎元数据由GCS治理,GCS是基于可插拔数据存储的服务,这些元数据也由worker本地缓存,例如Actor的地址。 GCS治理的元数据拜访频率较低,但可能被群集中的大多数或所有worker应用,例如,群集的以后节点成员身份。这是为了确保GCS性能对于应用程序性能影响不大。
Ownership
- 大部分零碎元数据是依据去中心化理念(ownership)进行治理的:每个工作过程都治理和领有它提交的工作以及这些工作返回的“ ObjectRef”。Owner负责确保工作的执行并促成将ObjectRef解析为其根底值。相似地,worker领有通过“ ray.put”调用创立的任何对象。
- OwnerShip的设计具备以下长处(与Ray版本<0.8中应用的更集中的设计相比):
- 低工作提早(〜1 RTT,<200us)。常常拜访的零碎元数据对于必须对其进行更新的过程而言是本地的。
- 高吞吐量(每个客户端约10k工作/秒;线性扩大到集群中数百万个工作/秒),因为零碎元数据通过嵌套的近程函数调用天然散布在多个worker过程中。
- 简化的架构。owner集中了平安垃圾收集对象和零碎元数据所需的逻辑。
- 进步了可靠性。能够依据应用程序构造将工作程序故障彼此隔离,例如,一个近程调用的故障不会影响另一个。
- OwnerShip附带的一些衡量取舍是:
- 要解析“ ObjectRef”,对象的owner必须是可及的。这意味着对象必须与其owner绑定。无关对象复原和持久性的更多信息,请参见object故障和object溢出。
- 目前无奈转让ownership。
外围组件
- Ray实例由一个或多个工作节点组成,每个工作节点由以下物理过程组成:
- 一个或多个工作过程,负责工作的提交和执行。工作过程要么是无状态的(能够执行任何@ray.remote函数),要么是Actor(只能依据其@ray.remote类执行办法)。每个worker过程都与特定的作业关联。初始工作线程的默认数量等于计算机上的CPU数量。每个worker存储ownership表和小对象:
a. Ownership 表。工作线程具备援用的对象的零碎元数据,例如,用于存储援用计数。
b. in-process store,用于存储小对象。 - Raylet。raylet在同一群集上的所有作业之间共享。raylet有两个主线程:
a. 调度器。负责资源管理和满足存储在分布式对象存储中的工作参数。群集中的单个调度程序包含Ray散布式调度程序。
b. 共享内存对象存储(也称为Plasma Object Store)。负责存储和传输大型对象。集群中的单个对象存储包含Ray分布式对象存储。
每个工作过程和raylet都被调配了一个惟一的20字节标识符以及一个IP地址和端口。雷同的地址和端口能够被后续组件重用(例如,如果以前的工作过程死亡),但惟一ID永远不会被重用(即,它们在过程死亡时被标记为墓碑)。工作过程与其本地raylet过程共享命运。
- 其中一个工作节点被指定为Head节点。除了上述过程外,Head节点还托管:
- 全局管制存储(GCS)。GCS是一个键值服务器,蕴含零碎级元数据,如对象和参与者的地位。GCS目前还不反对高可用,后续版本中GCS能够在任何和多个节点上运行,而不是指定的头节点上运行。
- Driver过程(es)。Driver是一个非凡的工作过程,它执行顶级应用程序(例如,Python中的__main__)。它能够提交工作,但不能执行任何工作自身。Driver过程能够在任何节点上运行。
交互设计
利用的Driver能够通过以下形式之一连贯到Ray:
- 调用`ray.init()’,没有参数。这将启动一个嵌入式单节点Ray实例,利用能够立刻应用该实例。
- 通过指定ray.init(地址=<GCS addr>)连贯到现有的Ray集群。在后端,Driver将以指定的地址连贯到GCS,并查找群集其余组件的地址,例如其本地raylet地址。Driver必须与Ray群集的现有节点之一合部。这是因为Ray的共享内存性能,所以合部是必要的前提。
- 应用Ray客户端`ray.util.connect()'从近程计算机(例如笔记本电脑)连贯。默认状况下,每个Ray群集都会在能够接管近程客户端连贯的头节点上启动一个Ray Client Server,用来接管近程client连贯。然而因为网络提早,间接从客户端运行的某些操作可能会更慢。
Runtime
- 所有Ray外围组件都是用C++实现的。Ray通过一个名为“core worker”的通用嵌入式C++库反对Python和Java。此库实现ownership表、过程内存储,并治理与其余工作器和Raylet的gRPC通信。因为库是用C++实现的,所有语言运行时都共享Ray工作协定的通用高性能实现。
Task的lifetime
Owner负责确保提交的Task的执行,并促成将返回的ObjectRef解析为其根底值。如下图,提交Task的过程被视为后果的Owner,并负责从raylet获取资源以执行Task,Driver领有A的后果,Worker 1领有B的后果。
- 提交Task时,Owner会期待所有依赖项就绪,即作为参数传递给Task的ObjectRefs(请参见Object的lifetime)变得可用。依赖项不须要是本地的;Owner一旦认为依赖项在群集中的任何中央可用,就会立刻就绪。当依赖关系就绪时,Owner从散布式调度程序申请资源以执行工作,一旦资源可用,调度程序就会授予申请,并应用调配给owner的worker的地址进行响应。
- Owner将task spec通过gRPC发送给租用的worker来调度工作。执行工作后,worker必须存储返回值。如果返回值较小,则工作线程将值间接inline返回给Owner,Owner将其复制到其过程中对象存储区。如果返回值很大,则worker将对象存储在其本地共享内存存储中,并向所有者返回分布式内存中的ref。让owner能够援用对象,不用将对象提取到其本地节点。
- 当Task以ObjectRef作为其参数提交时,必须在worker开始执行之前解析对象值。如果该值较小,则它将间接从所有者的过程中对象存储复制到任务说明中,在任务说明中,执行worker线程能够援用它。如果该值较大,则必须从分布式内存中提取对象,以便worker在其本地共享内存存储中具备正本。scheduler通过查找对象的地位并从其余节点申请副原本协调此对象传输。
- 容错:工作可能会以谬误完结。Ray辨别了两种类型的工作谬误:
- 应用程序级。这是工作过程处于活动状态,但工作以谬误完结的任何场景。例如,在Python中抛出IndexError的工作。
- 零碎级。这是工作过程意外死亡的任何场景。例如,隔离故障的过程,或者如果工作程序的本地raylet死亡。
- 因为应用程序级谬误而失败的工作永远不会重试。异样被捕捉并存储为工作的返回值。因为零碎级谬误而失败的工作能够主动重试到指定的尝试次数。
- 代码参考:
- src/ray/core_worker/core_worker.cc
- src/ray/common/task/task_spec.h
- src/ray/core_worker/transport/direct_task_transport.cc
- src/ray/core_worker/transport/依赖关系_解析器.cc
- src/ray/core_worker/task_manager.cc
- src/ray/protobuf/common.proto
Object的lifetime
下图Ray中的分布式内存治理。worker能够创立和获取对象。owner负责确定对象何时平安开释。
- 对象的owner就是通过提交创立task或调用ray.put创立初始ObjectRef的worker。owner治理对象的生存期。Ray保障,如果owner是活的,对象最终可能会被解析为其值(或者在worker失败的状况下引发谬误)。如果owner已死亡,则获取对象值的尝试永远不会hang,但可能会引发异样,即便对象仍有物理正本。
- 每个worker存储其领有的对象的援用计数。无关如何跟踪援用的详细信息,请参阅援用计数。Reference仅在上面两种操作期间计算:
1.将ObjectRef或蕴含ObjectRef的对象作为参数传递给Task。
2.从Task中返回ObjectRef或蕴含ObjectRef的对象。 - 对象能够存储在owner的过程内内存存储中,也能够存储在分布式对象存储中。此决定旨在缩小每个对象的内存占用空间和解析工夫。
- 当没有故障时,owner保障,只有对象仍在作用域中(非零援用计数),对象的至多一个正本最终将可用。。
- 有两种办法能够将ObjectRef解析为其值:
1.在ObjectRef上调用ray.get。
2.将ObjectRef作为参数传递给工作。执行工作程序将解析ObjectRefs,并将工作参数替换为解析的值。 - 当对象较小时,能够通过间接从owner的过程内存储中检索它来解析。大对象存储在分布式对象存储中,必须应用分布式协定解析。
- 当没有故障时,解析将保障最终胜利(但可能会引发应用程序级异样,例如worker segfault)。如果存在故障,解析可能会引发零碎级异样,但永远不会挂起。如果对象存储在分布式内存中,并且对象的所有正本都因raylet故障而失落,则该对象可能会失败。Ray还提供了一个选项,能够通过重建主动复原此类失落的对象。如果对象的所有者过程死亡,对象也可能失败。
- 代码参考:
- src/ray/core_worker/store_Provider/memory_store/memory_store.cc
- src/ray/core_worker/store_Provider/plasma_store_provider.cc
- src/ray/core_worker/reference_count.cc
- src/ray/object_manager/object_manager.cc
Actor的lifetime
Actor的lifetimes和metadata (如IP和端口)是由GCS service治理的.每一个Actor的Client都会在本地缓存metadata,应用metadata通过gRPC将task发送给Actor.
如上图,与Task提交不同,Task提交齐全扩散并由Task Owner治理,Actor lifetime由GCS服务集中管理。
- 在Python中创立Actor时,worker首先同步向GCS注册Actor。这确保了在创立Actor之前,如果创立worker失败的状况下的正确性。一旦GCS响应,Actor创立过程的其余部分将是异步的。Worker过程在创立一个称为Actor创立Task的非凡Task队列。这与一般的非Actor工作相似,只是其指定的资源是在actor过程的生存期内获取的。创建者异步解析actor创立task的依赖关系,而后将其发送到要调度的GCS服务。同时,创立actor的Python调用立刻返回一个“actor句柄”,即便actor创立工作尚未调度,也能够应用该句柄。
- Actor的工作执行与一般Task 相似:它们返回futures,通过gRPC间接提交给actor过程,在解析所有ObjectRef依赖关系之前,不会运行。和一般Task次要有两个区别:
- 执行Actor工作不须要从调度器获取资源。这是因为在打算其创立工作时,参与者已在其生命周期内取得资源。
- 对于Actor的每个调用者,工作的执行程序与提交程序雷同。
- 当Actor的创建者退出时,或者群集中的作用域中没有更多挂起的工作或句柄时,将被清理。不过对于detached Actor来说不是这样的,因为detached actor被设计为能够通过名称援用的长Actor,必须应用ray.kill(no_restart=True)显式清理。
- Ray还反对async actor,这些Actor能够应用asyncio event loop并发运行工作。从调用者的角度来看,向这些actor提交工作与向惯例actor提交工作雷同。惟一的区别是,当task在actor上运行时,它将公布到在后盾线程或线程池中运行的异步事件循环中,而不是间接在主线程上运行。
- 代码参考:
- Core worker源码: src/ray/core_worker/core_worker.h. 此代码是任务调度、Actor任务调度、过程内存储和内存治理中波及的各种协定的骨干。
- Python: python/ray/includes/libcoreworker.pxd
- Java: src/ray/core_worker/lib/java
- src/ray/core_worker/core_worker.cc
- src/ray/core_worker/transport/direct_actor_transport.cc
- src/ray/gcs/gcs_server/gcs_actor_manager.cc
- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
- src/ray/protobuf/core_worker.proto
本文分享自华为云社区《分布式应用框架Ray架构源码解析》,原文作者:Leo Xiao 。
点击关注,第一工夫理解华为云陈腐技术~