规则分配脚本

需求:

1.根据用户编写的要报规则,去mysql库里SysManage_Rule表获取已经启用的规则作为条件(例如[{“field”: “关键词”, “logic”: “AND”, “value”: “阿尔法”, “operator”: “=”,, “assign_user”: “user222”}])条件即为:关键词=阿尔法
2.根据此条件去lxdb的all_report表进行查询,查询逻辑是每10min获取最新数据,满足条件的对all_report表的report_handler字段打上分配人名以及yb_importanceid填上要报规则的id
要报规则页面:
在这里插入图片描述
SysManage_Rule表
在这里插入图片描述

解决办法

规则分配脚本(supervisor运行):

import argparse
import os
import django
import time
from datetime import datetime, timedelta
import sys
import json# 添加项目路径,确保 Django 配置正确
sys.path.append('/home/rpadmin/web/yb-backend')# 初始化 Django 环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yb.settings")
django.setup()# 导入自定义模块和参数
from DBreport.functions.get_table_mapping import get_gather_table
from wrapper.Params import TOREPORT, GATHER_TABLE
from DBconn.DBfunctions.dynamicQueryBase import DynamicQueryBaseEngine
from SysManage.models import SysmanageRule# 字段名映射:中文字段名 -> 英文字段名
field_mapping = {"报文要素": "text_content","语种": "audio_languagename","关键词": "keyword","文种": "text_lang","实体": "text_entity","来源手段": "hj_means"
}# 初始记录时间,第一次运行查很早之前的数据
last_run_time = datetime.now() - timedelta(minutes=1600000)def replace_field_names(data, mapping):new_data = {}for user, rules in data.items():new_rules = []for rule in rules:new_rule = rule.copy()if new_rule.get('field') in mapping:new_rule['field'] = mapping[new_rule['field']]new_rules.append(new_rule)new_data[user] = new_rulesreturn new_datadef run_rule_dispatch():global last_run_time# 获取数据库连接和数据表对象db_conn = get_gather_table()gather_table = db_conn.load_table(GATHER_TABLE)print("[调试] all_report 字段列表:", [col.name for col in gather_table.columns])# 查询所有启用状态的规则rules = SysmanageRule.objects.filter(rule_status=1)for rule in rules:rule_id = rule.idrule_name = rule.rule_namerule_content = rule.rule_content# 检查规则内容格式是否为列表if isinstance(rule_content, list):conditions = rule_contentelse:print(f"[跳过] 规则《{rule_name}》内容应为列表格式")continue# 替换条件中的字段名conditions = replace_field_names({rule_name: conditions}, field_mapping)[rule_name]filters = []keyword_value = None  # 保存关键词值(如果存在)for cond in conditions:field = cond.get("field")operator = cond.get("operator")value = cond.get("value")if cond.get("assign_user"):assign_user = cond["assign_user"]if not field or not operator or value is None:continue# 如果字段为 keyword 且是等于操作,延迟处理if field == "keyword" and operator == "=":keyword_value = valueelse:filters.append((field, operator, value))# 增加时间过滤:只查上次运行之后新增的数据filters.append(('hj_createtime', '>=', last_run_time))# 关键词特殊处理:模糊匹配 audio_asr 或 trans_sidebyside 的 original_content/ translat_content 字段if keyword_value is not None:# audio_asr 模糊匹配filters_audio = filters + [('audio_asr', 'like', f"%{keyword_value}%")]# 执行原始查询(不带关键词)用于后续文本内容筛选raw_trans_records = db_conn.execute_dynamic_query(gather_table,filters,limit=5,  # 仅预览前5条,必要时可调整或移除ignore_fields=[])matched_trans_data = []for row in raw_trans_records['data']:try:trans_data = row.get('trans_sidebyside')if isinstance(trans_data, str):trans_data = json.loads(trans_data)  # JSON 反序列化if isinstance(trans_data, dict):original = trans_data.get("original_content", "")translated = trans_data.get("translat_content", "")if keyword_value in original or keyword_value in translated:matched_trans_data.append(row)except Exception:continue# 执行 audio_asr 匹配查询result1 = db_conn.execute_dynamic_query(gather_table, filters_audio, ignore_fields=[])# 合并两类结果(根据 lxid 去重)combined_data = {row['lxid']: row for row in result1['data']}for row in matched_trans_data:combined_data.setdefault(row['lxid'], row)total_count = len(combined_data)print(f"[调试] 条件命中记录数:{total_count}")if total_count:first_row = list(combined_data.values())[0]print("[调试] 命中示例记录:", first_row)# 构造更新内容updates = {'report_handler': assign_user,'yb_importanceid': str(rule_id),'yb_importancename': rule_name,'assign_rule_type': TOREPORT}# 遍历命中记录逐条更新affected = 0for row in combined_data.values():row_filter = [('lxid', '=', row['lxid'])]try:count = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, row_filter)affected += countexcept Exception as e:print(f"[×] 更新失败:{str(e)}")print(f"[✓] 规则《{rule_name}》执行成功,分配人:{assign_user},更新{affected}条")continue# 正常流程处理:无关键词匹配逻辑print(f"[调试] 规则《{rule_name}》筛选条件:{filters}")result_data = db_conn.execute_dynamic_query(gather_table,filters,limit=5,ignore_fields=[])print(f"[调试] 条件命中记录数:{result_data['total_count']}")if result_data['data']:print("[调试] 命中示例记录:", result_data['data'][0])updates = {'report_handler': assign_user,'yb_importanceid': str(rule_id),'yb_importancename': rule_name,'assign_rule_type': TOREPORT}try:affected = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, filters)print(f"[✓] 规则《{rule_name}》执行成功,分配人:{assign_user},更新{affected}条")except Exception as e:print(f"[×] 规则《{rule_name}》执行失败:{str(e)}")# 关闭连接,记录当前时间用于下轮过滤db_conn.close()last_run_time = datetime.now()if __name__ == "__main__":while True:print(f"\n[调试] 开始执行调度任务,当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")run_rule_dispatch()print(f"[调试] 任务执行完成,等待10分钟...\n")time.sleep(600)  # 每10分钟运行一次

