autobackend.py
ultralytics\nn\autobackend.py
目录
autobackend.py
1.所需的库和模块
2.def check_class_names(names: Union[List, Dict]) -> Dict[int, str]:
3.def default_class_names(data: Optional[Union[str, Path]] = None) -> Dict[int, str]:
4.class AutoBackend(nn.Module):
1.所需的库和模块
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/licenseimport ast
import json
import platform
import zipfile
from collections import OrderedDict, namedtuple
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Unionimport cv2
import numpy as np
import torch
import torch.nn as nn
from PIL import Imagefrom ultralytics.utils import ARM64, IS_JETSON, LINUX, LOGGER, PYTHON_VERSION, ROOT, YAML
from ultralytics.utils.checks import check_requirements, check_suffix, check_version, check_yaml, is_rockchip
from ultralytics.utils.downloads import attempt_download_asset, is_url
2.def check_class_names(names: Union[List, Dict]) -> Dict[int, str]:
# 这段代码定义了 check_class_names 函数,其主要功能是检查和标准化类别名称的输入格式,确保其符合预期的字典格式,并处理一些特殊情况,如 ImageNet 数据集的类别代码。
# 定义了 check_class_names 函数,该函数接受一个参数 1.names ,它可以是列表( List )或字典( Dict )类型。函数的返回值是一个字典( Dict[int, str] ),其中键是整数类别索引,值是对应的类别名称。
def check_class_names(names: Union[List, Dict]) -> Dict[int, str]:# 检查类名,并根据需要转换为字典格式。# 引发:# KeyError:如果类索引对于数据集大小无效。"""Check class names and convert to dict format if needed.Args:names (list | dict): Class names as list or dict format.Returns:(dict): Class names in dict format with integer keys and string values.Raises:KeyError: If class indices are invalid for the dataset size."""# 检查 names 是否是列表类型。 如果是列表,使用 enumerate 将列表转换为字典,其中键是索引(从0开始),值是列表中的元素。if isinstance(names, list): # names is a listnames = dict(enumerate(names)) # convert to dict# 检查 names 是否是字典类型。如果已经转换为字典,或者原始输入就是字典,进入此分支。if isinstance(names, dict):# Convert 1) string keys to int, i.e. '0' to 0, and non-string values to strings, i.e. True to 'True'# 使用字典推导式,将字典的键(如果为字符串)转换为整数,将值(如果为非字符串)转换为字符串。 这一步确保字典的键是整数,值是字符串。names = {int(k): str(v) for k, v in names.items()}# 计算字典中类别的数量 n 。n = len(names)# 检查字典中最大的键是否小于类别数量 n 。# 如果最大键值大于或等于 n ,抛出 KeyError ,提示类别索引无效。这确保类别索引从0开始,且连续。if max(names.keys()) >= n:raise KeyError(f"{n}-class dataset requires class indices 0-{n - 1}, but you have invalid class indices " # {n} 类数据集需要类索引 0-{n - 1},但您的类索引无效f"{min(names.keys())}-{max(names.keys())} defined in your dataset YAML." # 数据集 YAML 中定义的 {min(names.keys())}-{max(names.keys())}。)# 检查类别索引0对应的值是否是字符串,并且以 "n0" 开头。这是 ImageNet 数据集的类别代码格式。# 如果是,加载 ImageNet 数据集的映射文件( ImageNet.yaml ),该文件包含从类别代码到人类可读名称的映射。# 使用字典推导式,将类别代码映射为人类可读的名称。if isinstance(names[0], str) and names[0].startswith("n0"): # imagenet class codes, i.e. 'n01440764'names_map = YAML.load(ROOT / "cfg/datasets/ImageNet.yaml")["map"] # human-readable namesnames = {k: names_map[v] for k, v in names.items()}# 返回处理后的类别名称字典。return names
# 这段代码定义了 check_class_names 函数,其主要功能是检查和标准化类别名称的输入格式。具体步骤包括: 如果输入是列表,将其转换为字典,键为索引,值为类别名称。 如果输入是字典,确保键是整数,值是字符串。 检查字典中的类别索引是否连续且从0开始,抛出错误如果索引无效。 如果类别名称是 ImageNet 的类别代码,加载映射文件并将其转换为人类可读的名称。通过这些步骤, check_class_names 函数能够确保类别名称的输入格式符合预期,并处理特殊情况,如 ImageNet 数据集的类别代码。这有助于提高代码的健壮性和可读性。
3.def default_class_names(data: Optional[Union[str, Path]] = None) -> Dict[int, str]:
# 这段代码定义了 default_class_names 函数,其主要功能是根据提供的数据路径加载类别名称,如果加载失败,则返回默认的类别名称。
# 定义了 default_class_names 函数,该函数接受一个可选的参数 1.data ,它可以是字符串或 Path 类型,表示数据路径。函数的返回值是一个字典( Dict[int, str] ),其中键是整数类别索引,值是对应的类别名称。
def default_class_names(data: Optional[Union[str, Path]] = None) -> Dict[int, str]:# 将默认类名应用于输入的 YAML 文件或返回数值类名。"""Apply default class names to an input YAML file or return numerical class names.Args:data (str | Path, optional): Path to YAML file containing class names.Returns:(dict): Dictionary mapping class indices to class names."""# 检查是否提供了 data 参数。如果提供了 data ,则尝试从该路径加载类别名称。if data:# 使用 check_yaml(data) 函数检查并加载 YAML 文件。 check_yaml 函数的具体实现未给出,但可以推测它用于验证和加载 YAML 文件。# 使用 YAML.load 函数加载 YAML 文件内容,并尝试从加载的内容中提取 names 键对应的值。# 如果在加载或提取过程中发生任何异常,则捕获异常并跳过该部分代码。try:return YAML.load(check_yaml(data))["names"]except Exception:pass# 如果 data 未提供,或者在加载过程中发生异常,则返回默认的类别名称字典。# 默认的类别名称字典包含从0到998的整数索引,每个索引对应的值是一个字符串,格式为 "class{i}" 。return {i: f"class{i}" for i in range(999)} # return default if above errors
# 这段代码定义了 default_class_names 函数,其主要功能是根据提供的数据路径加载类别名称,如果加载失败,则返回默认的类别名称。具体步骤包括: 检查是否提供了 data 参数。 如果提供了 data ,尝试从该路径加载 YAML 文件并提取 names 键对应的值。 如果加载或提取过程中发生异常,捕获异常并跳过该部分代码。 如果 data 未提供或加载失败,返回默认的类别名称字典,其中每个类别名称的格式为 "class{i}" 。通过这些步骤, default_class_names 函数能够灵活地处理不同情况下的类别名称加载,确保在任何情况下都能返回有效的类别名称字典。
4.class AutoBackend(nn.Module):
# 这段代码定义了一个名为 AutoBackend 的 PyTorch 模块,用于自动处理不同格式的模型权重文件,并在指定的设备上运行推理。它支持多种模型格式,包括 PyTorch、TorchScript、ONNX、OpenVINO、TensorRT、CoreML、TensorFlow 等,并提供了统一的接口来加载模型、进行推理和后处理。
# 定义了一个继承自 PyTorch 的 nn.Module 的类 AutoBackend ,用于处理不同格式的模型推理。
class AutoBackend(nn.Module):# 使用 Ultralytics YOLO 模型运行推理时,处理动态后端选择。# AutoBackend 类旨在为各种推理引擎提供一个抽象层。它支持多种格式,每种格式都有特定的命名约定,如下所示:# 支持的格式和命名约定:# | 格式 | 文件后缀 |# | --------------------- | ----------------- |# | PyTorch | *.pt |# 属性:# model (torch.nn.Module):已加载的 YOLO 模型。# device (torch.device):加载模型的设备(CPU 或 GPU)。# task (str):模型执行的任务类型(检测、分割、分类、姿态)。# names (dict):模型可检测的类名字典。# stride (int):模型步长,YOLO 模型通常为 32。# fp16 (bool):模型是否使用半精度 (FP16) 推理。# nhwc (bool):模型是否需要 NHWC 输入格式而不是 NCHW。# pt (bool):模型是否为 PyTorch 模型。# jit (bool):模型是否为 TorchScript 模型。# onnx (bool):模型是否为 ONNX 模型。# xml (bool):模型是否为 OpenVINO 模型。# engine (bool):模型是否为 TensorRT 引擎。# coreml (bool):模型是否为 CoreML 模型。# saved_model (bool):模型是否为 TensorFlow SavedModel。# pb (bool):模型是否为 TensorFlow GraphDef。# tflite (bool):模型是否为 TensorFlow Lite 模型。# edgetpu (bool):模型是否为 TensorFlow Edge TPU 模型。# tfjs (bool):模型是否为 TensorFlow.js 模型。# paddle (bool):模型是否为 PaddlePaddle 模型。# mnn (bool):模型是否为 MNN 模型。# ncnn (bool):模型是否为 NCNN 模型。# imx (bool):模型是否为 IMX 模型。# rknn (bool):模型是否为 RKNN 模型。# triton (bool):模型是否为 Triton Inference Server 模型。# 方法:# forward:对输入图像进行推理。# from_numpy:将 numpy 数组转换为张量。# warmup:使用虚拟输入预热模型。# _model_type:根据文件路径确定模型类型。"""Handle dynamic backend selection for running inference using Ultralytics YOLO models.The AutoBackend class is designed to provide an abstraction layer for various inference engines. It supports a widerange of formats, each with specific naming conventions as outlined below:Supported Formats and Naming Conventions:| Format | File Suffix || --------------------- | ----------------- || PyTorch | *.pt || TorchScript | *.torchscript || ONNX Runtime | *.onnx || ONNX OpenCV DNN | *.onnx (dnn=True) || OpenVINO | *openvino_model/ || CoreML | *.mlpackage || TensorRT | *.engine || TensorFlow SavedModel | *_saved_model/ || TensorFlow GraphDef | *.pb || TensorFlow Lite | *.tflite || TensorFlow Edge TPU | *_edgetpu.tflite || PaddlePaddle | *_paddle_model/ || MNN | *.mnn || NCNN | *_ncnn_model/ || IMX | *_imx_model/ || RKNN | *_rknn_model/ |Attributes:model (torch.nn.Module): The loaded YOLO model.device (torch.device): The device (CPU or GPU) on which the model is loaded.task (str): The type of task the model performs (detect, segment, classify, pose).names (dict): A dictionary of class names that the model can detect.stride (int): The model stride, typically 32 for YOLO models.fp16 (bool): Whether the model uses half-precision (FP16) inference.nhwc (bool): Whether the model expects NHWC input format instead of NCHW.pt (bool): Whether the model is a PyTorch model.jit (bool): Whether the model is a TorchScript model.onnx (bool): Whether the model is an ONNX model.xml (bool): Whether the model is an OpenVINO model.engine (bool): Whether the model is a TensorRT engine.coreml (bool): Whether the model is a CoreML model.saved_model (bool): Whether the model is a TensorFlow SavedModel.pb (bool): Whether the model is a TensorFlow GraphDef.tflite (bool): Whether the model is a TensorFlow Lite model.edgetpu (bool): Whether the model is a TensorFlow Edge TPU model.tfjs (bool): Whether the model is a TensorFlow.js model.paddle (bool): Whether the model is a PaddlePaddle model.mnn (bool): Whether the model is an MNN model.ncnn (bool): Whether the model is an NCNN model.imx (bool): Whether the model is an IMX model.rknn (bool): Whether the model is an RKNN model.triton (bool): Whether the model is a Triton Inference Server model.Methods:forward: Run inference on an input image.from_numpy: Convert numpy array to tensor.warmup: Warm up the model with a dummy input._model_type: Determine the model type from file path.Examples:>>> model = AutoBackend(weights="yolo11n.pt", device="cuda")>>> results = model(img)"""# 这段代码定义了一个名为 AutoBackend 的 PyTorch 模块,用于自动处理不同格式的模型权重文件,并在指定的设备上运行推理。它支持多种模型格式,包括 PyTorch、TorchScript、ONNX、OpenVINO、TensorRT、CoreML、TensorFlow 等,并提供了统一的接口来加载模型、进行推理和后处理。# @torch.no_grad() :装饰器,用于禁用梯度计算,适用于推理阶段。@torch.no_grad()# __init__ :类的初始化方法,接收多个参数:# 1.weights :模型权重文件的路径或模块实例,默认为 "yolo11n.pt" 。# 2.device :运行模型的设备,默认为 CPU。# 3.dnn :是否使用 OpenCV DNN 模块进行 ONNX 推理,默认为 False 。# 4.data :包含类别名称的额外 data.yaml 文件路径,默认为 None 。# 5.fp16 :是否启用半精度推理,默认为 False 。# 6.batch :推理时假设的批量大小,默认为 1。# 7.fuse :是否对 Conv2D + BatchNorm 层进行优化融合,默认为 True 。# 8.verbose :是否启用详细日志,默认为 True 。def __init__(self,weights: Union[str, List[str], torch.nn.Module] = "yolo11n.pt",device: torch.device = torch.device("cpu"),dnn: bool = False,data: Optional[Union[str, Path]] = None,fp16: bool = False,batch: int = 1,fuse: bool = True,verbose: bool = True,):# 初始化 AutoBackend 进行推理。"""Initialize the AutoBackend for inference.Args:weights (str | List[str] | torch.nn.Module): Path to the model weights file or a module instance.device (torch.device): Device to run the model on.dnn (bool): Use OpenCV DNN module for ONNX inference.data (str | Path, optional): Path to the additional data.yaml file containing class names.fp16 (bool): Enable half-precision inference. Supported only on specific backends.batch (int): Batch-size to assume for inference.fuse (bool): Fuse Conv2D + BatchNorm layers for optimization.verbose (bool): Enable verbose logging."""# 调用父类 nn.Module 的初始化方法,这是 PyTorch 中模块初始化的标准做法。super().__init__()# 将 weights 参数转换为字符串。如果 weights 是一个列表,则取列表中的第一个元素作为模型权重文件的路径。w = str(weights[0] if isinstance(weights, list) else weights)# 检查 weights 是否是一个 PyTorch 模块实例。如果是, nn_module 将被设置为 True 。nn_module = isinstance(weights, torch.nn.Module)# 调用 _model_type 方法来确定模型的类型。 _model_type 方法返回一个布尔值列表,每个布尔值对应一种模型格式(如 PyTorch、TorchScript、ONNX 等)。这些布尔值将分别赋值给 pt 、 jit 、 onnx 等变量。(pt,jit,onnx,xml,engine,coreml,saved_model,pb,tflite,edgetpu,tfjs,paddle,mnn,ncnn,imx,rknn,triton,) = self._model_type(w)# 如果模型是 PyTorch、TorchScript、ONNX、OpenVINO、TensorRT、PyTorch 模块或 Triton 模型,则启用 FP16 推理。这里使用了位运算符 &= 来确保只有在支持 FP16 的模型类型时才启用 FP16。fp16 &= pt or jit or onnx or xml or engine or nn_module or triton # FP16# 如果模型是 CoreML、TensorFlow SavedModel、TensorFlow GraphDef、TensorFlow Lite、TensorFlow Edge TPU 或 RKNN 模型,则使用 NHWC 格式。否则,使用 PyTorch 的默认格式 BCWH。nhwc = coreml or saved_model or pb or tflite or edgetpu or rknn # BHWC formats (vs torch BCWH)# 设置默认的模型步幅为 32 和通道数为 3。这些值可以根据模型的具体情况进行调整。stride, ch = 32, 3 # default stride and channels# 初始化 end2end 和 dynamic 变量为 False 。这些变量可能在后续代码中用于控制模型的行为。end2end, dynamic = False, False# 初始化 model 、 metadata 和 task 变量为 None 。这些变量将在后续代码中根据模型类型和权重文件进行赋值。model, metadata, task = None, None, None# Set device# 检查 device 是否是一个 torch.device 对象,并且 CUDA 是否可用,同时设备类型不是 CPU。 如果所有条件都满足,则将 cuda 设置为 True ,表示可以使用 CUDA 进行加速。 如果不满足条件,则 cuda 保持为 False ,表示不能使用 CUDA,将使用 CPU 进行计算。cuda = isinstance(device, torch.device) and torch.cuda.is_available() and device.type != "cpu" # use CUDA# 如果 cuda 为 True ,但模型不是 PyTorch 模块、PyTorch 模型、TorchScript 模型、TensorRT 引擎、ONNX 模型或 PaddlePaddle 模型,则将设备设置为 CPU。# 这是因为某些模型格式可能不支持 GPU 加速,或者需要特定的 GPU 加载器格式。# 在这种情况下,将 cuda 重新设置为 False ,确保后续代码不会尝试在 GPU 上运行模型。if cuda and not any([nn_module, pt, jit, engine, onnx, paddle]): # GPU dataloader formatsdevice = torch.device("cpu")cuda = False# Download if not local# 如果模型不是 PyTorch 模型、Triton 模型或 PyTorch 模块,则调用 attempt_download_asset 函数尝试下载模型权重文件。# attempt_download_asset 函数的作用是检查模型权重文件是否存在于本地,如果不存在,则从指定的 URL 或路径下载模型权重文件。# 这确保了即使模型权重文件不在本地,代码也能够自动下载并加载模型。if not (pt or triton or nn_module):w = attempt_download_asset(w)# In-memory PyTorch model# 如果 weights 是一个 PyTorch 模块实例( nn_module 为 True ),则进入此分支。if nn_module:# 如果 fuse 参数为 True ,则对模型进行融合优化。融合优化通常会将多个操作合并为一个更高效的单个操作,例如将卷积层和批量归一化层合并。 融合优化通常在将模型移动到 GPU 之前进行,以减少在 GPU 上的计算量。if fuse:weights = weights.fuse(verbose=verbose) # fuse before move to gpu# 将模型移动到指定的设备(CPU 或 GPU)。这确保了模型的参数和缓冲区都在正确的设备上。model = weights.to(device)# 如果模型有 kpt_shape 属性(通常用于姿态估计任务),则获取该属性值。if hasattr(model, "kpt_shape"):kpt_shape = model.kpt_shape # pose-only# 获取模型的步幅(stride)。步幅是模型在特征图上移动的步长,通常用于计算模型的输出尺寸。 如果模型的步幅小于 32,则将其设置为 32,这是 YOLO 模型的默认步幅。stride = max(int(model.stride.max()), 32) # model stride# 获取模型的类别名称。如果模型是一个包装模块( model.module ),则从包装模块中获取类别名称;否则直接从模型中获取。names = model.module.names if hasattr(model, "module") else model.names # get class names# 如果启用了 FP16(半精度浮点数)推理,则将模型转换为半精度;否则保持为全精度(float32)。model.half() if fp16 else model.float()# 从模型的配置文件(通常是 YAML 格式)中获取输入通道数。如果没有指定,则默认为 3(RGB 图像)。ch = model.yaml.get("channels", 3)# 将模型显式分配给类的 model 属性。这使得后续可以通过 self.model 来调用模型的 to() 、 cpu() 、 cuda() 和 half() 等方法。self.model = model # explicitly assign for to(), cpu(), cuda(), half()# 将 pt 标志设置为 True ,表示当前处理的是一个 PyTorch 模型。pt = True# PyTorch# 如果 weights 是一个 PyTorch 模型文件( pt 为 True ),则进入此分支。elif pt:# 从 ultralytics.nn.tasks 模块中导入 attempt_load_weights 函数。from ultralytics.nn.tasks import attempt_load_weights# 调用 attempt_load_weights 函数加载模型权重。该函数接受以下参数:# weights :模型权重文件的路径或权重列表。# device :目标设备(CPU 或 GPU)。# inplace :是否在原地进行操作。# fuse :是否对模型进行融合优化。# 加载后的模型赋值给 model 变量。model = attempt_load_weights(weights if isinstance(weights, list) else w, device=device, inplace=True, fuse=fuse)# 如果模型有 kpt_shape 属性(通常用于姿态估计任务),则获取该属性值。if hasattr(model, "kpt_shape"):kpt_shape = model.kpt_shape # pose-only# 获取模型的步幅(stride)。步幅是模型在特征图上移动的步长,通常用于计算模型的输出尺寸。 如果模型的步幅小于 32,则将其设置为 32,这是 YOLO 模型的默认步幅。stride = max(int(model.stride.max()), 32) # model stride# 获取模型的类别名称。如果模型是一个包装模块( model.module ),则从包装模块中获取类别名称;否则直接从模型中获取。names = model.module.names if hasattr(model, "module") else model.names # get class names# 如果启用了 FP16(半精度浮点数)推理,则将模型转换为半精度;否则保持为全精度(float32)。model.half() if fp16 else model.float()# 从模型的配置文件(通常是 YAML 格式)中获取输入通道数。如果没有指定,则默认为 3(RGB 图像)。ch = model.yaml.get("channels", 3)# 将模型显式分配给类的 model 属性。这使得后续可以通过 self.model 来调用模型的 to() 、 cpu() 、 cuda() 和 half() 等方法。self.model = model # explicitly assign for to(), cpu(), cuda(), half()# 可忽略---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------↓# TorchScriptelif jit:import torchvision # noqa - https://github.com/ultralytics/ultralytics/pull/19747LOGGER.info(f"Loading {w} for TorchScript inference...")extra_files = {"config.txt": ""} # model metadatamodel = torch.jit.load(w, _extra_files=extra_files, map_location=device)model.half() if fp16 else model.float()if extra_files["config.txt"]: # load metadata dictmetadata = json.loads(extra_files["config.txt"], object_hook=lambda x: dict(x.items()))# ONNX OpenCV DNNelif dnn:LOGGER.info(f"Loading {w} for ONNX OpenCV DNN inference...")check_requirements("opencv-python>=4.5.4")net = cv2.dnn.readNetFromONNX(w)# ONNX Runtime and IMXelif onnx or imx:LOGGER.info(f"Loading {w} for ONNX Runtime inference...")check_requirements(("onnx", "onnxruntime-gpu" if cuda else "onnxruntime"))import onnxruntimeproviders = ["CPUExecutionProvider"]if cuda:if "CUDAExecutionProvider" in onnxruntime.get_available_providers():providers.insert(0, "CUDAExecutionProvider")else: # Only log warning if CUDA was requested but unavailableLOGGER.warning("Failed to start ONNX Runtime with CUDA. Using CPU...")device = torch.device("cpu")cuda = FalseLOGGER.info(f"Using ONNX Runtime {providers[0]}")if onnx:session = onnxruntime.InferenceSession(w, providers=providers)else:check_requirements(["model-compression-toolkit>=2.3.0", "sony-custom-layers[torch]>=0.3.0", "onnxruntime-extensions"])w = next(Path(w).glob("*.onnx"))LOGGER.info(f"Loading {w} for ONNX IMX inference...")import mct_quantizers as mctqfrom sony_custom_layers.pytorch.nms import nms_ort # noqasession_options = mctq.get_ort_session_options()session_options.enable_mem_reuse = False # fix the shape mismatch from onnxruntimesession = onnxruntime.InferenceSession(w, session_options, providers=["CPUExecutionProvider"])task = "detect"output_names = [x.name for x in session.get_outputs()]metadata = session.get_modelmeta().custom_metadata_mapdynamic = isinstance(session.get_outputs()[0].shape[0], str)fp16 = "float16" in session.get_inputs()[0].typeif not dynamic:io = session.io_binding()bindings = []for output in session.get_outputs():out_fp16 = "float16" in output.typey_tensor = torch.empty(output.shape, dtype=torch.float16 if out_fp16 else torch.float32).to(device)io.bind_output(name=output.name,device_type=device.type,device_id=device.index if cuda else 0,element_type=np.float16 if out_fp16 else np.float32,shape=tuple(y_tensor.shape),buffer_ptr=y_tensor.data_ptr(),)bindings.append(y_tensor)# OpenVINOelif xml:LOGGER.info(f"Loading {w} for OpenVINO inference...")check_requirements("openvino>=2024.0.0")import openvino as ovcore = ov.Core()device_name = "AUTO"if isinstance(device, str) and device.startswith("intel"):device_name = device.split(":")[1].upper() # Intel OpenVINO devicedevice = torch.device("cpu")if device_name not in core.available_devices:LOGGER.warning(f"OpenVINO device '{device_name}' not available. Using 'AUTO' instead.")device_name = "AUTO"w = Path(w)if not w.is_file(): # if not *.xmlw = next(w.glob("*.xml")) # get *.xml file from *_openvino_model dirov_model = core.read_model(model=str(w), weights=w.with_suffix(".bin"))if ov_model.get_parameters()[0].get_layout().empty:ov_model.get_parameters()[0].set_layout(ov.Layout("NCHW"))# OpenVINO inference modes are 'LATENCY', 'THROUGHPUT' (not recommended), or 'CUMULATIVE_THROUGHPUT'inference_mode = "CUMULATIVE_THROUGHPUT" if batch > 1 else "LATENCY"LOGGER.info(f"Using OpenVINO {inference_mode} mode for batch={batch} inference...")ov_compiled_model = core.compile_model(ov_model,device_name=device_name,config={"PERFORMANCE_HINT": inference_mode},)input_name = ov_compiled_model.input().get_any_name()metadata = w.parent / "metadata.yaml"# TensorRTelif engine:LOGGER.info(f"Loading {w} for TensorRT inference...")if IS_JETSON and check_version(PYTHON_VERSION, "<=3.8.10"):# fix error: `np.bool` was a deprecated alias for the builtin `bool` for JetPack 4 and JetPack 5 with Python <= 3.8.10check_requirements("numpy==1.23.5")try: # https://developer.nvidia.com/nvidia-tensorrt-downloadimport tensorrt as trt # noqaexcept ImportError:if LINUX:check_requirements("tensorrt>7.0.0,!=10.1.0")import tensorrt as trt # noqacheck_version(trt.__version__, ">=7.0.0", hard=True)check_version(trt.__version__, "!=10.1.0", msg="https://github.com/ultralytics/ultralytics/pull/14239")if device.type == "cpu":device = torch.device("cuda:0")Binding = namedtuple("Binding", ("name", "dtype", "shape", "data", "ptr"))logger = trt.Logger(trt.Logger.INFO)# Read filewith open(w, "rb") as f, trt.Runtime(logger) as runtime:try:meta_len = int.from_bytes(f.read(4), byteorder="little") # read metadata lengthmetadata = json.loads(f.read(meta_len).decode("utf-8")) # read metadatadla = metadata.get("dla", None)if dla is not None:runtime.DLA_core = int(dla)except UnicodeDecodeError:f.seek(0) # engine file may lack embedded Ultralytics metadatamodel = runtime.deserialize_cuda_engine(f.read()) # read engine# Model contexttry:context = model.create_execution_context()except Exception as e: # model is NoneLOGGER.error(f"TensorRT model exported with a different version than {trt.__version__}\n")raise ebindings = OrderedDict()output_names = []fp16 = False # default updated belowdynamic = Falseis_trt10 = not hasattr(model, "num_bindings")num = range(model.num_io_tensors) if is_trt10 else range(model.num_bindings)for i in num:if is_trt10:name = model.get_tensor_name(i)dtype = trt.nptype(model.get_tensor_dtype(name))is_input = model.get_tensor_mode(name) == trt.TensorIOMode.INPUTif is_input:if -1 in tuple(model.get_tensor_shape(name)):dynamic = Truecontext.set_input_shape(name, tuple(model.get_tensor_profile_shape(name, 0)[1]))if dtype == np.float16:fp16 = Trueelse:output_names.append(name)shape = tuple(context.get_tensor_shape(name))else: # TensorRT < 10.0name = model.get_binding_name(i)dtype = trt.nptype(model.get_binding_dtype(i))is_input = model.binding_is_input(i)if model.binding_is_input(i):if -1 in tuple(model.get_binding_shape(i)): # dynamicdynamic = Truecontext.set_binding_shape(i, tuple(model.get_profile_shape(0, i)[1]))if dtype == np.float16:fp16 = Trueelse:output_names.append(name)shape = tuple(context.get_binding_shape(i))im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device)bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr()))binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())batch_size = bindings["images"].shape[0] # if dynamic, this is instead max batch size# CoreMLelif coreml:LOGGER.info(f"Loading {w} for CoreML inference...")import coremltools as ctmodel = ct.models.MLModel(w)metadata = dict(model.user_defined_metadata)# TF SavedModelelif saved_model:LOGGER.info(f"Loading {w} for TensorFlow SavedModel inference...")import tensorflow as tfkeras = False # assume TF1 saved_modelmodel = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w)metadata = Path(w) / "metadata.yaml"# TF GraphDefelif pb: # https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxtLOGGER.info(f"Loading {w} for TensorFlow GraphDef inference...")import tensorflow as tffrom ultralytics.engine.exporter import gd_outputsdef wrap_frozen_graph(gd, inputs, outputs):"""Wrap frozen graphs for deployment."""x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), []) # wrappedge = x.graph.as_graph_elementreturn x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs))gd = tf.Graph().as_graph_def() # TF GraphDefwith open(w, "rb") as f:gd.ParseFromString(f.read())frozen_func = wrap_frozen_graph(gd, inputs="x:0", outputs=gd_outputs(gd))try: # find metadata in SavedModel alongside GraphDefmetadata = next(Path(w).resolve().parent.rglob(f"{Path(w).stem}_saved_model*/metadata.yaml"))except StopIteration:pass# TFLite or TFLite Edge TPUelif tflite or edgetpu: # https://ai.google.dev/edge/litert/microcontrollers/pythontry: # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpufrom tflite_runtime.interpreter import Interpreter, load_delegateexcept ImportError:import tensorflow as tfInterpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegateif edgetpu: # TF Edge TPU https://coral.ai/software/#edgetpu-runtimedevice = device[3:] if str(device).startswith("tpu") else ":0"LOGGER.info(f"Loading {w} on device {device[1:]} for TensorFlow Lite Edge TPU inference...")delegate = {"Linux": "libedgetpu.so.1", "Darwin": "libedgetpu.1.dylib", "Windows": "edgetpu.dll"}[platform.system()]interpreter = Interpreter(model_path=w,experimental_delegates=[load_delegate(delegate, options={"device": device})],)device = "cpu" # Required, otherwise PyTorch will try to use the wrong deviceelse: # TFLiteLOGGER.info(f"Loading {w} for TensorFlow Lite inference...")interpreter = Interpreter(model_path=w) # load TFLite modelinterpreter.allocate_tensors() # allocateinput_details = interpreter.get_input_details() # inputsoutput_details = interpreter.get_output_details() # outputs# Load metadatatry:with zipfile.ZipFile(w, "r") as zf:name = zf.namelist()[0]contents = zf.read(name).decode("utf-8")if name == "metadata.json": # Custom Ultralytics metadata dict for Python>=3.12metadata = json.loads(contents)else:metadata = ast.literal_eval(contents) # Default tflite-support metadata for Python<=3.11except (zipfile.BadZipFile, SyntaxError, ValueError, json.JSONDecodeError):pass# TF.jselif tfjs:raise NotImplementedError("YOLOv8 TF.js inference is not currently supported.")# PaddlePaddleelif paddle:LOGGER.info(f"Loading {w} for PaddlePaddle inference...")check_requirements("paddlepaddle-gpu" if cuda else "paddlepaddle>=3.0.0")import paddle.inference as pdi # noqaw = Path(w)model_file, params_file = None, Noneif w.is_dir():model_file = next(w.rglob("*.json"), None)params_file = next(w.rglob("*.pdiparams"), None)elif w.suffix == ".pdiparams":model_file = w.with_name("model.json")params_file = wif not (model_file and params_file and model_file.is_file() and params_file.is_file()):raise FileNotFoundError(f"Paddle model not found in {w}. Both .json and .pdiparams files are required.")config = pdi.Config(str(model_file), str(params_file))if cuda:config.enable_use_gpu(memory_pool_init_size_mb=2048, device_id=0)predictor = pdi.create_predictor(config)input_handle = predictor.get_input_handle(predictor.get_input_names()[0])output_names = predictor.get_output_names()metadata = w / "metadata.yaml"# MNNelif mnn:LOGGER.info(f"Loading {w} for MNN inference...")check_requirements("MNN") # requires MNNimport osimport MNNconfig = {"precision": "low", "backend": "CPU", "numThread": (os.cpu_count() + 1) // 2}rt = MNN.nn.create_runtime_manager((config,))net = MNN.nn.load_module_from_file(w, [], [], runtime_manager=rt, rearrange=True)def torch_to_mnn(x):return MNN.expr.const(x.data_ptr(), x.shape)metadata = json.loads(net.get_info()["bizCode"])# NCNNelif ncnn:LOGGER.info(f"Loading {w} for NCNN inference...")check_requirements("git+https://github.com/Tencent/ncnn.git" if ARM64 else "ncnn") # requires NCNNimport ncnn as pyncnnnet = pyncnn.Net()net.opt.use_vulkan_compute = cudaw = Path(w)if not w.is_file(): # if not *.paramw = next(w.glob("*.param")) # get *.param file from *_ncnn_model dirnet.load_param(str(w))net.load_model(str(w.with_suffix(".bin")))metadata = w.parent / "metadata.yaml"# NVIDIA Triton Inference Serverelif triton:check_requirements("tritonclient[all]")from ultralytics.utils.triton import TritonRemoteModelmodel = TritonRemoteModel(w)metadata = model.metadata# RKNNelif rknn:if not is_rockchip():raise OSError("RKNN inference is only supported on Rockchip devices.")LOGGER.info(f"Loading {w} for RKNN inference...")check_requirements("rknn-toolkit-lite2")from rknnlite.api import RKNNLitew = Path(w)if not w.is_file(): # if not *.rknnw = next(w.rglob("*.rknn")) # get *.rknn file from *_rknn_model dirrknn_model = RKNNLite()rknn_model.load_rknn(str(w))rknn_model.init_runtime()metadata = w.parent / "metadata.yaml"
# 可忽略---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------↑# Any other format (unsupported)# 如果前面的代码块(处理各种支持的模型格式)都没有匹配到当前的模型格式,则进入此分支。else:# 从 ultralytics.engine.exporter 模块中导入 export_formats 函数。这个函数可能返回一个包含支持的模型格式的字典或列表。from ultralytics.engine.exporter import export_formats# 构造一个错误信息,提示用户当前的模型文件 w 不是受支持的格式。# 使用 export_formats()['Format'] 获取 Ultralytics 支持的所有模型格式,并将它们包含在错误信息中。# 提供一个链接到 Ultralytics 的文档页面,用户可以访问该页面获取更多帮助。raise TypeError(f"model='{w}' is not a supported model format. Ultralytics supports: {export_formats()['Format']}\n" # model='{w}' 不是受支持的模型格式。Ultralytics 支持:{export_formats()['Format']}f"See https://docs.ultralytics.com/modes/predict for help." # 请参阅https://docs.ultralytics.com/modes/predict以获取帮助。)# Load external metadata YAML# 检查 metadata 是否是一个字符串或 Path 对象,并且对应的文件是否存在。if isinstance(metadata, (str, Path)) and Path(metadata).exists():# 如果文件存在,则使用 YAML.load 函数加载 YAML 文件内容,并将其解析为一个 Python 字典。metadata = YAML.load(metadata)# 如果 metadata 是一个非空字典,则遍历字典中的每个键值对。# 对于特定的键( stride 、 batch 、 channels ),将对应的值转换为整数。# 对于其他特定的键( imgsz 、 names 、 kpt_shape 、 args ),如果值是字符串,则使用 eval 函数将其解析为 Python 对象。这通常用于处理存储为字符串的列表或其他复杂数据结构。if metadata and isinstance(metadata, dict):for k, v in metadata.items():if k in {"stride", "batch", "channels"}:metadata[k] = int(v)elif k in {"imgsz", "names", "kpt_shape", "args"} and isinstance(v, str):metadata[k] = eval(v)# 从元数据字典中提取特定的配置信息,并更新模型的相关属性。# stride :模型的步幅。# task :模型的任务类型(如检测、分割等)。# batch :批量大小。# imgsz :输入图像的尺寸。# names :类别名称。# kpt_shape :关键点的形状(如果存在)。# end2end :是否启用端到端推理(如 NMS)。# dynamic :是否启用动态推理。# ch :输入通道数,默认为 3(RGB 图像)。stride = metadata["stride"]task = metadata["task"]batch = metadata["batch"]imgsz = metadata["imgsz"]names = metadata["names"]kpt_shape = metadata.get("kpt_shape")end2end = metadata.get("args", {}).get("nms", False)dynamic = metadata.get("args", {}).get("dynamic", dynamic)ch = metadata.get("channels", 3)# 如果元数据文件不存在,并且当前处理的模型不是 PyTorch 模型、Triton 模型或 PyTorch 模块,则记录一个警告信息,提示用户未找到元数据。elif not (pt or triton or nn_module):LOGGER.warning(f"Metadata not found for 'model={weights}'") # 未找到“model={weights}”的元数据。# Check names# 检查 names 是否在局部变量中定义。如果未定义(即 names 缺失),则调用 default_class_names(data) 函数来获取默认的类别名称。 default_class_names(data) 函数可能根据提供的数据集信息( data )返回默认的类别名称列表。if "names" not in locals(): # names missingnames = default_class_names(data)# 调用 check_class_names(names) 函数来验证和标准化类别名称。这个函数可能确保类别名称是有效的,并且格式正确。names = check_class_names(names)# Disable gradients# 如果当前处理的是 PyTorch 模型( pt 为 True ),则遍历模型的所有参数,并将每个参数的 requires_grad 属性设置为 False 。 这一步是为了禁用梯度计算,因为在推理阶段不需要计算梯度,这可以节省计算资源并提高推理速度。if pt:for p in model.parameters():p.requires_grad = False# 使用 self.__dict__.update(locals()) 将当前函数中的所有局部变量更新到类的属性中。 这样做的目的是将所有在初始化过程中定义的变量(如 model 、 names 、 stride 等)作为类的属性存储起来,以便后续可以方便地访问这些变量。self.__dict__.update(locals()) # assign all variables to self# AutoBackend 类的 __init__ 方法通过一系列复杂的逻辑,确保了模型能够在多种设备和格式下高效运行。它不仅提供了对多种模型格式的支持,还通过加载和解析元数据、验证类别名称、禁用梯度计算等操作,确保了模型的配置和运行环境的正确性。这使得 AutoBackend 类能够作为一个通用的后端接口,适用于各种推理任务,无论是图像、视频还是实时流。# 这段代码定义了 AutoBackend 类的 forward 方法,用于在不同类型的后端(如 PyTorch、TorchScript、ONNX、TensorRT 等)上运行模型推理。该方法根据模型的类型和配置,将输入图像张量传递给相应的推理引擎,并返回模型的原始输出张量。# 定义了 forward 方法,接收以下参数:# 1.im :输入图像张量。# 2.augment :是否在推理时执行数据增强,默认为 False 。# 3.visualize :是否可视化输出预测,默认为 False 。# 4.embed :可选的特征向量/嵌入列表,用于返回,默认为 None 。# 5.**kwargs :模型配置的额外关键字参数。# 返回值为模型的原始输出张量或张量列表。def forward(self,im: torch.Tensor,augment: bool = False,visualize: bool = False,embed: Optional[List] = None,**kwargs: Any,) -> Union[torch.Tensor, List[torch.Tensor]]:"""Run inference on an AutoBackend model.Args:im (torch.Tensor): The image tensor to perform inference on.augment (bool): Whether to perform data augmentation during inference.visualize (bool): Whether to visualize the output predictions.embed (list, optional): A list of feature vectors/embeddings to return.**kwargs (Any): Additional keyword arguments for model configuration.Returns:(torch.Tensor | List[torch.Tensor]): The raw output tensor(s) from the model."""# 获取输入图像张量 im 的形状,分别表示批量大小( b )、通道数( ch )、高度( h )和宽度( w )。b, ch, h, w = im.shape # batch, channel, height, width# 检查是否启用了半精度浮点数(FP16)推理( self.fp16 为 True )。 如果启用了 FP16 推理且输入张量的数据类型不是 torch.float16 ,则将输入张量转换为半精度浮点数。if self.fp16 and im.dtype != torch.float16:im = im.half() # to FP16# 检查是否需要将输入张量从 PyTorch 的 BCHW 格式转换为 NHWC 格式( self.nhwc 为 True )。 如果需要转换,则使用 permute 方法将张量的维度重新排列为 NHWC 格式。这通常用于与某些推理引擎(如 OpenCV DNN 或 TensorFlow Lite)兼容。if self.nhwc:im = im.permute(0, 2, 3, 1) # torch BCHW to numpy BHWC shape(1,320,192,3)# PyTorch# 检查当前模型是否是 PyTorch 模型或 PyTorch 模块( self.pt 或 self.nn_module 为 True )。# 如果是,则调用模型的推理方法,并将输入张量 im 以及额外的参数(如 augment 、 visualize 、 embed 和其他关键字参数 **kwargs )传递给模型。# 模型的输出结果存储在变量 y 中。if self.pt or self.nn_module:y = self.model(im, augment=augment, visualize=visualize, embed=embed, **kwargs)# 可忽略---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------↓# TorchScriptelif self.jit:y = self.model(im)# ONNX OpenCV DNNelif self.dnn:im = im.cpu().numpy() # torch to numpyself.net.setInput(im)y = self.net.forward()# ONNX Runtimeelif self.onnx or self.imx:if self.dynamic:im = im.cpu().numpy() # torch to numpyy = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})else:if not self.cuda:im = im.cpu()self.io.bind_input(name="images",device_type=im.device.type,device_id=im.device.index if im.device.type == "cuda" else 0,element_type=np.float16 if self.fp16 else np.float32,shape=tuple(im.shape),buffer_ptr=im.data_ptr(),)self.session.run_with_iobinding(self.io)y = self.bindingsif self.imx:# boxes, conf, clsy = np.concatenate([y[0], y[1][:, :, None], y[2][:, :, None]], axis=-1)# OpenVINOelif self.xml:im = im.cpu().numpy() # FP32if self.inference_mode in {"THROUGHPUT", "CUMULATIVE_THROUGHPUT"}: # optimized for larger batch-sizesn = im.shape[0] # number of images in batchresults = [None] * n # preallocate list with None to match the number of imagesdef callback(request, userdata):"""Place result in preallocated list using userdata index."""results[userdata] = request.results# Create AsyncInferQueue, set the callback and start asynchronous inference for each input imageasync_queue = self.ov.AsyncInferQueue(self.ov_compiled_model)async_queue.set_callback(callback)for i in range(n):# Start async inference with userdata=i to specify the position in results listasync_queue.start_async(inputs={self.input_name: im[i : i + 1]}, userdata=i) # keep image as BCHWasync_queue.wait_all() # wait for all inference requests to completey = np.concatenate([list(r.values())[0] for r in results])else: # inference_mode = "LATENCY", optimized for fastest first result at batch-size 1y = list(self.ov_compiled_model(im).values())# TensorRTelif self.engine:if self.dynamic and im.shape != self.bindings["images"].shape:if self.is_trt10:self.context.set_input_shape("images", im.shape)self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)for name in self.output_names:self.bindings[name].data.resize_(tuple(self.context.get_tensor_shape(name)))else:i = self.model.get_binding_index("images")self.context.set_binding_shape(i, im.shape)self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)for name in self.output_names:i = self.model.get_binding_index(name)self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i)))s = self.bindings["images"].shapeassert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"self.binding_addrs["images"] = int(im.data_ptr())self.context.execute_v2(list(self.binding_addrs.values()))y = [self.bindings[x].data for x in sorted(self.output_names)]# CoreMLelif self.coreml:im = im[0].cpu().numpy()im_pil = Image.fromarray((im * 255).astype("uint8"))# im = im.resize((192, 320), Image.BILINEAR)y = self.model.predict({"image": im_pil}) # coordinates are xywh normalizedif "confidence" in y:raise TypeError("Ultralytics only supports inference of non-pipelined CoreML models exported with "f"'nms=False', but 'model={w}' has an NMS pipeline created by an 'nms=True' export.")# TODO: CoreML NMS inference handling# from ultralytics.utils.ops import xywh2xyxy# box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]]) # xyxy pixels# conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float32)# y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)y = list(y.values())if len(y) == 2 and len(y[1].shape) != 4: # segmentation modely = list(reversed(y)) # reversed for segmentation models (pred, proto)# PaddlePaddleelif self.paddle:im = im.cpu().numpy().astype(np.float32)self.input_handle.copy_from_cpu(im)self.predictor.run()y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names]# MNNelif self.mnn:input_var = self.torch_to_mnn(im)output_var = self.net.onForward([input_var])y = [x.read() for x in output_var]# NCNNelif self.ncnn:mat_in = self.pyncnn.Mat(im[0].cpu().numpy())with self.net.create_extractor() as ex:ex.input(self.net.input_names()[0], mat_in)# WARNING: 'output_names' sorted as a temporary fix for https://github.com/pnnx/pnnx/issues/130y = [np.array(ex.extract(x)[1])[None] for x in sorted(self.net.output_names())]# NVIDIA Triton Inference Serverelif self.triton:im = im.cpu().numpy() # torch to numpyy = self.model(im)# RKNNelif self.rknn:im = (im.cpu().numpy() * 255).astype("uint8")im = im if isinstance(im, (list, tuple)) else [im]y = self.rknn_model.inference(inputs=im)# TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)else:im = im.cpu().numpy()if self.saved_model: # SavedModely = self.model(im, training=False) if self.keras else self.model.serving_default(im)if not isinstance(y, list):y = [y]elif self.pb: # GraphDefy = self.frozen_func(x=self.tf.constant(im))else: # Lite or Edge TPUdetails = self.input_details[0]is_int = details["dtype"] in {np.int8, np.int16} # is TFLite quantized int8 or int16 modelif is_int:scale, zero_point = details["quantization"]im = (im / scale + zero_point).astype(details["dtype"]) # de-scaleself.interpreter.set_tensor(details["index"], im)self.interpreter.invoke()y = []for output in self.output_details:x = self.interpreter.get_tensor(output["index"])if is_int:scale, zero_point = output["quantization"]x = (x.astype(np.float32) - zero_point) * scale # re-scaleif x.ndim == 3: # if task is not classification, excluding masks (ndim=4) as well# Denormalize xywh by image size. See https://github.com/ultralytics/ultralytics/pull/1695# xywh are normalized in TFLite/EdgeTPU to mitigate quantization error of integer modelsif x.shape[-1] == 6 or self.end2end: # end-to-end modelx[:, :, [0, 2]] *= wx[:, :, [1, 3]] *= hif self.task == "pose":x[:, :, 6::3] *= wx[:, :, 7::3] *= helse:x[:, [0, 2]] *= wx[:, [1, 3]] *= hif self.task == "pose":x[:, 5::3] *= wx[:, 6::3] *= hy.append(x)# TF segment fixes: export is reversed vs ONNX export and protos are transposedif len(y) == 2: # segment with (det, proto) output order reversedif len(y[1].shape) != 4:y = list(reversed(y)) # should be y = (1, 116, 8400), (1, 160, 160, 32)if y[1].shape[-1] == 6: # end-to-end modely = [y[1]]else:y[1] = np.transpose(y[1], (0, 3, 1, 2)) # should be y = (1, 116, 8400), (1, 32, 160, 160)y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y]
# 可忽略---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------↑# 这部分代码被注释掉了,但它的作用是调试输出,用于检查模型输出 y 的每个元素的类型和形状。 如果 y 是一个列表或元组,则打印每个元素的类型和长度;否则,打印元素的类型和形状。# for x in y:# print(type(x), len(x)) if isinstance(x, (list, tuple)) else print(type(x), x.shape) # debug shapes# 检查模型的输出 y 是否是一个列表或元组。if isinstance(y, (list, tuple)):# 如果类别名称列表 self.names 的长度为 999(表示未定义),并且任务是分割( segment )或输出长度为 2,则动态生成类别名称。 计算类别数量 nc ,并生成一个字典,其中键是类别索引,值是类别名称(如 class0 、 class1 等)。if len(self.names) == 999 and (self.task == "segment" or len(y) == 2): # segments and names not definednc = y[0].shape[1] - y[1].shape[1] - 4 # y = (1, 32, 160, 160), (1, 116, 8400)self.names = {i: f"class{i}" for i in range(nc)}# 如果输出 y 是一个列表或元组:# 如果 y 的长度为 1,则将第一个元素从 NumPy 数组转换为 PyTorch 张量并返回。# 如果 y 的长度大于 1,则将每个元素从 NumPy 数组转换为 PyTorch 张量,并返回一个张量列表。return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]# 如果输出 y 不是一个列表或元组,则直接将 y 从 NumPy 数组转换为 PyTorch 张量并返回。else:return self.from_numpy(y)# AutoBackend 类的 forward 方法是一个多功能的推理接口,能够根据不同的后端(如 PyTorch、TorchScript、ONNX、TensorRT 等)自动选择合适的推理引擎,并处理输入图像张量的预处理和输出结果的后处理。该方法首先对输入张量进行必要的格式转换(如 FP16 转换和 NHWC 格式调整),然后根据模型的类型调用相应的推理方法。对于支持动态形状的模型,它还能够根据输入张量的形状动态调整推理配置。在推理完成后, forward 方法会将输出结果从 NumPy 数组转换为 PyTorch 张量,并在必要时动态生成类别名称,最终返回模型的原始输出张量或张量列表。这一过程确保了模型能够在多种设备和格式下高效运行,同时保持了代码的简洁性和通用性。# 这段代码定义了 AutoBackend 类的 from_numpy 方法,其作用是将 NumPy 数组转换为 PyTorch 张量,并将其移动到指定的设备上(例如 CPU 或 GPU)。# 定义了 from_numpy 方法,接收一个 NumPy 数组 1.x 作为输入。 返回值是一个 PyTorch 张量。def from_numpy(self, x: np.ndarray) -> torch.Tensor:# 将 NumPy 数组转换为张量。"""Convert a numpy array to a tensor.Args:x (np.ndarray): The array to be converted.Returns:(torch.Tensor): The converted tensor"""# 检查输入 x 是否是一个 NumPy 数组( isinstance(x, np.ndarray) )。# 如果是 NumPy 数组,则使用 torch.tensor(x) 将其转换为 PyTorch 张量,并使用 .to(self.device) 将张量移动到指定的设备( self.device )。# 如果输入 x 不是 NumPy 数组,则直接返回 x ,不做任何转换。return torch.tensor(x).to(self.device) if isinstance(x, np.ndarray) else x# from_numpy 方法的主要作用是: 检查输入是否为 NumPy 数组。 如果是 NumPy 数组,则将其转换为 PyTorch 张量,并移动到指定的设备。 如果输入不是 NumPy 数组,则直接返回原输入。这种方法确保了在处理模型输出时,可以灵活地将 NumPy 数组转换为 PyTorch 张量,同时保持了代码的简洁性和通用性。# 这段代码定义了 AutoBackend 类的 warmup 方法,用于对模型进行预热(warmup)。预热的目的是通过运行一次或多次前向传播来初始化模型,确保模型在后续的推理中能够高效运行。# 定义了 warmup 方法,接收一个可选的参数 1.imgsz ,表示输入图像的形状,默认为 (1, 3, 640, 640) ,即批量大小为 1、通道数为 3、高度和宽度均为 640 的图像。 返回值为 None ,表示该方法不返回任何值。def warmup(self, imgsz: Tuple[int, int, int, int] = (1, 3, 640, 640)) -> None:# 通过使用虚拟输入运行一次前向传播来预热模型。"""Warm up the model by running one forward pass with a dummy input.Args:imgsz (tuple): The shape of the dummy input tensor in the format (batch_size, channels, height, width)"""# 导入 torchvision 模块。这里使用了 # noqa 注释,以避免在代码检查时报告未使用的导入。这种做法通常用于确保某些导入不会影响代码的执行时间,特别是在性能敏感的场景中。import torchvision # noqa (import here so torchvision import time not recorded in postprocess time)# 定义了一个元组 warmup_types ,包含所有支持预热的模型类型标志(如 PyTorch、TorchScript、ONNX、TensorRT 等)。warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb, self.triton, self.nn_module# 检查是否有任何支持预热的模型类型,并且设备类型不是 CPU 或者是 Triton 模型。如果满足条件,则进行预热。if any(warmup_types) and (self.device.type != "cpu" or self.triton):# 创建一个空的张量 im ,其形状为 imgsz ,数据类型为半精度浮点数(如果启用了 FP16 推理)或全精度浮点数。 将张量移动到指定的设备( self.device )。im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device) # input# 如果模型是 TorchScript 模型( self.jit 为 True ),则执行两次前向传播;否则执行一次。# 调用 self.forward(im) 方法进行预热,确保模型在后续的推理中能够高效运行。for _ in range(2 if self.jit else 1):self.forward(im) # warmup# warmup 方法的主要作用是: 检查模型是否支持预热,并且设备类型不是 CPU 或者是 Triton 模型。 创建一个空的输入张量,其形状和数据类型与模型的输入要求一致。 通过调用 forward 方法执行一次或多次前向传播,初始化模型并确保其在后续推理中的高效运行。通过预热,可以减少模型在首次推理时的延迟,提高整体的推理性能。# 这段代码定义了 AutoBackend 类的静态方法 _model_type ,用于根据模型文件的路径或名称来确定模型的类型。该方法返回一个布尔值列表,每个布尔值对应一种支持的模型格式。@staticmethod# 定义了一个静态方法 _model_type ,接收一个字符串参数 1.p ,表示模型文件的路径或名称,默认值为 "path/to/model.pt" 。# 返回值是一个布尔值列表,每个布尔值表示模型是否属于某种特定格式。def _model_type(p: str = "path/to/model.pt") -> List[bool]:# 获取模型文件的路径并返回模型类型。"""Take a path to a model file and return the model type.Args:p (str): Path to the model file.Returns:(List[bool]): List of booleans indicating the model type.Examples:>>> model = AutoBackend(weights="path/to/model.onnx")>>> model_type = model._model_type() # returns "onnx""""# 从 ultralytics.engine.exporter 模块中导入 export_formats 函数,该函数返回一个包含支持的导出格式的字典。from ultralytics.engine.exporter import export_formats# 从 export_formats 返回的字典中提取支持的导出格式后缀( Suffix ),并将其存储在变量 sf 中。sf = export_formats()["Suffix"] # export suffixes# 检查 p 是否是一个有效的路径或 URL。如果 p 不是 URL 且不是字符串,则调用 check_suffix 函数进行检查。if not is_url(p) and not isinstance(p, str):check_suffix(p, sf) # checks# 使用 Path 对象从路径 p 中提取文件名,并将其存储在变量 name 中。name = Path(p).name# 创建一个布尔值列表 types ,通过检查文件名 name 是否包含每个支持的格式后缀 s 来确定模型的类型。types = [s in name for s in sf]# 特别处理 CoreML 格式,如果文件名以 .mlmodel 结尾,则将 types[5] 设置为 True ,以支持旧版本的 CoreML 格式。types[5] |= name.endswith(".mlmodel") # retain support for older Apple CoreML *.mlmodel formats# 如果模型是 TensorFlow Lite 格式( types[8] ),但不是 TensorFlow Edge TPU 格式( types[9] ),则保持 types[8] 为 True ,否则将其设置为 False 。types[8] &= not types[9] # tflite &= not edgetpu# 如果 types 中有任何布尔值为 True ,则认为模型是支持的格式之一,将 triton 设置为 False 。if any(types):triton = False# 如果 types 中所有布尔值都为 False ,则检查 p 是否是一个有效的 URL,并且其协议是 http 或 grpc 。如果是,则认为模型是 Triton 模型,将 triton 设置为 True 。else:from urllib.parse import urlspliturl = urlsplit(p)triton = bool(url.netloc) and bool(url.path) and url.scheme in {"http", "grpc"}# 返回布尔值列表 types ,并在末尾追加 triton 的值,形成最终的模型类型列表。return types + [triton]# _model_type 方法的主要作用是: 根据模型文件的路径或名称,检测模型的类型。 返回一个布尔值列表,每个布尔值表示模型是否属于某种特定格式。 特别处理 CoreML 和 TensorFlow Lite 格式,并支持 Triton 模型的检测。通过这种方法, AutoBackend 类能够自动识别模型的格式,并根据格式选择合适的加载和推理方法。
# AutoBackend 类是一个多功能的 PyTorch 模块,旨在为多种不同格式的深度学习模型提供统一的加载、推理和管理接口。它支持包括 PyTorch、TorchScript、ONNX、TensorRT、CoreML、TensorFlow Lite、Edge TPU 等在内的多种模型格式,并能够根据模型类型自动选择合适的后端进行高效推理。通过动态检测模型格式、加载模型权重、处理输入数据以及执行推理, AutoBackend 类确保了模型能够在多种设备(如 CPU、GPU)上无缝运行,同时提供了灵活的配置选项以适应不同的推理需求。此外,它还提供了预热(warmup)功能以优化首次推理的性能,以及从 NumPy 数组到 PyTorch 张量的转换功能,进一步增强了其通用性和易用性。