模型部署与推理--利用python版本onnxruntime模型部署与推理

发布于:2025-07-02 ⋅ 阅读:(25) ⋅ 点赞:(0)


如何在Windows中利用python版本的onnxruntime部署pytorch模型?
部署的目的是为了脱离原有的训练环境,让模型在生产环境能够运行,并且加速。
我这里以deeplabv3+为例。

第一步:

在pytorch训练环境中安装onnx

pip install onnx

第二步:

利用pytorch导出onnx格式:

# 导出训练好的模型,以供后续部署
import os
import argparse
import torch
from modeling.deeplab import DeepLab#换成自己的模型
if __name__ == '__main__':    
    # 读取模型pth文件
    model_path = r'F:\dataset\CoverSegData_train_result\deeplab-resnet-20250606\model_best.pth.tar'
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file {model_path} does not exist.")
    model = DeepLab(num_classes=2, backbone='resnet', output_stride=16, sync_bn=False, freeze_bn=False)
    checkpoint = torch.load(model_path, weights_only=False)
    model.load_state_dict(checkpoint['state_dict'])
    model.eval()  # 设置模型为评估模式
    # 导出为torchscript格式
    dumpy = torch.randn(1, 3, 1024, 1024)  # 假设输入是1张1024x1024的RGB图像
    # 使用torch.jit.trace进行模型跟踪
    trace_model = torch.jit.trace(model, dumpy)   
    export_path = r'F:\dataset\CoverSegData_train_result\deeplab-resnet-20250606\deeplab_resnet_exported.pt'
    trace_model.save(export_path)
    print(f"Model exported to {export_path}")
    # 导出为ONNX格式
    onnx_export_path = r'F:\dataset\CoverSegData_train_result\deeplab-resnet-20250606\deeplab_resnet_exported.onnx'
    torch.onnx.export(model,
                      dumpy,
                      onnx_export_path,
                      export_params=True,
                      opset_version=11,
                      do_constant_folding=True,
                      input_names=['input'],
                      output_names=['output'])
    print(f"Model exported to {onnx_export_path}")

第三步:

conda新建一个推理环境,并安装onnxruntime

conda create -n ort_env python=3.10

第四步:

在ort_env环境中安装onnxruntime
如果安装cpu版本:

pip install onnxruntime

如果安装gpu版本:

pip install onnxruntime-gpu

第五步:

如果安装的cpu版本跳过这一步。
安装cuda和cudnn。

win+R->cmd窗口,查询Windows系统中的cuda和cudnn
1、查询cuda路径:

echo %CUDA_PATH%

这一条命令会找到你的cuda安装的位置。我的电脑显示E:\nvidia\cuda12.2。我下面就以我的路径为例,自己电脑换成自己的路径即可
表示我安装的是cuda12.2,也可以通过nvidia-smi查看。
如果找不到目录,去官网下载安装cuda
重启电脑,这时候重新运行上面的命令,如果还是找不到cuda目录,在系统环境变量->path中添加:CUDA_PATH,E:\nvidia\cuda12.2,重启电脑生效。
2、查询cudnn位置

where cudart64_*.dll

这一条命令会找到你的cudnn安装的位置。cudnn的目录一定在cuda\bin里面。我这里输出的是E:\nvidia\cuda12.2\bin\cudnn64_9.dll。
如果找不到目录,官网下载安装cudnn.安装完cudnn之后,把cudnn目录复制到cuda目录下面:

复制 bin\*     到 E:\nvidia\cuda12.2\bin
复制 include\* 到 E:\nvidia\cuda12.2\include
复制 lib\*     到 E:\nvidia\cuda12.2\lib\x64

第六步

