GPU多机多卡通信带宽测试

发布于:2024-05-23 ⋅ 阅读:(161) ⋅ 点赞:(0)

GPU多机多卡通信带宽测试

以下脚本基于 DeepSpeed 和 torch.distributed,逐对(rank 到 rank)测量多机多卡环境下 GPU 之间的点对点通信带宽,并把结果绘制成拓扑图保存为 top.png。

1.效果图

(效果图:脚本生成的 top.png,图中每个节点代表一个 rank,节点旁的文字列出该 rank 到其他各 rank 的带宽,单位 GB/s)

2.代码

tee scan_topo.py <<-'EOF'
import deepspeed
import torch
import torch.distributed as dist
import time
import networkx as nx
import matplotlib.pyplot as plt
import os

import matplotlib
# Select the non-interactive Agg backend: this script only saves the figure
# to a file (see plt.savefig) and never opens a window.
matplotlib.use('Agg')

def initialize_process():
    """
    Set up the distributed process group and bind this process to its GPU.

    The GPU index is taken from the ``LOCAL_RANK`` environment variable;
    a missing variable raises ``KeyError``.
    """
    deepspeed.init_distributed()
    # LOCAL_RANK is expected to be exported by the launcher for each process.
    device_index = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(device_index)

def bandwidth_test(src, dst, size=102400, iters=10):
    """
    Measure point-to-point bandwidth from rank ``src`` to rank ``dst``.

    Every rank of the default process group must call this function with the
    same arguments: the measured time is shared with all ranks through
    ``dist.all_gather_object``, which is a collective operation.

    Args:
        src: Rank that sends the data.
        dst: Rank that receives the data.
        size: Number of float32 elements per message.
        iters: Number of timed send/recv rounds (one extra warm-up round is
            always performed first).

    Returns:
        Bandwidth in GB/s (1e9 bytes per second), based on the slower of the
        two endpoints' elapsed times.

    Raises:
        ValueError: If no rank reported a non-zero elapsed time (for example
            when neither ``src`` nor ``dst`` is a rank of the group).
    """
    rank = dist.get_rank()
    is_src = rank == src
    is_dst = rank == dst

    tensor = torch.ones(size, dtype=torch.float32, device='cuda')

    def _transfer():
        # Only the two endpoints take part in the transfer itself.
        if is_src:
            dist.send(tensor, dst)
        elif is_dst:
            dist.recv(tensor, src)

    # Warm-up round: keeps one-time setup cost out of the measurement.
    _transfer()

    elapsed_time = None
    if is_src or is_dst:
        # With a CUDA backend such as NCCL, send/recv may return once the
        # operation is enqueued on the stream, so the host clock must only be
        # read after the device has actually finished.
        torch.cuda.synchronize()
        start = time.perf_counter()
        for _ in range(iters):
            _transfer()
        torch.cuda.synchronize()
        elapsed_time = time.perf_counter() - start

    # Share the endpoints' timings so every rank returns the same value.
    elapsed_time_list = [None] * dist.get_world_size()
    dist.all_gather_object(elapsed_time_list, elapsed_time)

    # Non-participating ranks contribute None; zero timings are unusable.
    timings = [t for t in elapsed_time_list if t]
    if not timings:
        raise ValueError(
            f'no usable timing reported for transfer {src} -> {dst}'
        )
    slowest = max(timings)

    # Count the bytes of ALL timed rounds, not just a single message.
    total_bytes = tensor.numel() * tensor.element_size() * iters
    bandwidth = total_bytes / slowest / 1e9  # unit: GB/s
    return bandwidth

def measure_bandwidth_topology(size):
    """
    Benchmark every ordered pair of distinct ranks.

    Returns a ``size`` x ``size`` list of lists in which entry ``[src][dst]``
    holds the value returned by ``bandwidth_test(src, dst)``; the diagonal
    stays 0.
    """
    bandwidth_matrix = [[0] * size for _ in range(size)]

    # Pairs are visited in row-major order (sender outer, receiver inner).
    for sender, row in enumerate(bandwidth_matrix):
        for receiver in range(size):
            if sender == receiver:
                continue
            row[receiver] = bandwidth_test(sender, receiver)

    return bandwidth_matrix

