作者|DR. VAIBHAV KUMAR
编译|VK
起源|Analytics In Diamag
PyTorch通过提供大量弱小的工具和技术,始终在推动计算机视觉和深度学习畛域的倒退。
在计算机视觉畛域,基于深度学习的执行须要解决大量的图像数据集,因而须要一个减速的环境来放慢执行过程以达到可承受的精度程度。
PyTorch通过XLA(减速线性代数)提供了这一个性,XLA是一种线性代数编译器,能够针对多种类型的硬件,包含GPU和TPU。PyTorch/XLA环境与Google云TPU集成,实现了更快的执行速度。
在本文中,咱们将在PyTorch中应用TPU演示一种深卷积神经网络ResNet50的实现。
该模型将在PyTorch/XLA环境中进行训练和测试,以实现CIFAR10数据集的分类工作。咱们还将查看在50个epoch训练所破费的工夫。
ResNet50在Pytorch的实现
为了利用TPU的性能,这个实现是在Google Colab中实现的。首先,咱们须要从Notebook设置下的硬件加速器中抉择TPU。
抉择TPU后,咱们将应用上面的行验证环境代码:
import osassert os.environ['COLAB_TPU_ADDR']
如果启用了TPU,它将胜利执行,否则它将抛出‘KeyError: ‘COLAB_TPU_ADDR’’。你也能够通过打印TPU地址来查看TPU。
TPU_Path = 'grpc://'+os.environ['COLAB_TPU_ADDR']print('TPU Address:', TPU_Path)
在下一步中,咱们将装置XLA环境以放慢执行过程。咱们在上一篇文章中实现了卷积神经网络。
VERSION = "20200516"!curl https://raw.githubusercontent.com/pytorch/xla/master/contrib/scripts/env-setup.py -o pytorch-xla-env-setup.py!python pytorch-xla-env-setup.py --version $VERSION
当初,咱们将在这里导入所有必须的库。
from matplotlib import pyplot as pltimport numpy as npimport osimport timeimport torchimport torch.nn as nnimport torch.nn.functional as Fimport torch.optim as optimimport torch_xlaimport torch_xla.core.xla_model as xmimport torch_xla.debug.metrics as metimport torch_xla.distributed.parallel_loader as plimport torch_xla.distributed.xla_multiprocessing as xmpimport torch_xla.utils.utils as xuimport torchvisionfrom torchvision import datasets, transformsimport timefrom google.colab.patches import cv2_imshowimport cv2
导入库之后,咱们将定义并初始化所需的参数。
# 定义参数FLAGS = {}FLAGS['data_dir'] = "/tmp/cifar"FLAGS['batch_size'] = 128FLAGS['num_workers'] = 4FLAGS['learning_rate'] = 0.02FLAGS['momentum'] = 0.9FLAGS['num_epochs'] = 50FLAGS['num_cores'] = 8FLAGS['log_steps'] = 20FLAGS['metrics_debug'] = False
在下一步中,咱们将定义ResNet50模型。
class BasicBlock(nn.Module): expansion = 1 def __init__(self, in_planes, planes, stride=1): super(BasicBlock, self).__init__() self.conv1 = nn.Conv2d( in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) self.bn1 = nn.BatchNorm2d(planes) self.conv2 = nn.Conv2d( planes, planes, kernel_size=3, stride=1, padding=1, bias=False) self.bn2 = nn.BatchNorm2d(planes) self.shortcut = nn.Sequential() if stride != 1 or in_planes != self.expansion * planes: self.shortcut = nn.Sequential( nn.Conv2d( in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(self.expansion * planes)) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) out += self.shortcut(x) out = F.relu(out) return outclass ResNet(nn.Module): def __init__(self, block, num_blocks, num_classes=10): super(ResNet, self).__init__() self.in_planes = 64 self.conv1 = nn.Conv2d( 3, 64, kernel_size=3, stride=1, padding=1, bias=False) self.bn1 = nn.BatchNorm2d(64) self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1) self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2) self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2) self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2) self.linear = nn.Linear(512 * block.expansion, num_classes) def _make_layer(self, block, planes, num_blocks, stride): strides = [stride] + [1] * (num_blocks - 1) layers = [] for stride in strides: layers.append(block(self.in_planes, planes, stride)) self.in_planes = planes * block.expansion return nn.Sequential(*layers) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.layer1(out) out = self.layer2(out) out = self.layer3(out) out = self.layer4(out) out = F.avg_pool2d(out, 4) out = torch.flatten(out, 1) out = self.linear(out) return F.log_softmax(out, dim=1)def ResNet50(): return ResNet(BasicBlock, [3, 4, 6, 4, 3])
上面的代码片段将定义加载CIFAR10数据集、筹备训练和测试数据集、训练过程和测试过程的函数。
SERIAL_EXEC = xmp.MpSerialExecutor()# 只在内存中实例化一次模型权重。WRAPPED_MODEL = xmp.MpModelWrapper(ResNet50())def train_resnet50(): torch.manual_seed(1) def get_dataset(): norm = transforms.Normalize( mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)) transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), norm, ]) transform_test = transforms.Compose([ transforms.ToTensor(), norm, ]) train_dataset = datasets.CIFAR10( root=FLAGS['data_dir'], train=True, download=True, transform=transform_train) test_dataset = datasets.CIFAR10( root=FLAGS['data_dir'], train=False, download=True, transform=transform_test) return train_dataset, test_dataset # 应用串行执行器能够防止多个过程 # 下载雷同的数据。 train_dataset, test_dataset = SERIAL_EXEC.run(get_dataset) train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=xm.xrt_world_size(), rank=xm.get_ordinal(), shuffle=True) train_loader = torch.utils.data.DataLoader( train_dataset, batch_size=FLAGS['batch_size'], sampler=train_sampler, num_workers=FLAGS['num_workers'], drop_last=True) test_loader = torch.utils.data.DataLoader( test_dataset, batch_size=FLAGS['batch_size'], shuffle=False, num_workers=FLAGS['num_workers'], drop_last=True) # 将学习率缩放 learning_rate = FLAGS['learning_rate'] * xm.xrt_world_size() # 获取损失函数、优化器和模型 device = xm.xla_device() model = WRAPPED_MODEL.to(device) optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=FLAGS['momentum'], weight_decay=5e-4) loss_fn = nn.NLLLoss() def train_loop_fn(loader): tracker = xm.RateTracker() model.train() for x, (data, target) in enumerate(loader): optimizer.zero_grad() output = model(data) loss = loss_fn(output, target) loss.backward() xm.optimizer_step(optimizer) tracker.add(FLAGS['batch_size']) if x % FLAGS['log_steps'] == 0: print('[xla:{}]({}) Loss={:.2f} Time={}'.format(xm.get_ordinal(), x, loss.item(), time.asctime()), flush=True) def test_loop_fn(loader): total_samples = 0 correct = 0 model.eval() data, pred, target = None, None, None for data, target in loader: output = model(data) pred = output.max(1, keepdim=True)[1] correct += pred.eq(target.view_as(pred)).sum().item() total_samples += data.size()[0] accuracy = 100.0 * correct / total_samples print('[xla:{}] Accuracy={:.2f}%'.format( xm.get_ordinal(), accuracy), flush=True) return accuracy, data, pred, target # 训练和评估的循环 accuracy = 0.0 data, pred, target = None, None, None for epoch in range(1, FLAGS['num_epochs'] + 1): para_loader = pl.ParallelLoader(train_loader, [device]) train_loop_fn(para_loader.per_device_loader(device)) xm.master_print("Finished training epoch {}".format(epoch)) para_loader = pl.ParallelLoader(test_loader, [device]) accuracy, data, pred, target = test_loop_fn(para_loader.per_device_loader(device)) if FLAGS['metrics_debug']: xm.master_print(met.metrics_report(), flush=True) return accuracy, data, pred, target
当初,咱们将开始ResNet50的训练。训练将在咱们在参数中定义的50个epoch内实现。训练开始前,咱们会记录训练工夫,训练完结后,咱们将打印总工夫。
start_time = time.time()# 启动训练流程def training(rank, flags): global FLAGS FLAGS = flags torch.set_default_tensor_type('torch.FloatTensor') accuracy, data, pred, target = train_resnet50() if rank == 0: # 检索TPU外围0上的张量并绘制。 plot_results(data.cpu(), pred.cpu(), target.cpu())xmp.spawn(training, args=(FLAGS,), nprocs=FLAGS['num_cores'], start_method='fork')
训练完结后,咱们会打印训练过程所破费的工夫。
最初,在训练过程中,咱们将模型对样本测试数据的预测可视化。
end_time = time.time()print("Time taken = ", end_time-start_time)
原文链接:https://analyticsindiamag.com...
欢送关注磐创AI博客站:
http://panchuang.net/
sklearn机器学习中文官网文档:
http://sklearn123.com/
欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/