如何将 YOLO 模型部署到生产环境

本文共有10389个字,关键词:

在深度学习项目中,训练一个高性能的模型只是第一步。真正让模型产生价值的关键在于将其高效、稳定地部署到生产环境中。本文以 Ultralytics YOLO11 为例,详细介绍如何将 .pt 模型文件一步步转换为 TensorRT 引擎(.engine),并使用 FastAPI 构建高性能 RESTful 接口,实现低延迟、高吞吐的缺陷检测服务。

一、环境与版本说明

本方案基于以下软硬件环境和依赖版本,确保兼容性和稳定性:

硬件环境

使用autodl的线上服务器

  • CPU:16 vCPU Intel(R) Xeon(R) Platinum 8481C
  • 内存:80GB
  • 硬盘:系统盘 30GB,数据盘 50G
  • GPU: RTX 4090D(24GB)
  • CUDA:12.8

软件环境

组件版本安装命令
Python3.123.12(ubuntu22.04) autodl自带镜像
PyTorch2.8.0系统自带镜像
Ultralytics (YOLOv8)8.2.0pip install ultralytics==8.2.0
ONNX1.19.1直接运行yolo_export.py会自动安装onnx相关
TensorRT10.8.0.43pip install tensorrt==10.8.0.43
CUDA Toolkit12.8系统级安装

注意:TensorRT 与 CUDA 版本需严格匹配。本文基于 CUDA 12.8 + TensorRT 10.8。

二、PyTorch (.pt) 转 ONNX (.onnx)

为了将 YOLO 模型集成到 TensorRT 中,首先需要将其从 PyTorch 格式转换为 ONNX 格式。

步骤 1:导出 ONNX 模型

from ultralytics import YOLO

'''
    将 YOLOv8 模型导出为包含 NMS 的 ONNX 格式
    onnx                           1.19.1
    onnxruntime                    1.23.2     
    onnxslim                       0.1.71
'''

# 加载模型
model = YOLO('runs/defect/train8/weights/best.pt')

# 2. 导出为 ONNX 格式,包含 NMS 和简化
path = model.export(
    format="onnx",           # 导出格式
    opset=12,                # ONNX 算子集版本(推荐 12)
    simplify=True,           # 简化模型(合并 BatchNorm、消除冗余节点)
    nms=True,                # 关键:在模型中集成 NMS 后处理
    imgsz=896,               # 输入图像尺寸(必须与推理时一致)
)  # 返回导出模型的路径
print(path)

参数说明

  • dynamic=True:允许输入尺寸变化,如 (1,3,H,W)。不要使用,因为需要改代码
  • device=0: 不要使用gpu,因为下载onnxruntime-gpu太慢,不如直接使用cpu导出
  • nms=True: 建议使用,因为生产环境需要,可加速,但不可以配置iou

输出文件

  • best.onnx:可用于 Netron 可视化,确认输入输出结构。

三、ONNX (.onnx) 转 TensorRT Engine (.engine)

使用 TensorRT 将 ONNX 模型编译为 .engine 文件,实现极致推理性能。需要先安装tensorrt

步骤 1:编写转换脚本 onnx2engine.py

脚本链接:https://gitee.com/luori/yolo_steel_defect_detect/blob/master/onnx2engine.py

步骤 2:运行转换

python onnx2engine.py

输出

  • best.engine:可在 GPU 上高速推理的 TensorRT 引擎。

四、FastAPI 项目部署

使用 FastAPI 构建 REST API,提供 /predict 接口接收图像并返回检测结果。

项目链接:https://gitee.com/luori/steel_defect_detection

1. 推理类 lib/inference.py

import cv2
import tensorrt as trt
import pycuda.driver as cuda
import numpy as np

CLASSES = ['crazing', 'inclusion', 'patches', 'pitted_surface', 'rolled-in_scale', 'scratches']  # 类别列表
IMG_SIZE = (896, 896)  # 输入图像尺寸

