关于人工智能:从-PyTorch-DDP-到-Accelerate-到-Trainer轻松掌握分布式训练

41次阅读

共计 9588 个字符,预计需要花费 24 分钟才能阅读完成。

概述

本教程假设你曾经对于 PyToch 训练一个简略模型有肯定的根底了解。本教程将展现应用 3 种封装层级不同的办法调用 DDP (DistributedDataParallel) 过程,在多个 GPU 上训练同一个模型:

  • 应用 pytorch.distributed 模块的原生 PyTorch DDP 模块
  • 应用 🤗 Accelerate 对 pytorch.distributed 的轻量封装,确保程序能够在不批改代码或者大量批改代码的状况下在单个 GPU 或 TPU 下失常运行
  • 应用 🤗 Transformer 的高级 Trainer API,该 API 形象封装了所有代码模板并且反对不同设施和分布式场景。

什么是分布式训练,为什么它很重要?

上面是一些十分根底的 PyTorch 训练代码,它基于 Pytorch 官网在 MNIST 上创立和训练模型的示例。

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

咱们定义训练设施 (cuda):

device = "cuda"

构建一些根本的 PyTorch DataLoaders:

transform = transforms.Compose([transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

把模型放入 CUDA 设施:

model = BasicNet().to(device)

构建 PyTorch optimizer (优化器)

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最终创立一个简略的训练和评估循环,训练循环会应用全副训练数据集进行训练,评估循环会计算训练后模型在测试数据集上的准确度:

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

通常从这里开始,就能够将所有的代码放入 Python 脚本或在 Jupyter Notebook 上运行它。

然而,只执行 python myscript.py 只会应用单个 GPU 运行脚本。如果有多个 GPU 资源可用,您将如何让这个脚本在两个 GPU 或多台机器上运行,通过分布式训练进步训练速度?这是 torch.distributed 发挥作用的中央。

PyTorch 分布式数据并行

顾名思义,torch.distributed 旨在配置分布式训练。你能够应用它配置多个节点进行训练,例如:多机器下的单个 GPU,或者单台机器下的多个 GPU,或者两者的任意组合。

为了将上述代码转换为分布式训练,必须首先定义一些设置配置,具体细节请参阅 DDP 应用教程

首先必须申明 setupcleanup 函数。这将创立一个过程组,并且所有计算过程都能够通过这个过程组通信。

留神:在本教程的这一部分中,假设这些代码是在 Python 脚本文件中启动。稍后将探讨应用 🤗 Accelerate 的启动器,就不用申明 setupcleanup 函数了

import os
import torch.distributed as dist

def setup(rank, world_size):
    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    # Initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "Cleans up the distributed environment"
    dist.destroy_process_group()

最初一个疑难是,我怎么把我的数据和模型发送到另一个 GPU 上?

这正是 DistributedDataParallel 模块发挥作用的中央,它将您的模型复制到每个 GPU 上,并且当 loss.backward() 被调用进行反向流传的时候,所有这些模型正本的梯度将被同步地均匀 / 降落 (reduce)。这确保每个设施在执行优化器步骤后具备雷同的权重。

上面是咱们的训练设置示例,咱们应用了 DistributedDataParallel 重构了训练函数:

留神:此处的 rank 是以后 GPU 与所有其余可用 GPU 相比的总体 rank,这意味着它们的 rank 为 0 -> n-1

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Train for one epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

在上述的代码中须要为每个正本设施上的模型 (因而在这里是 ddp_model 的参数而不是 model 的参数) 申明优化器,以便正确计算每个正本设施上的梯度。

最初,要运行脚本,PyTorch 有一个不便的 torchrun 命令行模块能够提供帮忙。只需传入它应该应用的节点数以及要运行的脚本即可:

torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py

下面的代码能够在在一台机器上的两个 GPU 上运行训练脚本,这是应用 PyTorch 只进行分布式训练的状况 (不能够在单机单卡上运行)。

当初让咱们谈谈 🤗 Accelerate,一个旨在使并行化更加无缝并有助于一些最佳实际的库。

🤗 Accelerate

🤗 Accelerate 是一个库,旨在无需大幅批改代码的状况下实现并行化。除此之外,🤗 Accelerate 附带的数据 pipeline 还能够进步代码的性能。

首先,让咱们将刚刚执行的所有上述代码封装到一个函数中,以帮忙咱们直观地看到差别:

def train_ddp(rank, world_size):
    setup(rank, world_size)
    # Build DataLoaders
    transform = transforms.Compose([transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # Build optimizer
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    # Train for a single epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

接下来让咱们谈谈 🤗 Accelerate 如何便当地实现并行化的。下面的代码有几个问题:

  1. 该代码有点低效,因为每个设施都会创立一个 dataloader
  2. 这些代码只能运行在多 GPU 下,当想让这个代码运行在单个 GPU 或 TPU 时,还须要额定进行一些批改。

🤗 Accelerate 通过 Accelerator 类解决上述问题。通过它,不论是单节点还是多节点,除了三行代码外,其余代码简直放弃不变,如下所示:

def train_ddp_accelerate():
    accelerator = Accelerator()
    # Build DataLoaders
    transform = transforms.Compose([transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model
    model = BasicModel()

    # Build optimizer
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # Send everything through `accelerator.prepare`
    train_loader, test_loader, model, optimizer = accelerator.prepare(train_loader, test_loader, model, optimizer)

    # Train for a single epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
    
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

借助 Accelerator 对象,您的 PyTorch 训练循环当初已配置为能够在任何分布式状况运行。应用 Accelerator 革新后的代码依然能够通过 torchrun CLI 或通过 🤗 Accelerate 本人的 CLI 界面启动(启动你的🤗 Accelerate 脚本)。

因而,当初能够尽可能放弃 PyTorch 原生代码不变的前提下,应用 🤗 Accelerate 执行分布式训练。

早些时候有人提到 🤗 Accelerate 还能够使 DataLoaders 更高效。这是通过自定义采样器实现的,它能够在训练期间主动将局部批次发送到不同的设施,从而容许每个设施只须要贮存数据的一部分,而不是一次将数据复制四份存入内存,具体取决于配置。因而,内存总量中只有原始数据集的一个残缺正本。该数据集会拆分后调配到各个训练节点上,从而容许在单个实例上训练更大的数据集,而不会使内存爆炸

应用 notebook_launcher

之前提到您能够间接从 Jupyter Notebook 运行分布式代码。这来自 🤗 Accelerate 的 notebook_launcher 模块,它能够在 Jupyter Notebook 外部的代码启动多 GPU 训练。

应用它就像导入 launcher 一样简略:

from accelerate import notebook_launcher

接着传递咱们之前申明的训练函数、要传递的任何参数以及要应用的过程数(例如 TPU 上的 8 个,或两个 GPU 上的 2 个)。上面两个训练函数都能够运行,但请留神,启动单次启动后,实例须要重新启动能力产生另一个:

notebook_launcher(train_ddp, args=(), num_processes=2)

或者:

notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)

应用 🤗 Trainer

终于咱们来到了最高级的 API——Hugging Face Trainer.

它涵盖了尽可能多的训练类型,同时依然可能在分布式系统上进行训练,用户基本不须要做任何事件。

首先咱们须要导入 🤗 Trainer:

from transformers import Trainer

而后咱们定义一些 TrainingArguments 来管制所有罕用的超参数。🤗 Trainer 须要的训练数据是字典类型的,因而须要制作自定义整顿性能。

最初,咱们将训练器子类化并编写咱们本人的 compute_loss.

之后,这段代码也能够分布式运行,而无需批改任何训练代码!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)
trainer.train()
***** Running training *****
  Num examples = 60000
  Num Epochs = 1
  Instantaneous batch size per device = 64
  Total train batch size (w. parallel, distributed & accumulation) = 64
  Gradient Accumulation steps = 1
  Total optimization steps = 938
Epoch 训练损失 验证损失
1 0.875700 0.282633

与下面的 notebook_launcher 示例相似,也能够将这个过程封装成一个训练函数:

def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

相干资源

  • 要理解无关 PyTorch 分布式数据并行性的更多信息,请查看:
    <url>https://pytorch.org/docs/stab…</url>
  • 要理解无关 🤗 Accelerate 的更多信息,请查看:
    <url>https://hf.co/docs/accelerate</url>
  • 要理解无关 🤗 Transformer 的更多信息,请查看:
    <url>https://hf.co/docs/transformers</url>

原文作者:Zachary Mueller

译者:innovation64 (李洋)

审校:yaoqi (胡耀淇)

排版:zhongdongy (阿东)

正文完
 0