关于c++:OneFlow源码阅读5Global-Tensor

31次阅读

共计 6995 个字符,预计需要花费 18 分钟才能阅读完成。

OneFlow 的官网文档中提供了一个结构 global tensor 的例子。结构时指定 placementsbp参数就是全局视角的 global tensor,数据能够散布在多个节点上。
在单机 CPU 环境下,能够启动 3 个过程、每个过程设置不同的环境变量,运行如下 Python 代码就能够创立一个简略的 global tensor。

# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=0 LOCAL_RANK=0
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=1 LOCAL_RANK=1
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=3 RANK=2 LOCAL_RANK=2
import oneflow as flow
P0 = flow.placement("cpu", ranks=[0, 1])
a0_sbp = flow.sbp.split(0)
A0 = flow.Tensor([[1,2,3],[4,5,6]], placement=P0, sbp=a0_sbp)

1 sbp

sbp 由 split, broadcast, partial 的首字母组合而成。

  • split 示意物理设施上的 tensor,是将 global tensor 切分失去的。
  • broadcast 示意 global tensor 会复制并播送到所有的物理设施上。
  • partial 示意 global tensor 与物理设施上的 tensor 的形态雷同,然而物理设施上的值,只是 global tensor 的一部分。

Python 端 flow.sbp 包定义了 split 等 3 种类型。其 C ++ binding 代码在 sbp_symbol.cpp 中。这些类型都是 SbpParallel 类型,是 protobuf message 对象。三种类型通过 oneof parallel_type 共享存储。

