【python】基于pycharm的海康相机SDK二次开发

海康威视二次开发相机管理

在这里插入图片描述

这段代码基于python开发的,用了opencv的一些库函数。实现了一个完整的海康机器人相机管理工具,支持多相机连接、参数配置、图像采集和实时显示功能。目前USB相机测试无误,除了丢一些包。

1. 主要类结构

HKCameraManager

这是整个系统的核心类,负责管理所有相机的生命周期和操作。
全局可调参数

# 相机参数配置
EXPOSURE_MODE = 0  # 曝光模式:0:关闭;1:一次;2:自动曝光
EXPOSURE_TIME = 40000 # 曝光时间
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻转
ReverseY_enable = True # 垂直翻转
#图像显示大小
scale_width = 0.2  # 宽度缩放因子
scale_height = 0.2  # 高度缩放因子
PacketSizeLog = True  # 启用丢包信息检测
主要属性:
  • cameras: 字典,存储所有已连接相机的信息和句柄
  • _last_error: 记录最后一次错误信息
  • _running: 字典,记录每个相机的运行状态
  • _lock: 线程锁,保证线程安全
  • _display_threads: 字典,存储每个相机的显示线程
  • _fps: 字典,记录每个相机的帧率
 def __init__(self):"""初始化相机管理器"""self.cameras: Dict[int, Dict] = {}  # 存储所有相机实例和信息self._last_error: str = ""self._running = {}  # 每个相机的运行状态self._lock = threading.Lock()self._display_threads = {}  # 每个相机的显示线程self._fps = {}  # 每个相机的FPS

2. 核心功能流程

2.1 设备枚举

  • 通过enumerate_devices()方法枚举所有可用设备
  • 支持GigE和USB两种接口类型的相机
  • 返回设备列表,包含型号、序列号、IP地址等信息
    def enumerate_devices(self) -> Optional[List[dict]]:"""枚举所有可用设备"""try:# 设置要枚举的设备类型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化设备列表结构体device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 创建临时相机实例用于枚举temp_cam = MvCamera()# 枚举设备ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚举设备", ret)return None# 检查找到的设备数量if device_list.nDeviceNum == 0:print("未检测到任何相机设备")return []devices = []for i in range(device_list.nDeviceNum):# 获取设备信息指针device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根据传输层类型处理设备信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE设备device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB设备# 修正USB设备信息获取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函数获取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚举设备时发生异常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc()  # 打印完整的错误堆栈return None

