OneFlow的官网文档中提供了一个结构global tensor的例子。结构时指定placementsbp参数就是全局视角的global tensor,数据能够散布在多个节点上。
在单机CPU环境下,能够启动3个过程、每个过程设置不同的环境变量,运行如下Python代码就能够创立一个简略的global tensor。

# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=0 LOCAL_RANK=0# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=1 LOCAL_RANK=1# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=2 LOCAL_RANK=2import oneflow as flowP0 = flow.placement("cpu", ranks=[0, 1])a0_sbp = flow.sbp.split(0)A0 = flow.Tensor([[1,2,3],[4,5,6]], placement=P0, sbp=a0_sbp)

1 sbp

sbp由split, broadcast, partial的首字母组合而成。

  • split示意物理设施上的tensor,是将global tensor切分失去的。
  • broadcast示意global tensor会复制并播送到所有的物理设施上。
  • partial示意global tensor与物理设施上的tensor的形态雷同,然而物理设施上的值,只是global tensor的一部分。

Python端flow.sbp包定义了split等3种类型。其C++ binding代码在sbp_symbol.cpp中。这些类型都是SbpParallel类型,是protobuf message对象。三种类型通过oneof parallel_type共享存储。

其中broadcastpartial_sum都是空音讯,赋值时须要调用mutable办法显式表明oneof字段具体是哪种类型。
split的值示意在tensor的哪个轴上切分数据。轴的index值是一个[[0, 5]之间的整数](https://github.com/Oneflow-In...)。所有的split SbpParallel对象被保留到一个动态vector中。

2 placement的结构

placement属性指定tensor寄存在哪些物理设施上。或者,在纯CPU环境下,tensor寄存在哪些过程中。
在上述例子中,flow.placement("cpu", ranks=[0, 1])创立一个placement对象。第一个参数是设施类型,目前反对cpucuda。ranks示意设施列表,tensor将散布在这些设施上(依据sbp的批示切分或播送数据)。
ranks只列出了rank id(全局惟一),没有指定节点host。rank与host关系是依据环境变量确定的。环境变量RANK示意全局惟一的rank id,LOCAL_RANK示意节点内的本地rank id。在GPU环境下,个别一个过程对应一块设施。WORLD_SIZE示意所有节点的设施(过程)总数。

oneflow包在初始化时,会依据环境变量在各个节点间建设管制面通信连贯,以及数据面通信连贯。这样每个过程就晓得有多少个节点、有多少个设施/过程、以后过程在整个集群的地位。

通过placement的构造函数绑定能够晓得,其对应的C++类型是ParallelDesc。对象结构由函数CreateParallelDescSymbol实现。次要调用流程如下:

2.1 确定machine和device

ParseAndFormatRanks会将ranks数组[0, 1]转为形如"machine_id:device_id"的字符串数组,供后续解决应用。这里的逻辑决定了如何依据ranks中的id,确定tensor数据在节点和设施上的散布:

  • machine_id = rank / NumOfProcessPerNode
  • device_id = rank % NumOfProcessPerNode

从上述公式能够看出,各个节点的设施/过程数量须要是统一的。

2.2 结构并缓存ParallelDesc对象

CreateParallelDesc函数实现ParallelDesc的结构。其中MakeParallelConf会先依据"machine_id:device_id"等数据结构一个cfg::ParallelConf对象,这是一个相似oneflow::ParallelConf的类型,文件位于build/oneflow/core/job/placement.cfg.h,是cmake构建过程中主动生成的文件。

cfg::ParallelConf等对象的接口相似protobuf message,但实现了hash办法,能够作为hash map的key。

之后的PhysicalRun尽管波及虚拟机,但指令列表应该是空的,实质性的逻辑只是调用builder->GetParallelDescSymbol,其中的外围逻辑是FindOrCreateSymbolId,创建对象并缓存,前面的Storage::Get只是获取后果。

FindOrCreateSymbolId次要实现两级缓存:

  • 首先通过IdCache<cfg::ParallelConf>实现从cfg::ParallelConf到symbol id(64位整数)的映射。这可能也是为何不间接用protobuf的起因之一(因为protobuf不能实现稳固的hash)?
  • 其次通过Storage<ParallelDesc>实现从symbol id到ParallelDesc的映射。

最终依据cfg::ParallelConf能够间接获取之前缓存的ParallelDesc对象。

CreateParallelDesc的具体调用流程如下:

3 global tensor结构调用流程

上面以本文开始的例子剖析一下结构global tensor的调用流程。这可能不是一个典型的场景,只是人为指定的数据便于debug。

通过之前探讨local tensor时的类关系图能够晓得,EagerConsistentTensorImpl内含一个local tensor的变量。能够设想,结构global tensor时,会先结构一个local tensor、再做一些后续解决。

Python端创立tensor对象时,如果像本文开始的例子那样指定placement、sbp和数据,对应的Functor是ConsistentTensorWithDataCtorFunctor。外围逻辑在MakeConsistentTensorFromData中。次要的调用流程如下:

上述各个局部的次要职能如下:

  • DataConsistencyCheck会在tensor的placement波及的各个节点间拷贝数据、校验数据是否统一。
  • functional::Empty这几行代码会结构一个local tensor并填充数据。这里和之前探讨local tensor的过程统一。
  • functional::Cast进行数据类型dtype的转换。
  • LocalToConsistent把local tensor转为global tensor。
  • ToConsistent依据placement和sbp对tensor数据和shape进行裁剪。

3.1 数据一致性校验:DataConsistencyCheck

OneFlow在各个节点的所有过程都执行雷同的Python用户脚本。本文开始的这个例子,tensor的数据是人为指定的(也可能是从record文件读取的),所以相干节点须要校验一下数据的一致性。通过其它形式构建tensor可能就不须要这个步骤(比方randn)。

数据的传输和比拟只在与该placement相干的过程之间进行。比方rank=2的节点就不会解决示例tensor的数据校验,在这里会间接返回。

placement相干的过程形成一个环(Ring),环中的每个节点给下一个节点发送数据,从上一个节点接收数据,并比拟数据是否统一。

3.2 数据类型转换:Cast

在这个例子中,Functor强制指定了tensor的数据类型是float。

而Python端如果用整数,local_tensor的dtype是int64;如果用浮点数,dtype是double。所以Cast就是将这些数据类型对立转为float。

这里其实没想太明确,local tensor不须要cast,为何global tensor须要cast呢?毕竟各个过程执行的代码都是统一的。

Cast执行的次要流程如下。CPU下的转换操作在CastCpu内实现。

3.3 local到global的转换

LocalToConsistentFunctor的op就不是UserOpExpr了,而是CastToConsistentOpExpr。而以后的inputs tensor还是local的,所以会进入对应版本的ApplyImpl执行。在ApplyImpl中,次要执行LocalToConsistent函数。WithConsistencyChecked这一大段代码,因为上面两处的作用会间接返回:

  • DisableCheckConsistentTensorMetaScope
  • CheckConsistentTensorMeta

LocalToConsistent是在RawLocalToConsistent上加了一层装璜。

RawLocalToConsistent的作用次要是结构一个global tensor并与之前的local tensor建设关联。但返回时,global tensor的数据和shape仍和之前残缺的local tensor一样。

RawLocalToConsistent的调用流程如下:

须要留神的是,对于rank=2的过程来说,示例中的tensor与它无关,不会设置local tensor。对应的逻辑链条如下:

  • GetTensorDevice4CurrentProcessCtx不会设置parallel_id的值。
  • GetTensorDevice4CurrentProcessCtx给id_val赋一个空的Optional。
  • 在rank=2的过程中,TryGetParallelId返回false。

之前说过,所有过程执行的Python代码都是一样的,然而对于rank=2的过程来说:

  • 从开始到CastFunctor,执行的逻辑与ranks=[0,1]的过程统一。
  • 然而在LocalToConsistent这一步,不再保留local tensor。后续的global tensor就是一个空的tensor,然而shape等信息仍与其它节点统一。

3.4 shape和数据的裁剪:ToConsistent

ToConsistent依据placement和sbp对tensor数据和shape进行裁剪。其对应的Functor是ToConsistentFunctor。因为输出自身就是global tensor,所以会转给ConsistentToConsistent执行。其中的op类型是ConsistentToConsistentOpExpr。通过Dispatch后会进入对应的ApplyImpl执行,再进入RawConsistentToConsistent。

RawConsistentToConsistent外围逻辑在于调用RecursiveGetBoxingOutput,对tensor进行裁剪。这之后,如果过程与tensor相干(rank=0或1),进入if的第一个分支,将tensor后果赋值给outpus。如果过程与tensor无关(rank=2),进入第二个分支,给outputs赋一个空的tensor。

RecursiveGetBoxingOutput是用CheckConsistentTensorMeta润饰的CalcBoxingOutput。裁剪的外围逻辑由CalcBoxingOutput实现。

3.4.1 tensor元数据校验:CheckConsistentTensorMeta

顾名思义,CheckConsistentTensorMeta的作用是校验各个节点的tensor的元数据,外围是调用LaunchTensorMetaConsistencyCheck,tensor相干的每个过程会执行4次节点间通信以及数据比拟。如果元数据被缓存后,就只须要LaunchTensorMetaConsistencyCheck中的一次通信。

通信须要传输校验如下数据:

  • FlatTensorConsistency
  • FlatConsistentTensorMeta
  • FlatNdSbp
  • FlatParallelConf

3.4.2 tensor数据裁剪:CalcBoxingOutput

Boxing是不同sbp的tensor之间的主动匹配机制。CalcBoxingOutput应该是Boxing机制实现一部分。这个例子的情景也比较简单,只是对broad tensor进行裁剪。CalcBoxingOutput的调用流程如下:

首先会获取一个EagerBoxingInterpreter对象boxing_interpreter,理论的类型是NaiveEagerBoxingInterpreter。

在结构boxing_interpreter这个解释器时,会先获取BoxingExpr,RawMainBoxingExpr是将多个表达式通过Or运算串起来形成一个复合表达式,所以返回的是一个OrBoxingExpr。

失去表达式之后的一个重要步骤,就是获取BoxingFunction,这是理论执行boxing的函数。对于OrBoxingExpr来说,就是一一校验各子表达式与输入输出是否匹配。找到一个子表达式后,就调用这个子表达式的GetBoxingFunction函数。

RawMainBoxingExpr的每个表达式都有一个名字,能够晓得本文例子对应的表达式是名字为symmetric-b-to-s的AtomicBoxingExpr。这是一个事后注册的函数,对应的BoxingFunction是SymmetricB2S。b-to-s应该是broad to split的缩写。整个意思应该是把一个残缺的播送tensor平均切分。在这个函数中会确定切分的参数start, stop和step。

在CPU环境下,流程会执行到SliceKernelUtil::Forward。其中SwitchDoForward是通过宏定义的一个函数。宏开展后的代码相似上面这样。理论会进入DoForward执行裁剪后的数据拷贝。

  template<typename... Args>  static void SwitchDoForward(const std::tuple<int32_t>& switch_tuple, Args&&... args) {    static const std::map<std::tuple<int32_t>, std::function<void(Args && ...)>> case_handlers{        {SwitchCase(1),         [](Args&&... args) {           return SliceKernelUtil<DeviceType::kCPU, T>::DoForward<1>(std::forward<Args>(args)...);         }},    };    return case_handlers.at(switch_tuple)(std::forward<Args>(args)...);  };

4 用randn结构随机tensor

randn这种操作就不须要过程间的数据交互,各个过程只生成本人负责的数据就能够了。比方上面的例子:

import oneflow as flowP0 = flow.placement("cpu", ranks=[0, 1])a0_sbp = flow.sbp.split(0)A0 = flow.randn(4, 5, placement=P0, sbp=a0_sbp)# print(A0)

randn是一个UserOpExpr,global版本对应的Functor是ConsistentRandNFunctor,会进入Interpret执行。
EagerConsistentTensorImpl::New时会调用GetPhysicalShape获取local tensor的shape。randn的kernel就只生成local tensor的数据。

参考资料

  • oneflow v0.7.0
  • OneFlow源码浏览1:算子签名的主动推断
  • OneFlow源码浏览2:Op、Kernel与解释器
  • OneFlow源码浏览3:Op指令在虚拟机中的执行
  • OneFlow源码浏览4:tensor体系与local tensor
  • Global Tensor
  • 集群的全局视角
  • Global View的概念和实现
  • OneFlow的Global Tensor学习笔记和实习总结