Python 链接各种中间件[Mysql\redis\mssql\tdengine]

文章目录

  • 链接参数
  • 设置logger 日志
  • redis 链接
  • mysql 链接
  • emqx 链接
  • mssql 链接
  • tdengine 链接
  • 采集OPCUA的点表的配置信息
    • 设备
    • 点表
  • OPCUA 采集 数据程序
  • 数据采集逻辑

链接参数

import randomtdengine_connection_params = {'username': 'root','password': 'taosdata','host': '127.0.0.1','port': 6030,'database': 'test'
}mysql_connection_params = {'username': 'root','password': 'root','host': '127.0.0.1','port': 3306,'database': 'th'
}mssql_connection_params = {'username': 'sa','password': 'Z','host': '127.0.0.1','port': 1433,'database': 'SistarData','driver': 'ODBC+Driver+17+for+SQL+Server'
}redis_connection_params = {'host': 'localhost',  # Redis 服务器的地址'port': 6379,  # Redis 服务器的端口'db': 0,  # 使用的 Redis 数据库编号'max_connections': 10,  # 连接池的最大连接数'decode_responses': True
}emqx_connection_params = {'broker': '127.0.0.1','port': 1883,'client_id': f'python_mqtt_client_{random.randint(1, 10)}','keep_alive_interval': 60,'password': None,'username': None,'topic_sub': None,'topic_pub': None
}logger_params = {"logger_name": "opcua_adapter_logger","total_level": 20,"file_handler_level": 10,"control_handler_level": 20,"file_name": r"E:\TH\core\basic_service\opcua_adapter\logs\opcua_adapter_logger.txt","mode": "a","max_bytes": 10485760,"backup_count": 10,"encoding": "UTF-8","format": "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"
}opcua_adapter_params = {'alarm_consumer_queue_length': 100,'archive_consumer_queue_length': 100,'emqx_consumer_queue_length': 100,'acquisition_frequency':1, # 采集周期'monitor_frequency': 2, # 监控周期'alarm_table_name': 'alarm', # 报警的表格'archive_table_name': 'test', # 归档的数据库名称'emqx_worker': 1,'alarm_worker': 5,'archive_worker':5
}

设置logger 日志

import logging
from concurrent_log_handler import ConcurrentRotatingFileHandlerfrom config import logger_paramsdef get_logger(logger_name, total_level, file_name, mode, max_bytes, backup_count, encoding, file_handler_level,control_handler_level, format) -> logging.getLogger():logger = logging.getLogger(logger_name)logger.setLevel(total_level)  # 设置日志级别file_handler = ConcurrentRotatingFileHandler(filename=file_name,mode=mode,maxBytes=max_bytes,backupCount=backup_count,encoding=encoding)  # 设置输出文件file_handler.setLevel(file_handler_level)control_handler = logging.StreamHandler()control_handler.setLevel(control_handler_level)formatter = logging.Formatter(format)file_handler.setFormatter(formatter)control_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(control_handler)return loggerlogger = get_logger(**logger_params)

redis 链接

import redisfrom basic_service.opcua_adapter.config import redis_connection_paramspool = redis.ConnectionPool(host=redis_connection_params['host'],port=redis_connection_params['port'],db=redis_connection_params['db'],max_connections=redis_connection_params['max_connections'],decode_responses=redis_connection_params['decode_responses'],)# 获取连接对象
def get_redis_connection():while True:try:redis_conn = redis.Redis(connection_pool=pool)response = redis_conn.ping()if response:return redis_connelse:continueexcept Exception as e:print.error(str(e))conn = get_redis_connection()

mysql 链接

"""
用于连接tdengine
"""
import time
from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import mysql_connection_paramsfrom sqlalchemy import create_engineengine = create_engine(f"mysql+pymysql://{mysql_connection_params['username']}:{mysql_connection_params['password']}@{mysql_connection_params['host']}:{mysql_connection_params['port']}/{mysql_connection_params['database']}")def get_mysql_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"MySQL连接建立失败,请检查网络是否畅通(ping {mysql_connection_params['host']}),服务器是否正常,是否开启远程连接")time.sleep(3)continueget_mysql_connection()

emqx 链接

