在深度学习项目中,训练一个高性能的模型只是第一步。真正让模型产生价值的关键在于将其高效、稳定地部署到生产环境中。本文以 Ultralytics YOLO11 为例,详细介绍如何将 .pt 模型文件一步步转换为 TensorRT 引擎(.engine),并使用 FastAPI 构建高性能 RESTful 接口,实现低延迟、高吞吐的缺陷检测服务。
一、环境与版本说明
本方案基于以下软硬件环境和依赖版本,确保兼容性和稳定性:
硬件环境
使用autodl的线上服务器
- CPU:16 vCPU Intel(R) Xeon(R) Platinum 8481C
- 内存:80GB
- 硬盘:系统盘 30GB,数据盘 50G
- GPU: RTX 4090D(24GB)
- CUDA:12.8
软件环境
| 组件 | 版本 | 安装命令 |
|---|---|---|
| Python | 3.12 | 3.12(ubuntu22.04) autodl自带镜像 |
| PyTorch | 2.8.0 | 系统自带镜像 |
| Ultralytics (YOLOv8) | 8.2.0 | pip install ultralytics==8.2.0 |
| ONNX | 1.19.1 | 直接运行yolo_export.py会自动安装onnx相关 |
| TensorRT | 10.8.0.43 | pip install tensorrt==10.8.0.43 |
| CUDA Toolkit | 12.8 | 系统级安装 |
注意:TensorRT 与 CUDA 版本需严格匹配。本文基于 CUDA 12.8 + TensorRT 10.8。
二、PyTorch (.pt) 转 ONNX (.onnx)
为了将 YOLO 模型集成到 TensorRT 中,首先需要将其从 PyTorch 格式转换为 ONNX 格式。
步骤 1:导出 ONNX 模型
from ultralytics import YOLO
'''
将 YOLOv8 模型导出为包含 NMS 的 ONNX 格式
onnx 1.19.1
onnxruntime 1.23.2
onnxslim 0.1.71
'''
# 加载模型
model = YOLO('runs/defect/train8/weights/best.pt')
# 2. 导出为 ONNX 格式,包含 NMS 和简化
path = model.export(
format="onnx", # 导出格式
opset=12, # ONNX 算子集版本(推荐 12)
simplify=True, # 简化模型(合并 BatchNorm、消除冗余节点)
nms=True, # 关键:在模型中集成 NMS 后处理
imgsz=896, # 输入图像尺寸(必须与推理时一致)
) # 返回导出模型的路径
print(path)参数说明
dynamic=True:允许输入尺寸变化,如 (1,3,H,W)。不要使用,因为需要改代码device=0: 不要使用gpu,因为下载onnxruntime-gpu太慢,不如直接使用cpu导出nms=True: 建议使用,因为生产环境需要,可加速,但不可以配置iou
输出文件
- best.onnx:可用于 Netron 可视化,确认输入输出结构。
三、ONNX (.onnx) 转 TensorRT Engine (.engine)
使用 TensorRT 将 ONNX 模型编译为 .engine 文件,实现极致推理性能。需要先安装tensorrt
步骤 1:编写转换脚本 onnx2engine.py
脚本链接:https://gitee.com/luori/yolo_steel_defect_detect/blob/master/onnx2engine.py
步骤 2:运行转换
python onnx2engine.py输出
- best.engine:可在 GPU 上高速推理的 TensorRT 引擎。
四、FastAPI 项目部署
使用 FastAPI 构建 REST API,提供 /predict 接口接收图像并返回检测结果。
项目链接:https://gitee.com/luori/steel_defect_detection
1. 推理类 lib/inference.py
import cv2
import tensorrt as trt
import pycuda.driver as cuda
import numpy as np
CLASSES = ['crazing', 'inclusion', 'patches', 'pitted_surface', 'rolled-in_scale', 'scratches'] # 类别列表
IMG_SIZE = (896, 896) # 输入图像尺寸
class TRTInference:
def __init__(self, engine_path):
# 初始化CUDA驱动
cuda.init()
# 创建CUDA上下文
self.ctx = cuda.Device(0).make_context()
self.engine = self.load_engine(engine_path)
self.context = self.engine.create_execution_context()
# 分配GPU内存
self.inputs, self.outputs, self.bindings, self.stream = self.allocate_buffers()
# 加载TensorRT引擎
def load_engine(self, engine_path):
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
with open(engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime:
return runtime.deserialize_cuda_engine(f.read())
# 分配输入输出缓冲区
def allocate_buffers(self):
inputs = []
outputs = []
bindings = []
stream = cuda.Stream()
# 使用新API获取绑定信息 (适用于TensorRT 10.x)
for i in range(self.engine.num_io_tensors):
name = self.engine.get_tensor_name(i)
shape = self.engine.get_tensor_shape(name)
trt_dtype = self.engine.get_tensor_dtype(name)
dtype = trt.nptype(trt_dtype)
# 计算张量大小,不再使用max_batch_size
size = trt.volume(shape)
# print('shape:', shape) # 输出两次shape: (1, 3, 896, 896)、shape: (1, 10, 16464)
# 如果第一个维度是动态的(-1),则需要特殊处理
if shape[0] == -1:
# 对于动态批次大小,使用optimal_shape
opt_shape = self.engine.get_tensor_profile_shape(name, 0)[1] # [0]是profile索引,[1]是optimal shape
size = trt.volume(opt_shape)
# 获取数据类型大小
# 创建一个该类型的实例来获取itemsize
dtype_instance = dtype(1)
dtype_size = dtype_instance.itemsize
# 分配GPU内存
device_mem = cuda.mem_alloc(size * dtype_size)
bindings.append(int(device_mem))
if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
inputs.append({'name': name, 'dtype': dtype, 'host_memory': None, 'device_memory': device_mem})
else:
host_mem = cuda.pagelocked_empty(size, dtype)
outputs.append({'name': name, 'dtype': dtype, 'host_memory': host_mem, 'device_memory': device_mem})
print(inputs, outputs, bindings, stream)
return inputs, outputs, bindings, stream
# 预处理图像,返回 (1, 3, 896, 896) 的numpy array
def preprocess(self, image: np.ndarray):
# 调整大小
resized = cv2.resize(image, IMG_SIZE)
# BGR to RGB
rgb = resized[:, :, ::-1]
# HWC to CHW
chw = np.transpose(rgb, (2, 0, 1))
# 归一化到 [0, 1]
normalized = chw.astype(np.float32) / 255.0
# 添加 batch 维度
batched = np.expand_dims(normalized, axis=0)
# 确保内存连续
return np.ascontiguousarray(batched)
# 后处理YOLO输出
def postprocess(self, output, conf_threshold=0.25):
# 转为 numpy 数组
output = np.array(output) # shape: (1, 1800)
if output.size == 0:
return []
# 展平并 reshape 成 (N, 6),N=300 是 ONNX-NMS 的最大检测数
num_detections = output.shape[-1] // 6 # 1800 // 6 = 300
detections = output.reshape(-1, 6) # (300, 6) 或 (1, 300, 6) → (300, 6)
results = []
for det in detections:
x1, y1, x2, y2, conf, cls_id = det
# ONNX NMS 会用 0 填充无效检测,所以过滤掉 conf=0 的
if conf < 0.01: # 非常低的阈值,过滤填充项
continue
# 置信度过滤(业务阈值)
if conf < conf_threshold:
continue
# 转为中心点 + 宽高
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
w = x2 - x1
h = y2 - y1
results.append({
"class_name": CLASSES[int(cls_id)],
"class_id": int(cls_id),
"confidence": float(conf),
"bbox": [float(cx), float(cy), float(w), float(h)]
})
print(f"[INFO] Found {len(results)} valid detections")
return results
# 推理方法
def infer(self, input_data):
# 确保CUDA上下文处于活动状态
self.ctx.push()
try:
# print("输入数据形状:", input_data.shape) # (200, 200, 3)
# print("输入数据范围:", input_data.min(), input_data.max()) # 11 255
# print("输入数据 dtype:", input_data.dtype) # uint8
# 图像预处理
input_data = self.preprocess(input_data)
# print("输入数据形状:", input_data.shape) # (1, 3, 896, 896)
# print("输入数据范围:", input_data.min(), input_data.max()) # 0.047058824 1.0
# print("输入数据 dtype:", input_data.dtype) # float32
# 设置输入张量地址
self.context.set_tensor_address(self.inputs[0]['name'], int(self.inputs[0]['device_memory']))
# 设置输出张量地址
self.context.set_tensor_address(self.outputs[0]['name'], int(self.outputs[0]['device_memory']))
# 将输入数据复制到GPU
cuda.memcpy_htod_async(self.inputs[0]['device_memory'], input_data, self.stream)
# 执行推理 (使用新API)
self.context.execute_async_v3(stream_handle=self.stream.handle)
# 将输出从GPU复制回CPU
for output in self.outputs:
cuda.memcpy_dtoh_async(output['host_memory'], output['device_memory'], self.stream)
# 同步流
self.stream.synchronize()
# 获取输出
output_data = [out['host_memory'] for out in self.outputs]
# print('-'*50)
# print(output_data) # [array([496.5, 279. , 774.5, ..., 0. , 0. , 0. ], dtype=float32)]
# print("原始输出数据:", output_data[0].shape) # (1800,)
# print("前10个值:", output_data[0][:10]) # [4.965000e+02 2.790000e+02 7.745000e+02 8.870000e+02 8.730469e-01 2.000000e+00 1.423750e+02 1.250000e+00 4.420000e+02 7.125000e+02]
# print("统计: min={:.6f}, max={:.6f}, mean={:.6f}".format(output_data[0].min(), output_data[0].max(), output_data[0].mean())) # min=-0.375000, max=894.500000, mean=2.952405
# 后处理
results = self.postprocess(output_data)
# 返回输出
return results
finally:
# 弹出上下文
self.ctx.pop()
def __del__(self):
"""析构函数,清理CUDA资源"""
if hasattr(self, 'ctx'):
self.ctx.pop()
self.ctx.detach()
# 使用示例
# infer = TRTInference("best.engine")
# 假设你有一张图像
# image = cv2.imread("test.jpg")
# outputs = infer.infer(input_data)
# 注意:输出是网络的原始输出,需要进行后处理(如解码边界框、NMS等)2. 依赖文件 requirements.txt
fastapi==0.119.1
uvicorn==0.38.0
python-multipart==0.0.20
opencv-python==4.5.5.64
pycuda==2025.1.2 # 对应cuda12.8
tensorrt==10.8.0.43 # 对应cuda12.8
numpy==1.26.44. 启动服务
uvicorn api.app:app --host 0.0.0.0 --port 8000 --reload五、接口调用示例
使用 curl 调用
curl -X POST "http://localhost:8000/detect" \
-H "accept: application/json" \
-F "file=@test.jpg" \
-F "conf=0.3" \
-F "iou=0.5"使用 Python requests
import requests
url = "http://localhost:8000/predict"
files = {'file': open('test.jpg', 'rb')}
response = requests.post(url, files=files, data=data)
print(response.json())返回示例
{
"results": [
{
"class_name": "DefectA",
"class_id": 0,
"confidence": 0.92,
"bbox": [450.1, 300.5, 120.3, 80.2]
}
]
}总结
本文完整展示了从 YOLO 模型训练到生产部署的全流程:
- 模型导出:使用 nms=True 方便生产环境使用;
- 加速推理:通过 TensorRT 实现 FP16 加速;
- 服务封装:FastAPI 提供 RESTful 接口;
- 参数可控:支持运行时调节 conf。