导入相关库

import osfrom glob import globimport torch as t# 设置随机种子是为了保证结果的可重复性t.random.manual_seed(0)t.cuda.manual_seed_all(0)# Benchmark模式会提升计算速度,但是由于计算中有随机性,每次网络前馈结果略有差异t.backends.cudnn.benchmark = True# 避免上一句所带来的波动t.backends.cudnn.deterministic = Truefrom PIL import Imageimport torch.nn as nnfrom tqdm.auto import tqdmfrom torchvision import transformsfrom torchvision.utils import save_image, make_gridfrom torch.optim import SGDfrom torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, MultiStepLR, CosineAnnealingLRfrom torch.utils.data import DataLoader, Dataset, WeightedRandomSamplerimport pandas as pdimport numpy as npimport matplotlib.pyplot as pltimport matplotlib.patches as patch import torch.nn.functional as Fimport jsonfrom torchvision.models.mobilenet import mobilenet_v2from torchvision.models.resnet import resnet18, resnet34from torchsummary import summary%matplotlib inline

设置网络配置参数

class Config:        batch_size = 16        # 初始学习率    lr = 1e-2        # 动量    momentum = 0.9        # 衰减系数    weights_decay = 1e-5        class_num = 11        # 每隔多少个epoch进行一次网络评估    eval_interval = 1        # 每隔多少个epoch保存一次模型    checkpoint_interval = 1        # 每隔多少个iteration进行进度条更新或输出log    print_interval = 50        # 模型保存路径    checkpoints = 'drive/My Drive/Data/Datawhale-DigitsRecognition/checkpoints/'        # 预训练模型加载路径    pretrained = '/content/drive/My Drive/Data/Datawhale-DigitsRecognition/checkpoints/epoch-32_acc-0.67.pth'        # 开始训练的epoch    start_epoch = 0        # 一共训练的epoch数目    epoches = 50        # label smooth参数,为1表示不使用label smooth    smooth = 0.1        # 随机擦除的概率, 为0表示不擦除    erase_prob = 0.5    config = Config()

构建网络模型

通常而言,在构建Baseline时,会选择参数尽可能少,模型复杂度较低的轻量级网络作为backbone。如果可以work,后期才会用更复杂的backbone来替换它。

这里选用的是MobileNet V2作为backbone, 来搭建一个分类网络

class DigitsMobilenet(nn.Module):    def __init__(self, class_num=11):        super(DigitsMobilenet, self).__init__()            self.net = mobilenet_v2(pretrained=True)        self.net.classifier = nn.Sequential(            nn.AdaptiveAvgPool2d((1, 1))            )        self.fc1 = nn.Linear(1280, class_num)        self.fc2 = nn.Linear(1280, class_num)        self.fc3 = nn.Linear(1280, class_num)        self.fc4 = nn.Linear(1280, class_num)        self.fc5 = nn.Linear(1280, class_num)        def forward(self, img):        """        Params:            img(tensor): shape [N, C, H, W]                    Returns:            fc1(tensor): 代表第1个字符的presentation            fc2(tensor): 代表第2个字符的presentation            fc3(tensor): 代表第3个字符的presentation            fc4(tensor): 代表第4个字符的presentation            fc5(tensor): 代表第5个字符的presentation                """        features = self.net(img).view(-1, 1280)            fc1 = self.fc1(features)        fc2 = self.fc2(features)        fc3 = self.fc3(features)        fc4 = self.fc4(features)        fc5 = self.fc5(features)            return fc1, fc2, fc3, fc4, fc5        class DigitsResnet18(nn.Module):    def __init__(self, class_num=11):        super(DigitsMobilenet, self).__init__()        self.net = resnet18(pretrained=True)                # nn.Identity表示空层, 输入等于输出        self.net.fc = nn.Identity()        self.fc1 = nn.Linear(512, class_num)        self.fc2 = nn.Linear(512, class_num)        self.fc3 = nn.Linear(512, class_num)        self.fc4 = nn.Linear(512, class_num)        self.fc5 = nn.Linear(512, class_num)        def forward(self, img):        features = self.net(img).squeeze()            fc1 = self.fc1(features)        fc2 = self.fc2(features)        fc3 = self.fc3(features)        fc4 = self.fc4(features)        fc5 = self.fc5(features)            return fc1, fc2, fc3, fc4, fc5