import paho.mqtt.client as mqtt
import timefrom basic_service.opcua_adapter.config import emqx_connection_params
from get_logger import loggerclass MQTTClient:def __init__(self, broker, port, client_id, keep_alive_interval, password=None, username=None, topic_sub=None,topic_pub=None):self.broker = brokerself.port = portself.client_id = client_idself.keep_alive_interval = keep_alive_intervalself.topic_sub = topic_subself.topic_pub = topic_pubself.password = passwordself.username = username# 创建 MQTT 客户端self.client = mqtt.Client(client_id=self.client_id)if self.username and self.password:self.client.username_pw_set(username=self.username, password=self.password)  # 如果有需要设置用户名密码self.client.on_connect = self.on_connectself.client.on_disconnect = self.on_disconnectself.client.on_message = self.on_messageself.client.on_publish = self.on_publishself.connect()self.client.loop_start()def on_connect(self, client, userdata, flags, rc):if rc == 0:logger.info(f"Connected to MQTT Broker! Returned code={rc}")if self.topic_sub:client.subscribe(self.topic_sub)  # 连接成功后,订阅主题else:logger.error(f"Failed to connect, return code {rc}")def on_disconnect(self, client, userdata, rc):logger.info("Disconnected from MQTT Broker with code: " + str(rc))# 这里可以实现重连逻辑self.reconnect()def on_message(self, client, userdata, msg):# logger.info(f"Received message: {msg.topic} -> {msg.payload.decode()}")passdef on_publish(self, client, userdata, mid):# print(f"Message {mid} has been published.")pass# 重连机制def reconnect(self):try:logger.error("Attempting to reconnect...")self.client.reconnect()  # 尝试重连except Exception as e:logger.error(f"Reconnection failed: {e}")time.sleep(5)  # 延时后再尝试def connect(self):try:self.client.connect(self.broker, self.port, self.keep_alive_interval)except Exception as e:logger.error(f"Connection failed: {e}")self.reconnect()get_emqx_connection = MQTTClient(**emqx_connection_params)# 主循环发布消息
# try:
#     while True:
#         message = "Hello MQTT"
#         result = mqtt_client.client.publish('/test', message, qos=0)
#         status = result[0]
#         if status == 0:
#             print(f"Sent `{message}` to topic /test")
#         else:
#             print(f"Failed to send message to topic /test")
#         time.sleep(10)  # 每10秒发布一次
# except KeyboardInterrupt:
#     print("Interrupted by user, stopping...")
# finally:
#     mqtt_client.client.loop_stop()
#     mqtt_client.client.disconnect()

mssql 链接

"""
用于连接tdengine
"""
import time
#from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import mysql_connection_params, mssql_connection_paramsfrom sqlalchemy import create_engine, textengine = create_engine(f'mssql+pyodbc://sa:Z#@127.0.0.1:1433/sistarData?driver=ODBC+Driver+17+for+SQL+Server')def get_mssql_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"MSSQL连接建立失败,请检查网络是否畅通(ping {mssql_connection_params['host']}),服务器是否正常,是否开启远程连接")time.sleep(3)continueconn = get_mssql_connection()result = conn.execute(text("SELECT TABLE_NAME,COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'sistar_eng_areas';"))print(result.fetchall())

tdengine 链接

"""
用于连接tdengine
"""
import time
from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import tdengine_connection_paramsfrom sqlalchemy import create_engineengine = create_engine(f"taos://{tdengine_connection_params['username']}:{tdengine_connection_params['password']}@{tdengine_connection_params['host']}:{tdengine_connection_params['port']}/{tdengine_connection_params['database']}")def get_tdengine_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"TDEngine连接建立失败,请检查网络是否畅通(ping {tdengine_connection_params['host']}),服务器是否正常,是否开启远程连接,是否安装与服务器相同版本的客户端")time.sleep(3)continue

采集OPCUA的点表的配置信息

设备

[{"No.": 1,"device_name": "device1","url": "opc.tcp:\/\/192.168.10.132:4862"},{"No.": 2,"device_name": "device2","url": "opc.tcp:\/\/192.168.10.132:4863"}
]

点表

[{"No": 1,"tag_uuid": "tag0001","node_id": "ns=1;s=t|tag1","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "add","scale_factor": 1,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea61"},{"No": 2,"tag_uuid": "tag0002","node_id": "ns=1;s=t|tag2","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "sub","scale_factor": 2,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea62"}]

