乐趣区

关于深度学习:OneFlow源码解析Eager模式下的设备管理与并发执行

作者|郑建华
更新|赵露阳

通过这篇笔记,心愿能初步理解 OneFlow 在 Eager 模式下对设施的治理形式、设施执行计算的过程以及如何充分利用设施计算能力。这里的设施次要指相似 CUDA 这样的并行计算减速设施。

1

设施、流相干类型及关系

框架通过流(Stream)向设施(Device)提交计算工作。一个 Stream 是一个命令序列,能够类比 CUDA Stream(https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.h…),或者 CPU Thread 的指令序列。同一个 Stream 中的命令按程序执行;不同 Stream 之间的命令有依赖关系时,须要同步。不同的工作,比方 kernel 计算、host2device、device2host 等都有本人独立的 Stream,能够并发执行,从而在 Eager 模式下尽可能充分利用设施的异步并发执行能力。

OneFlow 中 Device 和 Stream 相干的局部类构造如下所示:

Device 相干类型

oneflow::Deviceoneflow::Device 是用于示意设施的根底类型,例如:构建 tensor 时 flow.tensor(shape, device="cuda:1")就会在外部结构出这个根底的 Device 类型,其中设施编号为 1、设施类型为 CUDA。

oneflow/core/framework/device.h:

class Device final {public:  ... private:  Device(const std::string& type, int64_t device_id);  Maybe<void> Init();  const std::string type_;  DeviceType enum_type_;  const int64_t device_id_;  const size_t hash_value_;  std::shared_ptr<MemoryCase> mem_case_;};

oneflow::Device中最重要的两个成员变量别离是用于 示意设施类型的 DeviceType 用于示意设施编号的 device_id_

DeviceType

DeviceType是一个枚举类,不同的值代表不同的计算设施类型,其定义位于 oneflow/core/common/device_type.proto

enum DeviceType {kInvalidDevice = 0;  // 有效设施  kCPU = 1;            // cpu 设施  kCUDA = 2;           // cuda 设施  kMockDevice = 3;     // pseudo device for test.}

目前在 oneflow master 分支中,次要有 kCPU 示意 cpu 设施;kCUDA示意 nvidia cuda 设施;在其余多设施反对的分支中,这里还减少了更多的设施类型。

oneflow::ep::Device

oneflow::Device是 oneflow 中对设施的根底封装类型,而 oneflow::ep::Device 则是一个抽象类,属于 oneflow 的 ep 模块(execution provider),是对设施行为的封装,ep 模块为多硬件设施提供了更高层次的形象,不便 oneflow 反对和兼容多硬件设施提供了更高的灵活性和可拓展性

oneflow::ep::Device不仅提供了示意设施类型的 device_type() 办法、示意设施编号的 device_index() 办法,还提供了创立 / 销毁ep::Stream、创立 / 销毁Event、在设施上申请 / 开释内存的各种办法。

oneflow/core/ep/include/device.h

class Device {public:  OF_DISALLOW_COPY_AND_MOVE(Device);  Device() = default;  virtual ~Device() = default;  virtual void SetAsActiveDevice() = 0;  virtual DeviceType device_type() const = 0;  virtual size_t device_index() const = 0;  virtual DeviceManager* device_manager() const = 0;  virtual Stream* CreateStream() = 0;  virtual void DestroyStream(Stream* stream) = 0;  virtual Event* CreateEvent();  virtual void DestroyEvent(Event* event);  virtual void CreateEvents(Event** events, size_t count) = 0;  virtual void DestroyEvents(Event** events, size_t count) = 0;  virtual Maybe<void> Alloc(const AllocationOptions& options, void** ptr, size_t size) = 0;  virtual void Free(const AllocationOptions& options, void* ptr) = 0;  virtual Maybe<void> AllocPinned(const AllocationOptions& options, void** ptr, size_t size) = 0;  virtual void FreePinned(const AllocationOptions& options, void* ptr) = 0;  virtual bool IsStreamOrderedMemoryAllocationSupported() const;};

oneflow::ep::Device有如下子类实现:

Stream 相干类型

oneflow::Stream 和 cuda device 以及 stream 的关系相似,oneflow 中也存在相似的根底 Stream 类型。

oneflow/core/framework/stream.h

class Stream final {.... private:  Stream(Symbol<Device> device, StreamType stream_type, size_t thread_uid);  static Maybe<Symbol<Stream>> RawNew(Symbol<Device> device, StreamType stream_type,                                      size_t thread_uid);  Maybe<void> Init(size_t unique_stream_id);  Symbol<Device> device_;  StreamType stream_type_;  size_t thread_uid_;  size_t unique_stream_id_;};

能够看见 Stream 类中的成员变量:

  • device_  示意该 Stream 对象将在何种设施上执行
  • streamtype_  示意该 Stream 的类型,是用于计算的 compute stream 还是用于数据搬运的 host2device、device2host stream 等
  • threaduid_ 示意负责启动该 Stream 的线程 id
  • unique_streamid_ 示意这个 stream 本身的 unique id

StreamType

DeviceType 分为 kCpu 和 kCuda 相似,Stream也有各种类型之分,具体如下:

oneflow/core/common/stream_type.h

enum class StreamType {kInvalid = 0,    // 有效    kCompute,        // kernel 计算流    kHost2Device,    // 数据搬运(host -> device)流    kDevice2Host,    // 数据搬运(device -> host)流    kCcl,            // 汇合通信流    kBarrier,        // 线程屏障流    kCriticalSection,// 临界区流    kLazyJobLauncher,// job 启动流(lazy mode)kPinnedCompute   // pinned memory kernel 计算流};

oneflow::ep::Stream

oneflow 中的 ep 模块提供了一个更高层次的对 Stream 的抽象类,除了能够获取设施的 device()、获取设施类型的device_type() 办法外,还提供了一系列虚办法如:

  • 同步 Sync()
  • 执行 Event 事件 RecordEvent()

oneflow/core/ep/include/stream.h

class Stream {public:  OF_DISALLOW_COPY_AND_MOVE(Stream);  Stream() = default;  virtual ~Stream() = default;  virtual DeviceType device_type() const = 0;  virtual Device* device() const = 0;  virtual Maybe<void> Sync() = 0;  virtual void RecordEvent(Event* event) = 0;  virtual Maybe<void> GetAsyncError() { return Maybe<void>::Ok(); }  virtual Maybe<void> AllocAsync(void** ptr, size_t size) {UNIMPLEMENTED_THEN_RETURN(); }  virtual Maybe<void> FreeAsync(void* ptr) {UNIMPLEMENTED_THEN_RETURN(); }  template<typename T>  Maybe<void> AllocAsync(T** ptr, size_t size) {return AllocAsync(reinterpret_cast<void**>(ptr), size);  }  virtual Maybe<void> OnExecutionContextSetup() { return Maybe<void>::Ok(); }  virtual Maybe<void> OnExecutionContextTeardown() { return Maybe<void>::Ok(); }  template<typename T>  T* As() {    return static_cast<T*>(this);  }};

oneflow::ep::Stream有如下子类实现:

oneflow::vm::Stream

oneflow vm(virtual machine)中的 oneflow::vm::Stream 类型,用于 vm 外部保护 stream 极其依赖关系、StreamPolicy、调度线程等。

oneflow/core/vm/stream.h

class Stream final : public intrusive::Base {public:  ... private:  ...  // fields  ThreadCtx* thread_ctx_;  Symbol<Device> device_;  StreamType stream_type_;  std::shared_ptr<StreamPolicy> stream_policy_;  bool on_scheduler_thread_;  std::unique_ptr<char, std::function<void(char*)>> small_pinned_mem_ptr_;  ...};

StreamPolicy

StreamPolicy是 oneflow vm 中独有的概念,提供了一系列虚办法如:

  • stream() 获取 oneflow::ep::Stream 指针
  • mut_allocator() 获取 vm::Allocator 指针(用于 tensor 内存治理)
  • device_type() 获取 device 设施类型

除此之外,提供了一系列 vm 相干的指令状态初始化、查问、删除等办法。

oneflow/core/vm/stream_policy.h

class StreamPolicy {public:  virtual ~StreamPolicy() = default;  virtual ep::Stream* stream() = 0;  virtual vm::Allocator* mut_allocator() = 0;  virtual DeviceType device_type() const = 0;  virtual void InitInstructionStatus(const Stream& stream,                                     InstructionStatusBuffer* status_buffer) const = 0;  virtual void DeleteInstructionStatus(const Stream& stream,                                       InstructionStatusBuffer* status_buffer) const = 0;  virtual bool QueryInstructionStatusLaunched(const Stream& stream, const InstructionStatusBuffer& status_buffer) const = 0;  virtual bool QueryInstructionStatusDone(const Stream& stream,                                          const InstructionStatusBuffer& status_buffer) const = 0;  virtual bool OnSchedulerThread(StreamType stream_type) const;  virtual bool SupportingTransportInstructions() const = 0;  void RunIf(Instruction* instruction) const; protected:  StreamPolicy() = default; private:  virtual void Run(Instruction* instruction) const = 0;};

StreamPolicy 有如下子类实现:

2

Eager Local 模式下的 Device 和 Stream 推导

上面,梳理一下一般的 eager 模式(eager local mode)下,算子执行全过程中 device 和 stream 相干的推导流程。

2.1 推导 Device

首先,对于一个算子(op)来说,要为其设置一个默认的 device 用于理论计算,这一步在:

Symbol<Device> default_device = JUST(GetDefaultDevice(inputs, ctx))

这里 GetDefaultDevice 的逻辑是:

  • 1. 如果 inputs tensor 非空,则依据第一个 input tensor 的 device 来设置 default 的 device
  • 2. 如果 inputs tensor 为空,则优先从 OpExprInterpContext 中获取 device,若 OpExprInterpContext 中未设置,则会通过Device::New("cpu"); 默认给一个 cpu device

值得阐明的是,在 1. 种状况时,如果 input tensor 创立时指定了 device 为 cuda 设施,则这里推导出的 default device 同样为雷同的 cuda device;如果未显示指定,则默认还是 cpu device。

2.2 推导 Stream

oneflow::Stream 的推导次要在:

  • JUST(user_op_expr.mut_local_tensor_infer_cache()->GetOrInfer(infer_args))

    );

     Symbol<Stream> stream = JUST(InferDeviceAndStream(...));

    InferDeviceAndStream中,Stream推导的逻辑是会依据 user_op_expr 是否定义了 device_and_stream_infer_fn 而有所区别

  • (多数状况)如果该 op 定义了推导函数,则调用此推导函数来推导 Stream,例如 tensor.cuda()办法,inputs 在 CPU 上,outputs 在 CUDA,二者的设施类型不同。这时就不会默认推导而是利用 op 注册的推导函数获取 oneflow::Stream(。例如 CopyOp::InferDeviceAndStream。
  • (少数状况)否则会通过 stream = JUST(GetDefaultStreamByDevice(default_device)); 来推导。

    GetDefaultStreamByDevice的具体实现:

    Maybe<Symbol<Stream>> RawGetDefaultStreamByDevice(Symbol<Device> device) {return Stream::New(device, StreamType::kCompute);}

    能够看见,依据传入的deviceStreamType::kCompute,new 了一个oneflow::Stream

    2.3 InstructionsBuilder::Call 和 vm::Stream 推导

    在上述 device 和 stream 推导实现后,会通过 InstructionsBuilder 调用 Call 办法:

    JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {return builder->Call(kernel, std::move(input_eager_blob_objects),                         std::move(output_eager_blob_objects), ctx, result->stream());}));

    Call 办法中会通过

