需求:
1.根据用户编写的要报规则,去mysql库里SysManage_Rule表获取已经启用的规则作为条件(例如[{“field”: “关键词”, “logic”: “AND”, “value”: “阿尔法”, “operator”: “=”,, “assign_user”: “user222”}])条件即为:关键词=阿尔法
2.根据此条件去lxdb的all_report表进行查询,查询逻辑是每10min获取最新数据,满足条件的对all_report表的report_handler字段打上分配人名以及yb_importanceid填上要报规则的id
要报规则页面:
SysManage_Rule表
解决办法
规则分配脚本(supervisor运行):
import argparse
import os
import django
import time
from datetime import datetime, timedelta
import sys
import json# 添加项目路径,确保 Django 配置正确
sys.path.append('/home/rpadmin/web/yb-backend')# 初始化 Django 环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yb.settings")
django.setup()# 导入自定义模块和参数
from DBreport.functions.get_table_mapping import get_gather_table
from wrapper.Params import TOREPORT, GATHER_TABLE
from DBconn.DBfunctions.dynamicQueryBase import DynamicQueryBaseEngine
from SysManage.models import SysmanageRule# 字段名映射:中文字段名 -> 英文字段名
field_mapping = {"报文要素": "text_content","语种": "audio_languagename","关键词": "keyword","文种": "text_lang","实体": "text_entity","来源手段": "hj_means"
}# 初始记录时间,第一次运行查很早之前的数据
last_run_time = datetime.now() - timedelta(minutes=1600000)def replace_field_names(data, mapping):new_data = {}for user, rules in data.items():new_rules = []for rule in rules:new_rule = rule.copy()if new_rule.get('field') in mapping:new_rule['field'] = mapping[new_rule['field']]new_rules.append(new_rule)new_data[user] = new_rulesreturn new_datadef run_rule_dispatch():global last_run_time# 获取数据库连接和数据表对象db_conn = get_gather_table()gather_table = db_conn.load_table(GATHER_TABLE)print("[调试] all_report 字段列表:", [col.name for col in gather_table.columns])# 查询所有启用状态的规则rules = SysmanageRule.objects.filter(rule_status=1)for rule in rules:rule_id = rule.idrule_name = rule.rule_namerule_content = rule.rule_content# 检查规则内容格式是否为列表if isinstance(rule_content, list):conditions = rule_contentelse:print(f"[跳过] 规则《{rule_name}》内容应为列表格式")continue# 替换条件中的字段名conditions = replace_field_names({rule_name: conditions}, field_mapping)[rule_name]filters = []keyword_value = None # 保存关键词值(如果存在)for cond in conditions:field = cond.get("field")operator = cond.get("operator")value = cond.get("value")if cond.get("assign_user"):assign_user = cond["assign_user"]if not field or not operator or value is None:continue# 如果字段为 keyword 且是等于操作,延迟处理if field == "keyword" and operator == "=":keyword_value = valueelse:filters.append((field, operator, value))# 增加时间过滤:只查上次运行之后新增的数据filters.append(('hj_createtime', '>=', last_run_time))# 关键词特殊处理:模糊匹配 audio_asr 或 trans_sidebyside 的 original_content/ translat_content 字段if keyword_value is not None:# audio_asr 模糊匹配filters_audio = filters + [('audio_asr', 'like', f"%{keyword_value}%")]# 执行原始查询(不带关键词)用于后续文本内容筛选raw_trans_records = db_conn.execute_dynamic_query(gather_table,filters,limit=5, # 仅预览前5条,必要时可调整或移除ignore_fields=[])matched_trans_data = []for row in raw_trans_records['data']:try:trans_data = row.get('trans_sidebyside')if isinstance(trans_data, str):trans_data = json.loads(trans_data) # JSON 反序列化if isinstance(trans_data, dict):original = trans_data.get("original_content", "")translated = trans_data.get("translat_content", "")if keyword_value in original or keyword_value in translated:matched_trans_data.append(row)except Exception:continue# 执行 audio_asr 匹配查询result1 = db_conn.execute_dynamic_query(gather_table, filters_audio, ignore_fields=[])# 合并两类结果(根据 lxid 去重)combined_data = {row['lxid']: row for row in result1['data']}for row in matched_trans_data:combined_data.setdefault(row['lxid'], row)total_count = len(combined_data)print(f"[调试] 条件命中记录数:{total_count}")if total_count:first_row = list(combined_data.values())[0]print("[调试] 命中示例记录:", first_row)# 构造更新内容updates = {'report_handler': assign_user,'yb_importanceid': str(rule_id),'yb_importancename': rule_name,'assign_rule_type': TOREPORT}# 遍历命中记录逐条更新affected = 0for row in combined_data.values():row_filter = [('lxid', '=', row['lxid'])]try:count = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, row_filter)affected += countexcept Exception as e:print(f"[×] 更新失败:{str(e)}")print(f"[✓] 规则《{rule_name}》执行成功,分配人:{assign_user},更新{affected}条")continue# 正常流程处理:无关键词匹配逻辑print(f"[调试] 规则《{rule_name}》筛选条件:{filters}")result_data = db_conn.execute_dynamic_query(gather_table,filters,limit=5,ignore_fields=[])print(f"[调试] 条件命中记录数:{result_data['total_count']}")if result_data['data']:print("[调试] 命中示例记录:", result_data['data'][0])updates = {'report_handler': assign_user,'yb_importanceid': str(rule_id),'yb_importancename': rule_name,'assign_rule_type': TOREPORT}try:affected = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, filters)print(f"[✓] 规则《{rule_name}》执行成功,分配人:{assign_user},更新{affected}条")except Exception as e:print(f"[×] 规则《{rule_name}》执行失败:{str(e)}")# 关闭连接,记录当前时间用于下轮过滤db_conn.close()last_run_time = datetime.now()if __name__ == "__main__":while True:print(f"\n[调试] 开始执行调度任务,当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")run_rule_dispatch()print(f"[调试] 任务执行完成,等待10分钟...\n")time.sleep(600) # 每10分钟运行一次
指定分配人接口(即向rule_content字段里添加assign_user):
@class_operation_logger(operation_name="/指定要报规则分配人")
class RuleAssignUserUpdateAPIView(APIView):"""接收 assign_user 和 规则id,指定规则的 rule_content 中每一项的 assign_userPOST /api/sys/update-assign-user/Body:{"id": 2,"assign_user": "user002"}"""def post(self, request, *args, **kwargs):rule_id = request.data.get("id")assign_user = request.data.get("assign_user")if not rule_id or not assign_user:return ErrorResponse(data=False, msg="参数id 和 assign_user 都是必填的")# 获取要更新的规则rule = get_object_or_404(SysmanageRule, pk=rule_id)try:content_list = rule.rule_content or []if not isinstance(content_list, list):raise ValueError("rule_content 必须是列表")except Exception as e:return ErrorResponse(data=False, msg=f"读取 rule_content 失败:{str(e)}")# 创建或更新每一项的 assign_userfor cond in content_list:cond['assign_user'] = assign_user# 保存回库rule.rule_content = content_listrule.save(update_fields=['rule_content'])return SuccessResponse(msg=f"id为 {rule_id} 的规则的分配人已更新为 {assign_user}")
Supervisor工具详见文章Supervisor进程管理