在这里插入图片描述

在这里插入图片描述

用户可以支持从excel导入数据。

从excel 上传数据到 系统中

"""
用于初始化,创建数据库,创建数据库表
"""
import randomimport pandas as pd
from sqlalchemy import text
from connectors.tdengine import get_tdengine_connection
from get_logger import loggerxlsx = r'E:\TH\core\basic_service\opcua_adapter\static\opcua_template.xlsx'connection = get_tdengine_connection()db_name = 'test'
stable = 'meters'df = pd.read_excel(xlsx, sheet_name='device1')tag_uuid = df.loc[:, ['tag_uuid', 'unit']]create_table_sql = ''for item in tag_uuid.values:_sql = f"""CREATE TABLE IF NOT EXISTS {db_name}.{item[0]} USING {db_name}.{stable} TAGS ('{item[1]}', '{random.randint(1, 10)}');"""create_table_sql += _sqlcreate_db_stable_sql = f"""CREATE DATABASE IF NOT EXISTS {db_name} KEEP 3650;
use {db_name};
CREATE STABLE IF NOT EXISTS {db_name}.{stable} (ts TIMESTAMP, val FLOAT) TAGS ( unit BINARY(20),   s_type BINARY(20));
"""
total_sql = create_db_stable_sql + create_table_sql
print(total_sql)
try:connection.execute(text(total_sql))connection.commit()logger.info('init success')
except Exception as e:print(str(e))connection.rollback()logger.error('init failed')

OPCUA 采集 数据程序

