NVIDIA NCCL 源码学习(十四)- NVLink SHARP

发布于:2024-04-20 ⋅ 阅读:(51) ⋅ 点赞:(0)

背景

上节我们介绍了IB SHARP的工作原理,进一步的,英伟达在Hopper架构机器中引入了第三代NVSwitch,就像机间IB SHARP一样,机内可以通过NVSwitch执行NVLink SHARP,简称nvls,这节我们会介绍下NVLink SHARP如何工作的。

后续为了方便都是以nranks为2举例的,但值得注意的是nranks为2实际上不会用到nvls。

图搜索

ncclResult_t ncclNvlsInit(struct ncclComm* comm) {
  ...
  if (comm->nvlsSupport == 1) comm->nvlsChannels = std::max(comm->config.minCTAs, std::min(comm->config.maxCTAs, (int)ncclParamNvlsChannels()));
  return ncclSuccess;
}

init过程中主要是判断是否支持,如果支持的话会将nvlsSupport设置为1,然后设置comm->nvlsChannels,这里需要注意下,这个是kernel实际启动的block数,而不是搜索出来的channel数。

然后开始执行搜索过程,nvls的channel和其他算法搜索出来的channel不太一样,我们具体看下

  nvlsGraph.pattern = NCCL_TOPO_PATTERN_NVLS;
  nvlsGraph.minChannels = 1; 
  nvlsGraph.maxChannels = MAXCHANNELS;
  if (comm->nvlsSupport) {
    NCCLCHECKGOTO(ncclTopoCompute(comm->topo, &nvlsGraph), ret, fail);
    NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, &nvlsGraph), ret, fail);
  }

设置pattern为NCCL_TOPO_PATTERN_NVLS,然后开始搜索,通过pattern确定到backToNet和backToFirstRank均为-1。

ncclResult_t ncclTopoSearchRec(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int* time) {
  int backToNet, backToFirstRank;
  NCCLCHECK(ncclTopoSearchParams(system, graph->pattern, &backToNet, &backToFirstRank));
  if (system->nodes[NET].count) {
  } else {
    if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
      NCCLCHECK(ncclTopoSearchTryGpu(system, graph, saveGraph, 0, backToNet, backToFirstRank, 0, time, -1, -1, graph->nChannels));
      return ncclSuccess;
    } 
    ...
  }
  return ncclSuccess;
}

此时graph->nChannels为0。

ncclResult_t ncclTopoSearchTryGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time, int type, int index, int g) {
  const uint64_t flag = 1ULL<<(graph->nChannels);
  struct ncclTopoNode* gpu;
  NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, 1, &gpu));
  if (gpu) {
    gpu->used ^= flag;
    NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, gpu, step, backToNet, backToFirstRank, forcedOrder, time));
    gpu->used ^= flag;
    NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, -1, &gpu));
  } 
  return ncclSuccess;
}     

由于type为-1,因此ncclTopoFollowPath直接返回gpu0,从gpu0开始搜索。

ncclResult_t ncclTopoSearchRecGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, struct ncclTopoNode* gpu, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time) {
  if ((*time) <= 0) return ncclSuccess;
  (*time)--;

  int ngpus = system->nodes[GPU].count;
  if (step == ngpus) {
  }
  graph->intra[graph->nChannels*ngpus+step] = gpu->gpu.rank;
  int g = gpu - system->nodes[GPU].nodes;
  if (step == backToNet) {
  } else if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
    NCCLCHECK(ncclTopoSearchTryNvls(system, graph, saveGraph, g, ngpus, time));
  } else if (step < system->nodes[GPU].count-1) {
  } else if (step == backToFirstRank) {
  } else {
  }
  return ncclSuccess;
}

将0号GPU填到graph->intra,由于pattern为NCCL_TOPO_PATTERN_NVLS,因此直接执行ncclTopoSearchTryNvls。

ncclResult_t ncclTopoSearchTryNvls(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int g, int ngpus, int *time) {
  struct ncclTopoNode* nvs;
  struct ncclTopoNode* gpu;
  int d0=0; // See if there is enough bandwidth for NVS->GPU traffic
  do {
    NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? 2 : 1, &gpu));
    d0++;
  } while (gpu && d0 < system->nodes[GPU].count);
  if (gpu == NULL) {
    d0--;
  } else {
    int d1=0; // See if there is enough bandwidth for GPU->NVS traffic
    do {
      NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? 2 : 1, &nvs));
      d1++;
    } while (nvs && d1 < system->nodes[GPU].count);
    if (nvs == NULL) {
      d1--;
    } else { // Both directions worked. Move on to the next path.
      NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, NULL, ngpus, -1, -1, 0, time));
    }
    while (d1) {
      d1--;
      NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? -2 : -1, &nvs));
    }
  }
  while (d0) {
    d0--;
    NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? -2 : -1, &gpu));
  }
  return ncclSuccess;
}

这里就是判断带宽是否满足要求,我们先看下实际机器中GPU和NVSwitch的拓扑如图1所示

在这里插入图片描述

图 1

但是因为NVSwitch对用户来说是透明的,因此NCCL中构建的拓扑实际下所示

在这里插入图片描述

图 2

假设现在的搜索条件中带宽为bw,g为GPU0,那么这里搜索一个channel的逻辑是判断所有GPU节点到NVSwitch的双向带宽是否大于bw,如果大于的话,则减去bw,特殊的是GPU0,需要判断现有链路带宽是否大于2 * bw,这个原因后边会介绍。

在这里插入图片描述

图 3

然后继续执行ncclTopoSearchRecGpu,注意这里step指定为了ngpus,所以就完成了一个channel的搜索,nChannels变为1,那么下次执行ncclTopoSearchTryGpu的时候将从GPU1开始,重复这一过程知道搜索到了ngpus个channel,以四卡为例,搜索出的channel如下所示

0
1
2
3

其他算法的channel表示了节点内的传输顺序,而nvls搜索出来的channel不太一样,比如第一个channel里的0,nccl称为nvlsHead,他用来表示某一段内存的reduce之类的工作由谁来负责,后边我们会看到。

channel连接

static ncclResult_t connectNvls(struct ncclComm* comm, int* nvlsHeads, struct ncclTopoGraph* nvlsGraph) {
  int nHeads = nvlsGraph->nChannels;
  int headRank = -1; 
  for (int h=0; h<nHeads; h++) {
    if (nvlsGraph->intra[h*comm->localRanks] == comm->rank) headRank = h;
  }
  for (int c=0; c<comm->nvlsChannels; c++) {
    struct ncclChannel* channel = comm->channels+c;
    channel->nvls.nHeads = nHeads;
    for (int h=0; h<nHeads; h++) channel->nvls.up[h] = comm->nRanks+1+h;
    for (int h=nHeads; h<NCCL_MAX_NVLS_ARITY; h++) channel->nvls.up[h] = -1; 
    channel->nvls.down = comm->nRanks+1+headRank;
    channel->nvls.out = -1;       // NVLS+SHARP not yet implemented.
    channel->nvls.headRank = headRank;
    channel->nvls.treeUp = channel->nvls.treeDown[0] = channel->nvls.treeDown[1] = channel->nvls.treeDown[2] = -1; 
    channel->nvls.node = comm->node;
    channel->nvls.nNodes = comm->nNodes;
  }
  if (comm->nNodes == 1) return ncclSuccess;
}

