关于人工智能:NCCL源码解析①初始化及ncclUniqueId的产生

30次阅读

共计 12641 个字符,预计需要花费 32 分钟才能阅读完成。

作者|KIDGINBROOK
更新|潘丽晨

NCCL 是英伟达开源的 GPU 通信库,支持集合通信和点对点通信。

看下官网给的一个 demo:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "cuda_runtime.h"
#include "mpi.h"
#include "nccl.h"
 
 
/*
 * MPICHECK(cmd): evaluate `cmd` exactly once and terminate the process with
 * EXIT_FAILURE if the result is not MPI_SUCCESS, printing the source file,
 * line and numeric error code.
 *
 * `cmd` is parenthesized and the result is kept in a macro-specific local so
 * that an argument which itself mentions a variable called `e` is not
 * shadowed by the macro's own temporary.
 */
#define MPICHECK(cmd) do {                          \
  int mpicheck_rc_ = (cmd);                         \
  if (mpicheck_rc_ != MPI_SUCCESS) {                \
    printf("Failed: MPI error %s:%d'%d'\n",        \
        __FILE__, __LINE__, mpicheck_rc_);          \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
 
 
/*
 * CUDACHECK(cmd): evaluate `cmd` exactly once and terminate the process with
 * EXIT_FAILURE if the result is not cudaSuccess, printing the source file,
 * line and the text returned by cudaGetErrorString().
 *
 * `cmd` is parenthesized and the result is kept in a macro-specific local so
 * it cannot shadow an identifier used inside `cmd`.
 */
#define CUDACHECK(cmd) do {                         \
  cudaError_t cudacheck_rc_ = (cmd);                \
  if (cudacheck_rc_ != cudaSuccess) {               \
    printf("Failed: Cuda error %s:%d'%s'\n",             \
        __FILE__, __LINE__,                         \
        cudaGetErrorString(cudacheck_rc_));         \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
 
 
/*
 * NCCLCHECK(cmd): evaluate `cmd` exactly once and terminate the process with
 * EXIT_FAILURE if the result is not ncclSuccess, printing the source file,
 * line and the text returned by ncclGetErrorString().
 *
 * `cmd` is parenthesized and the result is kept in a macro-specific local so
 * it cannot shadow an identifier used inside `cmd`.
 */
#define NCCLCHECK(cmd) do {                         \
  ncclResult_t ncclcheck_rc_ = (cmd);               \
  if (ncclcheck_rc_ != ncclSuccess) {               \
    printf("Failed, NCCL error %s:%d'%s'\n",             \
        __FILE__, __LINE__,                         \
        ncclGetErrorString(ncclcheck_rc_));         \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
 
 
/*
 * Hash a NUL-terminated string with the DJB2a scheme.
 *
 * Starts from 5381 and, for every character, multiplies the running value
 * by 33 (shift-left-by-5 plus itself) and XORs in the character.
 *
 * string: NUL-terminated input; must not be NULL.
 * Returns the 64-bit hash; the empty string hashes to 5381.
 */
static uint64_t getHostHash(const char* string) {
  // Based on DJB2a, result = result * 33 ^ char
  uint64_t result = 5381;
  // Index each character: the loop must test and mix string[c], not the
  // pointer itself.
  for (int c = 0; string[c] != '\0'; c++) {
    result = ((result << 5) + result) ^ string[c];
  }
  return result;
}
 
 
/*
 * Store the short host name (everything before the first '.') in `hostname`.
 *
 * hostname: destination buffer of at least `maxlen` bytes.
 * maxlen:   size of the buffer in bytes; nothing is written when it is <= 0.
 *
 * The buffer always holds a NUL-terminated string on return when maxlen > 0:
 * it is empty if gethostname() fails, and it is force-terminated in case the
 * name was truncated without a terminator.
 */
static void getHostName(char* hostname, int maxlen) {
  if (hostname == NULL || maxlen <= 0) return;

  if (gethostname(hostname, maxlen) != 0) {
    hostname[0] = '\0';
    return;
  }
  // gethostname() may leave a truncated name unterminated.
  hostname[maxlen - 1] = '\0';

  // Cut at the first '.', looking only at the name itself, not at the
  // unused bytes after the terminator.
  for (int i = 0; i < maxlen && hostname[i] != '\0'; i++) {
    if (hostname[i] == '.') {
      hostname[i] = '\0';
      return;
    }
  }
}
 
 
/*
 * Demo entry point: each MPI rank drives nDev GPUs, joins one NCCL
 * communicator group of nRanks*nDev devices, and runs a single sum
 * all-reduce of `size` floats per device.
 *
 * Any failing MPI/CUDA/NCCL call or host allocation terminates the process
 * with EXIT_FAILURE. Returns 0 on success.
 */
int main(int argc, char* argv[])
{
  int size = 32*1024*1024;

  int myRank, nRanks, localRank = 0;


  //initializing MPI
  MPICHECK(MPI_Init(&argc, &argv));
  MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
  MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));


  //calculating localRank which is used in selecting a GPU:
  //localRank = number of lower-numbered ranks with the same host hash
  uint64_t hostHashs[nRanks];
  char hostname[1024];
  getHostName(hostname, 1024);
  hostHashs[myRank] = getHostHash(hostname);
  MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
  for (int p=0; p<nRanks; p++) {
    if (p == myRank) break;
    if (hostHashs[p] == hostHashs[myRank]) localRank++;
  }


  //each process is using two GPUs
  int nDev = 2;


  //host-side arrays holding the per-device buffers and streams
  //(casts kept so the example also builds with a C++ compiler)
  float** sendbuff = (float**)malloc(nDev * sizeof(float*));
  float** recvbuff = (float**)malloc(nDev * sizeof(float*));
  cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);
  if (sendbuff == NULL || recvbuff == NULL || s == NULL) {
    printf("Failed: host allocation error %s:%d\n", __FILE__, __LINE__);
    exit(EXIT_FAILURE);
  }


  //picking GPUs based on localRank
  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(localRank*nDev + i));
    CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
    CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
    CUDACHECK(cudaStreamCreate(s+i));
  }


  ncclUniqueId id;
  ncclComm_t comms[nDev];


  //generating NCCL unique ID at one process and broadcasting it to all;
  //the call is checked so that a failed id is never broadcast
  if (myRank == 0) NCCLCHECK(ncclGetUniqueId(&id));
  MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));


  //initializing NCCL, group API is required around ncclCommInitRank as it is
  //called across multiple GPUs in each thread/process
  NCCLCHECK(ncclGroupStart());
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaSetDevice(localRank*nDev + i));
    NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i));
  }
  NCCLCHECK(ncclGroupEnd());


  //calling NCCL communication API. Group API is required when using
  //multiple devices per thread/process
  NCCLCHECK(ncclGroupStart());
  for (int i=0; i<nDev; i++) {
    NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
          comms[i], s[i]));
  }
  NCCLCHECK(ncclGroupEnd());


  //synchronizing on CUDA stream to complete NCCL communication
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaStreamSynchronize(s[i]));
  }


  //freeing device memory
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaFree(sendbuff[i]));
    CUDACHECK(cudaFree(recvbuff[i]));
  }


  //finalizing NCCL
  for (int i=0; i<nDev; i++) {
    NCCLCHECK(ncclCommDestroy(comms[i]));
  }


  //releasing the streams and the host-side bookkeeping arrays
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaSetDevice(localRank*nDev + i));
    CUDACHECK(cudaStreamDestroy(s[i]));
  }
  free(s);
  free(recvbuff);
  free(sendbuff);


  //finalizing MPI
  MPICHECK(MPI_Finalize());


  printf("[MPI Rank %d] Success \n", myRank);
  return 0;
}

