从调用NCCL到深入NCCL源码

发布于:2024-10-13 ⋅ 阅读:(15) ⋅ 点赞:(0)

本小白目前研究GPU多卡互连的方案,主要参考NCCL和RCCL进行学习,如有错误,请及时指正!

内容还在整理中,近期不断更新!!

直接进入主题首先例程为:单线程/单进程 调用 单个GPU设备代码:

#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>


#define MPICHECK(cmd) do {                          \
  int e = cmd;                                      \
  if( e != MPI_SUCCESS ) {                          \
    printf("Failed: MPI error %s:%d '%d'\n",        \
        __FILE__,__LINE__, e);   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)


#define CUDACHECK(cmd) do {                         \
  cudaError_t e = cmd;                              \
  if( e != cudaSuccess ) {                          \
    printf("Failed: Cuda error %s:%d '%s'\n",             \
        __FILE__,__LINE__,cudaGetErrorString(e));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)


#define NCCLCHECK(cmd) do {                         \
  ncclResult_t r = cmd;                             \
  if (r!= ncclSuccess) {                            \
    printf("Failed, NCCL error %s:%d '%s'\n",             \
        __FILE__,__LINE__,ncclGetErrorString(r));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)


static uint64_t getHostHash(const char* string) {
  // Based on DJB2a, result = result * 33 ^ char
  uint64_t result = 5381;
  for (int c = 0; string[c] != '\0'; c++){
    result = ((result << 5) + result) ^ string[c];
  }
  return result;
}


static void getHostName(char* hostname, int maxlen) {
  gethostname(hostname, maxlen);
  for (int i=0; i< maxlen; i++) {
    if (hostname[i] == '.') {
        hostname[i] = '\0';
        return;
    }
  }
}


int main(int argc, char* argv[])
{
  int size = 32*1024*1024;


  int myRank, nRanks, localRank = 0;


  //initializing MPI
  MPICHECK(MPI_Init(&argc, &argv));
  MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
  MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));


  //calculating localRank based on hostname which is used in selecting a GPU
  uint64_t hostHashs[nRanks];
  char hostname[1024];
  getHostName(hostname, 1024);
  hostHashs[myRank] = getHostHash(hostname);
  MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
  for (int p=0; p<nRanks; p++) {
     if (p == myRank) break;
     if (hostHashs[p] == hostHashs[myRank]) localRank++;
  }


  ncclUniqueId id;
  ncclComm_t comm;
  float *sendbuff, *recvbuff;
  cudaStream_t s;


  //get NCCL unique ID at rank 0 and broadcast it to all others
  if (myRank == 0) ncclGetUniqueId(&id);
  MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));


  //picking a GPU based on localRank, allocate device buffers
  CUDACHECK(cudaSetDevice(localRank));
  CUDACHECK(cudaMalloc(&sendbuff, size * sizeof(float)));
  CUDACHECK(cudaMalloc(&recvbuff, size * sizeof(float)));
  CUDACHECK(cudaStreamCreate(&s));


  //initializing NCCL
  NCCLCHECK(ncclCommInitRank(&comm, nRanks, id, myRank));


  //communicating using NCCL
  NCCLCHECK(ncclAllReduce((const void*)sendbuff, (void*)recvbuff, size, ncclFloat, ncclSum,
        comm, s));


  //completing NCCL operation by synchronizing on the CUDA stream
  CUDACHECK(cudaStreamSynchronize(s));


  //free device buffers
  CUDACHECK(cudaFree(sendbuff));
  CUDACHECK(cudaFree(recvbuff));


  //finalizing NCCL
  ncclCommDestroy(comm);


  //finalizing MPI
  MPICHECK(MPI_Finalize());


  printf("[MPI Rank %d] Success \n", myRank);
  return 0;
}

其中关于NCCL的API的主要是以下这三个:

ncclGetUniqueId(&id)
ncclCommInitRank(&comm, nRanks, id, myRank)
ncclAllReduce((const void*)sendbuff, (void*)recvbuff, size, ncclFloat, ncclSum,comm, s)

首先明白前两个API,其中ncclGetUniqueId的作用,再多卡互连的环境中,所有参与的GPU共用此id,用来标识这个(一个)通信域。

来看一下这个id是怎么获得的, if (myRank == 0) ncclGetUniqueId(&id);

ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {  
 
  // 1 NCCL库初始化 
  NCCLCHECK(ncclInit());  
 
  // 检查传入的out指针是否为非空。
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));   
 
  //2 调用bootstrapGetUniqueId函数来获取一个唯一的ID,并将这个ID存储在传入的out指针所指向的内存位置。  
  ncclResult_t res = bootstrapGetUniqueId((struct ncclBootstrapHandle*)out);  
 
  // TRACE_CALL是一个用于日志记录或跟踪的宏。
  TRACE_CALL("ncclGetUniqueId(0x%llx)", (unsigned long long)hashUniqueId(*out));  
 
  // 返回bootstrapGetUniqueId函数的结果。表示操作是否成功 
 
  return res;  
}

