
The Distributed Barrier Operation in PyTorch

Original document: https://www.yuque.com/lart/ug…

The Concept of a Barrier

The concept of a barrier is explained in the Wikipedia entry on synchronization barriers: a barrier is a synchronization method in parallel computing. For a group of processes or threads, a barrier in the program means that any thread or process reaching it must wait, and can only continue once all threads or processes have arrived at that point.

Note that the barrier is not unique to PyTorch; it is a basic concept in parallel computing, and other parallel-computing settings may involve the same concept and operation. This article focuses on the situation in PyTorch.
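As a quick illustration outside of PyTorch, Python's standard-library `threading.Barrier` implements the same concept: no participant gets past the barrier until all of them have reached it. (A minimal sketch; the worker count and event labels are arbitrary.)

```python
import threading

n = 3
barrier = threading.Barrier(n)   # releases only once n threads have arrived
events = []
lock = threading.Lock()          # serialize appends from multiple threads

def worker(i):
    with lock:
        events.append(("before", i))
    barrier.wait()               # block here until all n workers arrive
    with lock:
        events.append(("after", i))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every "before" event is guaranteed to precede every "after" event.
```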

torch.distributed.barrier(group=None, async_op=False, device_ids=None)

Synchronizes all processes.

This collective blocks processes until the whole group enters this function, if async_op is False, or if async work handle is called on wait().

Parameters
group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used.
async_op (bool, optional) – Whether this op should be an async op
device_ids ([int], optional) – List of device/GPU ids. Valid only for NCCL backend.

Returns
Async work handle, if async_op is set to True. None, if not async_op or if not part of the group

When training with multiple GPUs, different GPUs are usually assigned to different processes. Sometimes we want to execute some work in a single process while holding back the progress of the other processes, and that is where the need for a barrier arises.
A practical scenario is preparing a dataset: only rank 0 needs to do the processing, and the other processes have no need to repeat it, yet their subsequent work depends on the prepared data. So we block the other processes while rank 0 runs, putting them into a waiting state, and release them all together once the processing is done.

For this need, a typical structure based on a context manager looks like this:

# https://github.com/ultralytics/yolov5/blob/7d56d451241e94cd9dbe4fcb9bfba0e92c6e0e23/utils/torch_utils.py#L29-L38

from contextlib import contextmanager

import torch.distributed as dist


@contextmanager
def torch_distributed_zero_first(local_rank: int):
    """
    Context manager that makes all processes in distributed training
    wait for the local master (rank 0) to do something first.
    """
    if local_rank not in [-1, 0]:
        dist.barrier(device_ids=[local_rank])
    yield
    if local_rank == 0:
        dist.barrier(device_ids=[0])
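To see the control flow without needing torch or GPUs, the same rank-0-first pattern can be sketched with Python's standard-library `multiprocessing.Barrier`; the ranks, sleep duration, and result keys below are made up purely for illustration:

```python
import multiprocessing as mp
import time

def worker(rank, barrier, results):
    if rank != 0:
        barrier.wait()                        # non-zero ranks block here
    else:
        time.sleep(0.3)                       # rank 0 "prepares the dataset"
        results["prepared_at"] = time.time()
        barrier.wait()                        # rank 0 arrives, lifting the barrier
    results[f"resumed_{rank}"] = time.time()  # every rank resumes only now

ctx = mp.get_context("fork")                  # fork context: safe without a __main__ guard
barrier = ctx.Barrier(2)
manager = ctx.Manager()
results = manager.dict()
procs = [ctx.Process(target=worker, args=(rank, barrier, results)) for rank in range(2)]
for p in procs:
    p.start()
for p in procs:
    p.join()

out = dict(results)
manager.shutdown()
# rank 1 resumed only after rank 0 finished its preparation
```

The "fork" start method is used here so the sketch runs as a plain script on Linux/macOS; a real DDP program would use `mp.spawn` as in the examples below.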

Details of the Barrier

# -*- coding: utf-8 -*-

import os
import time

import torch.distributed as dist
import torch.multiprocessing as mp


def ddp_test_v0(local_rank, world_size):
    # Initializes the distributed backend, which takes care of synchronizing processes/GPUs
    dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)

    print("first before barrier{}\n".format(local_rank))
    if local_rank != 0:
        dist.barrier()
    print("first after barrier{}\n".format(local_rank))

    print("inter {}".format(local_rank))

    print("second before barrier{}\n".format(local_rank))
    if local_rank == 0:
        dist.barrier()
    print("second after barrier{}\n".format(local_rank))

    print("{} exit".format(local_rank))


def ddp_test_v1(local_rank, world_size):
    # Initializes the distributed backend, which takes care of synchronizing processes/GPUs
    dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)

    if local_rank != 0:
        print("1 before barrier{}\n".format(local_rank))
        start = time.time()
        time.sleep(5)
        dist.barrier()
        print(time.time() - start)
        print("1 after barrier{}\n".format(local_rank))
        dist.barrier()
        print("1 after barrier{}\n".format(local_rank))
    else:
        print("0 before barrier{}\n".format(local_rank))
        start = time.time()
        dist.barrier()
        print(time.time() - start)
        print("0 after barrier{}\n".format(local_rank))
        print("0 after barrier{}\n".format(local_rank))
        dist.barrier()
        print("0 after barrier{}\n".format(local_rank))

    print("{} exit".format(local_rank))


def main():
    world_size = 2
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    # Spawn one process per rank; swap in ddp_test_v1 to run the second example.
    mp.spawn(ddp_test_v0, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

Two examples are shown here. Beyond what the official documentation demonstrates for dist.barrier, they reveal an important property of this method: each process must execute it a matching number of times before the corresponding barriers release and blocked processes return to normal execution.
Let's look at the first example:

def ddp_test(local_rank, world_size):
    # Initializes the distributed backend, which takes care of synchronizing processes/GPUs
    dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)

    print("first before barrier{}\n".format(local_rank))
    if local_rank != 0:
        dist.barrier()
    print("first after barrier{}\n".format(local_rank))

    print("inter {}".format(local_rank))

    print("second before barrier{}\n".format(local_rank))
    if local_rank == 0:
        dist.barrier()
    print("second after barrier{}\n".format(local_rank))

    print("{} exit".format(local_rank))

Its output is:

first before barrier1
first before barrier0


first after barrier0

inter 0
second before barrier0

second after barrier0

0 exit
first after barrier1

inter 1
second before barrier1

second after barrier1

1 exit

Process finished with exit code 0

A few details are worth noting:

  • Before the barrier, each GPU process prints its own output independently.

    • local_rank=0 prints several lines on its way to the barrier it can see, while local_rank=1 produces only the single line first before barrier1.
  • After second before barrier0, rank 0 reaches its own barrier, which releases the other processes from blocking so they resume normal execution. Because of the time the intermediate operations take, rank 0 first prints its own second after barrier0 and exits, after which rank 1 starts printing its results too.

One point deserves attention here: the barrier calls in different processes actually correspond to one another; every process must execute barrier once before all of them are released and proceed normally.
As for the second piece of code:

def ddp_test_v1(local_rank, world_size):
    # Initializes the distributed backend, which takes care of synchronizing processes/GPUs
    dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)

    if local_rank != 0:
        print("1 before barrier{}\n".format(local_rank))
        start = time.time()
        time.sleep(5)
        dist.barrier()
        print(time.time() - start)
        print("1 after barrier{}\n".format(local_rank))
        dist.barrier()
        print("1 after barrier{}\n".format(local_rank))
    else:
        print("0 before barrier{}\n".format(local_rank))
        start = time.time()
        dist.barrier()
        print(time.time() - start)
        print("0 after barrier{}\n".format(local_rank))
        print("0 after barrier{}\n".format(local_rank))
        dist.barrier()
        print("0 after barrier{}\n".format(local_rank))

    print("{} exit".format(local_rank))

The output is:

1 before barrier1
0 before barrier0


5.002117395401001
5.002126216888428
1 after barrier1


0 after barrier0

0 after barrier0

0 after barrier0

0 exit
1 after barrier1

1 exit

Process finished with exit code 0

An important point here: the two print(time.time() - start) outputs are essentially identical. No matter how long the earlier delay is, the time measured across the barrier is determined by the last process to reach and execute its barrier. This again reflects the mutual constraint between the barrier calls of different processes. Once rank 0 reaches its second barrier, rank 1 is allowed to run again, but this time rank 0 finishes first.
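This "everyone waits for the slowest arriver" behavior is easy to reproduce with the standard-library `threading.Barrier` as well (the sleep duration and worker names here are arbitrary):

```python
import threading
import time

barrier = threading.Barrier(2)
waited = {}

def worker(name, delay):
    time.sleep(delay)                  # simulate work of different lengths
    start = time.time()
    barrier.wait()                     # measure time spent blocked here
    waited[name] = time.time() - start

t_fast = threading.Thread(target=worker, args=("fast", 0.0))
t_slow = threading.Thread(target=worker, args=("slow", 0.5))
t_fast.start(); t_slow.start()
t_fast.join(); t_slow.join()

# The fast worker blocks for roughly the 0.5 s the slow one spent sleeping;
# the slow worker passes the barrier almost immediately.
```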
In addition, we can verify that if one of the two barrier calls in the code for one of the ranks is removed, the other process will fall into an infinite wait.
For example:


def ddp_test_v1(local_rank, world_size):
    # Initializes the distributed backend, which takes care of synchronizing processes/GPUs
    dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)

    if local_rank != 0:
        print("1 before barrier{}\n".format(local_rank))
        start = time.time()
        time.sleep(5)
        dist.barrier()
        print(time.time() - start)
        print("1 after barrier{}\n".format(local_rank))
        # dist.barrier()
        print("1 after barrier{}\n".format(local_rank))
    else:
        print("0 before barrier{}\n".format(local_rank))
        start = time.time()
        time.sleep(3)
        dist.barrier()
        print(time.time() - start)
        print("0 after barrier{}\n".format(local_rank))
        print("0 after barrier{}\n".format(local_rank))
        dist.barrier()
        print("0 after barrier{}\n".format(local_rank))

    print("{} exit".format(local_rank))

Output:

0 before barrier0
1 before barrier1


5.002458572387695
1 after barrier1

1 after barrier1

1 exit
5.002473831176758
0 after barrier0

0 after barrier0

Traceback (most recent call last):
  File "/home/lart/Coding/SODBetterProj/tools/dist_experiment_test.py", line 67, in <module>
    main()
  File "/home/lart/Coding/SODBetterProj/tools/dist_experiment_test.py", line 63, in main
    mp.spawn(ddp_test_v1, args=(world_size,), nprocs=world_size, join=True)
  File "/home/lart/miniconda3/envs/pt17/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 199, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/home/lart/miniconda3/envs/pt17/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 157, in start_processes
    while not context.join():
  File "/home/lart/miniconda3/envs/pt17/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 75, in join
    ready = multiprocessing.connection.wait(
  File "/home/lart/miniconda3/envs/pt17/lib/python3.8/multiprocessing/connection.py", line 931, in wait
    ready = selector.select(timeout)
  File "/home/lart/miniconda3/envs/pt17/lib/python3.8/selectors.py", line 415, in select
    fd_event_list = self._selector.poll(timeout)
KeyboardInterrupt

Process finished with exit code 137 (interrupted by signal 9: SIGKILL)

It waits forever at the second barrier.
This characteristic is also mentioned in this answer:

  • when a process encounters a barrier it will block
  • the position of the barrier is not important (not all processes have to enter the same if-statement, for instance)
  • a process is blocked by a barrier until all processes have encountered a barrier, upon which the barrier is lifted for all processes

https://stackoverflow.com/a/59766443
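The unmatched-barrier hang above can be reproduced in miniature with `threading.Barrier`; a short timeout stands in for the infinite wait (the timeout value and result labels are made up for this sketch):

```python
import threading

barrier = threading.Barrier(2)
outcome = []

def rank0():
    barrier.wait()                     # matched by rank1's single wait
    try:
        # This second wait has no partner; it would block forever,
        # so use a short timeout to observe the deadlock instead.
        barrier.wait(timeout=0.2)
        outcome.append("passed")
    except threading.BrokenBarrierError:
        outcome.append("deadlocked")

def rank1():
    barrier.wait()                     # this "process" calls the barrier only once

t0 = threading.Thread(target=rank0)
t1 = threading.Thread(target=rank1)
t0.start(); t1.start()
t0.join(); t1.join()
# outcome == ["deadlocked"]
```

In real DDP code there is no such timeout by default, which is why the mismatched example above had to be killed manually.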

Key References

  • [PyTorch] DDP series (original posts)

    • Part 1: https://zhuanlan.zhihu.com/p/178402798
    • Part 2: https://zhuanlan.zhihu.com/p/187610959
    • Part 3: https://zhuanlan.zhihu.com/p/250471767
  • A summary of PyTorch single-machine multi-GPU training methods and principles

    • https://github.com/jia-zhuang/pytorch-multi-gpu-training
  • PyTorch distributed training (with very friendly diagrams)

    • https://zhuanlan.zhihu.com/p/76638962
  • Distribution is all you need (rich and comprehensive)

    • https://github.com/tczhangzhi/pytorch-distributed