在推理阶段一定要保证和导出onnx输入的照片(张量变量:dumpy)通道和尺寸一样!
核心推理代码:

      if self.use_gpu and 'CUDAExecutionProvider' in ort.get_available_providers():
            providers = ['CUDAExecutionProvider']
            self.provider = 'GPU'
        else:
            providers = ['CPUExecutionProvider']
            self.provider = 'CPU'
            
        print(f"使用提供者: {self.provider}")
        # 创建会话选项
        session_options = ort.SessionOptions()
        session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        
        
        self.session = ort.InferenceSession
                self.onnx_path, 
                providers=providers,
                sess_options=session_options
            


   if self.provider == 'GPU':
            # 使用 IO 绑定优化 GPU 推理
            io_binding = self.session.io_binding()
            
            # 绑定输入到 GPU
            input_tensor = ort.OrtValue.ortvalue_from_numpy(input_img, 'cuda', 0)
            io_binding.bind_input(
                self.input_name, 
                'cuda', 
                0, 
                np.float32, 
                input_img.shape, 
                input_tensor.data_ptr()
            )
            
            output_shape = (1, 2, self.input_shape[2], self.input_shape[3])
            output_array = np.empty(output_shape, dtype=np.float32)
            output_ortvalue = ort.OrtValue.ortvalue_from_numpy(output_array, 'cuda', 0)

            # 绑定输出
            io_binding.bind_ortvalue_output(self.output_name, output_ortvalue)  # 更简洁的新API

            # 运行推理
            self.session.run_with_iobinding(io_binding)
            
            # 获取输出
           # outputs = output_tensor.numpy()
            outputs = output_ortvalue.numpy()
        else:
            # 普通 CPU 推理
            outputs = self.session.run([self.output_name], {self.input_name: input_img})[0]

因为推理的第一张照片需要编译模型并且拷贝显存,所以不能反应真实的推理时间,所以在我的代码中支持输入文件夹批量推理,并对比cpu和gpu推理的时间(去掉第一张照片),代码如下:

import onnxruntime as ort
import numpy as np
import time
import cv2
import os
import glob
from tqdm import tqdm

def preprocess_image(img, target_size=(1024, 1024)):
    """预处理图像以匹配模型输入要求"""
    # 如果输入是文件路径,则读取图像
    if isinstance(img, str):
        img = cv2.imread(img)
        if img is None:
            raise ValueError(f"无法读取图像: {img}")
    
    # 确保图像是 NumPy 数组
    if not isinstance(img, np.ndarray):
        raise TypeError("输入必须是文件路径或 NumPy 数组")
    
    # 转换颜色空间 (OpenCV 默认是 BGR)
    if len(img.shape) == 3 and img.shape[2] == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    # 调整大小
    img = cv2.resize(img, target_size)
    
    # 归一化 (根据模型训练时的预处理)
    img = img.astype(np.float32) / 255.0
    
    # 转换为 CHW 格式 [Height, Width, Channel] -> [Channel, Height, Width]
    if len(img.shape) == 3:
        img = np.transpose(img, (2, 0, 1))
    
    # 添加批次维度 [Channel, Height, Width] -> [Batch, Channel, Height, Width]
    img = np.expand_dims(img, axis=0)
    
    return img

def visualize_segmentation(output, original_img, class_colors=None):
    """可视化分割结果"""
    if class_colors is None:
        # 默认颜色映射 (背景, 前景)
        class_colors = [(0, 0, 0), (255, 0, 0)]  # 黑色背景,红色前景
    
    # 获取预测类别
    pred_class = np.argmax(output[0], axis=0).astype(np.uint8)
    
    # 创建彩色分割图
    seg_image = np.zeros((pred_class.shape[0], pred_class.shape[1], 3), dtype=np.uint8)
    for class_idx, color in enumerate(class_colors):
        seg_image[pred_class == class_idx] = color
    
    # 如果原始图像是预处理过的,还原它
    if original_img.shape[0] == 3 or original_img.shape[0] == 1:  # [C, H, W]
        orig_image = (np.transpose(original_img, (1, 2, 0)) * 255).astype(np.uint8)
    else:  # [H, W, C]
        orig_image = (original_img * 255).astype(np.uint8)
    
    # 叠加原图与分割结果
    overlay = cv2.addWeighted(orig_image, 0.7, seg_image, 0.3, 0)
    
    # 创建纯分割掩码
    mask = seg_image
    
    return overlay, mask

