文章目录
- Python 常用定时任务框架简介
- 🧩 一、轻量级方案(适合简单任务)
- 1. **`schedule`库**
- ⚙️ 二、中级方案(平衡功能与复杂度)
- 2. **APScheduler**
- 3. **Celery + Celery Beat**
- 🚀 三、异步专用方案(高性能I/O密集型)
- 4. **AsyncIOScheduler(APScheduler子类)**
- 5. **aiocron**
- 🔍 四、框架横向对比
- ⚠️ 五、避坑指南
- 💎 总结
Python 常用定时任务框架简介
以下是对Python主流定时任务框架的详细对比及使用指南,我们可结合场景需求可快速定位合适方案:
🧩 一、轻量级方案(适合简单任务)
1. schedule
库
-
特点:语法直观,无需复杂配置,适合小型脚本或低频任务。
-
安装
uv add schedule
-
示例代码:
import schedule import timedef job():print(f"Task executed at {time.strftime('%Y-%m-%d %H:%M:%S')}")# 每10秒钟执行一次 schedule.every(10).seconds.do(job) # 每天16:00执行 schedule.every().day.at("16:00").do(job)while True:schedule.run_pending()time.sleep(1)
-
适用场景:单机脚本、低频任务(如每日报表生成)。
-
缺点:不支持持久化或动态任务管理。
⚙️ 二、中级方案(平衡功能与复杂度)
2. APScheduler
-
核心优势:
- 支持三种触发器:
interval
(固定间隔)、cron
(类Linux Cron语法)、date
(单次任务)。 - 可持久化任务到数据库(SQLite/MongoDB/Redis等)。
- 提供异步调度器(
AsyncIOScheduler
)兼容asyncio
。
- 支持三种触发器:
-
安装:
uv add apscheduler
-
示例代码:
from apscheduler.schedulers.background import BackgroundScheduler import timedef job():print(f"Task executed at {time.strftime('%Y-%m-%d %H:%M:%S')}")# 创建调度器 scheduler = BackgroundScheduler() # 每20秒执行(interval模式) scheduler.add_job(job, 'interval', seconds=20) # 每天16点08分执行(cron模式) scheduler.add_job(job, 'cron', hour=16, minute=8)scheduler.start() try:while True:time.sleep(2) except KeyboardInterrupt:scheduler.shutdown()
-
适用场景:需动态增删任务(如Web后台定时任务)、复杂调度逻辑(如每月最后一个工作日)。
3. Celery + Celery Beat
-
特点:分布式任务队列,天然支持异步和横向扩展,需搭配消息中间件(如Redis/RabbitMQ)。
-
安装:
uv add celery
本文使用Redis故需要安装上含有Redis访问库的版本,否则访问Redis会报错,使用如下语句:
uv add celery[redis]
-
示例代码:
# tasks.pyfrom celery_app import app@app.taskdef add(x: int, y: int) -> int:result = x + yprint("add task result ", result)return result@app.taskdef multiply(x: int, y: int) -> int:result = x * yprint("multiply task ", result)return result# celery_app.pyfrom celery import Celeryfrom celery.schedules import crontabapp = Celery('celery_app', broker='redis://localhost:6379/0')# 设置时区为东八区app.conf.timezone = 'Asia/Shanghai'app.conf.update(result_backend='redis://localhost:6379/0',beat_schedule={'add-every-10-seconds': {'task': 'tasks.add','schedule': 10.0, # run every 10 seconds'args': (10, 10)},'multiply-at-1715': {'task': 'tasks.multiply','schedule': crontab(hour='17', minute='15'),'args': (4, 5)}},include=['tasks'])
该方案执行起来相对复杂了些:
-
首先启动 Celery worker:
celery -A celery_app worker --loglevel=info
-
【可选】如需观察定时任务自动运行,可启动 Celery beat:
celery -A celery_app beat --loglevel=info
-
你也可以通过运行脚本手动触发任务
# main.py from tasks import add if __name__ == "__main__":result = add.delay(4, 5)print('Task result: ', result.get())
-
-
适用场景:分布式系统、高可用需求(如微服务集群定时任务)。
🚀 三、异步专用方案(高性能I/O密集型)
4. AsyncIOScheduler(APScheduler子类)
-
用途:在异步环境中调度协程任务。
-
示例:
from apscheduler.schedulers.asyncio import AsyncIOScheduler import asyncio import time import osdef tick():print(f"Tick! The time is: {time.strftime('%Y-%m-%d %H:%M:%S')}")async def main():scheduler = AsyncIOScheduler()scheduler.add_job(tick, 'interval', seconds=5)scheduler.start()print("Press Ctrl+{} to exit".format("Break" if os.name == "nt" else "C"))while True:await asyncio.sleep(1000)if __name__ == "__main__":# Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.try:asyncio.run(main())except (KeyboardInterrupt, SystemExit):pass
5. aiocron
-
特点:极简异步Cron调度器,语法类似Unix cron。
-
安装:
uv add aiocron
-
示例:
import asyncio import datetime import aiocron# 定义一个简单的异步函数 async def attime_job():"""这个协程会每分钟被调用一次。"""print(f"[{datetime.datetime.now()}] 你好,世界!这是一个每分钟执行的任务。")# 定义另一个需要参数的异步函数 async def attime_job_with_args(name):"""这个协程会在每小时的第5分钟被调用。"""print(f"[{datetime.datetime.now()}] 你好,{name}!这是一个带参数的任务。")await asyncio.sleep(1) # 模拟一些 I/O 操作print(f"[{datetime.datetime.now()}] {name} 的任务执行完毕。")if __name__ == '__main__':print("程序启动,等待定时任务执行... (按 Ctrl+C 退出)")# 1. 创建一个每分钟执行的 cron 任务 ('* * * * * *')# cron 格式: a b c d e# a: 分钟 (0-59)# b: 小时 (0-23)# c: 日 (1-31)# d: 月 (1-12)# e: 周几 (0-6, 0是周日)aiocron.crontab('* * * * *', func=attime_job)# 2. 创建一个在每小时的第 5 分钟执行的任务# 并向任务函数传递参数 'Python'aiocron.crontab('5 * * * *', func=attime_job_with_args, args=('Python',))# 创建并运行 asyncio 事件循环# loop.run_forever() 会一直运行,直到被停止loop = asyncio.get_event_loop()try:loop.run_forever()except KeyboardInterrupt:print("\n程序被用户中断。")finally:loop.close()print("事件循环已关闭。")
🔍 四、框架横向对比
框架 | 适用场景 | 异步支持 | 持久化 | 分布式 | 学习成本 |
---|---|---|---|---|---|
schedule | 小型脚本、低频任务 | ❌ | ❌ | ❌ | ⭐ |
APScheduler | 单机复杂调度、动态任务管理 | ✅(选配) | ✅ | ❌ | ⭐⭐ |
Celery + Beat | 分布式系统、高可用场景 | ✅ | ✅ | ✅ | ⭐⭐⭐ |
AsyncIOScheduler | 异步应用(如FastAPI/Starlette) | ✅ | ✅ | ❌ | ⭐⭐ |
aiocron | 极简异步定时任务 | ✅ | ❌ | ❌ | ⭐ |
💎 推荐策略:
- 快速验证或简单脚本 →
schedule
;- 单机复杂调度(如动态启停任务) → APScheduler;
- 分布式生产环境 → Celery;
- 异步应用(如FastAPI) → AsyncIOScheduler。
⚠️ 五、避坑指南
- 阻塞问题:
- 避免在同步调度器(如
BackgroundScheduler
)中调用耗时同步任务,否则会阻塞主线程。异步任务需用AsyncIOScheduler
。
- 避免在同步调度器(如
- 持久化配置:
- APScheduler若需重启后保留任务,务必配置
jobstore
(如SQLite)。
- APScheduler若需重启后保留任务,务必配置
- 时区陷阱:
- 所有调度器默认使用UTC时间,需显式设置时区(如
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
)。
- 所有调度器默认使用UTC时间,需显式设置时区(如
💎 总结
根据场景复杂度与规模选择框架:轻量级选schedule
,单机灵活调度用APScheduler,分布式系统用Celery,异步生态选AsyncIOScheduler或aiocron。务必注意任务持久化和时区配置,避免生产环境故障。