计算headRank,即第几个channel的节点是自己这个rank,然后开始设置所有的nvls channel,这里的up和down用于索引peers,由于从nRanks+1开始才是nvls的链接,所以这里要加上nRanks+1,up是所有的head,down是headRank,其实就是自己。

内存注册

内存注册的整体流程如图4所示,首先通过cuMulticastCreate创建一个multicast对象,图中handle指向这个multicast对象,然后每个GPU通过cuMulticastAddDevice将当前device和这个multicast对象关联起来,然后申请显存,最后通过cuMulticastBindAddr或者cuMulticastBindMem将申请到的显存和handle关联起来。
在这里插入图片描述

图 4

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
    ...
    size_t buffSize = comm->buffSizes[NCCL_PROTO_SIMPLE];
    size_t memSize = NVLS_MEM_ALIGN_SIZE;
    size_t nvlsPerRankSize = nChannels * 2 * (buffSize + memSize);
    size_t nvlsTotalSize = nvlsPerRankSize * nHeads;
    char* shareableHandle = resources->shareableHandle;
    NCCLCHECKGOTO(nvlsGetProperties(comm, resources, dev, comm->localRanks, nvlsTotalSize), res, cleanup);
    ...
}

buffSize为SIMPLE协议的buff大小,memSize用于保存head,tail,然后计算一共需要分配多少内存,nHeads为搜索出来的channel数量,即nRanks,后边会看到为什么内存大小是这样的,然后将内存总大小和localRanks等信息保存到resources。

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
    ...
    if (comm->localRank == 0) {
      NCCLCHECKGOTO(nvlsGroupCreate(comm, &resources->properties, comm->localRank, comm->localRanks, &resources->mcHandle, shareableHandle), res, cleanup);
      NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
    } else {
      NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
      NCCLCHECKGOTO(nvlsGroupConnect(comm, shareableHandle, comm->localRankToRank[0], &resources->mcHandle), res, cleanup);
    }   
    ...
}

rank0执行nvlsGroupCreate,通过cuMulticastCreate创建一个multicast对象,保存在resources->mcHandle,由于要跨进程共享,所以需要转成shareable handle,这里是直接memcpy的。

ncclResult_t nvlsGroupCreate(struct ncclComm *comm, CUmulticastObjectProp *prop, int rank, unsigned int nranks, CUmemGenericAllocationHandle *mcHandle, char *shareableHandle) {
  size_t size = prop->size;
  CUCHECK(cuMulticastCreate(mcHandle, prop));
  memcpy(shareableHandle, mcHandle, sizeof(CUmemGenericAllocationHandle));
  return ncclSuccess;
}

然后所有rank执行bootstrapIntraNodeBroadcast,rank0将multicast对象的共享handle广播到所有rank的shareableHandle,其他rank在得到shareableHandle之后通过nvlsGroupConnect转成mcHandle。然后通过cuMulticastAddDevice将当前卡bind到mcHandle,这样所有rank就都拿到了mcHandle对应的multicast对象。
ncclResult_t nvlsGroupBindMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
  size_t size = resources->size;
  size_t granularity;
  CUdeviceptr ptr = 0;
  CUmemAllocationProp prop;

  memset(&prop, 0, sizeof(prop));
  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
  prop.location.id = resources->dev;
  prop.requestedHandleTypes = NVLS_CU_MEM_HANDLE_TYPE;
  CUCHECK(cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_RECOMMENDED));
  resources->ucGran = granularity;

  // Map a VA for UC memory
  CUCHECK(cuMemAddressReserve(&ptr, size, granularity, 0U, 0));

  // Alloc local physical mem for this NVLS group
  CUCHECK(cuMemCreate(&resources->ucHandle, size, &prop, 0));
  CUCHECK(cuMemMap(ptr, size, 0, resources->ucHandle, 0));
  CUCHECK(cuMemSetAccess(ptr, size, &resources->accessDesc, 1));
  CUDACHECK(cudaMemset((void*)ptr, 0, size));
  resources->ucBuff = (char*)ptr;

  CUCHECK(cuMulticastBindMem(resources->mcHandle, 0/*mcOffset*/, resources->ucHandle, 0/*memOffset*/, size, 0/*flags*/));

  return ncclSuccess;
}


然后开始分配物理内存并映射到虚拟地址空间,首先预留一段虚拟地址空间到ptr,然后分配物理内存到ucHandle,再将ucHandle指向的物理内存map到ptr,将ptr赋值给ucBuff,如图5所示。

在这里插入图片描述

图 5

最后通过cuMulticastBindMem将ucHandle对应的物理内存bind到mcHandle。

然后开始执行nvlsGroupMapMem将mcHandle映射到虚拟地址空间。

ncclResult_t nvlsGroupMapMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
  size_t size = resources->size;
  CUdeviceptr ptr = 0;

  // Create a VA for the NVLS
  CUCHECK(cuMemAddressReserve(&ptr, size, resources->granularity, 0U, 0));
  // Map the VA locally
  CUCHECK(cuMemMap(ptr, size, 0, resources->mcHandle, 0));
  resources->mcBuff = (char*)ptr;
  INFO(NCCL_NVLS, "NVLS Mapped MC buffer at %p size %zi", resources->mcBuff, size);

  // Having completed the BindMem we can now call SetAccess
  // NB: It will block until all ranks have bound to the Group
  CUCHECK(cuMemSetAccess((CUdeviceptr)resources->mcBuff, size, &resources->accessDesc, 1));

  return ncclSuccess;
}


同样的,预留虚拟地址空间空间到ptr,然后将mcHandle映射到ptr,保存在mcBuff。此时如图6所示

在这里插入图片描述

图 6

此时这块物理内存被映射到了ucBuff和mcBuff,ucBuff是Unicast buffer,对他的访问只会影响到当前device的内存,mcBuff是Multicast buffer,对他的访问将被NVSwitch广播到所有被添加到mcHandle的device。