在上边的示例中,rank0 会执行 ncclGetUniqueId 获取 Id,而后通过 mpi 广播给其余 rank,接下来看下 UniqueId 是怎么产生的。

// Fills *out with a unique id obtained from bootstrapGetUniqueId().
// ncclInit() runs first, then `out` is validated with PtrCheck(); both are
// wrapped in NCCLCHECK (the library's own macro, not shown in this article;
// presumably it returns the error to the caller).
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));

  ncclResult_t res = bootstrapGetUniqueId(out);
  return res;
}

而后看下 ncclInit。

首先执行 initEnv,设置环境变量。

而后执行 initNet,用来初始化 nccl 所需要的网络,包含两个,一个是 bootstrap 网络,另外一个是数据通信网络,bootstrap 网络主要用于初始化时交换一些简单的信息,比方每个机器的 ip 端口,因为数据量很小,而且主要是在初始化阶段执行一次,因而 bootstrap 使用的是 tcp;而通信网络是用于实际数据的传输,因而会优先使用 rdma(支持 gdr 的话会优先使用 gdr)。

// Initializes the bootstrap network and then selects the data network,
// storing the choice in the global ncclNet. Order of preference: a network
// supplied by initNetPlugin(), then ncclNetIb, then ncclNetSocket.
ncclResult_t initNet() {
  // Always initialize bootstrap network
  NCCLCHECK(bootstrapNetInit());

  // A plugin that set ncclNet wins outright.
  NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
  if (ncclNet != NULL) return ncclSuccess;

  // IB could not be initialized: fall back to plain sockets.
  if (initNet(&ncclNetIb) != ncclSuccess) {
    NCCLCHECK(initNet(&ncclNetSocket));
    ncclNet = &ncclNetSocket;
    return ncclSuccess;
  }

  ncclNet = &ncclNetIb;
  return ncclSuccess;
}

