oneflow动态图的训练效率远高于动态图(eager模式)。本文试图通过一个简略例子,联合0.8.0的代码,看一下动态图和运行时的实现机制。
在开始之前,倡议先读一下参考资料中《OneFlow框架的零碎设计》等系列文章。对动态图、运行时的基本概念和设计理念有根本的理解,会更容易了解代码。

1 代码示例

上面的示例代码来自官网文档,是一个线性模型的前向计算。后续次要基于这段代码进行剖析。

import oneflow as flowimport oneflow.nn as nnclass ModuleMyLinear(nn.Module):    def __init__(self, in_features, out_features):        super().__init__()        self.weight = nn.Parameter(flow.randn(in_features, out_features))        self.bias = nn.Parameter(flow.randn(out_features))    def forward(self, input):        return flow.matmul(input, self.weight) + self.biaslinear_model = ModuleMyLinear(4, 3)class GraphMyLinear(nn.Graph):    def __init__(self):        super().__init__()        # ModuleBlock        self.model = linear_model    def build(self, input):        # ModuleBlock.__call__        return self.model(input)graph_mylinear = GraphMyLinear()input = flow.randn(1, 4)out = graph_mylinear(input)print(out)

2 oneflow包的初始化

import oneflow在初始化包时,与动态图相干的次要操作如下:

  • GetEnv

    • EnvGlobalObjectsScope::Init

      • 启动各个节点的管制面网络连接
      • 初始化VM
      • 启动各个节点的数据面网络连接
      • 初始化KernelObserver
  • NewDefaultSession

    • RegsiterSession 创立 Session,并注册为 default session
    • 创立 Python MultiClientSession 并保留到dict,但并不 TryInit

      • 创立 C++ MultiClientSessionContext 但并不 TryInit

EnvGlobalObjectsScope::Init中先创立一个全局的ProcessCtx对象。而后依据环境变量等配置,在各个过程间创立gRPC和CommNet的连贯,别离负责管制面和数据面的数据传输。其中在Bootstrap过程中会初始化全局的ProcessCtx,给每个过程调配一个全局惟一的rank编号(machine_id)。
本文不波及网络层面的操作,只探讨同一过程内各线程间的交互。

3 Module类

尽管能够间接用op和tensor结构模型,然而op的粒度太细了,间接用op结构模型会比拟繁琐。
Module是由op和tensor形成的、可复用的子模块。利用Module能够更高效、更快捷的构建简单模型。oneflow.nn模块导出了很多预约义的Module。

Module定义了本人的属性设置逻辑,外围逻辑是

  • 如果value是Parameter类型,就保留到Module._parameters
  • 如果value是Module类型,就保留到Module._modules
  • 如果value是Tensor类型,就保留到Module._buffers
  • 否则按惯例属性解决

Module能够蕴含子Module,造成树结构。因为Module通过setattr将子Module和Parameter都保留到字典构造中,能够不便的遍历所有Module及其参数tensor。

4 Graph类

4.1 构造函数

Graph的构造函数中GetDefaultSession失去的session,就是导入oneflow包时NewDefaultSession构建的session。过后没有初始化,而是在Graph结构时进行初始化。对应的C++函数是MultiClientSessionContext::TryInit,执行时会创立各种全局的资源管理器,比方:

  • LazyJobBuildAndInferCtxMgr
  • BufferMgr
  • RegstMgr
  • ActorMsgBus
  • ThreadMgr

4.2 __setattr__: 将Module和Tensor封装为Block

Graph.__setattr__不容许将Tensor对象设置为属性。Tensor只能存到Module中。
setattr最重要的动作就是对_add_block的调用,_add_block中次要是调用get_block_cls并保留后果。
get_block_cls的作用是将Module及其所有Tensor属性都转为对应的Block对象。为什么要做这个动作呢?次要是动态图编译须要借助Block类型的一些性能,这些性能不适宜间接加诸于Module和Tensor。
这个转换是在ModuleBlock结构时调用set_origin实现的。对于子Module,会递归调用get_block_cls函数,这样所有子Module及其Tensor属性都会被转换为对应的Block对象。
所以,上述示例代码中,GraphMyLinear理论存储的是ModuleBlock,Graph.build执行时获取的model属性也是ModuleBlock对象,ModuleBlock.origin才是ModuleMyLinear。

4.3 针对不同工作,定义不同的计算图