def plot_bandwidth_topology(bandwidth_matrix, size):
    """
    Draw the measured bandwidths as a directed graph and save it as top.png.

    Ranks are placed on a grid with 8 columns per row. Next to each rank a
    text block lists the bandwidth of each of its outgoing links, and the
    rank number is printed on the node itself.
    """
    graph = nx.DiGraph()
    for a in range(size):
        for b in range(size):
            if a == b:
                continue
            graph.add_edge(a, b, weight=bandwidth_matrix[a][b])

    # One colour per rank, taken from the tab20 colormap.
    palette = plt.cm.tab20(range(size))

    # Fixed grid layout: rank -> (column, -row) with 8 ranks per row.
    pos = {}
    for rank in range(size):
        row, col = divmod(rank, 8)
        pos[rank] = (col, -row)

    plt.figure(figsize=(20, 10))
    nx.draw(
        graph,
        pos,
        with_labels=False,
        node_size=400,
        node_color=list(palette),
        font_weight='bold',
    )

    # Text block with the outgoing-link bandwidths of each rank.
    all_edges = list(graph.edges())
    for rank in range(size):
        lines = [
            f'{i:02d}->{j:02d}: {bandwidth_matrix[i][j]:5.2f}GB/s'
            for i, j in all_edges
            if i == rank
        ]
        if not lines:
            continue
        label_text = "\n".join(lines)

        x, y = pos[rank]
        # Ranks below 8 get their block shifted up, the others slightly
        # down; the text is anchored at its top edge.
        y += 0.25 if rank < 8 else -0.03
        plt.text(
            x,
            y,
            label_text,
            fontsize=8,
            ha='center',
            va='top',
            bbox={'facecolor': 'white', 'alpha': 0, 'edgecolor': 'none'},
            color=palette[rank % len(palette)],
        )

    # Rank number drawn at each node position.
    for rank, coords in pos.items():
        plt.text(
            coords[0],
            coords[1],
            str(rank),
            fontsize=10,
            ha='center',
            bbox={'facecolor': 'white', 'alpha': 0, 'edgecolor': 'none'},
            color='black',
        )

    plt.savefig('top.png', dpi=600, transparent=False, bbox_inches='tight')

def main():
    """
    Entry point: run the pairwise bandwidth benchmark and plot the result.
    """
    initialize_process()
    world_size = dist.get_world_size()

    # Every rank takes part in the measurement.
    matrix = measure_bandwidth_topology(world_size)

    # Only rank 0 renders the topology figure.
    is_root = dist.get_rank() == 0
    if is_root:
        plot_bandwidth_topology(matrix, world_size)

    # Wait for all ranks before tearing the process group down.
    dist.barrier()
    dist.destroy_process_group()

# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()

EOF


# Write the DeepSpeed hostfile: one line per host, where `slots` is the
# number of GPUs (processes) to use on that host.
tee hostfile <<-'EOF'
worker_1 slots=8
worker_2 slots=8
EOF

# Network interface NCCL uses for its socket-based communication.
export NCCL_SOCKET_IFNAME=bond0
# NOTE(review): not read by scan_topo.py; presumably left over from a
# build/training setup.
export MAX_JOBS=32
# NOTE(review): NCCL documents VERSION/WARN/INFO/TRACE as debug levels;
# confirm that ERROR is honored.
export NCCL_DEBUG=ERROR
# Disable the InfiniBand/RoCE transport so NCCL falls back to IP sockets.
export NCCL_IB_DISABLE=1
# NOTE(review): Transformer Engine attention switches; scan_topo.py does not
# import Transformer Engine, so these look unrelated to this test.
export NVTE_FLASH_ATTN=0
export NVTE_FUSED_ATTN=0
# NOTE(review): selects the IB device, but NCCL_IB_DISABLE=1 above turns the
# IB transport off, so this presumably has no effect; confirm which is intended.
export NCCL_IB_HCA='mlx5_bond_0' 

# Write a minimal DeepSpeed config file.
# NOTE(review): scan_topo.py never parses its command line or loads this
# file, so the config appears to be unused by the bandwidth test; confirm.
cat > ds_config.json <<-'EOF'
{
  "train_batch_size": 1,
  "steps_per_print": 1,
  "fp16": {
    "enabled": true,
    "initial_scale_power": 16
  }
}
EOF

# Launch scan_topo.py on every slot listed in ./hostfile.
# --no_python runs the given command directly (here: bash -c '...') instead
# of through python; --no_local_rank keeps the launcher from appending a
# --local_rank argument to it.
# NOTE(review): scan_topo.py does not parse its command line, so the
# --deepspeed* and --distributed-backend flags appear to be ignored; confirm.
deepspeed --hostfile ./hostfile  --no_local_rank \
    --no_python /usr/bin/bash -c 'cd ./ && python3 -u ./scan_topo.py --deepspeed \
    --deepspeed_config=ds_config.json --distributed-backend=nccl'