然后开始将内存记录到各个peer的connection的buff。

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
	...
	for (int h = 0; h < nHeads; h++) {
      int nvlsPeer = comm->nRanks + 1 + h;
      for (int c = 0; c < nChannels; c++) {
        struct ncclChannel* channel = comm->channels + c;
        char* mem = NULL;
        struct ncclChannelPeer* peer = channel->peers[nvlsPeer];

        // Reduce UC -> MC
        mem = resources->ucBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
        peer->send[1].transportComm = &nvlsTransport.send;
        peer->send[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
        peer->send[1].conn.head = (uint64_t*)(mem + buffSize);
        peer->send[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
        mem = resources->mcBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
        peer->recv[0].transportComm = &nvlsTransport.recv;
        peer->recv[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
        peer->recv[0].conn.head = (uint64_t*)(mem + buffSize);
        peer->recv[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
        peer->recv[0].conn.flags |= NCCL_NVLS_MIN_POLL;

        // Broadcast MC -> UC
        mem = resources->ucBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
        peer->recv[1].transportComm = &nvlsTransport.recv;
        peer->recv[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
        peer->recv[1].conn.head = (uint64_t*)(mem + buffSize);
        peer->recv[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
        mem = resources->mcBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
        peer->send[0].transportComm = &nvlsTransport.send;
        peer->send[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
        peer->send[0].conn.head = (uint64_t*)(mem + buffSize);
        peer->send[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
        peer->send[0].conn.flags |= NCCL_NVLS_MIN_POLL;

        CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
        CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
        CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
        CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
      }
    }
    ...
}

这一过程完成后将如图7所示。
在这里插入图片描述

图 7

图中黄色的为ucBuff,蓝色的为mcBuff,mcBuff即PTX中的multimem,multimem操作如下:

The multimem.* operations operate on multimem addresses and accesses all of the multiple memory locations which the multimem address points to.

以ld_reduce为例:

multimem.ld_reduce{.ldsem}{.scope}{.ss}.op.type d, [a];

假设GPU0对peer[0]->send[1].buff执行multimem.ld_reduce,这将会load GPU0和GPU1对应位置的数据,然后执行reduce,结果保存在d。

以ReduceScatter为例介绍一下kernel流程

2.19版本改动较多,所以在看nvls kernel之前,我们先介绍下新版kernel执行的过程。

kernel的转发

首先看下kernel是如何launch的,以及如何一步步执行到对应的proto,algo等对应的device函数中的。
由于有多种api,reduce类型,数据类型,算法,协议,而kernel个是这些变量笛卡尔积,所以nccl用generate.py生成这些kernel定义,主要生成两个数组,ncclDevKernelForFunc和ncclDevFuncTable,分别为global函数和device函数。

static ncclResult_t scheduleCollTasksToPlan(...) {
	  ...
      NCCLCHECK(computeColl(&info, &workFuncIndex, &workElem, &proxyOp));

      ...
      if (!plan->kernelSpecialized) {
        plan->kernelFn = ncclDevKernelForFunc[workFuncIndex];
        plan->kernelSpecialized = ncclDevKernelForFuncIsSpecialized[workFuncIndex];
      }
      ...
}

enqueue过程通过computeColl计算出workFuncIndex,然后记录下kernelFn为ncclDevKernelForFunc[workFuncIndex],以ReduceScatter sum为例,得到的workFuncIndex为485,查询ncclDevKernelForFunc[485]为ncclDevKernel_ReduceScatter_Sum_f32_RING_LL,那么launch kernel就会执行这一kernel,这里注意下,我们实际用的是SIMPLE协议,但是这个kerne为LL,我们看下如何进一步转发的。

DEFINE_ncclDevKernel(ReduceScatter_Sum_f32_RING_LL, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_LL, 483)
#define DEFINE_ncclDevKernel(suffix, coll, redop, ty, algo, proto, specializedFnId) \
  __global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \
    ncclKernelMain<specializedFnId, RunWork<coll, ty, redop<ty>, algo, proto>>(comm, channelMask, workHead); \
  }

ncclDevKernel_ReduceScatter_Sum_f32_RING_LL这个函数定义如上,specializedFnId为483,然后直接执行ncclKernelMain。

在看ncclKernelMain之前我们先看下现有参数信息存放。

__shared__ ncclShmemData ncclShmem;

struct ncclShmemGroup {
  ncclConnInfo *recvConns[NCCL_MAX_NVLS_ARITY];
  ncclConnInfo *sendConns[NCCL_MAX_NVLS_ARITY];
  void* srcs[NCCL_MAX_NVLS_ARITY+1];
  void* dsts[NCCL_MAX_NVLS_ARITY+1];
  union {
    unpackGroupShmem unpack;
  } devicePlugin;
};

struct ncclShmemData {
  struct ncclShmemGroup groups[NCCL_MAX_GROUPS];
  uint64_t redOpArgs[NCCL_MAX_NVLS_ARITY+1];
  int channelId;
  int aborted;
  alignas(16) struct ncclDevComm comm;
  alignas(16) struct ncclDevChannel channel;
  alignas(16) struct ncclWork work;
  alignas(16) union {
    unpackShmem unpack;
  } devicePlugin;
};

ncclShmem位于共享内存,存储了kernel需要的参数信息,比如channelId,comm,channel等,一个block中的线程都会使用这些信息用于收发数据。之前版本中一个block所有线程的peer是一样的,而新版中不同线程可能会对应不同的peer,比如send/recv,一个block可以收发8个peer,再比如本节介绍的nvls,一个block中不同warp使用流水线的方式完成整体流程,因此引入了数据结构ncclShmemGroup groups,一个group表示执行相同逻辑的线程组,group所需要的conn,srcs,dsts等信息存储在groups中。

template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
  int tid = threadIdx.x;
  if (tid < WARP_SIZE) {
    int x = tid;
    if (channelMask & (1ull<<x)) {
      int y = __popcll(channelMask & ((1ull<<x)-1));
      if (blockIdx.x == y) ncclShmem.channelId = x;
    }   
    ... 
  }
  __syncthreads(); // publish ncclShmem.channelId
  int channelId = ncclShmem.channelId;
  ...
}

选择block和channel的对应关系,就是计算当前block应该处理channel。

template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
  ...
  while (true) {
    // Notify host that all fifo reads are complete.
    ...
    if (0 <= SpecializedFnId && ncclShmem.work.header.funcIndex == (unsigned)SpecializedFnId) {
      SpecializedRunWork().run(&ncclShmem.work);
    } else {
      ncclDevFuncTable[ncclShmem.work.header.funcIndex]();
    }   

    int workIxNext = ncclShmem.work.header.workNext;
    __syncthreads();
	...
  }
  ...
}	

funcIndex为485,而SpecializedFnId为483,因此会再次去ncclDevFuncTable中找funcIndex对应的函数,函数为ncclDevFunc_ReduceScatter_Sum_f32_RING_SIMPLE,这样就找到了需要执行的函数。

DEFINE_ncclDevFunc(ReduceScatter_Sum_f32_RING_SIMPLE, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE)

#define DEFINE_ncclDevFunc(suffix, coll, redop, ty, algo, proto) \
  __device__ void ncclDevFunc_##suffix() { \
    RunWork<coll, ty, redop<ty>, algo, proto>().run(&ncclShmem.work); \
  }

template<ncclFunc_t Fn, typename T, typename RedOp, int Algo, int Proto>
struct RunWork {
  // This __forceinline__ is necessary. The compiler was inserting a function call
  // here from the LL ncclKernel.
  __device__ __forceinline__ void run(ncclWork *w) {
    int wid = threadIdx.x / WARP_SIZE;
    ncclWorkElem* we = w->header.type == ncclWorkTypeRegColl ? &w->regElems[0].elem : &w->elems[0];
    int stride = w->header.type == ncclWorkTypeRegColl ? sizeof(ncclWorkElemReg) : sizeof(ncclWorkElem);
    #pragma unroll 1
    while ((char*)we + stride <= (char*)(w+1) && we->isUsed) {
      if (wid < we->nWarps) {
        RunWorkElement<Fn, T, RedOp, Algo, Proto>().run(we);
      }
      we = (ncclWorkElem*)((char*)we + stride);
    }
  }
};

我们看下这个函数的定义,可以得到,Fn为ncclFuncReduceScatter,T为float,RedOp为FuncSum<float>,algo为NCCL_ALGO_RING,协议为NCCL_PROTO_SIMPLE,然后开始执行runRing

  template<typename T, typename RedOp, typename Proto>
  __device__ __forceinline__ void runRing(ncclWorkElem *args) {
    ...
    const ssize_t loopSize = nChannels*chunkSize;
    const ssize_t size = args->count;
    
    Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
      prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);
      
    for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
      ssize_t realChunkSize;
      ...
      realChunkSize = int(realChunkSize);

      ssize_t chunkOffset = gridOffset + bid*int(realChunkSize);

      /// begin ReduceScatter steps ///
      ssize_t offset;
      int nelem = min(realChunkSize, size-chunkOffset);
      int rankDest;

      // step 0: push data to next GPU
      rankDest = ringRanks[nranks-1];
      offset = chunkOffset + rankDest * size;
      prims.send(offset, nelem);

第一步,执行自己rank对应的block数据的send,send就是将数据发送到下一个rank的buffer。


      // k-2 steps: reduce and copy to next GPU
      for (int j=2; j<nranks; ++j) {
        rankDest = ringRanks[nranks-j];
        offset = chunkOffset + rankDest * size;
        prims.recvReduceSend(offset, nelem);
      }

然后接下来的nranks - 2次步骤,执行recvReduceSend,就是将前一个rank发送到自己buffer中的数据和自己的用户输入数据中对应位置执行reduce,然后发送给下一个rank。

      // step k-1: reduce this buffer and data, which will produce the final result
      rankDest = ringRanks[0];
      offset = chunkOffset + rankDest * size;
      prims.recvReduceCopy(offset, chunkOffset, nelem, /*postOp=*/true);
    }
  }
}