依据Oneflow Model Zoo的模型示例,train/eval/infer等阶段能够创立不同的Graph子类。在这些不同阶段,Graph构造函数的行为、build函数的输入输出都有各自特点。理解这些,看后续代码时会更容易了解各个参数的具体含意。

  • 构造函数

    • train阶段,构造函数会引入Module、损失函数、优化器和data_loader
    • eval阶段,只须要引入Module和data_loader
    • infer阶段,只须要引入Module
  • build函数

    • train

      • 导入样本和label
      • 调用Module失去前向计算结果
      • 计算损失
      • 计算梯度
      • 返回loss
    • eval

      • 导入样本和label
      • 调用Module失去预估后果
      • 返回预估后果和label
    • infer

      • 传入的参数包含样本
      • 调用Module失去预估后果
      • 返回预估后果

4.4 小结

上述几个类型的关系如下:

上面形容了GraphMyLinear的结构流程

* `__init__`  * `Graph.__init__`  * self.model = linear_model    * `Graph.__setattr__`      * _add_block        * get_block_cls: 递归地把Module转为ModuleBlock        * `ModuleBlock.__init__`          * ModuleBlock.set_origin            * `ModuleBlock._origin = origin` (Module)            * 对origin的sub modules, parameters, buffers递归调用get_block_cls            * `ModuleBlock.__setattr__`

5 逻辑图的编译

计算机语言的编译,是将高级语言的语句编译为汇编或机器指令。深度学习框架对计算工作的编译,是将用户的特定语句操作转换为DAG图。oneflow中用Job形容逻辑的计算图。
不同于eager模式的动态图,动态图在开始执行前能够失去整个计算工作的所有信息,能够对DAG进行多轮优化。每轮优化都是输出一个Job、失去一个新Job。
最初,依据分布式环境配置,将逻辑图Job转换为物理执行的计算图Plan。在物理图中,一个op可能散布在多个节点/过程。

启动DAG计算须要调用Graph.__call__,这个函数的执行次要分以下几个步骤:

  • __call__

    • _compile if not _is_compiled

      • build_graph

        • __build_graph
      • finish_complie_and_init_runtime
    • __run

逻辑图编译次要在__build_graph中进行。finish_complie_and_init_runtime会持续做一些优化pass,而后构建物理图、初始化运行时Actor零碎。__run会启动一次DAG的运算。

5.1 graph_build_context: 为逻辑图编译设置根本环境

__build_graph中的graph_build_context尽管只有一行代码,但却做了几件十分重要的事件。

首先在context作用域内设置全局的lazy_mode为True。在这个context作用域内,所有op都由LazyInterpreter解释执行。

其次,在JobBuildAndInferCtx作用域内,JobBuildAndInferCtx_Open调用相似如下C++代码

// oneflow/api/python/job_build/job_build_and_infer.h// oneflow/core/job/job_build_and_infer_ctx_mgr.cpp// 如前所述,LazyJobBuildAndInferCtxMgr 在 MultiClientSessionContext::TryInit 执行时初始化。// LazyJobBuildAndInferCtxMgr mgr;mgr.OpenJobBuildAndInferCtx(job_name);

OpenJobBuildAndInferCtx会新建一个Job对象、一个LazyJobBuildAndInferCtx对象。LazyJobBuildAndInferCtx负责依据用户定制的op等操作,批改Job。

5.2 __build_io:为计算图增加input和output Op

self.__build_io("input", graph_build_util.build_graph_input_arg, *args, **kwargs)

下面这行代码的作用是,对于用户传递给graph_mylinear(input)的input参数,针对其中的每个tensor都在逻辑计算图中插入一个FeedInputOp节点。也就是说,model的输出(比方样本tensor,具体参考4.3节),在动态图中也视为一个op操作。

__build_io内会用args(即input)和kwargs结构一个ArgsTree。示例代码中kwargs是空的。
而后遍历ArgsTree,对args和kwargs的每个tensor都调用传入的build_func,对于input来说,就是build_graph_input_arg。前面会看到,model的output也会调用__build_io,所以这个函数名的意思应该就是对model的输出、输入进行动态图的构图工作。

build_graph_input_arg外部会结构一个FeedInputOpExpr,提交给解释器执行。因为是在lazy作用域内,由LazyInterpreter解释执行,LazyInterpreter会将对应的op插入动态图。

附: build input时ArgsTree的内部结构

__build_io(input) 中 ArgsTree 的外部数据组织示意

  • _named_io_args: NamedArg

    • _value: tuple

      • [0]: NamedArg

        • _value: tuple of NamedArg

          • [0]: NamedArg

            • _value: args tensor from Graph.__call__
      • [1]: NamedArg

        • _value: empty kwargs from Graph.__call__

