【保姆级喂饭教程】python基于mysql-connector-python的数据库操作通用封装类(连接池版)

目录

  • 项目环境
  • 一、db_config.py
  • 二、mysql_executor.py
  • 三、test/main.py

在使用mysql-connector-python连接MySQL数据库的时候,如同Java中的jdbc一般,每条sql需要创建和删除连接,很自然就想到写一个抽象方法,但是找了找没有官方标准的,或者使用SQLAlchemy等类似的orm框架,于是调试deepseek写了一个

项目背景:【保姆级喂饭教程】uv教程一文讲透:安装,创建,配置,工具,命令

项目环境

安装库,
【沉浸式解决问题】mysql-connector-python连接数据库:RuntimeError: Failed raising error.

uv add mysql-connector-python==8.0.33

在这里插入图片描述

目录,init文件为空
在这里插入图片描述

一、db_config.py


class DBConfig:"""数据库配置基类"""@classmethoddef get_config(cls, name: str) -> dict:"""获取指定数据库配置"""return getattr(cls, name).valueclass MySQLConfig(DBConfig):"""MySQL数据库配置"""# 基础配置模板BASE = {"pool_name": "mysql_pool","pool_size": 10,"pool_reset_session": True,"charset": "utf8mb4","autocommit": False}# 具体数据库实例配置PYTHON_DB = {**BASE,"host": "127.0.0.1","port": 3306,"user": "python","password": "python","database": "test-python"}LOG_DB = {**BASE,"host": "log-db.example.com","port": 3307,"user": "log_user","password": "log_password","database": "app_logs"}

二、mysql_executor.py

