oneflow动态图的训练效率远高于动态图(eager模式)。本文试图通过一个简略例子,联合0.8.0的代码,看一下动态图和运行时的实现机制。
在开始之前,倡议先读一下参考资料中《OneFlow框架的零碎设计》等系列文章。对动态图、运行时的基本概念和设计理念有根本的理解,会更容易了解代码。
1 代码示例
上面的示例代码来自官网文档,是一个线性模型的前向计算。后续次要基于这段代码进行剖析。
import oneflow as flowimport oneflow.nn as nnclass ModuleMyLinear(nn.Module): def __init__(self, in_features, out_features): super().__init__() self.weight = nn.Parameter(flow.randn(in_features, out_features)) self.bias = nn.Parameter(flow.randn(out_features)) def forward(self, input): return flow.matmul(input, self.weight) + self.biaslinear_model = ModuleMyLinear(4, 3)class GraphMyLinear(nn.Graph): def __init__(self): super().__init__() # ModuleBlock self.model = linear_model def build(self, input): # ModuleBlock.__call__ return self.model(input)graph_mylinear = GraphMyLinear()input = flow.randn(1, 4)out = graph_mylinear(input)print(out)
2 oneflow包的初始化
import oneflow
在初始化包时,与动态图相干的次要操作如下:
GetEnv
EnvGlobalObjectsScope::Init
- 启动各个节点的管制面网络连接
- 初始化VM
- 启动各个节点的数据面网络连接
- 初始化KernelObserver
NewDefaultSession
- RegsiterSession 创立 Session,并注册为 default session
创立 Python MultiClientSession 并保留到dict,但并不 TryInit
- 创立 C++ MultiClientSessionContext 但并不 TryInit
EnvGlobalObjectsScope::Init
中先创立一个全局的ProcessCtx对象。而后依据环境变量等配置,在各个过程间创立gRPC和CommNet的连贯,别离负责管制面和数据面的数据传输。其中在Bootstrap过程中会初始化全局的ProcessCtx,给每个过程调配一个全局惟一的rank编号(machine_id)。
本文不波及网络层面的操作,只探讨同一过程内各线程间的交互。
3 Module类
尽管能够间接用op和tensor结构模型,然而op的粒度太细了,间接用op结构模型会比拟繁琐。
Module是由op和tensor形成的、可复用的子模块。利用Module能够更高效、更快捷的构建简单模型。oneflow.nn模块导出了很多预约义的Module。
Module定义了本人的属性设置逻辑,外围逻辑是
- 如果value是Parameter类型,就保留到
Module._parameters
中 - 如果value是Module类型,就保留到
Module._modules
中 - 如果value是Tensor类型,就保留到
Module._buffers
中 - 否则按惯例属性解决
Module能够蕴含子Module,造成树结构。因为Module通过setattr将子Module和Parameter都保留到字典构造中,能够不便的遍历所有Module及其参数tensor。
4 Graph类
4.1 构造函数
Graph的构造函数中GetDefaultSession失去的session,就是导入oneflow包时NewDefaultSession构建的session。过后没有初始化,而是在Graph结构时进行初始化。对应的C++函数是MultiClientSessionContext::TryInit,执行时会创立各种全局的资源管理器,比方:
- LazyJobBuildAndInferCtxMgr
- BufferMgr
- RegstMgr
- ActorMsgBus
- ThreadMgr
4.2 __setattr__
: 将Module和Tensor封装为Block
Graph.__setattr__
不容许将Tensor对象设置为属性。Tensor只能存到Module中。
setattr最重要的动作就是对_add_block的调用,_add_block
中次要是调用get_block_cls并保留后果。
get_block_cls的作用是将Module及其所有Tensor属性都转为对应的Block对象。为什么要做这个动作呢?次要是动态图编译须要借助Block类型的一些性能,这些性能不适宜间接加诸于Module和Tensor。
这个转换是在ModuleBlock结构时调用set_origin实现的。对于子Module,会递归调用get_block_cls函数,这样所有子Module及其Tensor属性都会被转换为对应的Block对象。
所以,上述示例代码中,GraphMyLinear理论存储的是ModuleBlock,Graph.build
执行时获取的model属性也是ModuleBlock对象,ModuleBlock.origin
才是ModuleMyLinear。
4.3 针对不同工作,定义不同的计算图
依据Oneflow Model Zoo的模型示例,train/eval/infer等阶段能够创立不同的Graph子类。在这些不同阶段,Graph构造函数的行为、build函数的输入输出都有各自特点。理解这些,看后续代码时会更容易了解各个参数的具体含意。
构造函数
- train阶段,构造函数会引入Module、损失函数、优化器和data_loader
- eval阶段,只须要引入Module和data_loader
- infer阶段,只须要引入Module
build函数
train
- 导入样本和label
- 调用Module失去前向计算结果
- 计算损失
- 计算梯度
- 返回loss
eval
- 导入样本和label
- 调用Module失去预估后果
- 返回预估后果和label
infer
- 传入的参数包含样本
- 调用Module失去预估后果
- 返回预估后果
4.4 小结
上述几个类型的关系如下:
上面形容了GraphMyLinear的结构流程
* `__init__` * `Graph.__init__` * self.model = linear_model * `Graph.__setattr__` * _add_block * get_block_cls: 递归地把Module转为ModuleBlock * `ModuleBlock.__init__` * ModuleBlock.set_origin * `ModuleBlock._origin = origin` (Module) * 对origin的sub modules, parameters, buffers递归调用get_block_cls * `ModuleBlock.__setattr__`
5 逻辑图的编译
计算机语言的编译,是将高级语言的语句编译为汇编或机器指令。深度学习框架对计算工作的编译,是将用户的特定语句操作转换为DAG图。oneflow中用Job形容逻辑的计算图。
不同于eager模式的动态图,动态图在开始执行前能够失去整个计算工作的所有信息,能够对DAG进行多轮优化。每轮优化都是输出一个Job、失去一个新Job。
最初,依据分布式环境配置,将逻辑图Job转换为物理执行的计算图Plan。在物理图中,一个op可能散布在多个节点/过程。
启动DAG计算须要调用Graph.__call__
,这个函数的执行次要分以下几个步骤:
__call__
_compile if not _is_compiled
build_graph
- __build_graph
- finish_complie_and_init_runtime
- __run
逻辑图编译次要在__build_graph
中进行。finish_complie_and_init_runtime
会持续做一些优化pass,而后构建物理图、初始化运行时Actor零碎。__run
会启动一次DAG的运算。
5.1 graph_build_context: 为逻辑图编译设置根本环境
__build_graph
中的graph_build_context尽管只有一行代码,但却做了几件十分重要的事件。
首先在context作用域内设置全局的lazy_mode为True。在这个context作用域内,所有op都由LazyInterpreter解释执行。
其次,在JobBuildAndInferCtx作用域内,JobBuildAndInferCtx_Open调用相似如下C++代码
// oneflow/api/python/job_build/job_build_and_infer.h// oneflow/core/job/job_build_and_infer_ctx_mgr.cpp// 如前所述,LazyJobBuildAndInferCtxMgr 在 MultiClientSessionContext::TryInit 执行时初始化。// LazyJobBuildAndInferCtxMgr mgr;mgr.OpenJobBuildAndInferCtx(job_name);
OpenJobBuildAndInferCtx会新建一个Job对象、一个LazyJobBuildAndInferCtx对象。LazyJobBuildAndInferCtx负责依据用户定制的op等操作,批改Job。
5.2 __build_io
:为计算图增加input和output Op
self.__build_io("input", graph_build_util.build_graph_input_arg, *args, **kwargs)
下面这行代码的作用是,对于用户传递给graph_mylinear(input)
的input参数,针对其中的每个tensor都在逻辑计算图中插入一个FeedInputOp节点。也就是说,model的输出(比方样本tensor,具体参考4.3节),在动态图中也视为一个op操作。
__build_io
内会用args(即input)和kwargs结构一个ArgsTree。示例代码中kwargs是空的。
而后遍历ArgsTree,对args和kwargs的每个tensor都调用传入的build_func,对于input来说,就是build_graph_input_arg。前面会看到,model的output也会调用__build_io
,所以这个函数名的意思应该就是对model的输出、输入进行动态图的构图工作。
build_graph_input_arg外部会结构一个FeedInputOpExpr,提交给解释器执行。因为是在lazy作用域内,由LazyInterpreter解释执行,LazyInterpreter会将对应的op插入动态图。
附: build input时ArgsTree的内部结构
__build_io(input)
中 ArgsTree 的外部数据组织示意
_named_io_args
: NamedArg_value
: tuple[0]
: NamedArg_value
: tuple of NamedArg[0]
: NamedArg_value
: args tensor fromGraph.__call__
[1]
: NamedArg_value
: empty kwargs fromGraph.__call__
通过pdb命令能够查看变量: p args_tree._named_io_args._value[0]._value[0]._value.to_numpy()
5.2.1 将op增加到逻辑图
LazyInterpreter::ApplyImpl在执行时,GetCurInferCtx()返回的就是graph_build_context中OpenJobBuildAndInferCtx创立的那个LazyJobBuildAndInferCtx对象,这个对象负责逻辑图的构建。增加op的次要调用流程如下:
- infer_ctx->AddAndInferConsistentOp
- AddAndInferOp
- ConstructOp
- CheckAndConstructOp
- NewObj
OperatorConf中,多种op配置共享op_type字段,protobuf oneof的op_type_case常量作为注册NewObj的key。
零碎预约义的op在oneflow/core/operator下,例如UserOp。
AddAndInferOp将返回的Operator保留到LazyJobBuildAndInferCtx的字典中。后续的函数调用,次要是进行推导并批改动态图Job,使得各个节点形成一个DAG。
JobBuildAndInferCtx相干的类关系如下:
5.2.2 lazy tensor 和 eager tensor 的区别
LazyInterpreter::ApplyImpl的最初,会调用BuildTensor结构一个lazy tensor,作为build_graph_input_arg的返回值。所以__build_io
返回的lazy_args是lazy tensor,它将代替eager的args(也就是用户输出的input)参加后续的计算图构建。
那么lazy tensor和eager tensor的区别是什么呢?我的了解是,eager tensor是要即时计算的,所以须要携带数据;而lazy tensor仅在动态图编译阶段用于推导,只须要形容性质的元信息,不须要相似样本那样的数据。同时,动态图编译是在lazy模式下运行,应用lazy tensor在各种查看校验时应该会更顺畅(?)。
前面会看到,动态图的运行期曾经没有tensor的概念。运行期看到的只是更狭义的Regst存储,可能代表tensor/blob,也可能是其它管制信息。动态图运行时的输出,应该是通过op间接读取tensor的blob(或者复用地址?)到regst;输入应该是op写到regst,通过blob结构eager tensor。
5.3 build: 将UserOp和FeedVariableOp增加到逻辑图
__build_graph
中的self.build()会调用GraphMyLinear.build()
,以及ModuleMyLinear.forward()
。因为是在lazy模式下运行,matmul和add都会调用UserOpExpr重载版本的LazyInterpreter::ApplyImpl,进而调用AddAndInferConsistentOp进行构图操作。
须要阐明的是,在援用Module的Parameter属性时(如weight/bias),会触发FeedVariableOp的构图操作、调用对应版本的LazyInterpreter::ApplyImpl。这个是怎么执行的呢?
__build_graph
中,在进入lazy模式之前,先调用了_create_states_builder。其中self._state()返回所有Module的所有Parameter(包含子Module)。
state_block的类型是TensorBlock。所有的state_block的lazy_origin_builder().method都被设置为调用build_graph_state。
给build_graph_state设置个断点能让整个调用过程显形,次要的调用栈如下:
-> out = graph_mylinear(input) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(221)__call__()-> self._compile(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(741)_compile()-> _, eager_outputs = self.build_graph(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(759)build_graph()-> outputs = self.__build_graph(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(864)__build_graph()-> outputs = self.build(*lazy_args, **lazy_kwargs) /mnt/project/machine-learning/oneflow/oneflow/test.py(21)build()-> return self.model(input) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(234)__call__()-> result = self.__block_forward(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(266)__block_forward()-> result = self._origin.__class__.forward(self, *args, **kwargs) /mnt/project/machine-learning/oneflow/oneflow/test.py(11)forward()-> return flow.matmul(input, self.weight) + self.bias /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(483)__getattr__()-> p_state = self._get_from_states(name, "_parameters") /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(521)_get_from_states()-> _s_block.try_build() /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(679)try_build()-> self._lazy_origin_builder.try_build(self) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(627)try_build()-> self.result = self.method()> /usr/local/lib64/python3.6/site-packages/oneflow/framework/graph_build_util.py(227)build_graph_state()-> op_name, var_conf_str, ["in_0"], ["out_0"]
这个调用过程比拟容易困扰的是,执行对象会在Grpah、GraphMyLinear、ModuleMyLinear、ModuleBlock之间切换。
后面在探讨Graph的结构时曾经提过,执行self.model(input)
时,Graph.__getattr__
返回的属性model是ModuleBlock对象,所以理论调用的是ModuleBlock.__call__
。在这个函数内调用__block_forward,其中的_origin是ModuleMyLinear,进入到它的forward
办法,执行到flow.matmul(input, self.weight) + self.bias
时,会触发调用ModuleBlock.__getattr__
,进而调用_get_from_states,调用TensorBlock.try_build()。这里执行的就是进入lazy模式之前设置的build_graph_state。从而减少一个FeedVariableOp到计算图。
为什么设置和调用会间隔这么远呢?可能是因为,如果在援用weights等Parameter时再try_build,会不不便解决多个block共享一个tensor的状况。
再前面的步骤就是调用__build_io插入FetchOutputOp。也就是说,获取model的output也是一个op。
到目前为止,前向计算图就构建实现了。它的json示意能够参考附录。net.op是计算图的节点,通过input等属性能够看出节点之间的连贯关系。
示例代码的前向计算图如下。从这个图能够看到,input、output、weights等都是op。
5.4 逻辑图优化
在__build_graph
中会调用CurJobBuildAndInferCtx_Complete对动态图进行多轮优化,对应的C++函数是LazyJobBuildAndInferCtx::Complete()。
这之后生成的Job是full_job。本文的示例代码比较简单,并不是典型的计算场景,其forwar和ful计算图l的拓扑是一样的。
到这里,逻辑图构建的主体局部就完结了。
随后会构建一个CNNGraph对象,对应的C++类型是NNGraph。这个对象将负责构建物理计算图Plan。它也是整个运行时的拥有者和维护者。这个对象析构时,整个运行时也会有序终止并开释资源。
5.5 物理图的编译
接下来就是执行finish_complie_and_init_runtime,其中的外围调用是self._c_nn_graph.complie_and_init_runtime(),对应的C++函数是NNGraph::CompileAndInitRuntime。
在这个函数中,JobCompleter().Complete()会持续对逻辑图做几轮批改优化,Compiler().Compile()将逻辑图转为物理图,并持续对Plan进行批改优化。
Plan的编译是在master节点进行的。master节点会将Plan通过gRPC推送给各个worker节点,worker节点从master拉取物理计算图。
之后调用NewRuntimeBuffers创立Buffer对象,Buffer应该是次要用于过程内的信息同步。
而后就筹备初始化运行时了。
示例代码生成的compiled_job和物理图Plan的json参见附录。
最终生成的compiled逻辑图如下。框架主动插入了很多系统控制节点。
5.6 Plan的构造
示例代码输入的Plan json数据见附录。
Plan在逻辑上和compiled_job是等价的。这里次要关注task/op之间的关系。
Plan.task中的每个元素是一个task,其中的exec_sequence.exec_node对应job中的op,通常只有一个op(数组能够反对sub graph)。
exec_node.kernel_conf.op_attribute形容了op信息。其中op_conf蕴含op name信息。
kernel_conf.op_attribute.op_conf就是Job中的OperatorConf。
kernel_conf.op_attribute.arg_signature.bn_in_op2lbi体现了task/op之间的连贯关系。
bn_in_op就是blob name in op,即op输出的blob name。
以System-AutoTick-DstSubsetTick_21为例
{ "out": { "op_name": "System-AutoTick-DstSubsetTick_21", "blob_name": "out" }, "in_0": { "op_name": "System-EagerCriticalSection-Interface-End-Tick-19", "blob_name": "out" }, "in_1": { "op_name": "System-AutoTick-SrcSubsetTick_20", "blob_name": "out" }}
exec_node.bn_in_op2regst_desc_id在task层面体现了连贯关系。这个map中的key示意输入输出,value是register id。
{ "out": "29", "in_0": "27", "in_1": "28"}
task.produced_regst_desc形容了对应task生产的register,consumer_task_id是消费者,
produced_regst_desc.out.regst_desc_type.data_regst_desc.lbi2blob_desc.lbi就是这个register的logic blob id。
task.consumed_regst_desc_id形容了对应task生产的register信息
6 运行时的初始化
NNGraph::CompileAndInitRuntime中,new Runtime这行代码会初始化运行时。次要做的事件包含:
- 创立Thread
- 告诉Thread创立Actor,Actor会创立Regst和Kernel
- 给没有输出的source_tasks发送启动信号kStart
6.1 Runtime创立Thread
在Runtime的构造函数中,DumpThreadIdsFromPlan会将Plan中属于以后过程的task的thread id存入thread_ids_
变量。AddThreads创立这些Thread对象。
Thread在结构时会创立一个物理线程,线程执行的是PollMsgChannel办法,Thread就是在这里继续期待须要解决的新音讯。
Thread只解决两类命令音讯:线程终止音讯,创立Actor的音讯。其它音讯交给Actor::ProcessMsg解决。
6.2 Runtime告诉Thread创立Actor
在Runtime的构造函数中,tasks被分为两类:source_tasks和other_tasks。在示例代码中,source_tasks是没有输出边的task。
从代码逻辑看,在Plan proto中,task的consumed_regst_desc_id字段是一个map。如果这个map的所有key都是in_ctrl,这个task就是source_tasks。
示例代码的source_tasks列表如下:
- System-Src-WaitAndSendIds_16
- System-AutoTick-AppendDeviceTick_9
- System-EagerCriticalSection-Interface-End-Tick-19
- System-EagerCriticalSection-Interface-End-Tick-25
Runtime调用HandoutTasks函数会给ActorMsgBus发送构建Actor的kConstructActor音讯。
6.3 ActorMsgBus和Thread的音讯解决
从接口看,ActorMsgBus负责音讯的发送(Actor通过ActorMsgBus发送音讯),Thread负责音讯的接管(一般音讯转给Actor解决)。
相干实体的协作关系如下
Actor是调度的根本单元。
- actor_id就是task_id,是在编译Plan时就确定的。task是编译器概念,actor是对等的运行时概念。
- task_id有特定的编码格局,从中能够解析出machine_id和thread_id。
- 在跨网络的整个物理图Plan中,actor id相当于地址,通过它能够定位惟一的actor实体。
Actor之间通过ActorMsgBus::SendMsg进行音讯通信。
- ActorMsg蕴含源和目标actor id。
- 如果是过程内通信,ActorMsgBus将音讯发给Thread,Thread转给Actor解决音讯。
- 如果是跨过程音讯,ActorMsgBus通过CommNet发送音讯,接管方的CommNet应该会依据actor id取得线程id,从ThreadMgr查到Thread,将音讯交给Thread解决。
Thread通过EnqueueActorMsg接管发给本人治理的Actor的音讯。
- 如果是本线程内的actor之间的音讯,间接将音讯放到local队列。否则放到Channel的异步音讯队列。
- Thread::PollMsgChannel会在结构Thread时启动的物理线程中接管Channel的音讯,一般音讯转给Actor解决。
- Actor收到音讯后,负责LaunchKernel执行计算,计算完结后通过ActorMsgBus告诉上下游Actor。
这些对象之间的消息传递关系如下图所示
6.4 Runtime激活Actor零碎
到这里,Actor之间的协作关系根本分明了。然而整个Actor零碎还处于静止待命的状态。Actor是音讯驱动的,总要有音讯触发能力让它们转起来。
Runtime在构造函数中,给没有输出依赖的source_tasks发送kStart音讯,让Actor零碎处于可运行状态。
从DAG的角度看,只有激活source_tasks就行,这些节点给上游发送音讯,天然会触发所有Actor的执行。
但这个kStart音讯并没有启动计算图的一轮计算。因为这是在Runtime的构造函数中,还处于运行时初始化阶段。发送kStart更像是让Actor零碎处于激活状态。咱们在探讨完Actor之后,再看看计算图的每一轮计算是怎么触发的。
7 Actor
7.1 Actor的创立
Thread在创立Actor时,会先尝试创立为LightActor,如果不胜利,再尝试用事后注册的工厂创立Actor。
有几种TaskType能够用于LightActor:
- kNormalForward,比方matmul、add等user op。
- kCopyHd
- kTick
- kCollectiveBoxingGeneric
目前大概有20多种Actor的子类型。其它Actor类型依据TaskType事后注册。例如WaitAndSendIdsActor。
示例代码的各个节点对应的actor类型参见附录。
Actor相干的类关系如下(蕴含关系只是示意能够拜访到相干信息,并不意味着创立或着领有该类型对象)
7.2 Actor的初始化
Actor的构造函数个别都是空的,构建之后须要执行Init函数进行初始化。
LightActor继承自ActorBase,不是Actor的子类,有本人的Init函数实现。这里只探讨Actor的初始化。
在Actor::Init中,首先调用ConstructKernel创立kernel实例。和Operator相似,kernel也是以OpTypeCase作为注册的key,例如WaitAndSendIdsKernel。一个Actor通常只有一个kernel。
之后调用NewRegsts创立Regst。Tensor是用户侧的概念。对应的运行时概念是Regst,它持有Kernel须要读写的内存。Regst的概念比Tensor更宽泛,比方框架主动增加的管制Op也会用到Regst。
Actor将本人创立的Regst保留到produced_regsts_。
TakeOverNaiveConsumed只记录须要生产的regst id,但并不push到consumed_regsts_
。
TakeOverNaiveProduced既记录生产的regst id,也push到naive_produced_rs_。这种区别是为了首次执行计算时,actor能顺利执行。前面剖析Actor的音讯解决时会再回过头来讨论一下。
调用InitBnInOp2BlobInfo会初始化BlobInfo。
之后就是调用VirtualActorInit,这里容许各个Actor子类定制本人的初始化逻辑。通常会调用OF_SET_MSG_HANDLER宏设置Actor的音讯处理函数。
7.3 Actor的音讯解决
咱们以WaitAndSendIds为例,察看一下Actor的音讯解决机制。其Actor是WaitAndSendIdsActor,kernel是WaitAndSendIdsKernel。
之所以抉择这个例子,一是这个Actor比较简单;二是这是一个典型的source task,想看一下计算图是怎么被触发启动计算的。
Thread收到的音讯如果不是kStopThread或kConstructActor,就调用Actor::ProcessMsg,将音讯转给Actor解决。
ProcessMsg函数只是简略的将音讯转给handler解决。
WaitAndSendIdsActor::VirtualActorInit中,handler被设置为HandlerWaitToStart。
Runtime的构造函数中,发送的第一批音讯是给source_tasks的kStart音讯,这个音讯就由HandlerWaitToStart函数解决。
HandlerWaitToStart校验音讯类型后,将handler设置为HandlerNormal(这也是大部分Actor的默认handler),而后调用ProcessMsg,理论就是调用新设置的handler HandlerNormal。
HandlerNormal中,如果是kCmdMsg,只容许是kStart。通过音讯类型校验后,会间接调用ActUntilFail。
7.4 Act执行的条件
ActUntilFail中,Act办法是各个子类本人实现的,个别次要是启动kernel计算。
然而在执行Act之前,须要先确认:
- Act执行依赖的数据是否都曾经就绪?(IsReadReady)
- Act生产进去的数据,生产方是否曾经用完、并收到ack音讯确认?(IsWriteReady)
Actor有4个与此相关的成员变量
- RegstSlot naive_produced_rs_;
- RegstSlot inplace_produced_rs_;
- RegstSlot naive_consumed_rs_;
- RegstSlot inplace_consumed_rs_;
xx_produced_rs_
存储的是以后Actor的上游consumer返回的、曾经应用结束的ack regst信息。(以后Actor生产的Regst存储在produced_regsts_
中。)
运行时在初始化的过程中,所有Actor都没有运行过,任何Actor都不可能收到ack音讯,所以在Actor初始化时,要事后填充xx_produced_rs_
,这样能力保障Actor在首次运行前是WriteReady的,能力顺利启动执行。
xx_consumed_rs_
存储的是上游依赖发来的数据。它不须要事后填充。因为source_tasks没有输出依赖,天然就是ReadReady的;而xx_produced_rs_
在初始化时的事后填充又保障它是WriteReady的,所以source_tasks能够间接运行。source_tasks的输入音讯发给上游,上游也会变为ReadReady,而上游在初始化后也保障是WriteReady的。整个Actor零碎就能够这样运行起来了。
7.5 Actor上下游之间的告诉机制
Act执行结束后,须要将后果数据发给上游consumer。以 WaitAndSendIds 的 Naive Produced 为例,ActUntilFail中的调用流程如下:
AsyncSendNaiveProducedRegstMsgToConsumer
VirtualAsyncSendNaiveProducedRegstMsgToConsumer
HandleProducedNaiveDataRegstToConsumer
HandleRegstToConsumer
EnqueueAsyncMsg
- 如果指标线程是以后线程,ActorMsgBus::SendMsg
- 否则,将音讯退出async_msg_queue_
- 减少 total_reading_cnt_(这个变量示意曾经发消息给上游、但未收到的ack数量)
- naive_produced_rs_.PopFrontRegsts
- AsyncSendProducedCtrlRegstMsgToConsumer
留神naive_produced_rs_.PopFrontRegsts会将Regst指针从队列中删掉,相应的计数减1。
而在Actor::HandlerNormal中解决收到的kRegstMsg音讯时,如果是consumer发来的ack音讯,会调用TryUpdtStateAsProducedRegst,将Regst再增加到 naive_produced_rs_ 中,以保障以后Actor在收到所有ack后是WriteReady的;同时递加total_reading_cnt_。
Actor对依赖的上游音讯的解决是相似的。通过以下函数调用给上游发送ack音讯、告诉数据曾经用完,能够持续更新了:
- AsyncSendNaiveConsumedRegstMsgToProducer
- AsyncRetInplaceConsumedRegstIfNoConsumer
在Actor::HandlerNormal中收到kRegstMsg音讯后,将音讯增加到consumed_rs_,以保障以后Actor在收到所有依赖数据后是ReadReady的。
LightActor有本人的音讯解决机制,大抵原理应该是差不多的。
7.6 Act执行的动作
根据上述探讨,Actor收到kRegstMsg后也会进入ActUntilFail执行。如果读写都是Ready,就执行Act。以WaitAndSendIdsActor为例,次要调用链路如下:
- AsyncLaunchKernel
- ek.kernel->Launch,启动Kernel计算
- Forward
- ForwardDataContent
- buffer->Pull
- 给regst的存储地址mut_dptr赋值
buffer->Pull会期待条件变量的告诉。当初,看上去所有Actor都已准备就绪,只等发令枪一响就开跑了。
8 启动动态图的计算
Graph.__run会扣动发令枪的板机,启动计算图的一轮计算。
次要调用流程如下:
- RunLazyNNGraph
- builder->LaunchLazyJob
- LaunchLazyJobInstructionType
- Buffer::Push
这里的Buffer::Push就是WaitAndSendIdsKernel
在期待的起跑信号。
9 运行时的退出机制
整个运行时蕴含很多对象和资源,平安有序的退出是庞杂而又粗疏的工作。这里仅以WaitAndSendIds为例,从一个侧面察看一下运行时的退出机制。
运行时的退出始于NNGraph对象的析构。
9.1 Actor的退出
- NNGraph在析构时,会敞开所有的Buffer对象。
Buffer在敞开时,会设置is_closed_ = true并告诉所有监听者。然而Pull会持续解决完曾经提交的计算。
- 所以,Buffer应该是次要用于过程内的通信和异步协调的一个类。
- WaitAndSendIdsKernel这时候正在期待新一轮计算开始,后果收到Pull返回的kBufferStatusErrorClosed。
WaitAndSendIdsActor::IsCustomizedReadReady当前就始终返回false,IsReadReady也返回false。
- 这之后,ActUntilFail只会执行异步音讯发送(不再进入while循环)
- WaitAndSendIdsActor::HandlerNormal依然会解决其它Actor发来的音讯。但因为IsCustomizedReadReady返回false,会进入AsyncSendEORDMsgForAllProducedRegstDesc执行。它会给每个上游发送kEordMsg音讯。
Actor在收到上游发来的kEordMsg音讯后,递加remaining_eord_cnt_。
- remaining_eord_cnt_被初始化为Actor的输出regst的数量。
total_reading_cnt_
是以后Actor生产的、曾经发给consumer、但尚未收到ack的音讯数量。- Actor目前仍能够失常接管consumer发来的ack音讯。
当上述2个变量都为0时,意味着所有上游都收回了kEordMsg音讯,也收到了所有上游的ack音讯。Actor就给Thread返回1。
- 如果上述两个变量有不为0的,就批改handler,由HandlerZombie解决后续收到的音讯。
- Thread收到Actor返回的1后,将它从本人的存储中删除,并递加运行Actor的数量。
9.2 Thread的退出
- NNGraph重置runtime_导致运行时对象被析构。
- Runtime删除所有Thread。
- ThreadMgr给所有Thread发送kStopThread音讯。同时,重置指针导致Thread析构。
- Thread的物理线程退出PollMsgChannel循环。
- Thread期待物理线程完结,敞开channel。
10 分布式场景的动态图
分布式的compile_job、物理图Plan和单机场景有显著变动。
比方,每个过程都有一套WaitAndSendIds等管制节点。这也容易了解,因为每个节点都要执行__run
和Buffer::Push/Pull
,都要启动本过程的Actors执行计算。
matmul和broadcast_add等user op也会在两个节点进行计算。
10.1 示例代码
启动形式参考Global Tensor的官网文档。
import oneflow as flowimport oneflow.nn as nnP0 = flow.placement("cpu", ranks=[0, 1])a0_sbp = flow.sbp.split(0)class ModuleMyLinear(nn.Module): def __init__(self, in_features, out_features): super().__init__() self.weight = nn.Parameter(flow.randn(in_features, out_features, placement=P0, sbp=flow.sbp.broadcast)) self.bias = nn.Parameter(flow.randn(1, out_features, placement=P0, sbp=flow.sbp.broadcast)) def forward(self, input): return flow.matmul(input, self.weight) + self.biaslinear_model = ModuleMyLinear(4, 3)class GraphMyLinear(nn.Graph): def __init__(self): super().__init__() # ModuleBlock self.model = linear_model def build(self, input): # ModuleBlock.__call__ return self.model(input)graph_mylinear = GraphMyLinear()input = flow.randn(5, 4, placement=P0, sbp=flow.sbp.split(1))out = graph_mylinear(input)print(out)
11 附录
11.1 断点
11.1.1 Python断点示例
# python3 -m pdb test.pybreak test.py:25break oneflow/nn/graph/graph.py:221break oneflow/nn/graph/graph.py:741break oneflow/nn/graph/graph.py:745break oneflow/nn/graph/graph.py:759break oneflow/nn/graph/graph.py:828break oneflow/nn/graph/graph.py:777break oneflow/nn/graph/graph.py:1066break oneflow/nn/graph/graph.py:1133break oneflow/framework/graph_build_util.py:227
11.1.2 C++断点示例
启动命令
source /mnt/oneflow/build/source.shgdb --args python3 /mnt/oneflow/test.py# set breakpoints# run
断点示例
set breakpoint pending onbreak oneflow::ActorMsg::BuildEordMsgbreak oneflow/core/common/buffer.h:80break oneflow::(anonymous namespace)::CheckAndConstructOpbreak oneflow::WaitAndSendIdsActor::Actbreak oneflow::WaitAndSendIdsActor::HandlerWaitToStartbreak oneflow/core/lazy/actor/light_actor.cpp:452break oneflow/core/lazy/actor/light_actor.cpp:485break oneflow::ForeignInputKernel::ForwardDataContentbreak oneflow::vm::LaunchLazyJobInstructionType::Compute
11.2 动态图的json示意
- forward
- full
- compiled
- plan
11.3 actor type
naive_actor
System-AutoTick-AppendDeviceTick_9System-AutoTick-DstSubsetTick_12System-AutoTick-DstSubsetTick_21System-AutoTick-DstSubsetTick_27System-AutoTick-Prepend-DeviceTick_7System-AutoTick-SrcSubsetTick_20System-AutoTick-SrcSubsetTick_26System-AutoTick-SrcSubsetTick_8System-AutoTick-Tick_11System-AutoTick-Tick_13System-EagerCriticalSection-Callback-23System-EagerCriticalSection-Callback-29System-EagerCriticalSection-Interface-Begin-Tick-18System-EagerCriticalSection-Interface-Begin-Tick-24System-EagerCriticalSection-Interface-End-Tick-19System-EagerCriticalSection-Interface-End-Tick-25System-EagerCriticalSection-Wait-22System-EagerCriticalSection-Wait-28
light_actor
_GraphMyLinear_0_input.0.0_2_GraphMyLinear_0_output.0.0_2model.biasmodel-broadcast_add-1model-matmul-0model.weightSystem-AutoTick-SinkTick_15System-SyncAllRanksSinkTick_14
wait_and_send_ids_actor
System-Src-WaitAndSendIds_16
call_back_notify_actor
System-Sink-CallbackNotify_17
12 参考资料
- oneflow v0.8.0
- OneFlow框架的零碎设计(上篇)
- OneFlow框架的零碎设计(中篇)
- OneFlow框架的零碎设计(下篇)
- 一个Job在OneFlow中的执行过程—上篇
- 一个Job在OneFlow中的执行过程—中篇
- 一个Job在OneFlow中的执行过程—下篇
- 动态图模块 nn.Graph
- OneFlow零碎设计
- torch.nn.Module