关于c++:OneFlow源码阅读7静态图与运行时

25次阅读

共计 20332 个字符,预计需要花费 51 分钟才能阅读完成。

oneflow 动态图的训练效率远高于动态图(eager 模式)。本文试图通过一个简略例子,联合 0.8.0 的代码,看一下动态图和运行时的实现机制。
在开始之前,倡议先读一下参考资料中《OneFlow 框架的零碎设计》等系列文章。对动态图、运行时的基本概念和设计理念有根本的理解,会更容易了解代码。

1 代码示例

上面的示例代码来自官网文档,是一个线性模型的前向计算。后续次要基于这段代码进行剖析。

import oneflow as flow
import oneflow.nn as nn

class ModuleMyLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(flow.randn(in_features, out_features))
        self.bias = nn.Parameter(flow.randn(out_features))

    def forward(self, input):
        return flow.matmul(input, self.weight) + self.bias

linear_model = ModuleMyLinear(4, 3)

class GraphMyLinear(nn.Graph):
    def __init__(self):
        super().__init__()
        # ModuleBlock
        self.model = linear_model

    def build(self, input):
        # ModuleBlock.__call__
        return self.model(input)

graph_mylinear = GraphMyLinear()
input = flow.randn(1, 4)
out = graph_mylinear(input)
print(out)

2 oneflow 包的初始化

import oneflow在初始化包时,与动态图相干的次要操作如下:

  • GetEnv

    • EnvGlobalObjectsScope::Init

      • 启动各个节点的管制面网络连接
      • 初始化 VM
      • 启动各个节点的数据面网络连接
      • 初始化 KernelObserver
  • NewDefaultSession

    • RegsiterSession 创立 Session,并注册为 default session
    • 创立 Python MultiClientSession 并保留到 dict,但并不 TryInit

      • 创立 C++ MultiClientSessionContext 但并不 TryInit

EnvGlobalObjectsScope::Init中先创立一个全局的 ProcessCtx 对象。而后依据环境变量等配置,在各个过程间创立 gRPC 和 CommNet 的连贯,别离负责管制面和数据面的数据传输。其中在 Bootstrap 过程中会初始化全局的 ProcessCtx,给每个过程调配一个全局惟一的 rank 编号(machine_id)。
本文不波及网络层面的操作,只探讨同一过程内各线程间的交互。

3 Module 类

尽管能够间接用 op 和 tensor 结构模型,然而 op 的粒度太细了,间接用 op 结构模型会比拟繁琐。
Module 是由 op 和 tensor 形成的、可复用的子模块。利用 Module 能够更高效、更快捷的构建简单模型。oneflow.nn 模块导出了很多预约义的 Module。

Module 定义了本人的属性设置逻辑,外围逻辑是

  • 如果 value 是 Parameter 类型,就保留到 Module._parameters
  • 如果 value 是 Module 类型,就保留到 Module._modules
  • 如果 value 是 Tensor 类型,就保留到 Module._buffers
  • 否则按惯例属性解决

Module 能够蕴含子 Module,造成树结构。因为 Module 通过 setattr 将子 Module 和 Parameter 都保留到字典构造中,能够不便的遍历所有 Module 及其参数 tensor。

4 Graph 类

4.1 构造函数

Graph 的构造函数中 GetDefaultSession 失去的 session,就是导入 oneflow 包时 NewDefaultSession 构建的 session。过后没有初始化,而是在 Graph 结构时进行初始化。对应的 C ++ 函数是 MultiClientSessionContext::TryInit,执行时会创立各种全局的资源管理器,比方:

  • LazyJobBuildAndInferCtxMgr
  • BufferMgr
  • RegstMgr
  • ActorMsgBus
  • ThreadMgr

4.2 __setattr__: 将 Module 和 Tensor 封装为 Block

Graph.__setattr__不容许将 Tensor 对象设置为属性。Tensor 只能存到 Module 中。
setattr 最重要的动作就是对_add_block 的调用,_add_block中次要是调用 get_block_cls 并保留后果。
get_block_cls 的作用是将 Module 及其所有 Tensor 属性都转为对应的 Block 对象。为什么要做这个动作呢?次要是动态图编译须要借助 Block 类型的一些性能,这些性能不适宜间接加诸于 Module 和 Tensor。
这个转换是在 ModuleBlock 结构时调用 set_origin 实现的。对于子 Module,会递归调用 get_block_cls 函数,这样所有子 Module 及其 Tensor 属性都会被转换为对应的 Block 对象。
所以,上述示例代码中,GraphMyLinear 理论存储的是 ModuleBlock,Graph.build执行时获取的 model 属性也是 ModuleBlock 对象,ModuleBlock.origin才是 ModuleMyLinear。

4.3 针对不同工作,定义不同的计算图

