python缓存装饰器实现方案

写python的时候突然想着能不能用注解于是就写了个这个


文章目录

  • 原始版
  • 改进点

原始版

import os
import pickle
import hashlib
import inspect
import functoolsdef _generate_cache_filename(func, *args, **kwargs):"""生成缓存文件名的内部函数"""# 获取调用来源文件的绝对路径caller_frame = inspect.stack()[2]  # 注意调整为2,跳过当前函数和调用者caller_file = os.path.abspath(caller_frame.filename)# 生成调用文件路径的短哈希file_hash = hashlib.md5(caller_file.encode()).hexdigest()[:8]# 生成参数签名args_repr = "_".join([repr(arg) for arg in args])kwargs_repr = "_".join([f"{k}={repr(v)}" for k, v in kwargs.items()])# 处理无参数情况param_repr = f"{args_repr}_{kwargs_repr}" if (args or kwargs) else "no_params"# 组合最终缓存文件名return f"{func.__name__}_{param_repr}_{file_hash}.pkl"def cache(func):@functools.wraps(func)def wrapper(*args, **kwargs):# 使用共享函数生成缓存文件名cache_file = _generate_cache_filename(func, *args, **kwargs)# 缓存逻辑if os.path.exists(cache_file):with open(cache_file, 'rb') as f:return pickle.load(f)result = func(*args, **kwargs)with open(cache_file, 'wb') as f:pickle.dump(result, f)print(f'缓存已保存:{cache_file}')return resultreturn wrapperdef clear_cache(func, *args, **kwargs):"""手动清除缓存文件"""# 使用共享函数生成缓存文件名cache_file = _generate_cache_filename(func, *args, **kwargs)# 删除缓存文件if os.path.exists(cache_file):os.remove(cache_file)print(f"缓存已删除: {cache_file}")else:print(f"缓存文件不存在: {cache_file}")# 测试用例
@cache
def get_data(a, b):print("计算数据")return a + bif __name__ == "__main__":# 第一次调用(创建缓存)print(get_data(1, 2))  # 输出: 计算数据 和 3# 第二次调用(读取缓存)print(get_data(1, 2))  # 无"计算数据"输出# 清除缓存clear_cache(get_data, 1, 2)  # 成功删除# 再次调用(重新计算)print(get_data(1, 2))  # 再次输出"计算数据"

1._generate_cache_filename用于生成缓存文件名字,inspect.stack()[2]获取调用栈中的当前使用的文件名字,提取调用文件的绝对路径并转换为8位MD5哈希值。

*args和**kwargs分别转换为字符串表示,用于区分不同参数的同名函数,当函数无参数时,使用"no_params"。
【这里需要todo一下:传入的参数判断是否能做为合法的文件名字】

最终生成"函数名_参数签名_调用文件哈希.pkl"。
【todo:最终的文件名称不能超过系统保存的最大长度】

需要确保_generate_cache_filename函数能生成唯一且合法的文件名

2.def cache(func)
简单的缓存装饰器,将函数的计算结果持久化到文件中
使用装饰器模式,外层函数接受被装饰函数作为参数

functools.wraps保留原函数的元信息
内层wrapper函数处理实际调用逻辑
通过_generate_cache_filename函数生成唯一的缓存文件名

检查缓存文件是否存在,存在则直接读取并返回缓存结果

否则调用原始函数获取计算结果,使用pickle模块序列化结果到文件,打印缓存保存信息,返回计算结果

注意:
被缓存函数的返回值必须可被pickle序列化
在多进程环境中使用时需注意文件锁问题
缓存文件需要定期清理以避免存储空间占用

需要todo改进:
添加缓存过期机制
支持自定义序列化方法 todo
添加缓存命中率统计
支持分布式缓存存储 todo

3.def clear_cache(func, *args, **kwargs)
用于手动清除特定函数的缓存文件。
检查缓存文件是否存在,若存在则删除并打印确认信息;若不存在则提示文件不存在的状态。

