关于人工智能:医学图像的深度学习的完整代码示例使用Pytorch对MRI脑扫描的图像进行分割

41次阅读

共计 19499 个字符,预计需要花费 49 分钟才能阅读完成。

图像宰割是医学图像剖析中最重要的工作之一,在许多临床利用中往往是第一步也是最要害的一步。在脑 MRI 剖析中,图像宰割通常用于测量和可视化解剖构造,剖析大脑变动,描述病理区域以及手术打算和图像疏导干涉,宰割是大多数形态学剖析的先决条件。

本文咱们将介绍如何应用 QuickNAT 对人脑的图像进行宰割。应用 MONAI, PyTorch 和用于数据可视化和计算的常见 Python 库,如 NumPy, TorchIO 和 matplotlib。

本文将次要设计以下几个方面:

  • 设置数据集和摸索数据
  • 解决和筹备数据集适当的模型训练
  • 创立一个训练循环
  • 评估模型并剖析后果

残缺的代码会在本文最初提供。

设置数据目录

应用 MONAI 的第一步是设置 MONAI_DATA_DIRECTORY 环境变量指定目录,如果未指定将应用长期目录。

 directory=os.environ.get("MONAI_DATA_DIRECTORY")
 root_dir=tempfile.mkdtemp() ifdirectoryisNoneelsedirectory
 print(root_dir)

设置数据集

将 CNN 模型扩大到大脑宰割的次要挑战之一是人工正文的训练数据的有限性。作者引入了一种新的训练策略,利用没有手动标签的大型数据集和有手动标签的小型数据集。

首先,应用现有的软件工具 (例如 FreeSurfer) 从大型未标记数据集中取得主动生成的宰割,而后应用这些工具对网络进行预训练。在第二步中,应用更小的手动正文数据 [2] 对网络进行微调。

IXI 数据集由 581 个衰弱受试者的未标记 MRI T1 扫描组成。这些数据是从伦敦 3 家不同的医院收集来的。应用该数据集的次要毛病是标签不是公开可用的,因而为了遵循与钻研论文中雷同的办法,本文将应用 FreeSurfer 为这些 MRI T1 扫描生成宰割。

FreeSurfer 是一个用于剖析和可视化构造的软件包。下载和装置阐明能够在这里找到。能够间接应用了“recon-all”命令来执行所有皮层重建过程。

只管 FreeSurfer 是一个十分有用的工具,能够利用大量未标记的数据,并以监督的形式训练网络,然而扫描生成这些标签须要长达 5 个小时,所以咱们这里间接应用 OASIS 数据集来训练模型,OASIS 数据集是一个较小的数据集,具备公开可用的手动正文。

OASIS 是一个向科学界收费提供大脑神经成像数据集的我的项目。OASIS- 1 是由 39 个受试者的横断面组成的数据集,获取形式如下:

 resource="https://download.nrg.wustl.edu/data/oasis_cross-sectional_disc1.tar.gz"
 md5="c83e216ef8654a7cc9e2a30a4cdbe0cc"
 
 compressed_file=os.path.join(root_dir, "oasis_cross-sectional_disc1.tar.gz")
 data_dir=os.path.join(root_dir, "Oasis_Data")
 ifnotos.path.exists(data_dir):
     download_and_extract(resource, compressed_file, data_dir, md5)

数据摸索

如果你关上 ’ oasis_crosssectional_disc1 .tar.gz ‘,你会发现每个主题都有不同的文件夹。例如,对于主题 OAS1_0001_MR1,是这样的:

镜像数据文件门路:disc1\OAS1_0001_MR1\PROCESSED\MPRAGE\T88_111\ oas1_0001_mr1_mpr_n4_anon_111_t88_masked_ggc .img

标签文件:disc1\OAS1_0001_MR1\FSL_SEG\OAS1_0001_MR1_mpr_n4_anon_111_t88_masked_gfc_fseg.img

数据加载和预处理

下载数据集并将其提取到长期目录后,须要对其进行重构,咱们心愿咱们的目录看起来像这样:

所以须要依照上面的步骤加载数据:

