概述

本教程假设你曾经对于 PyToch 训练一个简略模型有肯定的根底了解。本教程将展现应用 3 种封装层级不同的办法调用 DDP (DistributedDataParallel) 过程,在多个 GPU 上训练同一个模型:

  • 应用 pytorch.distributed 模块的原生 PyTorch DDP 模块
  • 应用 Accelerate 对 pytorch.distributed 的轻量封装,确保程序能够在不批改代码或者大量批改代码的状况下在单个 GPU 或 TPU 下失常运行
  • 应用 Transformer 的高级 Trainer API ,该 API 形象封装了所有代码模板并且反对不同设施和分布式场景。

什么是分布式训练,为什么它很重要?

上面是一些十分根底的 PyTorch 训练代码,它基于 Pytorch 官网在 MNIST 上创立和训练模型的示例。

import torchimport torch.nn as nnimport torch.nn.functional as Fimport torch.optim as optimfrom torchvision import datasets, transformsclass BasicNet(nn.Module):    def __init__(self):        super().__init__()        self.conv1 = nn.Conv2d(1, 32, 3, 1)        self.conv2 = nn.Conv2d(32, 64, 3, 1)        self.dropout1 = nn.Dropout(0.25)        self.dropout2 = nn.Dropout(0.5)        self.fc1 = nn.Linear(9216, 128)        self.fc2 = nn.Linear(128, 10)        self.act = F.relu    def forward(self, x):        x = self.act(self.conv1(x))        x = self.act(self.conv2(x))        x = F.max_pool2d(x, 2)        x = self.dropout1(x)        x = torch.flatten(x, 1)        x = self.act(self.fc1(x))        x = self.dropout2(x)        x = self.fc2(x)        output = F.log_softmax(x, dim=1)        return output

咱们定义训练设施 (cuda):

device = "cuda"

构建一些根本的 PyTorch DataLoaders:

transform = transforms.Compose([    transforms.ToTensor(),    transforms.Normalize((0.1307), (0.3081))])train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)test_dset = datasets.MNIST('data', train=False, transform=transform)train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

把模型放入 CUDA 设施:

model = BasicNet().to(device)

构建 PyTorch optimizer (优化器)

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最终创立一个简略的训练和评估循环,训练循环会应用全副训练数据集进行训练,评估循环会计算训练后模型在测试数据集上的准确度:

model.train()for batch_idx, (data, target) in enumerate(train_loader):    data, target = data.to(device), target.to(device)    output = model(data)    loss = F.nll_loss(output, target)    loss.backward()    optimizer.step()    optimizer.zero_grad()model.eval()correct = 0with torch.no_grad():    for data, target in test_loader:        output = model(data)        pred = output.argmax(dim=1, keepdim=True)        correct += pred.eq(target.view_as(pred)).sum().item()print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

通常从这里开始,就能够将所有的代码放入 Python 脚本或在 Jupyter Notebook 上运行它。

然而,只执行 python myscript.py 只会应用单个 GPU 运行脚本。如果有多个 GPU 资源可用,您将如何让这个脚本在两个 GPU 或多台机器上运行,通过分布式训练进步训练速度?这是 torch.distributed 发挥作用的中央。

PyTorch 分布式数据并行

顾名思义,torch.distributed 旨在配置分布式训练。你能够应用它配置多个节点进行训练,例如:多机器下的单个 GPU,或者单台机器下的多个 GPU,或者两者的任意组合。

为了将上述代码转换为分布式训练,必须首先定义一些设置配置,具体细节请参阅 DDP 应用教程

首先必须申明 setupcleanup 函数。这将创立一个过程组,并且所有计算过程都能够通过这个过程组通信。

留神:在本教程的这一部分中,假设这些代码是在 Python 脚本文件中启动。稍后将探讨应用 Accelerate 的启动器,就不用申明 setupcleanup 函数了
import osimport torch.distributed as distdef setup(rank, world_size):    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"    os.environ["MASTER_ADDR"] = 'localhost'    os.environ["MASTER_PORT"] = "12355"    # Initialize the process group    dist.init_process_group("gloo", rank=rank, world_size=world_size)def cleanup():    "Cleans up the distributed environment"    dist.destroy_process_group()

最初一个疑难是,我怎么把我的数据和模型发送到另一个 GPU 上?

这正是 DistributedDataParallel 模块发挥作用的中央, 它将您的模型复制到每个 GPU 上 ,并且当 loss.backward() 被调用进行反向流传的时候,所有这些模型正本的梯度将被同步地均匀/降落 (reduce)。这确保每个设施在执行优化器步骤后具备雷同的权重。