依据 Oneflow Model Zoo 的模型示例,train/eval/infer 等阶段能够创立不同的 Graph 子类。在这些不同阶段,Graph 构造函数的行为、build 函数的输入输出都有各自特点。理解这些,看后续代码时会更容易了解各个参数的具体含意。

  • 构造函数

    • train 阶段,构造函数会引入 Module、损失函数、优化器和 data_loader
    • eval 阶段,只须要引入 Module 和 data_loader
    • infer 阶段,只须要引入 Module
  • build 函数

    • train

      • 导入样本和 label
      • 调用 Module 失去前向计算结果
      • 计算损失
      • 计算梯度
      • 返回 loss
    • eval

      • 导入样本和 label
      • 调用 Module 失去预估后果
      • 返回预估后果和 label
    • infer

      • 传入的参数包含样本
      • 调用 Module 失去预估后果
      • 返回预估后果

4.4 小结

上述几个类型的关系如下:

上面形容了 GraphMyLinear 的结构流程

* `__init__`
  * `Graph.__init__`
  * self.model = linear_model
    * `Graph.__setattr__`
      * _add_block
        * get_block_cls: 递归地把 Module 转为 ModuleBlock
        * `ModuleBlock.__init__`
          * ModuleBlock.set_origin
            * `ModuleBlock._origin = origin` (Module)
            * 对 origin 的 sub modules, parameters, buffers 递归调用 get_block_cls
            * `ModuleBlock.__setattr__`

5 逻辑图的编译

计算机语言的编译,是将高级语言的语句编译为汇编或机器指令。深度学习框架对计算工作的编译,是将用户的特定语句操作转换为 DAG 图。oneflow 中用 Job 形容逻辑的计算图。
不同于 eager 模式的动态图,动态图在开始执行前能够失去整个计算工作的所有信息,能够对 DAG 进行多轮优化。每轮优化都是输出一个 Job、失去一个新 Job。
最初,依据分布式环境配置,将逻辑图 Job 转换为物理执行的计算图 Plan。在物理图中,一个 op 可能散布在多个节点 / 过程。

启动 DAG 计算须要调用Graph.__call__,这个函数的执行次要分以下几个步骤:

  • __call__

    • _compile if not _is_compiled

      • build_graph

        • __build_graph
      • finish_complie_and_init_runtime
    • __run

逻辑图编译次要在 __build_graph 中进行。finish_complie_and_init_runtime会持续做一些优化 pass,而后构建物理图、初始化运行时 Actor 零碎。__run会启动一次 DAG 的运算。

5.1 graph_build_context: 为逻辑图编译设置根本环境

__build_graph中的 graph_build_context 尽管只有一行代码,但却做了几件十分重要的事件。

首先在 context 作用域内设置全局的 lazy_mode 为 True。在这个 context 作用域内,所有 op 都由 LazyInterpreter 解释执行。

其次,在 JobBuildAndInferCtx 作用域内,JobBuildAndInferCtx_Open 调用相似如下 C ++ 代码

// oneflow/api/python/job_build/job_build_and_infer.h
// oneflow/core/job/job_build_and_infer_ctx_mgr.cpp

// 如前所述,LazyJobBuildAndInferCtxMgr 在 MultiClientSessionContext::TryInit 执行时初始化。// LazyJobBuildAndInferCtxMgr mgr;
mgr.OpenJobBuildAndInferCtx(job_name);

OpenJobBuildAndInferCtx 会新建一个 Job 对象、一个 LazyJobBuildAndInferCtx 对象。LazyJobBuildAndInferCtx 负责依据用户定制的 op 等操作,批改 Job。

5.2 __build_io:为计算图增加 input 和 output Op

self.__build_io("input", graph_build_util.build_graph_input_arg, *args, **kwargs)

下面这行代码的作用是,对于用户传递给 graph_mylinear(input) 的 input 参数,针对其中的每个 tensor 都在逻辑计算图中插入一个 FeedInputOp 节点。也就是说,model 的输出(比方样本 tensor,具体参考 4.3 节),在动态图中也视为一个 op 操作。

__build_io内会用 args(即 input)和 kwargs 结构一个 ArgsTree。示例代码中 kwargs 是空的。
而后遍历 ArgsTree,对 args 和 kwargs 的每个 tensor 都调用传入的 build_func,对于 input 来说,就是 build_graph_input_arg。前面会看到,model 的 output 也会调用__build_io,所以这个函数名的意思应该就是对 model 的输出、输入进行动态图的构图工作。

build_graph_input_arg 外部会结构一个 FeedInputOpExpr,提交给解释器执行。因为是在 lazy 作用域内,由 LazyInterpreter 解释执行,LazyInterpreter 会将对应的 op 插入动态图。

附:build input 时 ArgsTree 的内部结构