将。img 文件转换为。nii 文件并保留到新文件夹中:创立两个新文件夹。Oasis_Data_Processed 包含每个受试者的解决过的 MRI T1 扫描,Oasis_Labels_Processed 包含相应的标签。

 new_path_data=root_dir+'/Oasis_Data_Processed/'
 ifnotos.path.exists(new_path_data):
   os.makedirs(new_path_data) 
 
 new_path_labels=root_dir+'/Oasis_Labels_Processed/'
 ifnotos.path.exists(new_path_labels):
   os.makedirs(new_path_labels)

而后就是对其进行操作:

 foriin [xforxinrange(1, 43) ifx!=8andx!=24andx!=36]:
   ifi<7ori==9:
     filename=root_dir+'/Oasis_Data/disc1/OAS1_000'+str(i) +'_MR1/PROCESSED/MPRAGE/T88_111/OAS1_000'+str(i) +'_MR1_mpr_n4_anon_111_t88_masked_gfc.img'
   elifi==7: 
     filename=root_dir+'/Oasis_Data/disc1/OAS1_000'+str(i) +'_MR1/PROCESSED/MPRAGE/T88_111/OAS1_000'+str(i) +'_MR1_mpr_n3_anon_111_t88_masked_gfc.img'
   elifi==15ori==16ori==20ori==24ori==26ori==34ori==38ori==39:
     filename=root_dir+'/Oasis_Data/disc1/OAS1_00'+str(i) +'_MR1/PROCESSED/MPRAGE/T88_111/OAS1_00'+str(i) +'_MR1_mpr_n3_anon_111_t88_masked_gfc.img'
   else: 
     filename=root_dir+'/Oasis_Data/disc1/OAS1_00'+str(i) +'_MR1/PROCESSED/MPRAGE/T88_111/OAS1_00'+str(i) +'_MR1_mpr_n4_anon_111_t88_masked_gfc.img'
   img=nib.load(filename)
   nib.save(img, filename.replace('.img', '.nii'))
   i=i+1  

具体代码就不再粘贴了,有趣味的看看最初的残缺代码。下一步就是读取图像和标签文件名

 image_files=sorted(glob(os.path.join(root_dir+'/Oasis_Data_Processed', '*.nii')))
 label_files=sorted(glob(os.path.join(root_dir+'/Oasis_Labels_Processed', '*.nii')))
 files= [{'image': image_name, 'label': label_name} forimage_name, label_nameinzip(image_files, label_files)]

为了可视化带有相应标签的图像,能够应用 TorchIO,这是一个 Python 库,用于深度学习中多维医学图像的加载、预处理、加强和采样。

 image_filename=root_dir+'/Oasis_Data_Processed/OAS1_0001_MR1_mpr_n4_anon_111_t88_masked_gfc.nii'
 label_filename=root_dir+'/Oasis_Labels_Processed/OAS1_0001_MR1_mpr_n4_anon_111_t88_masked_gfc_fseg.nii'
 subject=torchio.Subject(image=torchio.ScalarImage(image_filename), label=torchio.LabelMap(label_filename))
 subject.plot()

上面就是将数据分成 3 局部——训练、验证和测试。将数据分成三个不同的类别的目标是建设一个牢靠的机器学习模型,防止过拟合。

咱们将整个数据集分成三个局部:

Train: 80%,Validation: 10%,Test: 10%

 train_inds, val_inds, test_inds=partition_dataset(data=np.arange(len(files)), ratios= [8, 1, 1], shuffle=True)
 
 train= [files[i] foriinsorted(train_inds)]
 val= [files[i] foriinsorted(val_inds)]
 test= [files[i] foriinsorted(test_inds)]
 
 print(f"Training count: {len(train)}, Validation count: {len(val)}, Test count: {len(test)}")

