FastAPI集成APsecheduler的BackgroundScheduler+mongodb(精简)

项目架构:

    FastAPI(folder)

                 >app(folder)

                 >core(folder)

                 >models(folder)

                 >routers(folder)

                 >utils(folder)

                   main.py(file)

1 utils文件夹下新建schedulers.py   

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

#特殊说明:此处显示指定DB,会在DB里创建数据库apscheduler_db

jobstores={

        'default':MongoDBJobStore(

              database='apscheduler_db',

              collection='custom_jobs',  

              host='localhost',

              port=27017

        )

}

#特殊说明:replace_existing=True会覆盖同名的JOB,但不影响数据库中的,仅处理job_id相同的冲突

scheduler=BackgroundScheduler(jobstores=jobstores,replace_existing=True)

2  main.py中在lifespan上下文初始化和关闭scheduler

import uvicorn

from contextlib import asynccontextmanager

from app.utils.schedulers import jobstores

scheduler=None

#特殊说明:yield中可以监控到正常结束比如ctrl+c,异常结束不能执行yield后代码

@asynccontextmanager

async def lifespan(app:FastAPI):

    jobstores['default'].remove_all_jobs()

    from app.utils.schedulers import scheduler

    yield

    scheduler.remove_all_jobs()

    scheduler.shutdown(wait=False)

app=FastAPI(lifespan=lifespan)

 3 models文件夹新建scheduler.py文件配置基础参数类和默认值

from pydantic import BaseModel

class job_config(BaseModel):

        job_id:str="default"

        job_name:str="default"

        trigger_type:str="interval"

        trigger_kwargs:dict={}

        seconds:int=30

        pass

4 api文件夹下添加sechedulers.py配置添加创建job方法

from app.utils.schedulers import scheduler

from app.models.schedulers import job_config

from fastapi import APIRouter

from typing import Coroutine,Callable

from datetime import datetime

@router.get("create_job")

def create_job(jobconfig,func):

    try:

        if jobconfig is None:

           jobconfig=job_config()

        scheduler.add_job(

            func,

            trigger=jobconfig.trigger_type,

            kwargs=jobconfig.trigger_kwargs,#特殊说明:这里可以添加自定义参数

            id=jobconfig.job_id,

            name=jobconfig.job_name,

            seconds=jobconfig.seconds

        )

       

        scheduler.start()

    except Exception as e:

        raise e

    except (KeyboardInterrupt, SystemExit):

        scheduler.shutdown()

 5 测试,调用test方法

@router.get("/function")

def function1():

    try:

        with open("D:\\demo.txt", "a") as file:

             print("写入文件"+ str(datetime.now()), file=file)

    except:

        pass

@router.get("/test")

def test():

    try:

        create_job(None,function1)

    except:

        pass

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/82895.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

聊一聊接口测试中耗时请求如何合理安排?

目录 一、异步处理与轮询机制 轮询检查机制 二、 并行化测试执行 三、模拟与桩技术(Mock/Stub) 四、动态超时与重试策略 五、测试架构设计优化 分层测试策略 并行化执行 网络优化 六、测试用例分层管理 金字塔策略 七、 缓存与数据复用 响应…

深入详解DICOMweb:WADO与STOW-RS的技术解析与实现

🧑 博主简介:CSDN博客专家、CSDN平台优质创作者,高级开发工程师,数学专业,10年以上C/C, C#, Java等多种编程语言开发经验,拥有高级工程师证书;擅长C/C、C#等开发语言,熟悉Java常用开…

Splunk Validated Architecture (SVA):构建企业级可观测性与安全的基石

Splunk Validated Architecture (SVA) 是 Splunk 官方提供的一套经过严格测试、性能验证和最佳实践指导的参考架构蓝图。它并非单一固定方案,而是根据企业数据规模、性能需求、高可用性目标和合规要求,提供一系列可落地的部署模型。SVA 的核心价值在于为…

Armv7l或树莓派32位RPI 4B编译faiss

pip3 install faiss-cpu当然找不到预编译的包 手动下载 git clone https://github.com/facebookresearch/faiss.git cd faiss #能需要切换到特定版本标签,例如 v1.7.1,这个版本Cmake 3.18可以过,因为apt install安装的cmake只更新到这里&am…

C++之string的模拟实现