2.2 相机连接

  • connect_camera()方法连接指定索引的相机
  • 步骤:
    1. 检查相机是否已连接
    2. 枚举设备并选择指定索引的设备
    3. 创建相机句柄
    4. 打开设备
    5. 配置相机参数(曝光、增益等)
    6. 开始采集图像
    7. 存储相机信息到字典中
    def connect_camera(self, device_index: int) -> bool:"""连接指定索引的相机设备"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相机 {device_index} 已连接")return True# 枚举设备tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 实例化相机cam = MvCamera()# 枚举设备ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚举设备", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何设备"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"设备索引超出范围,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 选择指定设备stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 创建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("创建句柄", ret)return False# 获取设备信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打开相机ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特别处理USB相机连接问题if stDeviceList.nTLayerType == MV_USB_DEVICE:# 尝试设置USB传输大小(海康USB相机常见问题)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打开设备", ret)return False# 配置相机参数if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 开始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("开始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存储相机信息 - 确保所有必要字段都正确设置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True,  # 确保连接状态正确设置为True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS计数器self._fps[device_index] = 0print(f"相机 {device_index} 连接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"连接相机时发生异常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return False

2.3 相机参数配置

  • _configure_camera()私有方法处理相机参数配置
  • 可配置项:
    • 触发模式(连续采集)
    • 曝光模式(手动/自动)
    • 增益设置
    • 图像翻转(水平/垂直)
    def _configure_camera(self, cam: MvCamera) -> bool:"""配置相机参数"""try:# 设置触发方式为连续采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("设置触发模式", ret)return False# 设置曝光模式match EXPOSURE_MODE:case 0:  # 手动设置参数ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 关闭自动曝光设置失败,将采用自动曝光")# 设置曝光时间exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1:  # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光设置失败,将继续使用手动曝光")case 2:  # 自动曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自动曝光设置失败,将继续使用手动曝光")# 设置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手动增益设置失败,将采用自动增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 设置水平翻转flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 设置垂直翻转flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相机时发生异常: {str(e)}"print(self._last_error)return False

2.4 图像获取

  • get_image()方法获取指定相机的图像
  • 步骤:
    1. 获取图像缓冲区
    2. 复制图像数据
    3. 根据像素类型处理图像数据
    4. 转换为灰度图像
    5. 释放图像缓冲区
    6. 更新帧统计信息
    def get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""获取指定相机的图像并返回原始图像和灰度图像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相机 {device_index} 未连接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化帧输出结构stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 获取图像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相机 {device_index} 获取图像", ret)return None# 获取图像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印调试信息# print(f"相机 {device_index} 图像信息: "#       f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "#       f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 复制图像数据data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 转换为numpy数组temp = np.frombuffer(data_buf, dtype=np.uint8)# 获取图像参数width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根据像素类型处理图像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相机 {device_index} 图像处理失败 - 数据大小: {len(temp)}, "f"预期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 转换为灰度图像if len(img.shape) == 2:  # 已经是灰度图像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 释放图像缓存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新帧统计信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相机 {device_index} 获取图像时发生异常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return None

2.5 图像显示

  • start_display()启动相机实时显示
  • 为每个相机创建独立的显示线程
  • 显示线程中:
    • 循环获取图像
    • 计算并显示FPS
    • 显示图像到窗口
    • 处理用户按键(ESC退出)
    def start_display(self,device_index: int) -> bool:"""启动所有已连接相机的实时显示"""with self._lock:  # 添加线程锁# 检查相机是否已连接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接,无法启动显示")return Falseif device_index in self._running and self._running[device_index]:print(f"相机 {device_index} 显示已启动")return True# 设置运行标志self._running[device_index] = True# 创建并启动显示线程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相机 {device_index} 显示线程已启动")return True

2.6 断开连接

  • disconnect_camera()断开单个相机连接
  • disconnect_all()断开所有相机连接
  • 释放所有资源
    def disconnect_camera(self, device_index: int) -> bool:"""断开指定相机的连接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相机 {device_index} 停止取流", ret)success = False# 关闭设备ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相机 {device_index} 关闭设备", ret)success = False# 销毁句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相机 {device_index} 销毁句柄", ret)success = Falseif success:print(f"相机 {device_index} 已成功断开连接")self.cameras[device_index]['connected'] = False# 从字典中移除相机del self.cameras[device_index]# 停止显示线程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"断开相机 {device_index} 连接时发生异常: {str(e)}"print(self._last_error)return Falsedef disconnect_all(self) -> None:"""断开所有相机的连接"""self.stop_display()  # 先停止所有显示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)

3. 目前程序的一些功能和特点如下

  1. 多线程支持

    • 每个相机的显示使用独立线程
    • 使用线程锁保证线程安全
  2. 错误处理

    • 详细的错误日志记录
    • 异常捕获和处理
      在这里插入图片描述
  3. 相机参数配置

    • 支持多种曝光模式
    • 可配置增益、翻转等参数
  4. 图像处理

    • 支持多种像素格式(Mono8, RGB8, BGR8等)
    • 自动处理数据大小不匹配的情况
    • 图像缩放显示
  5. 性能监控

    • 实时计算和显示FPS
    • 帧计数统计

4. 完整代码如下