因为模型须要的是二维切片,所以将每个切片保留在不同的文件夹中,如下图所示。这两个代码单元将训练集的每个 MRI 体积的切片保留为“.png”格局。

 Savecoronalslicesfortrainingimages
 dir=root_dir+'/TrainData'
 os.makedirs(os.path.join(dir, "Coronal"))
 path=root_dir+'/TrainData/Coronal/'
 
 forfileinsorted(glob(os.path.join(root_dir+'/TrainData', '*.nii'))):
   image=torchio.ScalarImage(file)
   data=image.data
   filename=os.path.basename(file)
   filename=os.path.splitext(filename)
   foriinrange(0, 208):
     slice=data[0, :, i]
     array=slice.numpy()
     data_dir=root_dir+'/TrainData/Coronal/'+filename[0] +'_slice'+str(i) +'.png'
     plt.imsave(fname=data_dir, arr=array, format='png', cmap=plt.cm.gray)

同理,上面是保留标签:

 dir=root_dir+'/TrainLabels'
 os.makedirs(os.path.join(dir, "Coronal"))
 path=root_dir+'/TrainLabels/Coronal/'
 
 forfileinsorted(glob(os.path.join(root_dir+'/TrainLabels', '*.nii'))):
   label=torchio.LabelMap(file)
   data=label.data
   filename=os.path.basename(file)
   filename=os.path.splitext(filename)
   foriinrange(0, 208):
     slice=data[0, :, i]
     array=slice.numpy()
     data_dir=root_dir+'/TrainLabels/Coronal/'+filename[0] +'_slice'+str(i) +'.png'
     plt.imsave(fname=data_dir, arr=array, format='png')

为训练和验证定义图像的变换处理

在本例中,咱们将应用 Dictionary Transforms,其中数据是 Python 字典。

 train_images_coronal= []
 forfileinsorted(glob(os.path.join(root_dir+'/TrainData/Coronal', '*.png'))):
   train_images_coronal.append(file)
 train_images_coronal=natsort.natsorted(train_images_coronal)
 
 train_labels_coronal= []
 forfileinsorted(glob(os.path.join(root_dir+'/TrainLabels/Coronal', '*.png'))):
   train_labels_coronal.append(file)
 train_labels_coronal=natsort.natsorted(train_labels_coronal)
 
 val_images_coronal= []
 forfileinsorted(glob(os.path.join(root_dir+'/ValData/Coronal', '*.png'))):
   val_images_coronal.append(file)
 val_images_coronal=natsort.natsorted(val_images_coronal)
 
 val_labels_coronal= []
 forfileinsorted(glob(os.path.join(root_dir+'/ValLabels/Coronal', '*.png'))):
   val_labels_coronal.append(file)
 val_labels_coronal=natsort.natsorted(val_labels_coronal)
 
 train_files_coronal= [{'image': image_name, 'label': label_name} forimage_name, label_nameinzip(train_images_coronal, train_labels_coronal)]
 val_files_coronal= [{'image': image_name, 'label': label_name} forimage_name, label_nameinzip(val_images_coronal, val_labels_coronal)]

当初咱们将利用以下变换:

LoadImaged: 加载图像数据和元数据。咱们应用 ’ PILReader ‘ 来加载图像和标签文件。ensure_channel_first 设置为 True,将图像数组形态转换为通道优先。

Rotate90d: 咱们将图像和标签旋转 90 度,因为当咱们下载它们时,它们方向是不正确的。

ToTensord: 将输出的图像和标签转换为张量。

NormalizeIntensityd: 对输出进行规范化。

 train_transforms=Compose(
      [LoadImaged(keys= ['image', 'label'], reader=PILReader(converter=lambdaimage: image.convert("L")), ensure_channel_first=True),
         Rotate90d(keys= ['image', 'label'], k=2),
         ToTensord(keys= ['image', 'label']),
         NormalizeIntensityd(keys= ['image'])
      ]
  )
 
 val_transforms=Compose(
      [LoadImaged(keys= ['image', 'label'], reader=PILReader(converter=lambdaimage: image.convert("L")), ensure_channel_first=True),
         Rotate90d(keys= ['image', 'label'], k=2),
         ToTensord(keys= ['image', 'label']),
         NormalizeIntensityd(keys= ['image'])
      ]
  )

MaskColorMap 将咱们定义了一个新的转换,将相应的像素值以一种格局映射为多个标签。这种转换在语义宰割中是必不可少的,因为咱们必须为每个可能的类别提供二元特色。One-Hot Encoding 将对应于原始类别的每个样本的特色赋值为 1。

