✅ Feature Overview
The monitoring system provides the following main features:
📁 Directory monitoring: watches the specified root directory and all of its subdirectories for file changes in real time.
🔒 File hash verification: computes a SHA-256 hash of each file's contents, so change detection is based on content rather than timestamps (illustrated in the standalone sketch after this list).
🚫 Exclusion mechanism: specific subdirectories or files can be excluded, so directories that change frequently do not generate noise.
🧾 Logging: every file that is created or tampered with in a non-excluded directory is automatically recorded in the update.txt log file.
🧠 Baseline creation: on first run, the system automatically builds a hash baseline as the "trusted state" for later change comparison.
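Before the full script, here is a standalone illustration of why hashing file contents is more reliable than comparing timestamps. It is only a sketch and not part of the monitor itself; demo.txt is a throwaway file created in the current working directory.

import hashlib
import os
import time

def sha256_of(path):
    # Hash the file contents in 8 KB chunks
    hasher = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(8192):
            hasher.update(chunk)
    return hasher.hexdigest()

demo_file = "demo.txt"  # hypothetical test file
with open(demo_file, "w", encoding="utf-8") as f:
    f.write("original content")
before = sha256_of(demo_file)

# Touching the file changes its mtime but not its contents: no false positive
os.utime(demo_file, (time.time(), time.time()))
assert sha256_of(demo_file) == before

# Appending changes the contents, so the hash changes and the edit is detected
with open(demo_file, "a", encoding="utf-8") as f:
    f.write(" tampered")
assert sha256_of(demo_file) != before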
Python code
import os
import hashlib
import json
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# === Path configuration (could read from environment variables for more flexibility) ===
MONITOR_FOLDER = "D:\\WorkRoot\\xxxx\\fms"
HASH_DB_FILE = "D:\\hash_baseline.json"
LOG_FILE = "D:\\update.txt"
#D:\WorkRoot\xxxx\fms\UploadFiles
# ❗ Relative paths excluded from monitoring (relative to MONITOR_FOLDER)
EXCLUDE_PATHS = ["UploadFiles", "areweb\\database"]

# === Utility functions ===
def is_excluded(path):
    # Return True if the path falls under any excluded subdirectory or file
    rel_path = os.path.relpath(path, MONITOR_FOLDER)
    for ex in EXCLUDE_PATHS:
        if rel_path == ex or rel_path.startswith(ex + os.sep):
            return True
    return False

def calculate_hash(file_path):
    # Compute the SHA-256 digest of a file's contents, reading in 8 KB chunks
    hasher = hashlib.sha256()
    try:
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                hasher.update(chunk)
        return hasher.hexdigest()
    except Exception:
        # File may be locked or removed mid-read; skip it
        return None

def log_change(message):
    # Append a timestamped entry to the log file
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")

def build_baseline():
    # Walk the monitored tree and record a hash for every non-excluded file
    hash_dict = {}
    for root, _, files in os.walk(MONITOR_FOLDER):
        for file in files:
            full_path = os.path.join(root, file)
            if is_excluded(full_path):
                continue
            rel_path = os.path.relpath(full_path, MONITOR_FOLDER)
            file_hash = calculate_hash(full_path)
            if file_hash:
                hash_dict[rel_path] = file_hash
    with open(HASH_DB_FILE, 'w') as f:
        json.dump(hash_dict, f, indent=4)
    print("✅ Hash baseline created")

def load_baseline():
    # Load the existing baseline, or return an empty dict if none exists yet
    if not os.path.exists(HASH_DB_FILE):
        return {}
    with open(HASH_DB_FILE, 'r') as f:
        return json.load(f)

# === File monitoring event handler ===
class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, baseline):
        self.baseline = baseline

    def on_modified(self, event):
        if event.is_directory or is_excluded(event.src_path):
            return
        rel_path = os.path.relpath(event.src_path, MONITOR_FOLDER)
        current_hash = calculate_hash(event.src_path)
        old_hash = self.baseline.get(rel_path)
        # Only report when the content hash actually differs from the baseline
        if old_hash and current_hash and current_hash != old_hash:
            log_change(f"🚨 File tampered with: {rel_path}")

    def on_created(self, event):
        if event.is_directory or is_excluded(event.src_path):
            return
        rel_path = os.path.relpath(event.src_path, MONITOR_FOLDER)
        log_change(f"🆕 New file: {rel_path}")

    def on_deleted(self, event):
        if event.is_directory or is_excluded(event.src_path):
            return
        rel_path = os.path.relpath(event.src_path, MONITOR_FOLDER)
        if rel_path in self.baseline:
            log_change(f"❌ File deleted: {rel_path}")

# === Start the monitor ===
def start_monitor():
    # Build the baseline on first run, then load it into memory
    if not os.path.exists(HASH_DB_FILE):
        build_baseline()
    baseline = load_baseline()
    print(f"🚀 Monitoring folder: {MONITOR_FOLDER}")
    print(f"🚫 Excluded paths: {EXCLUDE_PATHS}")
    print(f"📄 Log file: {LOG_FILE}")
    event_handler = FileChangeHandler(baseline)
    observer = Observer()
    observer.schedule(event_handler, MONITOR_FOLDER, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# === Program entry point ===
if __name__ == "__main__":
    start_monitor()
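For reference, the baseline file written by build_baseline() is a flat JSON object that maps each file path (relative to MONITOR_FOLDER) to its SHA-256 digest. The entries below are placeholders only, not real output:

{
    "web.config": "<64-character SHA-256 hex digest>",
    "areweb\\pages\\index.aspx": "<64-character SHA-256 hex digest>"
}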
Packaging as an exe (--icon=logo.ico sets the application icon and can be removed):
pyinstaller --noconsole --onefile --icon=logo.ico 篡改脚本.py
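Both scripts depend on the third-party watchdog package, and packaging requires PyInstaller; if they are not installed yet, something like this should cover it:

pip install watchdog pyinstaller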
Optimization:
Make the paths configurable by reading them from config.json (a variant of load_config() with default values is sketched after the example below):
{"monitor_folder": "D:\\WX\\fms_bak","hash_db_file": "D:\\hash_baseline.json","log_file": "D:\\update.txt","exclude_paths": ["areweb"]
}
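If config.json might omit some keys, load_config() could merge the file over a set of defaults instead of raising a KeyError later. A minimal sketch of that variant follows; the default values here are assumptions, not part of the original script.

import json

DEFAULT_CONFIG = {
    "monitor_folder": ".",
    "hash_db_file": "hash_baseline.json",
    "log_file": "update.txt",
    "exclude_paths": [],
}

def load_config(path="config.json"):
    # Start from the defaults and overwrite with whatever the file provides
    config = dict(DEFAULT_CONFIG)
    with open(path, 'r', encoding='utf-8') as f:
        config.update(json.load(f))
    return config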
Python code
import os
import hashlib
import json
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# === Load configuration ===
CONFIG_FILE = "C:\\config.json"

def load_config():
    # Read the JSON configuration file
    with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
        return json.load(f)

config = load_config()
MONITOR_FOLDER = config['monitor_folder']
HASH_DB_FILE = config['hash_db_file']
LOG_FILE = config['log_file']
EXCLUDE_PATHS = config['exclude_paths']

# === Utility functions ===
def is_excluded(path):
    # Return True if the path falls under any excluded subdirectory or file
    rel_path = os.path.relpath(path, MONITOR_FOLDER)
    for ex in EXCLUDE_PATHS:
        if rel_path == ex or rel_path.startswith(ex + os.sep):
            return True
    return False

def calculate_hash(file_path):
    # Compute the SHA-256 digest of a file's contents, reading in 8 KB chunks
    hasher = hashlib.sha256()
    try:
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                hasher.update(chunk)
        return hasher.hexdigest()
    except Exception:
        # File may be locked or removed mid-read; skip it
        return None

def log_change(message):
    # Append a timestamped entry to the log file
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")

def build_baseline():
    # Walk the monitored tree and record a hash for every non-excluded file
    hash_dict = {}
    for root, _, files in os.walk(MONITOR_FOLDER):
        for file in files:
            full_path = os.path.join(root, file)
            if is_excluded(full_path):
                continue
            rel_path = os.path.relpath(full_path, MONITOR_FOLDER)
            file_hash = calculate_hash(full_path)
            if file_hash:
                hash_dict[rel_path] = file_hash
    with open(HASH_DB_FILE, 'w') as f:
        json.dump(hash_dict, f, indent=4)
    print("✅ Hash baseline created")

def load_baseline():
    # Load the existing baseline, or return an empty dict if none exists yet
    if not os.path.exists(HASH_DB_FILE):
        return {}
    with open(HASH_DB_FILE, 'r') as f:
        return json.load(f)

# === File monitoring handler ===
class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, baseline):
        self.baseline = baseline

    def on_modified(self, event):
        if event.is_directory or is_excluded(event.src_path):
            return
        rel_path = os.path.relpath(event.src_path, MONITOR_FOLDER)
        current_hash = calculate_hash(event.src_path)
        old_hash = self.baseline.get(rel_path)
        # Only report when the content hash actually differs from the baseline
        if old_hash and current_hash and current_hash != old_hash:
            log_change(f"🚨 File tampered with: {rel_path}")

    def on_created(self, event):
        if event.is_directory or is_excluded(event.src_path):
            return
        rel_path = os.path.relpath(event.src_path, MONITOR_FOLDER)
        log_change(f"🆕 New file: {rel_path}")

    def on_deleted(self, event):
        if event.is_directory or is_excluded(event.src_path):
            return
        rel_path = os.path.relpath(event.src_path, MONITOR_FOLDER)
        log_change(f"❌ File deleted: {rel_path}")

# === Start the monitor ===
def start_monitor():
    # Build the baseline on first run, then load it into memory
    if not os.path.exists(HASH_DB_FILE):
        build_baseline()
    baseline = load_baseline()
    print(f"🚀 Monitoring folder: {MONITOR_FOLDER}")
    print(f"🚫 Excluded paths: {EXCLUDE_PATHS}")
    print(f"📄 Log file: {LOG_FILE}")
    event_handler = FileChangeHandler(baseline)
    observer = Observer()
    observer.schedule(event_handler, MONITOR_FOLDER, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# === Program entry point ===
if __name__ == "__main__":
    start_monitor()
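One caveat when packaging with --onefile: the hard-coded CONFIG_FILE of C:\config.json has to exist on every machine that runs the exe. An alternative is to look for config.json next to the executable itself. The sketch below is one way to do that (not part of the original script); it relies on sys.frozen, which PyInstaller sets on frozen builds.

import os
import sys

def config_path():
    # When frozen by PyInstaller, sys.executable is the packaged .exe;
    # otherwise fall back to the directory containing this source file.
    if getattr(sys, "frozen", False):
        base_dir = os.path.dirname(sys.executable)
    else:
        base_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(base_dir, "config.json")

CONFIG_FILE = config_path()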