作者|DR. VAIBHAV KUMAR
编译|VK
起源|Analytics In Diamag

PyTorch通过提供大量弱小的工具和技术,始终在推动计算机视觉和深度学习畛域的倒退。

在计算机视觉畛域,基于深度学习的执行须要解决大量的图像数据集,因而须要一个减速的环境来放慢执行过程以达到可承受的精度程度。

PyTorch通过XLA(减速线性代数)提供了这一个性,XLA是一种线性代数编译器,能够针对多种类型的硬件,包含GPU和TPU。PyTorch/XLA环境与Google云TPU集成,实现了更快的执行速度。

在本文中,咱们将在PyTorch中应用TPU演示一种深卷积神经网络ResNet50的实现。

该模型将在PyTorch/XLA环境中进行训练和测试,以实现CIFAR10数据集的分类工作。咱们还将查看在50个epoch训练所破费的工夫。

ResNet50在Pytorch的实现

为了利用TPU的性能,这个实现是在Google Colab中实现的。首先,咱们须要从Notebook设置下的硬件加速器中抉择TPU。

抉择TPU后,咱们将应用上面的行验证环境代码:

import osassert os.environ['COLAB_TPU_ADDR']

如果启用了TPU,它将胜利执行,否则它将抛出‘KeyError: ‘COLAB_TPU_ADDR’’。你也能够通过打印TPU地址来查看TPU。

TPU_Path = 'grpc://'+os.environ['COLAB_TPU_ADDR']print('TPU Address:', TPU_Path)

在下一步中,咱们将装置XLA环境以放慢执行过程。咱们在上一篇文章中实现了卷积神经网络。

VERSION = "20200516"!curl https://raw.githubusercontent.com/pytorch/xla/master/contrib/scripts/env-setup.py -o pytorch-xla-env-setup.py!python pytorch-xla-env-setup.py --version $VERSION

当初,咱们将在这里导入所有必须的库。

from matplotlib import pyplot as pltimport numpy as npimport osimport timeimport torchimport torch.nn as nnimport torch.nn.functional as Fimport torch.optim as optimimport torch_xlaimport torch_xla.core.xla_model as xmimport torch_xla.debug.metrics as metimport torch_xla.distributed.parallel_loader as plimport torch_xla.distributed.xla_multiprocessing as xmpimport torch_xla.utils.utils as xuimport torchvisionfrom torchvision import datasets, transformsimport timefrom google.colab.patches import cv2_imshowimport cv2

导入库之后,咱们将定义并初始化所需的参数。

# 定义参数FLAGS = {}FLAGS['data_dir'] = "/tmp/cifar"FLAGS['batch_size'] = 128FLAGS['num_workers'] = 4FLAGS['learning_rate'] = 0.02FLAGS['momentum'] = 0.9FLAGS['num_epochs'] = 50FLAGS['num_cores'] = 8FLAGS['log_steps'] = 20FLAGS['metrics_debug'] = False

在下一步中,咱们将定义ResNet50模型。

class BasicBlock(nn.Module):  expansion = 1  def __init__(self, in_planes, planes, stride=1):    super(BasicBlock, self).__init__()    self.conv1 = nn.Conv2d(        in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)    self.bn1 = nn.BatchNorm2d(planes)    self.conv2 = nn.Conv2d(        planes, planes, kernel_size=3, stride=1, padding=1, bias=False)    self.bn2 = nn.BatchNorm2d(planes)    self.shortcut = nn.Sequential()    if stride != 1 or in_planes != self.expansion * planes:      self.shortcut = nn.Sequential(          nn.Conv2d(              in_planes,              self.expansion * planes,              kernel_size=1,              stride=stride,              bias=False), nn.BatchNorm2d(self.expansion * planes))  def forward(self, x):    out = F.relu(self.bn1(self.conv1(x)))    out = self.bn2(self.conv2(out))    out += self.shortcut(x)    out = F.relu(out)    return outclass ResNet(nn.Module):  def __init__(self, block, num_blocks, num_classes=10):    super(ResNet, self).__init__()    self.in_planes = 64    self.conv1 = nn.Conv2d(        3, 64, kernel_size=3, stride=1, padding=1, bias=False)    self.bn1 = nn.BatchNorm2d(64)    self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)    self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)    self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)    self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)    self.linear = nn.Linear(512 * block.expansion, num_classes)  def _make_layer(self, block, planes, num_blocks, stride):    strides = [stride] + [1] * (num_blocks - 1)    layers = []    for stride in strides:      layers.append(block(self.in_planes, planes, stride))      self.in_planes = planes * block.expansion    return nn.Sequential(*layers)  def forward(self, x):    out = F.relu(self.bn1(self.conv1(x)))    out = self.layer1(out)    out = self.layer2(out)    out = self.layer3(out)    out = self.layer4(out)    out = F.avg_pool2d(out, 4)    out = torch.flatten(out, 1)    out = self.linear(out)    return F.log_softmax(out, dim=1)def ResNet50():  return ResNet(BasicBlock, [3, 4, 6, 4, 3])