因为 OASIS- 1 数据集只有 3 个大脑构造标签,对于更具体的宰割,现实的状况是像他们在钻研论文中那样对 28 个皮质构造进行正文。在 OASIS- 1 下载阐明中,能够找到应用 FreeSurfer 取得的更多大脑构造的标签。

所以本文将宰割更多的神经解剖构造。咱们要将模型的参数 num_classes 批改为相应的标签数量,以便模型的输入是具备 N 个通道的特色映射,等于 num_classes。

为了简化本教程,咱们将应用以下标签,比 OASIS- 1 然而要比 FreeSurfer 的少:

  • Label 0: Background
  • Label 1: LeftCerebralExterior
  • Label 2: LeftWhiteMatter
  • Label 3: LeftCerebralCortex

所以 MaskColorMap 的代码如下:

 class MaskColorMap(Enum):
     Background = (30)
     LeftCerebralExterior = (91)
     LeftWhiteMatter = (137)
     LeftCerebralCortex = (215)

数据集和数据加载

数据集和数据加载器从存储中提取数据,并将其分批发送给训练循环。这里咱们应用 monai.data.Dataset 加载之前定义的训练和验证字典,并对输出数据利用相应的转换。dataloader 用于将数据集加载到内存中。咱们将为训练和验证以及每个视图定义一个数据集和数据加载器。

为了不便演示,咱们应用通过应用 torch.utils.data.Subset,在指定的索引处创立一个子集,只是用局部数据训练放慢演示速度。

 train_dataset_coronal=Dataset(data=train_files_coronal, transform=train_transforms)
 train_loader_coronal=DataLoader(train_dataset_coronal, batch_size=1, shuffle=True)
 
 val_dataset_coronal=Dataset(data=val_files_coronal, transform=val_transforms)
 val_loader_coronal=DataLoader(val_dataset_coronal, batch_size=1, shuffle=False)
 
 # We will use a subset of the dataset
 subset_train=list(range(90, len(train_dataset_coronal), 120))
 train_dataset_coronal_subset=torch.utils.data.Subset(train_dataset_coronal, subset_train)
 train_loader_coronal_subset=DataLoader(train_dataset_coronal_subset, batch_size=1, shuffle=True)
 
 subset_val=list(range(90, len(val_dataset_coronal), 50))
 val_dataset_coronal_subset=torch.utils.data.Subset(val_dataset_coronal, subset_val)
 val_loader_coronal_subset=DataLoader(val_dataset_coronal_subset, batch_size=1, shuffle=False)

定义模型

给定一组 MRI 脑扫描 I = {I1,…In}及其对应的宰割 S = {S1,…Sn},咱们想要学习一个函数 fseg: I -> S。咱们将这个函数示意为 F -CNN 模型,称为 QuickNAT:

QuickNAT 由三个二维 f – cnn 组成,别离在 coronal, axial, sagittal 视图上操作,而后通过聚合步骤推断最终的宰割后果,该宰割后果由三个网络的概率图组合而成。每个 F -CNN 都有一个编码器 / 解码器架构,其中有 4 个编码器和 4 个解码器,并由瓶颈层分隔。最初一层是带有 softmax 的分类器块。该架构还包含每个编码器 / 解码器块内的残差链接。

 classQuickNat(nn.Module):
     """A PyTorch implementation of QuickNAT"""
 
     def__init__(self, params):
         """:param params: {'num_channels':1,'num_filters':64,'kernel_h':5,'kernel_w':5,'stride_conv':1,'pool':2,'stride_pool':2,'num_classes':28'se_block': False,'drop_out':0.2}"""
         super(QuickNat, self).__init__()
 
         # from monai.networks.blocks import squeeze_and_excitation as se
         # self.cSE = ChannelSELayer(num_channels, reduction_ratio)
 
         # self.encode1 = sm.EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         # params["num_channels"] = params["num_filters"]
         # self.encode2 = sm.EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         # self.encode3 = sm.EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         # self.encode4 = sm.EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         # self.bottleneck = sm.DenseBlock(params, se_block_type=se.SELayer.CSSE)
         # params["num_channels"] = params["num_filters"] * 2
         # self.decode1 = sm.DecoderBlock(params, se_block_type=se.SELayer.CSSE)
         # self.decode2 = sm.DecoderBlock(params, se_block_type=se.SELayer.CSSE)
         # self.decode3 = sm.DecoderBlock(params, se_block_type=se.SELayer.CSSE)
         # self.decode4 = sm.DecoderBlock(params, se_block_type=se.SELayer.CSSE)
 
         # self.encode1 = EncoderBlock(params, se_block_type=se.ChannelSELayer)
         self.encode1=EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         params["num_channels"] =params["num_filters"]
         self.encode2=EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         self.encode3=EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         self.encode4=EncoderBlock(params, se_block_type=se.SELayer.CSSE)
         self.bottleneck=DenseBlock(params, se_block_type=se.SELayer.CSSE)
         params["num_channels"] =params["num_filters"] *2
         self.decode1=DecoderBlock(params, se_block_type=se.SELayer.CSSE)
         self.decode2=DecoderBlock(params, se_block_type=se.SELayer.CSSE)
         self.decode3=DecoderBlock(params, se_block_type=se.SELayer.CSSE)
         self.decode4=DecoderBlock(params, se_block_type=se.SELayer.CSSE)
         params["num_channels"] =params["num_filters"]
         self.classifier=ClassifierBlock(params)
 
     defforward(self, input):
         """
         :param input: X
         :return: probabiliy map
 
         """
 
         e1, out1, ind1=self.encode1.forward(input)
         e2, out2, ind2=self.encode2.forward(e1)
         e3, out3, ind3=self.encode3.forward(e2)
         e4, out4, ind4=self.encode4.forward(e3)
 
         bn=self.bottleneck.forward(e4)
 
         d4=self.decode4.forward(bn, out4, ind4)
         d3=self.decode1.forward(d4, out3, ind3)
         d2=self.decode2.forward(d3, out2, ind2)
         d1=self.decode3.forward(d2, out1, ind1)
         prob=self.classifier.forward(d1)
 
         returnprob
 
     defenable_test_dropout(self):
         """
         Enables test time drop out for uncertainity
         :return:
         """attr_dict=self.__dict__["_modules"]
         foriinrange(1, 5):
             encode_block, decode_block= (attr_dict["encode"+str(i)],
                 attr_dict["decode"+str(i)],
             )
             encode_block.drop_out=encode_block.drop_out.apply(nn.Module.train)
             decode_block.drop_out=decode_block.drop_out.apply(nn.Module.train)
 
     @property
     defis_cuda(self):
         """Check if model parameters are allocated on the GPU."""
         returnnext(self.parameters()).is_cuda
 
     defsave(self, path):
         """
         Save model with its parameters to the given path. Conventionally the
         path should end with '*.model'.
 
         Inputs:
         - path: path string
         """print("Saving model... %s"%path)
         torch.save(self.state_dict(), path)
 
     defpredict(self, X, device=0, enable_dropout=False):
         """
         Predicts the output after the model is trained.
         Inputs:
         - X: Volume to be predicted
         """
         self.eval()
         print("tensor size before transformation", X.shape)
 
         iftype(X) isnp.ndarray:
             # X = torch.tensor(X, requires_grad=False).type(torch.FloatTensor)
             X= (torch.tensor(X, requires_grad=False)
                 .type(torch.FloatTensor)
                 .cuda(device, non_blocking=True)
             )
         eliftype(X) istorch.TensorandnotX.is_cuda:
             X=X.type(torch.FloatTensor).cuda(device, non_blocking=True)
 
         print("tensor size", X.shape)
 
         ifenable_dropout:
             self.enable_test_dropout()
 
         withtorch.no_grad():
             out=self.forward(X)
 
         max_val, idx=torch.max(out, 1)
         idx=idx.data.cpu().numpy()
         prediction=np.squeeze(idx)
         print("prediction shape", prediction.shape)
         delX, out, idx, max_val
         returnprediction

损失函数