其中 broadcastpartial_sum都是空音讯,赋值时须要调用 mutable 办法显式表明 oneof 字段具体是哪种类型。
split的值示意在 tensor 的哪个轴上切分数据。轴的 index 值是一个 [[0, 5] 之间的整数](https://github.com/Oneflow-In…)。所有的 split SbpParallel 对象被保留到一个动态 vector 中。

2 placement 的结构

placement 属性指定 tensor 寄存在哪些物理设施上。或者,在纯 CPU 环境下,tensor 寄存在哪些过程中。
在上述例子中,flow.placement("cpu", ranks=[0, 1])创立一个 placement 对象。第一个参数是设施类型,目前反对 cpucuda。ranks 示意设施列表,tensor 将散布在这些设施上(依据 sbp 的批示切分或播送数据)。
ranks 只列出了 rank id(全局惟一),没有指定节点 host。rank 与 host 关系是依据环境变量确定的。环境变量 RANK 示意全局惟一的 rank id,LOCAL_RANK示意节点内的本地 rank id。在 GPU 环境下,个别一个过程对应一块设施。WORLD_SIZE示意所有节点的设施(过程)总数。

oneflow 包在初始化时,会依据环境变量在各个节点间建设管制面通信连贯,以及数据面通信连贯。这样每个过程就晓得有多少个节点、有多少个设施 / 过程、以后过程在整个集群的地位。

通过 placement 的构造函数绑定能够晓得,其对应的 C ++ 类型是 ParallelDesc。对象结构由函数 CreateParallelDescSymbol 实现。次要调用流程如下:

2.1 确定 machine 和 device

ParseAndFormatRanks 会将 ranks 数组 [0, 1] 转为形如 "machine_id:device_id" 的字符串数组,供后续解决应用。这里的逻辑决定了如何依据 ranks 中的 id,确定 tensor 数据在节点和设施上的散布:

  • machine_id = rank / NumOfProcessPerNode
  • device_id = rank % NumOfProcessPerNode

从上述公式能够看出,各个节点的设施 / 过程数量须要是统一的。

2.2 结构并缓存 ParallelDesc 对象

CreateParallelDesc 函数实现 ParallelDesc 的结构。其中 MakeParallelConf 会先依据 "machine_id:device_id" 等数据结构一个 cfg::ParallelConf 对象,这是一个相似 oneflow::ParallelConf 的类型,文件位于build/oneflow/core/job/placement.cfg.h,是 cmake 构建过程中主动生成的文件。

cfg::ParallelConf等对象的接口相似 protobuf message,但实现了 hash 办法,能够作为 hash map 的 key。

之后的 PhysicalRun 尽管波及虚拟机,但指令列表应该是空的,实质性的逻辑只是调用 builder->GetParallelDescSymbol,其中的外围逻辑是 FindOrCreateSymbolId,创建对象并缓存,前面的 Storage::Get 只是获取后果。

FindOrCreateSymbolId 次要实现两级缓存:

  • 首先通过 IdCache<cfg::ParallelConf> 实现从 cfg::ParallelConf 到 symbol id(64 位整数)的映射。这可能也是为何不间接用 protobuf 的起因之一(因为 protobuf 不能实现稳固的 hash)?
  • 其次通过 Storage<ParallelDesc> 实现从 symbol id 到 ParallelDesc 的映射。

最终依据 cfg::ParallelConf 能够间接获取之前缓存的 ParallelDesc 对象。

CreateParallelDesc 的具体调用流程如下:

3 global tensor 结构调用流程

上面以本文开始的例子剖析一下结构 global tensor 的调用流程。这可能不是一个典型的场景,只是人为指定的数据便于 debug。

通过之前探讨 local tensor 时的类关系图能够晓得,EagerConsistentTensorImpl 内含一个 local tensor 的变量。能够设想,结构 global tensor 时,会先结构一个 local tensor、再做一些后续解决。

Python 端创立 tensor 对象时,如果像本文开始的例子那样指定 placement、sbp 和数据,对应的 Functor 是 ConsistentTensorWithDataCtorFunctor。外围逻辑在 MakeConsistentTensorFromData 中。次要的调用流程如下:

上述各个局部的次要职能如下:

  • DataConsistencyCheck 会在 tensor 的 placement 波及的各个节点间拷贝数据、校验数据是否统一。
  • functional::Empty 这几行代码会结构一个 local tensor 并填充数据。这里和之前探讨 local tensor 的过程统一。
  • functional::Cast 进行数据类型 dtype 的转换。
  • LocalToConsistent 把 local tensor 转为 global tensor。
  • ToConsistent 依据 placement 和 sbp 对 tensor 数据和 shape 进行裁剪。

3.1 数据一致性校验:DataConsistencyCheck

OneFlow 在各个节点的所有过程都执行雷同的 Python 用户脚本。本文开始的这个例子,tensor 的数据是人为指定的(也可能是从 record 文件读取的),所以相干节点须要校验一下数据的一致性。通过其它形式构建 tensor 可能就不须要这个步骤(比方 randn)。

数据的传输和比拟只在与该 placement 相干的过程之间进行。比方 rank=2 的节点就不会解决示例 tensor 的数据校验,在这里会间接返回。

placement 相干的过程形成一个环(Ring),环中的每个节点给下一个节点发送数据,从上一个节点接收数据,并比拟数据是否统一。

3.2 数据类型转换:Cast

在这个例子中,Functor 强制指定了 tensor 的数据类型是 float。

而 Python 端如果用整数,local_tensor 的 dtype 是 int64;如果用浮点数,dtype 是 double。所以 Cast 就是将这些数据类型对立转为 float。

这里其实没想太明确,local tensor 不须要 cast,为何 global tensor 须要 cast 呢?毕竟各个过程执行的代码都是统一的。

Cast 执行的次要流程如下。CPU 下的转换操作在 CastCpu 内实现。

3.3 local 到 global 的转换

LocalToConsistentFunctor 的 op 就不是 UserOpExpr 了,而是 CastToConsistentOpExpr。而以后的 inputs tensor 还是 local 的,所以会进入对应版本的 ApplyImpl 执行。在 ApplyImpl 中,次要执行 LocalToConsistent 函数。WithConsistencyChecked 这一大段代码,因为上面两处的作用会间接返回:

  • DisableCheckConsistentTensorMetaScope
  • CheckConsistentTensorMeta

LocalToConsistent 是在 RawLocalToConsistent 上加了一层装璜。

RawLocalToConsistent 的作用次要是结构一个 global tensor 并与之前的 local tensor 建设关联。但返回时,global tensor 的数据和 shape 仍和之前残缺的 local tensor 一样。

RawLocalToConsistent 的调用流程如下:

须要留神的是,对于 rank=2 的过程来说,示例中的 tensor 与它无关,不会设置 local tensor。对应的逻辑链条如下:

  • GetTensorDevice4CurrentProcessCtx 不会设置 parallel_id 的值。
  • GetTensorDevice4CurrentProcessCtx 给 id_val 赋一个空的 Optional。
  • 在 rank= 2 的过程中,TryGetParallelId 返回 false。

之前说过,所有过程执行的 Python 代码都是一样的,然而对于 rank=2 的过程来说:

  • 从开始到 CastFunctor,执行的逻辑与 ranks=[0,1] 的过程统一。
  • 然而在 LocalToConsistent 这一步,不再保留 local tensor。后续的 global tensor 就是一个空的 tensor,然而 shape 等信息仍与其它节点统一。

3.4 shape 和数据的裁剪:ToConsistent

ToConsistent 依据 placement 和 sbp 对 tensor 数据和 shape 进行裁剪。其对应的 Functor 是 ToConsistentFunctor。因为输出自身就是 global tensor,所以会转给 ConsistentToConsistent 执行。其中的 op 类型是 ConsistentToConsistentOpExpr。通过 Dispatch 后会进入对应的 ApplyImpl 执行,再进入 RawConsistentToConsistent。

RawConsistentToConsistent 外围逻辑在于调用 RecursiveGetBoxingOutput,对 tensor 进行裁剪。这之后,如果过程与 tensor 相干(rank= 0 或 1),进入 if 的第一个分支,将 tensor 后果赋值给 outpus。如果过程与 tensor 无关(rank=2),进入第二个分支,给 outputs 赋一个空的 tensor。

RecursiveGetBoxingOutput 是用 CheckConsistentTensorMeta 润饰的 CalcBoxingOutput。裁剪的外围逻辑由 CalcBoxingOutput 实现。

3.4.1 tensor 元数据校验:CheckConsistentTensorMeta

顾名思义,CheckConsistentTensorMeta 的作用是校验各个节点的 tensor 的元数据,外围是调用 LaunchTensorMetaConsistencyCheck,tensor 相干的每个过程会执行 4 次节点间通信以及数据比拟。如果元数据被缓存后,就只须要 LaunchTensorMetaConsistencyCheck 中的一次通信。

通信须要传输校验如下数据:

  • FlatTensorConsistency
  • FlatConsistentTensorMeta
  • FlatNdSbp
  • FlatParallelConf

3.4.2 tensor 数据裁剪:CalcBoxingOutput

Boxing 是不同 sbp 的 tensor 之间的主动匹配机制。CalcBoxingOutput 应该是 Boxing 机制实现一部分。这个例子的情景也比较简单,只是对 broad tensor 进行裁剪。CalcBoxingOutput 的调用流程如下:

首先会获取一个 EagerBoxingInterpreter 对象 boxing_interpreter,理论的类型是 NaiveEagerBoxingInterpreter。

在结构 boxing_interpreter 这个解释器时,会先获取 BoxingExpr,RawMainBoxingExpr 是将多个表达式通过 Or 运算串起来形成一个复合表达式,所以返回的是一个 OrBoxingExpr。

失去表达式之后的一个重要步骤,就是获取 BoxingFunction,这是理论执行 boxing 的函数。对于 OrBoxingExpr 来说,就是一一校验各子表达式与输入输出是否匹配。找到一个子表达式后,就调用这个子表达式的 GetBoxingFunction 函数。

RawMainBoxingExpr 的每个表达式都有一个名字,能够晓得本文例子对应的表达式是名字为 symmetric-b-to-s 的 AtomicBoxingExpr。这是一个事后注册的函数,对应的 BoxingFunction 是 SymmetricB2S。b-to-s应该是 broad to split 的缩写。整个意思应该是把一个残缺的播送 tensor 平均切分。在这个函数中会确定切分的参数 start, stop 和 step。

在 CPU 环境下,流程会执行到 SliceKernelUtil::Forward。其中 SwitchDoForward 是通过宏定义的一个函数。宏开展后的代码相似上面这样。理论会进入 DoForward 执行裁剪后的数据拷贝。

  template<typename... Args>
  static void SwitchDoForward(const std::tuple<int32_t>& switch_tuple, Args&&... args) {static const std::map<std::tuple<int32_t>, std::function<void(Args && ...)>> case_handlers{{SwitchCase(1),
         [](Args&&... args) {return SliceKernelUtil<DeviceType::kCPU, T>::DoForward<1>(std::forward<Args>(args)...);
         }},
    };
    return case_handlers.at(switch_tuple)(std::forward<Args>(args)...);
  };

4 用 randn 结构随机 tensor

randn 这种操作就不须要过程间的数据交互,各个过程只生成本人负责的数据就能够了。比方上面的例子:

import oneflow as flow
P0 = flow.placement("cpu", ranks=[0, 1])
a0_sbp = flow.sbp.split(0)
A0 = flow.randn(4, 5, placement=P0, sbp=a0_sbp)
# print(A0)

randn 是一个 UserOpExpr,global 版本对应的 Functor 是 ConsistentRandNFunctor,会进入 Interpret 执行。
EagerConsistentTensorImpl::New 时会调用 GetPhysicalShape 获取 local tensor 的 shape。randn 的 kernel 就只生成 local tensor 的数据。

参考资料

  • oneflow v0.7.0
  • OneFlow 源码浏览 1:算子签名的主动推断
  • OneFlow 源码浏览 2:Op、Kernel 与解释器
  • OneFlow 源码浏览 3:Op 指令在虚拟机中的执行
  • OneFlow 源码浏览 4:tensor 体系与 local tensor
  • Global Tensor
  • 集群的全局视角
  • Global View 的概念和实现
  • OneFlow 的 Global Tensor 学习笔记和实习总结

正文完
 0