通过pdb命令能够查看变量: p args_tree._named_io_args._value[0]._value[0]._value.to_numpy()

5.2.1 将op增加到逻辑图

LazyInterpreter::ApplyImpl在执行时,GetCurInferCtx()返回的就是graph_build_context中OpenJobBuildAndInferCtx创立的那个LazyJobBuildAndInferCtx对象,这个对象负责逻辑图的构建。增加op的次要调用流程如下:

  • infer_ctx->AddAndInferConsistentOp
  • AddAndInferOp
  • ConstructOp
  • CheckAndConstructOp
  • NewObj

OperatorConf中,多种op配置共享op_type字段,protobuf oneof的op_type_case常量作为注册NewObj的key。
零碎预约义的op在oneflow/core/operator下,例如UserOp。

AddAndInferOp将返回的Operator保留到LazyJobBuildAndInferCtx的字典中。后续的函数调用,次要是进行推导并批改动态图Job,使得各个节点形成一个DAG。

JobBuildAndInferCtx相干的类关系如下:

5.2.2 lazy tensor 和 eager tensor 的区别

LazyInterpreter::ApplyImpl的最初,会调用BuildTensor结构一个lazy tensor,作为build_graph_input_arg的返回值。所以__build_io返回的lazy_args是lazy tensor,它将代替eager的args(也就是用户输出的input)参加后续的计算图构建。

那么lazy tensor和eager tensor的区别是什么呢?我的了解是,eager tensor是要即时计算的,所以须要携带数据;而lazy tensor仅在动态图编译阶段用于推导,只须要形容性质的元信息,不须要相似样本那样的数据。同时,动态图编译是在lazy模式下运行,应用lazy tensor在各种查看校验时应该会更顺畅(?)。
前面会看到,动态图的运行期曾经没有tensor的概念。运行期看到的只是更狭义的Regst存储,可能代表tensor/blob,也可能是其它管制信息。动态图运行时的输出,应该是通过op间接读取tensor的blob(或者复用地址?)到regst;输入应该是op写到regst,通过blob结构eager tensor。

5.3 build: 将UserOp和FeedVariableOp增加到逻辑图

__build_graph中的self.build()会调用GraphMyLinear.build(),以及ModuleMyLinear.forward()。因为是在lazy模式下运行,matmul和add都会调用UserOpExpr重载版本的LazyInterpreter::ApplyImpl,进而调用AddAndInferConsistentOp进行构图操作。

须要阐明的是,在援用Module的Parameter属性时(如weight/bias),会触发FeedVariableOp的构图操作、调用对应版本的LazyInterpreter::ApplyImpl。这个是怎么执行的呢?

__build_graph中,在进入lazy模式之前,先调用了_create_states_builder。其中self._state()返回所有Module的所有Parameter(包含子Module)。

state_block的类型是TensorBlock。所有的state_block的lazy_origin_builder().method都被设置为调用build_graph_state。

给build_graph_state设置个断点能让整个调用过程显形,次要的调用栈如下:

-> out = graph_mylinear(input)  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(221)__call__()-> self._compile(*args, **kwargs)  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(741)_compile()-> _, eager_outputs = self.build_graph(*args, **kwargs)  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(759)build_graph()-> outputs = self.__build_graph(*args, **kwargs)  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(864)__build_graph()-> outputs = self.build(*lazy_args, **lazy_kwargs)  /mnt/project/machine-learning/oneflow/oneflow/test.py(21)build()-> return self.model(input)  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(234)__call__()-> result = self.__block_forward(*args, **kwargs)  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(266)__block_forward()-> result = self._origin.__class__.forward(self, *args, **kwargs)  /mnt/project/machine-learning/oneflow/oneflow/test.py(11)forward()-> return flow.matmul(input, self.weight) + self.bias  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(483)__getattr__()-> p_state = self._get_from_states(name, "_parameters")  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(521)_get_from_states()-> _s_block.try_build()  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(679)try_build()-> self._lazy_origin_builder.try_build(self)  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(627)try_build()-> self.result = self.method()> /usr/local/lib64/python3.6/site-packages/oneflow/framework/graph_build_util.py(227)build_graph_state()-> op_name, var_conf_str, ["in_0"], ["out_0"]