  • JUST(SoftSyncStream(output_eager_blob_objects, stream));
  • JUST(SoftSyncStream(input_eager_blob_objects, stream));
  • auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));

    实现 outputs inputs tensor 的流同步(SoftSyncStream)过程以及 vm::Stream 的推导,而后通过结构 OpCallInstructionPolicy 指令派发至 vm 执行。

    SoftSyncStream 的同步这里省略,具体过程见第 4 节。

    2.3.1 结构 ThreadCtx 对象,启动执行指令的线程

    ThreadCtx 对象指针保留在 VirtualMachine 的 HashMap 中。每个 DeviceType(CPU 或 CUDA)对应一个 ThreadCtx 对象;临界区和 LazyJob 有本人的 ThreadCtx 对象。

    首次拜访 HashMap 时失去的是零值(空指针),须要调用 CreateThreadCtx 创建对象。理论通过虚拟机指令创建对象,ThreadCtx 对象保留在 VirtualMachineEngine::thread_ctx_list_ 中。

    ThreadCtx 对象结构后,会创立一个 worker 线程、执行 WorkerLoop 办法,并增加到 worker_threads_。所以 worker_threads_ 是与 ThreadCtx 对象一一对应的。

    这个线程负责其所归属的指令的执行:

  • WorkerLoop 在收到告诉后,会调用 ThreadCtx::TryReceiveAndRun 解决指令。
  • 在这个函数中,将 ThreadCtx 的指令挪到长期列表、通过 StreamPolicy 执行每个指令。
  • ThreadCtx 的指令,是 VirtualMachineEngine 在 DispatchInstruction 时增加进去的。

    ThreadCtx 创立实现后,将持有 vm::Stream 对象。oneflow::vm::Streamoneflow::Stream 的数量是一一对应的,vm::Stream 依照 <DeviceType, StreamRole> 分组存储在对应的 ThreadCtx 中。

    vm::Stream的推导流程细节如下:

  • auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));

    • VirtualMachine::GetVmStream()

      • Maybe<vm::Stream*> 

        VirtualMachine::CreateStream(Symbol<Stream> stream)

    • Stream::__Init__(ThreadCtx* thread_ctx, Symbol<Device> device, StreamType stream_type…)
    • stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));

    2.4 执行 OpCall 指令和 ep::Stream 推导

    有几个场景会创立(获取)ep::Stream 对象。比方 kernel 执行时。

    OpCall 指令在结构时,指令策略类型是 OpCallInstructionPolicy。虚拟机在 DispatchInstruction 时,无论哪个分支,后续都会调用 EpStreamType::Run,最终通过

  • EpStreamPolicyBase::Run()

    • instruction->Compute()

      • OpCallInstructionPolicy::Compute()
    • OpCallInstructionUtil::Compute()
    • OpCallInstructionUtil::OpKernelCompute()

      • op_call_instruction_policy->mut_opkernel()->Compute()执行 kernel 的 Compute 办法

    例如 GpuL2NormalizeKernel::Compute,最终在其 kernel 的 Compute 办法中,会通过 ctx->stream()创立(获取)ep::Stream 对象,launch kernel 执行计算。

    2.4.1 获取 / 创立 ep::Stream

    上面,咱们重点看一下 OpCall 指令理论执行时,调用的 OpCallInstructionUtil::Compute()办法:

    static inline Maybe<void> Compute(OpCallInstructionPolicy* op_call_instruction_policy,                                    Instruction* instruction) {Allocator* allocator = instruction->mut_stream()->mut_stream_policy()->mut_allocator();    JUST(AllocateOutputBlobsMemory(op_call_instruction_policy, allocator, instruction));    if (unlikely(op_call_instruction_policy->need_temp_storage())) {JUST(TryAllocateTempStorage(op_call_instruction_policy, allocator));    }    ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();    user_op::OpKernelState* state = nullptr;    user_op::OpKernelCache* cache = nullptr;    if (op_call_instruction_policy->user_opkernel()->has_state_or_cache()) {TryInitOpKernelStateAndCache(op_call_instruction_policy, stream, &state, &cache);    }    OpKernelCompute(op_call_instruction_policy, stream, state, cache);    if (unlikely(op_call_instruction_policy->need_temp_storage())) {DeallocateTempStorage(op_call_instruction_policy, allocator);    }    return Maybe<void>::Ok();}

    其中会通过`

    ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream()

    ; 实现 ep::Stream 的推导,之后在 OpKernelCompute() 办法中理论实现 op/kernel 的执行。

    ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();

  • ep::Stream* stream() override { return GetOrCreateEpStream(); }

    • GetOrCreateEpStream()

      • ep_stream_ = GetOrCreateEpDevice()->CreateStream();

    这里 ->stream()会调用 ep_stream_policy_base.h 中的:

    ep::Stream* stream() override { return GetOrCreateEpStream(); }

    这是一个 private 办法:

    private:  ep::Stream* GetOrCreateEpStream() const {    if (unlikely(ep_stream_ == nullptr)) {ep_stream_ = GetOrCreateEpDevice()->CreateStream();      CHECK(ep_stream_ != nullptr);    }    return ep_stream_;  }

    能够看到,如果成员变量 ep_stream_ 非空,则间接返回;否则,通过 ep_stream_ = GetOrCreateEpDevice()->CreateStream(); 来创立创立ep::Stream

    2.4.2 获取 / 创立 ep::Device

    而这里的 GetOrCreateEpDevice 办法如下:

    ep::Device* GetOrCreateEpDevice() const {    if (unlikely(ep_device_ == nullptr)) {ep_device_ = Singleton<ep::DeviceManagerRegistry>::Get()->GetDevice(device_->enum_type(),                                                                          device_->device_id());      CHECK(ep_device_);    }    return ep_device_.get();}

    依据 oneflow::Device 中拿到的 device id 和 device type,去全局单例的 ep::DeviceManagerRegistry 中取出对应的oneflow::ep::Device

    oneflow::vm::StreamPolicy 和 oneflow::vm::EpStreamPolicy 推导

  • stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));

    • std::shared_ptr<vm::StreamPolicy>(new vm::EpStreamPolicy(device));

    3

    Eager Global 模式下的 Device 和 Stream 推导

    eager global 模式下,device 信息暗藏在 placement 中,placement 不仅包含了 device type 信息还包含其 tensor 具体散布在哪些 ranks 上的信息,placement 在 C++ 中的对应类型是 ParallelDesc。

    所以 device 以及 stream 的局部推导过程和 eager local 模式下有所区别,但 OpCall 指令执行;device、vm::Stream 和 ep::Stream 的推导过程都和 eager local 模式下是相似的。

    3.1 推导 Device

    3.1.1 placement 的 parallel_id

    oneflow 中的 placement 示意 tensor 寄存的设施集群(device group),如:

    p = flow.placement(type="cuda", ranks=[0, 1, 2, 3])

    示意 tensor 散布于 1 台机器上,cuda device 0、1、2、3 四个设施上;

    p = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])

    则示意 tensor 散布于 2 台机器上,host1 的 device0、1 以及 host2 的 device2、3。

    在 oneflow 的分布式环境下,各个 host 上须要有雷同数量的 device,每个过程应用一个 device。这样依据环境变量 RANK 能够得出 machine_id,LOCAL_RANK 就是过程在 制订 host 上的 rank 序号。

    如果 input tensor 的 placement 与以后过程无关,能够省掉很多不必要的计算。通过 placement 的 parallel_id 能够判断计算工作是否与以后过程相干。

    placement 在 C++ 中的对应类型是 ParallelDesc,其中并没有 parallel_id 字段,这个信息隐含在其它字段中。

    ParallelDesc 在结构时会调用 ClearUp 函数,从中能够看到

  • ParallelDesc::parallel_id2machine_id_ 是 placement 散布的 machine。
  • ParallelDesc::parallel_id2device_id_ 是 placement 散布的 device_id。
  • parallel_id 是上述 2 个数组的索引,一个 parallel_id 对应一个 machine_id:device_id 组合。这样,依据 parallel_id 能够查到对应的 machine_id 和 device_id。
  • 反过来,依据 machine_id:device_id 也能够从 machine_id2device_id2parallel_id_ 查到 parallel_id。

    3.1.2 eager 模式下依据 parallel_id 疏忽无关计算工作

    在 eager 散布时场景解决计算工作时,会调用  GetTensorDevice4CurrentProcessCtx 推导失去输入 tensor 的 device,以及获取以后过程的 machine_id、device_id 在 placement 中的 parallel_id 值。

    如果以后过程与该 placement 无关,parallel_id 就是空,后续解决时就能够疏忽一些计算:

  • EagerGlobalTensorImpl::New 中只须要用 functional::Empty 结构一个 shape 为 0 的空的 tensor。
  • GetBoxingOutput 计算时,如果 parallel_id 为空则示意以后 rank 过程有效,无需计算间接返回。
  • Interpret 能够不给 vm 提交指令、提前返回。

    3.2 推导 Stream

    在 ConsistentTensorInferCache 中推导 SBP Signature 时,也会同时推导出以后的 tensor 计算工作、在以后过程所用的 device。推导时,会先确认所有 inputs 的 placement 是统一的,都散布在雷同的 device 上。如前所述,如果计算工作与以后过程无关,会提前返回;而一个过程只应用一个 device。

    这里和 eager local 模式下 stream 的推导相似,通过

    JUST(InferDeviceAndStream(user_op_expr, infer_args))

    推导出 oneflow::Stream 对象,StreamRole 是 kCompute。区别在于 eager global 模式下

    3.2.1 unique_stream_id

    unique_stream_id 示意 oneflow::Stream 对象的创立秩序。所有的 oneflow::Stream 对象都保留在全局的 StreamMgr::stream2unique_stream_id_ 中。
    unique_stream_id2stream_symbol_ 可看作是援用类型的正本,unique_stream_id 就是 Stream 对象在这个数组中的索引。与 parallel_id 不同,unique_stream_id 是 Stream 对象在过程内的惟一标识。

    并不是每次都须要加锁拜访 StreamMgroneflow::Stream 蕴含的都是描述性信息,其援用是以 ThreadLocal 的形式存储的,能够晋升后续读取的效率。虚拟机在执行指令时,也会用 unique_stream_id 进行逻辑判断。

    4

    Eager 模式下的 Stream 同步——SoftSyncStream

    构想以下场景:将 CPU 下的 tensor 拷贝到 CUDA 设施,而后在 CUDA 上再进行 tensor add 的计算。这波及到两个流,一个是 Host2Device,一个是 CUDA Compute。这两个流的计算工作是并发执行的。须要有同步措施,能力保障拷贝完再执行 add 计算。

    Eager 模式下,在 InstructionsBuilder::Call 中结构指令时,对 SoftSyncStream 的调用会在必要时向指令列表插入同步指令。

    SoftSyncStream 中,几个重要概念:

  • tensor 在 oneflow 内存中的理论承载者是 eager_blob_object
  • last_used_stream 示意一个 tensor(blob)上一次应用到的 stream,可能是 compute stream、h2d stream、d2h stream、汇合通信 ccl stream 等
  • 如果 last_used_stream 与以后计算执行的流 stream 雷同,则能够疏忽,因为雷同 stream 间人造程序执行所以无需同步,否则就须要进行后续的同步解决

    SoftSyncStream 代码如下:

    Maybe<void> InstructionsBuilder::SoftSyncStream(const vm::EagerBlobObjectList& eager_blob_objects,                                                Symbol<Stream> stream) {JUST(ForEachEagerBlobObjectsNeedingSoftSync(      eager_blob_objects, stream,      [&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);      }));  for (const auto& eager_blob_object : eager_blob_objects) {eager_blob_object->set_last_used_stream(stream);  }  return Maybe<void>::Ok();}

    主体逻辑是,会在 ForEachEagerBlobObjectsNeedingSoftSync 办法中遍历每一个 tensor 对象(eager blob object),对于每一个须要同步的 blob 使用 lambda 办法并最终调用 SoftSyncStreamBetween 实现 stream 间的同步。

    这里,咱们看一下 ForEachEagerBlobObjectsNeedingSoftSync 的逻辑:

    首先 if/else 的主体业务逻辑是相似的,次要区别在于,当 blob 的 size <= kOpArgsReservedSize 时(默认为 4)会应用 small vector 来寄存 LocalDepObject 变量,效率会更快一些(否则会走到 else 分支,主体逻辑相似,这里就不看了)。

  • const auto& opt_last_used_stream = eager_blob_object->last_used_stream()

    ;

  • if (unlikely(!opt_last_used_stream.has_value())) {continue;}

    这两句是查问该 tensor(blob)上一次被应用时用到的 stream——last_used_stream,如果为空,则间接 continue 跳过,因为如果此 tensor 之前并未被任何 stream 应用,则无需进行 stream 间的同步操作,因为在以后 stream 上不会有对于该 tensor 的其余依赖关系;

    if (last_used_stream != stream) {small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize> dep_objects{        intrusive::shared_ptr<LocalDepObject>(            JUST(eager_blob_object->compute_local_dep_object()))};    JUST(DoEach(last_used_stream, std::move(dep_objects)));  }

    如果 last_used_stream!=stream 则示意须要在两个 stream 间进行同步,则会利用传入的 lambda 函数 DoEach 进行解决,在这里 lambda 函数即:

    [&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);  }));

    既理论调用的是 SoftSyncStreamBetween 来实现理论的 stream 间同步,这里次要有 3 个变量:

  • dep_objects存储了 tensor 间的依赖关系
  • last_used_stream则是该 tensor 上一次应用的 stream
  • stream该 tensor 以后应用的 stream

    SoftSyncStreamBetween 的代码如下:

    Maybe<void> InstructionsBuilder::SoftSyncStreamBetween(small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize>&& dependences,    Symbol<Stream> from_stream, Symbol<Stream> to_stream) {CHECK(from_stream != to_stream) << "synchronization is unnecessary";  if (SupportingStreamWait(from_stream, to_stream)) {JUST(StreamWait(std::move(dependences), from_stream, to_stream));  } else {JUST(RecordEvent(std::move(dependences), from_stream));  }  return Maybe<void>::Ok();}

SoftSyncStreamBetween的次要逻辑如下:

  • 先额定做了一次 check,检测如果待同步的两个 stream 雷同,则 check 会报错并提醒 ”synchronization is unnecessary”
  • 通过 SupportingStreamWait 判断 from 和 to stream 间是否反对 stream wait,是则调用 StreamWait 办法;否则,间接调用 RecordEvent 办法
  • SupportingStreamWait的次要逻辑是,通过 stream 的 device、以及 StreamType 的 Visit 办法来判断。简略来说,如果 from 和 to stream 之间是不同的 device(譬如 cpu stream <-> cuda stream 之间的同步),或者 from stream 的 device 为 cpu,则 SupportingStreamWait 肯定是 false;如果是雷同的,则持续通过其余判断条件进行判断。

SupportingStreamWait 为 True

SupportingStreamWait为 True 时,即 from to stream 同为 Cuda Stream 间的同步状况,在这种状况下会走到 StreamWait 的函数,该函数最终会派发一个 StreamWaitEventInstructionPolicy 的指令给 vm 执行,StreamWaitEventInstructionPolicy 的执行逻辑次要是两个 cuda event:

  • cudaEventRecord
  • cudaStreamWaitEvent
  • 对于 from_stream 来说,插入一个cudaEventRecord,用于标记 from stream 是否实现该 stream 上的 event 事件;
  • 对于 to_stream 来说,插入一个 cudaStreamWaitEvent 期待 from stream 上的事件实现后,再继续执行 to_stream。

SupportingStreamWait 为 False

SupportingStreamWait为 False 时,会间接调用JUST(RecordEvent(std::move(dependences), from_stream)); 其外部实现会从对象池中获取可复用的 cuda event 对象并执行 event。

这里有个细节,因为 cuda event 的创立和销毁都会引发 cuda kernel 的 launch 由异步转同步,所以基于对象池的 cuda event 能够防止这个开销。

实际上最终调用的还是 cudaEventRecordcudaEventRecord自身只是起到一个“占位符”的作用,并不能起到(保障该 stream 上其余 kernel 全副执行完)的作用,真正能保障 stream 同步作用的是 oneflow vm(vitual machine)管制下的指令间依赖关系 / 执行程序。

5

CPU 下的并行计算

CpuStream 只有一个线程。CPU kernel 应该是通过 OpenMP 或者 Intel OneApi 等实现并行计算减速。

参考资料
1.https://github.com/Oneflow-Inc/oneflow/tree/845595e2c0abc3d384ff047e188295afdc41faaa

欢送 Star、试用 OneFlow 最新版本:https://github.com/Oneflow-Inc/oneflow/

退出移动版