以下代码用于测试多机多卡环境下任意两张 GPU 之间的点对点（send/recv）通信带宽，并将结果绘制成带宽拓扑图（保存为 top.png）。
1. 效果图
2. 代码
tee scan_topo.py <<-'EOF'
import deepspeed
import torch
import torch.distributed as dist
import time
import networkx as nx
import matplotlib.pyplot as plt
import os
import matplotlib
matplotlib.use('Agg')
def initialize_process():
    """Create the distributed process group and pin this process to its GPU.

    The process group is set up by ``deepspeed.init_distributed()``. The GPU
    index comes from the ``LOCAL_RANK`` environment variable, which is expected
    to be provided by the launcher (presumably DeepSpeed's); a missing variable
    raises ``KeyError``.
    """
    deepspeed.init_distributed()
    # Bind this process to the GPU whose index equals its node-local rank.
    gpu_index = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(gpu_index)
def bandwidth_test(src, dst, size=102400, iters=10):
    """Measure point-to-point bandwidth from rank ``src`` to rank ``dst``.

    Every rank in the process group must call this with the same arguments:
    only ``src`` and ``dst`` move data, but the final ``all_gather_object`` is a
    collective over all ranks.

    Args:
        src: rank that sends.
        dst: rank that receives.
        size: number of float32 elements per message (102400 -> 400 KiB).
        iters: number of timed send/recv repetitions.

    Returns:
        Bandwidth in GB/s (1e9 bytes per second), computed from the slower of
        the two participating ranks' elapsed times. The same value is returned
        on every rank.
    """
    rank = dist.get_rank()
    participating = rank in (src, dst)
    # Allocate directly on the current GPU instead of building on CPU and copying.
    tensor = torch.ones(size, dtype=torch.float32, device='cuda')

    def _transfer():
        # One message from src to dst; ranks outside the pair do nothing.
        if rank == src:
            dist.send(tensor, dst)
        elif rank == dst:
            dist.recv(tensor, src)

    # Untimed warm-up so one-time setup costs (presumably lazy NCCL
    # connection setup for this pair) are excluded from the measurement.
    _transfer()

    elapsed_time = None
    if participating:
        # NCCL send/recv are asynchronous with respect to the host; synchronize
        # so the timer brackets the completed GPU transfers, not just their launch.
        torch.cuda.synchronize()
        start = time.perf_counter()
        for _ in range(iters):
            _transfer()
        torch.cuda.synchronize()
        elapsed_time = time.perf_counter() - start

    # Share the measured times with every rank (non-participants contribute None).
    elapsed_time_list = [None] * dist.get_world_size()
    dist.all_gather_object(elapsed_time_list, elapsed_time)
    elapsed_time = max(t for t in elapsed_time_list if t is not None)

    # All `iters` messages were timed, so count the bytes of all of them.
    total_bytes = tensor.element_size() * tensor.numel() * iters
    return total_bytes / elapsed_time / 1e9  # unit: GB/s
def measure_bandwidth_topology(size):
    """Build the full ``size x size`` rank-to-rank bandwidth matrix.

    ``matrix[src][dst]`` holds the GB/s value returned by ``bandwidth_test``
    for traffic from ``src`` to ``dst``; diagonal entries are 0. Pairs are
    visited with ``src`` as the outer loop and ``dst`` as the inner loop. That
    order must be identical on every rank, because ``bandwidth_test`` issues
    collective calls.
    """
    return [
        [0 if src == dst else bandwidth_test(src, dst) for dst in range(size)]
        for src in range(size)
    ]
