作者|郑建华
更新|赵露阳
通过这篇笔记,心愿能初步理解 OneFlow 在 Eager 模式下对设施的治理形式、设施执行计算的过程以及如何充分利用设施计算能力。这里的设施次要指相似 CUDA 这样的并行计算减速设施。
1
设施、流相干类型及关系
框架通过流(Stream)向设施(Device)提交计算工作。一个 Stream 是一个命令序列,能够类比 CUDA Stream(https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.h…),或者 CPU Thread 的指令序列。同一个 Stream 中的命令按程序执行;不同 Stream 之间的命令有依赖关系时,须要同步。不同的工作,比方 kernel 计算、host2device、device2host 等都有本人独立的 Stream,能够并发执行,从而在 Eager 模式下尽可能充分利用设施的异步并发执行能力。
OneFlow 中 Device 和 Stream 相干的局部类构造如下所示:
Device 相干类型
oneflow::Deviceoneflow::Device 是用于示意设施的根底类型,例如:构建 tensor 时 flow.tensor(shape, device="cuda:1")
就会在外部结构出这个根底的 Device 类型,其中设施编号为 1、设施类型为 CUDA。
oneflow/core/framework/device.h:
class Device final {public: ... private: Device(const std::string& type, int64_t device_id); Maybe<void> Init(); const std::string type_; DeviceType enum_type_; const int64_t device_id_; const size_t hash_value_; std::shared_ptr<MemoryCase> mem_case_;};
oneflow::Device
中最重要的两个成员变量别离是用于 示意设施类型的 DeviceType
; 用于示意设施编号的 device_id_。
DeviceType
DeviceType
是一个枚举类,不同的值代表不同的计算设施类型,其定义位于 oneflow/core/common/device_type.proto
:
enum DeviceType {kInvalidDevice = 0; // 有效设施 kCPU = 1; // cpu 设施 kCUDA = 2; // cuda 设施 kMockDevice = 3; // pseudo device for test.}
目前在 oneflow master 分支中,次要有 kCPU
示意 cpu 设施;kCUDA
示意 nvidia cuda 设施;在其余多设施反对的分支中,这里还减少了更多的设施类型。
oneflow::ep::Device
oneflow::Device
是 oneflow 中对设施的根底封装类型,而 oneflow::ep::Device
则是一个抽象类,属于 oneflow 的 ep 模块(execution provider),是对设施行为的封装,ep 模块为多硬件设施提供了更高层次的形象,不便 oneflow 反对和兼容多硬件设施提供了更高的灵活性和可拓展性。
oneflow::ep::Device
不仅提供了示意设施类型的 device_type()
办法、示意设施编号的 device_index()
办法,还提供了创立 / 销毁ep::Stream
、创立 / 销毁Event
、在设施上申请 / 开释内存的各种办法。
oneflow/core/ep/include/device.h
:
class Device {public: OF_DISALLOW_COPY_AND_MOVE(Device); Device() = default; virtual ~Device() = default; virtual void SetAsActiveDevice() = 0; virtual DeviceType device_type() const = 0; virtual size_t device_index() const = 0; virtual DeviceManager* device_manager() const = 0; virtual Stream* CreateStream() = 0; virtual void DestroyStream(Stream* stream) = 0; virtual Event* CreateEvent(); virtual void DestroyEvent(Event* event); virtual void CreateEvents(Event** events, size_t count) = 0; virtual void DestroyEvents(Event** events, size_t count) = 0; virtual Maybe<void> Alloc(const AllocationOptions& options, void** ptr, size_t size) = 0; virtual void Free(const AllocationOptions& options, void* ptr) = 0; virtual Maybe<void> AllocPinned(const AllocationOptions& options, void** ptr, size_t size) = 0; virtual void FreePinned(const AllocationOptions& options, void* ptr) = 0; virtual bool IsStreamOrderedMemoryAllocationSupported() const;};
oneflow::ep::Device
有如下子类实现:
Stream 相干类型
oneflow::Stream 和 cuda device 以及 stream 的关系相似,oneflow 中也存在相似的根底 Stream 类型。
oneflow/core/framework/stream.h
:
class Stream final {.... private: Stream(Symbol<Device> device, StreamType stream_type, size_t thread_uid); static Maybe<Symbol<Stream>> RawNew(Symbol<Device> device, StreamType stream_type, size_t thread_uid); Maybe<void> Init(size_t unique_stream_id); Symbol<Device> device_; StreamType stream_type_; size_t thread_uid_; size_t unique_stream_id_;};
能够看见 Stream 类中的成员变量:
- device_ 示意该 Stream 对象将在何种设施上执行
- streamtype_ 示意该 Stream 的类型,是用于计算的 compute stream 还是用于数据搬运的 host2device、device2host stream 等
- threaduid_ 示意负责启动该 Stream 的线程 id
- unique_streamid_ 示意这个 stream 本身的 unique id
StreamType
和 DeviceType
分为 kCpu 和 kCuda 相似,Stream
也有各种类型之分,具体如下:
oneflow/core/common/stream_type.h
enum class StreamType {kInvalid = 0, // 有效 kCompute, // kernel 计算流 kHost2Device, // 数据搬运(host -> device)流 kDevice2Host, // 数据搬运(device -> host)流 kCcl, // 汇合通信流 kBarrier, // 线程屏障流 kCriticalSection,// 临界区流 kLazyJobLauncher,// job 启动流(lazy mode)kPinnedCompute // pinned memory kernel 计算流};
oneflow::ep::Stream
oneflow 中的 ep 模块提供了一个更高层次的对 Stream 的抽象类,除了能够获取设施的 device()
、获取设施类型的device_type()
办法外,还提供了一系列虚办法如:
- 同步 Sync()
- 执行 Event 事件 RecordEvent()
oneflow/core/ep/include/stream.h
:
class Stream {public: OF_DISALLOW_COPY_AND_MOVE(Stream); Stream() = default; virtual ~Stream() = default; virtual DeviceType device_type() const = 0; virtual Device* device() const = 0; virtual Maybe<void> Sync() = 0; virtual void RecordEvent(Event* event) = 0; virtual Maybe<void> GetAsyncError() { return Maybe<void>::Ok(); } virtual Maybe<void> AllocAsync(void** ptr, size_t size) {UNIMPLEMENTED_THEN_RETURN(); } virtual Maybe<void> FreeAsync(void* ptr) {UNIMPLEMENTED_THEN_RETURN(); } template<typename T> Maybe<void> AllocAsync(T** ptr, size_t size) {return AllocAsync(reinterpret_cast<void**>(ptr), size); } virtual Maybe<void> OnExecutionContextSetup() { return Maybe<void>::Ok(); } virtual Maybe<void> OnExecutionContextTeardown() { return Maybe<void>::Ok(); } template<typename T> T* As() { return static_cast<T*>(this); }};
oneflow::ep::Stream
有如下子类实现:
oneflow::vm::Stream
oneflow vm(virtual machine)中的 oneflow::vm::Stream
类型,用于 vm 外部保护 stream 极其依赖关系、StreamPolicy、调度线程等。
oneflow/core/vm/stream.h
:
class Stream final : public intrusive::Base {public: ... private: ... // fields ThreadCtx* thread_ctx_; Symbol<Device> device_; StreamType stream_type_; std::shared_ptr<StreamPolicy> stream_policy_; bool on_scheduler_thread_; std::unique_ptr<char, std::function<void(char*)>> small_pinned_mem_ptr_; ...};
StreamPolicy
StreamPolicy
是 oneflow vm 中独有的概念,提供了一系列虚办法如:
- stream() 获取
oneflow::ep::Stream
指针 - mut_allocator() 获取
vm::Allocator
指针(用于 tensor 内存治理) - device_type() 获取 device 设施类型
除此之外,提供了一系列 vm 相干的指令状态初始化、查问、删除等办法。
oneflow/core/vm/stream_policy.h
:
class StreamPolicy {public: virtual ~StreamPolicy() = default; virtual ep::Stream* stream() = 0; virtual vm::Allocator* mut_allocator() = 0; virtual DeviceType device_type() const = 0; virtual void InitInstructionStatus(const Stream& stream, InstructionStatusBuffer* status_buffer) const = 0; virtual void DeleteInstructionStatus(const Stream& stream, InstructionStatusBuffer* status_buffer) const = 0; virtual bool QueryInstructionStatusLaunched(const Stream& stream, const InstructionStatusBuffer& status_buffer) const = 0; virtual bool QueryInstructionStatusDone(const Stream& stream, const InstructionStatusBuffer& status_buffer) const = 0; virtual bool OnSchedulerThread(StreamType stream_type) const; virtual bool SupportingTransportInstructions() const = 0; void RunIf(Instruction* instruction) const; protected: StreamPolicy() = default; private: virtual void Run(Instruction* instruction) const = 0;};
StreamPolicy 有如下子类实现:
2
Eager Local 模式下的 Device 和 Stream 推导
上面,梳理一下一般的 eager 模式(eager local mode)下,算子执行全过程中 device 和 stream 相干的推导流程。
2.1 推导 Device
首先,对于一个算子(op)来说,要为其设置一个默认的 device 用于理论计算,这一步在:
Symbol<Device> default_device = JUST(GetDefaultDevice(inputs, ctx))
这里 GetDefaultDevice
的逻辑是:
- 1. 如果 inputs tensor 非空,则依据第一个 input tensor 的 device 来设置 default 的 device
- 2. 如果 inputs tensor 为空,则优先从 OpExprInterpContext 中获取 device,若 OpExprInterpContext 中未设置,则会通过
Device::New("cpu")
; 默认给一个 cpu device
值得阐明的是,在 1. 种状况时,如果 input tensor 创立时指定了 device 为 cuda 设施,则这里推导出的 default device 同样为雷同的 cuda device;如果未显示指定,则默认还是 cpu device。
2.2 推导 Stream
oneflow::Stream 的推导次要在:
-
JUST(user_op_expr.mut_local_tensor_infer_cache()->GetOrInfer(infer_args))
);
Symbol<Stream> stream = JUST(InferDeviceAndStream(...));
InferDeviceAndStream
中,Stream
推导的逻辑是会依据 user_op_expr 是否定义了 device_and_stream_infer_fn 而有所区别 - (多数状况)如果该 op 定义了推导函数,则调用此推导函数来推导 Stream,例如 tensor.cuda()办法,inputs 在 CPU 上,outputs 在 CUDA,二者的设施类型不同。这时就不会默认推导而是利用 op 注册的推导函数获取 oneflow::Stream(。例如 CopyOp::InferDeviceAndStream。
-
(少数状况)否则会通过 stream = JUST(GetDefaultStreamByDevice(default_device)); 来推导。
GetDefaultStreamByDevice
的具体实现:Maybe<Symbol<Stream>> RawGetDefaultStreamByDevice(Symbol<Device> device) {return Stream::New(device, StreamType::kCompute);}
能够看见,依据传入的
device
、StreamType::kCompute
,new 了一个oneflow::Stream
。2.3 InstructionsBuilder::Call 和 vm::Stream 推导
在上述 device 和 stream 推导实现后,会通过 InstructionsBuilder 调用 Call 办法:
JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {return builder->Call(kernel, std::move(input_eager_blob_objects), std::move(output_eager_blob_objects), ctx, result->stream());}));
Call 办法中会通过
- JUST(SoftSyncStream(output_eager_blob_objects, stream));
- JUST(SoftSyncStream(input_eager_blob_objects, stream));
-
auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
实现 outputs inputs tensor 的流同步(SoftSyncStream)过程以及
vm::Stream
的推导,而后通过结构OpCallInstructionPolicy
指令派发至 vm 执行。SoftSyncStream 的同步这里省略,具体过程见第 4 节。
2.3.1 结构 ThreadCtx 对象,启动执行指令的线程
ThreadCtx 对象指针保留在 VirtualMachine 的 HashMap 中。每个 DeviceType(CPU 或 CUDA)对应一个 ThreadCtx 对象;临界区和 LazyJob 有本人的 ThreadCtx 对象。
首次拜访 HashMap 时失去的是零值(空指针),须要调用 CreateThreadCtx 创建对象。理论通过虚拟机指令创建对象,ThreadCtx 对象保留在 VirtualMachineEngine::thread_ctx_list_ 中。
ThreadCtx 对象结构后,会创立一个 worker 线程、执行 WorkerLoop 办法,并增加到 worker_threads_。所以 worker_threads_ 是与 ThreadCtx 对象一一对应的。
这个线程负责其所归属的指令的执行:
- WorkerLoop 在收到告诉后,会调用 ThreadCtx::TryReceiveAndRun 解决指令。
- 在这个函数中,将 ThreadCtx 的指令挪到长期列表、通过 StreamPolicy 执行每个指令。
-
ThreadCtx 的指令,是 VirtualMachineEngine 在 DispatchInstruction 时增加进去的。
ThreadCtx 创立实现后,将持有 vm::Stream 对象。
oneflow::vm::Stream
和oneflow::Stream
的数量是一一对应的,vm::Stream 依照 <DeviceType, StreamRole> 分组存储在对应的 ThreadCtx 中。vm::Stream
的推导流程细节如下: -
auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
-
VirtualMachine::GetVmStream()
- Maybe<vm::Stream*>
VirtualMachine::CreateStream(Symbol<Stream> stream)
- Stream::__Init__(ThreadCtx* thread_ctx, Symbol<Device> device, StreamType stream_type…)
- stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));
2.4 执行 OpCall 指令和 ep::Stream 推导
有几个场景会创立(获取)ep::Stream 对象。比方 kernel 执行时。
OpCall 指令在结构时,指令策略类型是 OpCallInstructionPolicy。虚拟机在 DispatchInstruction 时,无论哪个分支,后续都会调用 EpStreamType::Run,最终通过
-
-
EpStreamPolicyBase::Run()
-
instruction->Compute()
- OpCallInstructionPolicy::Compute()
- OpCallInstructionUtil::Compute()
-
OpCallInstructionUtil::OpKernelCompute()
- op_call_instruction_policy->mut_opkernel()->Compute()执行 kernel 的 Compute 办法
例如 GpuL2NormalizeKernel::Compute,最终在其 kernel 的 Compute 办法中,会通过 ctx->stream()创立(获取)ep::Stream 对象,launch kernel 执行计算。
2.4.1 获取 / 创立 ep::Stream
上面,咱们重点看一下 OpCall 指令理论执行时,调用的 OpCallInstructionUtil::Compute()办法:
static inline Maybe<void> Compute(OpCallInstructionPolicy* op_call_instruction_policy, Instruction* instruction) {Allocator* allocator = instruction->mut_stream()->mut_stream_policy()->mut_allocator(); JUST(AllocateOutputBlobsMemory(op_call_instruction_policy, allocator, instruction)); if (unlikely(op_call_instruction_policy->need_temp_storage())) {JUST(TryAllocateTempStorage(op_call_instruction_policy, allocator)); } ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream(); user_op::OpKernelState* state = nullptr; user_op::OpKernelCache* cache = nullptr; if (op_call_instruction_policy->user_opkernel()->has_state_or_cache()) {TryInitOpKernelStateAndCache(op_call_instruction_policy, stream, &state, &cache); } OpKernelCompute(op_call_instruction_policy, stream, state, cache); if (unlikely(op_call_instruction_policy->need_temp_storage())) {DeallocateTempStorage(op_call_instruction_policy, allocator); } return Maybe<void>::Ok();}
其中会通过
`
ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream()
; 实现
ep::Stream
的推导,之后在OpKernelCompute()
办法中理论实现 op/kernel 的执行。ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();
-
-
ep::Stream* stream() override { return GetOrCreateEpStream(); }
-
GetOrCreateEpStream()
- ep_stream_ = GetOrCreateEpDevice()->CreateStream();
这里 ->stream()会调用 ep_stream_policy_base.h 中的:
ep::Stream* stream() override { return GetOrCreateEpStream(); }
这是一个 private 办法:
private: ep::Stream* GetOrCreateEpStream() const { if (unlikely(ep_stream_ == nullptr)) {ep_stream_ = GetOrCreateEpDevice()->CreateStream(); CHECK(ep_stream_ != nullptr); } return ep_stream_; }
能够看到,如果成员变量
ep_stream_
非空,则间接返回;否则,通过 ep_stream_ = GetOrCreateEpDevice()->CreateStream(); 来创立创立ep::Stream
。2.4.2 获取 / 创立 ep::Device
而这里的
GetOrCreateEpDevice
办法如下:ep::Device* GetOrCreateEpDevice() const { if (unlikely(ep_device_ == nullptr)) {ep_device_ = Singleton<ep::DeviceManagerRegistry>::Get()->GetDevice(device_->enum_type(), device_->device_id()); CHECK(ep_device_); } return ep_device_.get();}
依据
oneflow::Device
中拿到的 device id 和 device type,去全局单例的ep::DeviceManagerRegistry
中取出对应的oneflow::ep::Device
oneflow::vm::StreamPolicy 和 oneflow::vm::EpStreamPolicy 推导
-
-
stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));
- std::shared_ptr<vm::StreamPolicy>(new vm::EpStreamPolicy(device));
3
Eager Global 模式下的 Device 和 Stream 推导
eager global 模式下,device 信息暗藏在 placement 中,placement 不仅包含了 device type 信息还包含其 tensor 具体散布在哪些 ranks 上的信息,
placement
在 C++ 中的对应类型是 ParallelDesc。所以 device 以及 stream 的局部推导过程和 eager local 模式下有所区别,但 OpCall 指令执行;device、vm::Stream 和 ep::Stream 的推导过程都和 eager local 模式下是相似的。
3.1 推导 Device
3.1.1 placement 的 parallel_id
oneflow 中的 placement 示意 tensor 寄存的设施集群(device group),如:
p = flow.placement(type="cuda", ranks=[0, 1, 2, 3])
示意 tensor 散布于 1 台机器上,cuda device 0、1、2、3 四个设施上;
p = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])
则示意 tensor 散布于 2 台机器上,host1 的 device0、1 以及 host2 的 device2、3。
在 oneflow 的分布式环境下,各个 host 上须要有雷同数量的 device,每个过程应用一个 device。这样依据环境变量
RANK
能够得出 machine_id,LOCAL_RANK
就是过程在 制订 host 上的 rank 序号。如果 input tensor 的 placement 与以后过程无关,能够省掉很多不必要的计算。通过 placement 的 parallel_id 能够判断计算工作是否与以后过程相干。
placement
在 C++ 中的对应类型是 ParallelDesc,其中并没有 parallel_id 字段,这个信息隐含在其它字段中。ParallelDesc 在结构时会调用 ClearUp 函数,从中能够看到
- ParallelDesc::parallel_id2machine_id_ 是 placement 散布的 machine。
- ParallelDesc::parallel_id2device_id_ 是 placement 散布的 device_id。
- parallel_id 是上述 2 个数组的索引,一个 parallel_id 对应一个 machine_id:device_id 组合。这样,依据 parallel_id 能够查到对应的 machine_id 和 device_id。
-
反过来,依据 machine_id:device_id 也能够从 machine_id2device_id2parallel_id_ 查到 parallel_id。
3.1.2 eager 模式下依据 parallel_id 疏忽无关计算工作
在 eager 散布时场景解决计算工作时,会调用
GetTensorDevice4CurrentProcessCtx
, 推导失去输入 tensor 的 device,以及获取以后过程的 machine_id、device_id 在 placement 中的 parallel_id 值。如果以后过程与该 placement 无关,parallel_id 就是空,后续解决时就能够疏忽一些计算:
- EagerGlobalTensorImpl::New 中只须要用 functional::Empty 结构一个 shape 为 0 的空的 tensor。
- GetBoxingOutput 计算时,如果 parallel_id 为空则示意以后 rank 过程有效,无需计算间接返回。
-
Interpret 能够不给 vm 提交指令、提前返回。
3.2 推导 Stream
在
ConsistentTensorInferCache
中推导 SBP Signature 时,也会同时推导出以后的 tensor 计算工作、在以后过程所用的 device。推导时,会先确认所有 inputs 的 placement 是统一的,都散布在雷同的 device 上。如前所述,如果计算工作与以后过程无关,会提前返回;而一个过程只应用一个 device。这里和 eager local 模式下 stream 的推导相似,通过
JUST(InferDeviceAndStream(user_op_expr, infer_args))
推导出
oneflow::Stream
对象,StreamRole 是 kCompute。区别在于 eager global 模式下3.2.1 unique_stream_id
unique_stream_id
示意oneflow::Stream
对象的创立秩序。所有的oneflow::Stream
对象都保留在全局的 StreamMgr::stream2unique_stream_id_ 中。unique_stream_id2stream_symbol_
可看作是援用类型的正本,unique_stream_id
就是 Stream 对象在这个数组中的索引。与 parallel_id 不同,unique_stream_id 是 Stream 对象在过程内的惟一标识。并不是每次都须要加锁拜访
StreamMgr
。oneflow::Stream
蕴含的都是描述性信息,其援用是以 ThreadLocal 的形式存储的,能够晋升后续读取的效率。虚拟机在执行指令时,也会用 unique_stream_id 进行逻辑判断。4
Eager 模式下的 Stream 同步——SoftSyncStream
构想以下场景:将 CPU 下的 tensor 拷贝到 CUDA 设施,而后在 CUDA 上再进行 tensor add 的计算。这波及到两个流,一个是 Host2Device,一个是 CUDA Compute。这两个流的计算工作是并发执行的。须要有同步措施,能力保障拷贝完再执行 add 计算。
Eager 模式下,在
InstructionsBuilder::Call
中结构指令时,对 SoftSyncStream 的调用会在必要时向指令列表插入同步指令。SoftSyncStream
中,几个重要概念: - tensor 在 oneflow 内存中的理论承载者是 eager_blob_object
- last_used_stream 示意一个 tensor(blob)上一次应用到的 stream,可能是 compute stream、h2d stream、d2h stream、汇合通信 ccl stream 等
-
如果 last_used_stream 与以后计算执行的流 stream 雷同,则能够疏忽,因为雷同 stream 间人造程序执行所以无需同步,否则就须要进行后续的同步解决
SoftSyncStream 代码如下:
Maybe<void> InstructionsBuilder::SoftSyncStream(const vm::EagerBlobObjectList& eager_blob_objects, Symbol<Stream> stream) {JUST(ForEachEagerBlobObjectsNeedingSoftSync( eager_blob_objects, stream, [&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream); })); for (const auto& eager_blob_object : eager_blob_objects) {eager_blob_object->set_last_used_stream(stream); } return Maybe<void>::Ok();}
主体逻辑是,会在
ForEachEagerBlobObjectsNeedingSoftSync
办法中遍历每一个 tensor 对象(eager blob object),对于每一个须要同步的 blob 使用 lambda 办法并最终调用SoftSyncStreamBetween
实现 stream 间的同步。这里,咱们看一下 ForEachEagerBlobObjectsNeedingSoftSync 的逻辑:
首先 if/else 的主体业务逻辑是相似的,次要区别在于,当 blob 的 size <= kOpArgsReservedSize 时(默认为 4)会应用 small vector 来寄存 LocalDepObject 变量,效率会更快一些(否则会走到 else 分支,主体逻辑相似,这里就不看了)。
-
const auto& opt_last_used_stream = eager_blob_object->last_used_stream()
;
-
if (unlikely(!opt_last_used_stream.has_value())) {continue;}
这两句是查问该 tensor(blob)上一次被应用时用到的 stream——last_used_stream,如果为空,则间接 continue 跳过,因为如果此 tensor 之前并未被任何 stream 应用,则无需进行 stream 间的同步操作,因为在以后 stream 上不会有对于该 tensor 的其余依赖关系;
if (last_used_stream != stream) {small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize> dep_objects{ intrusive::shared_ptr<LocalDepObject>( JUST(eager_blob_object->compute_local_dep_object()))}; JUST(DoEach(last_used_stream, std::move(dep_objects))); }
如果
last_used_stream!=stream
则示意须要在两个 stream 间进行同步,则会利用传入的 lambda 函数 DoEach 进行解决,在这里 lambda 函数即:[&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream); }));
既理论调用的是 SoftSyncStreamBetween 来实现理论的 stream 间同步,这里次要有 3 个变量:
dep_objects
存储了 tensor 间的依赖关系last_used_stream
则是该 tensor 上一次应用的 stream-
stream
该 tensor 以后应用的 streamSoftSyncStreamBetween 的代码如下:
Maybe<void> InstructionsBuilder::SoftSyncStreamBetween(small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize>&& dependences, Symbol<Stream> from_stream, Symbol<Stream> to_stream) {CHECK(from_stream != to_stream) << "synchronization is unnecessary"; if (SupportingStreamWait(from_stream, to_stream)) {JUST(StreamWait(std::move(dependences), from_stream, to_stream)); } else {JUST(RecordEvent(std::move(dependences), from_stream)); } return Maybe<void>::Ok();}
SoftSyncStreamBetween
的次要逻辑如下:
- 先额定做了一次 check,检测如果待同步的两个 stream 雷同,则 check 会报错并提醒 ”synchronization is unnecessary”
- 通过
SupportingStreamWait
判断 from 和 to stream 间是否反对 stream wait,是则调用 StreamWait 办法;否则,间接调用RecordEvent
办法 SupportingStreamWait
的次要逻辑是,通过 stream 的 device、以及StreamType
的 Visit 办法来判断。简略来说,如果 from 和 to stream 之间是不同的 device(譬如 cpu stream <-> cuda stream 之间的同步),或者 from stream 的 device 为 cpu,则 SupportingStreamWait 肯定是 false;如果是雷同的,则持续通过其余判断条件进行判断。
SupportingStreamWait 为 True
SupportingStreamWait
为 True 时,即 from to stream 同为 Cuda Stream 间的同步状况,在这种状况下会走到 StreamWait 的函数,该函数最终会派发一个 StreamWaitEventInstructionPolicy
的指令给 vm 执行,StreamWaitEventInstructionPolicy 的执行逻辑次要是两个 cuda event:
- cudaEventRecord
- cudaStreamWaitEvent
- 对于 from_stream 来说,插入一个
cudaEventRecord
,用于标记 from stream 是否实现该 stream 上的 event 事件; - 对于 to_stream 来说,插入一个
cudaStreamWaitEvent
期待 from stream 上的事件实现后,再继续执行 to_stream。
SupportingStreamWait 为 False
SupportingStreamWait
为 False 时,会间接调用JUST(RecordEvent(std::move(dependences)
, from_stream)); 其外部实现会从对象池中获取可复用的 cuda event 对象并执行 event。
这里有个细节,因为 cuda event 的创立和销毁都会引发 cuda kernel 的 launch 由异步转同步,所以基于对象池的 cuda event 能够防止这个开销。
实际上最终调用的还是 cudaEventRecord
, 而cudaEventRecord
自身只是起到一个“占位符”的作用,并不能起到(保障该 stream 上其余 kernel 全副执行完)的作用,真正能保障 stream 同步作用的是 oneflow vm(vitual machine)管制下的指令间依赖关系 / 执行程序。
5
CPU 下的并行计算
CpuStream 只有一个线程。CPU kernel 应该是通过 OpenMP 或者 Intel OneApi 等实现并行计算减速。
参考资料
1.https://github.com/Oneflow-Inc/oneflow/tree/845595e2c0abc3d384ff047e188295afdc41faaa
欢送 Star、试用 OneFlow 最新版本:https://github.com/Oneflow-Inc/oneflow/