from HK_Camera.MvCameraControl_class import *
from ctypes import *
from typing import Optional, Tuple, List, Dict
import time
import cv2
import numpy as np
import threading# 相机参数配置
EXPOSURE_MODE = 0  # 曝光模式:0:关闭;1:一次;2:自动曝光
EXPOSURE_TIME = 40000 # 曝光时间
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻转
ReverseY_enable = True # 垂直翻转
#图像显示大小
scale_width = 0.2  # 宽度缩放因子
scale_height = 0.2  # 高度缩放因子
PacketSizeLog = True  # 启用丢包信息检测class HKCameraManager:def __init__(self):"""初始化相机管理器"""self.cameras: Dict[int, Dict] = {}  # 存储所有相机实例和信息self._last_error: str = ""self._running = {}  # 每个相机的运行状态self._lock = threading.Lock()self._display_threads = {}  # 每个相机的显示线程self._fps = {}  # 每个相机的FPS@propertydef last_error(self) -> str:"""获取最后一次错误信息"""return self._last_errordef _log_error(self, operation: str, ret: int) -> None:"""记录错误日志"""self._last_error = f"{operation}失败,错误码: 0x{ret:x}"print(self._last_error)def enumerate_devices(self) -> Optional[List[dict]]:"""枚举所有可用设备"""try:# 设置要枚举的设备类型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化设备列表结构体device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 创建临时相机实例用于枚举temp_cam = MvCamera()# 枚举设备ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚举设备", ret)return None# 检查找到的设备数量if device_list.nDeviceNum == 0:print("未检测到任何相机设备")return []devices = []for i in range(device_list.nDeviceNum):# 获取设备信息指针device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根据传输层类型处理设备信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE设备device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB设备# 修正USB设备信息获取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函数获取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚举设备时发生异常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc()  # 打印完整的错误堆栈return Nonedef connect_camera(self, device_index: int) -> bool:"""连接指定索引的相机设备"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相机 {device_index} 已连接")return True# 枚举设备tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 实例化相机cam = MvCamera()# 枚举设备ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚举设备", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何设备"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"设备索引超出范围,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 选择指定设备stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 创建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("创建句柄", ret)return False# 获取设备信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打开相机ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特别处理USB相机连接问题if stDeviceList.nTLayerType == MV_USB_DEVICE:# 尝试设置USB传输大小(海康USB相机常见问题)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打开设备", ret)return False# 配置相机参数if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 开始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("开始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存储相机信息 - 确保所有必要字段都正确设置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True,  # 确保连接状态正确设置为True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS计数器self._fps[device_index] = 0print(f"相机 {device_index} 连接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"连接相机时发生异常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return Falsedef _configure_camera(self, cam: MvCamera) -> bool:"""配置相机参数"""try:# 设置触发方式为连续采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("设置触发模式", ret)return False# 设置曝光模式match EXPOSURE_MODE:case 0:  # 手动设置参数ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 关闭自动曝光设置失败,将采用自动曝光")# 设置曝光时间exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1:  # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光设置失败,将继续使用手动曝光")case 2:  # 自动曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自动曝光设置失败,将继续使用手动曝光")# 设置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手动增益设置失败,将采用自动增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 设置水平翻转flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 设置垂直翻转flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相机时发生异常: {str(e)}"print(self._last_error)return Falsedef get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""获取指定相机的图像并返回原始图像和灰度图像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相机 {device_index} 未连接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化帧输出结构stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 获取图像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相机 {device_index} 获取图像", ret)return None# 获取图像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印调试信息# print(f"相机 {device_index} 图像信息: "#       f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "#       f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 复制图像数据data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 转换为numpy数组temp = np.frombuffer(data_buf, dtype=np.uint8)# 获取图像参数width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根据像素类型处理图像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相机 {device_index} 图像处理失败 - 数据大小: {len(temp)}, "f"预期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 转换为灰度图像if len(img.shape) == 2:  # 已经是灰度图像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 释放图像缓存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新帧统计信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相机 {device_index} 获取图像时发生异常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return Nonedef _process_image_data(self, data: np.ndarray, width: int, height: int, pixel_type: int) -> Optional[np.ndarray]:"""根据像素类型处理原始图像数据"""try:if PacketSizeLog:# 首先检查数据大小是否匹配预期expected_size = width * heightif pixel_type in [PixelType_Gvsp_Mono8, PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed]:if pixel_type == PixelType_Gvsp_Mono8:expected_size = width * heightelse:expected_size = width * height * 3if len(data) != expected_size:print(f"警告: 数据大小不匹配 (预期: {expected_size}, 实际: {len(data)}), 尝试自动处理")# 尝试自动计算正确的高度if pixel_type == PixelType_Gvsp_Mono8:actual_height = len(data) // widthif actual_height * width == len(data):return data.reshape((actual_height, width))else:actual_height = len(data) // (width * 3)if actual_height * width * 3 == len(data):img = data.reshape((actual_height, width, 3))if pixel_type == PixelType_Gvsp_RGB8_Packed:return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)return imgreturn None# 正常处理流程if pixel_type == PixelType_Gvsp_Mono8:return data.reshape((height, width))elif pixel_type == PixelType_Gvsp_RGB8_Packed:img = data.reshape((height, width, 3))return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)elif pixel_type == PixelType_Gvsp_BGR8_Packed:return data.reshape((height, width, 3))elif pixel_type in [PixelType_Gvsp_Mono10, PixelType_Gvsp_Mono12]:# 对于10位或12位图像,需要进行位转换img = data.view(np.uint16)img = (img >> (pixel_type - PixelType_Gvsp_Mono8)).astype(np.uint8)return img.reshape((height, width))else:self._last_error = f"不支持的像素格式: {pixel_type}"print(self._last_error)return Noneexcept Exception as e:self._last_error = f"图像处理错误: {str(e)}"if PacketSizeLog:print(self._last_error)return Nonedef disconnect_camera(self, device_index: int) -> bool:"""断开指定相机的连接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相机 {device_index} 停止取流", ret)success = False# 关闭设备ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相机 {device_index} 关闭设备", ret)success = False# 销毁句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相机 {device_index} 销毁句柄", ret)success = Falseif success:print(f"相机 {device_index} 已成功断开连接")self.cameras[device_index]['connected'] = False# 从字典中移除相机del self.cameras[device_index]# 停止显示线程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"断开相机 {device_index} 连接时发生异常: {str(e)}"print(self._last_error)return False
###########################图像视频流显示部分################################################def start_display(self,device_index: int) -> bool:"""启动所有已连接相机的实时显示"""with self._lock:  # 添加线程锁# 检查相机是否已连接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接,无法启动显示")return Falseif device_index in self._running and self._running[device_index]:print(f"相机 {device_index} 显示已启动")return True# 设置运行标志self._running[device_index] = True# 创建并启动显示线程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相机 {device_index} 显示线程已启动")return Truedef stop_display(self, device_index: int = None) -> None:"""停止指定相机的显示或停止所有相机显示"""if device_index is None:# 停止所有显示for idx in list(self._running.keys()):self._running[idx] = Falsefor idx, thread in self._display_threads.items():if thread.is_alive():thread.join()self._display_threads.clear()cv2.destroyAllWindows()else:# 停止指定相机显示if device_index in self._running:self._running[device_index] = Falseif device_index in self._display_threads:if self._display_threads[device_index].is_alive():self._display_threads[device_index].join()del self._display_threads[device_index]cv2.destroyWindow(f"Camera {device_index}")def _display_thread(self, device_index: int) -> None:"""单个相机的显示线程"""frame_count = 0last_time = time.time()window_name = f"Camera {device_index}"def _window_exists(window_name):"""检查OpenCV窗口是否存在"""try:return cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) >= 0except:return Falsetry:while self._running.get(device_index, False):try:# 获取图像result = self.get_image(device_index)if result is None:if PacketSizeLog:print(f"相机 {device_index} 获取图像超时")time.sleep(0.1)continueimg, _ = result# 计算FPSframe_count += 1current_time = time.time()if current_time - last_time >= 1.0:self._fps[device_index] = frame_count / (current_time - last_time)frame_count = 0last_time = current_time# 在图像上显示信息info = f"Cam {device_index} | {self.cameras[device_index]['model']} | FPS: {self._fps[device_index]:.1f}"cv2.putText(img, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)resized_image_by_scale = cv2.resize(img, None, fx=scale_width, fy=scale_height)# 显示图像cv2.imshow(window_name, resized_image_by_scale)# 检查按键key = cv2.waitKey(1) & 0xFFif key == 27:  # ESC键退出self._running[device_index] = Falsebreakexcept Exception as e:print(f"相机 {device_index} 显示线程异常: {str(e)}")time.sleep(0.1)finally:# 线程结束时清理if _window_exists(window_name):cv2.destroyWindow(window_name)print(f"相机 {device_index} 显示线程已停止")def disconnect_all(self) -> None:"""断开所有相机的连接"""self.stop_display()  # 先停止所有显示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)def __del__(self):"""析构函数,确保资源释放"""self.disconnect_all()def main():# 创建相机管理器cam_manager = HKCameraManager()# 枚举设备devices = cam_manager.enumerate_devices()if not devices:print("未找到任何相机设备")returnprint("找到以下相机设备:")for i, dev in enumerate(devices):# 根据设备类型显示不同信息if dev['type'] == 'GigE':print(f"{i}: {dev['model']} (SN: {dev['serial']}, IP: {dev['ip']})")else:  # USB设备print(f"{i}: {dev['model']} (SN: {dev['serial']}, Type: USB)")# 先连接所有相机for i in range(len(devices)):if not cam_manager.connect_camera(i):print(f"无法连接相机 {i}")continue  # 即使一个相机连接失败,也继续尝试其他相机# 确认连接状态后再启动显示for i in range(len(devices)):if i in cam_manager.cameras and cam_manager.cameras[i]['connected']:cam_manager.start_display(i)try:# 主线程等待while any(cam_manager._running.values()):time.sleep(0.1)except KeyboardInterrupt:print("用户中断...")finally:# 清理资源cam_manager.disconnect_all()print("程序退出")if __name__ == "__main__":main()