import datetime
import json
import os
import queue
import threading
import time
from threading import Thread
from typing import List, Any
import opcua.client.client
import psutil as psutil
from opcua.client.client import Client
from opcua.common.node import Node
from opcua.ua import UaStatusCodeError
from config import opcua_adapter_params
from connectors._redis import get_redis_connection
from connectors.emqx import get_emqx_connection
from connectors.mysql import get_mysql_connection
from connectors.tdengine import get_tdengine_connection
from get_logger import logger
from sqlalchemy import textemqx_connection = get_emqx_connection.clientclass OPCUAAdapter(object):"""OPCUA数据采集类"""def __init__(self, device_info, tag_id_to_node_id_map, tag_id_to_detail_map, use_subscribe=True):self.url: str = device_info.get('device_url')self._ua: opcua.client.client.Client = Client(url=self.url, timeout=5)self.connected: bool = Falseself.thread_list: List[Thread] = []self.raw_dict = {}self.tag_id_to_node_id_map = tag_id_to_node_id_mapself.tag_id_to_detail_map = tag_id_to_detail_mapself.device_info = device_infoself.alarm_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['alarm_consumer_queue_length'])self.archive_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['archive_consumer_queue_length'])self.emqx_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['emqx_consumer_queue_length'])def connect(self):"""连接函数:return:"""try:if self.connected:returnelse:self._ua.connect()self.connected = Truelogger.info(f"初次连接{self.url}成功!")returnexcept Exception as e:self.disconnect()self.connected = Falselogger.error("初次连接失败,失败原因:", e)# 开始重连self.reconnect()def disconnect(self):"""断开连接:return:"""try:if self._ua and self.connected:self._ua.disconnect()self.connected = Falselogger.info("主动断开连接成功")except Exception as e:logger.error("主动断开连接失败,失败原因:", str(e))def reconnect(self):"""重连:return:"""index = 0while True:try:self._ua.connect()self.connected = Trueindex = 0logger.info(f"重连{self.url}成功!")returnexcept AttributeError as e:index += 1logger.error(f"第{index}次重连失败,失败原因:{str(e)}!")self.connected = Falsetime.sleep(index * 1)continueexcept ConnectionRefusedError as e:index += 1logger.error(f"第{index}次重连失败,失败原因:{str(e)}!")self.connected = Falsetime.sleep(index * 1)continueexcept OSError as e:index += 1logger.error(f"与OPCUA服务器未能建立连接,失败原因:{str(e)}!")self.connected = Falsetime.sleep(index * 1)continueexcept Exception as e:index += 1logger.error(f"第{index}次重连失败,失败原因:{str(e)}!")self.connected = Falsetime.sleep(index * 1)continuedef interval_read(self, interval: int) -> None:"""按照采集频率定时去采集:param interval::return:"""connection = get_redis_connection()thread_name = threading.current_thread().namenodes = []while True:# 每分钟采集多少次,采集超时多少次,采集node数量,多少个node是Nonestart_time = time.time()if not self.connected:# 如果没有连接成功,开启重连self.reconnect()else:try:nodes_str_list = self.tag_id_to_node_id_map.keys()nodes = [self._ua.get_node(node) for node in nodes_str_list]values = self._ua.get_values(nodes)self.raw_dict = dict(zip(nodes_str_list, values))except AttributeError as e:logger.error(f"属性读取错误:{str(e)}!")except TimeoutError:logger.error(f"接收服务端报文超时")except ConnectionRefusedError as e:self.disconnect()self.reconnect()logger.error(f"数据获取失败,失败原因:{str(e)}!")except ConnectionAbortedError as e:self.disconnect()self.reconnect()logger.error(f"数据获取失败,失败原因:{str(e)}!")except UaStatusCodeError as e:self.disconnect()self.reconnect()logger.error(f"数据获取失败,失败原因:{str(e)}!")except OSError as e:self.disconnect()self.reconnect()logger.error(f"数据获取失败,失败原因:{str(e)}!")except RuntimeError as e:self.disconnect()self.reconnect()logger.error(f'运行错误,失败原因:{str(e)}')except Exception as e:self.disconnect()self.reconnect()logger.error(f"未捕获到的异常:{str(e)}")finally:end_time = time.time()try:connection.hmset('performance',mapping={'nodes': len(nodes), f'{thread_name}_use_time': f'{(end_time - start_time):.2f}'})except Exception:if connection:connection.close()connection = get_redis_connection()time.sleep(interval)def node_write(self, nodes: List[Node], values: List[Any]):"""写入node:param nodes::param values::return:"""try:self._ua.set_values(nodes, values)except Exception as e:logger.error(f"数据写入失败,失败原因:{str(e)}")def monitor_thread(self):"""监视线程:return:"""redis_connection = get_redis_connection()while True:try:current_process_id = os.getpid()process = psutil.Process(current_process_id)# 获取进程的基本信息process_name = process.name()cpu_usage = process.cpu_percent(interval=1)  # 进程的 CPU 使用率,间隔 1 秒memory_info = process.memory_info()  # 进程的内存使用情况io_counters = process.io_counters()  # 进程的 IO 计数disk_usage = psutil.disk_usage('/')  # 获取根目录的磁盘使用情况thread_list = []for thread in threading.enumerate():thread_list.append((thread.ident, thread.name, thread.is_alive()))if thread.name == 'continuous_thread' and thread.is_alive() == False:logger.error(f'读取线程出错,请尽快联系管理员处理!')redis_connection.hmset(name='performance',mapping={'process_name': process_name, 'cpu_usage': cpu_usage,'memory_info_RSS': f'{memory_info.rss / (1024 * 1024):.2f} MB','memory_info_VMS': f'{memory_info.vms / (1024 * 1024): .2f} MB','io_read': f"{io_counters.read_bytes / (1024 * 1024):.2f} MB",'io_write': f"{io_counters.write_bytes / (1024 * 1024):.2f} MB",'disk_usage': f'{disk_usage.percent}%','threads': json.dumps(thread_list),'pid': current_process_id,'archive_consumer_length': self.archive_consumer_queue.qsize(),'alarm_consumer_length': self.alarm_consumer_queue.qsize(),'emqx_consumer_length': self.emqx_consumer_queue.qsize(),})except Exception as e:logger.error(f'监控子线程出错:{str(e)}')if redis_connection:redis_connection.close()redis_connection = get_redis_connection()finally:time.sleep(int(opcua_adapter_params['monitor_frequency']))def change_data_notifier(self, timestamp, node_id, new_data, old_data):""":param timestamp::param node::param new_data::param old_data::return:"""try:tag_id = self.tag_id_to_node_id_map[node_id]except KeyError:passelse:content = {'timestamp': timestamp,'tag_id': tag_id,'new_data': new_data,'old_data': old_data}self.alarm_consumer_queue.put(content)self.emqx_consumer_queue.put(content)self.archive_consumer_queue.put(content)def consumer_alarm_info(self):"""处理报警信息:return:"""redis_connection = get_redis_connection()alarm_table_name = opcua_adapter_params['alarm_table_name']connection = get_mysql_connection()thread_name = threading.current_thread().namealarm_count = 0while True:try:start_time = time.time()content = self.alarm_consumer_queue.get()tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])active_alarm = tag_detail.get('active_alarm')if active_alarm:up_limit = tag_detail.get('alarm_up')down_limit = tag_detail.get('alarm_down')if float(content['new_data']) > float(up_limit):sql = f"""insert into {alarm_table_name} (device_name, tag_uuid, tag_name,alarm_message,alarm_limit,value) values ("{self.device_info['device_name']}","{content["tag_id"]}", "{tag_detail.get("comments")}", "{tag_detail.get("alarm_up_info")}", "{tag_detail.get("alarm_up")}", "{content["new_data"]}")"""try:connection.execute(text(sql))connection.commit()alarm_count += 1except Exception as e:logger.error(f'数据插入错误,错误原因:{str(e)}!')connection.rollback()elif float(content['new_data']) < float(down_limit):sql = f"""insert into {alarm_table_name} (device_name, tag_uuid, tag_name,alarm_message,alarm_limit,value) values ("{self.device_info['device_name']}","{content["tag_id"]}", "{tag_detail.get("comments")}", "{tag_detail.get("alarm_down_info")}", "{tag_detail.get("alarm_down")}", "{content["new_data"]}")"""try:connection.execute(text(sql))connection.commit()alarm_count += 1except Exception as e:logger.error(f'数据插入错误,错误原因:{str(e)}!')connection.rollback()except Exception as e:logger.error(str(e))if connection:connection.close()connection = get_mysql_connection()finally:end_time = time.time()try:redis_connection.hmset('performance',mapping={f'{thread_name}_use_time': f'{(end_time - start_time):.2f}','alarm_count': alarm_count})except Exception:if redis_connection:redis_connection.close()redis_connection = get_redis_connection()time.sleep(2)def consumer_archive_info(self):thread_name = threading.current_thread().nameredis_connection = get_redis_connection()connection = get_tdengine_connection()exception_buffer = {}buffer = {}db_name = opcua_adapter_params['archive_table_name']exception_time = 0total_time = 0for k in self.tag_id_to_node_id_map.values():buffer.setdefault(k, [])# 异常buffer,当数据没有被正常插入时,数据不被丢弃,放到异常队列中,一旦恢复了连接,先将异常队列中的数据恢复exception_buffer.setdefault(k, [])while True:try:start_time = time.time()for k, v in exception_buffer.items():if len(v) > 0:sql = f"""INSERT INTO {db_name}.{k} VALUES {str(v).replace('[', '').replace(']', '')}"""try:connection.execute(sql)connection.commit()exception_buffer.setdefault(k, [])except Exception:exception_time += 1connection.rollback()content = self.archive_consumer_queue.get()tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])if tag_detail.get('active_archive'):timestamp = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')try:data = float(content['new_data'])except ValueError:data = 0.0except TypeError:data = 0.0except Exception:data = 0.0if len(buffer[content['tag_id']]) < 100:buffer[content['tag_id']].append((timestamp, data))else:sql = f"""INSERT INTO {db_name}.{content['tag_id']} VALUES {str(buffer[content["tag_id"]]).replace('[', '').replace(']', '')};"""try:connection.execute(text(sql))connection.commit()buffer[content['tag_id']] = []except Exception:logger.error(f'insert error:{sql}')connection.rollback()exception_time += 1if len(exception_buffer) < 10000:exception_buffer[content['tag_id']].extend(buffer[content['tag_id']])buffer[content['tag_id']] = []else:# 如果超过设定的缓存值滞后,将旧值丢弃掉exception_buffer[content['tag_id']] = exception_buffer[content['tag_id']][100:]total_time += 1except Exception as e:logger.error(str(e))exception_time += 1if connection:connection.close()connection = get_tdengine_connection()finally:end_time = time.time()try:redis_connection.hmset('performance',mapping={'buffer': len(buffer), 'exception_buffer': len(exception_buffer),f'{thread_name}_use_time': f'{(end_time - start_time):.2f}','total_time': total_time,'exception_time': exception_time})except Exception:if redis_connection:redis_connection.close()redis_connection = get_redis_connection()time.sleep(2)def consumer_emqx_info(self):while True:try:content = self.emqx_consumer_queue.get()tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])mqtt_topic_str = tag_detail.get('mqtt_topic_name')topic_list = []for topic in mqtt_topic_str.split(';'):topic_name, qos = topic.split(',')topic_list.append({'topic_name': topic_name.split(':')[1].strip().replace('\n', ''),'qos': qos.split(':')[1].strip().replace('\n', '')})payload = json.dumps({content['tag_id']: content['new_data']})for topic in topic_list:emqx_connection.publish(topic=topic['topic_name'], payload=payload, qos=int(topic['qos']))except Exception as e:logger.error(str(e))def subscribe_data_change(self):copy_raw_dict = self.raw_dict.copy()flag = Falsewhile True:d1_keys = self.raw_dict.keys()d2_keys = copy_raw_dict.keys()if _ := d1_keys - d2_keys:flag = Truefor k in list(_):self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), k,self.raw_dict[k],0)if _ := d2_keys - d1_keys:flag = Truefor k in list(_):self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), k, 0,self.raw_dict[k])commen_keys = d1_keys & d2_keysfor key in commen_keys:if copy_raw_dict[key] != self.raw_dict[key]:self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), key,copy_raw_dict[key], self.raw_dict[key])flag = Trueif flag:copy_raw_dict = self.raw_dict.copy()flag = Falsetime.sleep(0.5)def run(self):"""启动服务:return:"""try:self.connect()interval_acqusition_task = Thread(target=self.interval_read, name='continuous_thread', args=(1,))monitor_thread_task = Thread(target=self.monitor_thread, name='monitor_thread')subscribe_thread_task = Thread(target=self.subscribe_data_change, name='subscribe_thread')for i in range(opcua_adapter_params['alarm_worker']):consumer_alarm_info_task = Thread(target=self.consumer_alarm_info, name=f'consumer_alarm_info_{i+1}')self.thread_list.append(consumer_alarm_info_task)for i in range(opcua_adapter_params['archive_worker']):consumer_archive_info_task = Thread(target=self.consumer_archive_info, name=f'consumer_archive_info_{i+1}')self.thread_list.append(consumer_archive_info_task)for i in range(opcua_adapter_params['emqx_worker']):consumer_emqx_info_task = Thread(target=self.consumer_emqx_info, name=f'consumer_emqx_info_{i+1}')self.thread_list.append(consumer_emqx_info_task)self.thread_list.append(interval_acqusition_task)self.thread_list.append(monitor_thread_task)self.thread_list.append(subscribe_thread_task)for th in self.thread_list:th.start()for th in self.thread_list:th.join()except Exception as e:logger.error(str(e))finally:get_emqx_connection.client.loop_stop()get_emqx_connection.client.disconnect()def init():conn = get_redis_connection()while True:try:device_info = json.loads(conn.get('device_info'))tag_id_to_node_id_map = json.loads(conn.get('tag_id_to_node_id_map'))tag_id_to_detail_map = json.loads(conn.get('tag_id_to_detail_map'))if device_info and tag_id_to_detail_map and tag_id_to_node_id_map:return device_info, tag_id_to_node_id_map, tag_id_to_detail_mapelse:logger.error('init error')time.sleep(3)continueexcept Exception:logger.error('Init Failed!')time.sleep(3)continuedef main():device_info, tag_id_to_node_id_map, tag_id_to_detail_map = init()opcua_adapter = OPCUAAdapter(device_info=device_info, tag_id_to_node_id_map=tag_id_to_node_id_map,tag_id_to_detail_map=tag_id_to_detail_map)opcua_adapter.run()if __name__ == '__main__':main()