这个调用过程比拟容易困扰的是,执行对象会在Grpah、GraphMyLinear、ModuleMyLinear、ModuleBlock之间切换。
后面在探讨Graph的结构时曾经提过,执行self.model(input)时,Graph.__getattr__返回的属性model是ModuleBlock对象,所以理论调用的是ModuleBlock.__call__。在这个函数内调用__block_forward,其中的_origin是ModuleMyLinear,进入到它的forward办法,执行到flow.matmul(input, self.weight) + self.bias时,会触发调用ModuleBlock.__getattr__,进而调用_get_from_states,调用TensorBlock.try_build()。这里执行的就是进入lazy模式之前设置的build_graph_state。从而减少一个FeedVariableOp到计算图。

为什么设置和调用会间隔这么远呢?可能是因为,如果在援用weights等Parameter时再try_build,会不不便解决多个block共享一个tensor的状况。
再前面的步骤就是调用__build_io插入FetchOutputOp。也就是说,获取model的output也是一个op。

到目前为止,前向计算图就构建实现了。它的json示意能够参考附录。net.op是计算图的节点,通过input等属性能够看出节点之间的连贯关系。
示例代码的前向计算图如下。从这个图能够看到,input、output、weights等都是op。

5.4 逻辑图优化

__build_graph中会调用CurJobBuildAndInferCtx_Complete对动态图进行多轮优化,对应的C++函数是LazyJobBuildAndInferCtx::Complete()。

这之后生成的Job是full_job。本文的示例代码比较简单,并不是典型的计算场景,其forwar和ful计算图l的拓扑是一样的。
到这里,逻辑图构建的主体局部就完结了。

随后会构建一个CNNGraph对象,对应的C++类型是NNGraph。这个对象将负责构建物理计算图Plan。它也是整个运行时的拥有者和维护者。这个对象析构时,整个运行时也会有序终止并开释资源。

5.5 物理图的编译

接下来就是执行finish_complie_and_init_runtime,其中的外围调用是self._c_nn_graph.complie_and_init_runtime(),对应的C++函数是NNGraph::CompileAndInitRuntime。
在这个函数中,JobCompleter().Complete()会持续对逻辑图做几轮批改优化,Compiler().Compile()将逻辑图转为物理图,并持续对Plan进行批改优化。

Plan的编译是在master节点进行的。master节点会将Plan通过gRPC推送给各个worker节点,worker节点从master拉取物理计算图。

之后调用NewRuntimeBuffers创立Buffer对象,Buffer应该是次要用于过程内的信息同步。

而后就筹备初始化运行时了。

示例代码生成的compiled_job和物理图Plan的json参见附录。
最终生成的compiled逻辑图如下。框架主动插入了很多系统控制节点。

5.6 Plan的构造

示例代码输入的Plan json数据见附录。

Plan在逻辑上和compiled_job是等价的。这里次要关注task/op之间的关系。
Plan.task中的每个元素是一个task,其中的exec_sequence.exec_node对应job中的op,通常只有一个op(数组能够反对sub graph)。
exec_node.kernel_conf.op_attribute形容了op信息。其中op_conf蕴含op name信息。
kernel_conf.op_attribute.op_conf就是Job中的OperatorConf。

kernel_conf.op_attribute.arg_signature.bn_in_op2lbi体现了task/op之间的连贯关系。
bn_in_op就是blob name in op,即op输出的blob name。

以System-AutoTick-DstSubsetTick_21为例

{  "out": {    "op_name": "System-AutoTick-DstSubsetTick_21",    "blob_name": "out"  },  "in_0": {    "op_name": "System-EagerCriticalSection-Interface-End-Tick-19",    "blob_name": "out"  },  "in_1": {    "op_name": "System-AutoTick-SrcSubsetTick_20",    "blob_name": "out"  }}

exec_node.bn_in_op2regst_desc_id在task层面体现了连贯关系。这个map中的key示意输入输出,value是register id。

{  "out": "29",  "in_0": "27",  "in_1": "28"}

task.produced_regst_desc形容了对应task生产的register,consumer_task_id是消费者,
produced_regst_desc.out.regst_desc_type.data_regst_desc.lbi2blob_desc.lbi就是这个register的logic blob id。
task.consumed_regst_desc_id形容了对应task生产的register信息

6 运行时的初始化

NNGraph::CompileAndInitRuntime中,new Runtime这行代码会初始化运行时。次要做的事件包含:

  • 创立Thread
  • 告诉Thread创立Actor,Actor会创立Regst和Kernel
  • 给没有输出的source_tasks发送启动信号kStart