def plot_bandwidth_topology(bandwidth_matrix, size, nodes_per_row=8,
                            output_path='top.png'):
    """Draw the bandwidth matrix as a directed graph and save it as an image.

    Ranks are placed on a grid with ``nodes_per_row`` columns (rank ``i`` at
    column ``i % nodes_per_row``, row ``i // nodes_per_row``, rows going down).
    Next to each rank, a text block lists its outgoing links as
    ``src->dst: X.XXGB/s``, drawn in that rank's color.

    Args:
        bandwidth_matrix: ``size x size`` nested list; entry ``[i][j]`` is the
            bandwidth from rank ``i`` to rank ``j`` in GB/s.
        size: number of ranks (world size).
        nodes_per_row: grid width. The default of 8 presumably gives one row
            per 8-GPU node.
        output_path: file the figure is written to (600 dpi).
    """
    G = nx.DiGraph()
    # Add every rank explicitly so the graph has all nodes even when it has no edges.
    G.add_nodes_from(range(size))
    for i in range(size):
        for j in range(size):
            if i != j:
                G.add_edge(i, j, weight=bandwidth_matrix[i][j])

    # One color per rank. tab20 has only cmap.N (20) entries and maps larger
    # integer indices to a single "over" color, so wrap the index to keep cycling.
    cmap = plt.cm.tab20
    colors = cmap([i % cmap.N for i in range(size)])

    # Grid layout (no jitter is applied).
    pos = {i: (i % nodes_per_row, -(i // nodes_per_row)) for i in range(size)}

    fig = plt.figure(figsize=(20, 10))
    try:
        nx.draw(G, pos, with_labels=False, node_size=400,
                node_color=[colors[i] for i in range(size)])

        # Per-rank text block listing its outgoing links, ordered by destination.
        for node in range(size):
            label_text = "\n".join(
                f'{node:02d}->{j:02d}: {bandwidth_matrix[node][j]:5.2f}GB/s'
                for j in range(size) if j != node
            )
            if not label_text:
                continue
            x, y = pos[node]
            # Hand-tuned offsets for a two-row layout: the first row's block
            # starts 0.25 above the node; other rows start just below it.
            if node < nodes_per_row:
                y += 0.25
            else:
                y -= 0.03
            plt.text(x, y, label_text, fontsize=8, ha='center', va='top',
                     bbox=dict(facecolor='white', alpha=0, edgecolor='none'),
                     color=colors[node])

        # Rank id drawn at each node's position.
        for node, (x, y) in pos.items():
            plt.text(x, y, str(node), fontsize=10, ha='center',
                     bbox=dict(facecolor='white', alpha=0, edgecolor='none'),
                     color='black')

        plt.savefig(output_path, dpi=600, transparent=False, bbox_inches='tight')
    finally:
        # Release the figure; pyplot otherwise keeps a reference to it.
        plt.close(fig)
def main():
    """Entry point: measure every rank pair, then plot the result on rank 0.

    All ranks take part in the measurement, because it uses collective calls.
    Only rank 0 renders the figure. The final barrier holds the other ranks
    until rank 0 has finished plotting; the process group is then destroyed.
    """
    initialize_process()
    world_size = dist.get_world_size()

    # Every rank must run the measurement.
    matrix = measure_bandwidth_topology(world_size)

    is_master = dist.get_rank() == 0
    if is_master:
        plot_bandwidth_topology(matrix, world_size)

    dist.barrier()
    dist.destroy_process_group()


if __name__ == "__main__":
    main()
EOF
# Note: `<<-` strips leading TAB characters from heredoc lines, so the Python
# written to scan_topo.py above must be indented with spaces, not tabs.
#
# hostfile: one line per node, "<hostname> slots=<number of GPUs to use>".
tee hostfile <<-'EOF'
worker_1 slots=8
worker_2 slots=8
EOF
# Communication / build environment.
# NOTE(review): NCCL_IB_DISABLE=1 disables NCCL's InfiniBand/RoCE transport, so
# NCCL_IB_HCA should have no effect and traffic presumably goes over the socket
# interface named by NCCL_SOCKET_IFNAME. Confirm which transport is intended.
# NOTE(review): MAX_JOBS and the NVTE_* variables are not read by scan_topo.py;
# they look like leftovers from a training setup.
export NCCL_SOCKET_IFNAME=bond0
export MAX_JOBS=32
export NCCL_DEBUG=ERROR
export NCCL_IB_DISABLE=1
export NVTE_FLASH_ATTN=0
export NVTE_FUSED_ATTN=0
export NCCL_IB_HCA='mlx5_bond_0'
# Minimal DeepSpeed config.
# NOTE(review): scan_topo.py never parses its command-line arguments, so this
# file and the --deepspeed* / --distributed-backend flags below appear to be
# unused by the benchmark itself.
cat > ds_config.json <<-'EOF'
{
"train_batch_size": 1,
"steps_per_print": 1,
"fp16": {
"enabled": true,
"initial_scale_power": 16
}
}
EOF
# Launch the benchmark on every host listed in ./hostfile.
# --no_python + `bash -c` runs the quoted command string directly.
# --no_local_rank keeps the launcher from appending a --local_rank argument;
# scan_topo.py reads the LOCAL_RANK environment variable instead.
# NOTE(review): scan_topo.py must exist at the same path on every node
# (e.g. on a shared filesystem); confirm this for your cluster.
deepspeed --hostfile ./hostfile --no_local_rank \
--no_python /usr/bin/bash -c 'cd ./ && python3 -u ./scan_topo.py --deepspeed \
--deepspeed_config=ds_config.json --distributed-backend=nccl'