目录
- 项目环境
- 一、db_config.py
- 二、mysql_executor.py
- 三、test/main.py
在使用mysql-connector-python连接MySQL数据库的时候,如同Java中的jdbc一般,每条sql需要创建和删除连接,很自然就想到写一个抽象方法,但是找了找没有官方标准的,或者使用SQLAlchemy等类似的orm框架,于是调试deepseek写了一个
项目背景:【保姆级喂饭教程】uv教程一文讲透:安装,创建,配置,工具,命令
项目环境
安装库,
【沉浸式解决问题】mysql-connector-python连接数据库:RuntimeError: Failed raising error.
uv add mysql-connector-python==8.0.33
目录,init文件为空
一、db_config.py
class DBConfig:"""数据库配置基类"""@classmethoddef get_config(cls, name: str) -> dict:"""获取指定数据库配置"""return getattr(cls, name).valueclass MySQLConfig(DBConfig):"""MySQL数据库配置"""# 基础配置模板BASE = {"pool_name": "mysql_pool","pool_size": 10,"pool_reset_session": True,"charset": "utf8mb4","autocommit": False}# 具体数据库实例配置PYTHON_DB = {**BASE,"host": "127.0.0.1","port": 3306,"user": "python","password": "python","database": "test-python"}LOG_DB = {**BASE,"host": "log-db.example.com","port": 3307,"user": "log_user","password": "log_password","database": "app_logs"}
二、mysql_executor.py
"""
MySQL 数据库连接池执行器
提供安全、高效的数据库操作接口,支持连接池、事务管理和批量操作
"""from mysql.connector import pooling, Error
from typing import Union, List, Tuple, Dict, Anyclass MySQLExecutor:"""MySQL 数据库操作通用封装类(连接池版)特性:1. 基于连接池实现高效连接管理2. 支持自动和手动事务控制3. 提供查询、执行、批量操作等多种方法4. 完善的异常处理和资源管理5. 详细的日志记录使用示例:with MySQLExecutor(config) as executor:# 执行查询results = executor.query("SELECT * FROM users WHERE age > %s", (25,))# 执行写入操作executor.execute("UPDATE users SET status = 'active' WHERE id = %s", (1,))# 批量操作executor.batch_execute("INSERT INTO logs (message) VALUES (%s)", [('log1',), ('log2',)])"""def __init__(self, config: Dict[str, Any]):"""初始化数据库执行器参数:config: 数据库配置字典,必须包含连接池相关参数示例:{"pool_name": "mysql_pool","pool_size": 10,"host": "127.0.0.1","port": 3306,"user": "root","password": "password","database": "test_db"}异常:ConnectionError: 如果连接池创建失败"""self.config = configself.pool = self._create_pool()self.active_connection = None # 当前活动连接(用于事务)def _create_pool(self) -> pooling.MySQLConnectionPool:"""创建数据库连接池返回:pooling.MySQLConnectionPool: MySQL连接池实例异常:ConnectionError: 如果连接池创建失败"""try:print(f"正在创建MySQL连接池: {self.config.get('pool_name', 'unnamed_pool')}")return pooling.MySQLConnectionPool(**self.config)except Error as e:error_msg = f"创建MySQL连接池失败: {e}"print(error_msg)raise ConnectionError(error_msg) from edef __enter__(self):"""支持with上下文管理返回执行器实例本身"""return selfdef __exit__(self, exc_type, exc_val, exc_tb):"""退出上下文时自动关闭活动连接"""self.close_active_connection()def connection(self):"""返回一个连接上下文管理器使用示例:with executor.connection() as conn:with conn.cursor() as cursor:cursor.execute("SELECT * FROM table")results = cursor.fetchall()"""return self._ConnectionContext(self)class _ConnectionContext:"""连接上下文管理器内部类用于自动管理连接的获取和释放"""def __init__(self, executor):self.executor = executorself.conn = Nonedef __enter__(self) -> pooling.PooledMySQLConnection:"""进入上下文时从连接池获取连接"""try:self.conn = self.executor.get_connection()return self.connexcept Error as e:error_msg = f"获取数据库连接失败: {e}"print(error_msg)raise ConnectionError(error_msg) from edef __exit__(self, exc_type, exc_val, exc_tb):"""退出上下文时关闭连接"""if self.conn and self.conn.is_connected():try:self.conn.close()except Error as e:print(f"关闭数据库连接时出错: {e}")def get_connection(self) -> pooling.PooledMySQLConnection:"""从连接池获取一个新的数据库连接返回:pooling.PooledMySQLConnection: 数据库连接对象注意:使用后需要手动关闭连接,推荐使用connection()上下文管理器"""return self.pool.get_connection()def close_active_connection(self):"""关闭当前活动连接(如果存在)主要用于清理事务连接"""if self.active_connection and self.active_connection.is_connected():try:self.active_connection.close()self.active_connection = Noneexcept Error as e:print(f"关闭活动连接时出错: {e}")def start_transaction(self):"""开始一个新的事务异常:ConnectionError: 如果获取连接失败"""try:if not self.active_connection or not self.active_connection.is_connected():self.active_connection = self.get_connection()self.active_connection.start_transaction()print("事务已开始")except Error as e:error_msg = f"开始事务失败: {e}"print(error_msg)raise ConnectionError(error_msg) from edef commit(self):"""提交当前事务并关闭活动连接异常:ConnectionError: 如果提交失败或没有活动事务"""if not self.active_connection:print("警告: 尝试提交但无活动事务")returntry:self.active_connection.commit()print("事务已提交")except Error as e:error_msg = f"提交事务失败: {e}"print(error_msg)raise ConnectionError(error_msg) from efinally:self.close_active_connection()def rollback(self):"""回滚当前事务并关闭活动连接异常:ConnectionError: 如果回滚失败"""if not self.active_connection:print("警告: 尝试回滚但无活动事务")returntry:self.active_connection.rollback()print("事务已回滚")except Error as e:error_msg = f"回滚事务失败: {e}"print(error_msg)raise ConnectionError(error_msg) from efinally:self.close_active_connection()def query(self,sql: str,params: Union[Tuple, List, None] = None,dictionary: bool = False) -> Union[List[Tuple], List[Dict]]:"""执行查询语句(SELECT)参数:sql: SQL查询语句params: 查询参数(可选)dictionary: 是否返回字典格式结果(默认为元组)返回:查询结果列表(元组或字典格式)异常:DatabaseError: 如果查询执行失败"""try:with self.connection() as conn:with conn.cursor(dictionary=dictionary) as cursor:cursor.execute(sql, params)return cursor.fetchall()except Error as e:error_msg = f"查询执行失败: {e}\nSQL: {sql}\n参数: {params}"print(error_msg)raise DatabaseError(error_msg) from edef execute(self,sql: str,params: Union[Tuple, List, None] = None,commit: bool = True) -> int:"""执行非查询语句(INSERT/UPDATE/DELETE)参数:sql: SQL操作语句params: 操作参数(可选)commit: 是否自动提交事务(默认为True)返回:受影响的行数异常:DatabaseError: 如果操作执行失败"""try:with self.connection() as conn:with conn.cursor() as cursor:cursor.execute(sql, params)affected_rows = cursor.rowcountif commit:conn.commit()return affected_rowsexcept Error as e:error_msg = f"操作执行失败: {e}\nSQL: {sql}\n参数: {params}"print(error_msg)raise DatabaseError(error_msg) from edef batch_execute(self,sql: str,params_list: List[Union[Tuple, List]],commit: bool = True) -> int:"""批量执行操作(高效)参数:sql: SQL操作语句params_list: 参数列表commit: 是否提交事务(默认为True)返回:受影响的总行数异常:DatabaseError: 如果批量操作失败ValueError: 如果参数列表为空"""if not params_list:raise ValueError("参数列表不能为空")try:with self.connection() as conn:with conn.cursor() as cursor:cursor.executemany(sql, params_list)affected_rows = cursor.rowcountif commit:conn.commit()return affected_rowsexcept Error as e:error_msg = f"批量操作失败: {e}\nSQL: {sql}\n参数数量: {len(params_list)}"print(error_msg)# 尝试回滚事务try:if conn and conn.is_connected():conn.rollback()except Error as rollback_err:print(f"回滚批量操作失败: {rollback_err}")raise DatabaseError(error_msg) from edef execute_many(self,sql_commands: List[str],params_list: List[Union[Tuple, List, None]] = None,commit: bool = True) -> int:"""执行多个SQL命令(可包含不同操作)参数:sql_commands: SQL命令列表params_list: 参数列表(可选,默认为None)commit: 是否提交事务(默认为True)返回:受影响的总行数异常:DatabaseError: 如果执行失败ValueError: 如果命令和参数长度不匹配"""if params_list is None:params_list = [None] * len(sql_commands)if len(sql_commands) != len(params_list):raise ValueError(f"SQL命令({len(sql_commands)})和参数列表({len(params_list)})长度不一致")try:with self.connection() as conn:with conn.cursor() as cursor:total_affected = 0for i, (sql, params) in enumerate(zip(sql_commands, params_list)):try:cursor.execute(sql, params)total_affected += cursor.rowcountexcept Error as e:# 记录具体哪个命令失败error_msg = f"执行第 {i + 1} 条命令失败: {e}\nSQL: {sql}\n参数: {params}"print(error_msg)raise DatabaseError(error_msg) from eif commit:conn.commit()return total_affectedexcept Error as e:# 回滚整个事务try:if conn and conn.is_connected():conn.rollback()except Error as rollback_err:print(f"回滚操作失败: {rollback_err}")raise DatabaseError(f"多命令执行失败: {e}") from e# 自定义异常类,提供更清晰的错误分类
class DatabaseError(Exception):"""数据库操作异常基类"""passclass ConnectionError(DatabaseError):"""数据库连接相关异常"""pass
三、test/main.py
from mysql.db_config import MySQLConfig
from mysql.mysql_executor import MySQLExecutor, ConnectionError, DatabaseError# 获取数据库配置
db_config = MySQLConfig.get_config("PYTHON_DB")try:# 创建执行器(使用with上下文确保资源清理)with MySQLExecutor(db_config) as executor:print("=" * 50)print("示例1: 基本查询")print("=" * 50)try:# 执行查询 - 返回字典格式结果users = executor.query("SELECT * FROM users WHERE age > %s",(25,),dictionary=True)print(f"查询成功,获取到 {len(users)} 条记录")if users:print(f"第一条记录: {users[0]}")except DatabaseError as e:print(f"查询操作失败: {e}")# 在实际应用中,这里可以记录日志或执行恢复操作print("\n" + "=" * 50)print("示例2: 插入数据")print("=" * 50)try:# 插入新用户new_user = ("Alice", "alice@example.com", 30)insert_sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"affected = executor.execute(insert_sql, new_user)print(f"插入成功,影响行数: {affected}")except DatabaseError as e:print(f"插入操作失败: {e}")print("\n" + "=" * 50)print("示例3: 批量插入")print("=" * 50)try:# 批量插入用户users_data = [("Bob", "bob@example.com", 28),("Charlie", "charlie@example.com", 35),("David", "david@example.com", 42)]insert_sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"# 批量执行batch_affected = executor.batch_execute(insert_sql, users_data)print(f"批量插入成功,总影响行数: {batch_affected}")except (DatabaseError, ValueError) as e:print(f"批量插入失败: {e}")print("\n" + "=" * 50)print("示例4: 事务处理")print("=" * 50)try:# 开始事务executor.start_transaction()print("事务已开始")try:# 事务内操作1:更新用户update_sql = "UPDATE users SET age = %s WHERE id = %s"executor.execute(update_sql, (31, 1), commit=False)print("用户更新成功")# 事务内操作2:插入日志log_sql = "INSERT INTO activity_log (user_id, action) VALUES (%s, %s)"executor.execute(log_sql, (1, "age_update"), commit=False)print("日志插入成功")# 查询事务内数据user_data = executor.query("SELECT * FROM users WHERE id = %s",(1,),dictionary=True)print(f"事务内用户数据: {user_data}")# 提交事务executor.commit()print("事务操作成功完成")except DatabaseError as e:# 事务内部操作失败,回滚事务print(f"事务内操作失败: {e}")executor.rollback()print("已回滚事务")raise # 继续向上抛出异常except (DatabaseError, ConnectionError) as e:print(f"事务处理失败: {e}")print("\n" + "=" * 50)print("示例5: 复杂操作(多个SQL命令)")print("=" * 50)try:# 转账操作(多个SQL命令)sql_commands = ["UPDATE accounts SET balance = balance - 100 WHERE id = 1","UPDATE accounts SET balance = balance + 100 WHERE id = 2","INSERT INTO transactions (from_acc, to_acc, amount) VALUES (1, 2, 100)"]# 执行多个命令total_affected = executor.execute_many(sql_commands)print(f"转账操作完成,总影响行数: {total_affected}")except (DatabaseError, ValueError) as e:print(f"多命令执行失败: {e}")print("\n" + "=" * 50)print("示例6: 直接使用连接上下文")print("=" * 50)try:# 直接使用连接上下文with executor.connection() as conn:with conn.cursor(dictionary=True) as cursor:cursor.execute("SELECT * FROM users WHERE status = 'active'")active_users = cursor.fetchall()print(f"活跃用户数量: {len(active_users)}")# 同一个连接中执行另一个操作with conn.cursor() as cursor:cursor.execute("UPDATE users SET last_login = NOW() WHERE status = 'active'")conn.commit()print(f"更新了 {cursor.rowcount} 个用户的登录时间")except (ConnectionError, DatabaseError) as e:print(f"连接上下文操作失败: {e}")except ConnectionError as e:print(f"数据库连接初始化失败: {e}")# 在实际应用中,这里应该进行更严格的错误处理
finally:print("所有数据库操作完成")