6.1 Runtime创立Thread

在Runtime的构造函数中,DumpThreadIdsFromPlan会将Plan中属于以后过程的task的thread id存入thread_ids_变量。AddThreads创立这些Thread对象。
Thread在结构时会创立一个物理线程,线程执行的是PollMsgChannel办法,Thread就是在这里继续期待须要解决的新音讯。
Thread只解决两类命令音讯:线程终止音讯,创立Actor的音讯。其它音讯交给Actor::ProcessMsg解决。

6.2 Runtime告诉Thread创立Actor

在Runtime的构造函数中,tasks被分为两类:source_tasks和other_tasks。在示例代码中,source_tasks是没有输出边的task。
从代码逻辑看,在Plan proto中,task的consumed_regst_desc_id字段是一个map。如果这个map的所有key都是in_ctrl,这个task就是source_tasks。

示例代码的source_tasks列表如下:

  • System-Src-WaitAndSendIds_16
  • System-AutoTick-AppendDeviceTick_9
  • System-EagerCriticalSection-Interface-End-Tick-19
  • System-EagerCriticalSection-Interface-End-Tick-25

Runtime调用HandoutTasks函数会给ActorMsgBus发送构建Actor的kConstructActor音讯。

6.3 ActorMsgBus和Thread的音讯解决

从接口看,ActorMsgBus负责音讯的发送(Actor通过ActorMsgBus发送音讯),Thread负责音讯的接管(一般音讯转给Actor解决)。

相干实体的协作关系如下

  • Actor是调度的根本单元。

    • actor_id就是task_id,是在编译Plan时就确定的。task是编译器概念,actor是对等的运行时概念。
    • task_id有特定的编码格局,从中能够解析出machine_id和thread_id。
    • 在跨网络的整个物理图Plan中,actor id相当于地址,通过它能够定位惟一的actor实体。
  • Actor之间通过ActorMsgBus::SendMsg进行音讯通信。

    • ActorMsg蕴含源和目标actor id。
    • 如果是过程内通信,ActorMsgBus将音讯发给Thread,Thread转给Actor解决音讯。
    • 如果是跨过程音讯,ActorMsgBus通过CommNet发送音讯,接管方的CommNet应该会依据actor id取得线程id,从ThreadMgr查到Thread,将音讯交给Thread解决。
  • Thread通过EnqueueActorMsg接管发给本人治理的Actor的音讯。

    • 如果是本线程内的actor之间的音讯,间接将音讯放到local队列。否则放到Channel的异步音讯队列。
    • Thread::PollMsgChannel会在结构Thread时启动的物理线程中接管Channel的音讯,一般音讯转给Actor解决。
  • Actor收到音讯后,负责LaunchKernel执行计算,计算完结后通过ActorMsgBus告诉上下游Actor。

这些对象之间的消息传递关系如下图所示

6.4 Runtime激活Actor零碎

到这里,Actor之间的协作关系根本分明了。然而整个Actor零碎还处于静止待命的状态。Actor是音讯驱动的,总要有音讯触发能力让它们转起来。

Runtime在构造函数中,给没有输出依赖的source_tasks发送kStart音讯,让Actor零碎处于可运行状态。

从DAG的角度看,只有激活source_tasks就行,这些节点给上游发送音讯,天然会触发所有Actor的执行。

但这个kStart音讯并没有启动计算图的一轮计算。因为这是在Runtime的构造函数中,还处于运行时初始化阶段。发送kStart更像是让Actor零碎处于激活状态。咱们在探讨完Actor之后,再看看计算图的每一轮计算是怎么触发的。

7 Actor

7.1 Actor的创立

Thread在创立Actor时,会先尝试创立为LightActor,如果不胜利,再尝试用事后注册的工厂创立Actor。

有几种TaskType能够用于LightActor:

  • kNormalForward,比方matmul、add等user op。
  • kCopyHd
  • kTick
  • kCollectiveBoxingGeneric

目前大概有20多种Actor的子类型。其它Actor类型依据TaskType事后注册。例如WaitAndSendIdsActor。

示例代码的各个节点对应的actor类型参见附录。

Actor相干的类关系如下(蕴含关系只是示意能够拜访到相干信息,并不意味着创立或着领有该类型对象)

7.2 Actor的初始化

Actor的构造函数个别都是空的,构建之后须要执行Init函数进行初始化。

LightActor继承自ActorBase,不是Actor的子类,有本人的Init函数实现。这里只探讨Actor的初始化。