最后一次执行recvReduceCopy,就是将前一个rank发送过来的数据和自己用户输入中对应位置执行reduce,并拷贝到用户输出。

primitive初始化

回顾下ReduceScatter中的primitives的构造,recvPeers为ring中的前一个rank,sendPeers为ring中下一个rank,inputBuf和outputBuf为用户执行api提供的输入输出buff,group为默认参数0。redOpArg用于比如mean的操作,会被设置为nranks,在reduceCopy的时候会除以nranks,本例为sum操作,可以忽略redOpArg。

      Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
        prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);


  __device__ Primitives(
      int tid, int nthreads, int const *recvPeers, int const *sendPeers,
      void const *inputBuf, void *outputBuf, uint64_t redOpArg, uint8_t group=0,
      uint8_t connIndexRecv = 0, uint8_t connIndexSend = 0, struct ncclWorkElem* e = nullptr, int stepSize_=0
    ):
    tid(tid), nthreads(nthreads), tidInBlock(threadIdx.x), group(group),
    stepSize(stepSize_ == 0 ? ncclShmem.comm.buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/sizeof(T) : stepSize_) {
    } 

模板参数中Direct和P2p为0,Fan为FanSymmetric<1>,作用是记录有几个recv和send,此时MaxRecv和MaxArity均为1。

template<int MaxArity>
struct FanSymmetric {
  static constexpr int MaxRecv = MaxArity, MaxSend = MaxArity;
  int n;
  FanSymmetric() = default;
  __device__ FanSymmetric(int nrecv, int nsend): n(nrecv) {
    // assert(nrecv == nsend && nrecv <= MaxArity);
  }
  __device__ int nrecv() const { return n; }
  __device__ int nsend() const { return n; }
};

然后继续看初始化过程

  __device__ Primitives(...)
    // For send operations, we need an extra warp to overlap the threadfence and the copy
    this->nworkers = nthreads - (MaxSend > 0 && nthreads-WARP_SIZE >= 64 ? WARP_SIZE : 0); 

    int nrecv=0, nsend=0;
    while (nrecv < MaxRecv && recvPeers[nrecv] != -1) nrecv++;
    while (nsend < MaxSend && sendPeers[nsend] != -1) nsend++;
    this->fan = Fan(nrecv, nsend);

    constexpr int ThreadPerSync = 8;
    static_assert(MaxSend <= ThreadPerSync && MaxRecv <= ThreadPerSync, "Not enough threads to cover all peers");

    int g = tid / ThreadPerSync;
    int ng = nthreads / ThreadPerSync;
    index = tid % ThreadPerSync;
    flags = 0;
    if (g == 0) {
      if (index < nrecv) flags |= RoleWaitRecv;
      if (index == nrecv) flags |= RoleInput;
    } else if (g == 1) {
      if (index < nsend) flags |= RoleWaitSend;
      if (index == nsend) flags |= RoleOutput;
    } else if (g == ng - 2) {
      if (index < nrecv) flags |= RolePostRecv;
    } else if (g == ng - 1) {
      if (index < nsend) flags |= RolePostSend;
    }
    ...
  }

nthreads为执行的总线程数,本例子中等于block中的线程数,nworkers是实际干活的线程数,由于发送的时候需要一个warp执行threadfence,因此实际干活的线程数为nthreads减去一个warp,不过当总的warp数少的时候就不会使用独立的同步warp。记录nsend和nrecv,此时均为1。

然后开始设置每个线程的role,将nthreads按照8分成多个小组,假设一共有n-1个组,假设recvPeer和sendPeer都有两个,那么各个线程的role分配如图8所示,其中WaitRecv表示这个线程负责等待直到fifo中有数据可以接收,本例中g[0]的thr[0]负责等待第0个recvPeer,thr[1]负责第1个recvPeer,Input线程负责写入用户buff的地址,PostRecv负责在接收到数据后通知recvPeer,g[n-2]的thr[0]负责通知第0个recvPeer,thr[1]负责通知第1个recvPeer,同理对于send。
在这里插入图片描述

图 8

  __device__ __forceinline__ void loadRecvConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
    if (flags & (RoleWaitRecv|RolePostRecv)) {
      auto *conn = &peer->recv[connIndex];
      step = conn->step;
      step = roundUp(step, SlicePerChunk*StepPerSlice);
      if (flags & RolePostRecv) {
        connStepPtr = conn->head;
        *connStepPtr = step; // Return credits in case we rounded up.
      }
      if (flags & RoleWaitRecv) {
        ncclShmem.groups[group].recvConns[index] = conn; // WaitRecv role saves since that's who needs it in setDataPtrs()
        flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
        connStepPtr = conn->tail;
        connStepCache = loadStepValue(connStepPtr);
        flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
        if (Direct) {
            ...
        }
        if (flags & OffsFifoEnabled)
          connOffsFifoPtr = conn->offsFifo;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
      }
    }
  }

只有RoleWaitRecv和RolePostRecv的线程才会执行loadRecvConn,读取step,之前章节中介绍过,step表示在fifo中的位置;RolePostRecv线程负责通知recvPeer,所以需要保存conn中的head指针到connStepPtr,RoleWaitRecv线程负责等待直到fifo中有新的数据,因此需要保存conn中的tail指针到connStepPtr,并将内容cache到connStepCache,以避免频繁的global mem读取,最后将conn->buff,即fifo,记录到connEltsFifo中。

  __device__ __forceinline__ void loadSendConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
    if (flags & (RoleWaitSend|RolePostSend)) {
      auto *conn = &peer->send[connIndex];
      step = conn->step;
      step = roundUp(step, SlicePerChunk*StepPerSlice);
      if (flags & RolePostSend) {
        connStepPtr = conn->tail;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
      }
      if (flags & RoleWaitSend) {
        ncclShmem.groups[group].sendConns[index] = conn; // WaitSend role saves since that's who needs it in setDataPtrs()
        flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
        connStepPtr = conn->head;
        connStepCache = loadStepValue(connStepPtr);
        flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
        if (flags & OffsFifoEnabled)
          connOffsFifoPtr = conn->offsFifo;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
        ...
      }
    }
  }

