关于腾讯云:云原生的弹性-AI-训练系列之二PyTorch-190-弹性分布式训练的设计与实现

62次阅读

共计 11406 个字符,预计需要花费 29 分钟才能阅读完成。

背景

机器学习工作负载与传统的工作负载相比,一个比较显著的特点是 对 GPU 的需要旺盛 。在之前的文章中介绍过(https://mp.weixin.qq.com/s/Na… 和 https://mp.weixin.qq.com/s/X4…),目前 GPU 的显存曾经不足以跟上模型参数规模的倒退。随着 Transformer 等新的模型构造的呈现,这一问题越来越显著。算法工程师们训练模型所须要的资源越来越多, 分布式训练也随之成为了工业界进行模型训练的规范形式。

弹性训练可能在训练过程中动静地调整参加训练的实例数量,极大水平进步集群资源的利用率。同时,配合云上的竞价实例等资源类型,可能以更低的老本进行模型调优,进一步降本增效。在 PyTorch 最新公布的 1.9.0 版本中,其本来分布式训练的形式 torch.distributed.launch 行将被废除,转而举荐用户应用弹性的分布式训练接口 torch.distributed.run

借此机会,咱们对这一新个性进行简略地介绍,并且与 Horovod Elastic 进行简略地比照和剖析。最初总结一下应用弹性训练时,须要留神的问题。

PyTorch 1.9.0 之前的设计

PyTorch 是目前最风行的深度学习框架之一,它最让人称道的是 易用性。无论是单机训练还是分布式训练,PyTorch 都提供了简洁的 API。

PyTorch 1.9.0 版本之前,分布式训练的形式通常是通过如下的形式进行。

python -m torch.distributed.launch
        --nnodes=NODE_SIZE
        --nproc_per_node=TRAINERS_PER_NODE
        --node_rank=NODE_RANK
        --master_port=HOST_PORT
        --master_addr=HOST_NODE_ADDR
        YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

其中 nnodes 是参加训练的节点个数,nproc_per_node 是每个节点上运行的过程数量。node_rank 是以后节点的标识符,master_addrmaster_port 是 master 监听的地址和端口。torch.distributed.launch 会设置一些环境变量,其中包含 WORLD_SIZEMASTER_PORTMASTER_ADDR 等。

随后在以后机器上会创立对应过程进行训练,以后机器会有 TRAINERS_PER_NODE 个过程,这些过程组成了一个 local worker group。一共有 NODE_SIZE 个机器参加训练,一共有 NODE_SIZE * TRAINERS_PER_NODE 个过程。如果想要发动一个分布式训练任务,须要在所有的机器上执行相应的命令。

PyTorch 1.9.0 中的新设计

在 PyTorch 1.9 中,torch.distributed.launch 行将被废除,取而代之的是基于 pytorch/elastic 的 torch.distributed.run。这一新的形式与之前相比有一些应用上的改变,如下所示。

python -m torch.distributed.run
        --nnodes=MIN_SIZE:MAX_SIZE
        --nproc_per_node=TRAINERS_PER_NODE
        --rdzv_id=JOB_ID
        --rdzv_backend=c10d
        --rdzv_endpoint=HOST_NODE_ADDR
        YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

它提供了一些新的能力:首先是更好的容错,当 worker 失败后会主动重启持续训练;其次是 RANK 和 WORLD_SIZE 这些字段不再须要手动设置。最初也是最重要的,反对弹性训练,动静地减少或缩小参加训练的 worker 数量。在下面的例子中,nnodes 的设置不再是一个固定的值,而是一个区间。训练任务能够容忍在这一区间范畴内的 worker 数量变动。

如果要反对弹性能力,训练代码也须要进行一些批改。

def main():
     args = parse_args(sys.argv[1:])
     state = load_checkpoint(args.checkpoint_path)
     initialize(state)
     # torch.distributed.run ensure that this will work
     # by exporting all the env vars needed to initialize the process group
     torch.distributed.init_process_group(backend=args.backend)
     for i in range(state.epoch, state.total_num_epochs)
          for batch in iter(state.dataset)
              train(batch, state.model)
          state.epoch += 1
          save_checkpoint(state)

其中比拟显著的变动是,用户须要手动地解决 checkpoint。这是因为当 worker 呈现生效时,所有的 worker 都会重启,所以须要 checkpoint 机制来保障重启后训练可能继续下去。这一新的分布式训练形式引入不少新的概念,包含 agent、rendezvous 等。接下来咱们自用户能接触到的 torch.distributed.run 开始,介绍这些新的设计。

def run(args):
    if args.standalone:
        args.rdzv_backend = "c10d"
        args.rdzv_endpoint = "localhost:29400"
        args.rdzv_id = str(uuid.uuid4())
        log.info(
            f"\n**************************************\n"
            f"Rendezvous info:\n"
            f"--rdzv_backend={args.rdzv_backend}"
            f"--rdzv_endpoint={args.rdzv_endpoint}"
            f"--rdzv_id={args.rdzv_id}\n"
            f"**************************************\n"
        )
    config, cmd, cmd_args = config_from_args(args)
    elastic_launch(
        config=config,
        entrypoint=cmd,
    )(*cmd_args)

其中次要辨别了两个模式,Standalone 模式和分布式模式。Standalone 模式是分布式模式的一种特例,它次要针对单机多 Worker 的形式提供了一些便当的设置,不再须要设置一些多余的参数如 rdzv_backendrdzv_endpoint 等。

两者最初都会通过 elastic_launch 发动真正的训练过程。elastic_launch 会通过 elastic agent 来治理 worker 的生命周期,它的返回是每个 worker 的输入。

class elastic_launch:
    ...
    def __call__(self, *args):
        return launch_agent(self._config, self._entrypoint, list(args))
def launch_agent(
    config: LaunchConfig,
    entrypoint: Union[Callable, str, None],
    args: List[Any],
) -> Dict[int, Any]:
    ...
    agent = LocalElasticAgent(spec=spec, start_method=config.start_method, log_dir=config.log_dir)
    ...
    result = agent.run()
    ...
    return result.return_values

Elastic Agent 的设计:如何治理多个 worker 过程

elastic agent 是一个独立的过程,负责管理其下的 workers。它起到了相似过程管理系统 supervisor 的作用,会在启动的时候确保每个 worker 的设置正确。因为无关 WORLD_SIZE 和 RANK 的信息不再须要用户提供,elastic agent 会负责解决。

除此之外,worker 的生效也是由 elastic agent 负责捕捉解决。能够说 elastic agent 是弹性训练中最外围的抽象概念

上图展现的是elastic agent 的工作原理。

不同的 elastic agent 之间通过 rendezvous 进行 worker 之间的互相发现和对成员变动的同步。与此同时,通过对 worker 过程的监控,来捕捉训练过程中的生效。其中外围的逻辑都包装在 LocalElasticAgent.run() 这一函数调用中。

    def run(self, role: str = DEFAULT_ROLE) -> RunResult:
        ...
        result = self._invoke_run(role)
        return result
    def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
        ...
        self._initialize_workers(self._worker_group)
        while True:
            ...
            run_result = self._monitor_workers(self._worker_group)
            state = run_result.state
            ...
            if state == WorkerState.SUCCEEDED:
                ...
                return run_result
            elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
                if self._remaining_restarts > 0:
                    ...
                    self._restart_workers(self._worker_group)
                else:
                    ...
                    return run_result
            elif state == WorkerState.HEALTHY:
                ...
                if num_nodes_waiting > 0:
                    ...
                    self._restart_workers(self._worker_group)
            else:
                raise Exception(f"[{role}] Worker group in {state.name} state")

能够看到,外围的逻辑在 _invoke_run。其中 _initialize_workers 执行了大部分初始化的工作,其中包含为每个 worker 调配 RANK 等。在默认的实现中 elastic agent 和 workers 过程在同一机器上,因而 self._monitor_workers(self._worker_group) 通过 multiprocessing 对 workers 的运行状态进行了监控。并且依据不同的状态,进行不同的解决。

elastic agent 的 可扩展性十分好,在 1.9.0 版本中,一共有三个 Agent,别离是 ElasticAgentSimpleElasticAgentLocalElasticAgent

其中 ElasticAgent 是一个 Abstract Class,SimpleElasticAgent 对其中的某些函数进行了实现,而 LocalElasticAgent 则实现了治理单机上所有 worker 过程的 elastic agent。

SimpleElasticAgent 这一个形象次要是为了不便扩大新的 agent 实现,比方如果你想通过一个 agent 治理多机上所有的 worker,而不只是本机上的 worker,则能够通过扩大 SimpleElasticAgent 来实现。

rendezvous 的设计:如何在不同的节点间确定 RANK

接下来,咱们再看另外一个 外围的形象 rendezvous。为了实现弹性训练,worker 之间要可能动静地进行 membership 的变更。rendezvous 就是实现这一个性的用于同步的组件。

rendezvous 最外围的办法 是:

    @abstractmethod
    def next_rendezvous(self,) -> Tuple[Store, int, int]:
        """Main entry-point into the rendezvous barrier.
        Blocks until the rendezvous is complete and the current process is
        included in the formed worker group, or a timeout occurs, or the
        rendezvous was marked closed.
        Returns:
            A tuple of :py:class:`torch.distributed.Store`, ``rank``, and
            ``world size``.
        Raises:
            RendezvousClosedError:
                The rendezvous is closed.
            RendezvousConnectionError:
                The connection to the rendezvous backend has failed.
            RendezvousStateError:
                The rendezvous state is corrupt.
            RendezvousTimeoutError:
                The rendezvous did not complete on time.
        """

如正文所示,这一函数调用会被阻塞,直到 worker 的数量达到了要求。在 worker 被初始化,或者重启的时候,这一函数都会被调用。当函数返回时,不同的 worker 会以返回中的 rank 作为惟一的标示。rendezvous 一共有四个实现,别离是 etcdetcd-v2c10dstatic

class EtcdRendezvousHandler(RendezvousHandler):
    def next_rendezvous(self):
        rdzv_version, rank, world_size = self._rdzv_impl.rendezvous_barrier()
        log.info("Creating EtcdStore as the c10d::Store implementation")
        store = self._rdzv_impl.setup_kv_store(rdzv_version)
        return store, rank, world_size

其中 etcd 相干的是之前举荐应用的实现,在 c10d 呈现后就不再举荐了。etcd 的实现中,不同 worker 之间的状态通过 etcd 的 KV 接口存储。

确定参加训练的实例和对应的 RANK 的过程 如下图所示。

首先会在 /rdzv/active_version 下尝试写一个值 status: setup。在整个过程中,/rdzv/active_version 会作为存储 rendezvous 过程中间状态的 KV store,以及 rendezvous 过程中的排他锁来应用。

如果写失败了,阐明目前曾经有对应的 rendezvous 过程正在进行中。

在胜利后,会更新 /rdzv/version_counter 为原值加一。而后会创立一个目录 /rdzv/v_${version_counter}。这些操作做完后,会将 /rdzv/active_version 的状态写为 joinable,这时就进入了 join 阶段。

在 join 阶段,不同的 agent 在锁的爱护下,会顺次更新 /rdzv/active_version 下的 paticipants,调配到递增的 rank,这里的 rank 并不是每个 worker 过程调配到的 global rank,而是 agent 本人的 rank。worker 过程的 rank 会依据 agent rank 通过肯定的计算失去。这也是一个非常容易混同的设计,窃以为有优化的空间。

    def init_phase(self):
        try:
            active_version = self.try_create_rendezvous()
            state = json.loads(active_version.value)
            log.info("New rendezvous state created:" + str(state))
        except etcd.EtcdAlreadyExist:
            # 曾经有了一个新的 rendezvous 过程
            active_version, state = self.get_rdzv_state()
            # Note: it is possible for above query to fail (etcd.EtcdKeyNotFound),
            # but this is ok for us - just means we'll restart from beginning.
            log.info("Observed existing rendezvous state:" + str(state))
        if state["status"] == "closed":
            raise RendezvousClosedError()
        if state["status"] == "joinable":
            return self.join_phase(state["version"])
        if state["status"] == "final":
            self.handle_existing_rendezvous(state["version"])
            raise EtcdRendezvousRetryImmediately()
        self.try_wait_for_state_change(etcd_index=active_version.etcd_index + 1)
        raise EtcdRendezvousRetryableFailure()

在参加训练的节点达到 nnodes 的命令行参数中传入的最小值时,会期待肯定工夫,在等待时间完结或者参加训练的节点达到了 nnodes 设定的最大值时,会进入 frozen 阶段。

在 fronzen 阶段中,每个参加训练的节点都须要通过在 /rdzv/v_${version_counter}/rank_${agent_rank} 下写值的形式进行确认。在所有节点都确认结束后,会进入最初的 final 阶段。

在最初的 final 阶段中,后续进入的 agent 都会 pending,曾经达成 rendezvous 的节点上的 agent 会为其治理的 worker 过程调配 RANKRANK 0 的实例会作为 master 的角色存在。随后就会间接创立对应的 worker 过程。在默认的 LocalElasticAgent 中,会利用 python.multiprocessing 在本地创立多个过程。

    @prof
    def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
        spec = worker_group.spec
        store = worker_group.store
        ...
        for worker in worker_group.workers:
            local_rank = worker.local_rank
            worker_env = {"LOCAL_RANK": str(local_rank),
                "RANK": str(worker.global_rank),
                ...
            }
            ...
            args[local_rank] = tuple(worker_args)
        ...
        self._pcontext = start_processes(
            name=spec.role,
            entrypoint=spec.entrypoint,
            args=args,
            envs=envs,
            log_dir=attempt_log_dir,
            start_method=self._start_method,
            redirects=spec.redirects,
            tee=spec.tee,
        )
        return self._pcontext.pids()

c10d 新的设计

前文介绍了基于 etcd 的 rendezvous 实现,它能够保障多个实例之间对于参加训练的节点共识的强统一,然而这也为 PyTorch 运行训练任务引入了额定的依赖。因而 PyTorch 也提供了一个内置的实现 c10d。相比于基于 etcd 的实现,c10d 基于 TCP 来进行同步。

def create_backend(params: RendezvousParameters) -> Tuple[C10dRendezvousBackend, Store]:
    ...
    if store_type == "file":
        store = _create_file_store(params)
    elif store_type == "tcp":
        store = _create_tcp_store(params)
    ...
    backend = C10dRendezvousBackend(store, params.run_id)
def _create_tcp_store(params: RendezvousParameters) -> TCPStore:
    host, port = parse_rendezvous_endpoint(params.endpoint, default_port=29400)
    ...
    for is_server in [is_host, False]:
        ...
        store = TCPStore(host, port, is_master=is_server, timeout=timedelta(seconds=read_timeout)
        )
        ...
        break
    return store

c10d 是一个 client-server 的架构,其中的一个 agent 上会运行 c10d 的 TCPServer,它监听给定的端口,提供了 compareAndSetadd 等原语。它也能够被了解为一个简化的,提供 KV 接口的内存数据库,相似于 Redis。无关 rendezvous 的同步,都是由各个 agent 通过一个中心化的 agent 上的 c10d TCPServer 实现的。能够预感这样的实现在可用性上相比于 etcd 是有肯定差距的,然而胜在易用性。用户如果应用 c10d,那么不再须要运维一个 etcd 集群。

PyTorch Elastic on Kubernetes

为了可能享受到弹性训练带来的便当,PyTorch 同时提供了在 Kubernetes 上的反对。相比于 1.9.0 之前的版本,新版本的分布式训练增加了一些新的参数。因而 PyTorch 社区在 Kubeflow PyTorch operator 的根底上,对 CRD 进行了一些批改。一个典型的弹性训练示例如下所示:

apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  name: imagenet
  namespace: elastic-job
spec:
  # Use "etcd-service:2379" if you already apply etcd.yaml
  rdzvEndpoint: "<your_etcd_endpoint>:<your_etcd_port>"
  minReplicas: 1
  maxReplicas: 2
  replicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: ExitCode
      template:
        apiVersion: v1
        kind: Pod
        spec:
          containers:
            - name: elasticjob-worker
              image: torchelastic/examples:0.2.0
              imagePullPolicy: Always
              args:
                - "--nproc_per_node=1"
                - "/workspace/examples/imagenet/main.py"
                - "--arch=resnet18"
                - "--epochs=20"
                - "--batch-size=32"
                # number of data loader workers (NOT trainers)
                # zero means load the data on the same process as the trainer
                # this is set so that the container does not OOM since
                # pytorch data loaders use shm
                - "--workers=0"
                - "/workspace/data/tiny-imagenet-200"
              resources:
                limits:
                  nvidia.com/gpu: 1

因为在最开始,基于 c10drendezvous 还没有被反对,所以 CRD 中须要定义 rdzvEndpoint,指向一个曾经部署好的 etcd 集群。同时,用户须要指定 minReplicasmaxReplicas。其余就与 Kubeflow PyTorchJob 并无二致。

PyTorch Elastic 与 Horovod Elastic

目前,两者的设计从原理上来说并无二致。相比于 Horovod Elastic,PyTorch Elastic 提供了更灵便的扩展性,它提供了 agentrendezvous 等接口,用户能够依据须要进行扩大。然而从另外一个角度讲,Horovod 的易用性做的更好

PyTorch 并没有提供保留状态的内置反对,为了可能在 worker 过程失败,重建训练任务的时候,须要用户本人实现保留会加载 checkpoint 的逻辑;而 Horovod 则提供了内置的实现。

Horovod 和 PyTorch 在同步机制上也具备比拟大的差别。Horovod Elastic 须要用户提供一个脚本 discovery_hosts.sh,帮忙其在运行时取得正在参加训练的节点。

$ horovodrun -np 8 --host-discovery-script discover_hosts.sh python train.py
...
$ ./discover_hosts.sh
host-1:29500
host-2:29500
host-3:29500

这相当于将节点发现的逻辑交给用户来实现。反观 PyTorch,它利用 etcd、本身实现的 c10d 等组件解决节点间的互相发现问题,显得更为精美。

总结

在文章最初,咱们总结一下目前实现弹性训练须要留神的问题。

首先,也是最重要的,弹性训练须要一种机制来解决节点 / 训练过程间互相发现的问题。训练过程中节点会动静地退出或者退出,如何让其余的节点感知到这一变动,是这一机制次要面对的问题。目前的设计中,Horovod 将这一问题交给用户来解决,Horovod 定期执行用户定义的逻辑来发现目前的节点。PyTorch 通过第三方的分布式一致性中间件 etcd 等来实现高可用的节点发现。除此之外,也有一些探索性的工作,利用基于 Gossip 的协定来进行同步,在兼顾高可用的同时也没有引入过多的组件。

其次,要实现弹性训练还须要捕捉训练生效。Horovod 和 PyTorch 都通过一个后盾过程(Horovod 中是 Driver,PyTorch 中是每个节点的 Local Elastic Agent)来实现这一逻辑。当过程 crash,或在梯度通信中遇到问题时,后盾过程会捕捉到生效并且从新进行节点发现,而后重启训练。

最初,训练时的数据切分的逻辑和学习率 / batch size 的设置也要对应进行批改。因为参加训练的过程会动静的增减,因而可能须要依据新的训练过程的规模来从新设置学习率和数据调配的逻辑,防止影响模型收敛。

在本文中,咱们首先介绍了 PyTorch 1.9.0 版本中弹性训练的设计与实现。而后剖析总结了实现弹性训练的形式和不同框架之间的设计差别。从咱们的角度来看,弹性训练可能很好地贴合云原生的趋势,以极致的弹性来降低成本进步资源利用率,是将来的趋势。因而目前咱们也在积极参与 TensorFlow、PyTorch 和 Kubeflow 等社区的弹性训练的社区奉献工作,后续会公布更多的相干文章,感激关注。

【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!

正文完
 0