上面的代码片段将定义加载CIFAR10数据集、筹备训练和测试数据集、训练过程和测试过程的函数。

SERIAL_EXEC = xmp.MpSerialExecutor()# 只在内存中实例化一次模型权重。WRAPPED_MODEL = xmp.MpModelWrapper(ResNet50())def train_resnet50():  torch.manual_seed(1)  def get_dataset():    norm = transforms.Normalize(        mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010))    transform_train = transforms.Compose([        transforms.RandomCrop(32, padding=4),        transforms.RandomHorizontalFlip(),        transforms.ToTensor(),        norm,    ])    transform_test = transforms.Compose([        transforms.ToTensor(),        norm,    ])    train_dataset = datasets.CIFAR10(        root=FLAGS['data_dir'],        train=True,        download=True,        transform=transform_train)    test_dataset = datasets.CIFAR10(        root=FLAGS['data_dir'],        train=False,        download=True,        transform=transform_test)       return train_dataset, test_dataset   # 应用串行执行器能够防止多个过程  # 下载雷同的数据。  train_dataset, test_dataset = SERIAL_EXEC.run(get_dataset)  train_sampler = torch.utils.data.distributed.DistributedSampler(      train_dataset,      num_replicas=xm.xrt_world_size(),      rank=xm.get_ordinal(),      shuffle=True)  train_loader = torch.utils.data.DataLoader(      train_dataset,      batch_size=FLAGS['batch_size'],      sampler=train_sampler,      num_workers=FLAGS['num_workers'],      drop_last=True)  test_loader = torch.utils.data.DataLoader(      test_dataset,      batch_size=FLAGS['batch_size'],      shuffle=False,      num_workers=FLAGS['num_workers'],      drop_last=True)  # 将学习率缩放  learning_rate = FLAGS['learning_rate'] * xm.xrt_world_size()  # 获取损失函数、优化器和模型  device = xm.xla_device()  model = WRAPPED_MODEL.to(device)  optimizer = optim.SGD(model.parameters(), lr=learning_rate,                        momentum=FLAGS['momentum'], weight_decay=5e-4)  loss_fn = nn.NLLLoss()  def train_loop_fn(loader):    tracker = xm.RateTracker()    model.train()    for x, (data, target) in enumerate(loader):      optimizer.zero_grad()      output = model(data)      loss = loss_fn(output, target)      loss.backward()      xm.optimizer_step(optimizer)      tracker.add(FLAGS['batch_size'])      if x % FLAGS['log_steps'] == 0:        print('[xla:{}]({}) Loss={:.2f} Time={}'.format(xm.get_ordinal(), x, loss.item(), time.asctime()), flush=True)  def test_loop_fn(loader):    total_samples = 0    correct = 0    model.eval()    data, pred, target = None, None, None    for data, target in loader:      output = model(data)      pred = output.max(1, keepdim=True)[1]      correct += pred.eq(target.view_as(pred)).sum().item()      total_samples += data.size()[0]    accuracy = 100.0 * correct / total_samples    print('[xla:{}] Accuracy={:.2f}%'.format(        xm.get_ordinal(), accuracy), flush=True)    return accuracy, data, pred, target  # 训练和评估的循环  accuracy = 0.0  data, pred, target = None, None, None  for epoch in range(1, FLAGS['num_epochs'] + 1):    para_loader = pl.ParallelLoader(train_loader, [device])    train_loop_fn(para_loader.per_device_loader(device))    xm.master_print("Finished training epoch {}".format(epoch))    para_loader = pl.ParallelLoader(test_loader, [device])    accuracy, data, pred, target  = test_loop_fn(para_loader.per_device_loader(device))    if FLAGS['metrics_debug']:      xm.master_print(met.metrics_report(), flush=True)  return accuracy, data, pred, target

当初,咱们将开始ResNet50的训练。训练将在咱们在参数中定义的50个epoch内实现。训练开始前,咱们会记录训练工夫,训练完结后,咱们将打印总工夫。

start_time = time.time()# 启动训练流程def training(rank, flags):  global FLAGS  FLAGS = flags  torch.set_default_tensor_type('torch.FloatTensor')  accuracy, data, pred, target = train_resnet50()  if rank == 0:    # 检索TPU外围0上的张量并绘制。    plot_results(data.cpu(), pred.cpu(), target.cpu())xmp.spawn(training, args=(FLAGS,), nprocs=FLAGS['num_cores'],          start_method='fork')


训练完结后,咱们会打印训练过程所破费的工夫。

最初,在训练过程中,咱们将模型对样本测试数据的预测可视化。

end_time = time.time()print("Time taken = ", end_time-start_time)

原文链接:https://analyticsindiamag.com...

欢送关注磐创AI博客站:
http://panchuang.net/

sklearn机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/