loadSendConn逻辑一样,RolePostSend线程负责通知send peer,因此持有tail指针,RoleWaitSend线程负责等待send peer,因此持有head指针,然后记录fifo。
最后是执行setDataPtrs,设置userBuff为用户的输入输出。

  __device__ void setDataPtrs(void const *inputBuf, void *outputBuf, uint64_t redOpArg, struct ncclWorkElemReg* e) {
    if (flags & RoleInput) {
      userBuff = (T*)inputBuf;
      ncclShmem.redOpArgs[0] = redOpArg;  // scaler for local input
    }   
    if (flags & RoleOutput) userBuff = (T*)outputBuf;
    ...
  }

到这里就完成了初始化。

recvReduceSend

  __device__ __forceinline__ void recvReduceSend(intptr_t inpIx, int eltN, bool postOp=false) {
    genericOp<0, 0, 1, 1, Input, -1>(inpIx, -1, eltN, postOp);
  }
   
  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(
      intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp
    ) { 
    constexpr int DirectRecv = 1 && Direct && DirectRecv1;
    constexpr int DirectSend = 1 && Direct && DirectSend1;
    constexpr int Src = SrcBuf != -1; 
    constexpr int Dst = DstBuf != -1;
    
    nelem = nelem < 0 ? 0 : nelem;
    int sliceSize = stepSize*StepPerSlice;
    sliceSize = max(divUp(nelem, 16*SlicePerChunk)*16, sliceSize/32);
    int slice = 0;
    int offset = 0;
    ...
  }

模板参数中Recv表示是否需要执行recv,Send表示是否执行Send,SrcBuf表示输入中是否有用户的src buff,DstBuf表示输出中是否包含了用户的dst buff。然后计算得到DirectRecv和DirectSend为0,Src为1,Dst为0。

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(
      intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp
    ) { 
    ...
    if (tid < nworkers && offset < nelem) {
      do {
        sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
        if (Src && (flags & (SrcBuf==Input ? RoleInput : RoleOutput)))
          ncclShmem.groups[group].srcs[0] = userBuff + srcIx + offset;
        if (Dst && (flags & (DstBuf==Input ? RoleInput : RoleOutput)))
          ncclShmem.groups[group].dsts[0] = userBuff + dstIx + offset;
        waitPeer<DirectRecv, DirectSend, Recv, Send, Src, Dst>(srcIx, dstIx, offset, sliceSize);
        ...
      } while (slice < SlicePerChunk && offset < nelem);
    }   
    ...
  }

2.7.8中负责数据收发的工作线程和负责同步的线程都在一个循环里,会引入很多分支指令影响性能,新版中将逻辑拆分成了两个循环,以提高性能,工作线程执行第一个循环,同步线程执行第二个循环。

RoleInput线程将用户buff填入到srcs[0],然后执行waitPeer,waitPeer函数就是之前的waitSend和waitRecv,会等待直到可以发送和接收数据,并将数据地址填入到srcs和dsts。

  template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
  __device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
    const bool isSendNotRecv = (Send && Recv) ? (flags & RoleWaitSend) : Send;
    const bool noRecvWait = DirectRecv && Src && (flags & DirectRead);        // no wait when directly reading from remote input
    const bool noSendWait = DirectSend && (flags & (DirectRead|DirectWrite)); // no wait in empty send (e.g. directScatter) or direct remote write
    if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) ||
        ((flags & (Send*RoleWaitSend)) && !noSendWait)) {
      int spins = 0;
      while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) {
        connStepCache = loadStepValue(connStepPtr);
        if (checkAbort(spins)) break;
      }   
    }
    ...
  }   

noRecvWait和noSendWait都为0,RoleWaitSend线程的isSendNotRecv为1,由于持有的connStepPtr是head指针,所以他判断等待的逻辑是如果head指针加上队列容量小于step + StepPerSlice,那么不能执行send,否则会超过队列容量,因此循环等待;而RoleWaitRecv线程的isSendNotRecv为0,由于持有的connStepPtr是tail指针,所以他判断等待逻辑是如果step + StepPerSlice超过了队尾指针,说明队列中已经没有数据了,那么就需要等待。

  template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
  __device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
    ...
    if (flags & (Recv*RoleWaitRecv | Send*RoleWaitSend)) {
      if (isSendNotRecv && (flags & SizesFifoEnabled))
        connSizesFifoPtr[step%NCCL_STEPS] = nelts*sizeof(T);

      void **ptrs = isSendNotRecv ? (ncclShmem.groups[group].dsts + Dst)
                                  : (ncclShmem.groups[group].srcs + Src);
      if (flags & OffsFifoEnabled)
      else if (isSendNotRecv && DirectSend) {
      } else if (!isSendNotRecv && DirectRecv) {
      }   
      else {
        ptrs[index] = connEltsFifo + (step%NCCL_STEPS)*stepSize;
      }   
      step += StepPerSlice;
    }   
  }

然后开始填充srcs和dsts数组,就是将自己持有fifo对应的slot填进去,然后更新step。所以对于recvReduceSend,srcs[0]为用户buff,srcs[1]为前一个ran的fifo,dsts[0]为下一个rank的fifo,因此就能达到前边描述的作用,接收前一个rank的数据,和用户的输入buff执行reduce,然后发送给下一个rank。

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(
      intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp
    ) { 
    ...
    if (tid < nworkers && offset < nelem) {
      do {
        ...
        subBarrier();
        int workSize = ncclShmem.aborted ? 0 : sliceSize;

        if (DirectRecv && ncclShmem.groups[group].srcs[0] == ncclShmem.groups[group].dsts[0]
        } else if (DirectSend && !DirectRecv && SrcBuf != Input && ncclShmem.groups[group].dsts[Dst] == nullptr) {
        } else {
          constexpr int PreOpSrcs = SrcBuf != Input ? 0 : 
                                    DirectRecv*MaxRecv == NCCL_MAX_DIRECT_ARITY ? (1+NCCL_MAX_DIRECT_ARITY) : 1;
          reduceCopy<Unroll, RedOp, T,
            MultimemSrcs, Recv+Src, Recv*MaxRecv+Src,
            MultimemDsts, Send+Dst, Send*MaxSend+Dst, PreOpSrcs>
            (tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, postOp,
             Recv*fan.nrecv()+Src, ncclShmem.groups[group].srcs,
             Send*fan.nsend()+Dst, ncclShmem.groups[group].dsts,
             workSize);
        }   
        barrier(); // This barrier has a counterpart in following loop
        postPeer<Recv, Send>(0 < sliceSize);
        offset += sliceSize;
        slice += 1;
      } while (slice < SlicePerChunk && offset < nelem);
    }   

    while (slice < SlicePerChunk) {
      sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
      barrier(); // Has couterpart in preceding worker-only loop.
      postPeer<Recv, Send>(0 < sliceSize);
      offset += sliceSize;
      slice += 1;
    }   
  }

waitPeer完成之后,说明可以执行数据收发了,这里先执行subBarrier(),作用是同步一下所有的工作线程,保证在waitPeer完成之后才进入收发数据的逻辑。然后执行reduceCopy将数据从srcs完成reduce并拷贝到dsts。然后执行barrier(),barrier所有的线程,即工作线程加同步线程,因为同步线程只有在等到数据收发结束才能开始post,然后看下同步线程执行的postPeer。

  template<int Recv, int Send>
  inline __device__ void postPeer(bool dataStored) {
    if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
      step += StepPerSlice;
      if (Send && (flags & RolePostSend) && dataStored) fence_acq_rel_sys();
      st_relaxed_sys_global(connStepPtr, step);
    }   
  }