指定分配人接口(即向rule_content字段里添加assign_user):

@class_operation_logger(operation_name="/指定要报规则分配人")
class RuleAssignUserUpdateAPIView(APIView):"""接收 assign_user 和 规则id,指定规则的 rule_content 中每一项的 assign_userPOST /api/sys/update-assign-user/Body:{"id": 2,"assign_user": "user002"}"""def post(self, request, *args, **kwargs):rule_id = request.data.get("id")assign_user = request.data.get("assign_user")if not rule_id or not assign_user:return ErrorResponse(data=False, msg="参数id 和 assign_user 都是必填的")# 获取要更新的规则rule = get_object_or_404(SysmanageRule, pk=rule_id)try:content_list = rule.rule_content or []if not isinstance(content_list, list):raise ValueError("rule_content 必须是列表")except Exception as e:return ErrorResponse(data=False, msg=f"读取 rule_content 失败:{str(e)}")# 创建或更新每一项的 assign_userfor cond in content_list:cond['assign_user'] = assign_user# 保存回库rule.rule_content = content_listrule.save(update_fields=['rule_content'])return SuccessResponse(msg=f"id为 {rule_id} 的规则的分配人已更新为 {assign_user}")

Supervisor工具详见文章Supervisor进程管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/916220.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/916220.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SEO实战派白杨SEO:SEO中说的框计算、知心搜索(知识图谱)是什么?有什么用处?

SEO里框计算是什么?有什么用处?SEO里框计划算是百度2010年提出的,指当用户搜索某些关键词查询时,搜索引擎在结果页直接展示答案的技术(如天气、汇率等),用户无需点击网站即可获取信息&#xff0…

软件工程:软件需求

简介本篇博客记录了我在软件工程学习过程中关于软件需求与面向对象基础知识的学习体会和要点总结。博客共分为三个关卡内容:第1关围绕“软件需求”的定义、分类及分析过程展开,让我清晰地理解了功能性需求、非功能性需求与约束条件的区别;第2…

MES系统是什么,有哪些特性?

MES系统是一套面向制造企业车间执行层的生产信息化管理系统。它能够为操作人员和管理人员提供计划的执行、跟踪以及所有资源(包括人、设备、物料、客户需求等)的当前状态。通过MES系统可以对从订单下达到产品完成的整个生产过程进行优化管理。当工厂发生…

Vue2下

六:vue-router (重要) (一). 对路由的理解 1.什么是路由 路由(Router) 是管理页面跳转和 URL 与视图映射关系的机制,核心作用是:根据不同的 URL 路径,展示对…

在 Windows 上安装设置 MongoDB及常见问题

介绍 MongoDB 是一个开源的 NoSQL 数据库系统,它以一种灵活的类似 JSON 的格式(称为 BSON(二进制 JSON))存储数据。它使用动态模式,这意味着与关系型数据库不同,MongoDB 不需要在向数据库添加数…

Effective C++ 条款01:视 C++ 为一个语言联邦

Effective C 条款01:视 C 为一个语言联邦核心思想:C 是由多个子语言组成的联邦,每个子语言有自己的编程范式。理解这些子语言及其规则切换,是写出高效 C 代码的关键。 四个子语言及其规则: C 语言 基础:过程…