神经网络的训练须要一个损失函数来计算模型误差。训练的指标是最小化预测输入和指标输入之间的损失。咱们的模型应用 Dice Loss 和 Weighted Logistic Loss 的联结损失函数进行优化,其中权重弥补数据中的高类不均衡,并激励正确宰割解剖边界。

优化器

优化算法容许咱们持续更新模型的参数并最小化损失函数的值,咱们设置了以下的超参数:

学习率: 初始设置为 0.1,10 次后升高 1 阶。这能够通过学习率调度器来实现。

权重衰减:0.0001。

批量大小:1。

动量: 设置为 0.95 的高值,以弥补因为小批量大小而产生的噪声梯度。

训练网络

当初能够训练模型了。对于 QuickNAT 须要在 3 个(coronal, axial, sagittal)2d 切片上训练 3 个模型。而后再聚合步骤中组合三个模型的概率生成最终后果,然而本文中只演示在 coronal 视图的 2D 切片上训练一个 F -CNN 模型,因为其余两个与之相似。

 num_epochs=20
 start_epoch=1
 
 val_interval=1
 
 train_loss_epoch_values= []
 val_loss_epoch_values= []
 
 best_ds_mean=-1
 best_ds_mean_epoch=-1
 
 ds_mean_train_values= []
 ds_mean_val_values= []
 # ds_LCE_values = []
 # ds_LWM_values = []
 # ds_LCC_values = []
 
 print("START TRAINING. : model name =", "quicknat")
 
 forepochinrange(start_epoch, num_epochs):
     print("==== Epoch ["+str(epoch) +"/"+str(num_epochs)+"] DONE ====")   
 
     checkpoint_name=CHECKPOINT_DIR+"/checkpoint_epoch_"+str(epoch) +"."+CHECKPOINT_EXTENSION
     print(checkpoint_name)
     state= {
                 "epoch": epoch,
                 "arch": "quicknat",
                 "state_dict": model_coronal.state_dict(),
                 "optimizer": optimizer.state_dict(),
                 "scheduler": scheduler.state_dict(),}
     save_checkpoint(state=state, filename=checkpoint_name)
 
     print("\n==== Epoch [ %d  /  %d] START ===="% (epoch, num_epochs))
 
     steps_per_epoch=len(train_dataset_coronal_subset) /train_loader_coronal_subset.batch_size
 
     model_coronal.train()
     train_loss_epoch=0
     val_loss_epoch=0
     step=0
 
     predictions_train= []
     labels_train= []
 
     predictions_val= []
     labels_val= []    
 
     fori_batch, sample_batchedinenumerate(train_loader_coronal_subset):
       inputs=sample_batched['image'].type(torch.FloatTensor)
       labels=sample_batched['label'].type(torch.LongTensor)
 
       # print(f"Train Input Shape: {inputs.shape}")
 
       labels=labels.squeeze(1)
       _img_channels, _img_height, _img_width=labels.shape
       encoded_label=np.zeros((_img_height, _img_width, 1)).astype(int)
 
       forj, clsinenumerate(MaskColorMap):
           encoded_label[np.all(labels==cls.value, axis=0)] =j
 
       labels=encoded_label
       labels=torch.from_numpy(labels)
       labels=torch.permute(labels, (2, 1, 0))
 
       # print(f"Train Label Shape: {labels.shape}")
       # plt.title("Train Label")
       # plt.imshow(labels[0, :, :])
       # plt.show()
 
       optimizer.zero_grad()
       outputs=model_coronal(inputs)
       loss=loss_function(outputs, labels)
         
       loss.backward()
       optimizer.step()
       scheduler.step()
 
       withtorch.no_grad():
         _, batch_output=torch.max(outputs, dim=1)
         # print(f"Train Prediction Shape: {batch_output.shape}")
         # plt.title("Train Prediction")
         # plt.imshow(batch_output[0, :, :])
         # plt.show()
 
         predictions_train.append(batch_output.cpu())
         labels_train.append(labels.cpu())
         train_loss_epoch+=loss.item()
         print(f"{step}/{len(train_dataset_coronal_subset) //train_loader_coronal_subset.batch_size}, Training_loss: {loss.item():.4f}")
         step+=1
 
         predictions_train_arr, labels_train_arr=torch.cat(predictions_train), torch.cat(labels_train)
 
         #  print(predictions_train_arr.shape)
 
         dice_metric(predictions_train_arr, labels_train_arr)
 
     ds_mean_train=dice_metric.aggregate().item()
     ds_mean_train_values.append(ds_mean_train)    
     dice_metric.reset()
 
     train_loss_epoch/=step
     train_loss_epoch_values.append(train_loss_epoch)
     print(f"Epoch {epoch+1} Train Average Loss: {train_loss_epoch:.4f}")
     
     if (epoch+1) %val_interval==0:
 
       model_coronal.eval()
       step=0
 
       withtorch.no_grad():
 
         fori_batch, sample_batchedinenumerate(val_loader_coronal_subset):
           inputs=sample_batched['image'].type(torch.FloatTensor)
           labels=sample_batched['label'].type(torch.LongTensor)
 
           # print(f"Val Input Shape: {inputs.shape}")
 
           labels=labels.squeeze(1)
           integer_encoded_labels= []
           _img_channels, _img_height, _img_width=labels.shape
           encoded_label=np.zeros((_img_height, _img_width, 1)).astype(int)
 
           forj, clsinenumerate(MaskColorMap):
               encoded_label[np.all(labels==cls.value, axis=0)] =j
 
           labels=encoded_label
           labels=torch.from_numpy(labels)
           labels=torch.permute(labels, (2, 1, 0))
 
           # print(f"Val Label Shape: {labels.shape}")
           # plt.title("Val Label")
           # plt.imshow(labels[0, :, :])
           # plt.show()
 
           val_outputs=model_coronal(inputs)
 
           val_loss=loss_function(val_outputs, labels)
 
           predicted=torch.argmax(val_outputs, dim=1)
 
           # print(f"Val Prediction Shape: {predicted.shape}")
           # plt.title("Val Prediction")
           # plt.imshow(predicted[0, :, :])
           # plt.show()
         
           predictions_val.append(predicted)
           labels_val.append(labels)
 
           val_loss_epoch+=val_loss.item()
           print(f"{step}/{len(val_dataset_coronal_subset) //val_loader_coronal_subset.batch_size}, Validation_loss: {val_loss.item():.4f}")
           step+=1
 
           predictions_val_arr, labels_val_arr=torch.cat(predictions_val), torch.cat(labels_val)
 
           dice_metric(predictions_val_arr, labels_val_arr)
           # dice_metric_batch(predictions_val_arr, labels_val_arr)
             
         ds_mean_val=dice_metric.aggregate().item()
         ds_mean_val_values.append(ds_mean_val) 
         # ds_mean_val_batch = dice_metric_batch.aggregate()
         # ds_LCE = ds_mean_val_batch[0].item()
         # ds_LCE_values.append(ds_LCE)
         # ds_LWM = ds_mean_val_batch[1].item()
         # ds_LWM_values.append(ds_LWM)
         # ds_LCC = ds_mean_val_batch[2].item()
         # ds_LCC_values.append(ds_LCC)
 
         dice_metric.reset()      
         # dice_metric_batch.reset()    
 
         ifds_mean_val>best_ds_mean:
             best_ds_mean=ds_mean_val
             best_ds_mean_epoch=epoch+1
             torch.save(model_coronal.state_dict(), os.path.join(BESTMODEL_DIR, "best_metric_model_coronal.pth"))
             print("Saved new best metric model coronal")
 
         print(f"Current Epoch: {epoch+1} Current Mean Dice score is: {ds_mean_val:.4f}"
             f"\nBest Mean Dice score: {best_ds_mean:.4f}"
             # f"\nMean Dice score Left Cerebral Exterior: {ds_LCE:.4f} Mean Dice score Left White Matter: {ds_LWM:.4f} Mean Dice score Left Cerebral Cortex: {ds_LCC:.4f}"
             f"at Epoch: {best_ds_mean_epoch}"
         )
 
     val_loss_epoch/=step
     val_loss_epoch_values.append(val_loss_epoch)
     print(f"Epoch {epoch+1} Average Validation Loss: {val_loss_epoch:.4f}")
 
 print("FINISH.")         