class ONNXModelInferencer:
    def __init__(self, onnx_path, use_gpu=True):
        """初始化 ONNX 模型推理器"""
        self.onnx_path = onnx_path
        self.use_gpu = use_gpu
        self.session = None
        self.input_name = None
        self.output_name = None
        self.input_shape = None
        self.provider = None
        
        self._initialize_session()
    
    def _initialize_session(self):
        """初始化 ONNX Runtime 会话"""
        # 选择提供者
        if self.use_gpu and 'CUDAExecutionProvider' in ort.get_available_providers():
            providers = ['CUDAExecutionProvider']
            self.provider = 'GPU'
        else:
            providers = ['CPUExecutionProvider']
            self.provider = 'CPU'
            
        print(f"使用提供者: {self.provider}")
        # 创建会话选项
        session_options = ort.SessionOptions()
        session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        
        try:
            self.session = ort.InferenceSession(
                self.onnx_path, 
                providers=providers,
                sess_options=session_options
            )
            
            # 获取输入输出信息
            self.input_name = self.session.get_inputs()[0].name
            input_info = self.session.get_inputs()[0]
            self.input_shape = input_info.shape  # [batch, channels, height, width]
            self.output_name = self.session.get_outputs()[0].name
            
            print(f" ONNX 模型加载成功 (提供者: {self.provider})")
            print(f"输入名称: {self.input_name}, 形状: {self.input_shape}")
            print(f"输出名称: {self.output_name}")
            
        except Exception as e:
            print(f" 无法创建 ONNX 会话: {e}")
            raise
    
    def infer_image(self, image_input, return_raw=False):
        """对单个图像进行推理"""
        # 预处理图像
        if isinstance(image_input, str):  # 文件路径
            img = cv2.imread(image_input)
            if img is None:
                raise ValueError(f"无法读取图像: {image_input}")
            original_img = img.copy()
        else:  # NumPy 数组
            original_img = image_input.copy()
        
        # 预处理图像
        input_img = preprocess_image(
            original_img, 
            target_size=(self.input_shape[3], self.input_shape[2])  # (width, height)
        )
        
        # 运行推理
        start_time = time.time()
        
        if self.provider == 'GPU':
            # 使用 IO 绑定优化 GPU 推理
            io_binding = self.session.io_binding()
            
            # 绑定输入到 GPU
            input_tensor = ort.OrtValue.ortvalue_from_numpy(input_img, 'cuda', 0)
            io_binding.bind_input(
                self.input_name, 
                'cuda', 
                0, 
                np.float32, 
                input_img.shape, 
                input_tensor.data_ptr()
            )
            
            # 绑定输出到 GPU
            # output_shape = (1, 2, self.input_shape[2], self.input_shape[3])  # [1, num_classes, H, W]
            # output_tensor = ort.OrtValue.ortvalue_from_shape(output_shape, np.float32, 'cuda', 0)
            # io_binding.bind_output(
            #     self.output_name, 
            #     'cuda', 
            #     0, 
            #     np.float32, 
            #     output_shape, 
            #     output_tensor.data_ptr()
            # )
            output_shape = (1, 2, self.input_shape[2], self.input_shape[3])
            output_array = np.empty(output_shape, dtype=np.float32)
            output_ortvalue = ort.OrtValue.ortvalue_from_numpy(output_array, 'cuda', 0)

            # 绑定输出
            io_binding.bind_ortvalue_output(self.output_name, output_ortvalue)  # 更简洁的新API

            # 运行推理
            self.session.run_with_iobinding(io_binding)
            
            # 获取输出
           # outputs = output_tensor.numpy()
            outputs = output_ortvalue.numpy()
        else:
            # 普通 CPU 推理
            outputs = self.session.run([self.output_name], {self.input_name: input_img})[0]
        
        inference_time = time.time() - start_time
        
        if return_raw:
            return outputs, inference_time
        
        # 可视化结果
        overlay, mask = visualize_segmentation(outputs, original_img)
        
        return overlay, mask, inference_time
    
    def batch_infer_folder(self, input_folder, output_folder, save_overlay=True, save_mask=True):
        """批量处理整个文件夹中的图像"""
        # 创建输出文件夹
        os.makedirs(output_folder, exist_ok=True)
        
        # 获取所有支持的图像文件
        image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
        image_paths = []
        for ext in image_extensions:
            image_paths.extend(glob.glob(os.path.join(input_folder, ext)))
        
        if not image_paths:
            print(f" 在文件夹 {input_folder} 中未找到图像文件")
            return 0  # 返回0表示没有处理任何图像
        
        print(f" 找到 {len(image_paths)} 张图像进行处理")
        
        # 处理统计
        total_time = 0
        min_time = float('inf')
        max_time = 0
        processed_count = 0  # 记录成功处理的图像数量
        
        # 处理每张图像
        
        for img_path in tqdm(image_paths, desc=f"处理图像 ({self.provider})"):
            try:
                # 推理图像
                overlay, mask, infer_time = self.infer_image(img_path)
                
                # 更新计时统计
                # 不统计第一张照片,因为第一张照片涉及编译和内存分配等开销
                if processed_count != 0:
                    total_time += infer_time
                processed_count += 1
                min_time = min(min_time, infer_time)
                max_time = max(max_time, infer_time)
                
                
                # 准备输出路径
                filename = os.path.basename(img_path)
                name, ext = os.path.splitext(filename)
                
                # 保存结果
                if save_overlay:
                    overlay_bgr = cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR)
                    overlay_path = os.path.join(output_folder, f"{name}_overlay{ext}")
                    cv2.imwrite(overlay_path, overlay_bgr)
                
                if save_mask:
                    mask_bgr = cv2.cvtColor(mask, cv2.COLOR_RGB2BGR)
                    mask_path = os.path.join(output_folder, f"{name}_mask{ext}")
                    cv2.imwrite(mask_path, mask_bgr)
               
                
            except Exception as e:
                print(f" 处理 {img_path} 时出错: {str(e)}")
        
        # 打印统计信息
        print("\n" + "="*50)
        print(f"处理完成 ({self.provider})")
        print(f"总图像数: {len(image_paths)}")
        print(f"成功处理: {processed_count} 张图像")
        
        if processed_count > 0:
            avg_time = total_time / (processed_count-1)
            print(f"总推理时间: {total_time:.4f} 秒")
            print(f"平均推理时间: {avg_time:.4f} 秒/图像")
            print(f"最短推理时间: {min_time:.4f} 秒")
            print(f"最长推理时间: {max_time:.4f} 秒")
            print(f"速度: {(processed_count-1)/total_time:.2f} 图像/秒")
        else:
            print(" 没有图像被成功处理")
        
        print("="*50)
        
        return total_time

