It's been a while since my last post; I've been busy with my graduation project, and now that it's finally done I'm publishing it.
I built a detector for malicious URLs, improving on the malicious-request detection method from Mr. Yang's article referenced below:
[Cybersecurity Self-Study] Part 23: Machine-Learning-Based Malicious Request Detection and Machine Learning in the Security Domain - CSDN blog
I chose the XGBoost algorithm to train the malicious-request detection model in place of the original logistic regression, which improved the detection accuracy.
Introduction to XGBoost
XGBoost (eXtreme Gradient Boosting) is a machine learning algorithm designed and developed by Tianqi Chen and others. It is an efficient implementation of GBDT (Gradient Boosting Decision Tree) that enhances it with both algorithmic optimizations and engineering improvements. XGBoost is widely used in Kaggle and other machine learning competitions, where its efficiency and accuracy have made it a mainstream tool.
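Since the point of switching algorithms is the accuracy gain over logistic regression, here is a minimal, self-contained comparison sketch on synthetic data (the dataset and hyperparameters here are illustrative assumptions, not the project's):

import xgboost as xgb
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Synthetic binary-classification data standing in for request features
X, y = make_classification(n_samples=5000, n_features=50, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

lr = LogisticRegression(max_iter=1000).fit(X_train, y_train)
booster = xgb.XGBClassifier(n_estimators=100, max_depth=5, learning_rate=0.1,
                            objective='binary:logistic', eval_metric='logloss'
                            ).fit(X_train, y_train)

print(f"LogisticRegression accuracy: {accuracy_score(y_test, lr.predict(X_test)):.4f}")
print(f"XGBoost accuracy:            {accuracy_score(y_test, booster.predict(X_test)):.4f}")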
Dataset Selection
The training data comes mainly from the payloads dataset collected by foospidy, a public collection on GitHub containing both normal requests and malicious attack requests.
The dataset covers common OWASP Top 10 attack types, such as SQL injection, XSS, file upload attacks, and unauthorized access, along with requests for paths that normal users would visit. On top of that, I added some normal and malicious requests of my own, drawing on security experience. A few illustrative examples are shown below.
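The following strings are my own illustrations of the request types involved, not quoted from the dataset:

/index.php?id=1%27%20UNION%20SELECT%20username,password%20FROM%20users--   (SQL injection)
/search?q=%3Cscript%3Ealert(1)%3C/script%3E                                (XSS)
/download?file=../../../../etc/passwd                                      (path traversal)
/index.html                                                                (normal request)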
The experimental data consists of 48,238 malicious requests and 1,294,531 normal requests.
Project page: https://github.com/foospidy/payloads
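As a taste of the preprocessing, here is a minimal sketch of reading such files, URL-decoding each line, and deduplicating. The file names match the ones used in the full code below; the actual implementation there is the get_query_list function.

import urllib.parse

def load_queries(filename):
    # One request per line: strip, URL-decode, and deduplicate.
    with open(filename, 'r', encoding='utf-8') as f:
        return list({urllib.parse.unquote(line.strip()) for line in f})

good_queries = load_queries('goodqueries2.txt')  # normal requests
bad_queries = load_queries('badqueries.txt')     # malicious requests
print(f"normal: {len(good_queries)}, malicious: {len(bad_queries)}")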
Feature Construction
This system follows the common approach of combining n-grams with TF-IDF to build the feature matrix, but improves on the feature dimensions: instead of converting text into a feature matrix with TF-IDF alone, it fuses TF-IDF features (5,000 dimensions) with custom features (36 dimensions) into a hybrid feature matrix. The features therefore capture not only the text itself but also hand-crafted signals specific to malicious requests.
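Conceptually, the fusion is a horizontal stack of two sparse blocks: the TF-IDF matrix and a matrix of handcrafted features. A minimal sketch of the idea (two toy handcrafted features stand in for the real 36; the project's actual implementation is prepare_features in the full code below):

import numpy as np
from scipy.sparse import hstack, csr_matrix
from sklearn.feature_extraction.text import TfidfVectorizer

queries = ['/index.html', "/index.php?id=1' or 1=1--"]  # illustrative inputs

# Text block: character 3-gram TF-IDF, capped at 5000 dimensions
vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(3, 3), max_features=5000)
X_tfidf = vectorizer.fit_transform(queries)

# Handcrafted block: here just URL length and presence of a quote character
X_custom = csr_matrix(np.array([[len(q), int("'" in q)] for q in queries]))

X = hstack([X_tfidf, X_custom])  # hybrid feature matrix
print(X.shape)                   # (2, n_tfidf_features + 2)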
Model Training
The malicious-request detection model is trained with XGBoost; the training code is as follows:
def train_xgboost(good_file, bad_file, model_filename, vectorizer_filename, target_ratio=1.0):
    print("Training XGBoost model.............")
    # Resolve output directories for the model and vectorizer
    model_dir = os.path.dirname(os.path.abspath(model_filename))
    vectorizer_dir = os.path.dirname(os.path.abspath(vectorizer_filename))
    print(f"Model will be saved to: {model_filename}")
    print(f"Vectorizer will be saved to: {vectorizer_filename}")
    # Make sure the directories exist
    os.makedirs(model_dir, exist_ok=True)
    os.makedirs(vectorizer_dir, exist_ok=True)

    good_queries = get_query_list(good_file)
    bad_queries = get_query_list(bad_file)
    # Downsample normal requests to target_ratio x the number of malicious ones
    target_good_queries = min(int(len(bad_queries) * target_ratio), len(good_queries))
    good_queries = random.sample(good_queries, target_good_queries)

    queries = bad_queries + good_queries
    labels = [1] * len(bad_queries) + [0] * len(good_queries)

    X, vectorizer = prepare_features(queries, training=True)
    X_train, X_test, y_train, y_test = train_test_split(X, labels, test_size=0.1, random_state=42)
    X_train_resampled, y_train_resampled = balance_data(X_train, y_train)

    model = xgb.XGBClassifier(
        n_estimators=100,
        max_depth=5,
        learning_rate=0.1,
        objective='binary:logistic',
        eval_metric='logloss'
    )
    model.fit(X_train_resampled, y_train_resampled)

    # Evaluation metrics
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred) * 100
    recall = recall_score(y_test, y_pred) * 100
    f1 = f1_score(y_test, y_pred) * 100
    y_pred_proba = model.predict_proba(X_test)[:, 1]  # probability of the positive class
    auc_score = roc_auc_score(y_test, y_pred_proba) * 100
    print(f"Test AUC: {auc_score:.2f}%")
    print(f"Training accuracy: {model.score(X_train_resampled, y_train_resampled):.2f}")
    print(f"Test accuracy: {accuracy:.2f}%")
    print(f"recall: {recall:.2f}%")
    print(f"f1: {f1:.2f}%")
    print(classification_report(y_test, y_pred))

    # ------------------ Confusion matrix and ROC curve ------------------
    plot_confusion_matrix(y_test, y_pred, filename="media/confusion_matrix.png")
    plot_roc_curve(y_test, y_pred_proba, auc_score, filename="media/roc_curve.png")

    # Only persist the model if it meets the quality bar (F1-score >= 98%)
    if f1 >= 98.0:
        with open(model_filename, 'wb') as model_file, open(vectorizer_filename, 'wb') as vectorizer_file:
            pickle.dump(model, model_file)
            pickle.dump(vectorizer, vectorizer_file)
    else:
        print("Model below the quality bar; not saving")

    # Confirm whether the files were written
    print(f"Model saved: {os.path.exists(model_filename)}")
    print(f"Vectorizer saved: {os.path.exists(vectorizer_filename)}")
    print(f"Feature dimensionality: {X.shape[1]}")  # TF-IDF + handcrafted features

    # Return the model, vectorizer, and evaluation metrics
    return model, vectorizer, (accuracy, recall, f1)
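A typical invocation looks like this (the file names and target_ratio follow the main block in the full code below):

model, vectorizer, (accuracy, recall, f1) = train_xgboost(
    good_file='goodqueries2.txt',
    bad_file='badqueries.txt',
    model_filename='xgboost_model.pkl',
    vectorizer_filename='tfidf_vectorizer.pkl',
    target_ratio=0.5,  # sample normal requests down to 0.5x the malicious count
)
print(f"accuracy={accuracy:.2f}%, recall={recall:.2f}%, f1={f1:.2f}%")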
The training results are:
Test AUC: 99.73%
Training accuracy: 0.99
Test accuracy: 98.48%
recall: 98.37%
f1: 98.86%
Model Prediction
I used the model to scan the log file of a WordPress site I have deployed on the public internet and predict which of its requests are malicious.
Sample log content:
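For reference, entries in the combined log format that parse_log expects look like this (the IPs and timestamps are made up):

203.0.113.7 - - [15/Mar/2025:10:21:33 +0800] "GET /wp-login.php HTTP/1.1" 200 1043
198.51.100.23 - - [15/Mar/2025:10:21:40 +0800] "GET /index.php?id=1%27%20or%201=1-- HTTP/1.1" 404 512
192.0.2.5 - - [15/Mar/2025:10:22:02 +0800] "GET /index.html HTTP/1.1" 200 2048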
Prediction results:
The accuracy is decent. I later used this detection model to build a feature that monitors a log file for malicious requests in real time; a minimal sketch of that loop follows.
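This sketch assumes the model and vectorizer are already trained and reuses prepare_features and load_model_and_vectorizer from the full code below; the polling loop and log-line regex are my own simplifications, not the project's production code.

import re
import time

def monitor_log(log_path, model, vectorizer, threshold=0.5, poll_seconds=1.0):
    # Follow the access log like `tail -f` and score each newly appended request.
    pattern = re.compile(r'"(?:GET|POST|PUT|DELETE|HEAD|OPTIONS) (?P<url>[^ ]+) HTTP')
    with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
        f.seek(0, 2)                      # start at the end: only score new entries
        while True:
            line = f.readline()
            if not line:
                time.sleep(poll_seconds)  # wait for the server to write more lines
                continue
            m = pattern.search(line)
            if not m:
                continue
            X = prepare_features([m.group('url').lower()],
                                 vectorizer=vectorizer, training=False)[0]
            prob = model.predict_proba(X)[:, 1][0]
            if prob >= threshold:
                print(f"[ALERT] suspicious request: {m.group('url')} (p={prob:.2f})")

# Usage (assumed file names):
# model, vectorizer = load_model_and_vectorizer('xgboost_model.pkl', 'tfidf_vectorizer.pkl')
# monitor_log('access.log', model, vectorizer)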
Part of the complete project code will be published on GitHub. Feel free to try it out :) or improve on it.
Full project: https://github.com/MOYANINK/AIRecon
Full Code
import os
import pickle
import warnings
import re
import random
import urllib.parse
import requests       # retained for the commented-out report helper below
import numpy as np
import xgboost as xgb
from datetime import datetime
from collections import defaultdict
from re import search
from imblearn.over_sampling import SMOTE
from scipy.sparse import hstack, csr_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, recall_score, f1_score,
                             classification_report, roc_auc_score,
                             confusion_matrix, roc_curve)

import matplotlib
matplotlib.use("Agg")  # required when running without a GUI
import matplotlib.pyplot as plt
import seaborn as sns
import geopandas as gpd  # retained for the commented-out attack-map helper below

# Set a font that can render Chinese labels
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False  # fix rendering of the minus sign

# ANSI color codes
RED = '\033[31m'
GREEN = '\033[32m'
RESET = '\033[0m'

# High-risk paths
HIGH_RISK_PATHS = [
    '/env', '/cf_scripts', '/.env', '/config', '/admin', '/login', '/wp-admin', '/manager', '/backup',
    '/localstart.aspx', '/+CSCOE+/logon.html', '/inicio.cgi', '/cgi-bin/info.cgi', '/admin.php', '/admin.jsp',
    '/magento_version', '/xml/info.xml', '/main.shtml', '/home.html', '/geoserver/web/', '/form.html', '/upl.php',
    '/boaform/admin/formLogin', '/manage/account/login', '/cgi-bin/login.cgi', '/login.jsp',
    '/password.php'
]
# URL whitelist
ALLOWED_URLS = [
    '/',
    '/index.html',
    '/home.html',
    '/robots.txt'
]

def is_high_risk_url(url):
    url = url.lower()  # normalize to lowercase
    return any(risk_path in url for risk_path in HIGH_RISK_PATHS)

CMS_PATHS = ['/magento_version', '/wp-admin/setup-config.php', '/joomla/configuration.php']

# Load the model and vectorizer
def load_model_and_vectorizer(model_file, vectorizer_file):
    with open(model_file, 'rb') as mf, open(vectorizer_file, 'rb') as vf:
        model = pickle.load(mf)
        vectorizer = pickle.load(vf)
    return model, vectorizer

# Parse the log file, extracting time, IP, request method, and URL
def parse_log(log_file):
    log_entries = []
    log_pattern = re.compile(
        r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<time>[^\]]+)\] '
        r'"(?P<method>GET|POST|PUT|DELETE|HEAD|OPTIONS) (?P<url>[^ ]+) HTTP/\d\.\d"'
    )
    # Check that the file exists
    if not os.path.exists(log_file):
        raise FileNotFoundError(f"Log file not found: {log_file}")
    print(f"Log file path: {os.path.abspath(log_file)}")  # debug: log path

    def parse_lines(file):
        for line in file:
            match = log_pattern.search(line)
            if match:
                try:
                    # Parse the timestamp, including timezone information
                    timestamp = datetime.strptime(match.group('time'), '%d/%b/%Y:%H:%M:%S %z')
                except ValueError:
                    print(f"Failed to parse time: {match.group('time')}")  # debug: time format
                    continue
                # Record IP, timestamp, URL, and request method
                log_entries.append((match.group('ip'), timestamp, match.group('url'), match.group('method')))
            else:
                print(f"Failed to parse line: {line.strip()}")  # debug: log format

    try:
        with open(log_file, 'r', encoding='utf-8') as file:
            parse_lines(file)
    except UnicodeDecodeError:
        print("utf-8 decoding failed, falling back to ISO-8859-1")
        with open(log_file, 'r', encoding='ISO-8859-1') as file:
            parse_lines(file)

    if not log_entries:
        raise ValueError("Error: No log entries parsed from log file. Check the log format!")
    return log_entries

# ------------------ Confusion Matrix ------------------
def plot_confusion_matrix(y_true, y_pred, filename="media/confusion_matrix.png"):
    os.makedirs(os.path.dirname(filename), exist_ok=True)  # create directory if missing
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=["Normal", "Malicious"],
        yticklabels=["Normal", "Malicious"]
    )
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title("Confusion Matrix")
    plt.savefig(filename, dpi=300, bbox_inches="tight")
    plt.close()
    print(f"Confusion matrix saved to {filename}")

# ------------------ ROC Curve ------------------
def plot_roc_curve(y_true, y_pred_proba, auc_score, filename="media/roc_curve.png"):
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color="darkorange", lw=2, label=f"ROC Curve (AUC = {auc_score:.2f}%)")
    plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve")
    plt.legend(loc="lower right")
    plt.savefig(filename, dpi=300, bbox_inches="tight")
    plt.close()
    print(f"ROC curve saved to {filename}")
# Get the list of requests from a file
def get_query_list(filename):
    filepath = os.path.join(os.getcwd(), filename)
    with open(filepath, 'r', encoding='utf-8') as file:
        data = file.readlines()
    # URL-decode each line and deduplicate
    return list(set(urllib.parse.unquote(line.strip()) for line in data))

# URL feature extraction
def normalize_url(url):
    # Decode URL-encoded characters
    url = urllib.parse.unquote(url)
    # Strip spaces and plus signs
    url = url.replace(' ', '').replace('+', '')
    return url.lower()  # normalize to lowercase

def extract_url_features(query):
    query = normalize_url(query)

    # Helper: count the number of URL parameters
    def count_params(url):
        if '?' not in url:
            return 0
        query_str = url.split('?', 1)[1]
        return len(query_str.split('&'))

    param_count = count_params(query)
    return [
        # 1. SQL injection features
        int('union' in query),
        int('select' in query),
        int('--' in query),
        int('#' in query),
        int('>' in query),
        int('<' in query),
        int('or1=1' in query),  # note: spaces are removed during normalization
        int(';' in query),
        # 2. Command injection features
        int('&&' in query),
        int('|' in query or '||' in query),
        int('whoami' in query),
        int('nc' in query),
        int('curl' in query),
        int('eval' in query),
        int('`' in query),
        # 3. Path traversal features
        int('..' in query),
        int('/../' in query),
        int('%2f../' in query or '%2f..%2f' in query),  # double-encoding detection
        # 4. Sensitive directory exposure
        int('/wp-admin/' in query),
        int('/.git/' in query),
        int('token=' in query),
        int('password=' in query),
        int('debug=' in query),
        int('sleep(' in query),  # time-based blind SQL injection
        int('exec(' in query),   # command execution
        # 5. Config and log exposure
        int('/.env' in query),
        int('/phpmyadmin/' in query),
        int('/console/' in query),
        int('config.' in query),  # matches config.*
        int('session=' in query),
        # 6. Statistical features
        int(param_count > 5),       # more than 5 parameters
        int('.log' in query),
        int('log=' in query),
        len(query),                 # total URL length
        int(query.count('/') > 6),  # path depth > 6
        int('${' in query)          # template injection
    ]
# Generate n-gram features
def get_ngrams(query):
    query = query.lower()  # normalize to lowercase
    # Character 3-grams (range bound fixed so the final trigram is included)
    ngrams = [query[i:i + 3] for i in range(len(query) - 2)]
    # Keyword detection
    dangerous_keywords = ['admin', 'login', 'config', '.env', 'wp-admin', 'manager',
                          'database', 'phpinfo', 'password', 'wp-config']
    for keyword in dangerous_keywords:
        if keyword in query:
            ngrams.append(f"keyword:{keyword}")
    # SQL injection patterns
    sql_patterns = ['sql', 'union', '--', '%20', 'cmd', 'passwd']
    for pattern in sql_patterns:
        if pattern in query:
            ngrams.append(f"sql_pattern:{pattern}")
    return ngrams

# Combine TF-IDF + handcrafted features
def prepare_features(queries, vectorizer=None, training=True):
    if not queries:  # guard against an empty list (assumes a fitted vectorizer was passed in)
        print("Error: Empty query list received in prepare_features().")
        return csr_matrix((0, vectorizer.max_features)), vectorizer  # empty sparse matrix
    if training:
        vectorizer = TfidfVectorizer(tokenizer=get_ngrams, max_features=5000, max_df=0.5, min_df=5)
        X_tfidf = vectorizer.fit_transform(queries)
    else:
        X_tfidf = vectorizer.transform(queries)  # transform only, so the feature dimension stays fixed
    additional_features = [extract_url_features(query) for query in queries]
    X_additional = csr_matrix(np.array(additional_features))  # convert to a sparse matrix
    X_combined = hstack([X_tfidf, X_additional])  # sparse stack avoids exhausting memory
    return X_combined, vectorizer

# Handle class imbalance (SMOTE oversampling)
def balance_data(X, y):
    smote = SMOTE(sampling_strategy=1.0, random_state=42)
    return smote.fit_resample(X, y)

def train_xgboost(good_file, bad_file, model_filename, vectorizer_filename, target_ratio=1.0):
    print("Training XGBoost model.............")
    # Resolve output directories for the model and vectorizer
    model_dir = os.path.dirname(os.path.abspath(model_filename))
    vectorizer_dir = os.path.dirname(os.path.abspath(vectorizer_filename))
    print(f"Model will be saved to: {model_filename}")
    print(f"Vectorizer will be saved to: {vectorizer_filename}")
    # Make sure the directories exist
    os.makedirs(model_dir, exist_ok=True)
    os.makedirs(vectorizer_dir, exist_ok=True)

    good_queries = get_query_list(good_file)
    bad_queries = get_query_list(bad_file)
    # Downsample normal requests to target_ratio x the number of malicious ones
    target_good_queries = min(int(len(bad_queries) * target_ratio), len(good_queries))
    good_queries = random.sample(good_queries, target_good_queries)

    queries = bad_queries + good_queries
    labels = [1] * len(bad_queries) + [0] * len(good_queries)

    X, vectorizer = prepare_features(queries, training=True)
    X_train, X_test, y_train, y_test = train_test_split(X, labels, test_size=0.1, random_state=42)
    X_train_resampled, y_train_resampled = balance_data(X_train, y_train)

    model = xgb.XGBClassifier(
        n_estimators=100,
        max_depth=5,
        learning_rate=0.1,
        objective='binary:logistic',
        eval_metric='logloss'
    )
    model.fit(X_train_resampled, y_train_resampled)

    # Evaluation metrics
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred) * 100
    recall = recall_score(y_test, y_pred) * 100
    f1 = f1_score(y_test, y_pred) * 100
    y_pred_proba = model.predict_proba(X_test)[:, 1]  # probability of the positive class
    auc_score = roc_auc_score(y_test, y_pred_proba) * 100
    print(f"Test AUC: {auc_score:.2f}%")
    print(f"Training accuracy: {model.score(X_train_resampled, y_train_resampled):.2f}")
    print(f"Test accuracy: {accuracy:.2f}%")
    print(f"recall: {recall:.2f}%")
    print(f"f1: {f1:.2f}%")
    print(classification_report(y_test, y_pred))

    # ------------------ Confusion matrix and ROC curve ------------------
    plot_confusion_matrix(y_test, y_pred, filename="media/confusion_matrix.png")
    plot_roc_curve(y_test, y_pred_proba, auc_score, filename="media/roc_curve.png")

    # Only persist the model if it meets the quality bar (F1-score >= 98%)
    if f1 >= 98.0:
        with open(model_filename, 'wb') as model_file, open(vectorizer_filename, 'wb') as vectorizer_file:
            pickle.dump(model, model_file)
            pickle.dump(vectorizer, vectorizer_file)
    else:
        print("Model below the quality bar; not saving")

    # Confirm whether the files were written
    print(f"Model saved: {os.path.exists(model_filename)}")
    print(f"Vectorizer saved: {os.path.exists(vectorizer_filename)}")
    print(f"Feature dimensionality: {X.shape[1]}")  # TF-IDF + handcrafted features

    # Return the model, vectorizer, and evaluation metrics
    return model, vectorizer, (accuracy, recall, f1)

def detect_malicious_requests(log_entries, model_filename, vectorizer_filename,
                              user_whitelist=None, user_blacklist=None,
                              threshold=0.5):
    """
    :param threshold: malicious-probability threshold (0.0-1.0), default 0.5 (50%)
    """
    # Validate parameters
    if not (0 <= threshold <= 1):
        raise ValueError("threshold must be between 0.0 and 1.0")

    results = []
    malicious_ips = defaultdict(int)

    # Merge user-supplied white/blacklists (guarding against None)
    user_whitelist = user_whitelist or []
    user_blacklist = user_blacklist or []
    combined_whitelist = set(ALLOWED_URLS) | set(user_whitelist)
    combined_blacklist = set(HIGH_RISK_PATHS) | set(user_blacklist)

    # Preprocess URLs
    urls = [entry[2].lower() for entry in log_entries]

    # Load the model
    model, vectorizer = load_model_and_vectorizer(model_filename, vectorizer_filename)

    # Extract features and predict
    X = prepare_features(urls, vectorizer=vectorizer, training=False)[0]
    probabilities = model.predict_proba(X)[:, 1]  # probabilities in [0, 1]

    for entry, prob in zip(log_entries, probabilities):
        ip, timestamp, url, method = entry
        url_lower = url.lower()

        # 1. Whitelist check (exact match)
        trigger = None
        if url_lower in combined_whitelist:
            label = 0
            trigger = 'whitelist'
        else:
            # 2. Blacklist check (strict regex match)
            is_black = any(
                search(rf"\b{re.escape(pattern)}\b", url_lower)
                for pattern in combined_blacklist
            )
            if is_black:
                # Only flag as malicious when the probability clears the threshold
                if prob >= threshold:
                    label = 1
                    trigger = 'blacklist'
                else:
                    label = 0  # below the threshold, treat as normal
            else:
                # 3. Model prediction
                label = 1 if prob >= threshold else 0
                trigger = 'model' if label else None

        # Track malicious IPs
        if label == 1:
            malicious_ips[ip] += 1

        results.append({
            'ip': ip,
            'time': timestamp,
            'url': url,
            'method': method,
            'probability': prob,
            'malicious': bool(label),
            'trigger': trigger  # record the basis for the decision
        })

    return results, malicious_ips
# Print detection results with color
def print_results(results, malicious_ips):
    # Print each request with its verdict
    for result in results:
        if result['url'].endswith(('.css', '.js', '.ico')):  # skip static-resource requests
            continue
        status = "Malicious request" if result['malicious'] else "Normal request"
        color = RED if result['malicious'] else GREEN
        print(f"{color}Time: {result['time']}, IP: {result['ip']}, URL: {result['url']} -> {status}{RESET}")

    # Print malicious-IP statistics
    print("\nMalicious IP statistics:")
    for ip, count in sorted(malicious_ips.items(), key=lambda x: x[1], reverse=True):
        if count < 10:
            continue
        print(f"IP: {ip}, malicious request count: {count}")

    # Generate the malicious-IP report / attack map (helpers not included in this listing)
    # generate_report(malicious_ips)
    # generate_attack_map(malicious_ips)

# Load the model and classify a single URL
def detect_url(url, model_filename="../data/xgboost_model.pkl", vectorizer_filename="../data/tfidf_vectorizer.pkl",
               threshold=0.5):
    model, vectorizer = load_model_and_vectorizer(model_filename, vectorizer_filename)
    # Build the same hybrid feature matrix used in training (TF-IDF + handcrafted
    # features); calling vectorizer.transform alone would miss the 36 extra columns
    # and no longer match the model's expected input dimension.
    features = prepare_features([url], vectorizer=vectorizer, training=False)[0]
    probability = model.predict_proba(features)[:, 1][0]
    return "Malicious request" if probability >= threshold else "Normal request"

# Entry point
if __name__ == '__main__':
    good_file = 'goodqueries2.txt'
    bad_file = 'badqueries.txt'
    model_filename = 'xgboost_model.pkl'          # 5036-dimensional features
    vectorizer_filename = 'tfidf_vectorizer.pkl'  # 5036-dimensional features
    target_ratio = 0.5
    flag = 0  # 0 = detect with an existing model, otherwise train first

    if flag == 0:
        log_entries = parse_log('data.txt')
        if not log_entries:
            print("Error: No log entries parsed from log file.")
            exit()
        results, malicious_ips = detect_malicious_requests(log_entries, model_filename, vectorizer_filename)
        print_results(results, malicious_ips)
    else:
        model, vectorizer, metrics = train_xgboost(good_file, bad_file, model_filename, vectorizer_filename,
                                                   target_ratio)
        accuracy, recall, f1 = metrics  # unpack the evaluation metrics
        log_entries = parse_log('data.txt')
        if not log_entries:
            print("Error: No log entries parsed from log file.")
            exit()
        results, malicious_ips = detect_malicious_requests(log_entries, model_filename, vectorizer_filename)
        print_results(results, malicious_ips)
References
[Cybersecurity Self-Study] Part 23: Machine-Learning-Based Malicious Request Detection and Machine Learning in the Security Domain - CSDN blog