代码也是传统的 Pytorch 的训练步骤,就不具体解释了

绘制损失和精度曲线

训练曲线示意模型的学习状况,验证曲线示意模型泛化到未见实例的状况。咱们应用 matplotlib 来绘制图形。还能够应用 TensorBoard,它使了解和调试深度学习程序变得更容易,并且是实时的。

 epoch=range(1, num_epochs+1)
 
 # Plot Loss Curves
 plt.figure(figsize=(18, 6))
 plt.subplot(1, 3, 1)
 plt.plot(epoch, train_loss_epoch_values, label='Training Loss')
 plt.plot(epoch, val_loss_epoch_values, label='Validation Loss')
 plt.title('Training and Validation Loss')
 plt.xlabel('Epoch')
 plt.legend()
 plt.figure()
 plt.show()
 
 # Plot Train Dice Coefficient Curve
 plt.figure(figsize=(18, 6))
 plt.subplot(1, 3, 2)
 x= [(i+1) foriinrange(len(ds_mean_train_values))]
 plt.plot(x, ds_mean_train_values, 'blue', label='Train Mean Dice Score')
 plt.title("Training Mean Dice Coefficient")
 plt.xlabel('Epoch')
 plt.ylabel('Mean Dice Score')
 plt.show()
 
 # Plot Validation Dice Coefficient Curve
 plt.figure(figsize=(18, 6))
 plt.subplot(1, 3, 3)
 x= [(i+1) foriinrange(len(ds_mean_val_values))]
 plt.plot(x, ds_mean_val_values, 'orange', label='Validation Mean Dice Score')
 plt.title("Validation Mean Dice Coefficient")
 plt.xlabel('Epoch')
 plt.ylabel('Mean Dice Score')
 plt.show()