def compare_gpu_cpu_performance(onnx_path, input_folder, output_base_folder):
    """比较 GPU 和 CPU 性能"""
    # 创建 GPU 推理器
    print("\n" + "="*50)
    print("初始化 GPU 推理器")
    print("="*50)
    gpu_inferencer = ONNXModelInferencer(onnx_path, use_gpu=True)
    
    # 创建 CPU 推理器
    print("\n" + "="*50)
    print("初始化 CPU 推理器")
    print("="*50)
    cpu_inferencer = ONNXModelInferencer(onnx_path, use_gpu=False)
    
    # 创建输出文件夹
    gpu_output_folder = os.path.join(output_base_folder, "gpu_results")
    cpu_output_folder = os.path.join(output_base_folder, "cpu_results")
    os.makedirs(gpu_output_folder, exist_ok=True)
    os.makedirs(cpu_output_folder, exist_ok=True)
    
    # 运行 GPU 推理
    print("\n" + "="*50)
    print("开始 GPU 批量推理")
    print("="*50)
    gpu_time = gpu_inferencer.batch_infer_folder(input_folder, gpu_output_folder)
    
    # 运行 CPU 推理
    print("\n" + "="*50)
    print("开始 CPU 批量推理")
    print("="*50)
    cpu_time = cpu_inferencer.batch_infer_folder(input_folder, cpu_output_folder)
    
    # 性能比较 - 只有当两者都成功处理了图像时才进行比较
    if gpu_time is not None and cpu_time is not None and gpu_time > 0 and cpu_time > 0:
        print("\n" + "="*50)
        print("性能比较结果")
        print("="*50)
        print(f"GPU 总时间: {gpu_time:.4f} 秒")
        print(f"CPU 总时间: {cpu_time:.4f} 秒")
        print(f"加速比: {cpu_time/gpu_time:.2f}x")
        print("="*50)
    else:
        print("\n 无法比较性能,因为一个或多个设备没有处理任何图像")