在Actor::Init中,首先调用ConstructKernel创立kernel实例。和Operator相似,kernel也是以OpTypeCase作为注册的key,例如WaitAndSendIdsKernel。一个Actor通常只有一个kernel。

之后调用NewRegsts创立Regst。Tensor是用户侧的概念。对应的运行时概念是Regst,它持有Kernel须要读写的内存。Regst的概念比Tensor更宽泛,比方框架主动增加的管制Op也会用到Regst。
Actor将本人创立的Regst保留到produced_regsts_。

TakeOverNaiveConsumed只记录须要生产的regst id,但并不push到consumed_regsts_

TakeOverNaiveProduced既记录生产的regst id,也push到naive_produced_rs_。这种区别是为了首次执行计算时,actor能顺利执行。前面剖析Actor的音讯解决时会再回过头来讨论一下。

调用InitBnInOp2BlobInfo会初始化BlobInfo。
之后就是调用VirtualActorInit,这里容许各个Actor子类定制本人的初始化逻辑。通常会调用OF_SET_MSG_HANDLER宏设置Actor的音讯处理函数。

7.3 Actor的音讯解决

咱们以WaitAndSendIds为例,察看一下Actor的音讯解决机制。其Actor是WaitAndSendIdsActor,kernel是WaitAndSendIdsKernel。

之所以抉择这个例子,一是这个Actor比较简单;二是这是一个典型的source task,想看一下计算图是怎么被触发启动计算的。

Thread收到的音讯如果不是kStopThread或kConstructActor,就调用Actor::ProcessMsg,将音讯转给Actor解决。
ProcessMsg函数只是简略的将音讯转给handler解决。

WaitAndSendIdsActor::VirtualActorInit中,handler被设置为HandlerWaitToStart。
Runtime的构造函数中,发送的第一批音讯是给source_tasks的kStart音讯,这个音讯就由HandlerWaitToStart函数解决。

HandlerWaitToStart校验音讯类型后,将handler设置为HandlerNormal(这也是大部分Actor的默认handler),而后调用ProcessMsg,理论就是调用新设置的handler HandlerNormal。

HandlerNormal中,如果是kCmdMsg,只容许是kStart。通过音讯类型校验后,会间接调用ActUntilFail。

7.4 Act执行的条件

ActUntilFail中,Act办法是各个子类本人实现的,个别次要是启动kernel计算。
然而在执行Act之前,须要先确认:

  • Act执行依赖的数据是否都曾经就绪?(IsReadReady)
  • Act生产进去的数据,生产方是否曾经用完、并收到ack音讯确认?(IsWriteReady)

Actor有4个与此相关的成员变量

  • RegstSlot naive_produced_rs_;
  • RegstSlot inplace_produced_rs_;
  • RegstSlot naive_consumed_rs_;
  • RegstSlot inplace_consumed_rs_;

xx_produced_rs_存储的是以后Actor的上游consumer返回的、曾经应用结束的ack regst信息。(以后Actor生产的Regst存储在produced_regsts_中。)

运行时在初始化的过程中,所有Actor都没有运行过,任何Actor都不可能收到ack音讯,所以在Actor初始化时,要事后填充xx_produced_rs_,这样能力保障Actor在首次运行前是WriteReady的,能力顺利启动执行。

xx_consumed_rs_存储的是上游依赖发来的数据。它不须要事后填充。因为source_tasks没有输出依赖,天然就是ReadReady的;而xx_produced_rs_在初始化时的事后填充又保障它是WriteReady的,所以source_tasks能够间接运行。source_tasks的输入音讯发给上游,上游也会变为ReadReady,而上游在初始化后也保障是WriteReady的。整个Actor零碎就能够这样运行起来了。

7.5 Actor上下游之间的告诉机制

Act执行结束后,须要将后果数据发给上游consumer。以 WaitAndSendIds 的 Naive Produced 为例,ActUntilFail中的调用流程如下:

  • AsyncSendNaiveProducedRegstMsgToConsumer

    • VirtualAsyncSendNaiveProducedRegstMsgToConsumer

      • HandleProducedNaiveDataRegstToConsumer

        • HandleRegstToConsumer

          • EnqueueAsyncMsg

            • 如果指标线程是以后线程,ActorMsgBus::SendMsg
            • 否则,将音讯退出async_msg_queue_
          • 减少 total_reading_cnt_(这个变量示意曾经发消息给上游、但未收到的ack数量)
        • naive_produced_rs_.PopFrontRegsts
    • AsyncSendProducedCtrlRegstMsgToConsumer