RolePost类线程需要更新step,然后将step写入到connStepPtr,对于RolePostRecv,持有的是head指针,直接接入就好;对于RolePostSend,持有的是tail指针,为了保证先完成数据的写之后再完成post,需要加一个fence,这里使用了acq_rel的屏障,其实这个场景使用release语义就是足够的,不过查了一下PTX,好像没有单独的release语义指令。对于读数据的场景,也是需要配对使用读屏障,但是nccl的实现使用的是volatile,这样可以bypass L1 cache,因此不需要使用屏障。

nvls

ReduceScatter kernel

      if (tid < tidEndScatter) {
        // Scatter
        using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
        Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
          prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
            args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1); 
        for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
          ssize_t offset = gridOffset + bid * chunkSize;
          int nelem = min(chunkSize, size - offset);
          prims.scatter(offset, nvls->nHeads * size, nelem, size, -1, 0);
        }
      } 

scatter线程会通过prim执行scatter操作,sendPeers为up,因此就是所有的rank,inputBuf为用户的输入args->sendbuff,connIndexSend为1,因此load第1个send conn。

  __device__ __forceinline__ void
  scatter(intptr_t inpIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift) {
    ScatterGatherOp<0, 0, 0, 1>(inpIx, -1, totalElem, peerElem, peerOffset, skip, shift, /*postOp=*/false);
  }

  template <int DirectRecv1, int DirectSend1, int Recv, int Send>
  __device__ __forceinline__ void
  ScatterGatherOp(intptr_t inpIx, intptr_t outIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift, bool postOp) {
    constexpr int DirectRecv = 1 && Direct && DirectRecv1;
    constexpr int DirectSend = 1 && Direct && DirectSend1;
    int offset = 0; // slice offset
    int sliceSize = stepSize*StepPerSlice;
    int dataSize = max(DIVUP(peerElem, 16*SlicePerChunk)*16, sliceSize/32);  // per-peer slice size

    #pragma unroll
    for (int slice=0; slice<SlicePerChunk; ++slice) {
      ssize_t realSize = max(0, min(dataSize, peerElem-offset));
      bool fenceNeeded = false;
      if (tid < nworkers) {
        if (Send) {
          // Scatter pre-scales data of input buffer only in non-Direct case
          constexpr int PreOpSrcs = DirectSend ? 0 : 1;
          if (flags & RoleInput) ncclShmem.groups[group].srcs[0] = userBuff + inpIx + offset;
          // realSize is not accurate here; but intra-node does not rely on sizes FIFO
          waitPeer<0, DirectSend, 0, 1, 1, 0>(0, inpIx, offset, realSize);
          subBarrier();
          #pragma unroll
          // Loop over peers
          for (int j=0; j<fan.nsend(); j++) {
            int i = (j+shift)%fan.nsend();
            ssize_t pOffset = i*peerOffset;
            // Skip the data I am responsible of reducing myself
            if (skip >= 0 && i >= skip) pOffset += peerElem;
            void* src0 = (T*)ncclShmem.groups[group].srcs[0] + pOffset;
            ssize_t realPeerSize = min(realSize, totalElem-pOffset);
            if (realPeerSize > 0 && ncclShmem.groups[group].dsts[i] != nullptr) {
              reduceCopy<Unroll, RedOp, T, 0,1,1, 0,1,1, PreOpSrcs>(tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, false, 1, &src0, 1, ncclShmem.groups[group].dsts+i, realPeerSize);
              // Mark for threadfence at the end
              fenceNeeded |= true;
            }
          }
        } else if (Recv) {
        }
      }
      fenceNeeded = barrierAny(fenceNeeded);
      postPeer<Recv, Send>(fenceNeeded);
      offset += realSize;
    }
  }

如图9所示,scatter做的就是将userBuff的数据按照peerOffset的间隔,发送到所有的sendPeer对应的buff,即图中的peer[0]->send[1].buff和peer[1]->send[1].buff。
在这里插入图片描述

图 9

对于reduce线程,sendPeers为NULL,recvPeers为nvls->down,connIndexRecv为1,因此load第一个recv conn,然后执行recv。

	  else if (tid < tidEndReduce) {
        // Reduce through NVLS
        using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 0>; 
        Primitives<T, RedOp, FanAsymmetric<1, 0>, /*Direct=*/0, Proto, 0>
          prims(tid - tidEndScatter, nThreadsReduce, &nvls->down, NULL, NULL, args->recvbuff,
            args->redOpArg, 3 * Proto::MaxGroupWidth, 0, 0); 
        for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
          ssize_t offset = gridOffset + bid * chunkSize;
          int nelem = min(chunkSize, size - offset);
          prims.recv(offset, nelem);
        }   
      }   

recv函数中执行会将dst设置为args->recvbuff,src为自己rank对应的Multicast buffer,如图10所示,执行完成之后,GPU0的recvBuff就拿到了所有卡对应值reduce的结果。
在这里插入图片描述

图 10

reduceCopy kernel

以ReduceScatter过程为例,我们看下reduceCopy kernel如何同时支持Unicast buffer和Multicast buffer的。

template<int Unroll, typename RedFn, typename T,
         int MultimemSrcs, int MinSrcs, int MaxSrcs,
         int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
         typename IntBytes>