数据采集逻辑

# 断线重连机制
多进程中# 多进程模型
一个设备一个进程# 多线程模型
不同的采集频率一个线程,一个进程中的多个线程共享一个链接,进程# 客户端采用订阅的方式# 客户端采用轮旋的方式# 字段的含义
字段名称                字段含义            数值
No                     自增id
tag_uuid	           变量的唯一ID
node_id	               UA node_id
interval	           采集周期,以秒为单位
active	               是否激活,TURE, FALSE 必须大写
active_alarm           是否激活报警
alarm_up               报警上限
alarm_down             报警下限
alarm_up_info          报警上限值
alarm_down_info        报警下限值
alarm_up_change        达到报警上限的修改值
alarm_down_change      达到报警下限的修改值
active_archive         激活归档
archive_onchange       变化时归档 为FALSE archive_interval有效
archive_interval       轮训归档周期
active_scale           激活变换
scale_sign             符号
scale_factor           因子
mqtt_topic_name
unit
comments

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/916228.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/916228.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C Primer Plus 第6版 编程练习——第11章(上)

本章共16题&#xff0c;分上中下三篇1.设计并测试一个函数&#xff0c;从输入中获取n个字符&#xff08;包括空白、制表符、换行符)&#xff0c;把结果存储在一个数组里&#xff0c;它的地址被传递作为一个参数。int get_n_char(char arr[], int n) {int i 0;char ch;while (i…