class TRTInference:
    def __init__(self, engine_path):
        # 初始化CUDA驱动
        cuda.init()
        
        # 创建CUDA上下文
        self.ctx = cuda.Device(0).make_context()
        
        self.engine = self.load_engine(engine_path)
        self.context = self.engine.create_execution_context()
        
        # 分配GPU内存
        self.inputs, self.outputs, self.bindings, self.stream = self.allocate_buffers()
    
    # 加载TensorRT引擎
    def load_engine(self, engine_path):
        TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
        with open(engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime:
            return runtime.deserialize_cuda_engine(f.read())
    
    # 分配输入输出缓冲区
    def allocate_buffers(self):
        inputs = []
        outputs = []
        bindings = []
        stream = cuda.Stream()
        
        # 使用新API获取绑定信息 (适用于TensorRT 10.x)
        for i in range(self.engine.num_io_tensors):
            name = self.engine.get_tensor_name(i)
            shape = self.engine.get_tensor_shape(name)
            trt_dtype = self.engine.get_tensor_dtype(name)
            dtype = trt.nptype(trt_dtype)
            
            # 计算张量大小,不再使用max_batch_size
            size = trt.volume(shape)
            # print('shape:', shape)  # 输出两次shape: (1, 3, 896, 896)、shape: (1, 10, 16464)
            # 如果第一个维度是动态的(-1),则需要特殊处理
            if shape[0] == -1:
                # 对于动态批次大小,使用optimal_shape
                opt_shape = self.engine.get_tensor_profile_shape(name, 0)[1]  # [0]是profile索引,[1]是optimal shape
                size = trt.volume(opt_shape)
            
            # 获取数据类型大小
            # 创建一个该类型的实例来获取itemsize
            dtype_instance = dtype(1)
            dtype_size = dtype_instance.itemsize
            
            # 分配GPU内存
            device_mem = cuda.mem_alloc(size * dtype_size)
            bindings.append(int(device_mem))
            
            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                inputs.append({'name': name, 'dtype': dtype, 'host_memory': None, 'device_memory': device_mem})
            else:
                host_mem = cuda.pagelocked_empty(size, dtype)
                outputs.append({'name': name, 'dtype': dtype, 'host_memory': host_mem, 'device_memory': device_mem})
        print(inputs, outputs, bindings, stream)
        return inputs, outputs, bindings, stream
    
    # 预处理图像,返回 (1, 3, 896, 896) 的numpy array
    def preprocess(self, image: np.ndarray):
        # 调整大小
        resized = cv2.resize(image, IMG_SIZE)
        # BGR to RGB
        rgb = resized[:, :, ::-1]
        # HWC to CHW
        chw = np.transpose(rgb, (2, 0, 1))
        # 归一化到 [0, 1]
        normalized = chw.astype(np.float32) / 255.0
        # 添加 batch 维度
        batched = np.expand_dims(normalized, axis=0)
        # 确保内存连续
        return np.ascontiguousarray(batched)

    # 后处理YOLO输出
    def postprocess(self, output, conf_threshold=0.25):
        # 转为 numpy 数组
        output = np.array(output)  # shape: (1, 1800)
        
        if output.size == 0:
            return []
        
        # 展平并 reshape 成 (N, 6),N=300 是 ONNX-NMS 的最大检测数
        num_detections = output.shape[-1] // 6  # 1800 // 6 = 300
        detections = output.reshape(-1, 6)  # (300, 6) 或 (1, 300, 6) → (300, 6)
        
        results = []
        for det in detections:
            x1, y1, x2, y2, conf, cls_id = det
            
            # ONNX NMS 会用 0 填充无效检测,所以过滤掉 conf=0 的
            if conf < 0.01:  # 非常低的阈值,过滤填充项
                continue
                
            # 置信度过滤(业务阈值)
            if conf < conf_threshold:
                continue

            # 转为中心点 + 宽高
            cx = (x1 + x2) / 2
            cy = (y1 + y2) / 2
            w = x2 - x1
            h = y2 - y1

            results.append({
                "class_name": CLASSES[int(cls_id)],
                "class_id": int(cls_id),
                "confidence": float(conf),
                "bbox": [float(cx), float(cy), float(w), float(h)]
            })
        
        print(f"[INFO] Found {len(results)} valid detections")
        return results

    # 推理方法
    def infer(self, input_data):
        # 确保CUDA上下文处于活动状态
        self.ctx.push()
        
        try:
            # print("输入数据形状:", input_data.shape)  # (200, 200, 3)
            # print("输入数据范围:", input_data.min(), input_data.max())  # 11 255
            # print("输入数据 dtype:", input_data.dtype)  # uint8
            # 图像预处理
            input_data = self.preprocess(input_data)
            # print("输入数据形状:", input_data.shape)  # (1, 3, 896, 896)
            # print("输入数据范围:", input_data.min(), input_data.max())  # 0.047058824 1.0
            # print("输入数据 dtype:", input_data.dtype)  # float32
            # 设置输入张量地址
            self.context.set_tensor_address(self.inputs[0]['name'], int(self.inputs[0]['device_memory']))
            # 设置输出张量地址
            self.context.set_tensor_address(self.outputs[0]['name'], int(self.outputs[0]['device_memory']))

            # 将输入数据复制到GPU
            cuda.memcpy_htod_async(self.inputs[0]['device_memory'], input_data, self.stream)
            
            # 执行推理 (使用新API)
            self.context.execute_async_v3(stream_handle=self.stream.handle)
            
            # 将输出从GPU复制回CPU
            for output in self.outputs:
                cuda.memcpy_dtoh_async(output['host_memory'], output['device_memory'], self.stream)
            
            # 同步流
            self.stream.synchronize()
            
            # 获取输出
            output_data = [out['host_memory'] for out in self.outputs]
            # print('-'*50)
            # print(output_data)  # [array([496.5, 279. , 774.5, ...,   0. ,   0. ,   0. ], dtype=float32)]
            # print("原始输出数据:", output_data[0].shape)  # (1800,)
            # print("前10个值:", output_data[0][:10])  # [4.965000e+02 2.790000e+02 7.745000e+02 8.870000e+02 8.730469e-01 2.000000e+00 1.423750e+02 1.250000e+00 4.420000e+02 7.125000e+02]
            # print("统计: min={:.6f}, max={:.6f}, mean={:.6f}".format(output_data[0].min(), output_data[0].max(), output_data[0].mean()))  # min=-0.375000, max=894.500000, mean=2.952405
            # 后处理
            results = self.postprocess(output_data)

            # 返回输出
            return results
        finally:
            # 弹出上下文
            self.ctx.pop()
    
    def __del__(self):
        """析构函数,清理CUDA资源"""
        if hasattr(self, 'ctx'):
            self.ctx.pop()
            self.ctx.detach()

# 使用示例
# infer = TRTInference("best.engine")

# 假设你有一张图像
# image = cv2.imread("test.jpg")
# outputs = infer.infer(input_data)

# 注意:输出是网络的原始输出,需要进行后处理(如解码边界框、NMS等)

2. 依赖文件 requirements.txt

fastapi==0.119.1
uvicorn==0.38.0
python-multipart==0.0.20
opencv-python==4.5.5.64
pycuda==2025.1.2  # 对应cuda12.8
tensorrt==10.8.0.43  # 对应cuda12.8
numpy==1.26.4

4. 启动服务

uvicorn api.app:app --host 0.0.0.0 --port 8000 --reload

五、接口调用示例

使用 curl 调用

curl -X POST "http://localhost:8000/detect" \
     -H "accept: application/json" \
     -F "file=@test.jpg" \
     -F "conf=0.3" \
     -F "iou=0.5"

使用 Python requests

import requests

url = "http://localhost:8000/predict"
files = {'file': open('test.jpg', 'rb')}

response = requests.post(url, files=files, data=data)
print(response.json())

返回示例

{
  "results": [
    {
      "class_name": "DefectA",
      "class_id": 0,
      "confidence": 0.92,
      "bbox": [450.1, 300.5, 120.3, 80.2]
    }
  ]
}

总结

本文完整展示了从 YOLO 模型训练到生产部署的全流程:

  1. 模型导出:使用 nms=True 方便生产环境使用;
  2. 加速推理:通过 TensorRT 实现 FP16 加速;
  3. 服务封装:FastAPI 提供 RESTful 接口;
  4. 参数可控:支持运行时调节 conf。
版权声明:本文为作者原创,如需转载须联系作者本人同意,未经作者本人同意不得擅自转载。
添加新评论
暂无评论