__device__ __forceinline__ void reduceCopy(
    int thread, int nThreads,
    uint64_t redArg, uint64_t *preOpArgs, bool postOp,
    int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
    IntBytes nElts
  ) {
  int lane = thread%WARP_SIZE;
  // If a multimem src is present then our biggest pack size is limited to what
  // is supported for this redfn/type.
  constexpr int BigPackSize = (MultimemSrcs == 0) ? 16 : LoadMultimem_BigPackSize<RedFn>::BigPackSize;

  IntBytes nBytesBehind = 0;
  IntBytes nBytesAhead = nElts*sizeof(T);

  #if __cpp_if_constexpr
  if constexpr (BigPackSize > sizeof(T)) {
  #else
  if (BigPackSize > sizeof(T)) {
  #endif
    // Check that all pointers are BigPackSize aligned.
    bool aligned = true;
    if (lane < nSrcs) aligned &= 0 == cvta_to_global(srcPtrs[lane]) % (BigPackSize + !BigPackSize);
    if (lane < nDsts) aligned &= 0 == cvta_to_global(dstPtrs[lane]) % (BigPackSize + !BigPackSize);
    aligned = __all_sync(~0u, aligned);
    if (aligned) {
      reduceCopyPacks<RedFn, T, Unroll, BigPackSize,
        MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
        (nThreads, /*&*/thread, redArg, preOpArgs, postOp,
         nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
      if (nBytesAhead == 0) return;

      reduceCopyPacks<RedFn, T, /*Unroll=*/1, BigPackSize,
        MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
        (nThreads, /*&*/thread, redArg, preOpArgs, postOp,
         nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
      if (nBytesAhead == 0) return;
    }
  }
  ...
}

模板参数中,MultimemSrcs表示有几个输入是multimem,MultimemDsts表示有几个输出是multimem,入参中thread为tid,nThreads为总的线程数,共有nSrcs个输入,地址位于srcPtrs,共有nDsts个输出,存放于dstPtrs,nElts为元素个数,函数的作用就是将所有的src reduce,然后存到所有的dst。
nBytesBehind表示已经处理过了多少数据,nBytesAhead表示还有多少数据未处理。
然后开始看是否可以用向量化指令,BigPackSize就是load/store指令的粒度,如果输入为非Multimem,那么尝试使用16字节,即128位,如果是multimem,则需要判断Func和数据类型,本例中为FuncSum,因此也是16字节。

  template<typename Fn>
  struct LoadMultimem_BigPackSize {
    using T = typename Fn::EltType;
    static constexpr bool IsSum = std::is_same<Fn, FuncSum<T>>::value ||
                                  std::is_same<Fn, FuncPreMulSum<T>>::value ||
                                  std::is_same<Fn, FuncSumPostDiv<T>>::value;
    static constexpr bool IsMinMax = std::is_same<Fn, FuncMinMax<T>>::value;
    static constexpr bool IsFloat = IsFloatingPoint<T>::value;
    static constexpr int BigPackSize =
      IsFloat && IsSum && sizeof(T) < 8 ? 16 :
      IsFloat && IsSum ? 8 :
      IsFloat && IsMinMax && sizeof(T)==2 ? 16 :
      !IsFloat && (IsSum||IsMinMax) && sizeof(T)>=4 ? sizeof(T) :
      /*multimem.ld_reduce not supported:*/ 0;
  };

使用向量化的前提是输入输出有需要是对齐的,以对齐为例看下reduceCopyPacks的逻辑。

template<typename RedFn, typename T, int Unroll, int BytePerPack,
         int MultimemSrcs, int MinSrcs, int MaxSrcs,
         int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
         typename IntBytes>
__device__ __forceinline__ void reduceCopyPacks(
    int nThreads, int &thread,
    uint64_t redArg, uint64_t *preOpArgs, bool postOp,
    int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
    IntBytes &nBytesBehind, IntBytes &nBytesAhead
  ) { 

  // A hunk is the amount of contiguous data a warp consumes per loop iteration
  // assuming all threads partake.
  constexpr int BytePerHunk = Unroll*WARP_SIZE*BytePerPack;
  int nWarps = nThreads/WARP_SIZE;
  int warp = thread/WARP_SIZE;
  int lane = thread%WARP_SIZE;
  ...
}

BytePerPack就是BigPackSize,即16字节,假设Unroll为4,那么一个warp的访存模式如图11,一个蓝框的长度为32个16字节,BytePerHunk为一个warp一次性处理的连续数据长度,即4个蓝框,warp里的第一个线程会访问箭头指向的4个蓝框里的第一个16字节。
在这里插入图片描述

图 11

然后初始化各个线程的初始位置

__device__ __forceinline__ void reduceCopyPacks(...) {
  // This thread's initial position.
  IntBytes threadBytesBehind = nBytesBehind + (warp*BytePerHunk + lane*BytePerPack);
  IntBytes threadBytesAhead = nBytesAhead - (warp*BytePerHunk + lane*BytePerPack);
  // Number of hunks to be consumed over all warps.
  IntBytes nHunksAhead = nBytesAhead/(BytePerHunk + !BytePerHunk);
  // Advance collective position.
  nBytesBehind += nHunksAhead*BytePerHunk;
  nBytesAhead -= nHunksAhead*BytePerHunk;
  if (Unroll==1 && BytePerPack <= nBytesAhead) {
    // Only Unroll=1 can do partial hunks (where not all threads partake).
    nHunksAhead += 1;
    nBytesBehind += nBytesAhead - (nBytesAhead%(BytePerPack + !BytePerPack));
    nBytesAhead = nBytesAhead%(BytePerPack + !BytePerPack);
  }
  nHunksAhead -= warp;
  
  RedFn redFn(redArg);
  uintptr_t minSrcs[MinSrcs + !MinSrcs];
  uintptr_t minDsts[MinDsts + !MinDsts];
  #pragma unroll
  for (int s=0; s < MinSrcs; s++)
    minSrcs[s] = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
  #pragma unroll
  for (int d=0; d < MinDsts; d++)
    minDsts[d] = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
  ...
}

threadBytesBehind即当前线程的起始位置,threadBytesAhead为当前线程需要处理的数据量,然后将MinSrcs个src和minDsts个dst指针记录到minSrcs和minDsts。

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    BytePack<BytePerPack> acc[Unroll];

    { RedFn preFn(0 < PreOpSrcs ? preOpArgs[0] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (0 < MultimemSrcs) {
          // applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
          acc[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[0]);
        } else {
          // Use volatile loads in case credits are polled for with volatile (instead of acquire).
          acc[u] = ld_volatile_global<BytePerPack>(minSrcs[0]);
          if (0 < PreOpSrcs) acc[u] = applyPreOp(preFn, acc[u]);
        }
        minSrcs[0] += WARP_SIZE*BytePerPack;
      }
    }
	...
  }

BytePack在这个场景下就是16字节,由一个union描述,acc用于存储reduce结果。

template<>
union alignas(16) BytePack<16> {
  BytePack<8> half[2];
  uint8_t u8[16];
  uint16_t u16[8];
  uint32_t u32[4];
  uint64_t u64[2];
  ulong2 ul2, native;
};

然后开始初始化acc,如果输入中没有multimem,那么会通过ld_volatile_global将第一个蓝框的对应的128b load到acc[0],然后循环Unroll,将所有蓝框的对应数据load到acc,然后看下ld_volatile_global。

#define DEFINE_ld_st_16__space(space, addr_cxx_ty, addr_reg_ty) \
  template<> \
  __device__ __forceinline__ BytePack<16> ld_##space<16>(addr_cxx_ty addr) { \
    BytePack<16> ans; \
    asm("ld." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
    return ans; \
  } \
  template<> \
  __device__ __forceinline__ BytePack<16> ld_volatile_##space<16>(addr_cxx_ty addr) { \
    BytePack<16> ans; \
    asm("ld.volatile." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
    return ans; \
  } \
  template<> \
  __device__ __forceinline__ void st_##space<16>(addr_cxx_ty addr, BytePack<16> value) { \
    asm("st." #space ".v2.b64 [%0], {%1,%2};" :: #addr_reg_ty(addr), "l"(value.u64[0]), "l"(value.u64[1]) : "memory"); \
  }
DEFINE_ld_st_16__space(global, uintptr_t, l)

BytePerPack为16,因此会执行ld_volatile_global<16>,其实就是ld.volatile.global.v2.b64将128b load到ans的u64[0]和u64[1]。
而当输入中包括了multimem之后,会通过applyLoadMultimem将数据load并reduce然后存储到acc;st_global<16>就是将BytePack的u64[0]和u64[1]通过st.global.v2.b64存储到addr。

#define SIZEOF_BytePack_field_u32 4
#define PTX_REG_BytePack_field_u32 "r"
DEFINE_Apply_LoadMultimem_sum_v4(float, f32, u32)
#define DEFINE_Apply_LoadMultimem_sum_v4(T, ptx_ty, pack_field) \
  template<> \
  struct Apply_LoadMultimem<FuncSum<T>, 4*(SIZEOF_BytePack_field_##pack_field)> { \
    static constexpr int PackSize = 4*(SIZEOF_BytePack_field_##pack_field); \
    __device__ static BytePack<PackSize> load(FuncSum<T> fn, uintptr_t addr) { \
      BytePack<PackSize> ans; \
      asm("multimem.ld_reduce.relaxed.sys.global.add.v4." #ptx_ty " {%0,%1,%2,%3}, [%4];" \
        : "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[0]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[1]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[2]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[3]) \
        : "l"(addr)); \
      return ans; \
    } \
  };

可以看到用的是multimem.ld_reduce.relaxed.sys.global.add.v4.f32将对4个float都执行了reduce操作结果存到acc。

完成第一个src的读取之后,继续读取其他的src到tmp,然后通过Apply_Reduce 执行reduce操作,Apply_Reduce其实就是将4个float执行elmentwise的求和。

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    ...
    #pragma unroll (MinSrcs-1 + !(MinSrcs-1))
    for (int s=1; s < MinSrcs; s++) {
      BytePack<BytePerPack> tmp[Unroll];
      RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < MultimemSrcs) {
          // applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
          acc[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[s]);
        } else {
          // Use volatile loads in case credits are polled for with volatile (instead of acquire).
          tmp[u] = ld_volatile_global<BytePerPack>(minSrcs[s]);
        }
        minSrcs[s] += WARP_SIZE*BytePerPack;
      }
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
        acc[u] = applyReduce(redFn, acc[u], tmp[u]);
      }
    }

    for (int s=MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) {
      uintptr_t src = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
      BytePack<BytePerPack> tmp[Unroll];
      RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        // Use volatile loads in case credits are polled for with volatile (instead of acquire).
        tmp[u] = ld_volatile_global<BytePerPack>(src);
        src += WARP_SIZE*BytePerPack;
      }
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
        acc[u] = applyReduce(redFn, acc[u], tmp[u]);
      }
    }
    ...
  }
  ...
}
template<typename Fn, int EltPerPack>
struct Apply_Reduce {
  template<int Size>
  __device__ static BytePack<Size> reduce(Fn fn, BytePack<Size> a, BytePack<Size> b) {
    a.half[0] = Apply_Reduce<Fn, EltPerPack/2>::reduce(fn, a.half[0], b.half[0]);
    a.half[1] = Apply_Reduce<Fn, EltPerPack/2>::reduce(fn, a.half[1], b.half[1]);
    return a;
  }
};
template<typename T>
struct Apply_Reduce<FuncSum<T>, /*EltPerPack=*/1> {
  __device__ static BytePack<sizeof(T)> reduce(FuncSum<T> fn, BytePack<sizeof(T)> a, BytePack<sizeof(T)> b) {
    return toPack<T>(fromPack<T>(a) + fromPack<T>(b));
  }
};