一、ncclInit()核心逻辑:源码位置:nccl-master\src\init.cc
1、initEnv(); //初始化环境设置

2、initGdrCopy() //初始化 GPU Direct RDMA (GDR)

3、bootstrapNetInit() //初始化引导网络

4、ncclNetPluginInit() //NCCL网络插件初始化,抽象和封装底层网络细节,方便NCCL灵活应用

备注:“A、setEnvFile(confFilePath);//根据配置文件初始化设置” 的首行缩进表示initEnv()调用了setEnvFile(confFilePath)函数。缩进表示调用,或者该代码片段中使用。

备注:bootstrap引导网络主要在初始化时完成一些小数据量的信息交换,例如ip地址。

二、bootstrapGetUniqueId()核心逻辑:源码位置nccl-master\src\bootstrap.cc

1、生成一个随机数,填充ncclUniqueId的前半部分。

2、如果环境变量中有NCCL_COMM_ID的值,将环境变量解析为网络地址,赋值给ncclUniqueId的后半部分。

3、如果环境变量中没有NCCL_COMM_ID的值,将bootstrap网络地址,赋值给ncclUniqueId的后半部分。

注意,标识通信组的唯一ID ncclUniqueId本质上由两部分组成,前半部分是随机数,后半部分是网络地址。(2/3这部分不确定)

总结:

1.nccl网络初始化:一、bootstrap网络,二、数据通信网络,bootstrap网络主要用于初始化时交换一些简单的信息,比如每个机器的ip和端口,由于数据量很小,而且主要是在初始化阶段执行一次,因此bootstrap使用的是tcp;而通信网络是用于实际数据的传输,因此会优先使用rdma(支持gdr的话会优先使用gdr)

2.生成UniqueID,主进程通常为Rank0 调用ncclGetUniqueId生成一个UniqueID,并且共享给所有参与通信的进程。

上面讲了网络初始化和UniqueID生成,以下总结以下源码整体的内容:

二、Bootstrap网络建立

核心逻辑:

1、函数输入ncclUniqueId,从而获得ncclUniqueId中包含的rank0的网络地址,每个rank上都有rank0的网络地址;

2、所有rank根据rank0的网络地址,建立socket并向rank0发送自己的网络地址,rank0上现在就有所有rank的网络地址了;

3、rank0告诉每个rank它的下一个节点网络地址,完成环形网络建立;

4、AllGather全局收集所有节点的网络地址;

源码位置:nccl-master\src\bootstrap.cc

