第一节:
首先是环境的搭建。
环境的搭建。
root@topeet:/home/topeet/source_code/wang_onvif_python# python -m venv venv
Command 'python' not found, did you mean:command 'python3' from deb python3command 'python' from deb python-is-python3
root@topeet:/home/topeet/source_code/wang_onvif_python# python^Cm venv venv
root@topeet:/home/topeet/source_code/wang_onvif_python# ^C
root@topeet:/home/topeet/source_code/wang_onvif_python# ^C
root@topeet:/home/topeet/source_code/wang_onvif_python# python3
python3 python3.10 python3.10-config python3-config
root@topeet:/home/topeet/source_code/wang_onvif_python# python3
python3 python3.10 python3.10-config python3-config
root@topeet:/home/topeet/source_code/wang_onvif_python# python3 -m venv myenv
root@topeet:/home/topeet/source_code/wang_onvif_python# source myenv/bin/activate
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python# pip3 install WSDiscovery
Collecting WSDiscoveryDownloading WSDiscovery-2.1.2.tar.gz (23 kB)Preparing metadata (setup.py) ... done
Collecting clickDownloading click-8.2.1-py3-none-any.whl (102 kB)━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 102.2/102.2 KB 618.4 kB/s eta 0:00:00
Collecting ifaddrDownloading ifaddr-0.2.0-py3-none-any.whl (12 kB)
Using legacy 'setup.py install' for WSDiscovery, since package 'wheel' is not installed.
Installing collected packages: ifaddr, click, WSDiscoveryRunning setup.py install for WSDiscovery ... done
Successfully installed WSDiscovery-2.1.2 click-8.2.1 ifaddr-0.2.0
这里没有使用 requirement.txt 。
然后是一些 基本的介绍。
主要是 profile S
这是 接口文档。
这个实际上是 HTTP 协议。
这是基本的功能。
这两个是 必选的。
第二节:
然后是发现设备。
然后是基本的使用。
deepseek :
from wsdiscovery import WSDiscovery
from wsdiscovery import QNamedef discover_onvif_cameras():wsd = WSDiscovery()try:wsd.start()# ONVIF 设备类型onvif_type = QName("http://www.onvif.org/ver10/network/wsdl", "NetworkVideoTransmitter")# 搜索 ONVIF 设备services = wsd.searchServices(types=[onvif_type])print(f"发现 {len(services)} 个 ONVIF 摄像头:")for service in services:print(f"\n摄像头地址: {service.getXAddrs()[0]}")print(f"设备类型: {service.getTypes()}")finally:wsd.stop()if __name__ == "__main__":discover_onvif_cameras()
基本上 使用的话, 这个 库就够用了。
我自己 测试使用的代码
43 from wsdiscovery import WSDiscovery2 def test_find():1 # http://www.onvif.org/onvif/ver10/network/wsdl/remotediscovery.wsdl5 # from wsdiscovery.threaded import NetworkingThread, MULTICAST_PORT12 clients = []34 wsd = WSDiscovery()5 wsd.start()6 services = wsd.searchServices()78 for service in services:9 get_ip = str(service.getXAddrs())10 get_types = str(service.getTypes())11 clients.append(get_ip)121314 wsd.stop()15 clients.sort()1617 print("------------------start------------------------")18 for client in clients:19 print(client)2021 if __name__ == '__main__':22 print("main.py")23 print("ONVIFClientManager")2425 test_find()26
执行 编译+执行 命令。
这是返回的结果。
我的 局域网内 是有一个大华 摄像头的。
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# python3 client_find.py
main.py
ONVIFClientManager
------------------start------------------------
['http://192.168.1.114:5357/e70e9f7e-9aa8-42c7-ab3d-10537725dc6d/']
['http://192.168.1.46/onvif/device_service']
['http://TOPEET_FILE:5357/c44da3fc-8dad-480c-9463-c3e887093657']
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code#
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code#
然后是一些基本的了解。
多播可以在 广域网上传播,但是广播不行。
D类 地址 专门用于 广播。
这里为什么是 239.255.255.250 呢? 这个是固定的吗?
第三节:
然后是 截屏,分辨率的配置。
一些基本的了解:
它的流程是这样的。
客户端探测--->WSDiscovery ---->返回devcemgt 地址------>devicemgt 地址+ getservice----->得到其他模块的地址。
这两个函数是一样的。
然后是我自己的代码。
运行的有个报错。
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# pip3 uninstall suds-passworddigest
WARNING: Skipping suds-passworddigest as it is not installed.
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# pip3 install --upgrade onvif_zeep
Collecting onvif_zeepDownloading onvif_zeep-0.2.12.tar.gz (163 kB)━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 163.1/163.1 KB 685.2 kB/s eta 0:00:00Preparing metadata (setup.py) ... done
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# pip3 install pillow
Collecting pillowDownloading pillow-11.3.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (6.6 MB)━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.6/6.6 MB 4.6 MB/s eta 0:00:00
Installing collected packages: pillow
Successfully installed pillow-11.3.0
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code#
验证一下截图的代码:
13 from mypackage import client12 import os1110 def test_client():98 #密码+账户 任意7 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')65 # rtsp://admin:Admin123@192.168.1.145/live/0/MAIN4 # rtsp://admin:Admin123@192.168.1.145/live/0/SUB321 if not wang_client.connect():14 exit(0)12 root_dir = os.path.dirname(os.path.abspath(__file__))3 wang_client.Snapshot(file_dir=os.path.join(root_dir,"data"))45 # streamUri = wang_client.GetStreamUri()6 # print(streamUri)7 # profiles = client.GetProfiles()8 # osds = client.GetOSDs()9 # info = client.GetDeviceInformation()10 # videoSourceConfig = client.GetVideoSourceConfigurations()11 #12 # encoderConfig1 = client.GetVideoEncoderConfigurations()13 # client.SetVideoEncoderConfiguration()14 # encoderConfig2 = client.GetVideoEncoderConfigurations()15 #16 # test_client(client)1718 print("end")192021 if __name__ == '__main__':22 print("main.py")23 print("ONVIFClientManager")2425 test_client()26
前提是 已经把 client 这个 包 移植好。
import osfrom onvif import ONVIFCamera,ONVIFError
import zeep
import time
from datetime import datetime
import requests
from requests.auth import HTTPDigestAuth
from PIL import Imagedef zeep_pythonvalue(self, xmlvalue):return xmlvalueclass Client(object):def __init__(self, ip: str, username: str, password: str):self.ip = ipself.username = usernameself.password = passwordzeep.xsd.simple.AnySimpleType.pythonvalue = zeep_pythonvalue# zeep.xsd.simple.AnySimpleType.pythonvalue = lambda x:xdef connect(self):"""连接相机:return:"""try:self.camera = ONVIFCamera(self.ip, 80, self.username, self.password)self.media = self.camera.create_media_service()profiles = self.GetProfiles()self.media_profile = profiles[0] # 获取配置信息print("连接成功")return Trueexcept Exception as e:print("连接失败")print(e)return Falsedef Snapshot(self,file_dir='data'):"""截图:return:"""if not os.path.exists(file_dir):os.makedirs(file_dir)file_path = os.path.join(file_dir,str(datetime.now().strftime("%Y%m%d_%H_%M_%S"))+".jpg")res = self.media.GetSnapshotUri({'ProfileToken': self.media_profile.token})# response = requests.get(res.Uri, auth=HTTPDigestAuth(self.username, self.password))response = requests.get(res.Uri)with open(file_path, 'wb') as f: # 保存截图f.write(response.content)print("保存截图成功:%s"%file_path)def Snapshot_resize(self, file_dir='data', size=None):"""截图并调整尺寸:param picname: 保存截图的文件名:param new_size: 调整后的尺寸,格式为(width, height)"""if not os.path.exists(file_dir):os.makedirs(file_dir)file_path = os.path.join(file_dir,str(datetime.now().strftime("%Y%m%d_%H_%M_%S"))+".jpg")res = self.media.GetSnapshotUri({'ProfileToken': self.media_profile.token})response = requests.get(res.Uri, auth=HTTPDigestAuth(self.username, self.password))with open(file_path, 'wb') as f:f.write(response.content)if size:with Image.open(file_path) as img:img = img.resize(size)img.save(file_path)print("保存截图成功:%s"%file_path)def GetStreamUri(self):obj = self.media.create_type('GetStreamUri')obj.StreamSetup = {'Stream': 'rtp-unicast', 'Transport': {'Protocol': 'RTSP'}}obj.ProfileToken = self.media_profile.tokenres = self.media.GetStreamUri(obj)url = res['Uri']# url示例 rtsp://192.168.1.176:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvifif url.startswith("rtsp://"):url_suffix = url[7:]url = "rtsp://%s:%s@%s"%(self.username,self.password,url_suffix)return urldef GetDeviceInformation(self):# https://www.onvif.org/onvif/ver10/device/wsdl/devicemgmt.wsdldevicemgmt = self.camera.devicemgmtcapabilities = devicemgmt.GetCapabilities()services = devicemgmt.GetServices(True)hostname1 = devicemgmt.GetHostname()devicemgmt.SetHostname("hh")hostname2 = devicemgmt.GetHostname()# 查看设备支持的范围scopes1 = devicemgmt.GetScopes()devicemgmt.SetScopes(["onvif://www.onvif.org/name/qj92122","onvif://www.onvif.org/location/city/hs22"])scopes2 = devicemgmt.GetScopes()info = devicemgmt.GetDeviceInformation()interfaces = devicemgmt.GetNetworkInterfaces()return infodef GetVideoSourceConfigurations(self):return self.media.GetVideoSourceConfigurations()def GetVideoEncoderConfigurations(self):return self.media.GetVideoEncoderConfigurations()def GetProfiles(self):return self.media.GetProfiles()def GetOSDs(self):return self.media.GetOSDs()def SetVideoEncoderConfiguration(self, ratio=(1920, 1080), Encoding='H264', bitrate=2044, fps=30, gop=50):videoSourceConfig = self.GetVideoSourceConfigurations()encoderConfig = self.GetVideoEncoderConfigurations()if ratio[0] > videoSourceConfig[0]['Bounds']['width']:raise ONVIFError(f'ratio Error: max width should less then {videoSourceConfig[0]["Bounds"]["width"]}')if ratio[1] > videoSourceConfig[0]['Bounds']['height']:raise ONVIFError(f'ratio Error: max height should less then {videoSourceConfig[0]["Bounds"]["height"]}')if Encoding.upper() not in ('H265', 'H264'):raise ONVIFError(f'encoding Error: enconding format should be "H265" or "H264"')encoderConfig[0]['Resolution']['Width'] = ratio[0]encoderConfig[0]['Resolution']['Height'] = ratio[1]encoderConfig[0]['Encoding'] = Encoding.upper()encoderConfig[0]['RateControl']['FrameRateLimit'] =fpsencoderConfig[0]['RateControl']['BitrateLimit'] = bitrate# encoderConfig[0]['GovLength'] = gopself.media.SetVideoEncoderConfiguration({'Configuration':encoderConfig[0],'ForcePersistence':True})
最后的结果也是 生成了 图片。
然后测试一下 关于 rtsp 的功能。
目前测试了 rtsp 的功能。
osd 的返回值。
deviceinfo 的返回值。
videosource
videoencoder
设置 videoencoder 的功能。
4 from mypackage import client3 import os21 def test_client():51 #密码+账户 任意2 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')34 # rtsp://admin:Admin123@192.168.1.145/live/0/MAIN5 # rtsp://admin:Admin123@192.168.1.145/live/0/SUB678 if not wang_client.connect():9 exit(0)10 #11 # root_dir = os.path.dirname(os.path.abspath(__file__))12 # wang_client.Snapshot(file_dir=os.path.join(root_dir,"data"))1314 streamUri = wang_client.wang_GetStreamUri()15 print(streamUri)16 profiles = wang_client.GetProfiles()17 # print(profiles)18 osds = wang_client.GetOSDs()19 # print(osds)20 info = wang_client.GetDeviceInformation()21 # print(info)22232425 videoSourceConfig = wang_client.GetVideoSourceConfigurations()26 # print(videoSourceConfig)27 encoderConfig1 = wang_client.GetVideoEncoderConfigurations()28 print(encoderConfig1)29 wang_client.SetVideoEncoderConfiguration()30 encoderConfig2 = wang_client.GetVideoEncoderConfigurations()31 print(encoderConfig2)3233 print("end")343536 if __name__ == '__main__':37 print("main.py")38 print("ONVIFClientManager")3940 test_client()41
第四节:
然后是Ptz 的配置。、
首先是 移植一下 Pzt 的模块
这里我有一个疑问,就是
为什么 ptz 的控制 ,在代码中要通过 media 模块呢?
from time import sleep1 from mypackage import client23 class PTZ():4 def __init__(self, client: client.Client):56 self.media = client.media78 self.media_profile = self.media.GetProfiles()[0]9 token = self.media_profile.token10 self.ptz = self.media.create_ptz_service()1112 #Get available PTZ services13 request = self.ptz.create_type('GetServiceCapabilities')14 Service_Capabilities = self.ptz.GetServiceCapabilities(request)1516 #Get PTZ status17 status = self.ptz.GetStatus({'ProfileToken':token})1819 # Get PTZ configuration options for getting option ranges20 request = self.ptz.create_type('GetConfigurationOptions')21 request.ConfigurationToken = self.media_profile.PTZConfiguration.token22 ptz_configuration_options = self.ptz.GetConfigurationOptions(request)2324 self.requestc = self.ptz.create_type('ContinuousMove')25 self.requestc.ProfileToken = self.media_profile.token2627 self.requesta = self.ptz.create_type('AbsoluteMove')28 self.requesta.ProfileToken = self.media_profile.token2930 self.requestr = self.ptz.create_type('RelativeMove')31 self.requestr.ProfileToken = self.media_profile.token323334 self.requests = self.ptz.create_type('Stop')35 self.requests.ProfileToken = self.media_profile.token3637 self.requestp = self.ptz.create_type('SetPreset')38 self.requestp.ProfileToken = self.media_profile.token3940 self.requestg = self.ptz.create_type('GotoPreset')41 self.requestg.ProfileToken = self.media_profile.token4243 # self.stop()4445 def __del__(self):46 self.stop()4748 def stop(self):49 self.requests.PanTilt = True50 self.requests.Zoom = True5152 self.ptz.Stop(self.requests)53
#Continuous move functions44 def perform_move(self,timeout,x,y):43 # Start continuous move42 # ret = self.ptz.ContinuousMove(self.requestc)4140 self.ptz.ContinuousMove({39 'ProfileToken': self.media_profile.token,38 'Velocity': {37 'PanTilt': {'x': x, 'y': y}36 # 'Zoom': {'x': 1.0}35 }34 })3332 sleep(timeout)31 self.stop()3029 def move_absolute(self, x, y, velocity):28 # self.requesta.Position.PanTilt._x = pan27 # self.requesta.Position.PanTilt._y = tilt26 # self.requesta.Speed.PanTilt._x = velocity25 # self.requesta.Speed.PanTilt._y = velocity24 # ret = self.ptz.AbsoluteMove(self.requesta)2322 self.ptz.AbsoluteMove({21 'ProfileToken': self.media_profile.token,20 'Position': {19 'PanTilt': {'x': x, 'y': y}18 },17 'Speed': {16 'PanTilt': {'x': velocity, 'y': velocity}15 }14 })1312 # ret = self.ptz.AbsoluteMove(self.requesta, Position={11 # 'PanTilt': {'x': pan, 'y': tilt}10 # })98 sleep(2)76 # Relative move functions --NO ERRORS BUT CAMERA DOES NOT MOVE5 def move_relative(self, pan, tilt, velocity):4 # self.requestr.Translation.PanTilt._x = pan3 # self.requestr.Translation.PanTilt._y = tilt2 # self.requestr.Speed.PanTilt._x = velocity1 # ret = self.requestr.Speed.PanTilt._y = velocity101 # self.ptz.RelativeMove(self.requestr)12 self.ptz.RelativeMove({3 'ProfileToken': self.media_profile.token,4 'Translation': {'PanTilt': {'x': pan, 'y': tilt}}5 })6 sleep(2.0)46 def zoom(self, zoom, timeout):45 pass44 # self.requestc.Velocity.Zoom._x = velocity43 # self.perform_move(timeout)42 self.ptz.ContinuousMove({41 'ProfileToken': self.media_profile.token,40 'Velocity': {39 'PanTilt': {'x': 0, 'y': 0},38 'Zoom': {'x': zoom}37 }36 })3534 sleep(timeout)33 self.stop()32313029 def move_y(self, val, timeout):28 # self.requestc.Velocity.PanTilt._x = 0.027 # self.requestc.Velocity.PanTilt._y = velocity26 self.perform_move(timeout,x=0.0,y=val)2524 def move_x(self, val, timeout):23 # self.requestc.Velocity.PanTilt._x = velocity22 # self.requestc.Velocity.PanTilt._y = 0.021 self.perform_move(timeout,x=val,y=0.0)20191817 #Sets preset set, query and and go to16 def set_preset(self, name):15 self.requestp.PresetName = name14 self.requestp.PresetToken = '1'13 self.preset = self.ptz.SetPreset(self.requestp) #returns the PresetToken1211 def get_preset(self):10 presets = self.ptz.GetPresets({'ProfileToken': self.media_profile.token})9 print("presets:",presets)87 def goto_preset(self, name):6 try:5 self.ptz.GotoPreset(4 {'ProfileToken': self.media_profile.token, "PresetToken": name}) # 移动到指定预置点位置3 except Exception as e:2 print("云台控制失败:%s"%str(e))11551
应用就是:
from mypackage import client1 from mypackage import ptz2 import os34 def test_ptz(client : client.Client):5 wang_ptz = ptz.PTZ(client)67 while True:8 # zoom in9 # ptz.zoom(1.0, 2)10 # zoom out11 print("开始缩小")12 wang_ptz.zoom(-1.0, 2)13 wang_ptz.zoom(-1.0, 2)14 wang_ptz.zoom(-1.0, 2)1516 print("开始放大")17 wang_ptz.zoom(1.0, 2)1819 # # move down20 # ptz.move_x(val=-1.0, timeout=2)21 #22 # time.sleep(10)23 # ptz.move_x(val=1.0, timeout=2)24 # time.sleep(10)25 #26 # exit(0)27 # # Set preset28 # # ptz.move_x(x=1.0, timeout=1)29 # # ptz.set_preset('home')30 #31 # # move right -- (velocity, duration of move)32 # ptz.move_x(val=1.0, timeout=2)33 #34 # # move left35 # ptz.move_x(val=-1.0, timeout=2)36 #37 # # move down38 # ptz.move_y(val=-1.0, timeout=2)39 #40 # # Move up41 # ptz.move_y(val=1.0, timeout=2)42 #43 #44 #45 # # Absolute pan-tilt (pan position, tilt position, velocity)46 # # DOES NOT RESULT IN CAMERA MOVEMENT47 # ptz.move_absolute(x=-1.0, y=1.0, velocity=1.0)48 # ptz.move_absolute(x=1.0, y=-1.0, velocity=1.0)49 #50 # # Relative move (pan increment, tilt increment, velocity)51 # # DOES NOT RESULT IN CAMERA MOVEMENT52 # # ptz.move_relative(0.5, 0.5, 8.0)
#47 # # Get presets46 # ptz.get_preset()45 # # Go back to preset44 # ptz.goto_preset('home')434241 def test_client():4039 #密码+账户 任意38 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')3736 # rtsp://admin:Admin123@192.168.1.145/live/0/MAIN35 # rtsp://admin:Admin123@192.168.1.145/live/0/SUB343332 if not wang_client.connect():31 exit(0)30 #29 # root_dir = os.path.dirname(os.path.abspath(__file__))28 # wang_client.Snapshot(file_dir=os.path.join(root_dir,"data"))2726 streamUri = wang_client.wang_GetStreamUri()25 print(streamUri)24 profiles = wang_client.GetProfiles()23 # print(profiles)22 osds = wang_client.GetOSDs()21 # print(osds)20 info = wang_client.GetDeviceInformation()19 # print(info)18171615 videoSourceConfig = wang_client.GetVideoSourceConfigurations()14 # print(videoSourceConfig)13 encoderConfig1 = wang_client.GetVideoEncoderConfigurations()12 #print(encoderConfig1)11 wang_client.SetVideoEncoderConfiguration()10 encoderConfig2 = wang_client.GetVideoEncoderConfigurations()9 #print(encoderConfig2)87 #print("end")654 if __name__ == '__main__':3 print("main.py")2 print("ONVIFClientManager")1102 #test_client()12 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')3 if not wang_client.connect():4 exit(0)5 test_ptz(wang_client)
但是目前没有测试。因为我的摄像头不支持 ptz ,运行的话会报错。
暂时先搁置。
然后就是将代码 上传gitee.