关于机器学习:SageMaker管道模式下如何使用Horovod实现多GPU分布式训练

51次阅读

共计 8425 个字符,预计需要花费 22 分钟才能阅读完成。

以后,咱们能够应用多种技术通过大量数据训练出深度学习模型,具体包含针对图像分类工作的迁徙学习、少样本学习甚至是一次性学习等,也能够基于预训练的 BERT 或 GPT2 模型对语言模型进行微调。然而,在局部利用用例中咱们依然须要引入大量训练数据。例如,如果以后图像与 ImageNet 数据集内的图像齐全不同,或者以后语言语料库只针对特定畛域、而非通用类型,那么单凭迁徙学习将很难带来现实的模型性能。

作为深度学习钻研人员,咱们可能须要从零开始尝试新的思路或办法。在这种状况下,咱们必须应用大型数据集训练出大型深度学习模型;在找不到最佳训练方法的状况下,整个过程可能须要几天、几周甚至是几个月。

在本文中,咱们将一起理解如何在 Amazon SageMaker 的繁多实例之上运行多 GPU 训练,并探讨如何在 Amazon SageMaker 上实现高效多 GPU 与多节点分布式训练。

Horovod 基础知识

在应用大量数据进行模型训练时,最好是将训练作业调配给多个 GPU(繁多实例或者多个实例)。深度学习框架提供内置办法以反对多 GPU 训练或分布式训练。但除此之外,还有另外一种实现办法,即间接应用分布式深度学习框架(例如 Horovod)。

Horovod 是 Uber 公司打造的分布式深度学习开源框架,可能与 TensorFlow、Keras、PyTorch 以及 Apache MXNet 等一线热门深度学习工具包协同应用。Horovod 应用 all-reduce 算法取代以往的参数服务器办法进行疾速分布式训练,其中还提供多种优化办法以进一步放慢分布式训练的执行速度。对于更多详细信息,请参阅遇见 Horovod:面向 TensorFlow 的 Uber 开源分布式深度学习框架。

为 Horovod 筹备数据

在应用 Horovod 执行训练作业时,Horovod 会为其集群当中的每个 GPU 上的工作节点启动独立的过程(每个 GPU 对应一个工作节点)。例如,如果应用一个蕴含 4 GPU 的训练实例(一台 Amazon SageMaker ml.p3.8xlarge 或 Amazon Elastic Compute Cloud (Amazon EC2) p3.8xlarge 实例)运行 Horovod 训练作业,则将对应启动 4 个工作过程。数据集本体曾经出于数据并行性的需要而被拆分为多个分片,所有这 4 个工作节点都将别离读取本人的数据集分片。如果有 40000 个训练样本,则每个工作节点将取得 10000 个互不反复的训练样本。

如果应用 Horovod 进行分布式训练甚至是多 GPU 训练,则应当时做好数据分片筹备,并指引工作节点从文件系统中读取各个分片。(某些深度学习框架能够主动执行此操作,如 PyTorch 的 DataParallel 与 DistributedDataParallel)。

下图所示,为进行分片存储的两种可行架构。

咱们能够通过多种不同形式为 Amazon SageMaker 训练作业提供数据集。一种最典型的办法就是将所有数据集存储在 Amazon Simple Storage Service (Amazon S3) 存储桶内,并在须要时进行拜访。大家当然能够应用共享文件系统(例如 Amazon FSx for Lustre 或 Amazon Elastic File System,简称 Amazon EFS) 实现数据存储,但通过 Amazon SageMaker 内置的两种输出模式(文件模式与管道模式)间接从 Amazon S3 中检索数据可能防止零碎产生额定的服务老本。

在文件模式下,当 Amazon SageMaker 启动训练作业后,数据集将从指定的 S3 存储桶被传送至训练实例当中,并将其搁置在某个特定目录之内。但如果应用的数据集极为宏大,那么将对象从存储桶复制至训练实例的存储上往往须要消耗很长时间,而且直到数据传输实现,训练作业才会真正开始。这会在某些状况下拖慢机器学习(ML)的执行流程,甚至影响到翻新或钻研工作的我的项目进度。

另外,大家也能够通过管道模式间接拜访存储在 Amazon S3 中的数据集。管道模式在训练实例与 S3 存储桶之间创立间接输出管道,并容许训练过程间接拜访对象,这就打消了在训练开始之前将所有对象复制至训练实例中的工作。要对给定 Amazon S3 URI 中的数据集以管道模式加以拜访,请在创立 Amazon SageMaker Estimator 时将输出模式设置为 Pipe,具体参见以下代码:

from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='train.py',
 role='SageMakerRole',
 train_instance_type='ml.p3.2xlarge',
 train_instance_count=2,
 framework_version='2.1.0',
 py_version='py3',
 input_mode='Pipe')