"""
MySQL 数据库连接池执行器
提供安全、高效的数据库操作接口,支持连接池、事务管理和批量操作
"""from mysql.connector import pooling, Error
from typing import Union, List, Tuple, Dict, Anyclass MySQLExecutor:"""MySQL 数据库操作通用封装类(连接池版)特性:1. 基于连接池实现高效连接管理2. 支持自动和手动事务控制3. 提供查询、执行、批量操作等多种方法4. 完善的异常处理和资源管理5. 详细的日志记录使用示例:with MySQLExecutor(config) as executor:# 执行查询results = executor.query("SELECT * FROM users WHERE age > %s", (25,))# 执行写入操作executor.execute("UPDATE users SET status = 'active' WHERE id = %s", (1,))# 批量操作executor.batch_execute("INSERT INTO logs (message) VALUES (%s)", [('log1',), ('log2',)])"""def __init__(self, config: Dict[str, Any]):"""初始化数据库执行器参数:config: 数据库配置字典,必须包含连接池相关参数示例:{"pool_name": "mysql_pool","pool_size": 10,"host": "127.0.0.1","port": 3306,"user": "root","password": "password","database": "test_db"}异常:ConnectionError: 如果连接池创建失败"""self.config = configself.pool = self._create_pool()self.active_connection = None  # 当前活动连接(用于事务)def _create_pool(self) -> pooling.MySQLConnectionPool:"""创建数据库连接池返回:pooling.MySQLConnectionPool: MySQL连接池实例异常:ConnectionError: 如果连接池创建失败"""try:print(f"正在创建MySQL连接池: {self.config.get('pool_name', 'unnamed_pool')}")return pooling.MySQLConnectionPool(**self.config)except Error as e:error_msg = f"创建MySQL连接池失败: {e}"print(error_msg)raise ConnectionError(error_msg) from edef __enter__(self):"""支持with上下文管理返回执行器实例本身"""return selfdef __exit__(self, exc_type, exc_val, exc_tb):"""退出上下文时自动关闭活动连接"""self.close_active_connection()def connection(self):"""返回一个连接上下文管理器使用示例:with executor.connection() as conn:with conn.cursor() as cursor:cursor.execute("SELECT * FROM table")results = cursor.fetchall()"""return self._ConnectionContext(self)class _ConnectionContext:"""连接上下文管理器内部类用于自动管理连接的获取和释放"""def __init__(self, executor):self.executor = executorself.conn = Nonedef __enter__(self) -> pooling.PooledMySQLConnection:"""进入上下文时从连接池获取连接"""try:self.conn = self.executor.get_connection()return self.connexcept Error as e:error_msg = f"获取数据库连接失败: {e}"print(error_msg)raise ConnectionError(error_msg) from edef __exit__(self, exc_type, exc_val, exc_tb):"""退出上下文时关闭连接"""if self.conn and self.conn.is_connected():try:self.conn.close()except Error as e:print(f"关闭数据库连接时出错: {e}")def get_connection(self) -> pooling.PooledMySQLConnection:"""从连接池获取一个新的数据库连接返回:pooling.PooledMySQLConnection: 数据库连接对象注意:使用后需要手动关闭连接,推荐使用connection()上下文管理器"""return self.pool.get_connection()def close_active_connection(self):"""关闭当前活动连接(如果存在)主要用于清理事务连接"""if self.active_connection and self.active_connection.is_connected():try:self.active_connection.close()self.active_connection = Noneexcept Error as e:print(f"关闭活动连接时出错: {e}")def start_transaction(self):"""开始一个新的事务异常:ConnectionError: 如果获取连接失败"""try:if not self.active_connection or not self.active_connection.is_connected():self.active_connection = self.get_connection()self.active_connection.start_transaction()print("事务已开始")except Error as e:error_msg = f"开始事务失败: {e}"print(error_msg)raise ConnectionError(error_msg) from edef commit(self):"""提交当前事务并关闭活动连接异常:ConnectionError: 如果提交失败或没有活动事务"""if not self.active_connection:print("警告: 尝试提交但无活动事务")returntry:self.active_connection.commit()print("事务已提交")except Error as e:error_msg = f"提交事务失败: {e}"print(error_msg)raise ConnectionError(error_msg) from efinally:self.close_active_connection()def rollback(self):"""回滚当前事务并关闭活动连接异常:ConnectionError: 如果回滚失败"""if not self.active_connection:print("警告: 尝试回滚但无活动事务")returntry:self.active_connection.rollback()print("事务已回滚")except Error as e:error_msg = f"回滚事务失败: {e}"print(error_msg)raise ConnectionError(error_msg) from efinally:self.close_active_connection()def query(self,sql: str,params: Union[Tuple, List, None] = None,dictionary: bool = False) -> Union[List[Tuple], List[Dict]]:"""执行查询语句(SELECT)参数:sql: SQL查询语句params: 查询参数(可选)dictionary: 是否返回字典格式结果(默认为元组)返回:查询结果列表(元组或字典格式)异常:DatabaseError: 如果查询执行失败"""try:with self.connection() as conn:with conn.cursor(dictionary=dictionary) as cursor:cursor.execute(sql, params)return cursor.fetchall()except Error as e:error_msg = f"查询执行失败: {e}\nSQL: {sql}\n参数: {params}"print(error_msg)raise DatabaseError(error_msg) from edef execute(self,sql: str,params: Union[Tuple, List, None] = None,commit: bool = True) -> int:"""执行非查询语句(INSERT/UPDATE/DELETE)参数:sql: SQL操作语句params: 操作参数(可选)commit: 是否自动提交事务(默认为True)返回:受影响的行数异常:DatabaseError: 如果操作执行失败"""try:with self.connection() as conn:with conn.cursor() as cursor:cursor.execute(sql, params)affected_rows = cursor.rowcountif commit:conn.commit()return affected_rowsexcept Error as e:error_msg = f"操作执行失败: {e}\nSQL: {sql}\n参数: {params}"print(error_msg)raise DatabaseError(error_msg) from edef batch_execute(self,sql: str,params_list: List[Union[Tuple, List]],commit: bool = True) -> int:"""批量执行操作(高效)参数:sql: SQL操作语句params_list: 参数列表commit: 是否提交事务(默认为True)返回:受影响的总行数异常:DatabaseError: 如果批量操作失败ValueError: 如果参数列表为空"""if not params_list:raise ValueError("参数列表不能为空")try:with self.connection() as conn:with conn.cursor() as cursor:cursor.executemany(sql, params_list)affected_rows = cursor.rowcountif commit:conn.commit()return affected_rowsexcept Error as e:error_msg = f"批量操作失败: {e}\nSQL: {sql}\n参数数量: {len(params_list)}"print(error_msg)# 尝试回滚事务try:if conn and conn.is_connected():conn.rollback()except Error as rollback_err:print(f"回滚批量操作失败: {rollback_err}")raise DatabaseError(error_msg) from edef execute_many(self,sql_commands: List[str],params_list: List[Union[Tuple, List, None]] = None,commit: bool = True) -> int:"""执行多个SQL命令(可包含不同操作)参数:sql_commands: SQL命令列表params_list: 参数列表(可选,默认为None)commit: 是否提交事务(默认为True)返回:受影响的总行数异常:DatabaseError: 如果执行失败ValueError: 如果命令和参数长度不匹配"""if params_list is None:params_list = [None] * len(sql_commands)if len(sql_commands) != len(params_list):raise ValueError(f"SQL命令({len(sql_commands)})和参数列表({len(params_list)})长度不一致")try:with self.connection() as conn:with conn.cursor() as cursor:total_affected = 0for i, (sql, params) in enumerate(zip(sql_commands, params_list)):try:cursor.execute(sql, params)total_affected += cursor.rowcountexcept Error as e:# 记录具体哪个命令失败error_msg = f"执行第 {i + 1} 条命令失败: {e}\nSQL: {sql}\n参数: {params}"print(error_msg)raise DatabaseError(error_msg) from eif commit:conn.commit()return total_affectedexcept Error as e:# 回滚整个事务try:if conn and conn.is_connected():conn.rollback()except Error as rollback_err:print(f"回滚操作失败: {rollback_err}")raise DatabaseError(f"多命令执行失败: {e}") from e# 自定义异常类,提供更清晰的错误分类
class DatabaseError(Exception):"""数据库操作异常基类"""passclass ConnectionError(DatabaseError):"""数据库连接相关异常"""pass