上面是咱们的训练设置示例,咱们应用了 DistributedDataParallel 重构了训练函数:

留神:此处的 rank 是以后 GPU 与所有其余可用 GPU 相比的总体 rank,这意味着它们的 rank 为 0 -> n-1
from torch.nn.parallel import DistributedDataParallel as DDPdef train(model, rank, world_size):    setup(rank, world_size)    model = model.to(rank)    ddp_model = DDP(model, device_ids=[rank])    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)    # Train for one epoch    model.train()    for batch_idx, (data, target) in enumerate(train_loader):        data, target = data.to(device), target.to(device)        output = model(data)        loss = F.nll_loss(output, target)        loss.backward()        optimizer.step()        optimizer.zero_grad()    cleanup()

在上述的代码中须要为每个正本设施上的模型 (因而在这里是ddp_model的参数而不是 model 的参数) 申明优化器,以便正确计算每个正本设施上的梯度。

最初,要运行脚本,PyTorch 有一个不便的 torchrun 命令行模块能够提供帮忙。只需传入它应该应用的节点数以及要运行的脚本即可:

torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py

下面的代码能够在在一台机器上的两个 GPU 上运行训练脚本,这是应用 PyTorch 只进行分布式训练的状况 (不能够在单机单卡上运行)。

当初让咱们谈谈 Accelerate,一个旨在使并行化更加无缝并有助于一些最佳实际的库。

Accelerate

Accelerate 是一个库,旨在无需大幅批改代码的状况下实现并行化。除此之外, Accelerate 附带的数据 pipeline 还能够进步代码的性能。

首先,让咱们将刚刚执行的所有上述代码封装到一个函数中,以帮忙咱们直观地看到差别:

def train_ddp(rank, world_size):    setup(rank, world_size)    # Build DataLoaders    transform = transforms.Compose([        transforms.ToTensor(),        transforms.Normalize((0.1307), (0.3081))    ])    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)    test_dset = datasets.MNIST('data', train=False, transform=transform)    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)    # Build model    model = model.to(rank)    ddp_model = DDP(model, device_ids=[rank])    # Build optimizer    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)    # Train for a single epoch    model.train()    for batch_idx, (data, target) in enumerate(train_loader):        data, target = data.to(device), target.to(device)        output = model(data)        loss = F.nll_loss(output, target)        loss.backward()        optimizer.step()        optimizer.zero_grad()        # Evaluate    model.eval()    correct = 0    with torch.no_grad():        for data, target in test_loader:            data, target = data.to(device), target.to(device)            output = model(data)            pred = output.argmax(dim=1, keepdim=True)            correct += pred.eq(target.view_as(pred)).sum().item()    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

接下来让咱们谈谈 Accelerate 如何便当地实现并行化的。下面的代码有几个问题:

  1. 该代码有点低效,因为每个设施都会创立一个 dataloader
  2. 这些代码只能运行在多 GPU 下,当想让这个代码运行在单个 GPU 或 TPU 时,还须要额定进行一些批改。

Accelerate 通过 Accelerator 类解决上述问题。通过它,不论是单节点还是多节点,除了三行代码外,其余代码简直放弃不变,如下所示:

def train_ddp_accelerate():    accelerator = Accelerator()    # Build DataLoaders    transform = transforms.Compose([        transforms.ToTensor(),        transforms.Normalize((0.1307), (0.3081))    ])    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)    test_dset = datasets.MNIST('data', train=False, transform=transform)    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)    # Build model    model = BasicModel()    # Build optimizer    optimizer = optim.AdamW(model.parameters(), lr=1e-3)    # Send everything through `accelerator.prepare`    train_loader, test_loader, model, optimizer = accelerator.prepare(        train_loader, test_loader, model, optimizer    )    # Train for a single epoch    model.train()    for batch_idx, (data, target) in enumerate(train_loader):        output = model(data)        loss = F.nll_loss(output, target)        accelerator.backward(loss)        optimizer.step()        optimizer.zero_grad()        # Evaluate    model.eval()    correct = 0    with torch.no_grad():        for data, target in test_loader:            data, target = data.to(device), target.to(device)            output = model(data)            pred = output.argmax(dim=1, keepdim=True)            correct += pred.eq(target.view_as(pred)).sum().item()    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

借助 Accelerator 对象,您的 PyTorch 训练循环当初已配置为能够在任何分布式状况运行。应用 Accelerator 革新后的代码依然能够通过 torchrun CLI 或通过 Accelerate 本人的 CLI 界面启动(启动你的 Accelerate 脚本)。