在管道模式下,训练数据将作为 FIFO 流的模式进行交付。TensorFlow 扩大的 dataset 类极大升高了拜访流数据集的难度。对于管道模式与 TensorFlow 的更多详细信息,请参阅在 Amazon SageMaker 上应用高速管道模式放慢模型训练,以及 Amazon SageMaker TensorFlow 扩大 GitHub repo。

配合 Horovod 应用管道模式

当配合 Horovod 应用管道模式执行单机多卡或者多机多卡的分布式训练时,有一点须要特地留神。下图所示为这类场景的根本架构。

管道模式将数据从 Amazon S3 流式的传送到训练实例当中的 Unix 命名管道 /FIFOs 当中。一个 FIFO 文件仅反对一对写入 / 读取程序,且每轮训练周期内咱们只能为一条通道创立一个 FIFO 文件。通常,人们会为训练数据集定义一条通道,并为验证或测试数据集定义另一条独自的通道,而后将这些输出通道作为 Amazon SageMaker Estimator 中 fit () 函数的参数传递至训练作业。详见以下代码:

from sagemaker.session import s3_input

input_channel = {'train': s3_input('s3://your-bucket-name/train-dataset/')}

tf_estimator.fit(inputs=input_channel) 

这种形式在 Horovod 多 GPU 训练场景下又会造成怎么的影响?简而言之,应用 Horovod 在多 GPU 训练作业中启动的各个过程,会互相争用繁多 FIFO,导致多个过程无奈同时拜访这些 FIFO。而且因为同一时间内只有繁多工作过程可能拜访 FIFO,且在实现训练作业之前不会开释句柄,就导致所有其余工作过程无奈从该 FIFO 中读取数据,最终令训练作业陷入死锁式的有限循环。如果看到相似于以下模式的反复提醒音讯,则表明遇到了这样的问题:

[1,0]<stderr>:Stalled ranks:
[1,0]<stderr>:0: [training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_11_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_12_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_14_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_15_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_18_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_19_0 ...]
[1,0]<stderr>:2: [training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_11_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_12_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_14_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_15_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_18_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_19_0 ...]
[1,0]<stderr>:3: [training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_11_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_12_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_14_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_15_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_18_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_19_0 ...]

咱们能够对 S3 存储桶中的数据集进行分片,且数量与用于训练作业的 GPU 数量绝对应。如果领有 4000 个 TensorFlow 记录文件,且应用一台带有 4 GPU 的 ml.p3.8xlarge 实例进行模型训练,则能够为互不反复的 1000 个 TensorFLow 记录文件设定不同的前缀,如以下代码所示:

s3://your-bucket-name/train/0/
s3://your-bucket-name/train/1/
s3://your-bucket-name/train/2/
s3://your-bucket-name/train/3/

应用 SharedByS3Key 作为 Amazon S3 数据类型调配形式进行的数据集分片办法,并不齐全实用于 Horovod。这是因为在应用 SharedByS3Key 时,分片只会以实例为单位、而非以工作过程为单位进行,且实例中的工作过程与 GPU 的数量保持一致。同样的,各个实例依然只领有一条输出通道。因而,大家须要将数据集的分片数量,设定为与 Horovod 集群内 GPU 数雷同。

接下来,咱们须要为 Amazon SageMaker 训练定义四条输出通道,具体参见以下代码:

from sagemaker.session import s3_input

shuffle_config = sagemaker.session.ShuffleConfig(234)

train_s3_uri_prefix = 's3://your-bucket-name/train'
input_channels = {}

for idx in range(4):
 train_s3_uri = f'{train_s3_uri_prefix}/train/{idx}/'
 train_s3_input = s3_input(train_s3_uri, shuffle_config=shuffle_config)
 input_channels[f'train_{idx}'] = train_s3_input

ShuffleConfig 将确保依据每个训练轮次,对 Amazon S3 前缀下各文件的应用程序进行随机调配。对于更多详细信息,请参阅 ShuffleConfig。

在 Amazon SageMaker Estimator 上调用 fit 办法时,请应用以下通道定义:

tf_estimator.fit(input_channels)