到这里就拿到了所有输入reduce的结果,然后开始存储到所有的输出dst。

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    ...
    #pragma unroll (MinDsts + !MinDsts)
    for (int d=0; d < MinDsts; d++) {
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (d < MultimemDsts) {
          multimem_st_global(minDsts[d], acc[u]);
        } else {
          st_global<BytePerPack>(minDsts[d], acc[u]);
        }
        minDsts[d] += WARP_SIZE*BytePerPack;
      }
    }
    for (int d=MinDsts; (MinDsts < MaxDsts) && (d < MaxDsts) && (d < nDsts); d++) {
      uintptr_t dst = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        st_global<BytePerPack>(dst, acc[u]);
        dst += WARP_SIZE*BytePerPack;
      }
    }
  }
  ...
}  

到这里就完成了reduceCopy的过程,这里再提一下nvls场景的head,tail等flag都是mcBuff,我们看下waitPeer中如何判断是否需要等待的。

  inline __device__ uint64_t loadStepValue(uint64_t* ptr) {
    #if __CUDA_ARCH__ >= 900 && CUDART_VERSION >= 12010
    if (flags & NvlsMinPolling) {
      uint64_t ans;
      asm("multimem.ld_reduce.acquire.sys.global.min.u64 %0, [%1];" : "=l"(ans) : "l"(cvta_to_global(ptr)));
      return ans;
    }   
    #endif
    // volatile is faster than acquire but not as correct. Make sure reduceCopy
    // loads data using volatile so it doesn't see stale data in L1.
    return ld_volatile_global(ptr);
  }

当为nvls场景的时候,flags会有NvlsMinPolling,这里读取使用的是multimem.ld_reduce读取所有peer的step,然后取min,因此知道所有peer都准备好数据时候,才会接收数据,这里使用aquire语义,和postPeer的release配对,保证内存序。

然后看下postPeer

  template<int Recv, int Send>
  inline __device__ void postPeer(bool dataStored) {
    if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
      step += StepPerSlice;
      if (Send && (flags & RolePostSend) && dataStored) fence_acq_rel_sys();
      st_relaxed_sys_global(connStepPtr, step);
    }
  }

这里会使用非multimem的指令去写mcBuff,PTX手册中说这是一种未定义的行为,但是官方说对于写操作是可以的。

AllReduce

allreduce kernel主要有三种线程组,scatter线程逻辑如下

      using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
      Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
        prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
          args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1); 
      for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
        ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
        int nelem = args->regUsed ? 0 : min(nvls->nHeads * chunkSize, size - offset);
        prims.scatter(offset, nelem, chunkSize, chunkSize, -1, 0); 
      }   

和ring allreduce一样,nvls一次循环大小为nranks * chunkSize,即变量nlem或者loopSize,scatter线程负责将sendbuff中的数据发送到nvls->up,connIndexSend为1,因此使用第一个send conn。执行完之后如图12所示。
在这里插入图片描述

图 12

reduce线程组逻辑如下所示。
	else if (tid < tidEndReduce && nvls->headRank != -1) {
      if (!hasOut) {
        // Reduce, broadcast through NVLS
        using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 1>;
        Primitives<T, RedOp, FanSymmetric<1>, /*Direct=*/1, Proto, 0>
          prims(tid - tidEndGather, nThreadsReduce, &nvls->down, &nvls->down, NULL, NULL,
            args->redOpArg, 2 * Proto::MaxGroupWidth, 0, 0, args);
        for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
          ssize_t offset = gridOffset + (bid * nvls->nHeads + nvls->headRank) * chunkSize;
          int nelem = min(chunkSize, size - offset);
          prims.directRecvDirectSend(offset, offset, nelem);
        }
      }
      ...
    }

这里MultimemSrcs和MultimemDsts为1,使用第0个conn,因此执行directRecvDirectSend之后效果如图13,GPU 0执行流程如黄色箭头,将两卡的浅黄数据块reduce后得到深黄数据块,然后通过multimem.st将数据广播到两卡,同理GPU 1为绿色箭头,将两卡浅蓝数据块reduce后得到深蓝数据块,此时两卡都有了全局数据。
在这里插入图片描述

图 13

最后是gather线程负责将到嗯全局数据拷贝到recvbuff。
    } else if (tid < tidEndGather) {
      // Gather
      using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
      Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_NVLS_ARITY, 0>, /*Direct=*/0, Proto, 0>
        prims(tid - tidEndScatter, nThreadsGather, nvls->up, NULL, NULL, args->recvbuff,
          args->redOpArg, 1 * Proto::MaxGroupWidth, 1, 1); 
      for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
        ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
        int nelem = args->regUsed ? 0 :min(nvls->nHeads * chunkSize, size - offset);
        prims.gather(offset, nelem, chunkSize, chunkSize, -1, 0); 
      }   

gather流程就是ScatterGatherOp执行recv分支,不再赘述,执行后如图14,到这里就完成了allreduce。
在这里插入图片描述

图 14

搜索时带宽

在这里插入图片描述

图 15
前边提到搜索channel时head对应的带宽需要乘2,以图15为例,回顾刚说的Allreduce过程,GPU 0执行ld_reduce,此时带宽消耗如图16

在这里插入图片描述

图 16
然后执行multimem.st,此时产生的带宽如图17,所以head需要2倍带宽。

在这里插入图片描述

图 17