作者|郑建华
更新|许啸宇、张文骁、成诚
OneFlow动态图的训练效率远高于动态图(eager模式)。本文试图通过一个简略例子,联合v0.8.0版本的代码,解读一下动态图和运行时的实现机制。
在开始之前,倡议先读一下参考资料中《OneFlow框架的零碎设计(https://zhuanlan.zhihu.com/p/...)》等系列文章。对动态图、运行时的基本概念和设计理念有根本的理解,会更容易了解代码。
1
代码示例
上面的示例代码来自官网文档(https://docs.oneflow.org/mast...),是一个线性模型的前向计算。后续次要基于这段代码进行剖析。
import oneflow as flowimport oneflow.nn as nnclass ModuleMyLinear(nn.Module): def __init__(self, in_features, out_features): super().__init__() self.weight = nn.Parameter(flow.randn(in_features, out_features)) self.bias = nn.Parameter(flow.randn(out_features)) def forward(self, input): return flow.matmul(input, self.weight) + self.biaslinear_model = ModuleMyLinear(4, 3)class GraphMyLinear(nn.Graph): def __init__(self): super().__init__() # ModuleBlock self.model = linear_model def build(self, input): # ModuleBlock.__call__ return self.model(input)graph_mylinear = GraphMyLinear()input = flow.randn(1, 4)out = graph_mylinear(input)print(out)
2
oneflow包的初始化
import oneflow在初始化包(https://github.com/Oneflow-In...)时,与动态图相干的次要操作如下:
GetEnv(https://github.com/Oneflow-In...)
EnvGlobalObjectsScope::Init(https://github.com/Oneflow-In...)
- 启动各个节点的管制面(https://github.com/Oneflow-In...)网络连接
- 初始化VM(https://github.com/Oneflow-In...)
- 启动各个节点的数据面网络连接(https://github.com/Oneflow-In...)
- 初始化KernelObserver(https://github.com/Oneflow-In...)
NewDefaultSession(https://github.com/Oneflow-In...)
- RegsiterSession(https://github.com/Oneflow-In...) 创立 Session,并注册为 default session(https://github.com/Oneflow-In...)
创立 Python MultiClientSession 并保留到dict(https://github.com/Oneflow-In...),但并不 TryInit
- 创立 C++ MultiClientSessionContext(https://github.com/Oneflow-In...) 但并不 TryInit
EnvGlobalObjectsScope::Init中先创立一个全局的ProcessCtx(https://github.com/Oneflow-In...)对象。而后依据环境变量等配置,在各个过程间创立gRPC和CommNet的连贯,别离负责管制面和数据面的数据传输。其中在Bootstrap过程中会初始化全局的ProcessCtx(https://github.com/Oneflow-In...),给每个过程调配一个全局惟一的rank编号(https://github.com/Oneflow-In...)(machine_id(https://github.com/Oneflow-In...))。
本文不波及网络层面的操作,只探讨同一过程内各线程间的交互。
3
Module类
尽管能够间接用op和tensor结构模型,然而op的粒度太细了,间接用op结构模型会比拟繁琐。
Module(https://github.com/Oneflow-In...)是由op和tensor形成的、可复用的子模块。利用Module能够更高效、更快捷的构建简单模型。oneflow.nn(https://github.com/Oneflow-In...)模块导出了很多预约义的Module。
Module定义了本人的属性设置逻辑(https://github.com/Oneflow-In...),外围逻辑是
- 如果value是Parameter类型,就保留到Module._parameters中
- 如果value是Module类型,就保留到Module._modules中
- 如果value是Tensor类型,就保留到Module._buffers中
- 否则按惯例属性解决
Module能够蕴含子Module,造成树结构。因为Module通过setattr将子Module和Parameter都保留到字典构造中,能够不便的遍历所有Module及其参数tensor。
4
Graph类
4.1 构造函数
Graph的构造函数中GetDefaultSession(https://github.com/Oneflow-In...)失去的session,就是导入oneflow包时NewDefaultSession(https://github.com/Oneflow-In...)构建的session。过后没有初始化,而是在Graph结构时进行初始化(https://github.com/Oneflow-In...)。对应的C++函数是MultiClientSessionContext::TryInit(https://github.com/Oneflow-In...),执行时会创立各种全局的资源管理器,比方:
- LazyJobBuildAndInferCtxMgr
- BufferMgr
- RegstMgr
- ActorMsgBus
- ThreadMgr
4.2 __setattr__: 将Module和Tensor封装为Block
Graph.__setattr__ 反对通过设置属性的形式把一个 Module 增加到 Graph 中,之后改 Module 就能够被 Graph 调用了。增加到 Graph 中的 Module,会被包装到 Block 外面,Block 起到了代理执行的作用,它会给原 Eager 下的 Module 扩大出动态执行须要的一些非凡性能。
增加到 Graph 中的 Module 和原 Module 共享了状态(Parameter、Buffer)和 forward 执行逻辑。共享 forward 执行逻辑使得动态和动静执行计算逻辑雷同。共享状态则能够使动态图下的模型状态被动态图复用。基于此,两个 Graph,一个用于训练,一个用于预测,他们都复用对立模型 Module,这样训练和预测 Graph 也就实现了模型共享。
setattr最重要的动作就是对_add_block的调用(https://github.com/Oneflow-In...),_add_block中次要是调用get_block_cls并保留后果(https://github.com/Oneflow-In...)。get_block_cls(https://github.com/Oneflow-In...)的作用是将Module及其所有Tensor属性都转为对应的Block对象。为什么要做这个动作呢?次要是动态图编译须要借助Block类型来实现代理执行的性能,这些性能不适宜间接写到 eager 下的 Module 和 Tensor 上。
这个转换是在ModuleBlock结构时调用set_origin(https://github.com/Oneflow-In...)实现的。对于子Module,会递归调用get_block_cls函数(https://github.com/Oneflow-In...),这样所有子Module及其Tensor属性都会被转换为对应的Block对象。
所以,上述示例代码中,GraphMyLinear理论存储的是ModuleBlock,Graph.build执行时获取的model属性也是ModuleBlock对象,ModuleBlock.origin才是ModuleMyLinear。
Graph.__setattr__不容许将Tensor对象设置为属性(https://github.com/Oneflow-In...)。Tensor只能存到Module中,因为 Module 是做状态共享的根本单位,而 Graph 是不容许复用的。
4.3 针对不同工作,定义不同的计算图
依据Oneflow Model Zoo的模型示例(https://github.com/Oneflow-In...),train/eval等阶段能够创立不同的Graph子类。动态图下提供了 Module、Optimizer、Dataloader等模块,这些模型都能够被增加到 Graph 中。不同的组合能够构建不同类型的工作。
在这些不同阶段,Graph构造函数的行为、build函数的输入输出都有各自特点。理解这些,看后续代码时会更容易了解各个参数的具体含意。
构造函数
- train阶段,须要增加Module、损失函数、优化器和dataloader
- eval阶段,只须要增加Module和dataloader
build函数
train
- 导入样本和label
- 调用Module失去前向计算结果
- 计算损失
- 计算梯度
- 返回loss
eval
- 导入样本和label
- 调用Module失去预估后果
- 返回预估后果和label
4.4 小结
上述几个类型的关系如下:
上面形容了GraphMyLinear的结构流程
* `__init__` * `Graph.__init__` * self.model = linear_model * `Graph.__setattr__` * _add_block * get_block_cls: 递归地把Module转为ModuleBlock * `ModuleBlock.__init__` * ModuleBlock.set_origin * `ModuleBlock._origin = origin` (Module) * 对origin的sub modules, parameters, buffers递归调用get_block_cls * `ModuleBlock.__setattr__`
5
逻辑图的编译
计算机语言的编译,是将高级语言的语句编译为汇编或机器指令。深度学习框架对计算工作的编译,是将用户的特定语句操作转换为DAG图。oneflow中用Job(https://github.com/Oneflow-In...)形容逻辑的计算图。
不同于eager模式的动态图,动态图在开始执行前能够失去整个计算工作的所有信息,能够对DAG进行多轮优化。每轮优化都是输出一个Job、失去一个新Job。
最初,依据分布式环境配置,将逻辑图Job转换为物理执行的计算图Plan(https://github.com/Oneflow-In...)。在物理图中,一个op可能散布在多个节点/过程。
启动DAG计算须要调用Graph.__call__,这个函数的执行次要分以下几个步骤:
call
_compile(https://github.com/Oneflow-In...) if not _is_compiled
build_graph(https://github.com/Oneflow-In...)
- __build_graph(https://github.com/Oneflow-In...)
- finish_complie_and_init_runtime(https://github.com/Oneflow-In...)
- __run(https://github.com/Oneflow-In...)
逻辑图编译次要在__build_graph中进行。finish_complie_and_init_runtime会持续做一些优化pass,而后构建物理图、初始化运行时Actor零碎。__run会启动一次DAG的运算。
5.1 graph_build_context: 为逻辑图编译设置根本环境
在 Graph 中,build 函数外面的代码执行都在 graph_build_context 的作用域下,这样实现了动静转动态的性能。
__build_graph中的graph_build_context(https://github.com/Oneflow-In...)尽管只有一行代码,但却做了几件十分重要的事件。
首先在context作用域内设置全局的lazy_mode为True(https://github.com/Oneflow-In...)。在这个context作用域内,所有op都由LazyInterpreter解释执行。
其次,在JobBuildAndInferCtx(https://github.com/Oneflow-In...)作用域内,JobBuildAndInferCtx_Open(https://github.com/Oneflow-In...)调用相似如下C++代码
// oneflow/api/python/job_build/job_build_and_infer.h// oneflow/core/job/job_build_and_infer_ctx_mgr.cpp// 如前所述,LazyJobBuildAndInferCtxMgr 在 MultiClientSessionContext::TryInit 执行时初始化。// LazyJobBuildAndInferCtxMgr mgr;mgr.OpenJobBuildAndInferCtx(job_name);
OpenJobBuildAndInferCtx会新建一个Job对象(https://github.com/Oneflow-In...)、一个LazyJobBuildAndInferCtx对象(https://github.com/Oneflow-In...)。LazyJobBuildAndInferCtx负责依据用户定制的op等操作,批改Job,其中最次要的性能是增加新 Op。
5.2 __build_io:为计算图增加input和output Op
self.__build_io("input", graph_build_util.build_graph_input_arg, *args, **kwargs)
下面这行代码(https://github.com/Oneflow-In...)的作用是,对于用户传递给graph_mylinear(input)的input参数,针对其中的每个tensor都在逻辑计算图中插入一个FeedInputOp(https://github.com/Oneflow-In...)节点。也就是说,model的输出(比方样本tensor,具体参考4.3节),在动态图中也视为一个op操作。
__build_io内会用args(即input)和kwargs结构一个ArgsTree。ArgsTree 把 Python 下的输出、输入形象成了一个树,输出、输入能够是嵌套的 Tuple、List、Dict,元素是 Tensor,嵌套的构造刚好能够示意为树,而 Tensor 是树中的叶子节点。示例代码中kwargs是空的。
遍历ArgsTree,对args和kwargs的每个tensor都调用传入的build_func,对于input来说,就是build_graph_input_arg(https://github.com/Oneflow-In...)。前面会看到,model的output也会调用__build_io,所以这个函数名的意思应该就是对model的输出、输入进行动态图的构图工作。
build_graph_input_arg外部会结构一个FeedInputOpExpr(https://github.com/Oneflow-In...),提交给解释器执行。因为是在lazy作用域内,由LazyInterpreter解释执行(https://github.com/Oneflow-In...),LazyInterpreter会将对应的op插入动态图。
附:build input时ArgsTree的内部结构
5.2.1 将op增加到逻辑图
LazyInterpreter::ApplyImpl(https://github.com/Oneflow-In...)在执行时,GetCurInferCtx()(https://github.com/Oneflow-In...)返回的就是graph_build_context中OpenJobBuildAndInferCtx(https://github.com/Oneflow-In...)创立的那个LazyJobBuildAndInferCtx对象,这个对象负责逻辑图的构建。增加op的次要调用流程如下:
- infer_ctx->AddAndInferConsistentOp(https://github.com/Oneflow-In...)
- AddAndInferOp(https://github.com/Oneflow-In...)
- ConstructOp(https://github.com/Oneflow-In...)
- CheckAndConstructOp(https://github.com/Oneflow-In...)
- NewObj(https://github.com/Oneflow-In...)
OperatorConf中,多种op配置共享op_type字段(https://github.com/Oneflow-In...),protobuf oneof的op_type_case常量作为注册NewObj的key。
零碎预约义的op在oneflow/core/operator(https://github.com/Oneflow-In...)下,例如UserOp(https://github.com/Oneflow-In...)。
AddAndInferOp将返回的Operator保留到LazyJobBuildAndInferCtx的字典中。后续的函数调用,次要是进行推导并批改动态图Job,使得各个节点形成一个DAG。
JobBuildAndInferCtx相干的类关系如下:
5.2.2 lazy tensor 和 eager tensor 的区别
LazyInterpreter::ApplyImpl的最初,会调用BuildTensor(https://github.com/Oneflow-In...)结构一个lazy tensor,作为build_graph_input_arg的返回值(https://github.com/Oneflow-In...)。所以__build_io返回的lazy_args(https://github.com/Oneflow-In...)是lazy tensor,它将代替eager的args(https://github.com/Oneflow-In...)(也就是用户输出的input)参加后续的计算图构建。
那么lazy tensor和eager tensor的区别是什么呢?eager tensor是要即时计算的,所以须要实在数据;而lazy tensor仅在动态图编译阶段用于推导,只须要形容性质的元信息。动态图编译是在lazy模式下运行,只是应用lazy tensor 做计算机构图和校验。
前面会看到,动态图的运行期曾经没有tensor的概念。运行期看到的只是更狭义的Regst存储,可能代表tensor/blob,也可能是其它管制信息。动态图运行时的输出,是间接读取内部 eager tensor的内存数据到到regst;输入应该是op写到regst,通过blob结构eager tensor。
5.3 build: 将UserOp和FeedVariableOp增加到逻辑图
__build_graph中的self.build()(https://github.com/Oneflow-In...)会调用GraphMyLinear.build(),以及ModuleMyLinear.forward()。因为是在lazy模式下运行,matmul和add都会调用UserOpExpr重载版本的LazyInterpreter::ApplyImpl(https://github.com/Oneflow-In...),进而调用AddAndInferConsistentOp(https://github.com/Oneflow-In...)进行构图操作。
须要阐明的是,在援用Module的Parameter属性时(如weight/bias),会触发FeedVariableOp的构图操作(https://github.com/Oneflow-In...)、调用对应版本的LazyInterpreter::ApplyImpl(https://github.com/Oneflow-In...)。这个是怎么执行的呢?
__build_graph中,在进入lazy模式之前,先调用了_create_states_builder(https://github.com/Oneflow-In...)。其中self._state()(https://github.com/Oneflow-In...)返回所有Module的所有Parameter(包含子Module)。
state_block的类型是TensorBlock(https://github.com/Oneflow-In...)。所有的state_block的lazy_origin_builder().method(https://github.com/Oneflow-In...)都被设置为调用build_graph_state(https://github.com/Oneflow-In...)。
给build_graph_state(https://github.com/Oneflow-In...)设置个断点能让整个调用过程显形,次要的调用栈如下:
-> out = graph_mylinear(input) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(221)__call__()-> self._compile(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(741)_compile()-> _, eager_outputs = self.build_graph(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(759)build_graph()-> outputs = self.__build_graph(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(864)__build_graph()-> outputs = self.build(*lazy_args, **lazy_kwargs) /mnt/project/machine-learning/oneflow/oneflow/test.py(21)build()-> return self.model(input) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(234)__call__()-> result = self.__block_forward(*args, **kwargs) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(266)__block_forward()-> result = self._origin.__class__.forward(self, *args, **kwargs) /mnt/project/machine-learning/oneflow/oneflow/test.py(11)forward()-> return flow.matmul(input, self.weight) + self.bias /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(483)__getattr__()-> p_state = self._get_from_states(name, "_parameters") /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(521)_get_from_states()-> _s_block.try_build() /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(679)try_build()-> self._lazy_origin_builder.try_build(self) /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(627)try_build()-> self.result = self.method()> /usr/local/lib64/python3.6/site-packages/oneflow/framework/graph_build_util.py(227)build_graph_state()-> op_name, var_conf_str, ["in_0"], ["out_0"]
这个调用过程比拟容易困扰的是,执行对象会在Grpah、GraphMyLinear、ModuleMyLinear、ModuleBlock之间切换。
后面在探讨Graph的结构时曾经提过,执行self.model(input)时,Graph.__getattr__返回的属性model是ModuleBlock对象,所以理论调用的是ModuleBlock.__call__。
在这个函数内调用__block_forward(https://github.com/Oneflow-In...),其中的_origin(https://github.com/Oneflow-In...)是ModuleMyLinear,进入到它的forward办法,执行到flow.matmul(input, self.weight) + self.bias时,matmul 会被LazyOpInterpreter 所执行,在 LazyOpInterpreter 中调用 AddAndInferConsistentOp(https://github.com/Oneflow-In...)
,在 Job 中增加一个 matmul operator。同理前面的加法会在 job 中增加一个 add operator。
self.weight 和 self.bias 会触发调用ModuleBlock.__getattr__,进而调用_get_from_states(https://github.com/Oneflow-In...),调用TensorBlock.try_build()(https://github.com/Oneflow-In...)。这里执行的就是进入lazy模式之前设置的build_graph_state(https://github.com/Oneflow-In...)。从而减少一个FeedVariableOp到计算图(https://github.com/Oneflow-In...)。为什么设置和调用会间隔这么远呢?次要是为了让参数尽量和生产参数的 Operator 在一个作用域下,所以实现成了惰性求值来达到提早计算的目标。
再前面的步骤就是调用__build_io(https://github.com/Oneflow-In...)插入FetchOutputOp(https://github.com/Oneflow-In...)。也就是说,获取model的output也是一个op。
到目前为止,前向计算图就构建实现了。它的json示意能够参考附录。net.op是计算图的节点,通过input等属性能够看出节点之间的连贯关系。
示例代码的前向计算图如下。从这个图能够看到,input、output、weights等都是op。
5.4 逻辑图优化
在__build_graph中会调用CurJobBuildAndInferCtx_Complete对动态图进行多轮优化(https://github.com/Oneflow-In...),对应的C++函数是LazyJobBuildAndInferCtx::Complete()(https://github.com/Oneflow-In...)。
这之后生成的Job是full_job。本文的示例代码比较简单,并不是典型的计算场景,其forwar和ful计算图的拓扑是一样的。理论大部的图优化都实现在这个阶段,如 Op fusion、AMP、ZeRO、常量折叠等等。
到这里,逻辑图构建的主体局部就完结了。
随后会构建一个CNNGraph对象(https://github.com/Oneflow-In...),对应的C++类型是NNGraph(https://github.com/Oneflow-In...)。这个对象将负责构建物理计算图Plan。它也是整个运行时的拥有者和维护者。这个对象析构时,整个运行时也会有序终止并开释资源。
5.5 物理图的编译
接下来就是执行finish_complie_and_init_runtime(https://github.com/Oneflow-In...),其中的外围调用是self._c_nn_graph.complie_and_init_runtime()(https://github.com/Oneflow-In...),对应的C++函数是NNGraph::CompileAndInitRuntime(https://github.com/Oneflow-In...)。
在这个函数中,JobCompleter().Complete()(https://github.com/Oneflow-In...)会持续对逻辑图做几轮批改优化,补全 Runtime 执行所须要的附加信息,Compiler().Compile()(https://github.com/Oneflow-In...)将逻辑图转为分设施的物理图,并持续对Plan进行批改优化。
Plan的编译是在master节点进行的(https://github.com/Oneflow-In...)。master节点会将Plan通过gRPC推送给各个worker节点(https://github.com/Oneflow-In...),worker节点从master拉取物理计算图(https://github.com/Oneflow-In...)。
之后调用NewRuntimeBuffers创立Buffer对象(https://github.com/Oneflow-In...),Buffer应该是次要用于过程内的信息同步。
而后就筹备初始化运行时了。
示例代码生成的compiled_job和物理图Plan的json参见附录。
最终生成的compiled逻辑图如下。框架主动插入了很多系统控制节点。
5.6 Plan的构造
示例代码输入的Plan json数据见附录。
Plan在逻辑上和compiled_job是等价的。这里次要关注task/op之间的关系。
Plan.task中的每个元素是一个task,其中的exec_sequence.exec_node对应job中的op,通常只有一个op(数组能够反对sub graph)。
exec_node.kernel_conf.op_attribute形容了op信息。其中op_conf蕴含op name信息。
kernel_conf.op_attribute.op_conf就是Job中的OperatorConf。
kernel_conf.op_attribute.arg_signature.bn_in_op2lbi体现了task/op之间的连贯关系。
bn_in_op就是blob name in op,即op输出的blob name。
以System-AutoTick-DstSubsetTick_21为例
{ "out": { "op_name": "System-AutoTick-DstSubsetTick_21", "blob_name": "out" }, "in_0": { "op_name": "System-EagerCriticalSection-Interface-End-Tick-19", "blob_name": "out" }, "in_1": { "op_name": "System-AutoTick-SrcSubsetTick_20", "blob_name": "out" }}
exec_node.bn_in_op2regst_desc_id在task层面体现了连贯关系。这个map中的key示意输入输出,value是register id。
{"out": "29","in_0": "27","in_1": "28"}
task.produced_regst_desc形容了对应task生产的register,consumer_task_id是消费者,
produced_regst_desc.out.regst_desc_type.data_regst_desc.lbi2blob_desc.lbi就是这个register的logic blob id。
task.consumed_regst_desc_id形容了对应task生产的register信息
6
运行时的初始化
NNGraph::CompileAndInitRuntime中,new Runtime这行代码会初始化运行时(https://github.com/Oneflow-In...)。次要做的事件包含:
- 创立Thread
- 告诉Thread创立Actor,Actor会创立Regst和Kernel
- 给没有输出的source_tasks发送启动信号kStart
6.1 Runtime创立Thread
在Runtime的构造函数中,DumpThreadIdsFromPlan(https://github.com/Oneflow-In...)会将Plan中属于以后过程的task的thread id存入thread_ids_变量。AddThreads创立这些Thread对象(https://github.com/Oneflow-In...)。
Thread在结构时会创立一个物理线程( https://github.com/Oneflow-In...),线程执行的是PollMsgChannel办法(https://github.com/Oneflow-In...),Thread就是在这里继续期待须要解决的新音讯。
Thread只解决两类命令音讯:线程终止音讯,创立Actor的音讯。其它音讯交给Actor::ProcessMsg解决(https://github.com/Oneflow-In...)。
6.2 Runtime告诉Thread创立Actor
在Runtime的构造函数中,tasks被分为两类:source_tasks和other_tasks。在示例代码中,source_tasks(https://github.com/Oneflow-In...)是没有输出边的task。
从代码逻辑看,在Plan proto中,task的consumed_regst_desc_id字段是一个map。如果这个map的所有key都是in_ctrl(https://github.com/Oneflow-In...),这个task就是source_tasks。
一些source_tasks的示例如下:
- System-Src-WaitAndSendIds_16
- System-AutoTick-AppendDeviceTick_9
- System-EagerCriticalSection-Interface-End-Tick-19
- System-EagerCriticalSection-Interface-End-Tick-25
Runtime调用HandoutTasks函数(https://github.com/Oneflow-In...)会给ActorMsgBus发送构建Actor的kConstructActor音讯(https://github.com/Oneflow-In...)。
6.3 ActorMsgBus和Thread的音讯解决
从接口看,ActorMsgBus (https://github.com/Oneflow-In...)负责音讯的发送(Actor通过ActorMsgBus发送音讯),Thread::PollMsgChannel(https://github.com/Oneflow-In...) 负责音讯的接管和解决。
相干实体的协作关系如下
Actor是自调度的根本单元,承受音讯而后工作,工作完后再持续发送音讯。
- actor_id就是task_id,是在编译Plan时就确定的。task是编译时概念,actor是对等的运行时概念。
- task_id有特定的编码格局(https://github.com/Oneflow-In...),从中能够解析出machine_id(https://github.com/Oneflow-In...)和thread_id(https://github.com/Oneflow-In...)。
- 在跨网络的整个物理图Plan中,actor id相当于地址,通过它能够定位惟一的actor实体。
Actor 通过 ActorMsgBus::SendMsg(https://github.com/Oneflow-In...) 发送 ActorMsg(https://github.com/Oneflow-In...) 音讯。
- ActorMsg蕴含源和目标actor id(https://github.com/Oneflow-In...)。
- 如果是过程内通信(https://github.com/Oneflow-In...),将通过 ActorMsgBus::SendMsgWithoutCommNet (https://github.com/Oneflow-In...)把 ActorMsg 朝目标 actor 所在的 thread 入队音讯(https://github.com/Oneflow-In...)。
- Thread::EnqueueActorMsg 会判断以后 thread 是否是 actor thread,如果是则入本地队列,否则则入 actor thead 的 channel 队列。
- 如果ActorMsg是跨过程音讯,ActorMsgBus通过CommNet发送音讯(https://github.com/Oneflow-In...),接管方的CommNet应该会依据actor id取得线程id,从ThreadMgr查到Thread,将音讯交给Thread解决。
Thread::PollMsgChannel(https://github.com/Oneflow-In...) 负责音讯的接管和解决。
- 如果线程本地队列local_msg_queue_为空,则从thread的channel队列中取出全副ActorMsg放入本地队列(https://github.com/Oneflow-In...)。
- 从本地队列中取出一个ActorMsg,而后开始解决。
- 解决一些非凡的kCmdMsg音讯(https://github.com/Oneflow-In...),而后一般音讯交给Actor自行处理(https://github.com/Oneflow-In...)。
- Actor收到音讯后,会判断是否满足了Act的条件,如果满足,则会执行Act,从而调用LaunchKernel执行计算,Act执行完结后通过ActorMsgBus发消息告诉上下游Actor。
这些对象之间的消息传递关系如下图所示
6.4 激活source Actor
目前的实现中,Actor全副是自调度的,只能承受来自其余Actor的音讯。Actor中有一类比拟非凡的source actors,它们与source tasks对应。
source actors 没有上游 actor,它们会朝上游actor发送音讯从而激活所有的Actor运行。
source actors 自身是如何执行的呢?它们在承受到 kStart 音讯后就会始终 Act 直到进入退出流程。然而其 kernel 会阻塞在 Buffer(https://github.com/Oneflow-In...) 处,始终期待其余线程往 buffer 中增加数据后,阻塞会被激活,而后 kernel 执行读取,kernel 实现后,actor 的 Act 完结,往上游发送音讯。
source actors 因为会产生阻塞,所以其必须有独自的 actor thread。
Runtime 初始化的的最初一步就是朝各 source actors 发送 kStart 音讯用以激活它们,但 source actors 只有承受到 buffer 的数据后才会往下执行,而后朝上游 actors 发送音讯,使所有的 actors 都执行起来。
7
Actor
7.1 Actor的创立
Thread在创立Actor时,会先尝试创立为LightActor(https://github.com/Oneflow-In...),如果不胜利,再尝试用事后注册的工厂创立Actor。
有几种TaskType能够用于LightActor(https://github.com/Oneflow-In...):
- kNormalForward,比方matmul、add等user op。
- kCopyHd
- kTick
- kCollectiveBoxingGeneric
目前大概有20多种Actor的子类型。其它Actor类型依据TaskType(https://github.com/Oneflow-In...)事后注册。例如WaitAndSendIdsActor。
示例代码的各个节点对应的actor类型参见附录。
Actor相干的类关系如下(蕴含关系只是示意能够拜访到相干信息,并不意味着创立或着领有该类型对象)
7.2 Actor的初始化
Actor的构造函数个别都是空的,构建之后须要执行Init(https://github.com/Oneflow-In...)函数进行初始化。
LightActor继承自ActorBase,不是Actor的子类,有本人的Init函数实现。这里只探讨Actor的初始化。
在Actor::Init(https://github.com/Oneflow-In...)中,首先调用ConstructKernel(https://github.com/Oneflow-In...)创立kernel实例。和Operator相似,kernel也是以OpTypeCase作为注册的key,例如WaitAndSendIdsKernel(https://github.com/Oneflow-In...)。一个Actor通常只有一个kernel。
之后调用NewRegsts创立Regst(https://github.com/Oneflow-In...)。Tensor是用户侧的概念。对应的运行时概念是Regst(https://github.com/Oneflow-In...),它持有Kernel须要读写的内存。Regst的概念比Tensor更宽泛,比方框架主动增加的管制Op也会用到Regst。
Actor将本人创立的Regst保留到produced_regsts_(https://github.com/Oneflow-In...)。
TakeOverNaiveConsumed(https://github.com/Oneflow-In...)只记录须要生产的regst id,但并不push到consumed_regsts_。
TakeOverNaiveProduced(https://github.com/Oneflow-In...)既记录生产的regst id,也push到naive_produced_rs_(https://github.com/Oneflow-In...)。这种区别是为了首次执行计算时,actor能顺利执行。前面剖析Actor的音讯解决时会再回过头来讨论一下。
调用InitBnInOp2BlobInfo会初始化BlobInfo(https://github.com/Oneflow-In...)。
之后就是调用VirtualActorInit(https://github.com/Oneflow-In...),这里容许各个Actor子类定制本人的初始化逻辑。通常会调用OF_SET_MSG_HANDLER宏(https://github.com/Oneflow-In...)设置Actor的音讯处理函数。
7.3 Actor的音讯解决
LightActor 首先会依据音讯类型别离解决 kRegstMsg 和 kEordMsg 音讯。HandleRegstMsg(https://github.com/Oneflow-In...) 中依据 RegstMsg 的 type (kProduced 或 kComsumed) 来别离解决各种读写状态计数。
而后判断读写计数是否达到了判断条件,如果达到了意味着满足了读写 regst 的条件,而后就 执行 ActOnce(https://github.com/Oneflow-In...)。
LightActor::ActOnce 会在第一次执行时去 InitBnInOp2Blob 和 InitActMsg。InitBnInOp2Blob 初始化 resgt 中的 bn 与 Blob 的映射关系,为 kernel 提供通过 bn 拜访 Blob 的性能。InitActMsg 会初始化好所有须要发送的音讯防止后继发音讯时反复的构建音讯。
而后就是 LaunchKernel,接着会 ResetState 重置 regst 状态。
LaunchKernel 后就会把之前构建好的音讯发送进来,同步音讯会间接入队 thread 音讯队列,异步音讯通过 callback 发送到 ActorMsgBus。
一般 Actor::ProcessMsg 会调用 msg handler 来解决音讯,最常见的 msg handler 就是 Actor::HandlerNormal(https://github.com/Oneflow-In...)。
Actor::HandlerNormal 中流程跟 LightActor 中相似,会依据不同的 regst 类型来别离解决,Actor 中对 regst 的状态治理形式与 LightActor 不同,LightActor 中的形式更加高效,Actor 中能解决一些非凡状况。
音讯处理完毕后,就会调用 ActUntilFail,ActUntilFail 会判断 IsReadReady 和 IsWriteReady 来决定是否能够进行 Act。
最常见的 NaiveActor::Act() 就是执行 AsyncLaunchKernel。
Act 实现后,就开始朝上下游发送 regst 音讯。
还有一些非凡的 Actor,咱们以WaitAndSendIdsActor为例,察看一下这类Actor的音讯解决机制。
之所以抉择这个例子,一是这个Actor比较简单;二是这是一个典型的source task,想看一下计算图是怎么被触发启动计算的。
Thread收到的音讯如果不是kStopThread或kConstructActor,就调用Actor::ProcessMsg(https://github.com/Oneflow-In...),将音讯转给Actor解决。
ProcessMsg函数只是简略的将音讯转给handler解决(https://github.com/Oneflow-In...)。
WaitAndSendIdsActor::VirtualActorInit中,handler被设置为HandlerWaitToStart(https://github.com/Oneflow-In...)。
Runtime的构造函数中,发送的第一批音讯是给source_tasks的kStart音讯,这个音讯就由HandlerWaitToStart函数解决。
HandlerWaitToStart校验音讯类型后,将handler设置为HandlerNormal(https://github.com/Oneflow-In...)(这也是大部分Actor的默认handler),而后调用ProcessMsg(https://github.com/Oneflow-In...),理论就是调用新设置的handler HandlerNormal。
HandlerNormal中,如果是kCmdMsg,只容许是kStart(https://github.com/Oneflow-In...)。通过音讯类型校验后,会间接调用ActUntilFail(https://github.com/Oneflow-In...)。
7.4 Act执行的条件
LightActor 和 Actor 判断是否进行 Act 采纳了不同的策略,LightActor 的效率更高,Actor 能解决一些非凡状况。
对于 LightActor,当在读的register计数 total_reading_cnt_ 归 0,可生产的register计数 ready_consumed_ 减少到 max_ready_consumed_,前者示意所有的消费者曾经读取以后 LightActor 的 Regst,后者示意以后 LightActor 生产的所有 Regst 曾经达到(由上游发送的 Regst 音讯)。
对于 Actor,Actor::ActUntilFail中,Act办法(https://github.com/Oneflow-In...)是各个子类本人实现的,个别次要是启动kernel计算。
然而在执行Act之前,须要先确认:
- Act执行依赖的数据是否都曾经就绪?(IsReadReady)
- Act生产进去的数据,生产方是否曾经用完、并收到ack音讯确认?(IsWriteReady)
Actor有4个与此相关的成员变量
- RegstSlot naive_produced_rs_;
- RegstSlot inplace_produced_rs_;
- RegstSlot naive_consumed_rs_;
- RegstSlot inplace_consumed_rs_;
xx_produced_rs_存储的是以后Actor的上游consumer返回的、曾经应用结束的ack regst信息。(以后Actor生产的Regst存储在produced_regsts_中。)
运行时在初始化的过程中,所有Actor都没有运行过,任何Actor都不可能收到ack音讯,所以在Actor初始化时,要事后填充xx_produced_rs_,这样能力保障Actor在首次运行前是WriteReady的,能力顺利启动执行。
xx_consumed_rs_存储的是上游依赖发来的数据。它不须要事后填充。因为source_tasks没有输出依赖,天然就是ReadReady的;而xx_produced_rs_在初始化时的事后填充又保障它是WriteReady的,所以source_tasks能够间接运行。source_tasks的输入音讯发给上游,上游也会变为ReadReady,而上游在初始化后也保障是WriteReady的。整个Actor零碎就能够这样运行起来了。
7.5 Actor上下游之间的告诉机制
Act执行结束后,须要将后果数据发给上游consumer。以 WaitAndSendIds 的 Naive Produced 为例,ActUntilFail中的调用流程如下:
AsyncSendNaiveProducedRegstMsgToConsumer(https://github.com/Oneflow-In...)
VirtualAsyncSendNaiveProducedRegstMsgToConsumer(https://github.com/Oneflow-In...)
HandleProducedNaiveDataRegstToConsumer(https://github.com/Oneflow-In...)
HandleRegstToConsumer(https://github.com/Oneflow-In...)
EnqueueAsyncMsg(https://github.com/Oneflow-In...)
- 如果指标线程是以后线程,ActorMsgBus::SendMsg(https://github.com/Oneflow-In...)
- 否则,将音讯退出async_msg_queue_(https://github.com/Oneflow-In...)
- 减少 total_reading_cnt_(https://github.com/Oneflow-In...)(这个变量示意曾经发消息给上游、但未收到的ack数量)
- naive_produced_rs_.PopFrontRegsts(https://github.com/Oneflow-In...)
- AsyncSendProducedCtrlRegstMsgToConsumer
留神naive_produced_rs_.PopFrontRegsts(https://github.com/Oneflow-In...)会将Regst指针从队列中删掉,相应的可用(https://github.com/Oneflow-In...)register计数减1(https://github.com/Oneflow-In...)。
而在Actor::HandlerNormal中解决收到的kRegstMsg音讯(https://github.com/Oneflow-In...)时,如果是consumer发来的ack音讯,会调用TryUpdtStateAsProducedRegst(https://github.com/Oneflow-In...),将Regst再增加到 naive_produced_rs_ 中(https://github.com/Oneflow-In...),以保障以后Actor在收到所有ack后是WriteReady的;同时递加在读的 register 计数total_reading_cnt_。
Actor对依赖的上游音讯的解决是相似的。通过以下函数调用给上游发送ack音讯、告诉 register 曾经用完,能够持续更新了:
- AsyncSendNaiveConsumedRegstMsgToProducer(https://github.com/Oneflow-In...)
- AsyncRetInplaceConsumedRegstIfNoConsumer(https://github.com/Oneflow-In...)在Actor::HandlerNormal中收到kRegstMsg音讯后,将音讯增加到consumed_rs_(https://github.com/Oneflow-In...),以保障以后Actor在收到所有依赖数据后是ReadReady的。
LightActor有本人的音讯解决机制(https://github.com/Oneflow-In...),大抵原理应该是差不多的。
7.6 Act执行的动作
根据上述探讨,Actor收到kRegstMsg后也会进入ActUntilFail执行。如果读写都是Ready,就执行Act(https://github.com/Oneflow-In...)。以WaitAndSendIdsActor为例,次要调用链路如下:
- AsyncLaunchKernel(https://github.com/Oneflow-In...)
- ek.kernel->Launch(https://github.com/Oneflow-In...),启动Kernel计算
- Forward(https://github.com/Oneflow-In...)
- ForwardDataContent(https://github.com/Oneflow-In...)
- buffer->Pull(https://github.com/Oneflow-In...)
- 给regst的存储地址mut_dptr赋值(https://github.com/Oneflow-In...)
buffer->Pull会期待条件变量的告诉(https://github.com/Oneflow-In...)。当初,看上去所有Actor都已准备就绪,只等发令枪一响就开跑了。
8
启动动态图的计算
Graph.__run(https://github.com/Oneflow-In...)会扣动发令枪的板机,启动计算图的一轮计算。
次要调用流程如下:
- RunLazyNNGraph(https://github.com/Oneflow-In...)
- builder->LaunchLazyJob(https://github.com/Oneflow-In...)
- LaunchLazyJobInstructionType(https://github.com/Oneflow-In...)
- Buffer::Push(https://github.com/Oneflow-In...)
这里的Buffer::Push就是WaitAndSendIdsKernel在期待的起跑信号。
9
运行时的退出机制
整个运行时蕴含很多对象和资源,平安有序的退出是庞杂而又粗疏的工作。这里仅以WaitAndSendIds为例,从一个侧面察看一下运行时的退出机制。
运行时的退出始于NNGraph对象的析构(https://github.com/Oneflow-In...)。
9.1 Actor的退出
- NNGraph在析构时,会敞开所有的Buffer对象(https://github.com/Oneflow-In...)。
Buffer在敞开时,会设置is_closed_ = true并告诉所有监听者(https://github.com/Oneflow-In...)。然而Pull会持续解决完曾经提交的计算。
- 所以,Buffer应该是次要用于过程内的通信和异步协调的一个类。
- WaitAndSendIdsKernel这时候正在期待新一轮计算开始(https://github.com/Oneflow-In...),后果收到Pull返回的kBufferStatusErrorClosed(https://github.com/Oneflow-In...)。
WaitAndSendIdsActor::IsCustomizedReadReady当前就始终返回false(https://github.com/Oneflow-In...),IsReadReady也返回false(https://github.com/Oneflow-In...)。
- 这之后,ActUntilFail只会执行异步音讯发送(https://github.com/Oneflow-In...)(不再进入while循环)
- WaitAndSendIdsActor::HandlerNormal依然会解决其它Actor发来的音讯(https://github.com/Oneflow-In...)。但因为IsCustomizedReadReady返回false,会进入AsyncSendEORDMsgForAllProducedRegstDesc(https://github.com/Oneflow-In...)执行。它会给每个上游发送kEordMsg音讯(https://github.com/Oneflow-In...)。
Actor在收到上游发来的kEordMsg音讯后,递加remaining_eord_cnt_(https://github.com/Oneflow-In...)。
- remaining_eord_cnt_被初始化为Actor的输出regst的数量(https://github.com/Oneflow-In...)。
total_reading_cnt_是以后Actor生产的、曾经发给consumer、但尚未收到ack的音讯数量。
- Actor目前仍能够失常接管consumer发来的ack音讯。
当上述2个变量都为0时(https://github.com/Oneflow-In...),意味着所有上游都收回了kEordMsg音讯,也收到了所有上游的ack音讯。Actor就给Thread返回1(https://github.com/Oneflow-In...)。
- 如果上述两个变量有不为0的,就批改handler,由HandlerZombie(https://github.com/Oneflow-In...)解决后续收到的音讯。
- Thread收到Actor返回的1后(https://github.com/Oneflow-In...),将它从本人的存储中删除(https://github.com/Oneflow-In...),并递加运行Actor的数量。
9.2 Thread的退出
- NNGraph重置runtime_导致运行时对象被析构(https://github.com/Oneflow-In...)。
- Runtime删除所有Thread(https://github.com/Oneflow-In...)。
- ThreadMgr给所有Thread发送kStopThread音讯(https://github.com/Oneflow-In...)。同时,重置指针导致Thread析构(https://github.com/Oneflow-In...)。
- Thread的物理线程退出PollMsgChannel循环(https://github.com/Oneflow-In...)。
- Thread期待物理线程完结,敞开channel(https://github.com/Oneflow-In...)。
10
分布式场景的动态图
分布式的compile_job、物理图Plan和单机场景有显著变动。
比方,每个过程都有一套WaitAndSendIds等管制节点。这也容易了解,因为每个节点都要执行__run和Buffer::Push/Pull,都要启动本过程的Actors执行计算。
matmul和broadcast_add等user op也会在两个节点进行计算。
10.1 示例代码
启动形式参考Global Tensor的官网文档。
import oneflow as flowimport oneflow.nn as nnP0 = flow.placement("cpu", ranks=[0, 1])a0_sbp = flow.sbp.split(0)class ModuleMyLinear(nn.Module): def __init__(self, in_features, out_features): super().__init__() self.weight = nn.Parameter(flow.randn(in_features, out_features, placement=P0, sbp=flow.sbp.broadcast)) self.bias = nn.Parameter(flow.randn(1, out_features, placement=P0, sbp=flow.sbp.broadcast)) def forward(self, input): return flow.matmul(input, self.weight) + self.biaslinear_model = ModuleMyLinear(4, 3)class GraphMyLinear(nn.Graph): def __init__(self): super().__init__() # ModuleBlock self.model = linear_model def build(self, input): # ModuleBlock.__call__ return self.model(input)graph_mylinear = GraphMyLinear()input = flow.randn(5, 4, placement=P0, sbp=flow.sbp.split(1))out = graph_mylinear(input)print(out)
11
附录
11.1 断点
11.1.1 Python断点示例
# python3 -m pdb test.pybreak test.py:25break oneflow/nn/graph/graph.py:221break oneflow/nn/graph/graph.py:741break oneflow/nn/graph/graph.py:745break oneflow/nn/graph/graph.py:759break oneflow/nn/graph/graph.py:828break oneflow/nn/graph/graph.py:777break oneflow/nn/graph/graph.py:1066break oneflow/nn/graph/graph.py:1133break oneflow/framework/graph_build_util.py:227
11.1.2 C++断点示例
启动命令
source /mnt/oneflow/build/source.shgdb --args python3 /mnt/oneflow/test.py# set breakpoints# run
断点示例
set breakpoint pending onbreak oneflow::ActorMsg::BuildEordMsgbreak oneflow/core/common/buffer.h:80break oneflow::(anonymous namespace)::CheckAndConstructOpbreak oneflow::WaitAndSendIdsActor::Actbreak oneflow::WaitAndSendIdsActor::HandlerWaitToStartbreak oneflow/core/lazy/actor/light_actor.cpp:452break oneflow/core/lazy/actor/light_actor.cpp:485break oneflow::ForeignInputKernel::ForwardDataContentbreak oneflow::vm::LaunchLazyJobInstructionType::Compute
11.2 动态图的json示意
- forward(https://quip.com/OMc4A0HOOr0C)
- full(https://quip.com/JLaMAHGBLXmK)
- compiled(https://quip.com/tXjuAiS3J0Ab)
- plan(https://quip.com/a0DMAAIte6PQ)
11.3 actor type
naive_actor
System-AutoTick-AppendDeviceTick_9System-AutoTick-DstSubsetTick_12System-AutoTick-DstSubsetTick_21System-AutoTick-DstSubsetTick_27System-AutoTick-Prepend-DeviceTick_7System-AutoTick-SrcSubsetTick_20System-AutoTick-SrcSubsetTick_26System-AutoTick-SrcSubsetTick_8System-AutoTick-Tick_11System-AutoTick-Tick_13System-EagerCriticalSection-Callback-23System-EagerCriticalSection-Callback-29System-EagerCriticalSection-Interface-Begin-Tick-18System-EagerCriticalSection-Interface-Begin-Tick-24System-EagerCriticalSection-Interface-End-Tick-19System-EagerCriticalSection-Interface-End-Tick-25System-EagerCriticalSection-Wait-22System-EagerCriticalSection-Wait-28
light_actor
_GraphMyLinear_0_input.0.0_2_GraphMyLinear_0_output.0.0_2model.biasmodel-broadcast_add-1model-matmul-0model.weightSystem-AutoTick-SinkTick_15System-SyncAllRanksSinkTick_14
wait_and_send_ids_actorSystem-Src-WaitAndSendIds_16
call_back_notify_actorSystem-Sink-CallbackNotify_17
12
参考资料
oneflow v0.8.0(https://github.com/Oneflow-In...)
OneFlow框架的零碎设计(上篇)(https://zhuanlan.zhihu.com/p/...)
OneFlow框架的零碎设计(中篇)(https://zhuanlan.zhihu.com/p/...)
OneFlow框架的零碎设计(下篇)(https://zhuanlan.zhihu.com/p/...)
一个Job在OneFlow中的执行过程—上篇(https://zhuanlan.zhihu.com/p/...)
一个Job在OneFlow中的执行过程—中篇(https://zhuanlan.zhihu.com/p/...)
一个Job在OneFlow中的执行过程—下篇(https://zhuanlan.zhihu.com/p/...)
动态图模块 nn.Graph(https://docs.oneflow.org/mast...)
OneFlow零碎设计(https://docs.oneflow.org/v0.4...)
torch.nn.Module(https://pytorch.org/docs/1.10...)
欢送 Star、试用 OneFlow 最新版本:https://github.com/Oneflow-In...