5. 写在最后

目前程序还有一些未增加的功能,后续会增加补充

  1. 相机参数动态调整功能
  2. 图像保存功能
  3. 支持更多像素格式
  4. 网络相机重连机制
  5. 日志系统替代print输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/87160.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTTP 协议各个主要版本的功能特点、核心原理、使用场景总结

我们来系统总结一下 HTTP 协议各个主要版本的功能特点、核心原理(用图示辅助说明)以及典型使用场景。 核心演进目标: 提升性能、安全性、效率和灵活性。 1. HTTP/0.9 (1991) - 远古雏形 功能特点: 极其简单: 只支持 GET 方法。无…

Linux编程:3、进程通信-信号

一、进程通信概述 (一)进程通信的目的 在企业开发中,一个项目常常需要多个进程共同协作,而这些进程之间需要进行通信(交换信息)以便协作。本章内容主要围绕信号讲解,其它进程通信的常用方式请…

深度解析Vue.js组件开发与实战案例

一、Vue.js组件化思想 Vue.js的核心思想之一就是组件化开发。组件系统是Vue的一个重要概念,它允许我们使用小型、独立和通常可复用的组件构建大型应用。在Vue中,组件本质上是一个拥有预定义选项的Vue实例。 1.1 为什么需要组件化 代码复用:避免重复造轮子,提高开发效率可…