三、test/main.py

from mysql.db_config import MySQLConfig
from mysql.mysql_executor import MySQLExecutor, ConnectionError, DatabaseError# 获取数据库配置
db_config = MySQLConfig.get_config("PYTHON_DB")try:# 创建执行器(使用with上下文确保资源清理)with MySQLExecutor(db_config) as executor:print("=" * 50)print("示例1: 基本查询")print("=" * 50)try:# 执行查询 - 返回字典格式结果users = executor.query("SELECT * FROM users WHERE age > %s",(25,),dictionary=True)print(f"查询成功,获取到 {len(users)} 条记录")if users:print(f"第一条记录: {users[0]}")except DatabaseError as e:print(f"查询操作失败: {e}")# 在实际应用中,这里可以记录日志或执行恢复操作print("\n" + "=" * 50)print("示例2: 插入数据")print("=" * 50)try:# 插入新用户new_user = ("Alice", "alice@example.com", 30)insert_sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"affected = executor.execute(insert_sql, new_user)print(f"插入成功,影响行数: {affected}")except DatabaseError as e:print(f"插入操作失败: {e}")print("\n" + "=" * 50)print("示例3: 批量插入")print("=" * 50)try:# 批量插入用户users_data = [("Bob", "bob@example.com", 28),("Charlie", "charlie@example.com", 35),("David", "david@example.com", 42)]insert_sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"# 批量执行batch_affected = executor.batch_execute(insert_sql, users_data)print(f"批量插入成功,总影响行数: {batch_affected}")except (DatabaseError, ValueError) as e:print(f"批量插入失败: {e}")print("\n" + "=" * 50)print("示例4: 事务处理")print("=" * 50)try:# 开始事务executor.start_transaction()print("事务已开始")try:# 事务内操作1:更新用户update_sql = "UPDATE users SET age = %s WHERE id = %s"executor.execute(update_sql, (31, 1), commit=False)print("用户更新成功")# 事务内操作2:插入日志log_sql = "INSERT INTO activity_log (user_id, action) VALUES (%s, %s)"executor.execute(log_sql, (1, "age_update"), commit=False)print("日志插入成功")# 查询事务内数据user_data = executor.query("SELECT * FROM users WHERE id = %s",(1,),dictionary=True)print(f"事务内用户数据: {user_data}")# 提交事务executor.commit()print("事务操作成功完成")except DatabaseError as e:# 事务内部操作失败,回滚事务print(f"事务内操作失败: {e}")executor.rollback()print("已回滚事务")raise  # 继续向上抛出异常except (DatabaseError, ConnectionError) as e:print(f"事务处理失败: {e}")print("\n" + "=" * 50)print("示例5: 复杂操作(多个SQL命令)")print("=" * 50)try:# 转账操作(多个SQL命令)sql_commands = ["UPDATE accounts SET balance = balance - 100 WHERE id = 1","UPDATE accounts SET balance = balance + 100 WHERE id = 2","INSERT INTO transactions (from_acc, to_acc, amount) VALUES (1, 2, 100)"]# 执行多个命令total_affected = executor.execute_many(sql_commands)print(f"转账操作完成,总影响行数: {total_affected}")except (DatabaseError, ValueError) as e:print(f"多命令执行失败: {e}")print("\n" + "=" * 50)print("示例6: 直接使用连接上下文")print("=" * 50)try:# 直接使用连接上下文with executor.connection() as conn:with conn.cursor(dictionary=True) as cursor:cursor.execute("SELECT * FROM users WHERE status = 'active'")active_users = cursor.fetchall()print(f"活跃用户数量: {len(active_users)}")# 同一个连接中执行另一个操作with conn.cursor() as cursor:cursor.execute("UPDATE users SET last_login = NOW() WHERE status = 'active'")conn.commit()print(f"更新了 {cursor.rowcount} 个用户的登录时间")except (ConnectionError, DatabaseError) as e:print(f"连接上下文操作失败: {e}")except ConnectionError as e:print(f"数据库连接初始化失败: {e}")# 在实际应用中,这里应该进行更严格的错误处理
finally:print("所有数据库操作完成")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/917921.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/917921.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MCP服务】蓝耘元生代 | 蓝耘MCP平台来袭!DeepSeek MCP服务器玩转大模型集成