因而,当初能够尽可能放弃 PyTorch 原生代码不变的前提下,应用 Accelerate 执行分布式训练。

早些时候有人提到 Accelerate 还能够使 DataLoaders 更高效。这是通过自定义采样器实现的,它能够在训练期间主动将局部批次发送到不同的设施,从而容许每个设施只须要贮存数据的一部分,而不是一次将数据复制四份存入内存,具体取决于配置。因而,内存总量中只有原始数据集的一个残缺正本。该数据集会拆分后调配到各个训练节点上,从而容许在单个实例上训练更大的数据集,而不会使内存爆炸

应用 notebook_launcher

之前提到您能够间接从 Jupyter Notebook 运行分布式代码。这来自 Accelerate 的 notebook_launcher 模块,它能够在 Jupyter Notebook 外部的代码启动多 GPU 训练。

应用它就像导入 launcher 一样简略:

from accelerate import notebook_launcher

接着传递咱们之前申明的训练函数、要传递的任何参数以及要应用的过程数(例如 TPU 上的 8 个,或两个 GPU 上的 2 个)。上面两个训练函数都能够运行,但请留神,启动单次启动后,实例须要重新启动能力产生另一个:

notebook_launcher(train_ddp, args=(), num_processes=2)

或者:

notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)

应用 Trainer

终于咱们来到了最高级的 API——Hugging Face Trainer.

它涵盖了尽可能多的训练类型,同时依然可能在分布式系统上进行训练,用户基本不须要做任何事件。

首先咱们须要导入 Trainer:

from transformers import Trainer

而后咱们定义一些 TrainingArguments 来管制所有罕用的超参数。 Trainer 须要的训练数据是字典类型的,因而须要制作自定义整顿性能。

最初,咱们将训练器子类化并编写咱们本人的 compute_loss.

之后,这段代码也能够分布式运行,而无需批改任何训练代码!

from transformers import Trainer, TrainingArgumentsmodel = BasicNet()training_args = TrainingArguments(    "basic-trainer",    per_device_train_batch_size=64,    per_device_eval_batch_size=64,    num_train_epochs=1,    evaluation_strategy="epoch",    remove_unused_columns=False)def collate_fn(examples):    pixel_values = torch.stack([example[0] for example in examples])    labels = torch.tensor([example[1] for example in examples])    return {"x":pixel_values, "labels":labels}class MyTrainer(Trainer):    def compute_loss(self, model, inputs, return_outputs=False):        outputs = model(inputs["x"])        target = inputs["labels"]        loss = F.nll_loss(outputs, target)        return (loss, outputs) if return_outputs else losstrainer = MyTrainer(    model,    training_args,    train_dataset=train_dset,    eval_dataset=test_dset,    data_collator=collate_fn,)
trainer.train()
***** Running training *****  Num examples = 60000  Num Epochs = 1  Instantaneous batch size per device = 64  Total train batch size (w. parallel, distributed & accumulation) = 64  Gradient Accumulation steps = 1  Total optimization steps = 938
Epoch训练损失验证损失
10.8757000.282633

与下面的 notebook_launcher 示例相似,也能够将这个过程封装成一个训练函数:

def train_trainer_ddp():    model = BasicNet()    training_args = TrainingArguments(        "basic-trainer",        per_device_train_batch_size=64,        per_device_eval_batch_size=64,        num_train_epochs=1,        evaluation_strategy="epoch",        remove_unused_columns=False    )    def collate_fn(examples):        pixel_values = torch.stack([example[0] for example in examples])        labels = torch.tensor([example[1] for example in examples])        return {"x":pixel_values, "labels":labels}    class MyTrainer(Trainer):        def compute_loss(self, model, inputs, return_outputs=False):            outputs = model(inputs["x"])            target = inputs["labels"]            loss = F.nll_loss(outputs, target)            return (loss, outputs) if return_outputs else loss    trainer = MyTrainer(        model,        training_args,        train_dataset=train_dset,        eval_dataset=test_dset,        data_collator=collate_fn,    )    trainer.train()notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

相干资源

  • 要理解无关 PyTorch 分布式数据并行性的更多信息,请查看:
    <url>https://pytorch.org/docs/stab...</url>
  • 要理解无关 Accelerate 的更多信息,请查看:
    <url>https://hf.co/docs/accelerate</url>
  • 要理解无关 Transformer 的更多信息,请查看:
    <url>https://hf.co/docs/transformers</url>


原文作者:Zachary Mueller

译者:innovation64 (李洋)

审校:yaoqi (胡耀淇)

排版:zhongdongy (阿东)