bootstrapNetInit 就是 bootstrap 网络的初始化,主要就是通过 findInterfaces 遍历机器上所有的网卡信息,通过 prefixList 匹配选择使用哪些网卡,将可用网卡的信息保存下来,将 ifa_name 保存到全局的 bootstrapNetIfNames,ip 地址保存到全局 bootstrapNetIfAddrs,默认除了 docker 和 lo 其余的网卡都可以使用。

例如在测试机器上有三张网卡,分别是 xgbe0、xgbe1、xgbe2,那么就会把这三个 ifa_name 和对应的 ip 地址保存下来,另外 nccl 提供了环境变量 NCCL_SOCKET_IFNAME 可以用来指定想用的网卡名,例如通过 export NCCL_SOCKET_IFNAME=xgbe0 来指定使用 xgbe0,其实就是通过 prefixList 来匹配做到的。

// Collects up to maxIfs local network interfaces that match prefixList.
//
//   prefixList    interface filter handed to parseStringList(); a leading
//                 '^' inverts the match, a leading '=' requests exact matching
//   names         output: interface names, one slot of maxIfNameSize bytes
//                 per interface found
//   addrs         output: address of each interface found, same index as names
//   sock_family   AF_INET or AF_INET6 to restrict the family, -1 for either
//   maxIfNameSize size in bytes of one slot in names
//   maxIfs        maximum number of interfaces to return
//
// Returns the number of interfaces stored, or 0 if the interface list
// cannot be obtained.
static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) {
  struct netIf userIfs[MAX_IFS];
  bool searchNot = prefixList && prefixList[0] == '^';
  if (searchNot) prefixList++;
  bool searchExact = prefixList && prefixList[0] == '=';
  if (searchExact) prefixList++;
  int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS);

  int found = 0;
  struct ifaddrs *interfaces = NULL, *interface;
  // On failure the list pointer is not valid: do not walk or free it.
  if (getifaddrs(&interfaces) != 0) return 0;

  for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) {
    if (interface->ifa_addr == NULL) continue;

    // Only IPv4/IPv6 addresses are considered.
    int family = interface->ifa_addr->sa_family;
    if (family != AF_INET && family != AF_INET6)
      continue;

    if (sock_family != -1 && family != sock_family)
      continue;

    // The IPv6 loopback address is never returned.
    if (family == AF_INET6) {
      struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr);
      if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;
    }

    // Keep the interface when (matches list) XOR (inverted search) is true.
    if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {
      continue;
    }

    // An interface name is stored only once.
    bool duplicate = false;
    for (int i = 0; i < found; i++) {
      if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; }
    }

    if (!duplicate) {
      char* slot = names+found*maxIfNameSize;
      strncpy(slot, interface->ifa_name, maxIfNameSize);
      // strncpy does not terminate a name that fills the slot; the strcmp
      // above relies on every stored name being NUL-terminated.
      if (maxIfNameSize > 0) slot[maxIfNameSize-1] = '\0';
      int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
      memcpy(addrs+found, interface->ifa_addr, salen);
      found++;
    }
  }

  freeifaddrs(interfaces);
  return found;
}