Java开发岗面试记录合集

一、Java 核心1. 基础语法final关键字的作用修饰类&#xff1a;类不可被继承&#xff08;如String类&#xff09;&#xff0c;保证类的稳定性和安全性。修饰方法&#xff1a;方法不可被重写&#xff08;防止子类篡改父类核心逻辑&#xff0c;如工具类方法&#xff09;。修饰变量…

Linux 系统时间设置(date 和 ntpdate)-linux028

date 命令&#xff1a;查看或设置系统时间1. 查看当前时间date示例输出&#xff1a;Tue Mar 4 01:36:45 CST 20142. 设置时间&#xff08;不设置日期&#xff09;date -s 09:38:40设置后输出&#xff1a;Tue Mar 4 09:38:40 CST 20143. 设置完整日期和时间&#xff08;推荐格…

iOS上使用WebRTC推拉流的案例

一、库集成 首先&#xff0c;确保在你的 Podfile 中添加依赖&#xff1a; pod GoogleWebRTC然后执行 pod install 安装库。 二、代码示例 2.1、权限配置&#xff1a;在 Info.plist 中添加摄像头、麦克风权限 <!-- 需要在 Info.plist 中添加以下权限 --> <key>NSCam…

API: return response as HTML table

想要把response table变成HTML的table&#xff0c;即想达到下面这种的话<table boarder"1" style"width:100%; boarder-collapse: collapse; text-align:left"><tr><th>Customer</th><th>Date</th><th>Debit Am…

