OneFlow 的官网文档中提供了一个结构 global tensor 的例子。结构时指定 placement
和sbp
参数就是全局视角的 global tensor,数据能够散布在多个节点上。
在单机 CPU 环境下,能够启动 3 个过程、每个过程设置不同的环境变量,运行如下 Python 代码就能够创立一个简略的 global tensor。
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=0 LOCAL_RANK=0
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=1 LOCAL_RANK=1
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=2 LOCAL_RANK=2
import oneflow as flow
P0 = flow.placement("cpu", ranks=[0, 1])
a0_sbp = flow.sbp.split(0)
A0 = flow.Tensor([[1,2,3],[4,5,6]], placement=P0, sbp=a0_sbp)
1 sbp
sbp 由 split, broadcast, partial 的首字母组合而成。
- split 示意物理设施上的 tensor,是将 global tensor 切分失去的。
- broadcast 示意 global tensor 会复制并播送到所有的物理设施上。
- partial 示意 global tensor 与物理设施上的 tensor 的形态雷同,然而物理设施上的值,只是 global tensor 的一部分。
Python 端 flow.sbp 包定义了 split 等 3 种类型。其 C ++ binding 代码在 sbp_symbol.cpp 中。这些类型都是 SbpParallel 类型,是 protobuf message 对象。三种类型通过 oneof parallel_type 共享存储。
其中 broadcast 和partial_sum都是空音讯,赋值时须要调用 mutable 办法显式表明 oneof 字段具体是哪种类型。
split的值示意在 tensor 的哪个轴上切分数据。轴的 index 值是一个 [[0, 5] 之间的整数](https://github.com/Oneflow-In…)。所有的 split SbpParallel 对象被保留到一个动态 vector 中。
2 placement 的结构
placement 属性指定 tensor 寄存在哪些物理设施上。或者,在纯 CPU 环境下,tensor 寄存在哪些过程中。
在上述例子中,flow.placement("cpu", ranks=[0, 1])
创立一个 placement 对象。第一个参数是设施类型,目前反对 cpu
或cuda
。ranks 示意设施列表,tensor 将散布在这些设施上(依据 sbp 的批示切分或播送数据)。
ranks 只列出了 rank id(全局惟一),没有指定节点 host。rank 与 host 关系是依据环境变量确定的。环境变量 RANK
示意全局惟一的 rank id,LOCAL_RANK
示意节点内的本地 rank id。在 GPU 环境下,个别一个过程对应一块设施。WORLD_SIZE
示意所有节点的设施(过程)总数。
oneflow 包在初始化时,会依据环境变量在各个节点间建设管制面通信连贯,以及数据面通信连贯。这样每个过程就晓得有多少个节点、有多少个设施 / 过程、以后过程在整个集群的地位。
通过 placement 的构造函数绑定能够晓得,其对应的 C ++ 类型是 ParallelDesc。对象结构由函数 CreateParallelDescSymbol 实现。次要调用流程如下:
2.1 确定 machine 和 device
ParseAndFormatRanks 会将 ranks 数组 [0, 1]
转为形如 "machine_id:device_id"
的字符串数组,供后续解决应用。这里的逻辑决定了如何依据 ranks 中的 id,确定 tensor 数据在节点和设施上的散布:
- machine_id = rank / NumOfProcessPerNode
- device_id = rank % NumOfProcessPerNode
从上述公式能够看出,各个节点的设施 / 过程数量须要是统一的。
2.2 结构并缓存 ParallelDesc 对象
CreateParallelDesc 函数实现 ParallelDesc 的结构。其中 MakeParallelConf 会先依据 "machine_id:device_id"
等数据结构一个 cfg::ParallelConf
对象,这是一个相似 oneflow::ParallelConf 的类型,文件位于build/oneflow/core/job/placement.cfg.h
,是 cmake 构建过程中主动生成的文件。
cfg::ParallelConf
等对象的接口相似 protobuf message,但实现了 hash 办法,能够作为 hash map 的 key。
之后的 PhysicalRun 尽管波及虚拟机,但指令列表应该是空的,实质性的逻辑只是调用 builder->GetParallelDescSymbol,其中的外围逻辑是 FindOrCreateSymbolId,创建对象并缓存,前面的 Storage::Get 只是获取后果。
FindOrCreateSymbolId 次要实现两级缓存:
- 首先通过 IdCache<cfg::ParallelConf> 实现从
cfg::ParallelConf
到 symbol id(64 位整数)的映射。这可能也是为何不间接用 protobuf 的起因之一(因为 protobuf 不能实现稳固的 hash)? - 其次通过 Storage<ParallelDesc> 实现从 symbol id 到
ParallelDesc
的映射。
最终依据 cfg::ParallelConf
能够间接获取之前缓存的 ParallelDesc
对象。
CreateParallelDesc 的具体调用流程如下:
3 global tensor 结构调用流程
上面以本文开始的例子剖析一下结构 global tensor 的调用流程。这可能不是一个典型的场景,只是人为指定的数据便于 debug。
通过之前探讨 local tensor 时的类关系图能够晓得,EagerConsistentTensorImpl 内含一个 local tensor 的变量。能够设想,结构 global tensor 时,会先结构一个 local tensor、再做一些后续解决。
Python 端创立 tensor 对象时,如果像本文开始的例子那样指定 placement、sbp 和数据,对应的 Functor 是 ConsistentTensorWithDataCtorFunctor。外围逻辑在 MakeConsistentTensorFromData 中。次要的调用流程如下:
上述各个局部的次要职能如下:
- DataConsistencyCheck 会在 tensor 的 placement 波及的各个节点间拷贝数据、校验数据是否统一。
- functional::Empty 这几行代码会结构一个 local tensor 并填充数据。这里和之前探讨 local tensor 的过程统一。
- functional::Cast 进行数据类型 dtype 的转换。
- LocalToConsistent 把 local tensor 转为 global tensor。
- ToConsistent 依据 placement 和 sbp 对 tensor 数据和 shape 进行裁剪。
3.1 数据一致性校验:DataConsistencyCheck
OneFlow 在各个节点的所有过程都执行雷同的 Python 用户脚本。本文开始的这个例子,tensor 的数据是人为指定的(也可能是从 record 文件读取的),所以相干节点须要校验一下数据的一致性。通过其它形式构建 tensor 可能就不须要这个步骤(比方 randn)。
数据的传输和比拟只在与该 placement 相干的过程之间进行。比方 rank=2
的节点就不会解决示例 tensor 的数据校验,在这里会间接返回。
placement 相干的过程形成一个环(Ring),环中的每个节点给下一个节点发送数据,从上一个节点接收数据,并比拟数据是否统一。
3.2 数据类型转换:Cast
在这个例子中,Functor 强制指定了 tensor 的数据类型是 float。
而 Python 端如果用整数,local_tensor 的 dtype 是 int64;如果用浮点数,dtype 是 double。所以 Cast 就是将这些数据类型对立转为 float。
这里其实没想太明确,local tensor 不须要 cast,为何 global tensor 须要 cast 呢?毕竟各个过程执行的代码都是统一的。
Cast 执行的次要流程如下。CPU 下的转换操作在 CastCpu 内实现。
3.3 local 到 global 的转换
LocalToConsistentFunctor 的 op 就不是 UserOpExpr 了,而是 CastToConsistentOpExpr。而以后的 inputs tensor 还是 local 的,所以会进入对应版本的 ApplyImpl 执行。在 ApplyImpl 中,次要执行 LocalToConsistent 函数。WithConsistencyChecked 这一大段代码,因为上面两处的作用会间接返回:
- DisableCheckConsistentTensorMetaScope
- CheckConsistentTensorMeta
LocalToConsistent 是在 RawLocalToConsistent 上加了一层装璜。
RawLocalToConsistent 的作用次要是结构一个 global tensor 并与之前的 local tensor 建设关联。但返回时,global tensor 的数据和 shape 仍和之前残缺的 local tensor 一样。
RawLocalToConsistent 的调用流程如下:
须要留神的是,对于 rank=2
的过程来说,示例中的 tensor 与它无关,不会设置 local tensor。对应的逻辑链条如下:
- GetTensorDevice4CurrentProcessCtx 不会设置 parallel_id 的值。
- GetTensorDevice4CurrentProcessCtx 给 id_val 赋一个空的 Optional。
- 在 rank= 2 的过程中,TryGetParallelId 返回 false。
之前说过,所有过程执行的 Python 代码都是一样的,然而对于 rank=2
的过程来说:
- 从开始到 CastFunctor,执行的逻辑与
ranks=[0,1]
的过程统一。 - 然而在 LocalToConsistent 这一步,不再保留 local tensor。后续的 global tensor 就是一个空的 tensor,然而 shape 等信息仍与其它节点统一。
3.4 shape 和数据的裁剪:ToConsistent
ToConsistent 依据 placement 和 sbp 对 tensor 数据和 shape 进行裁剪。其对应的 Functor 是 ToConsistentFunctor。因为输出自身就是 global tensor,所以会转给 ConsistentToConsistent 执行。其中的 op 类型是 ConsistentToConsistentOpExpr。通过 Dispatch 后会进入对应的 ApplyImpl 执行,再进入 RawConsistentToConsistent。
RawConsistentToConsistent 外围逻辑在于调用 RecursiveGetBoxingOutput,对 tensor 进行裁剪。这之后,如果过程与 tensor 相干(rank= 0 或 1),进入 if 的第一个分支,将 tensor 后果赋值给 outpus。如果过程与 tensor 无关(rank=2),进入第二个分支,给 outputs 赋一个空的 tensor。
RecursiveGetBoxingOutput 是用 CheckConsistentTensorMeta 润饰的 CalcBoxingOutput。裁剪的外围逻辑由 CalcBoxingOutput 实现。
3.4.1 tensor 元数据校验:CheckConsistentTensorMeta
顾名思义,CheckConsistentTensorMeta 的作用是校验各个节点的 tensor 的元数据,外围是调用 LaunchTensorMetaConsistencyCheck,tensor 相干的每个过程会执行 4 次节点间通信以及数据比拟。如果元数据被缓存后,就只须要 LaunchTensorMetaConsistencyCheck 中的一次通信。
通信须要传输校验如下数据:
- FlatTensorConsistency
- FlatConsistentTensorMeta
- FlatNdSbp
- FlatParallelConf
3.4.2 tensor 数据裁剪:CalcBoxingOutput
Boxing 是不同 sbp 的 tensor 之间的主动匹配机制。CalcBoxingOutput 应该是 Boxing 机制实现一部分。这个例子的情景也比较简单,只是对 broad tensor 进行裁剪。CalcBoxingOutput 的调用流程如下:
首先会获取一个 EagerBoxingInterpreter 对象 boxing_interpreter,理论的类型是 NaiveEagerBoxingInterpreter。
在结构 boxing_interpreter 这个解释器时,会先获取 BoxingExpr,RawMainBoxingExpr 是将多个表达式通过 Or 运算串起来形成一个复合表达式,所以返回的是一个 OrBoxingExpr。
失去表达式之后的一个重要步骤,就是获取 BoxingFunction,这是理论执行 boxing 的函数。对于 OrBoxingExpr 来说,就是一一校验各子表达式与输入输出是否匹配。找到一个子表达式后,就调用这个子表达式的 GetBoxingFunction 函数。
RawMainBoxingExpr 的每个表达式都有一个名字,能够晓得本文例子对应的表达式是名字为 symmetric-b-to-s
的 AtomicBoxingExpr。这是一个事后注册的函数,对应的 BoxingFunction 是 SymmetricB2S。b-to-s
应该是 broad to split
的缩写。整个意思应该是把一个残缺的播送 tensor 平均切分。在这个函数中会确定切分的参数 start, stop 和 step。
在 CPU 环境下,流程会执行到 SliceKernelUtil::Forward。其中 SwitchDoForward 是通过宏定义的一个函数。宏开展后的代码相似上面这样。理论会进入 DoForward 执行裁剪后的数据拷贝。
template<typename... Args>
static void SwitchDoForward(const std::tuple<int32_t>& switch_tuple, Args&&... args) {static const std::map<std::tuple<int32_t>, std::function<void(Args && ...)>> case_handlers{{SwitchCase(1),
[](Args&&... args) {return SliceKernelUtil<DeviceType::kCPU, T>::DoForward<1>(std::forward<Args>(args)...);
}},
};
return case_handlers.at(switch_tuple)(std::forward<Args>(args)...);
};
4 用 randn 结构随机 tensor
randn 这种操作就不须要过程间的数据交互,各个过程只生成本人负责的数据就能够了。比方上面的例子:
import oneflow as flow
P0 = flow.placement("cpu", ranks=[0, 1])
a0_sbp = flow.sbp.split(0)
A0 = flow.randn(4, 5, placement=P0, sbp=a0_sbp)
# print(A0)
randn 是一个 UserOpExpr,global 版本对应的 Functor 是 ConsistentRandNFunctor,会进入 Interpret 执行。
EagerConsistentTensorImpl::New 时会调用 GetPhysicalShape 获取 local tensor 的 shape。randn 的 kernel 就只生成 local tensor 的数据。
参考资料
- oneflow v0.7.0
- OneFlow 源码浏览 1:算子签名的主动推断
- OneFlow 源码浏览 2:Op、Kernel 与解释器
- OneFlow 源码浏览 3:Op 指令在虚拟机中的执行
- OneFlow 源码浏览 4:tensor 体系与 local tensor
- Global Tensor
- 集群的全局视角
- Global View 的概念和实现
- OneFlow 的 Global Tensor 学习笔记和实习总结