开始初始化通信网络。

ncclNet_t 结构体是一系列的函数指针,比方初始化,发送,接收等;socket,IB 等通信方式都实现了自己的 ncclNet_t,如 ncclNetSocket,ncclNetIb,初始化通信网络的过程就是依次看哪个通信模式可用,而后赋值给全局的 ncclNet。

首先执行 initNetPlugin,查看是否有 libnccl-net.so,测试环境没有这个 so,所以直接返回。

而后尝试使用 IB 网络:

首先执行 ncclNetIb 的 init 函数,就是 ncclIbInit。

// Scans the machine for usable IB/RoCE ports and records them in the global
// ncclIbDevs table (count in ncclNIbDevs).
//
// Returns ncclInternalError when the ibverbs symbols cannot be loaded, when
// ncclParamIbDisable() is set, when no IP interface is found for the
// out-of-band address, or when a verbs wrapper call fails; ncclSuccess
// otherwise (including when no device was found).
//
// logFunction is not used in this body.
//
// NOTE(review): every `return` between pthread_mutex_lock() and
// pthread_mutex_unlock() below leaves ncclIbLock held, because the unlock is
// only reached at the end of the locked region. NCCLCHECK presumably returns
// on failure as well — confirm.
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
  // Makes the NCCL_IB_HCA value be logged only on the first pass.
  static int shownIbHcaEnv = 0;
  if(wrap_ibv_symbols() != ncclSuccess) {return ncclInternalError;}
  if (ncclParamIbDisable()) return ncclInternalError;
 
  // ncclNIbDevs == -1 is tested once without the lock and again with it held.
  if (ncclNIbDevs == -1) {pthread_mutex_lock(&ncclIbLock);
    wrap_ibv_fork_init();
    if (ncclNIbDevs == -1) {
      // The -1 value is cleared before detection starts, so a failure below
      // still leaves ncclNIbDevs != -1 for later calls.
      ncclNIbDevs = 0;
      // Exactly one IP interface is requested; its name/address go to
      // ncclIbIfName / ncclIbIfAddr.
      if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {WARN("NET/IB : No IP interface found.");
        return ncclInternalError;
      }
 
      // Detect IB cards
      int nIbDevs;
      struct ibv_device** devices;
 
      // Check if user defined which IB device:port to use
      char* userIbEnv = getenv("NCCL_IB_HCA");
      if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
      struct netIf userIfs[MAX_IB_DEVS];
      // Leading '^' inverts the match, leading '=' requests exact matching.
      bool searchNot = userIbEnv && userIbEnv[0] == '^';
      if (searchNot) userIbEnv++;
      bool searchExact = userIbEnv && userIbEnv[0] == '=';
      if (searchExact) userIbEnv++;
      int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);
 
      if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;
 
      // One table entry is created per accepted (device, port) pair, up to
      // MAX_IB_DEVS entries in total.
      for (int d=0; d<nIbDevs && ncclNIbDevs<MAX_IB_DEVS; d++) {
        struct ibv_context * context;
        if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {WARN("NET/IB : Unable to open device %s", devices[d]->name);
          continue;
        }
        // Number of ports of this device that were added to the table.
        int nPorts = 0;
        struct ibv_device_attr devAttr;
        memset(&devAttr, 0, sizeof(devAttr));
        if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {WARN("NET/IB : Unable to query device %s", devices[d]->name);
          if (ncclSuccess != wrap_ibv_close_device(context)) {return ncclInternalError;}
          continue;
        }
        // Port numbers start at 1.
        for (int port = 1; port <= devAttr.phys_port_cnt; port++) {
          struct ibv_port_attr portAttr;
          if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {WARN("NET/IB : Unable to query port %d", port);
            continue;
          }
          // Skip ports that are not active or are neither InfiniBand nor Ethernet.
          if (portAttr.state != IBV_PORT_ACTIVE) continue;
          if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
              && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue;
 
          // check against user specified HCAs/ports
          if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {continue;}
          TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s", d, devices[d]->name, port,
              portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
          ncclIbDevs[ncclNIbDevs].device = d;
          ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
          ncclIbDevs[ncclNIbDevs].port = port;
          ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
          ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
          ncclIbDevs[ncclNIbDevs].context = context;
          // NOTE(review): strncpy does not terminate devName if the device
          // name is MAXNAMESIZE bytes or longer — confirm the buffer size.
          strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
          NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
          ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
          ncclNIbDevs++;
          nPorts++;
          // A thread running ncclIbAsyncThreadMain is started for every
          // accepted port; each call overwrites the same ncclIbAsyncThread
          // handle and the return value is not checked.
          pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
        }
        // The device stays open only if at least one of its ports was kept.
        if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) {return ncclInternalError;}
      }
      if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) {return ncclInternalError;};
    }
    if (ncclNIbDevs == 0) {INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
    } else {char line[1024];
      // Build one summary string listing every table entry.
      line[0] = '\0';
      for (int d=0; d<ncclNIbDevs; d++) {snprintf(line+strlen(line), 1023-strlen(line), "[%d]%s:%d/%s", d, ncclIbDevs[d].devName,
            ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
      }
      line[1023] = '\0';
      char addrline[1024];
      INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
    }
    pthread_mutex_unlock(&ncclIbLock);
  }
  return ncclSuccess;
}