OneNote 当前无法同步笔记。将继续尝试。 (错误代码: 0xE00009C8 bb0ur)问题解决

之前因为同步错误&#xff0c;导致OneNote一个笔记本内容全部消失&#xff0c;笔记本内容如下图同步状态和错误如下&#xff1a;提醒错误为&#xff1a;OneNote 当前无法同步笔记。将继续尝试。 (错误代码: 0xE00009C8 bb0ur)当时心态有点崩&#xff0c;也是查了好些资料&#…

OneCode3.0 Gallery 组件前后端映射机制:从注解配置到前端渲染的完整链路

一、注解体系与前端组件的映射基础 ​ OneCode Gallery 组件实现了 Java 注解与前端 UI 组件的深度绑定&#xff0c;通过GalleryAnnotation、GalleryItemAnnotation和GalleryViewAnnotation三个核心注解&#xff0c;构建了从后端配置到前端渲染的完整链路。这种映射机制的核心价…

规则分配脚本

需求&#xff1a; 1.根据用户编写的要报规则,去mysql库里SysManage_Rule表获取已经启用的规则作为条件&#xff08;例如[{“field”: “关键词”, “logic”: “AND”, “value”: “阿尔法”, “operator”: “”&#xff0c;, “assign_user”: “user222”}]&#xff09;条…

SEO实战派白杨SEO:SEO中说的框计算、知心搜索(知识图谱)是什么?有什么用处?

SEO里框计算是什么&#xff1f;有什么用处&#xff1f;SEO里框计划算是百度2010年提出的&#xff0c;指当用户搜索某些关键词查询时&#xff0c;搜索引擎在结果页直接展示答案的技术&#xff08;如天气、汇率等&#xff09;&#xff0c;用户无需点击网站即可获取信息&#xff0…

软件工程:软件需求

简介本篇博客记录了我在软件工程学习过程中关于软件需求与面向对象基础知识的学习体会和要点总结。博客共分为三个关卡内容&#xff1a;第1关围绕“软件需求”的定义、分类及分析过程展开&#xff0c;让我清晰地理解了功能性需求、非功能性需求与约束条件的区别&#xff1b;第2…