留神naive_produced_rs_.PopFrontRegsts会将Regst指针从队列中删掉,相应的计数减1。
而在Actor::HandlerNormal中解决收到的kRegstMsg音讯时,如果是consumer发来的ack音讯,会调用TryUpdtStateAsProducedRegst,将Regst再增加到 naive_produced_rs_ 中,以保障以后Actor在收到所有ack后是WriteReady的;同时递加total_reading_cnt_。

Actor对依赖的上游音讯的解决是相似的。通过以下函数调用给上游发送ack音讯、告诉数据曾经用完,能够持续更新了:

  • AsyncSendNaiveConsumedRegstMsgToProducer
  • AsyncRetInplaceConsumedRegstIfNoConsumer
    在Actor::HandlerNormal中收到kRegstMsg音讯后,将音讯增加到consumed_rs_,以保障以后Actor在收到所有依赖数据后是ReadReady的。

LightActor有本人的音讯解决机制,大抵原理应该是差不多的。

7.6 Act执行的动作

根据上述探讨,Actor收到kRegstMsg后也会进入ActUntilFail执行。如果读写都是Ready,就执行Act。以WaitAndSendIdsActor为例,次要调用链路如下:

  • AsyncLaunchKernel
  • ek.kernel->Launch,启动Kernel计算
  • Forward
  • ForwardDataContent
  • buffer->Pull
  • 给regst的存储地址mut_dptr赋值

buffer->Pull会期待条件变量的告诉。当初,看上去所有Actor都已准备就绪,只等发令枪一响就开跑了。

8 启动动态图的计算

Graph.__run会扣动发令枪的板机,启动计算图的一轮计算。
次要调用流程如下:

  • RunLazyNNGraph
  • builder->LaunchLazyJob
  • LaunchLazyJobInstructionType
  • Buffer::Push

这里的Buffer::Push就是WaitAndSendIdsKernel在期待的起跑信号。

9 运行时的退出机制

整个运行时蕴含很多对象和资源,平安有序的退出是庞杂而又粗疏的工作。这里仅以WaitAndSendIds为例,从一个侧面察看一下运行时的退出机制。

运行时的退出始于NNGraph对象的析构。

9.1 Actor的退出

  • NNGraph在析构时,会敞开所有的Buffer对象。
  • Buffer在敞开时,会设置is_closed_ = true并告诉所有监听者。然而Pull会持续解决完曾经提交的计算。

    • 所以,Buffer应该是次要用于过程内的通信和异步协调的一个类。
  • WaitAndSendIdsKernel这时候正在期待新一轮计算开始,后果收到Pull返回的kBufferStatusErrorClosed。
  • WaitAndSendIdsActor::IsCustomizedReadReady当前就始终返回false,IsReadReady也返回false。

    • 这之后,ActUntilFail只会执行异步音讯发送(不再进入while循环)
  • WaitAndSendIdsActor::HandlerNormal依然会解决其它Actor发来的音讯。但因为IsCustomizedReadReady返回false,会进入AsyncSendEORDMsgForAllProducedRegstDesc执行。它会给每个上游发送kEordMsg音讯。
  • Actor在收到上游发来的kEordMsg音讯后,递加remaining_eord_cnt_。

    • remaining_eord_cnt_被初始化为Actor的输出regst的数量。
  • total_reading_cnt_是以后Actor生产的、曾经发给consumer、但尚未收到ack的音讯数量。

    • Actor目前仍能够失常接管consumer发来的ack音讯。
  • 当上述2个变量都为0时,意味着所有上游都收回了kEordMsg音讯,也收到了所有上游的ack音讯。Actor就给Thread返回1。

    • 如果上述两个变量有不为0的,就批改handler,由HandlerZombie解决后续收到的音讯。
  • Thread收到Actor返回的1后,将它从本人的存储中删除,并递加运行Actor的数量。

9.2 Thread的退出

  • NNGraph重置runtime_导致运行时对象被析构。
  • Runtime删除所有Thread。
  • ThreadMgr给所有Thread发送kStopThread音讯。同时,重置指针导致Thread析构。
  • Thread的物理线程退出PollMsgChannel循环。
  • Thread期待物理线程完结,敞开channel。

10 分布式场景的动态图