TensorFlow 2.0 与 Python 3.11 兼容性

TensorFlow 2.0 与 Python 3.11 兼容性 TensorFlow 2.0 官方版本对 Python 3.11 的支持有限,可能出现兼容性问题。建议使用 TensorFlow 2.10 或更高版本,这些版本已适配 Python 3.11。若需强制运行,可通过以下方式解决依赖冲突: …

MyBatisPlus 全面学习路径

MyBatisPlus 全面学习路径 学习目录 第一部分:MyBatisPlus 基础 MyBatisPlus 简介与核心特性快速入门与环境搭建核心功能:BaseMapper 与 CRUD 接口条件构造器(Wrapper)详解ActiveRecord 模式主键策略与通用枚举 第二部分&…

React16,17,18,19更新对比

文章目录 前言一、16更新二、17更新三、18更新四、19更新总结 前言 总结react 16,17,18,19所更新的内容,并且部分会涉及到原理讲解。 一、16更新 1、在16.8之前更新,还是基于class组件的升级和维护更新。并且更新了一…

【git】有两个远程仓库时的推送、覆盖、合并问题

当你执行 git pull origin develop(或默认的 git pull)时,Git 会把远端 origin/develop 的提交合并到你本地的 develop,如果远端已经丢掉(或从未包含)你之前在私库(priv)里提交过的改动,那这些改动就会被「覆盖」,看起来就像「本地修改没了」。 要解决这个问题,分…

Spring Boot 集成国内AI,包含文心一言、通义千问和讯飞星火平台实战教程