MES系统是什么,有哪些特性?

MES系统是一套面向制造企业车间执行层的生产信息化管理系统。它能够为操作人员和管理人员提供计划的执行、跟踪以及所有资源&#xff08;包括人、设备、物料、客户需求等&#xff09;的当前状态。通过MES系统可以对从订单下达到产品完成的整个生产过程进行优化管理。当工厂发生…

Vue2下

六&#xff1a;vue-router &#xff08;重要&#xff09; &#xff08;一&#xff09;. 对路由的理解 1.什么是路由 路由&#xff08;Router&#xff09; 是管理页面跳转和 URL 与视图映射关系的机制&#xff0c;核心作用是&#xff1a;根据不同的 URL 路径&#xff0c;展示对…

在 Windows 上安装设置 MongoDB及常见问题

介绍 MongoDB 是一个开源的 NoSQL 数据库系统&#xff0c;它以一种灵活的类似 JSON 的格式&#xff08;称为 BSON&#xff08;二进制 JSON&#xff09;&#xff09;存储数据。它使用动态模式&#xff0c;这意味着与关系型数据库不同&#xff0c;MongoDB 不需要在向数据库添加数…

Effective C++ 条款01:视 C++ 为一个语言联邦

Effective C 条款01&#xff1a;视 C 为一个语言联邦核心思想&#xff1a;C 是由多个子语言组成的联邦&#xff0c;每个子语言有自己的编程范式。理解这些子语言及其规则切换&#xff0c;是写出高效 C 代码的关键。 四个子语言及其规则&#xff1a; C 语言 基础&#xff1a;过程…

云效CI/CD教程(PHP项目)

参考文档 参考云效的官方文档https://help.aliyun.com/zh/yunxiao/ 一、新建代码库 这是第一步&#xff0c;和码云的差不多 二、配SSH密钥 这个和码云&#xff0c;github上类似&#xff0c;都需要&#xff0c;云效的SSH密钥证书不是采用 RSA算法&#xff0c;而是采用了ED2…

单片机是怎么控制的

单片机作为电子系统的控制核心&#xff0c;通过接收外部信号、执行预设程序、驱动外部设备的方式实现控制功能&#xff0c;其控制过程涉及信号输入、数据处理和指令输出三个关键环节&#xff0c;每个环节的协同配合决定了整体控制效果。 信号输入&#xff1a;获取外部信息 单片…

deepseek本地部署,轻松实现编程自由

小伙伴们&#xff0c;大家好&#xff0c;今天我们来实现deepseek本地部署&#xff0c;轻松实现编程自由&#xff01;安装ollama 安装ollama 首先我们安装ollama 打开ollama官网&#xff0c;下载安装符合自己系统的版本。 找到要安装的模型deepseek-r1开始-运行 输入cmd出现…

基础NLP | 常用工具

编辑器 PycharmVSCodeSpyderPython 自带 ideVim 机器学习相关python框架 Pytorch 学术界宠儿&#xff0c;调试方便&#xff0c;目前的主流Tensorflow 大名鼎鼎&#xff0c;工程配套完善Keras 高级封装&#xff0c;简单好用&#xff0c;现已和Tensorflow合体Gensim 训练词向…

Unity3D + VR头显 × RTSP|RTMP播放器:构建沉浸式远程诊疗系统的技术实践

一、背景&#xff1a;远程医疗迈入“沉浸式协同”的新阶段 过去&#xff0c;远程医疗主要依赖视频会议系统&#xff0c;实现基础的远程问诊、会诊或术中指导。虽然初步解决了地域限制问题&#xff0c;但其单视角、平面化、缺乏沉浸感与交互性的特征&#xff0c;已无法满足临床…

海云安斩获“智能金融创新应用“标杆案例 彰显AI安全左移技术创新实力

近日&#xff0c;由中国人民银行广东省分行、广东省金融管理局、广东省政务服务和数据管理局指导&#xff0c;广东省金融科技协会主办的“智能金融 创新应用”优秀案例名单最终揭晓&#xff0c;海云安开发者安全助手系统项目凭借其创新的"AI安全左移"技术架构&#x…