在曲线中,咱们能够看到模型是过拟合的,因为验证损失回升而训练损失降落。这是深度学习算法中一个常见的陷阱,其中模型最终会记住训练数据,而无奈对未见过的数据进行泛化。

防止适度拟合的技巧:

  • 用更多的数据进行训练: 更大的数据集能够缩小过拟合。
  • 数据加强: 如果咱们不能收集更多的数据,咱们能够利用数据加强来人为地减少数据集的大小。
  • 增加正则化: 正则化是一种限度咱们的网络学习过于简单的模型的技术,因而可能会适度拟合。

评估网络

咱们如何度量模型的性能? 一个胜利的预测是一个最大限度地扩充预测和实在之间的重叠。

这一指标的两个相干但不同的指标是 Dice 和 Intersection / Union (IoU)系数,后者也被称为 Jaccard 系数。两个指标都在 0(无重叠)和 1(齐全重叠)之间。

这两种指标都能够用于相似的状况,然而区别在于 Dice Score 偏向于均匀体现,而 IoU 则帮忙你了解最坏状况下的体现。

咱们能够一一类地查看度量规范,或者取所有类的平均值。这里将应用 monai.metrics.DiceMetric 来计算分数。一个更通用的办法是应用 torchmetrics,然而因为这里应用了 monai 框架,所以就间接应用它内置的函数了。

咱们能够看到 Dice 得分曲线的行为相当不寻常。次要是因为验证均匀 Dice 得分高于 1,这是不可能的,因为这个度量是在 0 和 1 之间。咱们无奈确定这种行为的次要起因,但咱们倡议在多类问题中为每个类独自提供度量计算,并始终提供可视化示例以进行可视化评估。

后果剖析

最初咱们要看看模型是如何推广到未知数据的这个模型预测的简直所有货色都是左脑白质,一些像素是左脑皮层。只管它的预测仿佛是正确的,但仍有很大的改良空间,因为咱们的模型太小了,能够抉择更深的模型取得更好的成果。

总结

在本文中,咱们介绍了如何训练 QuickNAT 来实现具备挑战性的大脑宰割工作。咱们尽可能遵循作者在他们的钻研论文中解释的学习策略,这是本教程为了不便演示只在最简略的步骤上进行了演示,文本的残缺代码:

https://avoid.overfit.cn/post/e185c411051548b2999996c706d0fa51

作者:Ines del Val

正文完
 0