关于人工智能:PyTorch-并行训练-DistributedDataParallel完整代码示例

1次阅读

共计 6870 个字符,预计需要花费 18 分钟才能阅读完成。

应用大型数据集训练大型深度神经网络 (DNN) 的问题是深度学习畛域的次要挑战。随着 DNN 和数据集规模的减少,训练这些模型的计算和内存需要也会减少。这使得在计算资源无限的单台机器上训练这些模型变得艰难甚至不可能。应用大型数据集训练大型 DNN 的一些次要挑战包含:

  • 训练工夫长:训练过程可能须要数周甚至数月能力实现,具体取决于模型的复杂性和数据集的大小。
  • 内存限度:大型 DNN 可能须要大量内存来存储训练期间的所有模型参数、梯度和两头激活。这可能会导致内存不足谬误并限度可在单台机器上训练的模型的大小。

为了应答这些挑战,曾经开发了各种技术来扩充具备大型数据集的大型 DNN 的训练,包含模型并行性、数据并行性和混合并行性,以及硬件、软件和算法的优化。

在本文中咱们将演示应用 PyTorch 的数据并行性和模型并行性。

咱们所说的并行性个别是指在多个 gpu,或多台机器上训练深度神经网络(dnn),以实现更少的训练工夫。数据并行背地的根本思维是将训练数据分成更小的块,让每个 GPU 或机器解决一个独自的数据块。而后将每个节点的后果组合起来,用于更新模型参数。在数据并行中,模型体系结构在每个节点上是雷同的,但模型参数在节点之间进行了分区。每个节点应用调配的数据块训练本人的本地模型,在每次训练迭代完结时,模型参数在所有节点之间同步。这个过程一直反复,直到模型收敛到一个令人满意的后果。

上面咱们用用 ResNet50 和 CIFAR10 数据集来进行残缺的代码示例:

在数据并行中,模型架构在每个节点上放弃雷同,但模型参数在节点之间进行了分区,每个节点应用调配的数据块训练本人的本地模型。

PyTorch 的 DistributedDataParallel 库能够进行跨节点的梯度和模型参数的高效通信和同步,实现分布式训练。本文提供了如何应用 ResNet50 和 CIFAR10 数据集应用 PyTorch 实现数据并行的示例,其中代码在多个 gpu 或机器上运行,每台机器解决训练数据的一个子集。训练过程应用 PyTorch 的 DistributedDataParallel 库进行并行化。

导入必须要的库

 importos
 fromdatetimeimportdatetime
 fromtimeimporttime
 importargparse
 importtorchvision
 importtorchvision.transformsastransforms
 importtorch
 importtorch.nnasnn
 importtorch.distributedasdist
 fromtorch.nn.parallelimportDistributedDataParallel

接下来,咱们将查看 GPU

 importsubprocess
 result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())

因为咱们须要在多个服务器上运行,所以手动一个一个执行并不事实,所以须要有一个调度程序。这里咱们应用 SLURM 文件来运行代码(slurm面向 Linux 和 Unix 相似内核的收费和开源工作调度程序),

 defmain():
         
     # get distributed configuration from Slurm environment
     
     parser=argparse.ArgumentParser()
     parser.add_argument('-b', '--batch-size', default=128, type=int,
                         help='batch size. it will be divided in mini-batch for each worker')
     parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
                         help='number of total epochs to run')
     parser.add_argument('-c','--checkpoint', default=None, type=str,
                         help='path to checkpoint to load')
     args=parser.parse_args()
     
     rank=int(os.environ['SLURM_PROCID'])
     local_rank=int(os.environ['SLURM_LOCALID'])
     size=int(os.environ['SLURM_NTASKS'])
     master_addr=os.environ["SLURM_SRUN_COMM_HOST"]
     port="29500"
     node_id=os.environ['SLURM_NODEID']
     ddp_arg= [rank, local_rank, size, master_addr, port, node_id]
     train(args, ddp_arg)     

