我想开发一个基于深度学习的分类小软件,在这个过程中逐渐了解到TensorRT在模型推理速度上的优势。经过一下午的资料查找,我实现了将ONNX模型转换为TensorRT格式、并对转换后的模型进行推理和测试的完整流程。现将实现过程记录下来,方便日后查看。
本文实验设备是MX350显卡 2G显存
一 、安装TensorRT
点击TensorRT下载链接,选择合适的TensorRT版本下载。既然读者选择使用TensorRT进行推理,本文默认已经配置好CUDA和cuDNN环境;如果尚未配置,请移步这篇博客:《Windows配置深度学习环境(从查询合适的torch版本开始)——torch+CUDA+cuDNN》。
TensorRT与cuda版本对应方式查看如下:
点击TensorRT版本
点击同意
点击版本号
查看cuda版本是否符合你设备,点击下载即可
二、环境配置
- 下载后得到文件结构如下所示
- 添加环境变量,右键此电脑点击属性,根据图中序号依次点击并添加环境变量
我的环境变量如下所示
D:\Software\TensorRT-8.6.1.6\lib
D:\Software\TensorRT-8.6.1.6\bin
三、模型转换
打开命令行窗口,切换到D:\Software\TensorRT-8.6.1.6\bin目录,执行如下命令
trtexec --onnx=mymodel.onnx --saveEngine=model.trt --fp16
这里的 --fp16 应该也可以改成 --int8,但是精度损失会有点大,我没有实验
这里的 mymodel.onnx 需要换成你自己的onnx文件名;model.trt 是输出的引擎文件名,可以随意命名(下文测试代码加载的是 mnv4.engine,文件名需要与代码中的路径保持一致)
如下图所示为转换成功
四、TensorRT与ONNX推理速度与精度测试
推理时间测试
- TensorRT推理时间测试代码
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import cv2
import os
import numpy as np
import time
from typing import Tuple
class TensorRTPredictor:
    """Classify single images with a serialized TensorRT engine.

    The class assumes that engine tensor 0 is the input and tensor 1 is the
    output, treats both as float32, and feeds the input as a (1, C, H, W)
    array built from one preprocessed image.
    """

    # Spatial size used when the engine does not report a static H/W.
    _FALLBACK_SIZE = 224

    def __init__(self, engine_path: str):
        """Load the engine at ``engine_path`` and create an execution context."""
        self.logger = trt.Logger(trt.Logger.WARNING)
        self.engine = self._load_engine(engine_path)
        self.context = self.engine.create_execution_context()
        self.input_shape = tuple(self.engine.get_tensor_shape(self.engine.get_tensor_name(0)))
        self.output_shape = tuple(self.engine.get_tensor_shape(self.engine.get_tensor_name(1)))
        self.is_warmed_up = False

    def _load_engine(self, engine_path: str) -> trt.ICudaEngine:
        """Deserialize the engine file and print how long loading took.

        Raises:
            RuntimeError: if TensorRT cannot deserialize the file.
        """
        load_start_time = time.time()
        with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime:
            engine = runtime.deserialize_cuda_engine(f.read())
        # A failed deserialization yields None; fail here with a clear message
        # instead of an AttributeError on the next line of __init__.
        if engine is None:
            raise RuntimeError(f"无法反序列化TensorRT引擎: {os.path.abspath(engine_path)}")
        load_time = (time.time() - load_start_time) * 1000
        print(f"加载引擎时间: {load_time:.2f} ms")
        return engine

    def _target_size(self) -> Tuple[int, int]:
        """Return the (height, width) taken from the engine input shape.

        Falls back to 224x224 when the last two dimensions are not positive.
        """
        if len(self.input_shape) >= 2:
            height, width = self.input_shape[-2:]
            if height > 0 and width > 0:
                return int(height), int(width)
        return self._FALLBACK_SIZE, self._FALLBACK_SIZE

    def _buffer_sizes(self) -> Tuple[int, int]:
        """Return the byte sizes of the float32 input and output buffers."""
        itemsize = np.dtype(np.float32).itemsize
        return (
            int(np.prod(self.input_shape)) * itemsize,
            int(np.prod(self.output_shape)) * itemsize,
        )

    @staticmethod
    def _free(*buffers) -> None:
        """Release every device allocation that was actually created."""
        for buffer in buffers:
            if buffer is not None:
                buffer.free()

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """Read an image and convert it to a normalized float32 CHW array.

        Raises:
            FileNotFoundError: if ``image_path`` does not exist.
            ValueError: if OpenCV cannot decode the file.
            RuntimeError: if resizing or normalization fails.
        """
        preprocess_start_time = time.time()
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"图像文件不存在: {os.path.abspath(image_path)}")
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError("无法读取图像,请检查文件格式和完整性")
        height, width = self._target_size()
        try:
            # cv2.resize takes the target size as (width, height).
            image = cv2.resize(image, (width, height))
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = image.transpose(2, 0, 1).astype(np.float32) / 255.0
            # float32 statistics keep the result float32 instead of promoting
            # the whole image to float64.
            mean = np.array([0.362, 0.279, 0.258], dtype=np.float32).reshape(3, 1, 1)
            std = np.array([0.222, 0.191, 0.185], dtype=np.float32).reshape(3, 1, 1)
            image = np.ascontiguousarray((image - mean) / std)
        except Exception as e:
            raise RuntimeError(f"图像预处理失败: {str(e)}") from e
        preprocess_time = (time.time() - preprocess_start_time) * 1000
        print(f" 预处理时间: {preprocess_time:.2f} ms")
        return image

    def warmup(self, iterations: int = 10):
        """Run ``iterations`` inferences on random data; a second call is a no-op."""
        if self.is_warmed_up:
            print("模型已经预热,跳过预热步骤")
            return
        warmup_start_time = time.time()
        input_size, output_size = self._buffer_sizes()
        d_input = d_output = None
        try:
            d_input = cuda.mem_alloc(input_size)
            d_output = cuda.mem_alloc(output_size)
            stream = cuda.Stream()
            dummy_input = np.random.rand(*self.input_shape).astype(np.float32)
            for _ in range(iterations):
                cuda.memcpy_htod_async(d_input, dummy_input, stream)
                self.context.execute_async_v2(
                    bindings=[int(d_input), int(d_output)],
                    stream_handle=stream.handle
                )
                stream.synchronize()
        finally:
            # Free the buffers even when a CUDA/TensorRT call raises.
            self._free(d_input, d_output)
        warmup_time = (time.time() - warmup_start_time) * 1000
        print(f" 预热时间: {warmup_time:.2f} ms")
        self.is_warmed_up = True

    def infer(self, image: np.ndarray) -> Tuple[float, np.ndarray]:
        """Run one inference on a preprocessed CHW image.

        Returns:
            ``(infer_time, output_data)``: the milliseconds spent on the
            host-to-device copy plus execution, and the raw output array
            shaped like the engine output.

        Raises:
            ValueError: if the batched image does not match the engine input size.
        """
        if not self.is_warmed_up:
            print("警告:模型尚未预热,推理性能可能受影响")
        input_size, output_size = self._buffer_sizes()
        input_data = np.ascontiguousarray(np.expand_dims(image, axis=0), dtype=np.float32)
        # Copying a host array of the wrong size would overrun or underfill
        # the device buffer, so reject it up front.
        if input_data.nbytes != input_size:
            raise ValueError(
                f"输入形状 {input_data.shape} 与引擎输入形状 {self.input_shape} 不匹配"
            )
        d_input = d_output = None
        try:
            d_input = cuda.mem_alloc(input_size)
            d_output = cuda.mem_alloc(output_size)
            stream = cuda.Stream()
            # Timed section: upload + execute.
            infer_start_time = time.time()
            cuda.memcpy_htod_async(d_input, input_data, stream)
            self.context.execute_async_v2(
                bindings=[int(d_input), int(d_output)],
                stream_handle=stream.handle
            )
            stream.synchronize()
            infer_time = (time.time() - infer_start_time) * 1000
            print(f" TensorRT 推理时间: {infer_time:.2f} ms")
            # Download the result (timed and reported separately).
            output_data = np.empty(self.output_shape, dtype=np.float32)
            output_start_time = time.time()
            cuda.memcpy_dtoh_async(output_data, d_output, stream)
            stream.synchronize()
            output_time = (time.time() - output_start_time) * 1000
            print(f" 获取输出时间: {output_time:.2f} ms")
        finally:
            self._free(d_input, d_output)
        return infer_time, output_data
if __name__ == "__main__":
    # Input locations for the benchmark.
    PATHS = {
        "image_folder": "D:/Desktop/DATA/balance_bei_liao_hu/temp",  # image folder
        "engine": "mnv4.engine",  # serialized TensorRT engine
    }

    # Fail fast when either path is missing.
    if not os.path.exists(PATHS["image_folder"]):
        print(f"错误: 图片文件夹不存在 -> {os.path.abspath(PATHS['image_folder'])}")
        raise SystemExit(1)
    if not os.path.exists(PATHS["engine"]):
        print(f"错误: 引擎文件不存在 -> {os.path.abspath(PATHS['engine'])}")
        raise SystemExit(1)

    # Collect images recursively; compare extensions case-insensitively so
    # files such as "IMG.JPG" are not silently skipped.
    image_extensions = ('.jpg', '.png', '.bmp', '.jpeg')
    image_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(PATHS["image_folder"])
        for file in files
        if file.lower().endswith(image_extensions)
    ]
    if not image_files:
        print(f"错误: 文件夹中没有图片文件 -> {PATHS['image_folder']}")
        raise SystemExit(1)

    # Build the predictor and warm it up before timing anything.
    predictor = TensorRTPredictor(PATHS["engine"])
    predictor.warmup(iterations=10)

    total_time = 0
    processed = 0  # images that actually produced a timing
    for image_path in image_files:
        try:
            print(f"处理图片: {image_path}")
            img = predictor.preprocess_image(image_path)
            trt_time, trt_out = predictor.infer(img)
            print(f" TensorRT 输出: {np.argmax(trt_out)} (置信度: {np.max(trt_out):.4f})")
            total_time += trt_time
            processed += 1
        except Exception as e:
            print(f"处理图片时出错: {image_path} -> {str(e)}")

    # Average over successful images only; failed ones contribute no time and
    # would otherwise drag the reported average down.
    if processed:
        avg_time = total_time / processed
        print(f"\n平均推理时间: {avg_time:.2f} ms")
    else:
        print("\n没有图片处理成功,无法计算平均推理时间")
这里TensorRT推理150张224×224图片平均速度为5.50ms
- ONNX推理时间测试代码
import onnxruntime as ort
import numpy as np
from PIL import Image
from torchvision import transforms
from timm.data import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD
import time
import os
class ONNXPredictor:
    """Classify images with an ONNX Runtime session.

    Images are resized and center-cropped to ``size`` x ``size`` and
    normalized with the ImageNet default mean/std before being fed to the
    model's first input; the model's first output is returned.
    """

    def __init__(self, model_path="mobilenetv4_hybrid_medium.onnx", size=224):
        """Create the session for ``model_path`` and the preprocessing pipeline."""
        # Remember the spatial size so warmup feeds the same shape as real inputs.
        self.size = size
        # Pick the execution provider (CUDA if present, otherwise CPU).
        self.providers = self._get_available_providers()
        print(f"可用推理后端: {self.providers}")
        self.session = ort.InferenceSession(model_path, providers=self.providers)
        # Report what the session ended up using.
        current_provider = self.session.get_providers()
        print(f"实际使用的推理后端: {current_provider}")
        # Names of the first input and first output tensors.
        self.input_name = self.session.get_inputs()[0].name
        self.output_name = self.session.get_outputs()[0].name
        self.transform = self.build_transform(size)
        self.is_warmed_up = False

    def _get_available_providers(self):
        """Return a one-element provider list: CUDA when available, else CPU.

        Raises:
            RuntimeError: if neither provider is available.
        """
        available_providers = ort.get_available_providers()
        if 'CUDAExecutionProvider' in available_providers:
            return ['CUDAExecutionProvider']
        if 'CPUExecutionProvider' in available_providers:
            return ['CPUExecutionProvider']
        raise RuntimeError("没有可用的执行提供程序(既没有CUDA也没有CPU)")

    def build_transform(self, size: int):
        """Build the resize / center-crop / to-tensor / normalize pipeline."""
        return transforms.Compose([
            transforms.Resize(size, interpolation=transforms.InterpolationMode.BICUBIC),
            transforms.CenterCrop(size),
            transforms.ToTensor(),
            transforms.Normalize(IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD),
        ])

    def preprocess(self, image):
        """Turn a file path or an already-loaded image into a batched numpy array."""
        if isinstance(image, str):
            # Close the file handle as soon as the pixels are converted,
            # instead of leaving it open until garbage collection.
            with Image.open(image) as opened:
                image = opened.convert('RGB')
        # Apply the transform and add the batch dimension.
        return self.transform(image).unsqueeze(0).numpy()

    def warmup(self, iterations=10):
        """Run ``iterations`` inferences on random data of the configured size."""
        dummy_input = np.random.rand(1, 3, self.size, self.size).astype(np.float32)
        for _ in range(iterations):
            self.session.run([self.output_name], {self.input_name: dummy_input})
        self.is_warmed_up = True
        print(f"模型已预热 {iterations} 次")

    def predict(self, image):
        """Preprocess ``image`` and return the model's first output."""
        input_data = self.preprocess(image)
        outputs = self.session.run([self.output_name], {self.input_name: input_data})[0]
        return outputs
if __name__ == "__main__":
    # Input locations for the benchmark.
    PATHS = {
        "image_folder": "D:/Desktop/DATA/balance_bei_liao_hu/temp",  # image folder
        "model_path": "mobilenetv4_hybrid_medium.onnx",  # ONNX model file
    }

    # Fail fast when either path is missing.
    if not os.path.exists(PATHS["image_folder"]):
        print(f"错误: 图片文件夹不存在 -> {os.path.abspath(PATHS['image_folder'])}")
        raise SystemExit(1)
    if not os.path.exists(PATHS["model_path"]):
        print(f"错误: 模型文件不存在 -> {os.path.abspath(PATHS['model_path'])}")
        raise SystemExit(1)

    # Collect images recursively; compare extensions case-insensitively so
    # files such as "IMG.JPG" are not silently skipped.
    image_extensions = ('.jpg', '.png', '.bmp', '.jpeg')
    image_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(PATHS["image_folder"])
        for file in files
        if file.lower().endswith(image_extensions)
    ]
    if not image_files:
        print(f"错误: 文件夹中没有图片文件 -> {PATHS['image_folder']}")
        raise SystemExit(1)

    # Build the predictor and warm it up before timing anything.
    predictor = ONNXPredictor(model_path=PATHS["model_path"], size=224)
    predictor.warmup(iterations=10)

    total_time = 0
    processed = 0  # images that actually produced a timing
    for image_path in image_files:
        try:
            print(f"处理图片: {image_path}")
            # Time preprocessing separately: the reported inference time must
            # cover only the model run, otherwise image decoding and resizing
            # are counted as inference.
            preprocess_start = time.time()
            input_data = predictor.preprocess(image_path)
            preprocess_time = (time.time() - preprocess_start) * 1000
            print(f" 预处理时间: {preprocess_time:.2f} ms")

            start_time = time.time()
            predictions = predictor.session.run(
                [predictor.output_name], {predictor.input_name: input_data}
            )[0]
            end_time = time.time()
            inference_time = (end_time - start_time) * 1000  # milliseconds
            print(f" ONNX 推理时间: {inference_time:.2f} ms")
            print(f" ONNX 输出: {np.argmax(predictions)} (置信度: {np.max(predictions):.4f})")
            total_time += inference_time
            processed += 1
        except Exception as e:
            print(f"处理图片时出错: {image_path} -> {str(e)}")

    # Average over successful images only.
    if processed:
        avg_time = total_time / processed
        print(f"\n平均推理时间: {avg_time:.2f} ms")
    else:
        print("\n没有图片处理成功,无法计算平均推理时间")
两种格式的模型分别预测了150张尺寸为224×224的三类图片(每一类50张):调用TensorRT平均每张图片需要5.5ms,而onnx平均每张图片需要11.51ms,即TensorRT的推理时间约为onnx的二分之一,具体缩短的幅度可能与设备有关。需要注意的是,上述11.51ms的计时包含了图片读取和预处理,而TensorRT的5.5ms只统计了数据拷贝到显存和执行推理的时间,两者的统计口径并不完全一致。
精度测试
- TensorRT推理代码
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import cv2
import os
import numpy as np
import time
from typing import Tuple
from sklearn.metrics import classification_report, accuracy_score, f1_score
from collections import Counter
class TensorRTPredictor:
    """Classify single images with a serialized TensorRT engine.

    The class assumes that engine tensor 0 is the input and tensor 1 is the
    output, treats both as float32, and feeds the input as a (1, C, H, W)
    array built from one preprocessed image. The engine is warmed up during
    construction.
    """

    # Spatial size used when the engine does not report a static H/W.
    _FALLBACK_SIZE = 224

    def __init__(self, engine_path: str):
        """Load the engine at ``engine_path``, create a context and warm up."""
        self.logger = trt.Logger(trt.Logger.WARNING)
        self.engine = self._load_engine(engine_path)
        self.context = self.engine.create_execution_context()
        self.input_shape = tuple(self.engine.get_tensor_shape(self.engine.get_tensor_name(0)))
        self.output_shape = tuple(self.engine.get_tensor_shape(self.engine.get_tensor_name(1)))
        self.is_warmed_up = False
        self.warmup(iterations=10)  # warm up as part of construction

    def _load_engine(self, engine_path: str) -> trt.ICudaEngine:
        """Deserialize the engine file and print how long loading took.

        Raises:
            RuntimeError: if TensorRT cannot deserialize the file.
        """
        load_start_time = time.time()
        with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime:
            engine = runtime.deserialize_cuda_engine(f.read())
        # A failed deserialization yields None; fail here with a clear message
        # instead of an AttributeError on the next line of __init__.
        if engine is None:
            raise RuntimeError(f"无法反序列化TensorRT引擎: {os.path.abspath(engine_path)}")
        load_time = (time.time() - load_start_time) * 1000
        print(f"加载引擎时间: {load_time:.2f} ms")
        return engine

    def _target_size(self) -> Tuple[int, int]:
        """Return the (height, width) taken from the engine input shape.

        Falls back to 224x224 when the last two dimensions are not positive.
        """
        if len(self.input_shape) >= 2:
            height, width = self.input_shape[-2:]
            if height > 0 and width > 0:
                return int(height), int(width)
        return self._FALLBACK_SIZE, self._FALLBACK_SIZE

    def _buffer_sizes(self) -> Tuple[int, int]:
        """Return the byte sizes of the float32 input and output buffers."""
        itemsize = np.dtype(np.float32).itemsize
        return (
            int(np.prod(self.input_shape)) * itemsize,
            int(np.prod(self.output_shape)) * itemsize,
        )

    @staticmethod
    def _free(*buffers) -> None:
        """Release every device allocation that was actually created."""
        for buffer in buffers:
            if buffer is not None:
                buffer.free()

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """Read an image and convert it to a normalized float32 CHW array.

        Raises:
            FileNotFoundError: if ``image_path`` does not exist.
            ValueError: if OpenCV cannot decode the file.
            RuntimeError: if resizing or normalization fails.
        """
        preprocess_start_time = time.time()
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"图像文件不存在: {os.path.abspath(image_path)}")
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError("无法读取图像,请检查文件格式和完整性")
        height, width = self._target_size()
        try:
            # cv2.resize takes the target size as (width, height).
            image = cv2.resize(image, (width, height))
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = image.transpose(2, 0, 1).astype(np.float32) / 255.0
            # float32 statistics keep the result float32 instead of promoting
            # the whole image to float64.
            mean = np.array([0.362, 0.279, 0.258], dtype=np.float32).reshape(3, 1, 1)
            std = np.array([0.222, 0.191, 0.185], dtype=np.float32).reshape(3, 1, 1)
            image = np.ascontiguousarray((image - mean) / std)
        except Exception as e:
            raise RuntimeError(f"图像预处理失败: {str(e)}") from e
        preprocess_time = (time.time() - preprocess_start_time) * 1000
        print(f" 预处理时间: {preprocess_time:.2f} ms")
        return image

    def warmup(self, iterations: int = 10):
        """Run ``iterations`` inferences on random data; a second call is a no-op."""
        if self.is_warmed_up:
            print("模型已经预热,跳过预热步骤")
            return
        warmup_start_time = time.time()
        input_size, output_size = self._buffer_sizes()
        d_input = d_output = None
        try:
            d_input = cuda.mem_alloc(input_size)
            d_output = cuda.mem_alloc(output_size)
            stream = cuda.Stream()
            dummy_input = np.random.rand(*self.input_shape).astype(np.float32)
            for _ in range(iterations):
                cuda.memcpy_htod_async(d_input, dummy_input, stream)
                self.context.execute_async_v2(
                    bindings=[int(d_input), int(d_output)],
                    stream_handle=stream.handle
                )
                stream.synchronize()
        finally:
            # Free the buffers even when a CUDA/TensorRT call raises.
            self._free(d_input, d_output)
        warmup_time = (time.time() - warmup_start_time) * 1000
        print(f" 预热时间: {warmup_time:.2f} ms")
        self.is_warmed_up = True

    def infer(self, image: np.ndarray) -> Tuple[float, np.ndarray]:
        """Run one inference on a preprocessed CHW image.

        Returns:
            ``(infer_time, output_data)``: the milliseconds spent on the
            host-to-device copy plus execution, and the raw output array
            shaped like the engine output.

        Raises:
            ValueError: if the batched image does not match the engine input size.
        """
        if not self.is_warmed_up:
            print("警告:模型尚未预热,推理性能可能受影响")
        input_size, output_size = self._buffer_sizes()
        input_data = np.ascontiguousarray(np.expand_dims(image, axis=0), dtype=np.float32)
        # Copying a host array of the wrong size would overrun or underfill
        # the device buffer, so reject it up front.
        if input_data.nbytes != input_size:
            raise ValueError(
                f"输入形状 {input_data.shape} 与引擎输入形状 {self.input_shape} 不匹配"
            )
        d_input = d_output = None
        try:
            d_input = cuda.mem_alloc(input_size)
            d_output = cuda.mem_alloc(output_size)
            stream = cuda.Stream()
            # Timed section: upload + execute.
            infer_start_time = time.time()
            cuda.memcpy_htod_async(d_input, input_data, stream)
            self.context.execute_async_v2(
                bindings=[int(d_input), int(d_output)],
                stream_handle=stream.handle
            )
            stream.synchronize()
            infer_time = (time.time() - infer_start_time) * 1000
            print(f" TensorRT 推理时间: {infer_time:.2f} ms")
            # Download the result (timed and reported separately).
            output_data = np.empty(self.output_shape, dtype=np.float32)
            output_start_time = time.time()
            cuda.memcpy_dtoh_async(output_data, d_output, stream)
            stream.synchronize()
            output_time = (time.time() - output_start_time) * 1000
            print(f" 获取输出时间: {output_time:.2f} ms")
        finally:
            self._free(d_input, d_output)
        return infer_time, output_data
if __name__ == "__main__":
    # Input locations for the accuracy test.
    PATHS = {
        "image_folder": "D:/Desktop/DATA/balance_bei_liao_hu/temp",  # image folder
        "engine": "mnv4.engine",  # serialized TensorRT engine
    }

    # Fail fast when either path is missing.
    if not os.path.exists(PATHS["image_folder"]):
        print(f"错误: 图片文件夹不存在 -> {os.path.abspath(PATHS['image_folder'])}")
        raise SystemExit(1)
    if not os.path.exists(PATHS["engine"]):
        print(f"错误: 引擎文件不存在 -> {os.path.abspath(PATHS['engine'])}")
        raise SystemExit(1)

    # Collect images recursively; compare extensions case-insensitively so
    # files such as "IMG.JPG" are not silently skipped.
    image_extensions = ('.jpg', '.png', '.bmp', '.jpeg')
    image_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(PATHS["image_folder"])
        for file in files
        if file.lower().endswith(image_extensions)
    ]
    if not image_files:
        print(f"错误: 文件夹中没有图片文件 -> {PATHS['image_folder']}")
        raise SystemExit(1)

    predictor = TensorRTPredictor(PATHS["engine"])

    # Parallel lists of ground-truth and predicted class names.
    true_labels = []
    predicted_labels = []
    # Model output index -> class name (the name of the image's parent folder).
    label_mapping = {0: "B", 1: "D", 2: "E"}
    total_time = 0
    for image_path in image_files:
        try:
            print(f"处理图片: {image_path}")
            img = predictor.preprocess_image(image_path)
            # infer() already prints the inference time, so it is not printed again.
            trt_time, trt_out = predictor.infer(img)
            predicted_label = label_mapping[int(np.argmax(trt_out))]
            # The ground-truth label is the name of the image's parent folder.
            true_label = os.path.basename(os.path.dirname(image_path))
        except Exception as e:
            print(f"处理图片时出错: {image_path} -> {str(e)}")
            continue
        # Append both labels together so the two lists can never get out of step.
        predicted_labels.append(predicted_label)
        true_labels.append(true_label)
        total_time += trt_time

    if not true_labels:
        print("\n没有图片处理成功,无法计算分类指标")
        raise SystemExit(1)

    # Average over successful images only.
    avg_time = total_time / len(true_labels)
    print(f"\n平均推理时间: {avg_time:.2f} ms")

    print("\n分类结果统计:")
    print(f"图片总数: {len(image_files)}")
    print(f"分类结果: {Counter(predicted_labels)}")

    # Accuracy and weighted F1, four decimal places.
    accuracy = accuracy_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels, average='weighted')
    print(f"准确率: {accuracy:.4f}")
    print(f"F1 分数: {f1:.4f}")

    # Per-class report.
    print("\n分类报告:")
    print(classification_report(true_labels, predicted_labels, digits=4))
- onnx推理代码
from datasets.split_data import read_split_data
from datasets.mydataset import MyDataset
from torchvision import transforms
from timm.data import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD
import torch
from estimate_model import Predictor, Plot_ROC
from timm.models import create_model
import os, cv2, json, random
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
def read_test_data(root, plot_image=False):
    """Collect labelled test images from a class-per-subfolder directory.

    Every sub-directory of ``root`` is treated as one class. The mapping from
    class index to class name is written to ``output/classes_indices.json``,
    and some dataset statistics are printed.

    Args:
        root: directory whose sub-directories are the classes.
        plot_image: when true, show a bar chart of images per class.

    Returns:
        ``(paths, labels)``: parallel lists of readable image paths and their
        integer class indices (indices follow the sorted class names).
    """
    random.seed(0)
    assert os.path.exists(root), 'Your root does not exists!!!'

    classes = sorted(cla for cla in os.listdir(root) if os.path.isdir(os.path.join(root, cla)))
    class_indices = {name: idx for idx, name in enumerate(classes)}

    # Create the output directory first so writing the mapping cannot fail
    # just because the folder does not exist yet.
    os.makedirs('output', exist_ok=True)
    json_str = json.dumps({v: k for k, v in class_indices.items()}, indent=4)
    with open('output/classes_indices.json', 'w') as json_file:
        json_file.write(json_str)

    # Extensions are compared in lower case, so one spelling per format suffices.
    supported = {'.jpg', '.png', '.jpeg', '.bmp'}

    filepaths = []
    labels = []
    bad_images = []
    every_class_num = []
    for klass in classes:
        classpath = os.path.join(root, klass)
        flist = sorted(os.listdir(classpath))
        every_class_num.append(
            sum(1 for name in flist if os.path.splitext(name)[-1].lower() in supported)
        )
        desc = f'{klass:23s}'
        for f in tqdm(flist, ncols=110, desc=desc, unit='file', colour='blue'):
            fpath = os.path.join(classpath, f)
            if os.path.splitext(f)[-1].lower() not in supported:
                bad_images.append(fpath)
                continue
            # cv2.imread signals an unreadable file by returning None, not by
            # raising, so the result has to be checked explicitly.
            if cv2.imread(fpath) is None:
                bad_images.append(fpath)
                print('defective image file: ', fpath)
                continue
            filepaths.append(fpath)
            labels.append(klass)

    Fseries = pd.Series(filepaths, name='filepaths')
    Lseries = pd.Series(labels, name='labels')
    df = pd.concat([Fseries, Lseries], axis=1)
    print(f'{len(df.labels.unique())} kind of images were found in the dataset')

    test_image_path = df['filepaths'].tolist()
    test_image_label = [class_indices[i] for i in df['labels'].tolist()]

    # Estimate the average image size from at most 50 images; sampling more
    # rows than exist would raise.
    sample_df = df.sample(n=min(50, len(df)), replace=False)
    ht, wt, count = 0, 0, 0
    for fpath in sample_df['filepaths']:
        img = cv2.imread(fpath)
        if img is None:
            continue
        ht += img.shape[0]
        wt += img.shape[1]
        count += 1

    print('{} images were found in the dataset.\n{} for test'.format(
        sum(every_class_num), len(test_image_path)
    ))
    # Skip the size statistics when no sampled image could be read, instead
    # of dividing by zero.
    if count:
        have = int(ht / count)
        wave = int(wt / count)
        aspect_ratio = have / wave
        print('average image height= ', have, ' average image width= ', wave, ' aspect ratio h/w= ', aspect_ratio)

    if plot_image:
        plt.bar(range(len(classes)), every_class_num, align='center')
        plt.xticks(range(len(classes)), classes)
        for i, v in enumerate(every_class_num):
            plt.text(x=i, y=v + 5, s=str(v), ha='center')
        plt.xlabel('image class')
        plt.ylabel('number of images')
        plt.title('class distribution')
        plt.show()

    return test_image_path, test_image_label
# Gather every labelled test image under the dataset root, without plotting
# the class distribution.
test_image_path, test_image_label = read_test_data(
    root='D:/Desktop/DATA/balance_bei_liao_hu/temp',
    plot_image=False,
)
def build_transform(img_size):
    """Build the evaluation preprocessing pipeline for ``img_size`` inputs.

    The pipeline resizes with bicubic interpolation, center-crops to
    ``img_size``, converts to a tensor and normalizes with the ImageNet
    default mean/std.
    """
    return transforms.Compose([
        # The enum replaces the integer code 3 (bicubic); it is the documented
        # form and matches the ONNX predictor's transform in this article.
        transforms.Resize(img_size, interpolation=transforms.InterpolationMode.BICUBIC),
        transforms.CenterCrop(img_size),
        transforms.ToTensor(),
        transforms.Normalize(IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD),
    ])
# Dataset and loader: 224x224 preprocessing, images visited in order,
# 36 per batch, loaded in the main process.
test_transform = build_transform(224)
test_set = MyDataset(test_image_path, test_image_label, test_transform)
sampler_val = torch.utils.data.SequentialSampler(test_set)
data_loader_val = torch.utils.data.DataLoader(
    test_set,
    sampler=sampler_val,
    batch_size=36,
    num_workers=0,
    pin_memory=True,
    drop_last=False,
)

# Three-class MobileNetV4 model placed on the GPU.
device = torch.device('cuda')
model_predict = create_model('mobilenetv4_hybrid_medium')
model_predict.reset_classifier(num_classes=3)
model_predict.to(device)

# Evaluate with the best checkpoint, then plot the ROC curves.
Predictor(model_predict, data_loader_val, './output/mobilenetv4_hybrid_medium_best_checkpoint.pth', device)
Plot_ROC(model_predict, data_loader_val, './output/mobilenetv4_hybrid_medium_best_checkpoint.pth', device)
- 结果:
TensorRT:
onnx:
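可以观察到在转成TensorRT推理后模型精度下降明显,宏平均Precision下降了约4%,宏平均召回下降了约10%,宏平均F1下降了约10%。需要注意的是,上面两段精度测试代码的预处理并不一致(TensorRT代码直接用cv2缩放到224×224并使用自定义的均值/方差,而对照代码使用Resize+CenterCrop和ImageNet默认均值/方差),且引擎是以fp16转换的,因此这一差距不一定全部来自TensorRT本身。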