首先第三行通过 wrap_ibv_symbols 加载动态库 libibverbs.so,而后获取动态库的各个函数。

而后通过 wrap_ibv_fork_init 防止 fork 引起 rdma 网卡读写出错。

后面会讲到 ib 网络也会用到 socket 进行带外网络的传输,所以这里也通过 findInterfaces 获取一个可用的网卡保存到 ncclIbIfAddr。

通过 ibv_get_device_list 获取所有 rdma 设备到 devices 中,遍历 devices 的每个 device,因为每个 HCA 可能有多个物理 port,所以对每个 device 遍历每一个物理 port,获取每个 port 的信息。

而后将相关信息保存到全局的 ncclIbDevs 中,比方是哪个 device 的哪个 port,使用的是 IB 还是 ROCE,device 的 pci 路径,maxqp,device 的 name 等,注意这里也有类似 bootstrap 网络 NCCL_SOCKET_IFNAME 的环境变量,叫 NCCL_IB_HCA,可以指定使用哪个 IB HCA。

到这里整个初始化的过程就完成了,一句话总结就是,获取了当前机器上所有可用的 IB 网卡和普通以太网卡之后保存下来。

而后开始生成 UniqueId。

// Opens the bootstrap listening endpoint and starts a thread running
// bootstrapRoot on it. The storage of *id is reused as the net handle, so
// bootstrapNetListen() writes the listening address straight into the id.
//
//   id         unique id whose bytes receive the net handle
//   idFromEnv  selects dontCareIf instead of device 0 for the listen call
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
  void* listenComm;
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;

  int dev = idFromEnv ? dontCareIf : 0;
  NCCLCHECK(bootstrapNetListen(dev, netHandle, &listenComm));

  // The thread handle is not kept; the thread receives listenComm.
  pthread_t thread;
  pthread_create(&thread, NULL, bootstrapRoot, listenComm);
  return ncclSuccess;
}

ncclNetHandle_t 也是一个字符数组,而后执行 bootstrapNetListen。

