三十七、【高级特性篇】定时任务:基于 APScheduler 实现测试计划的灵活调度
-
- 前言
-
- 准备工作
- 第一部分:后端实现 - `APScheduler` 集成与任务调度
-
- 1. 安装 `django-apscheduler`
- 2. 配置 `django-apscheduler`
- 3. 数据库迁移
- 4. 创建调度触发函数
- 5. 启动 APScheduler 调度器
- 6. 创建定时任务管理的 API
- 7. 后端初步测试
- 第二部分:前端实现 - 定时任务管理界面
-
- 1. 创建 API 服务 (`src/api/scheduler.ts`)
- 2. 添加定时任务路由和侧边栏入口
- 3. 实现定时任务列表页面 (`src/views/system/ScheduledJobListView.vue`)
- 4. 实现定时任务创建/编辑对话框 (`src/views/system/ScheduledJobEditView.vue`)
- 第三部分:后端 `ScheduledJobSerializer` 增强 (以支持回显触发器配置)
- 第四部分:全面测试
- 总结
前言
定时任务是自动化测试平台的核心功能之一,它允许我们设置测试计划在预定的时间或周期自动执行,从而实现无人值守的自动化回归测试、持续集成/部署后的冒烟测试等场景。
为什么选择 APScheduler
和 django-apscheduler
?
APScheduler
(Advanced Python Scheduler): 一个轻量级且功能强大的 Python 任务调度库。它支持多种触发器(cron
模式、interval
模式、date
模式),非常灵活。django-apscheduler
:APScheduler
与 Django 的良好集成。它将 APScheduler 的调度信息(任务配置、下次运行时间等)直接存储在 Django 的数据库中,可以通过 Django ORM 来管理和查询定时任务,也方便通过 Django Admin 或自定义界面进行配置。- 与 Celery 配合: 如果定时任务本身是一个耗时操作(如执行一个包含大量用例的测试计划),直接在 APScheduler 的调度线程中执行会阻塞调度器,导致其他定时任务无法准时触发,甚至出现问题。所以,让 APScheduler 的定时任务只做一件轻量级的事情——将一个真正的耗时任务(即我们之前创建的 Celery 异步执行任务
execute_test_plan_task
)提交到 Celery 任务队列中。 这样,调度器的稳定性不受测试执行时间长短的影响,同时又能利用 Celery 的异步、分布式处理能力。
准备工作
- Django 后端项目就绪: 确保
test-platform/backend
项目结构完整,Celery 和 Redis 已配置并运行。 - Vue3 前端项目就绪。
- Axios 和 API 服务已封装。
- Element Plus 集成完毕。
第一部分:后端实现 - APScheduler
集成与任务调度
1. 安装 django-apscheduler
在你的 Django 项目的虚拟环境中运行:
pip install django-apscheduler
2. 配置 django-apscheduler
打开 test-platform/backend/settings.py
:
a. 添加到 INSTALLED_APPS
:
# test-platform/backend/settings.py
# ...
INSTALLED_APPS = [
# ... 其他应用 ...
'django_apscheduler', # 添加这一行
# ...
]
# ...
# APScheduler 配置
SCHEDULER_CONFIG = {
"apscheduler.jobstores": {
"default": {
"class": "django_apscheduler.jobstores:DjangoJobStore"
}
},
"apscheduler.executors": {
"default": {
"class": "apscheduler.executors.pool:ThreadPoolExecutor",
"max_workers": "20"
}
},
"apscheduler.job_defaults": {
"coalesce": False,
"max_instances": 3
},
"apscheduler.timezone": TIME_ZONE
}
# django_apscheduler 配置
APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a" # 默认时间格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # 秒
# --- djangorestframework-simplejwt 设置 ---
b. 添加 APSCHEDULER
相关配置:
# test-platform/backend/settings.py
# ...
APSCHEDULER_DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss" # 日期时间格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # 立即运行任务的超时时间(秒)
3. 数据库迁移
运行 python manage.py migrate
,django-apscheduler
会自动在数据库中创建管理调度任务所需的表。
4. 创建调度触发函数
这个函数是 APScheduler
将要调度的目标。它会接收测试计划ID,然后将真正的执行任务提交给 Celery。
a. 在 api
目录下创建 scheduler_jobs.py
文件,填入以下代码:
# test-platform/api/scheduler_jobs.py
import logging
from django.utils import timezone
from api.models import TestPlan, TestRun # 导入 TestPlan 和 TestRun 模型
from api.tasks import execute_test_plan_task # 导入 Celery 任务
from api.utils.log_utils import record_operation_log # 导入操作日志工具
logger = logging.getLogger(__name__)
def trigger_test_plan_execution_job(test_plan_id: int):
"""
APScheduler 定时任务触发函数。
此函数仅负责将测试计划执行任务提交到 Celery 异步队列。
"""
try:
test_plan = TestPlan.objects.get(id=test_plan_id)
# 1. 创建 TestRun 记录,初始状态为 PENDING
current_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
initial_run_name = f"{
test_plan.name} - (定时任务触发) {
current_time}"
test_run = TestRun.objects.create(
test_plan=test_plan,
name=initial_run_name,
description=f"定时任务触发执行: {
test_plan.name}",
status='PENDING',
total_cases=test_plan.test_cases.count() # 预估总数
)
# 2. 调用 Celery 任务异步执行
task_result = execute_test_plan_task.delay(test_plan.id, str(test_run.id))
# 3. 记录操作日志
record_operation_log(
user=None, # 由调度器触发,没有直接用户
action_type='EXECUTE',
target_resource='测试计划',
target_id=test_plan.id,
description=f"定时任务触发执行测试计划: '{
test_plan.name}' (ID: {
test_plan.id}), TestRun ID: {
test_run.id}, Celery Task ID: {
task_result.id}",
details={
"trigger_type": "scheduled", "test_run_id": str(test_run.id), "celery_task_id": task_result.id}
)
logger.info(f"定时任务成功提交测试计划 (ID: {
test_plan.id}) 执行到 Celery. TestRun ID: {
test_run.id}, Celery Task ID: {
task_result.id}")
except TestPlan.DoesNotExist:
logger.error(f"APScheduler 任务执行失败: 测试计划 (ID: {
test_plan_id}) 未找到。")
except Exception as e:
logger.error(f"APScheduler 任务执行过程中发生未知错误 for plan ID {
test_plan_id}: {
e}", exc_info=True)
5. 启动 APScheduler 调度器
django-apscheduler
提供了两种启动调度器的方式:
a. 在 Django 应用启动时自动启动 (不推荐用于生产环境):在 apps.py
中,但会导致开发服务器重载时重复启动,并可能在多进程部署时引发问题。
b. 作为独立的 Management Command 启动:这是更健壮和推荐的方式,非常适合生产环境。
我们采用第二种方式。
a. 创建 Management Command 文件:
在 test-platform/api/management/commands/
目录下创建 runapscheduler.py
文件,填入以下代码:
# test-platform/api/management/commands/runapscheduler.py
import logging
from django.conf import settings
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import util
from django.core.management.base import BaseCommand
# 导入你的 APScheduler 定时任务函数
from api.scheduler_jobs import trigger_test_plan_execution_job
logger = logging.getLogger(__name__)
# 定义清理旧作业执行记录的函数
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):
"""
删除超过指定时间(默认7天)的旧作业执行记录。
这个函数本身也可以被 APScheduler 定时调度。
"""
DjangoJobExecution.objects.delete_old_job_executions(max_age)
class Command(BaseCommand):
help = "Runs APScheduler."
def handle(self, *args, **options):
# 创建一个后台调度器实例
scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
# 将 DjangoJobStore 添加到调度器
scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加一个定时清理旧作业执行记录的任务 (可选)
# 这个任务本身由 APScheduler 调度,每12小时执行一次
scheduler.add_job(
delete_old_job_executions,
trigger=IntervalTrigger(hours=12),
id="delete_old_job_executions", # 指定一个 ID,方便管理
max_instances=1, # 确保只有一个实例在运行
replace_existing=True, # 如果已有同ID任务,则替换
# misfire_grace_time=3600, # 如果任务错过,延迟1小时内仍可执行
)
logger.info("Added job 'delete_old_job_executions'.")
# 打印当前所有已注册的 APScheduler 作业
try:
logger.info("Starting scheduler...")
scheduler.start()
except KeyboardInterrupt:
logger.info("Stopping scheduler...")
scheduler.shutdown()
logger.info("Scheduler shut down successfully!")
except Exception as e:
logger.error(f"Scheduler startup failed: {
e}", exc_info=True)
scheduler.shutdown()
b. 运行调度器:
打开一个新的终端窗口 (除了 Django 开发服务器和 Celery Worker 的终端),激活虚拟环境,然后在 test-platform
目录下运行:
python manage.py runapscheduler
如果调度器成功启动,你会看到日志输出,并提示添加了 delete_old_job_executions
任务。
6. 创建定时任务管理的 API
我们将为 django-apscheduler
的 DjangoJob
模型提供 RESTful API,以便前端进行管理。
a. 在 api/serializers.py
中添加 ScheduledJobSerializer
:
# test-platform/api/serializers.py
# ...
from django_apscheduler.models import DjangoJob, DjangoJobExecution # 导入 APScheduler 的模型
class ScheduledJobSerializer(serializers.ModelSerializer):
test_plan_name = serializers.SerializerMethodField(read_only=True)
job_type = serializers.CharField(source='job_func_name', read_only=True) # 方便前端显示函数名
class Meta:
model = DjangoJob
fields = [
'id', 'name', 'job_type', 'job_func_name', 'job_arguments',
'job_kwargs', 'job_state', 'next_run_time', 'start_date', 'end_date',
'max_instances', 'misfire_grace_time', 'coalesce', 'jobstore',
'test_plan_name' # 关联的测试计划名称
]
read_only_fields = ['job_type', 'job_func_name', 'job_state', 'next_run_time']
extra_kwargs = {
'job_arguments': {
'required': False, 'allow_null': True}, # 允许 job_arguments 为空
'job_kwargs': {
'required': False, 'allow_null': True}, # 允许 job_kwargs 为空
'job_state': {
'required': False, 'read_only': True}
}
def get_test_plan_name(self, obj: DjangoJob):
"""从 job_arguments 中解析 test_plan_id 并获取其名称"""
if obj.job_arguments:
try:
# job_arguments 是一个元组的 JSON 字符串,例如 '["1"]'
args_list = json.loads(obj.job_arguments)
if args_list and isinstance(args_list, list) and len(args_list) > 0:
test_plan_id = args_list[0]
if isinstance(test_plan_id, (int, str)):
try:
test_plan = TestPlan.objects.get(id=int(test_plan_id))
return test_plan.name
except TestPlan.DoesNotExist:
return f"未知计划 (ID: {
test_plan_id})"
except (json.JSONDecodeError, IndexError, ValueError):
pass
return "N/A"
def create(self, validated_data: dict):
# 在创建前,需要将 trigger_type 和 trigger_config 转换为 APScheduler 的 trigger 参数
# 这部分逻辑通常在 ViewSet 的 create 方法中处理
return super().create(validated_data)
def update