对于验证及测试类工作,咱们只需在繁多工作过程上运行(通常应用主工作过程或 Rank 0 工作过程)。在这里,咱们不须要设置多条验证或测试通道。但如果应用 tf.keras.model.fit() 函数进行训练,则训练会在只有一个 Horovod 工作过程进行验证时进行(对于更多详细信息,请参阅 Horovod GitHub repo 上的 issue #600)。如果须要应用 tf.keras.model.fit() 进行验证,大家还应为各验证数据集提供对应的输出通道(相似于训练输出通道)。请留神,截至 2020 年 7 月,管道模式下训练作业的输出通道总数下限为 20 个。具体请参见以下代码:

validation_s3_uri = 's3://your-bucket-name/validation/'

for idx in range(4):
 validation_s3_input = s3_input(validation_s3_uri)
 input_channels[f'validation_{idx}'] = validation_s3_input

eval_s3_uri = 's3://your-bucket-name/eval/'
eval_s3_input = s3_input(eval_s3_uri)
input_channels['eval'] = eval_s3_input

相较于间接应用 S3 存储桶前缀,咱们在这里能够应用蕴含有对象键列表的一般 ManifestFile。对于更多详细信息,请参阅输出数据。

在训练代码中应用数据通道

在训练脚本中,咱们须要强制要求各个 Horovod 工作过程只拜访属于它本人的数据集分片,确保两个工作过程不会拜访同一输出通道。在本文的用例中,咱们将应用从 0 开始的索引定义各输出通道名称。为此能够应用 hvd.rank() 函数,由其为当前工作过程在集群范畴之内提供惟一的排名索引,且排名同样从 0 开始(请参考以下代码中的第 13 行)。在本文示例中,咱们应用 Amazon SageMaker TensorFlow 扩大 PipeModeDataset。对于其余深度学习框架,请在每个训练轮次中从名为 /opt/ml/input/data/[channel_name]_${epoch} 的 FIFO 文件中读取数据。对于更多示例,请参见 GitHub repo。

 1: from sagemaker_tensorflow import PipeModeDataset
 2:
 3: features = {'data': tf.FixedLenFeature([], tf.string),
 4: 'labels': tf.FixedLenFeature([], tf.int64)}
 5:
 6: def parse(record):
 7: parsed = tf.parse_single_example(record, features)
 8: return ({9: 'data': tf.decode_raw(parsed['data'], tf.float64)
10: }, parsed['labels'])
11:
12: # For Horovod and Pipe mode, use the input channel allocated to this worker using rank information
13: channel_name = 'train_{}'.format(hvd.rank())
14:
15: ds = PipeModeDataset(channel=channel_name, record_format='TFRecord')
16: ds = ds.map(parse)
17: ds = ds.batch(64)
18: ds = ds.prefetch(10)

在蕴含一个或多个实例的 Horovod 集群中,排名调配形式为从 0 开始,至 GPU 数量 – 1 完结。只有正确定义了输出通道的名称并从 0 开始应用索引,咱们就不用分神治理各实例或名位的排列程序。

应用 Tensorboard 进行监控

在对训练过程加以灵便监控方面,咱们能够在每个训练轮次完结时首先将日志上传至 S3 存储桶,再通过任意近程计算实例调用 Tensorboard。为此,咱们须要创立一项回调以将本地日志推送至 S3 存储桶门路,此门路仅限于运行在 Horovod 上的主(Rank 0)计算节点。具体请参见以下代码:

class Sync2S3(tf.keras.callbacks.Callback):
 def __init__(self, logdir, s3logdir):
 super(Sync2S3, self).__init__()
 self.logdir = logdir
 self.s3logdir = s3logdir
 def on_epoch_end(self, batch, logs={}):
 os.system('aws s3 sync'+self.logdir+' '+self.s3logdir)
...
if hvd.rank() == 0:
 logdir = args.output_data_dir + '/' + datetime.now().strftime("%Y%m%d-%H%M%S")
 callbacks.append(TensorBoard(log_dir=logdir))
 callbacks.append(Sync2S3(logdir=logdir, s3logdir=tensorboard_logs))

通过将训练日志存储在 S3 存储桶内,大家能够在任意服务器上运行 Tensorboard,包含 EC2 实例、Amazon SgaeMaker notebook 实例甚至是本地计算机,并为该 Tensorboard 托管服务器提供拜访 Amazon S3 日志对象的权限。为了反对从 Amazon S3 源处间接提取日志数据,咱们的 Tensorboard 必须为 1.14.0 或者更高版本。以下命令行应用的是位于 us-east- 1 区域内 S3 存储桶上的日志记录:

S3_REGION=us-east-1
tensorboard --logdir s3://{bucket_name}/tensorboard_logs/

如果是在 Amazon SageMaker notebook 实例上运行以上命令,则可通过 https://<SageMaker-notebook-instance-name>.notebook.<notebook-region>.sagemaker.aws/proxy/6006/ 实现拜访。

资源清理

在实现本文中分布式训练作业之后,请清理相应资源以防止其后续产生额定费用,包含 S3 存储桶、FSx for Lustre 以及各个 Amazon SageMaker 实例。

总结

在 Amazon SageMaker 上以管道模式应用 Horovod 的多 GPU 或分布式训练方法,可能为数据集的各个分片创立独立的训练通道并在数据通道内拜访对应分片,借此实现大规模模型训练。这种形式可能缩短在理论训练开始之前将数据集传输至训练实例所占用的工夫,因而特地实用于具备大规模训练数据集的 Amazon SageMaker 训练场景。

对于在 Amazon SageMaker 上运行的残缺训练示例(管道模式加 Horovod),请参阅 GitHub repo。

正文完
 0