def batch_infer_all_images(onnx_path, input_folder, output_folder, use_gpu=True):
    """批量处理整个文件夹中的图像(单一设备)"""
    # 创建推理器
    print("\n" + "="*50)
    device_name = "GPU" if use_gpu else "CPU"
    print(f"初始化 {device_name} 推理器")
    print("="*50)
    inferencer = ONNXModelInferencer(onnx_path, use_gpu=use_gpu)
    
    # 运行推理
    print("\n" + "="*50)
    print(f"开始 {device_name} 批量推理")
    print("="*50)
    inferencer.batch_infer_folder(input_folder, output_folder)

if __name__ == "__main__":
    # 配置参数
    onnx_model_path = r'F:\dataset\CoverSegData_train_result\deeplab-resnet-20250606\deeplab_resnet_exported.onnx'
    input_image_folder = r'F:\dataset\test'  # 替换为你的图像文件夹路径
    output_base_folder = r'F:\dataset\test_results'  # 结果输出文件夹
    
    # 检查 ONNX 文件是否存在
    if not os.path.exists(onnx_model_path):
        raise FileNotFoundError(f"ONNX 模型文件 {onnx_model_path} 不存在")
    
    # 检查输入文件夹是否存在
    if not os.path.exists(input_image_folder):
        raise FileNotFoundError(f"输入文件夹 {input_image_folder} 不存在")
    
    # 创建输出基础文件夹
    os.makedirs(output_base_folder, exist_ok=True)
    
    # 选择运行模式
    mode = input("请选择运行模式:\n1. 比较 GPU 和 CPU 性能\n2. 仅使用 GPU 推理\n3. 仅使用 CPU 推理\n> ")
    
    if mode == '1':
        # 比较 GPU 和 CPU 性能
        compare_gpu_cpu_performance(onnx_model_path, input_image_folder, output_base_folder)
    elif mode == '2':
        # 仅使用 GPU 推理
        gpu_output_folder = os.path.join(output_base_folder, "gpu_only")
        batch_infer_all_images(onnx_model_path, input_image_folder, gpu_output_folder, use_gpu=True)
    elif mode == '3':
        # 仅使用 CPU 推理
        cpu_output_folder = os.path.join(output_base_folder, "cpu_only")
        batch_infer_all_images(onnx_model_path, input_image_folder, cpu_output_folder, use_gpu=False)
    else:
        print(" 无效的选择,请重新运行程序并输入 1, 2 或 3")

结果:

PS F:\python_projects\deeplab_deploy> & E:/anaconda3/python.exe f:/python_projects/deeplab_deploy/test2.py
请选择运行模式:
1. 比较 GPU 和 CPU 性能
2. 仅使用 GPU 推理
3. 仅使用 CPU 推理
> 1

==================================================
初始化 GPU 推理器
==================================================
使用提供者: GPU
ONNX 模型加载成功 (提供者: GPU)
输入名称: input, 形状: [1, 3, 1024, 1024]
输出名称: output

==================================================
初始化 CPU 推理器
==================================================
使用提供者: CPU
ONNX 模型加载成功 (提供者: CPU)
输入名称: input, 形状: [1, 3, 1024, 1024]
输出名称: output

性能比较结果
==================================================
GPU 总时间: 0.4545 秒
CPU 总时间: 6.2390 秒
加速比: 13.73x
==================================================

可以看出来速度提高了13倍。
参考:
https://onnxruntime.ai/docs/get-started/with-python.html
https://onnxruntime.ai/docs/execution-providers/CUDA-ExecutionProvider.html#requirements


网站公告

今日签到

点亮在社区的每一天
去签到