而后咱们应用 DistributedDataParallel 库来执行分布式训练。

 deftrain(args, ddp_arg):
     
     rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg
     
     # display info
     ifrank==0:
         #print(">>> Training on", len(hostnames), "nodes and", size, "processes, master node is", MASTER_ADDR)
         print(">>> Training on", size, "GPUs, master node is", MASTER_ADDR)
     #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     
     print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     
     
     # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
     #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
     dist.init_process_group(
         backend='nccl',
         init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
         world_size=size,
         rank=rank
     )
     
     # distribute model
     torch.cuda.set_device(local_rank)
     gpu=torch.device("cuda")
     #model = ResNet18(classes=10).to(gpu)
     model=torchvision.models.resnet50(pretrained=False).to(gpu)
     ddp_model=DistributedDataParallel(model, device_ids=[local_rank])
     ifargs.checkpointisnotNone:
         map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank}
         ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
     
     # distribute batch size (mini-batch)
     batch_size=args.batch_size
     batch_size_per_gpu=batch_size//size
     
     # define loss function (criterion) and optimizer
     criterion=nn.CrossEntropyLoss()  
     optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4)
     
     
     transform_train=transforms.Compose([transforms.RandomCrop(32, padding=4),
         transforms.RandomHorizontalFlip(),
         transforms.ToTensor(),
         transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
     ])
 
     # load data with distributed sampler
     #train_dataset = torchvision.datasets.CIFAR10(root='./data',
     #                                           train=True,
     #                                           transform=transform_train,
     #                                           download=False)
     
     # load data with distributed sampler
     train_dataset=torchvision.datasets.CIFAR10(root='./data',
                                                train=True,
                                                transform=transform_train,
                                                download=False)
     
     train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                     num_replicas=size,
                                                                     rank=rank)
     
     train_loader=torch.utils.data.DataLoader(dataset=train_dataset,
                                                batch_size=batch_size_per_gpu,
                                                shuffle=False,
                                                num_workers=0,
                                                pin_memory=True,
                                                sampler=train_sampler)
 
     # training (timers and display handled by process 0)
     ifrank==0: start=datetime.now()         
     total_step=len(train_loader)
     
     forepochinrange(args.epochs):
         ifrank==0: start_dataload=time()
         
         fori, (images, labels) inenumerate(train_loader):
             
             # distribution of images and labels to all GPUs
             images=images.to(gpu, non_blocking=True)
             labels=labels.to(gpu, non_blocking=True) 
             
             ifrank==0: stop_dataload=time()
 
             ifrank==0: start_training=time()
             
             # forward pass
             outputs=ddp_model(images)
             loss=criterion(outputs, labels)
 
             # backward and optimize
             optimizer.zero_grad()
             loss.backward()
             optimizer.step()
             
             ifrank==0: stop_training=time() 
             if (i+1) %10==0andrank==0:
                 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs,
                                                                         i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000,
                                                                         (stop_training-start_training)*1000))
             ifrank==0: start_dataload=time()
                     
         #Save checkpoint at every end of epoch
         ifrank==0:
             torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
 
     ifrank==0:
         print(">>> Training complete in:"+str(datetime.now() -start))
 
 
 if__name__=='__main__':
 
     main()

代码将数据和模型宰割到多个 gpu 上,并以分布式的形式更新模型。上面是代码的一些解释:

train(args, ddp_arg)有两个参数,args 和 ddp_arg,其中 args 是传递给脚本的命令行参数,ddp_arg 蕴含分布式训练相干参数。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg: 解包 ddp_arg 中分布式训练相干参数。

如果 rank 为 0,则打印以后应用的 gpu 数量和主节点 IP 地址信息。

dist.init_process_group(backend=’nccl’, init_method=’tcp://{}:{}’.format(MASTER_ADDR, port), world_size=size, rank=rank) : 应用 NCCL 后端初始化分布式过程组。

torch.cuda.set_device(local_rank): 为这个过程抉择指定的 GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu): 从 torchvision 模型中加载 ResNet50 模型,并将其挪动到指定的 gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]): 将模型包装在 DistributedDataParallel 模块中,也就是说这样咱们就能够进行分布式训练了

加载 CIFAR-10 数据集并利用数据加强转换。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank): 创立一个 DistributedSampler 对象,将数据集宰割到多个 gpu 上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler): 创立一个 DataLoader 对象,数据将批量加载到模型中,这与咱们平时训练的步骤是统一的只不过是减少了一个分布式的数据采样 DistributedSampler

为指定的 epoch 数训练模型,以分布式的形式应用 optimizer.step()更新权重。

rank0 在每个轮次完结时保留一个检查点。

rank0 每 10 个批次显示损失和训练工夫。

完结训练时打印训练模型所破费的总工夫也是在 rank0 上。

代码测试

在应用 1 个节点 1 /2/3/ 4 个 gpu,2 个节点 6 / 8 个 gpu,每个节点 3 / 4 个 gpu 上进行了训练 Cifar10 上的 Resnet50 的测试如下图所示,每次测试的批处理大小放弃不变。实现每项测试所破费的工夫以秒为单位记录。随着应用的 gpu 数量的减少,实现测试所需的工夫会缩小。当应用 8 个 gpu 时,须要 320 秒能力实现,这是记录中最快的工夫。这是必定的,然而咱们能够看到训练的速度并没有像 GPU 数量增长出现线性的增长,这可能是因为 Resnet50 算是一个比拟小的模型了,并不需要进行并行化训练。

在多个 gpu 上应用数据并行能够显著缩小在给定数据集上训练深度神经网络 (DNN) 所需的工夫。随着 gpu 数量的减少,实现训练过程所需的工夫缩小,这表明 DNN 能够更无效地并行训练。

这种办法在解决大型数据集或简单的 DNN 架构时特地有用。通过利用多个 gpu,能够放慢训练过程,实现更快的模型迭代和试验。然而须要留神的是,通过 Data Parallelism 实现的性能晋升可能会受到通信开销和 GPU 内存限度等因素的限度,须要认真调优能力获得最佳后果。

https://avoid.overfit.cn/post/67095b9014cb40888238b84fea17e872

作者:Joseph El Kettaneh

正文完
 0