OneFlow的官网文档中提供了一个结构global tensor的例子。结构时指定placement
和sbp
参数就是全局视角的global tensor,数据能够散布在多个节点上。
在单机CPU环境下,能够启动3个过程、每个过程设置不同的环境变量,运行如下Python代码就能够创立一个简略的global tensor。
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=0 LOCAL_RANK=0# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=1 LOCAL_RANK=1# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=2 LOCAL_RANK=2import oneflow as flowP0 = flow.placement("cpu", ranks=[0, 1])a0_sbp = flow.sbp.split(0)A0 = flow.Tensor([[1,2,3],[4,5,6]], placement=P0, sbp=a0_sbp)
1 sbp
sbp由split, broadcast, partial的首字母组合而成。
- split示意物理设施上的tensor,是将global tensor切分失去的。
- broadcast示意global tensor会复制并播送到所有的物理设施上。
- partial示意global tensor与物理设施上的tensor的形态雷同,然而物理设施上的值,只是global tensor的一部分。
Python端flow.sbp包定义了split等3种类型。其C++ binding代码在sbp_symbol.cpp中。这些类型都是SbpParallel类型,是protobuf message对象。三种类型通过oneof parallel_type共享存储。
其中broadcast和partial_sum都是空音讯,赋值时须要调用mutable办法显式表明oneof字段具体是哪种类型。
split的值示意在tensor的哪个轴上切分数据。轴的index值是一个[[0, 5]之间的整数](https://github.com/Oneflow-In...)。所有的split SbpParallel对象被保留到一个动态vector中。
2 placement的结构
placement属性指定tensor寄存在哪些物理设施上。或者,在纯CPU环境下,tensor寄存在哪些过程中。
在上述例子中,flow.placement("cpu", ranks=[0, 1])
创立一个placement对象。第一个参数是设施类型,目前反对cpu
或cuda
。ranks示意设施列表,tensor将散布在这些设施上(依据sbp的批示切分或播送数据)。
ranks只列出了rank id(全局惟一),没有指定节点host。rank与host关系是依据环境变量确定的。环境变量RANK
示意全局惟一的rank id,LOCAL_RANK
示意节点内的本地rank id。在GPU环境下,个别一个过程对应一块设施。WORLD_SIZE
示意所有节点的设施(过程)总数。
oneflow包在初始化时,会依据环境变量在各个节点间建设管制面通信连贯,以及数据面通信连贯。这样每个过程就晓得有多少个节点、有多少个设施/过程、以后过程在整个集群的地位。
通过placement的构造函数绑定能够晓得,其对应的C++类型是ParallelDesc。对象结构由函数CreateParallelDescSymbol实现。次要调用流程如下:
2.1 确定machine和device
ParseAndFormatRanks会将ranks数组[0, 1]
转为形如"machine_id:device_id"
的字符串数组,供后续解决应用。这里的逻辑决定了如何依据ranks中的id,确定tensor数据在节点和设施上的散布:
- machine_id = rank / NumOfProcessPerNode
- device_id = rank % NumOfProcessPerNode
从上述公式能够看出,各个节点的设施/过程数量须要是统一的。
2.2 结构并缓存ParallelDesc对象
CreateParallelDesc函数实现ParallelDesc的结构。其中MakeParallelConf会先依据"machine_id:device_id"
等数据结构一个cfg::ParallelConf
对象,这是一个相似oneflow::ParallelConf的类型,文件位于build/oneflow/core/job/placement.cfg.h
,是cmake构建过程中主动生成的文件。
cfg::ParallelConf
等对象的接口相似protobuf message,但实现了hash办法,能够作为hash map的key。
之后的PhysicalRun尽管波及虚拟机,但指令列表应该是空的,实质性的逻辑只是调用builder->GetParallelDescSymbol,其中的外围逻辑是FindOrCreateSymbolId,创建对象并缓存,前面的Storage::Get只是获取后果。
FindOrCreateSymbolId次要实现两级缓存:
- 首先通过IdCache<cfg::ParallelConf>实现从
cfg::ParallelConf
到symbol id(64位整数)的映射。这可能也是为何不间接用protobuf的起因之一(因为protobuf不能实现稳固的hash)? - 其次通过Storage<ParallelDesc>实现从symbol id到
ParallelDesc
的映射。
最终依据cfg::ParallelConf
能够间接获取之前缓存的ParallelDesc
对象。
CreateParallelDesc的具体调用流程如下:
3 global tensor结构调用流程
上面以本文开始的例子剖析一下结构global tensor的调用流程。这可能不是一个典型的场景,只是人为指定的数据便于debug。
通过之前探讨local tensor时的类关系图能够晓得,EagerConsistentTensorImpl内含一个local tensor的变量。能够设想,结构global tensor时,会先结构一个local tensor、再做一些后续解决。
Python端创立tensor对象时,如果像本文开始的例子那样指定placement、sbp和数据,对应的Functor是ConsistentTensorWithDataCtorFunctor。外围逻辑在MakeConsistentTensorFromData中。次要的调用流程如下:
上述各个局部的次要职能如下:
- DataConsistencyCheck会在tensor的placement波及的各个节点间拷贝数据、校验数据是否统一。
- functional::Empty这几行代码会结构一个local tensor并填充数据。这里和之前探讨local tensor的过程统一。
- functional::Cast进行数据类型dtype的转换。
- LocalToConsistent把local tensor转为global tensor。
- ToConsistent依据placement和sbp对tensor数据和shape进行裁剪。
3.1 数据一致性校验:DataConsistencyCheck
OneFlow在各个节点的所有过程都执行雷同的Python用户脚本。本文开始的这个例子,tensor的数据是人为指定的(也可能是从record文件读取的),所以相干节点须要校验一下数据的一致性。通过其它形式构建tensor可能就不须要这个步骤(比方randn)。
数据的传输和比拟只在与该placement相干的过程之间进行。比方rank=2
的节点就不会解决示例tensor的数据校验,在这里会间接返回。
placement相干的过程形成一个环(Ring),环中的每个节点给下一个节点发送数据,从上一个节点接收数据,并比拟数据是否统一。
3.2 数据类型转换:Cast
在这个例子中,Functor强制指定了tensor的数据类型是float。
而Python端如果用整数,local_tensor的dtype是int64;如果用浮点数,dtype是double。所以Cast就是将这些数据类型对立转为float。
这里其实没想太明确,local tensor不须要cast,为何global tensor须要cast呢?毕竟各个过程执行的代码都是统一的。
Cast执行的次要流程如下。CPU下的转换操作在CastCpu内实现。
3.3 local到global的转换
LocalToConsistentFunctor的op就不是UserOpExpr了,而是CastToConsistentOpExpr。而以后的inputs tensor还是local的,所以会进入对应版本的ApplyImpl执行。在ApplyImpl中,次要执行LocalToConsistent函数。WithConsistencyChecked这一大段代码,因为上面两处的作用会间接返回:
- DisableCheckConsistentTensorMetaScope
- CheckConsistentTensorMeta
LocalToConsistent是在RawLocalToConsistent上加了一层装璜。
RawLocalToConsistent的作用次要是结构一个global tensor并与之前的local tensor建设关联。但返回时,global tensor的数据和shape仍和之前残缺的local tensor一样。
RawLocalToConsistent的调用流程如下:
须要留神的是,对于rank=2
的过程来说,示例中的tensor与它无关,不会设置local tensor。对应的逻辑链条如下:
- GetTensorDevice4CurrentProcessCtx不会设置parallel_id的值。
- GetTensorDevice4CurrentProcessCtx给id_val赋一个空的Optional。
- 在rank=2的过程中,TryGetParallelId返回false。
之前说过,所有过程执行的Python代码都是一样的,然而对于rank=2
的过程来说:
- 从开始到CastFunctor,执行的逻辑与
ranks=[0,1]
的过程统一。 - 然而在LocalToConsistent这一步,不再保留local tensor。后续的global tensor就是一个空的tensor,然而shape等信息仍与其它节点统一。
3.4 shape和数据的裁剪:ToConsistent
ToConsistent依据placement和sbp对tensor数据和shape进行裁剪。其对应的Functor是ToConsistentFunctor。因为输出自身就是global tensor,所以会转给ConsistentToConsistent执行。其中的op类型是ConsistentToConsistentOpExpr。通过Dispatch后会进入对应的ApplyImpl执行,再进入RawConsistentToConsistent。
RawConsistentToConsistent外围逻辑在于调用RecursiveGetBoxingOutput,对tensor进行裁剪。这之后,如果过程与tensor相干(rank=0或1),进入if的第一个分支,将tensor后果赋值给outpus。如果过程与tensor无关(rank=2),进入第二个分支,给outputs赋一个空的tensor。
RecursiveGetBoxingOutput是用CheckConsistentTensorMeta润饰的CalcBoxingOutput。裁剪的外围逻辑由CalcBoxingOutput实现。
3.4.1 tensor元数据校验:CheckConsistentTensorMeta
顾名思义,CheckConsistentTensorMeta的作用是校验各个节点的tensor的元数据,外围是调用LaunchTensorMetaConsistencyCheck,tensor相干的每个过程会执行4次节点间通信以及数据比拟。如果元数据被缓存后,就只须要LaunchTensorMetaConsistencyCheck中的一次通信。
通信须要传输校验如下数据:
- FlatTensorConsistency
- FlatConsistentTensorMeta
- FlatNdSbp
- FlatParallelConf
3.4.2 tensor数据裁剪:CalcBoxingOutput
Boxing是不同sbp的tensor之间的主动匹配机制。CalcBoxingOutput应该是Boxing机制实现一部分。这个例子的情景也比较简单,只是对broad tensor进行裁剪。CalcBoxingOutput的调用流程如下:
首先会获取一个EagerBoxingInterpreter对象boxing_interpreter,理论的类型是NaiveEagerBoxingInterpreter。
在结构boxing_interpreter这个解释器时,会先获取BoxingExpr,RawMainBoxingExpr是将多个表达式通过Or运算串起来形成一个复合表达式,所以返回的是一个OrBoxingExpr。
失去表达式之后的一个重要步骤,就是获取BoxingFunction,这是理论执行boxing的函数。对于OrBoxingExpr来说,就是一一校验各子表达式与输入输出是否匹配。找到一个子表达式后,就调用这个子表达式的GetBoxingFunction函数。
RawMainBoxingExpr的每个表达式都有一个名字,能够晓得本文例子对应的表达式是名字为symmetric-b-to-s
的AtomicBoxingExpr。这是一个事后注册的函数,对应的BoxingFunction是SymmetricB2S。b-to-s
应该是broad to split
的缩写。整个意思应该是把一个残缺的播送tensor平均切分。在这个函数中会确定切分的参数start, stop和step。
在CPU环境下,流程会执行到SliceKernelUtil::Forward。其中SwitchDoForward是通过宏定义的一个函数。宏开展后的代码相似上面这样。理论会进入DoForward执行裁剪后的数据拷贝。
template<typename... Args> static void SwitchDoForward(const std::tuple<int32_t>& switch_tuple, Args&&... args) { static const std::map<std::tuple<int32_t>, std::function<void(Args && ...)>> case_handlers{ {SwitchCase(1), [](Args&&... args) { return SliceKernelUtil<DeviceType::kCPU, T>::DoForward<1>(std::forward<Args>(args)...); }}, }; return case_handlers.at(switch_tuple)(std::forward<Args>(args)...); };
4 用randn结构随机tensor
randn这种操作就不须要过程间的数据交互,各个过程只生成本人负责的数据就能够了。比方上面的例子:
import oneflow as flowP0 = flow.placement("cpu", ranks=[0, 1])a0_sbp = flow.sbp.split(0)A0 = flow.randn(4, 5, placement=P0, sbp=a0_sbp)# print(A0)
randn是一个UserOpExpr,global版本对应的Functor是ConsistentRandNFunctor,会进入Interpret执行。
EagerConsistentTensorImpl::New时会调用GetPhysicalShape获取local tensor的shape。randn的kernel就只生成local tensor的数据。
参考资料
- oneflow v0.7.0
- OneFlow源码浏览1:算子签名的主动推断
- OneFlow源码浏览2:Op、Kernel与解释器
- OneFlow源码浏览3:Op指令在虚拟机中的执行
- OneFlow源码浏览4:tensor体系与local tensor
- Global Tensor
- 集群的全局视角
- Global View的概念和实现
- OneFlow的Global Tensor学习笔记和实习总结