三十七、【高级特性篇】定时任务:基于 APScheduler 实现测试计划的灵活调度
-
- 前言
-
- 准备工作
- 第一部分:后端实现 - `APScheduler` 集成与任务调度
-
- 1. 安装 `django-apscheduler`
- 2. 配置 `django-apscheduler`
- 3. 数据库迁移
- 4. 创建调度触发函数
- 5. 启动 APScheduler 调度器
- 6. 创建定时任务管理的 API
- 7. 后端初步测试
- 第二部分:前端实现 - 定时任务管理界面
-
- 1. 创建 API 服务 (`src/api/scheduler.ts`)
- 2. 添加定时任务路由和侧边栏入口
- 3. 实现定时任务列表页面 (`src/views/system/ScheduledJobListView.vue`)
- 4. 实现定时任务创建/编辑对话框 (`src/views/system/ScheduledJobEditView.vue`)
- 第三部分:后端 `ScheduledJobSerializer` 增强 (以支持回显触发器配置)
- 第四部分:全面测试
- 总结
前言
定时任务是自动化测试平台的核心功能之一,它允许我们设置测试计划在预定的时间或周期自动执行,从而实现无人值守的自动化回归测试、持续集成/部署后的冒烟测试等场景。
为什么选择 APScheduler
和 django-apscheduler
?
APScheduler
(Advanced Python Scheduler): 一个轻量级且功能强大的 Python 任务调度库。它支持多种触发器(cron
模式、interval
模式、date
模式),非常灵活。django-apscheduler
:APScheduler
与 Django 的良好集成。它将 APScheduler 的调度信息(任务配置、下次运行时间等)直接存储在 Django 的数据库中,可以通过 Django ORM 来管理和查询定时任务,也方便通过 Django Admin 或自定义界面进行配置。- 与 Celery 配合: 如果定时任务本身是一个耗时操作(如执行一个包含大量用例的测试计划),直接在 APScheduler 的调度线程中执行会阻塞调度器,导致其他定时任务无法准时触发,甚至出现问题。所以,让 APScheduler 的定时任务只做一件轻量级的事情——将一个真正的耗时任务(即我们之前创建的 Celery 异步执行任务
execute_test_plan_task
)提交到 Celery 任务队列中。 这样,调度器的稳定性不受测试执行时间长短的影响,同时又能利用 Celery 的异步、分布式处理能力。
准备工作
- Django 后端项目就绪: 确保
test-platform/backend
项目结构完整,Celery 和 Redis 已配置并运行。 - Vue3 前端项目就绪。
- Axios 和 API 服务已封装。
- Element Plus 集成完毕。
第一部分:后端实现 - APScheduler
集成与任务调度
1. 安装 django-apscheduler
在你的 Django 项目的虚拟环境中运行:
pip install django-apscheduler
2. 配置 django-apscheduler
打开 test-platform/backend/settings.py
:
a. 添加到 INSTALLED_APPS
:
# test-platform/backend/settings.py
# ...
INSTALLED_APPS = [# ... 其他应用 ...'django_apscheduler', # 添加这一行# ...
]
# ...
# APScheduler 配置
SCHEDULER_CONFIG = {"apscheduler.jobstores": {"default": {"class": "django_apscheduler.jobstores:DjangoJobStore"}},"apscheduler.executors": {"default": {"class": "apscheduler.executors.pool:ThreadPoolExecutor","max_workers": "20"}},"apscheduler.job_defaults": {"coalesce": False,"max_instances": 3},"apscheduler.timezone": TIME_ZONE
}# django_apscheduler 配置
APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a" # 默认时间格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # 秒
# --- djangorestframework-simplejwt 设置 ---
b. 添加 APSCHEDULER
相关配置:
# test-platform/backend/settings.py
# ...
APSCHEDULER_DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss" # 日期时间格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # 立即运行任务的超时时间(秒)
3. 数据库迁移
运行 python manage.py migrate
,django-apscheduler
会自动在数据库中创建管理调度任务所需的表。
4. 创建调度触发函数
这个函数是 APScheduler
将要调度的目标。它会接收测试计划ID,然后将真正的执行任务提交给 Celery。
a. 在 api
目录下创建 scheduler_jobs.py
文件,填入以下代码:
# test-platform/api/scheduler_jobs.py
import logging
from django.utils import timezone
from api.models import TestPlan, TestRun # 导入 TestPlan 和 TestRun 模型
from api.tasks import execute_test_plan_task # 导入 Celery 任务
from api.utils.log_utils import record_operation_log # 导入操作日志工具logger = logging.getLogger(__name__)def trigger_test_plan_execution_job(test_plan_id: int):"""APScheduler 定时任务触发函数。此函数仅负责将测试计划执行任务提交到 Celery 异步队列。"""try:test_plan = TestPlan.objects.get(id=test_plan_id)# 1. 创建 TestRun 记录,初始状态为 PENDINGcurrent_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")initial_run_name = f"{test_plan.name} - (定时任务触发) {current_time}"test_run = TestRun.objects.create(test_plan=test_plan,name=initial_run_name,description=f"定时任务触发执行: {test_plan.name}",status='PENDING',total_cases=test_plan.test_cases.count() # 预估总数)# 2. 调用 Celery 任务异步执行task_result = execute_test_plan_task.delay(test_plan.id, str(test_run.id))# 3. 记录操作日志record_operation_log(user=None, # 由调度器触发,没有直接用户action_type='EXECUTE',target_resource='测试计划',target_id=test_plan.id,description=f"定时任务触发执行测试计划: '{test_plan.name}' (ID: {test_plan.id}), TestRun ID: {test_run.id}, Celery Task ID: {task_result.id}",details={"trigger_type": "scheduled", "test_run_id": str(test_run.id), "celery_task_id": task_result.id})logger.info(f"定时任务成功提交测试计划 (ID: {test_plan.id}) 执行到 Celery. TestRun ID: {test_run.id}, Celery Task ID: {task_result.id}")except TestPlan.DoesNotExist:logger.error(f"APScheduler 任务执行失败: 测试计划 (ID: {test_plan_id}) 未找到。")except Exception as e:logger.error(f"APScheduler 任务执行过程中发生未知错误 for plan ID {test_plan_id}: {e}", exc_info=True)
5. 启动 APScheduler 调度器
django-apscheduler
提供了两种启动调度器的方式:
a. 在 Django 应用启动时自动启动 (不推荐用于生产环境):在 apps.py
中,但会导致开发服务器重载时重复启动,并可能在多进程部署时引发问题。
b. 作为独立的 Management Command 启动:这是更健壮和推荐的方式,非常适合生产环境。
我们采用第二种方式。
a. 创建 Management Command 文件:
在 test-platform/api/management/commands/
目录下创建 runapscheduler.py
文件,填入以下代码:
# test-platform/api/management/commands/runapscheduler.py
import loggingfrom django.conf import settingsfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import utilfrom django.core.management.base import BaseCommand# 导入你的 APScheduler 定时任务函数
from api.scheduler_jobs import trigger_test_plan_execution_joblogger = logging.getLogger(__name__)# 定义清理旧作业执行记录的函数
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):"""删除超过指定时间(默认7天)的旧作业执行记录。这个函数本身也可以被 APScheduler 定时调度。"""DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):# 创建一个后台调度器实例scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)# 将 DjangoJobStore 添加到调度器scheduler.add_jobstore(DjangoJobStore(), "default")# 添加一个定时清理旧作业执行记录的任务 (可选)# 这个任务本身由 APScheduler 调度,每12小时执行一次scheduler.add_job(delete_old_job_executions,trigger=IntervalTrigger(hours=12),id="delete_old_job_executions", # 指定一个 ID,方便管理max_instances=1, # 确保只有一个实例在运行replace_existing=True, # 如果已有同ID任务,则替换# misfire_grace_time=3600, # 如果任务错过,延迟1小时内仍可执行)logger.info("Added job 'delete_old_job_executions'.")# 打印当前所有已注册的 APScheduler 作业try:logger.info("Starting scheduler...")scheduler.start()except KeyboardInterrupt:logger.info("Stopping scheduler...")scheduler.shutdown()logger.info("Scheduler shut down successfully!")except Exception as e:logger.error(f"Scheduler startup failed: {e}", exc_info=True)scheduler.shutdown()
b. 运行调度器:
打开一个新的终端窗口 (除了 Django 开发服务器和 Celery Worker 的终端),激活虚拟环境,然后在 test-platform
目录下运行:
python manage.py runapscheduler
如果调度器成功启动,你会看到日志输出,并提示添加了 delete_old_job_executions
任务。
6. 创建定时任务管理的 API
我们将为 django-apscheduler
的 DjangoJob
模型提供 RESTful API,以便前端进行管理。
a. 在 api/serializers.py
中添加 ScheduledJobSerializer
:
# test-platform/api/serializers.py
# ...
from django_apscheduler.models import DjangoJob, DjangoJobExecution # 导入 APScheduler 的模型class ScheduledJobSerializer(serializers.ModelSerializer):test_plan_name = serializers.SerializerMethodField(read_only=True)job_type = serializers.CharField(source='job_func_name', read_only=True) # 方便前端显示函数名class Meta:model = DjangoJobfields = ['id', 'name', 'job_type', 'job_func_name', 'job_arguments', 'job_kwargs', 'job_state', 'next_run_time', 'start_date', 'end_date', 'max_instances', 'misfire_grace_time', 'coalesce', 'jobstore','test_plan_name' # 关联的测试计划名称]read_only_fields = ['job_type', 'job_func_name', 'job_state', 'next_run_time']extra_kwargs = {'job_arguments': {'required': False, 'allow_null': True}, # 允许 job_arguments 为空'job_kwargs': {'required': False, 'allow_null': True}, # 允许 job_kwargs 为空'job_state': {'required': False, 'read_only': True}}def get_test_plan_name(self, obj: DjangoJob):"""从 job_arguments 中解析 test_plan_id 并获取其名称"""if obj.job_arguments:try:# job_arguments 是一个元组的 JSON 字符串,例如 '["1"]'args_list = json.loads(obj.job_arguments)if args_list and isinstance(args_list, list) and len(args_list) > 0:test_plan_id = args_list[0]if isinstance(test_plan_id, (int, str)):try:test_plan = TestPlan.objects.get(id=int(test_plan_id))return test_plan.nameexcept TestPlan.DoesNotExist:return f"未知计划 (ID: {test_plan_id})"except (json.JSONDecodeError, IndexError, ValueError):passreturn "N/A"def create(self, validated_data: dict):# 在创建前,需要将 trigger_type 和 trigger_config 转换为 APScheduler 的 trigger 参数# 这部分逻辑通常在 ViewSet 的 create 方法中处理return super().create(validated_data)def update