撰文 | 李响

1

大规模模型分片存储简介

在模型比拟小时(如 100G 以下),还有可能采纳单机存储。当模型参数量比拟大时,要求的样本数也更大,训练后做 dump 进去的模型也会很大,单机必定放不下。

比方,由 DeepSpeed 和 Megatron 驱动的 Megatron 图灵自然语言生成模型(MT-NLG)具备 5300 亿个参数,是迄今为止训练过的最大和最弱小的单片 Transformer 语言模型,反对这样的大规模语言模型须要分片保留和加载,不会应用单机内存。此外,在其余 CV、搜寻、举荐和广告类等场景下,读取样本量增多和模型复杂度减少都会带来模型存储上的难题。

本文将介绍 OneFlow 的大模型分片保留、加载策略以及应用办法。

2

OneFlow 模型分片保留和加载

OneFlow 的大模型分片保留和加载的实现基于全局视角(Global View,https://docs.oneflow.org/mast...)的概念,既利用 Placement 与 SBP 实现模型文件(下文都用 state dict 示意)在各个物理设施上的切分,实用于当模型大到无奈在单个设施的内存或显存上包容下的场景。

flow.utils.global_view.to_global() 接口介绍

为了更好了解下文保留模型和加载模型两个局部的内容,首先对 flow.utils.global_view.to_global() 接口和其实现思路进行剖析。

区别于现有的 Tensor.to_global() 模式(能够解决一般的 Tensor,https://oneflow.readthedocs.i...),提供了多种类型的输出反对,包含 None、Tensor、List、Tuple、nn.Module 的 state dict 、nn.Graph 的 state dict 和几种类型的任意组合,既将 List/Tuple/Dict 中的输出 Tensor 转换为 Global Tensor。值得注意的是,其传入参数中的 SBP 反对用户自定义一个 (x, tensor) -> sbp 的函数来解决不同 Tensor 对应不同 SBP 的需要。

并且,与 to_global() 对应的还有 flow.utils.global_view.to_local() 接口。能够参考 API 文档中对于 to_global() 和 to_local() 更具体的介绍(https://oneflow.readthedocs.i...)。在 flow.utils.global_view.to_global() 的实现(https://github.com/Oneflow-In...)中,反对了多种输出类型实用于现有的 Tensor.to_global() 接口。实现的整体思路大抵为查看输出、播送(空)构造,遍历节点、调用回调函数和返回 to_global() 后的后果。

再回到咱们关注的中央,这个接口如何做到模型分片保留和加载?

比方对于模型并行/流水并行,模型的参数扩散在多个 Rank 上,在保留模型前通过 flow.utils.global_view.to_global() 将 state dict 里的每个 Tensor 在指定 Placement 上转为 Global Tensor,SBP 的类型为 flow.sbp.split,能够设置在特定维度上的切分。同样的,模型也能够按 Split 被加载。当然,SBP 也能够为 Broadcast,反对不同的 SBP 和 Placement 组合。这样,超大规模模型分片存储的问题就被十分好地解决了。

保留模型

大抵理解 flow.utils.global_view.to_global() 接口后,在这一部分演示了如何分片保留模型,代码如下:

# 自定义 get_sbp 函数。def get_sbp(state_dict, tensor):    if tensor is state_dict["System-Train-TrainStep"]:        return flow.sbp.broadcast    if tensor is state_dict["module_pipeline"]["m_stage3.linear.weight"]:        return flow.sbp.split(1)    if tensor is state_dict["module_pipeline"]["m_stage3.linear.bias"]:        return flow.sbp.broadcast    return flow.sbp.split(0)model_file_state_dict = flow.utils.global_view.to_global(    state_dict, placement=model_file_placement, sbp=get_sbp,     ) # 应用 sbp=get_sbp 解决非凡的键,也反对指定一般的 SBP。rank_id = flow.env.get_rank()# 保留模型分片的门路,一个 rank 对应一个门路。state_dict_dir = "./graph_save_load_global_" + str(rank_id)if flow.env.get_rank() in model_file_placement.ranks:    flow.save(        flow.utils.global_view.to_local(model_file_state_dict),        state_dict_dir,    )

首先,将原模型(state_dict)转化到模型文件的 Placement 和 SBP 上,model_file_placement 为要分片保留模型的设施阵列,也就是将 state dict 按 split(0) 分片到 model_file_placement 上。

这里之所以自定义 get_sbp 函数,是因为用户能够传进来一个 (x, tensor) -> sbp 的函数来解决非凡 Tensor 对应不同 SBP 的需要。

举个例子(以后例子基于 Graph 模式),对于 state_dict["System-Train-TrainStep"] 这种 shape 为 [1] 的 Tensor,咱们就不能按 split(0) 分片了,SBP 能够选用 broadcast。而 `
state_dict"module_pipeline"
只能在第 1 维度切分,对于 state_dict"module_pipeline"` 这种不可切分的小 Tensor(s),SBP 能够选用 broadcast。这样反对用户 DIY SBP 的解决,更加灵便。

在前面的解决中,应用 flow.utils.global_view.to_local() 接口失去 model_file_state_dict 的本地重量,并调用 save() 保留模型。其中,state_dict_dir 是带有设施 id 的目录,须要辨别不同设施,举荐一个 rank 对应一个门路,路径名用 rank id 的形式。

加载模型

在指定设施上分片保留模型后,加载模型的代码如下:

if cur_rank in model_file_placement.ranks:    local_state_dict = flow.load(state_dict_dir)else:    local_state_dict = Noneglobal_state_dict = flow.utils.global_view.to_global(     local_state_dict, placement=model_file_placement, sbp=get_sbp,)graph_model.load_state_dict(global_state_dict)

首先,用 load() 办法在每个保留切片的设施上加载 state dict。对应的,须要把 local rank 上的 state dict 转换到模型文件的 placement 和 sbp 上,失去了 global_state_dict。这一步和保留模型应该是对应的,SBP 和 Placement 也是统一的。

最初,global_state_dict 能够胜利加载到 graph_model(nn.Graph) 中。当然,nn.Module 和 nn.Graph 解决办法是统一的。

将 state dict 加载到 nn.Module 中

除了以上两个特色外,在将 state dict 加载到 nn.Module 时,OneFlow 提供了 SBP 和 Placement 的主动转换。

在上面的例子中,首先结构一个 m(nn.Module)对象,再将 global_state_dict 的 SBP 设置为 split(0),而 m 的 SBP 为 broadcast。同时 placement 也放生了变动,从 placement("cpu", ranks=[0, 1])flow.placement("cpu", ranks=[0])。这时用户不须要其余操作,OneFlow 会主动做 SBP 和 placement 的转换过程。

import oneflow as flowm = flow.nn.Linear(2,6)model_file_placement = flow.placement("cpu", ranks=[0, 1])state_dict = {"weight":flow.ones(3,2), "bias":flow.zeros(3)}global_state_dict = flow.utils.global_view.to_global(    state_dict, placement=model_file_placement, sbp=flow.sbp.split(0),)m.to_global(placement=flow.placement("cpu", ranks=[0]), sbp=flow.sbp.broadcast)m.load_state_dict(global_state_dict)print(m.state_dict())

应用 2 卡运行下面的代码,能够看到,咱们本人结构的字典中的全局张量,曾经被加载到 m Module 中。此外,输入 OrderedDict 的 tensor 的 SBP 曾经从 split(0) 主动转换为 broadcast,'weight' 对应 tensor 的形态也是咱们期待的 [6, 2],'bias' 形态为 [6]。

OrderedDict([('weight', tensor([[1., 1.],        [1., 1.],        [1., 1.],        [1., 1.],        [1., 1.],        [1., 1.]], placement=oneflow.placement(type="cpu", ranks=[0]), sbp=(oneflow.sbp.broadcast,), dtype=oneflow.float32,       requires_grad=True)), ('bias', tensor([0., 0., 0., 0., 0., 0.], placement=oneflow.placement(type="cpu", ranks=[0]), sbp=(oneflow.sbp.broadcast,),       dtype=oneflow.float32, requires_grad=True))])

3

一个残缺示例

下面演示了如何分片保留和加载模型。在这一部分,提供一份残缺的代码参考,上面的例子为 4 个 ranks 上的流水并行,模仿了模型分片保留和加载的过程。

import osimport numpy as npimport oneflow as flowmodel_tensor_placement = flow.placement("cuda", ranks=[0, 1, 2, 3])# model_file_placement 为存储模型分片的设施的 placement,示意在 Rank 2 和 Rank 3 上可为 None。model_file_placement = flow.placement("cpu", ranks=[0, 1])P0 = flow.placement(model_tensor_placement.type, ranks=[0])P1 = flow.placement(model_tensor_placement.type, ranks=[1])P2 = flow.placement(model_tensor_placement.type, ranks=[2])P3 = flow.placement(model_tensor_placement.type, ranks=[3])def get_sbp(state_dict, tensor):    if tensor is state_dict["System-Train-TrainStep"]:        return flow.sbp.broadcast    if tensor is state_dict["module_pipeline"]["m_stage3.linear.weight"]:        return flow.sbp.split(1)    if tensor is state_dict["module_pipeline"]["m_stage3.linear.bias"]:        return flow.sbp.broadcast    return flow.sbp.split(0)class Stage0Module(flow.nn.Module):    def __init__(self):        super().__init__()        self.linear = flow.nn.Linear(16, 8)        self.relu = flow.nn.ReLU()    def forward(self, x):        return self.relu(self.linear(x))class Stage1Module(flow.nn.Module):    def __init__(self):        super().__init__()        self.linear = flow.nn.Linear(8, 4)        self.relu = flow.nn.ReLU()    def forward(self, x):        return self.relu(self.linear(x))class Stage2Module(flow.nn.Module):    def __init__(self):        super().__init__()        self.linear = flow.nn.Linear(4, 2)        self.relu = flow.nn.ReLU()    def forward(self, x):        return self.relu(self.linear(x))class Stage3Module(flow.nn.Module):    def __init__(self):        super().__init__()        self.linear = flow.nn.Linear(2, 1)    def forward(self, x):        return self.linear(x)# 模仿 4 个 ranks 上的流水并行class PipelineModule(flow.nn.Module):    def __init__(self):        super().__init__()        self.m_stage0 = Stage0Module()        self.m_stage1 = Stage1Module()        self.m_stage2 = Stage2Module()        self.m_stage3 = Stage3Module()        self.m_stage0.to_global(placement=P0, sbp=flow.sbp.broadcast)        self.m_stage1.to_global(placement=P1, sbp=flow.sbp.broadcast)        self.m_stage2.to_global(placement=P2, sbp=flow.sbp.broadcast)        self.m_stage3.to_global(placement=P3, sbp=flow.sbp.broadcast)    def forward(self, x):        out_stage0 = self.m_stage0(x)        in_stage1 = out_stage0.to_global(placement=P1, sbp=flow.sbp.broadcast)        out_stage1 = self.m_stage1(in_stage1)        in_stage2 = out_stage1.to_global(placement=P2, sbp=flow.sbp.broadcast)        out_stage2 = self.m_stage2(in_stage2)        in_stage3 = out_stage2.to_global(placement=P3, sbp=flow.sbp.broadcast)        out_stage3 = self.m_stage3(in_stage3)        return out_stage3class PipelineGraph(flow.nn.Graph):    def __init__(self, module_pipeline):        super().__init__()        self.module_pipeline = module_pipeline        self.module_pipeline.m_stage0.config.set_stage(0, P0)        self.module_pipeline.m_stage1.config.set_stage(1, P1)        self.module_pipeline.m_stage2.config.set_stage(2, P2)        self.module_pipeline.m_stage3.config.set_stage(3, P3)        self.config.set_gradient_accumulation_steps(2)        self.add_optimizer(            flow.optim.SGD(self.module_pipeline.parameters(), lr=0.001)        )    def build(self, x):        out = self.module_pipeline(x)        out = out.sum()        out.backward()        return outdef train_with_graph(call_cnt=0, state_dict_dir=None, last_state_dict=None):    # 形态为 [2, 16] 的固定输出张量    x = flow.tensor(        [            [                0.4286,                0.7402,                0.4161,                0.6103,                0.7394,                1.1330,                -0.2311,                -0.1013,                0.8537,                0.9757,                -0.9842,                0.3839,                -0.5551,                -0.8832,                0.7820,                0.7421,            ],            [                -0.1581,                -1.0319,                1.8430,                0.3576,                0.7288,                -0.6912,                0.9966,                1.0840,                -1.1760,                1.5683,                -0.2098,                -1.6439,                -2.7049,                0.1949,                1.6377,                0.0745,            ],        ],        dtype=flow.float32,        placement=P0,        sbp=flow.sbp.broadcast,    )    module_pipeline = PipelineModule()    graph_model = PipelineGraph(module_pipeline)    cur_rank = flow.env.get_rank()    if call_cnt == 1:        if cur_rank in model_file_placement.ranks:            local_state_dict = flow.load(state_dict_dir)        else:            local_state_dict = None        # 应用 sbp=get_sbp 解决非凡的键        global_state_dict = flow.utils.global_view.to_global(            local_state_dict, placement=model_file_placement, sbp=get_sbp,        )        graph_model.load_state_dict(global_state_dict)    graph_model(x)    state_dict = graph_model.state_dict()    if call_cnt == 0:        model_file_state_dict = flow.utils.global_view.to_global(            state_dict, placement=model_file_placement, sbp=get_sbp,        )        if flow.env.get_rank() in model_file_placement.ranks:            flow.save(                flow.utils.global_view.to_local(model_file_state_dict),                state_dict_dir,            )if __name__=="__main__":    rank_id = flow.env.get_rank()    # 保留门路,一个 rank 对应一个门路。    state_dict_dir = "./graph_save_load_global_" + str(rank_id)    # 保留模型    train_with_graph(0, state_dict_dir)    # 加载模型    train_with_graph(1, state_dict_dir)

4

结语

本文从简略介绍大规模模型分片存储开始,最终演示了 OneFlow 的如何做模型分片保留和加载的过程,后续 OneFlow 的大模型分片存储的接口还会不断完善。

欢送下载体验 OneFlow v0.8.0 最新版本:https://github.com/Oneflow-In...