构建训练模块

这里使用了几个Tricks

  • Label Smooth标签平滑
    标签平滑是一种正则化技术,避免由于数据量小导致的过拟合。
    Label smooth的公式如下,表示平滑度(实验中设置为0.1)C表示多分类的类别数,Pi表示软化后的标签概率。
    $$P_i=\begin{cases} 1-\epsilon \quad if(i=y)\\\frac{\epsilon}{C-1}\quad if(i\neq y) \end{cases}$$

    比如一个label的one-hot 编码向量为[0, 1, 0, 0], 经过label smooth之后的one-hot 编码向量变为[0.033, 0.9, 0.033, 0.033]。

  • 余弦衰减+warmup
    通常而言,刚开始梯度是极其不稳定的,因此应该使用较小的学习率先train几个迭代次数,然后将学习率恢复到初始学习率,开始正常训练。
    warmup在前n(n设为10)次迭代过程中,线性调整学习率到达初始学习率.一定程度上保证了训练的稳定性,并且可以更好的收敛到极小值。
    而余弦衰减调整策略则可以很好的跳出局部极小值,有更大的可能得到更优的局部极小值。
    如下图所示,分别表示warmup和余弦衰减策略下的学习率曲线
# ----------------------------------- LabelSmoothEntropy ----------------------------------- #class LabelSmoothEntropy(nn.Module):    def __init__(self, smooth=0.1, class_weights=None, size_average='mean'):        super(LabelSmoothEntropy, self).__init__()        self.size_average = size_average        self.smooth = smooth            self.class_weights = class_weights                def forward(self, preds, targets):            lb_pos, lb_neg = 1 - self.smooth, self.smooth / (preds.shape[0] - 1)            smoothed_lb = t.zeros_like(preds).fill_(lb_neg).scatter_(1, targets[:, None], lb_pos)            log_soft = F.log_softmax(preds)            if self.class_weights is not None:            loss = -log_soft * smoothed_lb * self.class_weights[None, :]            else:            loss = -log_soft * smoothed_lb            loss = loss.sum(1)        if self.size_average == 'mean':            return loss.mean()            elif self.size_average == 'sum':            return loss.sum()        else:            raise NotImplementedError        class Trainer:    def __init__(self):            self.device = t.device('cuda') if t.cuda.is_available() else t.device('cpu')            self.train_set = DigitsDataset(data_dir['train_data'], data_dir['train_label'])        self.train_loader = DataLoader(self.train_set, batch_size=config.batch_size, num_workers=8, pin_memory=True, drop_last=True)                    self.val_loader = DataLoader(DigitsDataset(data_dir['val_data'], data_dir['val_label'], aug=False), batch_size=config.batch_size,\                        num_workers=8, pin_memory=True, drop_last=True)            self.model = DigitsMobilenet(config.class_num).to(self.device)            # 使用Label Smooth        self.criterion = LabelSmoothEntropy().to(self.device)                self.optimizer = SGD(self.model.parameters(), lr=config.lr, momentum=config.momentum, weight_decay=config.weights_decay, nesterov=True)                # 使用余弦衰减学习率调整策略        self.lr_scheduler = CosineAnnealingWarmRestarts(self.optimizer, 10, 2, eta_min=10e-4)        # self.lr_scheduler = (self.optimizer, [10, 20, 30], 0.5)        self.best_acc = 0            if config.pretrained is not None:            self.load_model(config.pretrained)            # print('Load model from %s'%config.pretrained)            acc = self.eval()            self.best_acc = acc            print('Load model from %s, Eval Acc: %.2f'%(config.pretrained, acc * 100))        def train(self):        for epoch in range(config.start_epoch, config.epoches):        self.train_epoch(epoch)        if (epoch + 1) % config.eval_interval == 0:            print('Start Evaluation')            acc = self.eval()                if acc > self.best_acc:            os.makedirs(config.checkpoints, exist_ok=True)            save_path = config.checkpoints+'epoch-%d_acc-%.2f.pth'%(epoch+1, acc)            self.save_model(save_path)            print('%s saved successfully...'%save_path)            self.best_acc = acc        def train_epoch(self, epoch):        total_loss = 0        corrects = 0        tbar = tqdm(self.train_loader)        self.model.train()        for i, (img, label) in enumerate(tbar):        img = img.to(self.device)        label = label.to(self.device)        self.optimizer.zero_grad()        pred = self.model(img)        loss = self.criterion(pred[0], label[:, 0]) + \            self.criterion(pred[1], label[:, 1]) + \            self.criterion(pred[2], label[:, 2]) + \            self.criterion(pred[3], label[:, 3]) + \            self.criterion(pred[4], label[:, 4])        total_loss += loss.item()        loss.backward()        self.optimizer.step()        temp = t.stack([\                pred[0].argmax(1) == label[:, 0], \                pred[1].argmax(1) == label[:, 1], \                pred[2].argmax(1) == label[:, 2], \                pred[3].argmax(1) == label[:, 3], \                pred[4].argmax(1) == label[:, 4]\            ], dim=1)        # 只有预测的数字全部正确才算正确        corrects += t.all(temp, dim=1).sum().item()        if (i + 1) % config.print_interval == 0:            self.lr_scheduler.step()            tbar.set_description('loss: %.3f, acc: %.3f'%(loss/(i+1), corrects*100/((i + 1) * config.batch_size)))        def eval(self):        self.model.eval()        corrects = 0        with t.no_grad():        tbar = tqdm(self.val_loader)        for i, (img, label) in enumerate(tbar):            img = img.to(self.device)            label = label.to(self.device)            pred = self.model(img)                temp = t.stack([                pred[0].argmax(1) == label[:, 0], \                pred[1].argmax(1) == label[:, 1], \                pred[2].argmax(1) == label[:, 2], \                pred[3].argmax(1) == label[:, 3], \                pred[4].argmax(1) == label[:, 4]\            ], dim=1)                corrects += t.all(temp, dim=1).sum().item()            tbar.set_description('Val Acc: %.2f'%(corrects * 100 /((i+1)*config.batch_size)))        self.model.train()        return corrects / (len(self.val_loader) * config.batch_size)        def save_model(self, save_path, save_opt=False, save_config=False):        # 保存模型        dicts = {}        dicts['model'] = self.model.state_dict()        if save_opt:            dicts['opt'] = self.optimizer.state_dict()            if save_config:            dicts['config'] = {s: config.__getattribute__(s) for s in dir(config) if not s.startswith('_')}            t.save(dicts, save_path)        def load_model(self, load_path, save_opt=False, save_config=False):        # 加载模型        dicts = t.load(load_path)        self.model.load_state_dict(dicts['model'])            if save_opt:            self.optimizer.load_state_dict(dicts['opt'])            if save_config:            for k, v in dicts['config'].items():                config.__setattr__(k, v)

总结

总的来说,个人觉得用分类的思想还是挺新颖的,刚开始我都没想过要用分类来做。如果分类模型就能搞定,那何必用目标检测来干呢。当然,针对竞赛而言,目标检测效果应该会更好。

这部分内容和之前的内容是高度相关的,这部分用到了之前的代码。
代码放在我的gihub仓库,欢迎Star。

所有数据我也通过云盘共享,这是地址

ok,暂时就这样了