1、封装sql连接
test_db.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from fastapi import Request, Depends# 1. 数据库连接配置
async_engine = create_async_engine("mysql+aiomysql://root:root@127.0.0.1:3306/dade?charset=utf8mb4",pool_size=30, # 数据库连接池max_overflow=60, # 连接池最大溢出连接数pool_recycle=3600, # 设置时间以限制数据库自动断开pool_timeout=60, # 连接超时时间,默认30秒,超过时间连接都会失败pool_pre_ping=True, # 连接有效性校验,在从连接池获取连接前,自动发送一个 “心跳检测”(如 SELECT 1),校验连接是否有效(避免使用 “已断开的死连接”)。
)# 2. 异步会话工厂,class_=AsyncSession,# 指定异步会话类
AsyncSessionLocal = sessionmaker(bind=async_engine,class_=AsyncSession,autocommit=False,autoflush=False)# 3. 全局数据库依赖(负责初始化会话到 Request.state)
async def get_db(request: Request):if not hasattr(request.state, "db"):request.state.db = AsyncSessionLocal() # 初始化会话并存入请求状态try:yield request.state.dbfinally:# 请求结束后关闭会话,避免资源泄漏if hasattr(request.state, "db"):await request.state.db.close()del request.state.db# 4. 从外部传入的 Request 中提取会话
async def get_current_db(request: Request):"""直接从路由的 Request 对象中获取数据库会话"""# 验证会话是否已被全局依赖初始化if not hasattr(request.state, "db"):raise RuntimeError("数据库会话未初始化!请检查 main.py 是否配置了全局依赖 get_db")return request.state.db
2、main.py文件
import sys
from pathlib import Path# 1. 注入项目根目录到 Python 搜索路径,获取当前文件(user/main.py)的根目录(dome001)
root_dir = Path(__file__).resolve().parent.parent
sys.path.append(str(root_dir))from dotenv import load_dotenv
load_dotenv(dotenv_path=root_dir / ".env")
# 获取.env中的参数
import os
db_host = os.getenv("DADE")from fastapi import FastAPI,Depends
from starlette.staticfiles import StaticFiles
from demo.common.test_db import get_db# 微服务http请求
from common_http.background_user import *
print(http_user_dade("dadae微服务"))#dependencies全局注入数据库
app = FastAPI(dependencies=[Depends(get_db)])# 静态文件
current_dir = Path(__file__).parent # __file__ 是当前脚本的路径,.parent 是其所在目录(app/)
static_dir = current_dir.parent / "static" # 上级目录(cashier_v5/) + static 文件夹
# 检查并创建(如果不存在)
if not static_dir.exists():static_dir.mkdir(parents=True, exist_ok=True) # parents=True 表示递归创建上级目录,exist_ok=True 表示目录已存在时不报错print(f"已自动创建静态文件目录:{static_dir.resolve()}")
else:print(f"静态文件目录已存在:{static_dir.resolve()}")
app.mount("/static", StaticFiles(directory=static_dir), name="static")from urls import global_router
app.include_router(global_router)
@app.get("/",tags=["服务健康检测"])
async def root():return {"message": "欢迎demo服务"}if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=9000)
主要
from fastapi import FastAPI,Depends
from starlette.staticfiles import StaticFiles
from demo.common.test_db import get_db
#dependencies全局注入数据库
app = FastAPI(dependencies=[Depends(get_db)])
3、全局可以这样使用,通过 request.state.db
@index_routes.post("/req_data",summary="请求体传参,文档",description="这是一个示例接口",response_model=ReqDataSerializer)
async def req_data(request: Request,req: ReqDataSerializer):# 1. 传入路由的 request 对象,获取数据库会话db = await get_current_db(request)# 2. 执行数据库操作(示例:查询 users 表)sql = text("select * from users limit 1")result = await db.execute(sql)user = result.mappings().first() # 转为字典格式print("查询到的用户数据:", user)return req
4、可直接在main.py封装个方法
# 创建工具函数:从请求对象中直接获取db会话
async def get_current_db(request: Request) -> AsyncSession:"""从请求对象中提取已注入的数据库会话"""return request.state.db
在任何路由函数中,直接调用 await get_current_db(Request) 即可获取数据库会话,例如:
@app.get("/products/")
async def read_products():db = await get_current_db(Request)# 执行查询:products = await db.query(Product).all()return {"message": "产品数据查询成功"}