string 手写C字符串类类的基本结构与成员变量一、构造函数与析构函数二、赋值运算符重载三、迭代器支持四、内存管理与扩容机制五、字符串操作函数六、运算符重载总结 手写C字符串类 从零实现一个简易版std::string 类的基本结构与成员变量 namespace zzh { class string { …

修改Docker镜像源

配置文件位置: sudo vim /etc/docker/daemon.json Docker 或 containerd 的镜像加速器配置,旨在提高从 Docker Hub 拉取镜像的速度。 { "features": { "buildkit": true, "containerd-snapshotter": true }, …

服务器带宽线路的区别(GIA、CN2、BGP、CMI等)

服务器带宽线路的区别(GIA、CN2、BGP、CMI等) 一、BGP线路 1. 定义与技术特点 BGP(Border Gateway Protocol,边界网关协议)是一种用于不同自治系统(AS)之间交换路由信息的协议,属…

从0到1搭建AI绘画模型:Stable Diffusion微调全流程避坑指南

从0到1搭建AI绘画模型:Stable Diffusion微调全流程避坑指南 系统化学习人工智能网站(收藏):https://www.captainbed.cn/flu 文章目录 从0到1搭建AI绘画模型:Stable Diffusion微调全流程避坑指南摘要引言一、数据集构…

VSCode + GD32F407 构建烧录

前言 最近调试一块 GD32F407VET6(168Mhz,8Mhz晶振) 板子时,踩了一些“启动失败”的坑。本以为是时钟配置有误,最后发现是链接脚本(.ld 文件)没有配置好,导致程序根本没能正常执行 ma…

AI绘画提示词:从零开始掌握Prompt Engineering的艺术

文章目录 什么是AI绘画提示词?提示词的基本结构主体描述场景/背景风格指定技术参数负面提示人物肖像模板风景模板 高级技巧权重调整混合风格颜色控制情绪氛围 常见问题与解决方法手部变形问题构图不理想风格不够突出 提示词示例库科幻场景奇幻人物静物画 结语 在当今…

在 Linux 上安装 Minikube:轻松搭建本地 Kubernetes 单节点集群

🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、Minikube 是什么? Minikube 是 Kubernetes 官方推出的轻量级工具,专为开发者设计,用于在本地快速搭建单节点 Kube…

day41 python图像识别任务

目录 一、数据预处理:为模型打下坚实基础 二、模型构建:多层感知机的实现 三、训练过程:迭代优化与性能评估 四、测试结果:模型性能的最终检验 五、总结与展望 在深度学习的旅程中,多层感知机(MLP&…

JS数组 concat() 与扩展运算符的深度解析与最佳实践

文章目录 前言一、语法对比1. Array.prototype.concat()2. 扩展运算符(解构赋值) 二、性能差异(大规模数组)关键差异原因 三、适用场景建议总结 前言 最近工作中遇到了一个大规模数组合并相关的问题,在数据合并时有些…

一套qt c++的串口通信

实现了创建线程使用串口的功能 具备功能: 1.线程使用串口 2.定时发送队列内容,防止粘包 3.没处理接收粘包,根据你的需求来,handleReadyRead函数中,可以通过m_receiveBuffer来缓存接收,然后拆分数据来处理 源码 seri…

设计模式-发布订阅

文章目录 发布订阅概念发布订阅 vs 监听者例子代码 发布订阅概念 发布/订阅者模式最大的特点就是实现了松耦合,也就是说你可以让发布者发布消息、订阅者接受消息,而不是寻找一种方式把两个分离 的系统连接在一起。当然这种松耦合也是发布/订阅者模式最大…

windows-cmd 如何查询cpu、内存、磁盘的使用情况

在 Windows 中,您可以使用命令提示符(CMD)通过一些命令来查询 CPU、内存和磁盘的使用情况。以下是常用的命令和方法: 1. 查询 CPU 使用情况 使用 wmic 命令 wmic cpu get loadpercentage 这个命令会显示当前 CPU 的使用百分比…

allWebPlugin中间件VLC专用版之截图功能介绍

背景 VLC控件原有接口具有视频截图方法,即video对象的takeSnapshot方法,但是该方法返回的是一个IPicture对象,不适合在谷歌等现代浏览器上使用。因此,本人增加一个新的视频截图方法takeSnapshot2B64方法,直接将视频截图…

第Y5周:yolo.py文件解读

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 本次任务:将YOLOv5s网络模型中的C3模块按照下图方式修改形成C2模块,并将C2模块插入第2层与第3层之间,且跑通YOLOv5s。 任务…

宝塔安装ssh证书报错:/usr/bin/curl: symbol lookup error: curl_easy_header

原因: 你当前的 curl 命令版本是 7.70.0(不是系统默认版本,应该是你手动安装的)。它链接的是 /usr/local/lib/libcurl.so.4,而不是 CentOS 系统默认的 /usr/lib64/libcurl.so.4。/usr/local/lib/libcurl.so.4 很可能是…

Apache SeaTunnel 引擎深度解析:原理、技术与高效实践

Apache SeaTunnel 作为新一代高性能分布式数据集成平台,其核心引擎设计融合了现代大数据处理架构的精髓。 Apache SeaTunnel引擎通过分布式架构革新、精细化资源控制及企业级可靠性设计,显著提升了数据集成管道的执行效率与运维体验。其模块化设计允许用…