分布式的compile_job、物理图Plan和单机场景有显著变动。
比方,每个过程都有一套WaitAndSendIds等管制节点。这也容易了解,因为每个节点都要执行__runBuffer::Push/Pull,都要启动本过程的Actors执行计算。
matmul和broadcast_add等user op也会在两个节点进行计算。

10.1 示例代码

启动形式参考Global Tensor的官网文档。

import oneflow as flowimport oneflow.nn as nnP0 = flow.placement("cpu", ranks=[0, 1])a0_sbp = flow.sbp.split(0)class ModuleMyLinear(nn.Module):    def __init__(self, in_features, out_features):        super().__init__()        self.weight = nn.Parameter(flow.randn(in_features, out_features,            placement=P0, sbp=flow.sbp.broadcast))        self.bias = nn.Parameter(flow.randn(1, out_features,            placement=P0, sbp=flow.sbp.broadcast))    def forward(self, input):        return flow.matmul(input, self.weight) + self.biaslinear_model = ModuleMyLinear(4, 3)class GraphMyLinear(nn.Graph):    def __init__(self):        super().__init__()        # ModuleBlock        self.model = linear_model    def build(self, input):        # ModuleBlock.__call__        return self.model(input)graph_mylinear = GraphMyLinear()input = flow.randn(5, 4, placement=P0, sbp=flow.sbp.split(1))out = graph_mylinear(input)print(out)

11 附录

11.1 断点

11.1.1 Python断点示例

# python3 -m pdb test.pybreak test.py:25break oneflow/nn/graph/graph.py:221break oneflow/nn/graph/graph.py:741break oneflow/nn/graph/graph.py:745break oneflow/nn/graph/graph.py:759break oneflow/nn/graph/graph.py:828break oneflow/nn/graph/graph.py:777break oneflow/nn/graph/graph.py:1066break oneflow/nn/graph/graph.py:1133break oneflow/framework/graph_build_util.py:227

11.1.2 C++断点示例

启动命令

source /mnt/oneflow/build/source.shgdb --args python3 /mnt/oneflow/test.py# set breakpoints# run

断点示例

set breakpoint pending onbreak oneflow::ActorMsg::BuildEordMsgbreak oneflow/core/common/buffer.h:80break oneflow::(anonymous namespace)::CheckAndConstructOpbreak oneflow::WaitAndSendIdsActor::Actbreak oneflow::WaitAndSendIdsActor::HandlerWaitToStartbreak oneflow/core/lazy/actor/light_actor.cpp:452break oneflow/core/lazy/actor/light_actor.cpp:485break oneflow::ForeignInputKernel::ForwardDataContentbreak oneflow::vm::LaunchLazyJobInstructionType::Compute

11.2 动态图的json示意

  • forward
  • full
  • compiled
  • plan

11.3 actor type

naive_actor

System-AutoTick-AppendDeviceTick_9System-AutoTick-DstSubsetTick_12System-AutoTick-DstSubsetTick_21System-AutoTick-DstSubsetTick_27System-AutoTick-Prepend-DeviceTick_7System-AutoTick-SrcSubsetTick_20System-AutoTick-SrcSubsetTick_26System-AutoTick-SrcSubsetTick_8System-AutoTick-Tick_11System-AutoTick-Tick_13System-EagerCriticalSection-Callback-23System-EagerCriticalSection-Callback-29System-EagerCriticalSection-Interface-Begin-Tick-18System-EagerCriticalSection-Interface-Begin-Tick-24System-EagerCriticalSection-Interface-End-Tick-19System-EagerCriticalSection-Interface-End-Tick-25System-EagerCriticalSection-Wait-22System-EagerCriticalSection-Wait-28

light_actor

_GraphMyLinear_0_input.0.0_2_GraphMyLinear_0_output.0.0_2model.biasmodel-broadcast_add-1model-matmul-0model.weightSystem-AutoTick-SinkTick_15System-SyncAllRanksSinkTick_14

wait_and_send_ids_actor

System-Src-WaitAndSendIds_16

call_back_notify_actor

System-Sink-CallbackNotify_17

12 参考资料

  • oneflow v0.8.0
  • OneFlow框架的零碎设计(上篇)
  • OneFlow框架的零碎设计(中篇)
  • OneFlow框架的零碎设计(下篇)
  • 一个Job在OneFlow中的执行过程—上篇
  • 一个Job在OneFlow中的执行过程—中篇
  • 一个Job在OneFlow中的执行过程—下篇
  • 动态图模块 nn.Graph
  • OneFlow零碎设计
  • torch.nn.Module