作者 |DR. VAIBHAV KUMAR
编译 |VK
起源 |Analytics In Diamag
PyTorch 通过提供大量弱小的工具和技术,始终在推动计算机视觉和深度学习畛域的倒退。
在计算机视觉畛域,基于深度学习的执行须要解决大量的图像数据集,因而须要一个减速的环境来放慢执行过程以达到可承受的精度程度。
PyTorch 通过 XLA(减速线性代数)提供了这一个性,XLA 是一种线性代数编译器,能够针对多种类型的硬件,包含 GPU 和 TPU。PyTorch/XLA 环境与 Google 云 TPU 集成,实现了更快的执行速度。
在本文中,咱们将在 PyTorch 中应用 TPU 演示一种深卷积神经网络 ResNet50 的实现。
该模型将在 PyTorch/XLA 环境中进行训练和测试,以实现 CIFAR10 数据集的分类工作。咱们还将查看在 50 个 epoch 训练所破费的工夫。
ResNet50 在 Pytorch 的实现
为了利用 TPU 的性能,这个实现是在 Google Colab 中实现的。首先,咱们须要从 Notebook 设置下的硬件加速器中抉择 TPU。
抉择 TPU 后,咱们将应用上面的行验证环境代码:
import os
assert os.environ['COLAB_TPU_ADDR']
如果启用了 TPU,它将胜利执行,否则它将抛出‘KeyError:‘COLAB_TPU_ADDR’’。你也能够通过打印 TPU 地址来查看 TPU。
TPU_Path = 'grpc://'+os.environ['COLAB_TPU_ADDR']
print('TPU Address:', TPU_Path)
在下一步中,咱们将装置 XLA 环境以放慢执行过程。咱们在上一篇文章中实现了卷积神经网络。
VERSION = "20200516"
!curl https://raw.githubusercontent.com/pytorch/xla/master/contrib/scripts/env-setup.py -o pytorch-xla-env-setup.py
!python pytorch-xla-env-setup.py --version $VERSION
当初,咱们将在这里导入所有必须的库。
from matplotlib import pyplot as plt
import numpy as np
import os
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch_xla
import torch_xla.core.xla_model as xm
import torch_xla.debug.metrics as met
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_multiprocessing as xmp
import torch_xla.utils.utils as xu
import torchvision
from torchvision import datasets, transforms
import time
from google.colab.patches import cv2_imshow
import cv2
导入库之后,咱们将定义并初始化所需的参数。
# 定义参数
FLAGS = {}
FLAGS['data_dir'] = "/tmp/cifar"
FLAGS['batch_size'] = 128
FLAGS['num_workers'] = 4
FLAGS['learning_rate'] = 0.02
FLAGS['momentum'] = 0.9
FLAGS['num_epochs'] = 50
FLAGS['num_cores'] = 8
FLAGS['log_steps'] = 20
FLAGS['metrics_debug'] = False
在下一步中,咱们将定义 ResNet50 模型。
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, in_planes, planes, stride=1):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.shortcut = nn.Sequential()
if stride != 1 or in_planes != self.expansion * planes:
self.shortcut = nn.Sequential(
nn.Conv2d(
in_planes,
self.expansion * planes,
kernel_size=1,
stride=stride,
bias=False), nn.BatchNorm2d(self.expansion * planes))
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += self.shortcut(x)
out = F.relu(out)
return out
class ResNet(nn.Module):
def __init__(self, block, num_blocks, num_classes=10):
super(ResNet, self).__init__()
self.in_planes = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
self.linear = nn.Linear(512 * block.expansion, num_classes)
def _make_layer(self, block, planes, num_blocks, stride):
strides = [stride] + [1] * (num_blocks - 1)
layers = []
for stride in strides:
layers.append(block(self.in_planes, planes, stride))
self.in_planes = planes * block.expansion
return nn.Sequential(*layers)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
out = self.layer4(out)
out = F.avg_pool2d(out, 4)
out = torch.flatten(out, 1)
out = self.linear(out)
return F.log_softmax(out, dim=1)
def ResNet50():
return ResNet(BasicBlock, [3, 4, 6, 4, 3])
上面的代码片段将定义加载 CIFAR10 数据集、筹备训练和测试数据集、训练过程和测试过程的函数。
SERIAL_EXEC = xmp.MpSerialExecutor()
# 只在内存中实例化一次模型权重。WRAPPED_MODEL = xmp.MpModelWrapper(ResNet50())
def train_resnet50():
torch.manual_seed(1)
def get_dataset():
norm = transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010))
transform_train = transforms.Compose([transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
norm,
])
transform_test = transforms.Compose([transforms.ToTensor(),
norm,
])
train_dataset = datasets.CIFAR10(root=FLAGS['data_dir'],
train=True,
download=True,
transform=transform_train)
test_dataset = datasets.CIFAR10(root=FLAGS['data_dir'],
train=False,
download=True,
transform=transform_test)
return train_dataset, test_dataset
# 应用串行执行器能够防止多个过程
# 下载雷同的数据。train_dataset, test_dataset = SERIAL_EXEC.run(get_dataset)
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=xm.xrt_world_size(),
rank=xm.get_ordinal(),
shuffle=True)
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=FLAGS['batch_size'],
sampler=train_sampler,
num_workers=FLAGS['num_workers'],
drop_last=True)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=FLAGS['batch_size'],
shuffle=False,
num_workers=FLAGS['num_workers'],
drop_last=True)
# 将学习率缩放
learning_rate = FLAGS['learning_rate'] * xm.xrt_world_size()
# 获取损失函数、优化器和模型
device = xm.xla_device()
model = WRAPPED_MODEL.to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate,
momentum=FLAGS['momentum'], weight_decay=5e-4)
loss_fn = nn.NLLLoss()
def train_loop_fn(loader):
tracker = xm.RateTracker()
model.train()
for x, (data, target) in enumerate(loader):
optimizer.zero_grad()
output = model(data)
loss = loss_fn(output, target)
loss.backward()
xm.optimizer_step(optimizer)
tracker.add(FLAGS['batch_size'])
if x % FLAGS['log_steps'] == 0:
print('[xla:{}]({}) Loss={:.2f} Time={}'.format(xm.get_ordinal(), x, loss.item(), time.asctime()), flush=True)
def test_loop_fn(loader):
total_samples = 0
correct = 0
model.eval()
data, pred, target = None, None, None
for data, target in loader:
output = model(data)
pred = output.max(1, keepdim=True)[1]
correct += pred.eq(target.view_as(pred)).sum().item()
total_samples += data.size()[0]
accuracy = 100.0 * correct / total_samples
print('[xla:{}] Accuracy={:.2f}%'.format(xm.get_ordinal(), accuracy), flush=True)
return accuracy, data, pred, target
# 训练和评估的循环
accuracy = 0.0
data, pred, target = None, None, None
for epoch in range(1, FLAGS['num_epochs'] + 1):
para_loader = pl.ParallelLoader(train_loader, [device])
train_loop_fn(para_loader.per_device_loader(device))
xm.master_print("Finished training epoch {}".format(epoch))
para_loader = pl.ParallelLoader(test_loader, [device])
accuracy, data, pred, target = test_loop_fn(para_loader.per_device_loader(device))
if FLAGS['metrics_debug']:
xm.master_print(met.metrics_report(), flush=True)
return accuracy, data, pred, target
当初,咱们将开始 ResNet50 的训练。训练将在咱们在参数中定义的 50 个 epoch 内实现。训练开始前,咱们会记录训练工夫,训练完结后,咱们将打印总工夫。
start_time = time.time()
# 启动训练流程
def training(rank, flags):
global FLAGS
FLAGS = flags
torch.set_default_tensor_type('torch.FloatTensor')
accuracy, data, pred, target = train_resnet50()
if rank == 0:
# 检索 TPU 外围 0 上的张量并绘制。plot_results(data.cpu(), pred.cpu(), target.cpu())
xmp.spawn(training, args=(FLAGS,), nprocs=FLAGS['num_cores'],
start_method='fork')
训练完结后,咱们会打印训练过程所破费的工夫。
最初,在训练过程中,咱们将模型对样本测试数据的预测可视化。
end_time = time.time()
print("Time taken =", end_time-start_time)
原文链接:https://analyticsindiamag.com…
欢送关注磐创 AI 博客站:
http://panchuang.net/
sklearn 机器学习中文官网文档:
http://sklearn123.com/
欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/