1. 训练MNIST模型
import torch #导入pytorch 核心库
import torch.nn as nn #神经网络模块,如卷积层
import torch.optim as optim #优化器
from torchvision import datasets, transforms #数据集与图像预处理工具
#定义CNN模型
class SimpleCNN(nn.Module): #PyTorch 库中所有神经网络的 “基础模板”,我们的网络继承它的功能(比如训练、保存等)。 作用:声明这是一个神经网络类。
def __init__(self): #定义构造函数,和self(我自己)实例,用于初始化各层结构
super(SimpleCNN, self).__init__() #super调用父类功能。作用:让我们的网络继承nn.Module的所有功能
# 第一层卷积 + ReLU + 最大池化
self.conv1 = nn.Conv2d(1, 8, kernel_size=3, stride=1, padding=1) #nn.Conv2d:PyTorch 提供的卷积层工具(处理 2D 图像的卷积),功能就是找特征
self.relu1 = nn.ReLU() #nn.ReLU:一种 “激活函数”,作用是 “筛选有用特征”。简单理解:把负数变成 0,正数保留(类似 “只保留重要信息”)。
self.pool1 = nn.MaxPool2d(kernel_size=2) #池化层,作用是 “压缩图片”。图片尺寸缩小一半(28×28 → 14×14),减少计算量
# 第二层卷积 + ReLU + 最大池化
self.conv2 = nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1)
self.relu2 = nn.ReLU()
self.pool2 = nn.MaxPool2d(kernel_size=2)
# 全连接层
self.fc1 = nn.Linear(16 * 7 * 7, 64) # 16个7x7特征图 -> 64个神经元 ,类似 “汇总特征做决策”,作用:把前面提取的空间特征,转化为一维的 “特征向量”。
self.relu3 = nn.ReLU()
self.fc2 = nn.Linear(64, 10) # 64 -> 10个类别(0-9),输出 10 个结果,对应数字 0-9(告诉我们图片可能是哪个数字)
#定义前向传播路径
def forward(self, x):
x = self.pool1(self.relu1(self.conv1(x))) # 第一层卷积+激活+池化
x = self.pool2(self.relu2(self.conv2(x))) # 第二层卷积+激活+池化
# 将多维张量展平为一维向量(batch_size, 16*7*7)
x = x.view(-1, 16 * 7 * 7)
# 全连接层分类
x = self.relu3(self.fc1(x))
x = self.fc2(x)
return x
#数据预处理与加载
def main():
# 数据预处理:转为Tensor并归一化
# transforms.Compose把多个数据处理步骤 “打包成流水线”,图片会按顺序依次经过这些步骤。
transform = transforms.Compose([
transforms.ToTensor(), # 转为Tensor (0-1范围),包括把像素值转为浮点数
transforms.Normalize((0.1307,), (0.3081,)) # 归一化到均值0.1307,标准差0.3081,对像素值做 “标准化处理”,让数据分布更稳定,加速模型训练。让像素值分布在 0 附近(大部分在 -1 到 1 之间),避免某些过大的像素值干扰模型学习。
])
# 加载训练集(自动下载到./data目录)
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
# 初始化模型、损失函数和优化器
model = SimpleCNN() # 创建学生(神经网络)
criterion = nn.CrossEntropyLoss() # 评分标准(交叉熵损失函数)
optimizer = optim.Adam(model.parameters(), lr=0.001) # 学习方法(Adam优化器,学习率0.001)
# 训练模型
for epoch in range(5): # 让学生学习5个学期(遍历整个数据集5次)
model.train() # 开启训练模式
total_loss = 0 # 记录这学期的总错误
for batch_idx, (data, target) in enumerate(train_loader):# 每次取64张图片(一个批次)
optimizer.zero_grad() # 清空之前的“错误记录”(避免累积)
output = model(data) # 让模型预测图片对应的数字(学生做题)
loss = criterion(output, target) # 计算预测错误(评分)
loss.backward() # 反向传播,计算“错误梯度”(分析错题,找出需要改进的地方)
optimizer.step() # 根据梯度调整模型参数(学生改正错误,调整学习方法)
total_loss += loss.item() # 累加当前批次的损失
print(f'Epoch {epoch+1}, Loss: {total_loss/len(train_loader)}')
# 保存模型
torch.save(model.state_dict(), 'simple_cnn_mnist.pth') # 保存模型参数
print("模型已保存为: simple_cnn_mnist.pth")
2. 进行模型量化部署
import torch
import torch.nn as nn
import numpy as np # 用于处理数组和矩阵,并将数据保存为 CSV 文件
# 先定义模型结构(和训练时的SimpleCNN完全一致)
class SimpleCNN(nn.Module):
def __init__(self):
super(SimpleCNN, self).__init__()
self.conv1 = nn.Conv2d(1, 8, kernel_size=3, stride=1, padding=1)
self.relu1 = nn.ReLU()
self.pool1 = nn.MaxPool2d(kernel_size=2)
self.conv2 = nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1)
self.relu2 = nn.ReLU()
self.pool2 = nn.MaxPool2d(kernel_size=2)
self.fc1 = nn.Linear(16 * 7 * 7, 64)
self.relu3 = nn.ReLU()
self.fc2 = nn.Linear(64, 10)
def forward(self, x):
x = self.pool1(self.relu1(self.conv1(x)))
x = self.pool2(self.relu2(self.conv2(x)))
x = x.view(-1, 16 * 7 * 7)
x = self.relu3(self.fc1(x))
x = self.fc2(x)
return x
# 加载模型
model = SimpleCNN()
model.load_state_dict(torch.load('simple_cnn_mnist.pth')) # 加载之前保存的模型
model.eval() # 切换到评估模式
# 量化模型,将 32 位浮点数(如 0.1234)压缩为 8 位整数(如 123),大幅减少模型大小和计算量。
quantized_model = torch.quantization.quantize_dynamic(
model, {nn.Conv2d, nn.Linear}, dtype=torch.qint8
)
# 导出权重和偏置
def export_weights(model, filename):
weights = {}
for name, param in model.named_parameters():
if isinstance(param, torch.Tensor):
weights[name] = param.detach().numpy() # 转为NumPy数组
# 保存为CSV格式
for name, data in weights.items():
np.savetxt(f'{filename}_{name}.csv', data.flatten(), delimiter=',')
export_weights(quantized_model, 'quantized_weights')
print("量化后的权重已导出为CSV文件")
用 “整理零件” 类比整个过程
假设你有一个 “机器人模型”(神经网络),要把它的所有零件拆下来,分类整理到不同的盒子里:
准备工作:
def export_weights(model, filename)
:设计一个 “整理机器人零件” 的流程。
weights = {}
:准备一个空箱子,用来存放所有零件。拆卸零件:
for name, param in model.named_parameters():
:逐个拆卸机器人的零件(卷积核、连接线等)。
weights[name] = param.detach().numpy()
:给每个零件贴上标签(如 “左眼”“右臂”),并放进箱子。装箱保存:
for name, data in weights.items():
:从箱子里取出每个零件。
np.savetxt(...)
:把零件的详细信息(尺寸、材质等)写在纸上,装进对应的小盒子(CSV 文件)。完成通知:
print("量化后的权重已导出为CSV文件")
:告诉你 “所有零件都已整理好,可以使用了”。
3. 进行FPGA卷积层加速设计
//=====================================================================
// 模块名称:conv_layer
// 功能描述:单通道卷积层硬件加速器
//=====================================================================
module conv_layer #(
parameter DATA_WIDTH = 8 // 数据位宽
) (
input wire clk, // 时钟信号
input wire rst_n, // 复位信号(低电平有效)
input wire start, // 启动信号
// 将3×3数组拆解为9个独立端口
input wire [DATA_WIDTH-1:0] data_in_00, data_in_01, data_in_02,
input wire [DATA_WIDTH-1:0] data_in_10, data_in_11, data_in_12,
input wire [DATA_WIDTH-1:0] data_in_20, data_in_21, data_in_22,
input wire [DATA_WIDTH-1:0] kernel_00, kernel_01, kernel_02,
input wire [DATA_WIDTH-1:0] kernel_10, kernel_11, kernel_12,
input wire [DATA_WIDTH-1:0] kernel_20, kernel_21, kernel_22,
output reg [DATA_WIDTH*2-1:0] result, // 卷积结果
output reg done // 计算完成标志
);
// 内部信号
reg [DATA_WIDTH*2-1:0] sum;
always @(posedge clk or negedge rst_n) begin
if (!rst_n) begin
sum <= 0;
result <= 0;
done <= 0;
end else if (start) begin
// 手动展开3×3卷积计算
sum <= data_in_00*kernel_00 + data_in_01*kernel_01 + data_in_02*kernel_02 +
data_in_10*kernel_10 + data_in_11*kernel_11 + data_in_12*kernel_12 +
data_in_20*kernel_20 + data_in_21*kernel_21 + data_in_22*kernel_22;
result <= sum;
done <= 1;
end
end
endmodule
然后封装成AXI IP核,生成比特流文件,进行hardware的export
4. VITIS端控制
将项目导入VITIS中,进行PS端开发控制PL
#include "xil_printf.h"
#include "xparameters.h"
#include "xil_io.h"
#define CONV_LAYER_BASEADDR XPAR_CONV_LAYER_IP_0_S00_AXI_BASEADDR
#define CONV_LAYER_CTRL_ADDR (CONV_LAYER_BASEADDR + 0x00)
#define CONV_LAYER_INPUT_ADDR (CONV_LAYER_BASEADDR + 0x10)
#define CONV_LAYER_OUTPUT_ADDR (CONV_LAYER_BASEADDR + 0x18)
int main() {
print("MNIST FPGA Accelerator Demo\n");
// 配置输入/输出地址
Xil_Out32(CONV_LAYER_INPUT_ADDR, 0x10000000); // 输入缓冲区地址
Xil_Out32(CONV_LAYER_OUTPUT_ADDR, 0x10010000); // 输出缓冲区地址
// 启动加速器
Xil_Out32(CONV_LAYER_CTRL_ADDR, 0x01);
// 等待完成
while((Xil_In32(CONV_LAYER_CTRL_ADDR) & 0x04) == 0);
print("推理完成!\n");
return 0;
}
最后进行运行和调试即可