///
//1、函数的输入handle就是UniqueID,被强制转化欸ncclBootstrapHandle,包含rank0的网络地址
ncclResult_t bootstrapInit(struct ncclBootstrapHandle* handle, struct ncclComm* comm) {
 
  // 获取当前节点的排名
  int rank = comm->rank;
  // 获取参与节点的数量
  int nranks = comm->nRanks;
  
  // 分配内存并初始化bootstrapState结构体,用于管理启动阶段的状态
  struct bootstrapState* state;
  NCCLCHECK(ncclCalloc(&state, 1));
  state->rank = rank; // 设置当前节点的排名
  state->nranks = nranks; // 设置参与节点的数量
  state->abortFlag = comm->abortFlag; // 设置是否应中止通信的标志
  
  // 将bootstrapState指针赋予comm结构体
  comm->bootstrap = state;
  // 设置魔术数字,用于校验
  comm->magic = state->magic = handle->magic;
 
  // 记录日志,显示当前节点的排名和参与节点的数量
  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
 
  // 为当前节点准备发送给其他节点的信息
  struct extInfo info = { 0 };
  info.rank = rank; // 设置当前节点的排名
  info.nranks = nranks; // 设置参与节点的数量
 
  // 创建一个监听套接字,允许其他节点联系当前节点
  NCCLCHECK(ncclSocketInit(&state->listenSock, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag)); // 初始化监听套接字
  NCCLCHECK(ncclSocketListen(&state->listenSock)); // 设置监听状态
  NCCLCHECK(ncclSocketGetAddr(&state->listenSock, &info.extAddressListen)); // 获取监听套接字的地址
 
  // 创建另一个监听套接字,允许根节点联系当前节点
  NCCLCHECK(ncclSocketInit(&listenSockRoot, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag)); // 初始化监听套接字
  NCCLCHECK(ncclSocketListen(&listenSockRoot)); // 设置监听状态
  NCCLCHECK(ncclSocketGetAddr(&listenSockRoot, &info.extAddressListenRoot)); // 获取监听套接字的地址
 
  // 如果参与节点的数量大于128,则延迟连接到根节点,以减轻根节点的负载
  if (nranks > 128) {
    long msec = rank; // 计算延迟时间
    struct timespec tv; // 定义时间戳结构体
    tv.tv_sec = msec / 1000; // 秒部分
    tv.tv_nsec = 1000000 * (msec % 1000); // 毫秒部分
    TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec); // 记录日志,显示延迟时间
    (void) nanosleep(&tv, NULL); // 延迟指定时间
  }
 
  //
  2、所有根据rank0的网络地址,建立socket并向rank0发送自己的网络地址;
  // 发送当前节点的信息给根节点
  NCCLCHECK(ncclSocketInit(&sock, &handle->addr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag)); // 初始化套接字
  NCCLCHECK(ncclSocketConnect(&sock)); // 连接到根节点
  NCCLCHECK(bootstrapNetSend(&sock, &info, sizeof(info))); // 发送信息
  NCCLCHECK(ncclSocketClose(&sock)); // 关闭套接字
  
  ///
  //3、rank0告诉每个rank它的下一个节点网络地址,完成环形网络建立;
  // 从根节点接收下一个节点在启动环中的信息
  NCCLCHECK(ncclSocketInit(&sock)); // 初始化套接字
  NCCLCHECK(ncclSocketAccept(&sock, &listenSockRoot)); // 接受来自根节点的连接请求
  NCCLCHECK(bootstrapNetRecv(&sock, &nextAddr, sizeof(union ncclSocketAddress))); // 接收信息
  NCCLCHECK(ncclSocketClose(&sock)); // 关闭套接字
  NCCLCHECK(ncclSocketClose(&listenSockRoot)); // 关闭根节点的监听套接字
 
  // 初始化与下一个节点的发送套接字
  NCCLCHECK(ncclSocketInit(&state->ringSendSocket, &nextAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag)); // 初始化套接字
  NCCLCHECK(ncclSocketConnect(&state->ringSendSocket)); // 连接到下一个节点
 
  // 接受来自前一个节点的环连接请求
  NCCLCHECK(ncclSocketInit(&state->ringRecvSocket)); // 初始化套接字
  NCCLCHECK(ncclSocketAccept(&state->ringRecvSocket, &state->listenSock)); // 接受连接请求
 
  ///
  4、AllGather全局收集所有节点的网络地址;
  // 全局收集所有节点的监听器地址
  NCCLCHECK(ncclCalloc(&state->peerCommAddresses, nranks)); // 分配内存
  NCCLCHECK(ncclSocketGetAddr(&state->listenSock, state->peerCommAddresses+rank)); // 获取当前节点的监听器地址
  NCCLCHECK(bootstrapAllGather(state, state->peerCommAddresses, sizeof(union ncclSocketAddress))); // 全局收集监听器地址
 
  // 创建服务代理套接字
  NCCLCHECK(ncclCalloc(&state->peerProxyAddresses, nranks)); // 分配内存
  NCCLCHECK(ncclCalloc(&state->peerProxyAddressesUDS, nranks)); // 分配内存
 
  // 初始化服务代理
  NCCLCHECK(ncclCalloc(&proxySocket, 1)); // 分配内存
  NCCLCHECK(ncclSocketInit(proxySocket, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeProxy, comm->abortFlag)); // 初始化套接字
  NCCLCHECK(ncclSocketListen(proxySocket)); // 设置监听状态
  NCCLCHECK(ncclSocketGetAddr(proxySocket, state->peerProxyAddresses+rank)); // 获取当前节点的代理地址
  NCCLCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union ncclSocketAddress))); // 全局收集代理地址
  uint64_t randId; // 随机ID
  NCCLCHECK(getRandomData(&randId, sizeof(randId))); // 生成随机数据
  state->peerProxyAddressesUDS[rank] = getPidHash()+randId; // 生成唯一的UDS名称
  NCCLCHECK(bootstrapAllGather(state, state->peerProxyAddressesUDS, sizeof(*state->peerProxyAddressesUDS))); // 全局收集UDS名称
  NCCLCHECK(ncclProxyInit(comm, proxySocket, state->peerProxyAddresses, state->peerProxyAddressesUDS)); // 初始化代理
 
  // 记录完成初始化的消息
  TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);
 
  // 返回成功状态
  return ncclSuccess;
}

初始化通信,所有进程使用相同的UniqueID调用ncclCommInitRank函数初始化通信,一般每个GPU都有一个独立的ncclComm,NCCL根据UniqueID和各自的网络配置(IP地址+端口号)建立Socket连接构建通信拓扑。