Spring Boot 集成国内AI,包含文心一言、通义千问和讯飞星火平台实战教程 一、项目结构二、添加Maven依赖三、配置API密钥 (application.yml)四、配置类1. AI配置类 (AiProperties.java)2. 启用配置类 (AiConfig.java) 五、服务层实现1. 文心一言服务 (WenxinService…

Elastic Search 学习笔记

1. Elasticsearch 是什么?有哪些应用场景? Elasticsearch 整体原理流程? Elasticsearch 是一个为海量数据提供近实时搜索和分析能力的分布式搜索引擎,广泛应用于全文检索、日志分析和大数据处理场景中。 Elasticsearch 整体原理…

动态规划之斐波那契数(一)

解法一&#xff1a;递归 class Solution { public:int fib(int n) {if(n<2) return n;return fib(n-1)fib(n-2);} }; 解法二&#xff1a;dp class Solution { public:int fib(int N) {if (N < 1) return N;int dp[2];dp[0] 0;dp[1] 1;for (int i 2; i < N; i) {…

如何设置爬虫的访问频率?

设置爬虫的访问频率是爬虫开发中的一个重要环节&#xff0c;尤其是在爬取大型网站&#xff08;如1688&#xff09;时&#xff0c;合理的访问频率可以避免对目标网站造成过大负担&#xff0c;同时也能降低被封禁的风险。以下是一些常见的方法和建议&#xff0c;帮助你合理设置爬…

前端面试六之axios

一、axios简介 Axios 是一个基于 Promise 的 HTTP 客户端&#xff0c;用于浏览器和 Node.js 环境。在浏览器端&#xff0c;Axios 的底层实现是基于原生的 XMLHttpRequest&#xff08;XHR&#xff09;。它对 XHR 进行了封装&#xff0c;增加了 Promise 支持、自动转换 JSON 数据…

模板方法模式Template Method Pattern

模式定义 定义一个操作中算法的骨架&#xff0c;而将一些步骤延迟到子类中&#xff0c;模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤 类行为型模式 模式结构 AbstractClass&#xff1a;抽象类ConcreteClass&#xff1a;具体子类 只有类之间的继…

【行云流水AI笔记】游戏里面的强化学习使用场景

强化学习在游戏中的应用已从早期的棋类博弈扩展到现代复杂游戏的全流程优化&#xff0c;以下是结合最新技术进展的核心应用场景及典型案例&#xff1a; 一、竞技游戏的策略突破 1. 策略博弈类游戏 代表案例&#xff1a;AlphaGo/AlphaZero&#xff08;围棋&#xff09;、Alph…

使用Python和PyTorch框架,基于RetinaNet模型进行目标检测,包含数据准备、模型训练、评估指标计算和可视化

下面是一个完整的实现方案,使用Python和PyTorch框架,基于RetinaNet模型进行目标检测,包含数据准备、模型训练、评估指标计算和可视化。 import os import numpy as np import matplotlib.pyplot as plt import torch import torchvision from torchvision.models.detection…

springboot服务如何获取pod当前ip方案及示例

在 Kubernetes 集群中&#xff0c;Spring Boot 服务获取 Pod 当前 IP 的方案主要有两种&#xff1a;通过环境变量注入 或 通过 Java 代码动态获取网络接口 IP。以下是两种方案的详细说明及示例&#xff1a; 方案一&#xff1a;通过 Kubernetes Downward API 注入环境变量 原理…

1.MySQL三层结构

1.所谓安装的Mysql数据库&#xff0c;就是在电脑上安装了一个数据库管理系统&#xff08;【DBMS】database manage system&#xff09;&#xff0c;这个管理程序可以管理多个数据库。 2.一个数据库中可以创建多个表&#xff0c;以保存数据&#xff08;信息&#xff09;。【数据…

[深度学习]目标检测基础

目录 一、实验目的 二、实验环境 三、实验内容 3.1 LM_BoundBox 3.1.1 实验代码 3.1.2 实验结果 3.2 LM_Anchor 3.2.1 实验代码 3.2.2 实验结果 3.3 LM_Multiscale-object-detection 3.3.1 实验代码 3.3.2 实验结果 四、实验小结 一、实验目的 了解python语…

ALOHA机器人平台:低成本、高精度双臂操作及其进展深度解析

原创1从感知决策到具身智能的技术跃迁与挑战(基座模型与VLA模型)2ALOHA机器人平台&#xff1a;低成本、高精度双臂操作及其进展深度解析3(上)通用智能体与机器人Transformer&#xff1a;Gato和RT-1技术解析及与LLM Transformer的异同4(下)通用智能体与机器人Transformer&#x…

C++: 类 Class 的基础用法

&#x1f3f7;️ 标签&#xff1a;C、面向对象、类、构造函数、成员函数、封装、继承、多态 &#x1f4c5; 更新时间&#xff1a;2025年6月15日 &#x1f4ac; 欢迎在评论区留言交流你的理解与疑问&#xff01; 文章目录 前言一、什么是类&#xff1f;二、类的定义1.基本语法2.…