文件删除操作不可逆,需谨慎调用。

改进点

1、合法文件名处理:

使用正则表达式移除非法字符:re.sub(r’[<>:"/\|?*\x00-\x1F]', ‘_’, name)

处理特殊字符和不可打印字符

2、文件名长度截断:

限制文件名最大长度(255字符)

对长文件名进行智能截断(保留首尾部分)

3、缓存过期机制:

添加expire_seconds参数控制缓存有效期

基于文件修改时间检查过期状态

默认过期时间为24小时

4、日志系统:

使用Python标准logging模块

不同级别的日志(DEBUG、INFO、WARNING、ERROR)

格式化的日志输出

5、异常处理:

捕获并记录文件操作中的所有异常

提供有意义的错误信息

缓存失败时不影响主程序运行

6、自定义缓存目录:

可配置的缓存目录参数

自动创建不存在的目录

默认目录为./.cache

7、缓存统计:

跟踪命中、未命中和过期次数

计算命中率

线程安全的统计计数器

按函数名查看统计信息

7、多线程/进程安全:

使用filelock库实现跨进程文件锁

为每个缓存文件创建对应的锁文件

设置锁超时时间(10秒)

8、增强的缓存清除:

清除特定参数的缓存

清除函数的所有缓存

批量删除操作

9、附加功能:

添加了清理缓存的方法(clear_cache和clear_all_cache)

统计信息查看函数(get_cache_stats和print_cache_stats)

智能缓存路径管理

import os
import pickle
import hashlib
import inspect
import functools
import time
import re
import logging
import threading
from collections import defaultdict
from filelock import FileLock# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('cache_decorator')# 缓存统计
cache_stats = defaultdict(lambda: {'hits': 0, 'misses': 0, 'expired': 0, 'deleted': 0})
stats_lock = threading.Lock()# 默认配置
DEFAULT_CACHE_DIR = os.path.join(os.getcwd(), '.cache')
DEFAULT_EXPIRE_SECONDS = 24 * 60 * 60  # 默认过期时间: 24小时
MAX_FILENAME_LENGTH = 200  # 最大文件名长度def _sanitize_filename(name):"""移除文件名中的非法字符并截断长度"""# 替换非法字符sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', name)# 截断文件名if len(sanitized) > MAX_FILENAME_LENGTH:prefix = sanitized[:MAX_FILENAME_LENGTH // 2]suffix = sanitized[-MAX_FILENAME_LENGTH // 2:]sanitized = prefix + '...' + suffix# 确保最终长度不超过限制sanitized = sanitized[:MAX_FILENAME_LENGTH]return sanitizeddef _generate_cache_filename(func, *args, **kwargs):"""生成缓存文件名的内部函数"""# 获取调用来源文件的绝对路径caller_frame = inspect.stack()[2]  # 调整堆栈深度caller_file = os.path.abspath(caller_frame.filename)# 生成调用文件路径的短哈希file_hash = hashlib.md5(caller_file.encode()).hexdigest()[:8]# 生成参数签名args_repr = "_".join([repr(arg) for arg in args])kwargs_repr = "_".join([f"{k}={repr(v)}" for k, v in sorted(kwargs.items())])# 处理无参数情况param_repr = f"{args_repr}_{kwargs_repr}" if (args or kwargs) else "no_params"# 组合并清理文件名raw_filename = f"{func.__name__}_{param_repr}_{file_hash}"return _sanitize_filename(raw_filename) + ".pkl"def _get_cache_file_path(cache_dir, cache_file):"""获取缓存文件完整路径,确保目录存在"""# 创建缓存目录(如果不存在)os.makedirs(cache_dir, exist_ok=True)return os.path.join(cache_dir, cache_file)def cache(expire_seconds=DEFAULT_EXPIRE_SECONDS, cache_dir=DEFAULT_CACHE_DIR):"""带参数的缓存装饰器Args:expire_seconds (int): 缓存过期时间(秒)cache_dir (str): 缓存文件存储目录"""def decorator(func):@functools.wraps(func)def wrapper(*args, **kwargs):# 生成缓存文件名cache_file = _generate_cache_filename(func, *args, **kwargs)cache_path = _get_cache_file_path(cache_dir, cache_file)lock_path = cache_path + ".lock"# 使用文件锁确保线程/进程安全with FileLock(lock_path, timeout=10):# 检查缓存是否存在且未过期if os.path.exists(cache_path):file_age = time.time() - os.path.getmtime(cache_path)if expire_seconds is None or file_age < expire_seconds:# 缓存命中try:with open(cache_path, 'rb') as f:result = pickle.load(f)with stats_lock:cache_stats[func.__name__]['hits'] += 1logger.debug(f'缓存命中: {cache_path}')return resultexcept Exception as e:logger.warning(f'加载缓存失败: {e}')# 缓存过期with stats_lock:cache_stats[func.__name__]['expired'] += 1logger.debug(f'缓存已过期: {cache_path}')# 缓存未命中或过期,重新计算with stats_lock:cache_stats[func.__name__]['misses'] += 1result = func(*args, **kwargs)# 保存结果到缓存try:with open(cache_path, 'wb') as f:pickle.dump(result, f)logger.debug(f'缓存已保存: {cache_path}')except Exception as e:logger.error(f'保存缓存失败: {e}')return result# 为包装的函数添加清除缓存的方法def clear_cache(*args, **kwargs):"""清除特定参数的缓存"""cache_file = _generate_cache_filename(func, *args, **kwargs)cache_path = _get_cache_file_path(cache_dir, cache_file)if os.path.exists(cache_path):try:os.remove(cache_path)logger.info(f'缓存已删除: {cache_path}')with stats_lock:cache_stats[func.__name__]['deleted'] += 1return Trueexcept Exception as e:logger.error(f'删除缓存失败: {e}')return Falseelse:logger.warning(f'缓存文件不存在: {cache_path}')return Falsedef clear_all_cache():"""清除该函数的所有缓存"""pattern = re.compile(f"^{func.__name__}_.*\\.pkl$")cleared = 0total = 0for filename in os.listdir(cache_dir):if pattern.match(filename):total += 1file_path = os.path.join(cache_dir, filename)try:os.remove(file_path)cleared += 1with stats_lock:cache_stats[func.__name__]['deleted'] += 1except Exception as e:logger.error(f'删除缓存失败 {filename}: {e}')logger.info(f'已清除 {cleared}/{total} 个缓存文件')return cleareddef clear_expired_cache(expire_seconds=expire_seconds):"""清除该函数的所有过期缓存"""pattern = re.compile(f"^{func.__name__}_.*\\.pkl$")current_time = time.time()removed = 0total = 0for filename in os.listdir(cache_dir):if pattern.match(filename):total += 1file_path = os.path.join(cache_dir, filename)try:# 检查文件是否过期mtime = os.path.getmtime(file_path)if current_time - mtime > expire_seconds:os.remove(file_path)removed += 1with stats_lock:cache_stats[func.__name__]['deleted'] += 1logger.debug(f'已删除过期缓存: {filename}')except Exception as e:logger.error(f'处理缓存文件 {filename} 失败: {e}')logger.info(f'已删除 {removed}/{total} 个过期缓存文件')return removedwrapper.clear_cache = clear_cachewrapper.clear_all_cache = clear_all_cachewrapper.clear_expired_cache = clear_expired_cachereturn wrapperreturn decoratordef get_cache_stats(func_name=None):"""获取缓存统计信息Args:func_name (str): 函数名,None 表示所有函数Returns:dict: 缓存统计信息"""with stats_lock:if func_name:return cache_stats.get(func_name, {'hits': 0, 'misses': 0, 'expired': 0, 'deleted': 0})# 计算总命中率total_stats = {'hits': 0, 'misses': 0, 'expired': 0, 'deleted': 0}for stats in cache_stats.values():for k in total_stats:total_stats[k] += stats[k]# 添加命中率百分比total = total_stats['hits'] + total_stats['misses'] + total_stats['expired']if total > 0:total_stats['hit_rate'] = total_stats['hits'] / total * 100else:total_stats['hit_rate'] = 0.0return total_statsdef print_cache_stats(func_name=None):"""打印缓存统计信息"""stats = get_cache_stats(func_name)if func_name:print(f"\n缓存统计 - {func_name}:")else:print("\n全局缓存统计:")print(f"命中次数: {stats['hits']}")print(f"未命中次数: {stats['misses']}")print(f"过期次数: {stats['expired']}")print(f"删除次数: {stats['deleted']}")if 'hit_rate' in stats:print(f"命中率: {stats['hit_rate']:.2f}%")else:total = stats['hits'] + stats['misses'] + stats['expired']if total > 0:hit_rate = stats['hits'] / total * 100print(f"命中率: {hit_rate:.2f}%")def clear_all_expired_cache(cache_dir=DEFAULT_CACHE_DIR, expire_seconds=DEFAULT_EXPIRE_SECONDS):"""清除缓存目录中所有过期的缓存文件Args:cache_dir (str): 缓存目录expire_seconds (int): 过期时间(秒)"""current_time = time.time()removed = 0total = 0if not os.path.exists(cache_dir):logger.warning(f"缓存目录不存在: {cache_dir}")return 0for filename in os.listdir(cache_dir):if filename.endswith('.pkl'):total += 1file_path = os.path.join(cache_dir, filename)try:# 检查文件是否过期mtime = os.path.getmtime(file_path)if current_time - mtime > expire_seconds:os.remove(file_path)removed += 1with stats_lock:# 尝试找出对应的函数名func_name = filename.split('_')[0]if func_name in cache_stats:cache_stats[func_name]['deleted'] += 1logger.debug(f'已删除过期缓存: {filename}')except Exception as e:logger.error(f'处理缓存文件 {filename} 失败: {e}')logger.info(f'已删除 {removed}/{total} 个过期缓存文件')return removed# 测试用例
@cache(expire_seconds=2, cache_dir="./test_cache")
def get_data(a, b):print("计算数据")return a + bif __name__ == "__main__":# 确保测试缓存目录存在os.makedirs("./test_cache", exist_ok=True)# 第一次调用(创建缓存)print(get_data(1, 2))  # 输出: 计算数据 和 3# 第二次调用(读取缓存)print(get_data(1, 2))  # 无"计算数据"输出# 等待缓存过期time.sleep(3)# 第三次调用(缓存过期后重新计算)print(get_data(1, 2))  # 再次输出"计算数据"# 清除特定参数缓存get_data.clear_cache(1, 2)# 第四次调用(清除后重新计算)print(get_data(1, 2))  # 输出"计算数据"# 创建另一个缓存print(get_data(3, 4))# 等待缓存过期time.sleep(3)# 清除过期缓存(仅限get_data函数)get_data.clear_expired_cache()# 清除整个缓存目录中的过期缓存clear_all_expired_cache("./test_cache", expire_seconds=1)# 清除所有缓存get_data.clear_all_cache()# 打印缓存统计print_cache_stats()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87717.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 java -jar xxxx.jar 运行 jar 包报错: no main manifest attribute

1、问题描述 在Linux服务器上本想运行一下自己写的一个JAR&#xff0c;但是报错了&#xff01; no main manifest attribute, in first-real-server-1.0-SNAPSHOT.jar 2、解决办法 在自己的Spring项目的启动类&#xff08;xxx.xxx.xxx.XXXXApplication&#xff09;所在的Mo…

信号与槽的总结

信号与槽的总结 QT中的信号与Linux的信号对比 1&#xff09;信号源 2&#xff09;信号的类型 3&#xff09;信号的处理方式 QT信号与Linux信号的深度对比分析 一、信号源对比 QT信号 用户定义信号 &#xff1a;由开发者通过 signals:关键字在QObject派生类中显式声明 cl…

Python Mitmproxy详解:从入门到实战

一、Mitmproxy简介 Mitmproxy是一款开源的交互式HTTPS代理工具&#xff0c;支持拦截、修改和重放HTTP/HTTPS流量。其核心优势在于&#xff1a; 多平台支持&#xff1a;兼容Windows、macOS、Linux三端工具&#xff1a;提供命令行(mitmproxy)、Web界面(mitmweb)、数据流处理(mi…

刷题笔记--串联所有单词的子串

题目&#xff1a;1、我的写法&#xff08;超时&#xff09;从题面自然想到先用回溯算法把words的全排列先算出来&#xff0c;然后遍历字符串s一次将符合条件的位置加入结果全排列计算所有可能字符串算法写法&#xff1a;这是一个模板用于所有全排列算法的情况&#xff0c;本质思…

操作系统【1】【硬件结构】【操作系统结构】

一、CPU如何执行程序&#xff1f; 提纲 图灵机工作方式冯诺依曼模型线路位宽CPU位宽程序执行基本过程执行具体过程 1. 图灵机工作方式 图灵机可以视作“一台带规则的自动草稿机” 图灵机基本组成&#xff1a; 纸带&#xff08;内存&#xff09;&#xff1a;连续格子组成&…

SQLite与MySQL:嵌入式与客户端-服务器数据库的权衡

SQLite与MySQL&#xff1a;嵌入式与客户端-服务器数据库的权衡 在开发应用程序时&#xff0c;数据库选择是一个至关重要的决策&#xff0c;它会影响应用的性能、可扩展性、部署难度和维护成本。SQLite和MySQL是两种广泛使用的关系型数据库管理系统&#xff0c;它们各自针对不同…

CppCon 2018 学习:Smart References

“强类型别名”&#xff08;strong typedefs&#xff09; 的动机和实现&#xff0c;配合一个简单例子说明&#xff1a; 动机&#xff08;Motivation&#xff09; 用 using filename_t string; 和 using url_t string; 来区分不同的字符串类型&#xff08;比如文件名和网址&…

高性能高准确度的CPU电压与温度监测软件HWInfo

&#x1f5a5;️ 一、软件概述 Windows版&#xff1a;图形化界面&#xff0c;支持实时监控&#xff08;温度、电压、风扇转速等&#xff09;、基准测试及报告生成&#xff0c;兼容Windows XP至Windows 11系统。Linux版&#xff1a;命令行工具&#xff0c;由openSUSE社区维护&a…

H3C WA6322 AP版本升级

1、查看当前版本&#xff1a;R2444P01 2、官网下载升级文件&#xff1a; WA6300系列版本说明H3C WA6300系列(适用于WA6330、 WA6322、WA6320H、WA6320、 WTU630H、WTU630、WA6330-LI、WA6320-C、WA6320-D、WA6320H-LI、WA6338、WA6322H、WTU632H-IOT、WAP922E、WAP923、WA6320…

用 YOLOv8 + DeepSORT 实现目标检测、追踪与速度估算

【导读】 目标检测与追踪技术是计算机视觉领域最热门的应用之一&#xff0c;广泛应用于自动驾驶、交通监控、安全防护等场景。今天我们将带你一步步实现一个完整的项目&#xff0c;使用YOLOv8 DeepSORT实现目标检测、追踪与速度估算。>>更多资讯可加入CV技术群获取了解…

Python实例题:基于 Python 的简单聊天机器人

Python实例题 题目 基于 Python 的简单聊天机器人 要求&#xff1a; 使用 Python 构建一个聊天机器人&#xff0c;支持以下功能&#xff1a; 基于规则的简单问答系统关键词匹配和意图识别上下文记忆功能支持多轮对话可扩展的知识库 使用tkinter构建图形用户界面。实现至少 …

相机:Camera原理讲解(使用OpenGL+QT开发三维CAD)

相机为三维场景提供了灵活便捷的视角变换和交互能力&#xff0c;通过相机操作可以实现全方位、各角度的场景浏览。 怎样在三维场景中引入相机&#xff0c;怎样处理和实现视角的放缩、移动、旋转&#xff1f;在视角旋转时以指定目标为中心又该怎样处理&#xff1f; 原文&#…

开源的虚拟电厂预测数据:资源、应用与挑战

引言 虚拟电厂(Virtual Power Plant, VPP)是一种通过聚合分布式能源资源(如太阳能、风能、储能系统、电动汽车和可控负荷)来优化电力系统运行的数字化能源管理平台。准确的预测数据是虚拟电厂高效运行的关键,而开源数据为研究者和企业提供了低成本、高透明度的解决方案。…

IDE全家桶专用快捷键----------个人独家分享!!

给大家分享一下我个人整理的快捷键&#xff0c;其中包含对电脑的操作&#xff0c;以及在编写代码时的操作&#x1f680;Window系列1 WindowsR 开启运行对话框--->输入cmd启动黑窗口​2 WindowsE 快速打开我的电脑 ​3 WindowsL 电脑锁屏 ​4 WindowsD 显示/恢复桌面 ​5 Win…

人工智能概念:RNN中的基础Encoder-Decoder框架

文章目录一、序列&#xff08;Seq2Seq&#xff09;转换的核心架构二、Encoder-Decoder框架基础原理2.1 整体工作流程2.2 编码器&#xff08;Encoder&#xff09;详解2.3 解码器&#xff08;Decoder&#xff09;工作机制与缺陷三、基础框架的核心缺陷分析&#xff08;以"欢…

R 列表:深入解析与高效应用

R 列表&#xff1a;深入解析与高效应用 引言 在R语言中&#xff0c;列表&#xff08;List&#xff09;是一种非常重要的数据结构&#xff0c;它允许我们将不同类型的数据组合在一起。列表在数据分析和统计建模中扮演着至关重要的角色。本文将深入探讨R列表的概念、创建方法、…

uniapp 国密sm2加密

1. uniapp 国密sm2加密 在uniapp中使用国密SM2算法进行加密解密&#xff0c;你可以通过安装第三方库miniprogram-sm-crypto来实现。这个库提供了SM2、SM3和SM4算法的实现&#xff0c;可以在小程序和uniapp项目中使用。 1.1. 安装miniprogram-sm-crypto 首先&#xff0c;你需要…

07_持续集成与部署:DevOps的核心引擎

07_持续集成与部署:DevOps的核心引擎 引言 在快速迭代的软件开发时代,持续集成(CI)与持续部署(CD)已成为企业提升竞争力的关键。通过自动化构建、测试和部署流程,CI/CD能够显著缩短交付周期,提高软件质量,降低发布风险。本文将深入探讨CI/CD的核心理念、实施路径与最…

电脑休眠设置

Dont Sleep的意思就是“不要睡觉”&#xff0c;用在电脑里就是“阻止休眠”的意思。但这款软件其实有“阻止休眠”和“允许休眠”两个功能。 阻止休眠时可以选择事件&#xff0c;是计时器、电池、CPU、网络这几个事件进行触发阻止休假的功能。 允许休眠也可以根据自己的需求进行…

蓝牙墨水屏上位机学习(3)

main.js中sendimg()函数学习&#xff0c;对应发送图片按钮函数代码如下&#xff1a;async function sendimg() {const canvasSize document.getElementById(canvasSize).value;const ditherMode document.getElementById(ditherMode).value;const epdDriverSelect document.…