// Creates a listening bootstrap socket and returns it through *listenComm.
//
//   dev         >= 0 selects the bootstrap interface with that index;
//               findSubnetIf takes a branch that is elided in this excerpt
//   netHandle   storage reinterpreted as a union socketAddress; it holds the
//               address handed to createListenSocket()
//   listenComm  output: the newly created struct bootstrapNetComm
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {union socketAddress* connectAddr = (union socketAddress*) netHandle;
  // The address must fit inside the handle it is stored in.
  static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");
  // if dev >= 0, listen based on dev
  if (dev >= 0) {NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr));
  } else if (dev == findSubnetIf) {...} // Otherwise, handle stores a local address
  struct bootstrapNetComm* comm;
  NCCLCHECK(bootstrapNetNewComm(&comm));
  // The listening fd is stored in comm->fd.
  NCCLCHECK(createListenSocket(&comm->fd, connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}

依次看下这三个函数,通过 bootstrapNetGetSocketAddr 获取一个可用的 ip 地址。

// Copies the address of bootstrap interface number `dev` into *addr.
// Returns ncclInternalError when dev is not below bootstrapNetIfs.
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
  if (dev < bootstrapNetIfs) {
    memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr));
    return ncclSuccess;
  }
  return ncclInternalError;
}

此时 dev 是 0,bootstrapNetIfs 是初始化 bootstrap 网络的时候一共找到了几个可用的网卡,这里就是获取了第 0 个可用的 ip 地址。

而后通过 bootstrapNetNewComm 创建 bootstrapNetComm,bootstrapNetComm 其实就是 fd,bootstrapNetNewComm 其实就是 new 了一个 bootstrapNetComm。

/* Bootstrap communicator: holds only a socket file descriptor. */
struct bootstrapNetComm {
  int fd;  /* socket descriptor */
};

通过 createListenSocket 启动 socket server。

// Creates a TCP socket, binds it to *localAddr and puts it in listen mode.
//
//   fd         output: the listening socket descriptor
//   localAddr  in: address (and optionally port) to bind to;
//              out: rewritten by getsockname() with the address/port
//              actually bound
//
// Returns ncclSystemError if the socket cannot be created, ncclSuccess once
// the socket is listening.
//
// NOTE(review): SYSCHECK presumably returns on failure; in that case sockfd
// is not closed on those paths — confirm against the macro definition.
static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) {
  /* IPv4/IPv6 support */
  int family = localAddr->sa.sa_family;
  int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);

  /* Create socket and bind it to a port */
  int sockfd = socket(family, SOCK_STREAM, 0);
  if (sockfd == -1) {
    WARN("Net : Socket creation failed : %s", strerror(errno));
    return ncclSystemError;
  }

  if (socketToPort(&localAddr->sa)) {
    // Port is forced by env. Make sure we get the port.
    // Socket option names are identifiers, not bit flags, so each option
    // is set with its own setsockopt() call instead of OR-ing the names.
    int opt = 1;
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
#if defined(SO_REUSEPORT)
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
#endif
  }

  // localAddr port should be 0 (Any port)
  SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind");

  /* Get the assigned Port */
  socklen_t size = salen;
  SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname");

#ifdef ENABLE_TRACE
  char line[1024];
  TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));
#endif

  /* Put the socket in listen mode
   * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
   */
  SYSCHECK(listen(sockfd, 16384), "listen");
  *fd = sockfd;
  return ncclSuccess;
}

创建监听 fd,ip 由 localaddr 指定,初始端口为 0,bind 时随机找一个可用端口,并通过 getsockname(sockfd, &localAddr->sa, &size) 将 ip 端口写回到 localaddr,这里 localaddr 就是 UniqueId。

到这里 UniqueId 也就产生了,其实就是当前机器的 ip 和 port。

(本文经授权后由 OneFlow 发布。原文:https://blog.csdn.net/KIDGIN7439/article/details/126712106?sp…)

欢迎 Star、试用 OneFlow 最新版本:https://github.com/Oneflow-Inc/oneflow/

正文完
 0