【作者主页】Francek Chen 【专栏介绍】⌈⌈⌈人工智能与大模型应用⌋⌋⌋ 人工智能(AI)通过算法模拟人类智能,利用机器学习、深度学习等技术驱动医疗、金融等领域的智能化。大模型是千亿参数的深度神经网络(如ChatGPT&#xff09…

Spring Boot 整合 Minio 实现高效文件存储解决方案(本地和线上)

文章目录前言一、配置1.配置文件:application.yml2.配置类:MinioProperties3.工具类:MinioUtil3.1 初始化方法3.2 核心功能3.3 关键技术点二、使用示例1.控制器类:FileController2.服务类3.效果展示总结前言 Minio 是一个高性能的…

【Unity3D实例-功能-镜头】第三人称视觉-镜头优化

这一篇我们一起来调整一下Cinemachine的第三人称视觉的镜头设置。一般用于ARPG角色扮演游戏的场景中。Unity里头,这种视角简直就是标配。来吧,咱们一起研究研究怎么调出这种视角效果!目录:1.调整虚拟摄像机的Y轴2.调整虚拟摄像机的…

二叉树算法之【中序遍历】

目录 LeetCode-94题 LeetCode-94题 给定一个二叉树的根节点root&#xff0c;返回它的中序遍历结果。 class Solution {public List<Integer> inorderTraversal(TreeNode root) {List<Integer> result new ArrayList<>();order(root, result);return res…

Android14的QS面板的加载解析

/frameworks/base/packages/SystemUI/src/com/android/systemui/statusbar/phone/CentralSurfacesImpl.java QS 面板的创建 getNotificationShadeWindowView()&#xff1a;整个systemui的最顶级的视图容器&#xff08;super_notification_shade.xml&#xff09;R.id.qs_frame &…

解锁webpack核心技能(二):配置文件和devtool配置指南

一、配置文件webpack 提供的 cli 支持很多的参数&#xff0c;例如 --mode 。在我们平时的开发过程中&#xff0c;我们要学习很多的功能&#xff0c;这些很多都是可以用参数来完成的。那么后边就会导致参数越来越多&#xff0c;我们使用命令特别的不方便&#xff0c;所以我们会使…

Gitlab+Jenkins+K8S+Registry 建立 CI/CD 流水线

一、前言 DevOps是一种将开发&#xff08;Development&#xff09;和运维&#xff08;Operations&#xff09;相结合的软件开发方法论。它通过自动化和持续交付的方式&#xff0c;将软件开发、测试和部署等环节紧密集成&#xff0c;以提高效率和产品质量。在本篇博客中&#xf…

【Linux】特效爆满的Vim的配置方法 and make/Makefile原理

一、软件包管理器 1、Linux下安装软件的常见方式&#xff1a; 1&#xff09;源代码安装——不推荐。 2&#xff09;rpm包安装——不推荐。 3&#xff09;包管理器安装——推荐 2、安装软件命令 # Centos$ sudo yum install -y lrzsz# Ubuntu$ sudo apt install -y lrzsz 3、卸…

Spring Boot Actuator 监控功能的简介及禁用

Spring Boot Actuator: Production-ready Features 1. 添加依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency> </dependencie…

Matlab(1)

一、基本操作1. matlab四则运算规则&#xff1a;先乘除后加减&#xff0c;从左到右2、对数和指数的表示sin(pi^0.5)log(tan(1))exp&#xff08;sin&#xff08;10&#xff09;&#xff09;3、类型&#xff1a;matlab变量默认为double4、who&whos&#xff1a;命令行输入who&…

Kotlin Android 开发脚手架封装

Kotlin Android 开发脚手架封装&#xff08;模块化版本&#xff09; 我将按照模块化设计原则&#xff0c;将脚手架拆分为多个文件&#xff0c;每个文件负责特定功能领域&#xff1a; 1. 核心初始化模块 文件路径: core/AppScaffold.kt object AppScaffold {lateinit var contex…

Flutter 报错解析:No TabController for TabBar 的完整解决方案

目录 Flutter 报错解析&#xff1a;No TabController for TabBar 的完整解决方案 一、错误场景&#xff1a;当 TabBar 失去 "指挥官" 二、为什么 TabBar 必须依赖 Controller&#xff1f; 1. TabBar 与 TabController 的协作关系 2. 状态管理的核心作用 3. 实战…

【24】C++实战篇——【 C++ 外部变量】 C++多个文件共用一个枚举变量,外部变量 extern,枚举外部变量 enum

文章目录1 方法2 外部变量 应用2.1 普通外部全局变量2.2 枚举外部全局变量 应用2.2.2 枚举外部变量优化c多个文件中如何共用一个全局变量 c头文件的使用和多个文件中如何共用一个全局变量 C共享枚举类型给QML 1 方法 ①头文件中 声明外部全局变量&#xff1b; ②在头文件对…

Linux SELinux 核心概念与管理

Linux SELinux 核心概念与管理一、SELinux 基本概念 SELinux 即安全增强型 Linux&#xff08;Security-Enhanced Linux&#xff09;&#xff0c;由美国国家安全局&#xff08;NSA&#xff09;开发&#xff0c;是一套基于强制访问控制&#xff08;MAC&#xff09;的安全机制&…

Git 中**未暂存**和**未跟踪**的区别:

文件状态分类 Git 中的文件有以下几种状态&#xff1a; 工作区文件状态&#xff1a; ├── 未跟踪 (Untracked) ├── 已跟踪 (Tracked)├── 未修改 (Unmodified) ├── 已修改未暂存 (Modified/Unstaged)└── 已暂存 (Staged)1. 未跟踪 (Untracked) 定义&#xff1a;Gi…

前端1.0

目录 一、 什么是前端 二、 HTML 1.0 概述 2.0 注释 三、开发环境的搭建 1.0 插件 2.0 笔记 四、 常见标签&#xff08;重点&#xff09; 四、案例展示&#xff08;图片代码&#xff09; 五、CSS引入 一、 什么是前端 web前端 用来直接给用户呈现一个一个的网页 …

Flutter镜像替换

一、核心镜像替换&#xff08;针对 Maven 仓库&#xff09; Flutter 依赖的 Google Maven 仓库&#xff08;https://maven.google.com 或 https://dl.google.com/dl/android/maven2&#xff09;可替换为国内镜像&#xff0c;常见的有&#xff1a;阿里云镜像&#xff08;推荐&am…

MATLAB实现的改进遗传算法用于有约束优化问题

基于MATLAB实现的改进遗传算法&#xff08;GA&#xff09;用于有约束优化问题的代码&#xff0c;包括处理非线性约束。此代码通过引入惩罚函数和修复机制&#xff0c;有效处理约束条件&#xff0c;提高算法的鲁棒性和收敛速度。 1. 定义优化问题 % 定义目标函数 function f ob…

Qt子类化QWidget后,使用setStyleSheet设置样式无效的解决方案

关键代码&#xff1a; #include <QPainter> #include <QStyleOption>void paintEvent(QPaintEvent *e) {QStyleOption opt;opt.init(this);QPainter p(this);style()->drawPrimitive(QStyle::PE_Widget, &opt, &p, this);QWidget::paintEvent(e); }定义…

【python中级】关于Flask服务在同一系统里如何只被运行一次

【python中级】关于Flask服务在同一系统里如何只被运行一次 1.背景 2.方案1 2.方案2 1.背景 python Flask实现的一个http服务,打包成应用程序exe后在windows10系统运行; 由于我会不断的更新这个http服务,我希望运行这个http服务的时候之前的http服务被停掉; 即实现 Pytho…