__build_io(input) 中 ArgsTree 的外部数据组织示意

  • _named_io_args: NamedArg

    • _value: tuple

      • [0]: NamedArg

        • _value: tuple of NamedArg

          • [0]: NamedArg

            • _value: args tensor from Graph.__call__
      • [1]: NamedArg

        • _value: empty kwargs from Graph.__call__

通过 pdb 命令能够查看变量: p args_tree._named_io_args._value[0]._value[0]._value.to_numpy()

5.2.1 将 op 增加到逻辑图

LazyInterpreter::ApplyImpl 在执行时,GetCurInferCtx()返回的就是 graph_build_context 中 OpenJobBuildAndInferCtx 创立的那个 LazyJobBuildAndInferCtx 对象,这个对象负责逻辑图的构建。增加 op 的次要调用流程如下:

  • infer_ctx->AddAndInferConsistentOp
  • AddAndInferOp
  • ConstructOp
  • CheckAndConstructOp
  • NewObj

OperatorConf 中,多种 op 配置共享 op_type 字段,protobuf oneof 的 op_type_case 常量作为注册 NewObj 的 key。
零碎预约义的 op 在 oneflow/core/operator 下,例如 UserOp。

AddAndInferOp 将返回的 Operator 保留到 LazyJobBuildAndInferCtx 的字典中。后续的函数调用,次要是进行推导并批改动态图 Job,使得各个节点形成一个 DAG。

JobBuildAndInferCtx 相干的类关系如下:

5.2.2 lazy tensor 和 eager tensor 的区别

LazyInterpreter::ApplyImpl 的最初,会调用 BuildTensor 结构一个 lazy tensor,作为 build_graph_input_arg 的返回值。所以 __build_io 返回的 lazy_args 是 lazy tensor,它将代替 eager 的 args(也就是用户输出的 input)参加后续的计算图构建。

那么 lazy tensor 和 eager tensor 的区别是什么呢?我的了解是,eager tensor 是要即时计算的,所以须要携带数据;而 lazy tensor 仅在动态图编译阶段用于推导,只须要形容性质的元信息,不须要相似样本那样的数据。同时,动态图编译是在 lazy 模式下运行,应用 lazy tensor 在各种查看校验时应该会更顺畅(?)。
前面会看到,动态图的运行期曾经没有 tensor 的概念。运行期看到的只是更狭义的 Regst 存储,可能代表 tensor/blob,也可能是其它管制信息。动态图运行时的输出,应该是通过 op 间接读取 tensor 的 blob(或者复用地址?)到 regst;输入应该是 op 写到 regst,通过 blob 结构 eager tensor。

5.3 build: 将 UserOp 和 FeedVariableOp 增加到逻辑图

__build_graph中的 self.build()会调用GraphMyLinear.build(),以及ModuleMyLinear.forward()。因为是在 lazy 模式下运行,matmul 和 add 都会调用 UserOpExpr 重载版本的 LazyInterpreter::ApplyImpl,进而调用 AddAndInferConsistentOp 进行构图操作。

须要阐明的是,在援用 Module 的 Parameter 属性时(如 weight/bias),会触发 FeedVariableOp 的构图操作、调用对应版本的 LazyInterpreter::ApplyImpl。这个是怎么执行的呢?

__build_graph中,在进入 lazy 模式之前,先调用了_create_states_builder。其中 self._state()返回所有 Module 的所有 Parameter(包含子 Module)。

state_block 的类型是 TensorBlock。所有的 state_block 的 lazy_origin_builder().method 都被设置为调用 build_graph_state。

给 build_graph_state 设置个断点能让整个调用过程显形,次要的调用栈如下:

-> out = graph_mylinear(input)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(221)__call__()
-> self._compile(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(741)_compile()
-> _, eager_outputs = self.build_graph(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(759)build_graph()
-> outputs = self.__build_graph(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(864)__build_graph()
-> outputs = self.build(*lazy_args, **lazy_kwargs)
  /mnt/project/machine-learning/oneflow/oneflow/test.py(21)build()
-> return self.model(input)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(234)__call__()
-> result = self.__block_forward(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(266)__block_forward()
-> result = self._origin.__class__.forward(self, *args, **kwargs)
  /mnt/project/machine-learning/oneflow/oneflow/test.py(11)forward()
-> return flow.matmul(input, self.weight) + self.bias
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(483)__getattr__()
-> p_state = self._get_from_states(name, "_parameters")
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(521)_get_from_states()
-> _s_block.try_build()
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(679)try_build()
-> self._lazy_origin_builder.try_build(self)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(627)try_build()
-> self.result = self.method()
> /usr/local/lib64/python3.6/site-packages/oneflow/framework/graph_build_util.py(227)build_graph_state()
-> op_name, var_conf_str, ["in_0"], ["out_0"]

这个调用过程比拟容易困扰的是,执行对象会在 Grpah、GraphMyLinear、ModuleMyLinear、ModuleBlock 之间切换。
后面在探讨 Graph 的结构时曾经提过,执行 self.model(input) 时,Graph.__getattr__返回的属性 model 是 ModuleBlock 对象,所以理论调用的是 ModuleBlock.__call__。在这个函数内调用__block_forward,其中的_origin 是 ModuleMyLinear,进入到它的forward 办法,执行到 flow.matmul(input, self.weight) + self.bias 时,会触发调用ModuleBlock.__getattr__,进而调用_get_from_states,调用 TensorBlock.try_build()。这里执行的就是进入 lazy 模式之前设置的 build_graph_state。从而减少一个 FeedVariableOp 到计算图。

为什么设置和调用会间隔这么远呢?可能是因为,如果在援用 weights 等 Parameter 时再 try_build,会不不便解决多个 block 共享一个 tensor 的状况。
再前面的步骤就是调用__build_io 插入 FetchOutputOp。也就是说,获取 model 的 output 也是一个 op。

到目前为止,前向计算图就构建实现了。它的 json 示意能够参考附录。net.op 是计算图的节点,通过 input 等属性能够看出节点之间的连贯关系。
示例代码的前向计算图如下。从这个图能够看到,input、output、weights 等都是 op。

5.4 逻辑图优化

__build_graph 中会调用 CurJobBuildAndInferCtx_Complete 对动态图进行多轮优化,对应的 C ++ 函数是 LazyJobBuildAndInferCtx::Complete()。

这之后生成的 Job 是 full_job。本文的示例代码比较简单,并不是典型的计算场景,其 forwar 和 ful 计算图 l 的拓扑是一样的。
到这里,逻辑图构建的主体局部就完结了。

随后会构建一个 CNNGraph 对象,对应的 C ++ 类型是 NNGraph。这个对象将负责构建物理计算图 Plan。它也是整个运行时的拥有者和维护者。这个对象析构时,整个运行时也会有序终止并开释资源。

5.5 物理图的编译

接下来就是执行 finish_complie_and_init_runtime,其中的外围调用是 self._c_nn_graph.complie_and_init_runtime(),对应的 C ++ 函数是 NNGraph::CompileAndInitRuntime。
在这个函数中,JobCompleter().Complete()会持续对逻辑图做几轮批改优化,Compiler().Compile()将逻辑图转为物理图,并持续对 Plan 进行批改优化。

Plan 的编译是在 master 节点进行的。master 节点会将 Plan 通过 gRPC 推送给各个 worker 节点,worker 节点从 master 拉取物理计算图。

之后调用 NewRuntimeBuffers 创立 Buffer 对象,Buffer 应该是次要用于过程内的信息同步。

而后就筹备初始化运行时了。

示例代码生成的 compiled_job 和物理图 Plan 的 json 参见附录。
最终生成的 compiled 逻辑图如下。框架主动插入了很多系统控制节点。

5.6 Plan 的构造

示例代码输入的 Plan json 数据见附录。

Plan 在逻辑上和 compiled_job 是等价的。这里次要关注 task/op 之间的关系。
Plan.task 中的每个元素是一个 task,其中的 exec_sequence.exec_node 对应 job 中的 op,通常只有一个 op(数组能够反对 sub graph)。
exec_node.kernel_conf.op_attribute 形容了 op 信息。其中 op_conf 蕴含 op name 信息。
kernel_conf.op_attribute.op_conf 就是 Job 中的 OperatorConf。

kernel_conf.op_attribute.arg_signature.bn_in_op2lbi 体现了 task/op 之间的连贯关系。
bn_in_op 就是 blob name in op,即 op 输出的 blob name。

以 System-AutoTick-DstSubsetTick_21 为例

{
  "out": {
    "op_name": "System-AutoTick-DstSubsetTick_21",
    "blob_name": "out"
  },
  "in_0": {
    "op_name": "System-EagerCriticalSection-Interface-End-Tick-19",
    "blob_name": "out"
  },
  "in_1": {
    "op_name": "System-AutoTick-SrcSubsetTick_20",
    "blob_name": "out"
  }
}

exec_node.bn_in_op2regst_desc_id 在 task 层面体现了连贯关系。这个 map 中的 key 示意输入输出,value 是 register id。

{
  "out": "29",
  "in_0": "27",
  "in_1": "28"
}

task.produced_regst_desc 形容了对应 task 生产的 register,consumer_task_id 是消费者,
produced_regst_desc.out.regst_desc_type.data_regst_desc.lbi2blob_desc.lbi 就是这个 register 的 logic blob id。
task.consumed_regst_desc_id 形容了对应 task 生产的 register 信息

6 运行时的初始化

NNGraph::CompileAndInitRuntime 中,new Runtime 这行代码会初始化运行时。次要做的事件包含:

  • 创立 Thread
  • 告诉 Thread 创立 Actor,Actor 会创立 Regst 和 Kernel
  • 给没有输出的 source_tasks 发送启动信号 kStart

6.1 Runtime 创立 Thread

在 Runtime 的构造函数中,DumpThreadIdsFromPlan 会将 Plan 中属于以后过程的 task 的 thread id 存入 thread_ids_ 变量。AddThreads 创立这些 Thread 对象。
Thread 在结构时会创立一个物理线程,线程执行的是 PollMsgChannel 办法,Thread 就是在这里继续期待须要解决的新音讯。
Thread 只解决两类命令音讯:线程终止音讯,创立 Actor 的音讯。其它音讯交给 Actor::ProcessMsg 解决。

6.2 Runtime 告诉 Thread 创立 Actor

在 Runtime 的构造函数中,tasks 被分为两类:source_tasks 和 other_tasks。在示例代码中,source_tasks 是没有输出边的 task。
从代码逻辑看,在 Plan proto 中,task 的 consumed_regst_desc_id 字段是一个 map。如果这个 map 的所有 key 都是 in_ctrl,这个 task 就是 source_tasks。

示例代码的 source_tasks 列表如下:

  • System-Src-WaitAndSendIds_16
  • System-AutoTick-AppendDeviceTick_9
  • System-EagerCriticalSection-Interface-End-Tick-19
  • System-EagerCriticalSection-Interface-End-Tick-25

Runtime 调用 HandoutTasks 函数会给 ActorMsgBus 发送构建 Actor 的 kConstructActor 音讯。

6.3 ActorMsgBus 和 Thread 的音讯解决

从接口看,ActorMsgBus 负责音讯的发送(Actor 通过 ActorMsgBus 发送音讯),Thread 负责音讯的接管(一般音讯转给 Actor 解决)。

相干实体的协作关系如下

  • Actor 是调度的根本单元。

    • actor_id 就是 task_id,是在编译 Plan 时就确定的。task 是编译器概念,actor 是对等的运行时概念。
    • task_id 有特定的编码格局,从中能够解析出 machine_id 和 thread_id。
    • 在跨网络的整个物理图 Plan 中,actor id 相当于地址,通过它能够定位惟一的 actor 实体。
  • Actor 之间通过 ActorMsgBus::SendMsg 进行音讯通信。

    • ActorMsg 蕴含源和目标 actor id。
    • 如果是过程内通信,ActorMsgBus 将音讯发给 Thread,Thread 转给 Actor 解决音讯。
    • 如果是跨过程音讯,ActorMsgBus 通过 CommNet 发送音讯,接管方的 CommNet 应该会依据 actor id 取得线程 id,从 ThreadMgr 查到 Thread,将音讯交给 Thread 解决。
  • Thread 通过 EnqueueActorMsg 接管发给本人治理的 Actor 的音讯。

    • 如果是本线程内的 actor 之间的音讯,间接将音讯放到 local 队列。否则放到 Channel 的异步音讯队列。
    • Thread::PollMsgChannel 会在结构 Thread 时启动的物理线程中接管 Channel 的音讯,一般音讯转给 Actor 解决。
  • Actor 收到音讯后,负责 LaunchKernel 执行计算,计算完结后通过 ActorMsgBus 告诉上下游 Actor。

这些对象之间的消息传递关系如下图所示

6.4 Runtime 激活 Actor 零碎

到这里,Actor 之间的协作关系根本分明了。然而整个 Actor 零碎还处于静止待命的状态。Actor 是音讯驱动的,总要有音讯触发能力让它们转起来。

Runtime 在构造函数中,给没有输出依赖的 source_tasks 发送 kStart 音讯,让 Actor 零碎处于可运行状态。

从 DAG 的角度看,只有激活 source_tasks 就行,这些节点给上游发送音讯,天然会触发所有 Actor 的执行。

但这个 kStart 音讯并没有启动计算图的一轮计算。因为这是在 Runtime 的构造函数中,还处于运行时初始化阶段。发送 kStart 更像是让 Actor 零碎处于激活状态。咱们在探讨完 Actor 之后,再看看计算图的每一轮计算是怎么触发的。

7 Actor

7.1 Actor 的创立

Thread 在创立 Actor 时,会先尝试创立为 LightActor,如果不胜利,再尝试用事后注册的工厂创立 Actor。

有几种 TaskType 能够用于 LightActor:

  • kNormalForward,比方 matmul、add 等 user op。
  • kCopyHd
  • kTick
  • kCollectiveBoxingGeneric

目前大概有 20 多种 Actor 的子类型。其它 Actor 类型依据 TaskType 事后注册。例如 WaitAndSendIdsActor。

示例代码的各个节点对应的 actor 类型参见附录。

Actor 相干的类关系如下(蕴含关系只是示意能够拜访到相干信息,并不意味着创立或着领有该类型对象)

7.2 Actor 的初始化

Actor 的构造函数个别都是空的,构建之后须要执行 Init 函数进行初始化。

LightActor 继承自 ActorBase,不是 Actor 的子类,有本人的 Init 函数实现。这里只探讨 Actor 的初始化。

在 Actor::Init 中,首先调用 ConstructKernel 创立 kernel 实例。和 Operator 相似,kernel 也是以 OpTypeCase 作为注册的 key,例如 WaitAndSendIdsKernel。一个 Actor 通常只有一个 kernel。

之后调用 NewRegsts 创立 Regst。Tensor 是用户侧的概念。对应的运行时概念是 Regst,它持有 Kernel 须要读写的内存。Regst 的概念比 Tensor 更宽泛,比方框架主动增加的管制 Op 也会用到 Regst。
Actor 将本人创立的 Regst 保留到 produced_regsts_。

TakeOverNaiveConsumed 只记录须要生产的 regst id,但并不 push 到consumed_regsts_

TakeOverNaiveProduced 既记录生产的 regst id,也 push 到 naive_produced_rs_。这种区别是为了首次执行计算时,actor 能顺利执行。前面剖析 Actor 的音讯解决时会再回过头来讨论一下。

调用 InitBnInOp2BlobInfo 会初始化 BlobInfo。
之后就是调用 VirtualActorInit,这里容许各个 Actor 子类定制本人的初始化逻辑。通常会调用 OF_SET_MSG_HANDLER 宏设置 Actor 的音讯处理函数。

7.3 Actor 的音讯解决

咱们以 WaitAndSendIds 为例,察看一下 Actor 的音讯解决机制。其 Actor 是 WaitAndSendIdsActor,kernel 是 WaitAndSendIdsKernel。

之所以抉择这个例子,一是这个 Actor 比较简单;二是这是一个典型的 source task,想看一下计算图是怎么被触发启动计算的。

Thread 收到的音讯如果不是 kStopThread 或 kConstructActor,就调用 Actor::ProcessMsg,将音讯转给 Actor 解决。
ProcessMsg 函数只是简略的将音讯转给 handler 解决。

WaitAndSendIdsActor::VirtualActorInit 中,handler 被设置为 HandlerWaitToStart。
Runtime 的构造函数中,发送的第一批音讯是给 source_tasks 的 kStart 音讯,这个音讯就由 HandlerWaitToStart 函数解决。

HandlerWaitToStart 校验音讯类型后,将 handler 设置为 HandlerNormal(这也是大部分 Actor 的默认 handler),而后调用 ProcessMsg,理论就是调用新设置的 handler HandlerNormal。

HandlerNormal 中,如果是 kCmdMsg,只容许是 kStart。通过音讯类型校验后,会间接调用 ActUntilFail。

7.4 Act 执行的条件

ActUntilFail 中,Act 办法是各个子类本人实现的,个别次要是启动 kernel 计算。
然而在执行 Act 之前,须要先确认:

  • Act 执行依赖的数据是否都曾经就绪?(IsReadReady)
  • Act 生产进去的数据,生产方是否曾经用完、并收到 ack 音讯确认?(IsWriteReady)

Actor 有 4 个与此相关的成员变量

  • RegstSlot naive_produced_rs_;
  • RegstSlot inplace_produced_rs_;
  • RegstSlot naive_consumed_rs_;
  • RegstSlot inplace_consumed_rs_;

xx_produced_rs_存储的是以后 Actor 的上游 consumer 返回的、曾经应用结束的 ack regst 信息。(以后 Actor 生产的 Regst 存储在 produced_regsts_ 中。)

运行时在初始化的过程中,所有 Actor 都没有运行过,任何 Actor 都不可能收到 ack 音讯,所以在 Actor 初始化时,要事后填充xx_produced_rs_,这样能力保障 Actor 在首次运行前是 WriteReady 的,能力顺利启动执行。

xx_consumed_rs_存储的是上游依赖发来的数据。它不须要事后填充。因为 source_tasks 没有输出依赖,天然就是 ReadReady 的;而 xx_produced_rs_ 在初始化时的事后填充又保障它是 WriteReady 的,所以 source_tasks 能够间接运行。source_tasks 的输入音讯发给上游,上游也会变为 ReadReady,而上游在初始化后也保障是 WriteReady 的。整个 Actor 零碎就能够这样运行起来了。

7.5 Actor 上下游之间的告诉机制

Act 执行结束后,须要将后果数据发给上游 consumer。以 WaitAndSendIds 的 Naive Produced 为例,ActUntilFail 中的调用流程如下:

  • AsyncSendNaiveProducedRegstMsgToConsumer

    • VirtualAsyncSendNaiveProducedRegstMsgToConsumer

      • HandleProducedNaiveDataRegstToConsumer

        • HandleRegstToConsumer

          • EnqueueAsyncMsg

            • 如果指标线程是以后线程,ActorMsgBus::SendMsg
            • 否则,将音讯退出 async_msg_queue_
          • 减少 total_reading_cnt_(这个变量示意曾经发消息给上游、但未收到的 ack 数量)
        • naive_produced_rs_.PopFrontRegsts
    • AsyncSendProducedCtrlRegstMsgToConsumer

留神 naive_produced_rs_.PopFrontRegsts 会将 Regst 指针从队列中删掉,相应的计数减 1。
而在 Actor::HandlerNormal 中解决收到的 kRegstMsg 音讯时,如果是 consumer 发来的 ack 音讯,会调用 TryUpdtStateAsProducedRegst,将 Regst 再增加到 naive_produced_rs_ 中,以保障以后 Actor 在收到所有 ack 后是 WriteReady 的;同时递加 total_reading_cnt_。

Actor 对依赖的上游音讯的解决是相似的。通过以下函数调用给上游发送 ack 音讯、告诉数据曾经用完,能够持续更新了:

  • AsyncSendNaiveConsumedRegstMsgToProducer
  • AsyncRetInplaceConsumedRegstIfNoConsumer
    在 Actor::HandlerNormal 中收到 kRegstMsg 音讯后,将音讯增加到 consumed_rs_,以保障以后 Actor 在收到所有依赖数据后是 ReadReady 的。

LightActor 有本人的音讯解决机制,大抵原理应该是差不多的。

7.6 Act 执行的动作

根据上述探讨,Actor 收到 kRegstMsg 后也会进入 ActUntilFail 执行。如果读写都是 Ready,就执行 Act。以 WaitAndSendIdsActor 为例,次要调用链路如下:

  • AsyncLaunchKernel
  • ek.kernel->Launch,启动 Kernel 计算
  • Forward
  • ForwardDataContent
  • buffer->Pull
  • 给 regst 的存储地址 mut_dptr 赋值

buffer->Pull 会期待条件变量的告诉。当初,看上去所有 Actor 都已准备就绪,只等发令枪一响就开跑了。

8 启动动态图的计算

Graph.__run 会扣动发令枪的板机,启动计算图的一轮计算。
次要调用流程如下:

  • RunLazyNNGraph
  • builder->LaunchLazyJob
  • LaunchLazyJobInstructionType
  • Buffer::Push

这里的 Buffer::Push 就是 WaitAndSendIdsKernel 在期待的起跑信号。

9 运行时的退出机制

整个运行时蕴含很多对象和资源,平安有序的退出是庞杂而又粗疏的工作。这里仅以 WaitAndSendIds 为例,从一个侧面察看一下运行时的退出机制。

运行时的退出始于 NNGraph 对象的析构。

9.1 Actor 的退出

  • NNGraph 在析构时,会敞开所有的 Buffer 对象。
  • Buffer 在敞开时,会设置 is_closed_ = true 并告诉所有监听者。然而 Pull 会持续解决完曾经提交的计算。

    • 所以,Buffer 应该是次要用于过程内的通信和异步协调的一个类。
  • WaitAndSendIdsKernel 这时候正在期待新一轮计算开始,后果收到 Pull 返回的 kBufferStatusErrorClosed。
  • WaitAndSendIdsActor::IsCustomizedReadReady 当前就始终返回 false,IsReadReady 也返回 false。

    • 这之后,ActUntilFail 只会执行异步音讯发送(不再进入 while 循环)
  • WaitAndSendIdsActor::HandlerNormal 依然会解决其它 Actor 发来的音讯。但因为 IsCustomizedReadReady 返回 false,会进入 AsyncSendEORDMsgForAllProducedRegstDesc 执行。它会给每个上游发送 kEordMsg 音讯。
  • Actor 在收到上游发来的 kEordMsg 音讯后,递加 remaining_eord_cnt_。

    • remaining_eord_cnt_被初始化为 Actor 的输出 regst 的数量。
  • total_reading_cnt_是以后 Actor 生产的、曾经发给 consumer、但尚未收到 ack 的音讯数量。

    • Actor 目前仍能够失常接管 consumer 发来的 ack 音讯。
  • 当上述 2 个变量都为 0 时,意味着所有上游都收回了 kEordMsg 音讯,也收到了所有上游的 ack 音讯。Actor 就给 Thread 返回 1。

    • 如果上述两个变量有不为 0 的,就批改 handler,由 HandlerZombie 解决后续收到的音讯。
  • Thread 收到 Actor 返回的 1 后,将它从本人的存储中删除,并递加运行 Actor 的数量。

9.2 Thread 的退出

  • NNGraph 重置 runtime_导致运行时对象被析构。
  • Runtime 删除所有 Thread。
  • ThreadMgr 给所有 Thread 发送 kStopThread 音讯。同时,重置指针导致 Thread 析构。
  • Thread 的物理线程退出 PollMsgChannel 循环。
  • Thread 期待物理线程完结,敞开 channel。

10 分布式场景的动态图

分布式的 compile_job、物理图 Plan 和单机场景有显著变动。
比方,每个过程都有一套 WaitAndSendIds 等管制节点。这也容易了解,因为每个节点都要执行 __runBuffer::Push/Pull,都要启动本过程的 Actors 执行计算。
matmul 和 broadcast_add 等 user op 也会在两个节点进行计算。

10.1 示例代码

启动形式参考 Global Tensor 的官网文档。

import oneflow as flow
import oneflow.nn as nn

P0 = flow.placement("cpu", ranks=[0, 1])
a0_sbp = flow.sbp.split(0)

class ModuleMyLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(flow.randn(in_features, out_features,
            placement=P0, sbp=flow.sbp.broadcast))
        self.bias = nn.Parameter(flow.randn(1, out_features,
            placement=P0, sbp=flow.sbp.broadcast))

    def forward(self, input):
        return flow.matmul(input, self.weight) + self.bias

linear_model = ModuleMyLinear(4, 3)

class GraphMyLinear(nn.Graph):
    def __init__(self):
        super().__init__()
        # ModuleBlock
        self.model = linear_model

    def build(self, input):
        # ModuleBlock.__call__
        return self.model(input)

graph_mylinear = GraphMyLinear()
input = flow.randn(5, 4, placement=P0, sbp=flow.sbp.split(1))
out = graph_mylinear(input)
print(out)

11 附录

11.1 断点

11.1.1 Python 断点示例

# python3 -m pdb test.py
break test.py:25
break oneflow/nn/graph/graph.py:221
break oneflow/nn/graph/graph.py:741
break oneflow/nn/graph/graph.py:745
break oneflow/nn/graph/graph.py:759
break oneflow/nn/graph/graph.py:828
break oneflow/nn/graph/graph.py:777
break oneflow/nn/graph/graph.py:1066
break oneflow/nn/graph/graph.py:1133
break oneflow/framework/graph_build_util.py:227

11.1.2 C++ 断点示例

启动命令

source /mnt/oneflow/build/source.sh
gdb --args python3 /mnt/oneflow/test.py
# set breakpoints
# run

断点示例

set breakpoint pending on
break oneflow::ActorMsg::BuildEordMsg
break oneflow/core/common/buffer.h:80
break oneflow::(anonymous namespace)::CheckAndConstructOp
break oneflow::WaitAndSendIdsActor::Act
break oneflow::WaitAndSendIdsActor::HandlerWaitToStart
break oneflow/core/lazy/actor/light_actor.cpp:452
break oneflow/core/lazy/actor/light_actor.cpp:485
break oneflow::ForeignInputKernel::ForwardDataContent
break oneflow::vm::LaunchLazyJobInstructionType::Compute

11.2 动态图的 json 示意

  • forward
  • full
  • compiled
  • plan

11.3 actor type

naive_actor

System-AutoTick-AppendDeviceTick_9
System-AutoTick-DstSubsetTick_12
System-AutoTick-DstSubsetTick_21
System-AutoTick-DstSubsetTick_27
System-AutoTick-Prepend-DeviceTick_7
System-AutoTick-SrcSubsetTick_20
System-AutoTick-SrcSubsetTick_26
System-AutoTick-SrcSubsetTick_8
System-AutoTick-Tick_11
System-AutoTick-Tick_13
System-EagerCriticalSection-Callback-23
System-EagerCriticalSection-Callback-29
System-EagerCriticalSection-Interface-Begin-Tick-18
System-EagerCriticalSection-Interface-Begin-Tick-24
System-EagerCriticalSection-Interface-End-Tick-19
System-EagerCriticalSection-Interface-End-Tick-25
System-EagerCriticalSection-Wait-22
System-EagerCriticalSection-Wait-28

light_actor

_GraphMyLinear_0_input.0.0_2
_GraphMyLinear_0_output.0.0_2
model.bias
model-broadcast_add-1
model-matmul-0
model.weight
System-AutoTick-SinkTick_15
System-SyncAllRanksSinkTick_14

wait_and_send_ids_actor

System-Src-WaitAndSendIds_16

call_back_notify_actor

System-Sink-CallbackNotify_17

12 参考资料

  • oneflow v0.8.0
  • OneFlow 框架的零碎设计(上篇)
  • OneFlow 框架的零碎设计(中篇)
  • OneFlow 框架的零碎设计(下篇)
  • 一个 Job 在 OneFlow 中的执行过程—上篇
  • 一个 Job 在 OneFlow 中的执行过程—中篇
  • 一个 Job 在 OneFlow 中的执行过程—下篇
  • 动态图模块 nn.Graph
  • OneFlow 零碎设计
  • torch.nn.Module

正文完
 0