云效CI/CD教程(PHP项目)

参考文档 参考云效的官方文档https://help.aliyun.com/zh/yunxiao/ 一、新建代码库 这是第一步,和码云的差不多 二、配SSH密钥 这个和码云,github上类似,都需要,云效的SSH密钥证书不是采用 RSA算法,而是采用了ED2…

单片机是怎么控制的

单片机作为电子系统的控制核心,通过接收外部信号、执行预设程序、驱动外部设备的方式实现控制功能,其控制过程涉及信号输入、数据处理和指令输出三个关键环节,每个环节的协同配合决定了整体控制效果。 信号输入:获取外部信息 单片…

deepseek本地部署,轻松实现编程自由

小伙伴们,大家好,今天我们来实现deepseek本地部署,轻松实现编程自由!安装ollama 安装ollama 首先我们安装ollama 打开ollama官网,下载安装符合自己系统的版本。 找到要安装的模型deepseek-r1开始-运行 输入cmd出现…

基础NLP | 常用工具

编辑器 PycharmVSCodeSpyderPython 自带 ideVim 机器学习相关python框架 Pytorch 学术界宠儿,调试方便,目前的主流Tensorflow 大名鼎鼎,工程配套完善Keras 高级封装,简单好用,现已和Tensorflow合体Gensim 训练词向…

Unity3D + VR头显 × RTSP|RTMP播放器:构建沉浸式远程诊疗系统的技术实践

一、背景:远程医疗迈入“沉浸式协同”的新阶段 过去,远程医疗主要依赖视频会议系统,实现基础的远程问诊、会诊或术中指导。虽然初步解决了地域限制问题,但其单视角、平面化、缺乏沉浸感与交互性的特征,已无法满足临床…

海云安斩获“智能金融创新应用“标杆案例 彰显AI安全左移技术创新实力

近日,由中国人民银行广东省分行、广东省金融管理局、广东省政务服务和数据管理局指导,广东省金融科技协会主办的“智能金融 创新应用”优秀案例名单最终揭晓,海云安开发者安全助手系统项目凭借其创新的"AI安全左移"技术架构&#x…

Fluent许可与网络安全策略

在流体动力学模拟领域,Fluent软件因其卓越的性能和广泛的应用而备受用户青睐。然而,随着网络安全威胁的不断增加,确保Fluent许可的安全性和合规性变得尤为重要。本文将探讨Fluent许可与网络安全策略的关系,为您提供一套有效的安全…

如何借助AI工具?打赢通信设备制造的高风险之战?(案例分享)

你是否曾在项目管理中遇到过那种让人心跳加速的瞬间,当一项风险突然暴露出来时,全队似乎都屏住了呼吸?今天,我就来分享一个我亲历的项目案例,讲述我们如何借助具体的AI工具,实现从数据到决策的华丽转变&…

Web服务器(Tomcat、项目部署)

1. 简介 1.1 什么是Web服务器 Web服务器是一个应用程序(软件),对HTTP协议的操作进行封装,使得程序员不必直接对协议进行操作,让Web开发更加便捷。主要功能是"提供网上信息浏览服务"。 Web服务器是安装在服…

list 介绍 及 底层

list的相关文档:list - C Reference 一、list的介绍及使用 list中的接口比较多,此处类似,只需要掌握如何正确的使用,然后再去深入研究背后的原理,已达到可扩展的能力。以下为list中一些常见的重要接口。我们库里的list…

HCIP MGRE实验

一、实验要求 1、R5为ISP,只能进行IP地址配置,其所有地址均配为公有Ip地址; 2、 R1和R5间使用PPP的PAP认证,R5为主认证方; R2与R5之间使用PPP的CHAP认证,R5为主认证方; R3与R5之间使用HDLC封装; 3、R2、R3构建一…

基于PyTorch的多视角二维流场切片三维流场预测模型

基于PyTorch的多视角二维流场切片三维流场预测模型 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家,觉得好请收藏。点击跳转到网站。 1. 引言 计算流体动力学(CFD)在工程设计和科学研究中扮演…

全新轻量化PHP网盘搜索引擎系统源码

内容目录一、详细介绍二、效果展示1.部分代码2.效果图展示三、学习资料下载一、详细介绍 全新轻量化PHP网盘搜索引擎系统源码 基于PHPMYSQL开发 一、多样筛选功能:网站支持5类筛选功能,包括默认搜索、网盘类型、文件大小、时间排序以及网盘来源&#x…

C study notes[3]

文章目录operatonsloopsreferencesoperatons the fundamental operators such as ,-,* in C language can be simply manipulated. int sum 5 3; // sum 8 int difference 10 - 4; // difference 6 int product 6 * 7; // product 42the operator / was left to in…