三十七、【高级特性篇】定时任务:基于 APScheduler 实现测试计划的灵活调度

发布于:2025-07-26 ⋅ 阅读:(14) ⋅ 点赞:(0)

三十七、【高级特性篇】定时任务:基于 APScheduler 实现测试计划的灵活调度

    • 前言
      • 准备工作
      • 第一部分:后端实现 - `APScheduler` 集成与任务调度
        • 1. 安装 `django-apscheduler`
        • 2. 配置 `django-apscheduler`
        • 3. 数据库迁移
        • 4. 创建调度触发函数
        • 5. 启动 APScheduler 调度器
        • 6. 创建定时任务管理的 API
        • 7. 后端初步测试
      • 第二部分:前端实现 - 定时任务管理界面
        • 1. 创建 API 服务 (`src/api/scheduler.ts`)
        • 2. 添加定时任务路由和侧边栏入口
        • 3. 实现定时任务列表页面 (`src/views/system/ScheduledJobListView.vue`)
        • 4. 实现定时任务创建/编辑对话框 (`src/views/system/ScheduledJobEditView.vue`)
      • 第三部分:后端 `ScheduledJobSerializer` 增强 (以支持回显触发器配置)
      • 第四部分:全面测试
    • 总结

前言

定时任务是自动化测试平台的核心功能之一,它允许我们设置测试计划在预定的时间或周期自动执行,从而实现无人值守的自动化回归测试、持续集成/部署后的冒烟测试等场景。
在这里插入图片描述
为什么选择 APSchedulerdjango-apscheduler

  • APScheduler (Advanced Python Scheduler): 一个轻量级且功能强大的 Python 任务调度库。它支持多种触发器(cron 模式、interval 模式、date 模式),非常灵活。
  • django-apscheduler: APScheduler 与 Django 的良好集成。它将 APScheduler 的调度信息(任务配置、下次运行时间等)直接存储在 Django 的数据库中,可以通过 Django ORM 来管理和查询定时任务,也方便通过 Django Admin 或自定义界面进行配置。
  • 与 Celery 配合: 如果定时任务本身是一个耗时操作(如执行一个包含大量用例的测试计划),直接在 APScheduler 的调度线程中执行会阻塞调度器,导致其他定时任务无法准时触发,甚至出现问题。所以,让 APScheduler 的定时任务只做一件轻量级的事情——将一个真正的耗时任务(即我们之前创建的 Celery 异步执行任务 execute_test_plan_task)提交到 Celery 任务队列中。 这样,调度器的稳定性不受测试执行时间长短的影响,同时又能利用 Celery 的异步、分布式处理能力。

准备工作

  1. Django 后端项目就绪: 确保 test-platform/backend 项目结构完整,Celery 和 Redis 已配置并运行。
  2. Vue3 前端项目就绪。
  3. Axios 和 API 服务已封装。
  4. Element Plus 集成完毕。

第一部分:后端实现 - APScheduler 集成与任务调度

1. 安装 django-apscheduler

在你的 Django 项目的虚拟环境中运行:

pip install django-apscheduler

在这里插入图片描述

2. 配置 django-apscheduler

打开 test-platform/backend/settings.py

a. 添加到 INSTALLED_APPS
在这里插入图片描述

# test-platform/backend/settings.py
# ...
INSTALLED_APPS = [
    # ... 其他应用 ...
    'django_apscheduler', # 添加这一行
    # ...
]
# ...
# APScheduler 配置
SCHEDULER_CONFIG = {
   
   
    "apscheduler.jobstores": {
   
   
        "default": {
   
   
            "class": "django_apscheduler.jobstores:DjangoJobStore"
        }
    },
    "apscheduler.executors": {
   
   
        "default": {
   
   
            "class": "apscheduler.executors.pool:ThreadPoolExecutor",
            "max_workers": "20"
        }
    },
    "apscheduler.job_defaults": {
   
   
        "coalesce": False,
        "max_instances": 3
    },
    "apscheduler.timezone": TIME_ZONE
}

# django_apscheduler 配置
APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a"  # 默认时间格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25  # 秒
# --- djangorestframework-simplejwt 设置 ---

b. 添加 APSCHEDULER 相关配置:
在这里插入图片描述

# test-platform/backend/settings.py
# ...
APSCHEDULER_DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss" # 日期时间格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # 立即运行任务的超时时间(秒)
3. 数据库迁移

运行 python manage.py migratedjango-apscheduler 会自动在数据库中创建管理调度任务所需的表。
在这里插入图片描述

4. 创建调度触发函数

这个函数是 APScheduler 将要调度的目标。它会接收测试计划ID,然后将真正的执行任务提交给 Celery。

a. api 目录下创建 scheduler_jobs.py 文件,填入以下代码:
在这里插入图片描述

# test-platform/api/scheduler_jobs.py
import logging
from django.utils import timezone
from api.models import TestPlan, TestRun # 导入 TestPlan 和 TestRun 模型
from api.tasks import execute_test_plan_task # 导入 Celery 任务
from api.utils.log_utils import record_operation_log # 导入操作日志工具

logger = logging.getLogger(__name__)

def trigger_test_plan_execution_job(test_plan_id: int):
    """
    APScheduler 定时任务触发函数。
    此函数仅负责将测试计划执行任务提交到 Celery 异步队列。
    """
    try:
        test_plan = TestPlan.objects.get(id=test_plan_id)
        
        # 1. 创建 TestRun 记录,初始状态为 PENDING
        current_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
        initial_run_name = f"{
     
     test_plan.name} - (定时任务触发) {
     
     current_time}"
        test_run = TestRun.objects.create(
            test_plan=test_plan,
            name=initial_run_name,
            description=f"定时任务触发执行: {
     
     test_plan.name}",
            status='PENDING',
            total_cases=test_plan.test_cases.count() # 预估总数
        )

        # 2. 调用 Celery 任务异步执行
        task_result = execute_test_plan_task.delay(test_plan.id, str(test_run.id))
        
        # 3. 记录操作日志
        record_operation_log(
            user=None, # 由调度器触发,没有直接用户
            action_type='EXECUTE',
            target_resource='测试计划',
            target_id=test_plan.id,
            description=f"定时任务触发执行测试计划: '{
     
     test_plan.name}' (ID: {
     
     test_plan.id}), TestRun ID: {
     
     test_run.id}, Celery Task ID: {
     
     task_result.id}",
            details={
   
   "trigger_type": "scheduled", "test_run_id": str(test_run.id), "celery_task_id": task_result.id}
        )
        
        logger.info(f"定时任务成功提交测试计划 (ID: {
     
     test_plan.id}) 执行到 Celery. TestRun ID: {
     
     test_run.id}, Celery Task ID: {
     
     task_result.id}")

    except TestPlan.DoesNotExist:
        logger.error(f"APScheduler 任务执行失败: 测试计划 (ID: {
     
     test_plan_id}) 未找到。")
    except Exception as e:
        logger.error(f"APScheduler 任务执行过程中发生未知错误 for plan ID {
     
     test_plan_id}: {
     
     e}", exc_info=True)
5. 启动 APScheduler 调度器

django-apscheduler 提供了两种启动调度器的方式:
a. 在 Django 应用启动时自动启动 (不推荐用于生产环境):在 apps.py 中,但会导致开发服务器重载时重复启动,并可能在多进程部署时引发问题。
b. 作为独立的 Management Command 启动:这是更健壮和推荐的方式,非常适合生产环境。

我们采用第二种方式

a. 创建 Management Command 文件:
test-platform/api/management/commands/ 目录下创建 runapscheduler.py 文件,填入以下代码:
在这里插入图片描述

# test-platform/api/management/commands/runapscheduler.py
import logging

from django.conf import settings

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import util

from django.core.management.base import BaseCommand

# 导入你的 APScheduler 定时任务函数
from api.scheduler_jobs import trigger_test_plan_execution_job

logger = logging.getLogger(__name__)

# 定义清理旧作业执行记录的函数
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):
    """
    删除超过指定时间(默认7天)的旧作业执行记录。
    这个函数本身也可以被 APScheduler 定时调度。
    """
    DjangoJobExecution.objects.delete_old_job_executions(max_age)

class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        # 创建一个后台调度器实例
        scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
        
        # 将 DjangoJobStore 添加到调度器
        scheduler.add_jobstore(DjangoJobStore(), "default")

        # 添加一个定时清理旧作业执行记录的任务 (可选)
        # 这个任务本身由 APScheduler 调度,每12小时执行一次
        scheduler.add_job(
            delete_old_job_executions,
            trigger=IntervalTrigger(hours=12),
            id="delete_old_job_executions",  # 指定一个 ID,方便管理
            max_instances=1, # 确保只有一个实例在运行
            replace_existing=True, # 如果已有同ID任务,则替换
            # misfire_grace_time=3600, # 如果任务错过,延迟1小时内仍可执行
        )
        logger.info("Added job 'delete_old_job_executions'.")

        # 打印当前所有已注册的 APScheduler 作业
        try:
            logger.info("Starting scheduler...")
            scheduler.start()
        except KeyboardInterrupt:
            logger.info("Stopping scheduler...")
            scheduler.shutdown()
            logger.info("Scheduler shut down successfully!")
        except Exception as e:
            logger.error(f"Scheduler startup failed: {
     
     e}", exc_info=True)
            scheduler.shutdown()

b. 运行调度器:
打开一个新的终端窗口 (除了 Django 开发服务器和 Celery Worker 的终端),激活虚拟环境,然后在 test-platform 目录下运行:

python manage.py runapscheduler

如果调度器成功启动,你会看到日志输出,并提示添加了 delete_old_job_executions 任务。
在这里插入图片描述

6. 创建定时任务管理的 API

我们将为 django-apschedulerDjangoJob 模型提供 RESTful API,以便前端进行管理。

a. api/serializers.py 中添加 ScheduledJobSerializer
在这里插入图片描述

# test-platform/api/serializers.py
# ...
from django_apscheduler.models import DjangoJob, DjangoJobExecution # 导入 APScheduler 的模型

class ScheduledJobSerializer(serializers.ModelSerializer):
    test_plan_name = serializers.SerializerMethodField(read_only=True)
    job_type = serializers.CharField(source='job_func_name', read_only=True) # 方便前端显示函数名
    
    class Meta:
        model = DjangoJob
        fields = [
            'id', 'name', 'job_type', 'job_func_name', 'job_arguments', 
            'job_kwargs', 'job_state', 'next_run_time', 'start_date', 'end_date', 
            'max_instances', 'misfire_grace_time', 'coalesce', 'jobstore',
            'test_plan_name' # 关联的测试计划名称
        ]
        read_only_fields = ['job_type', 'job_func_name', 'job_state', 'next_run_time']
        extra_kwargs = {
   
   
            'job_arguments': {
   
   'required': False, 'allow_null': True}, # 允许 job_arguments 为空
            'job_kwargs': {
   
   'required': False, 'allow_null': True}, # 允许 job_kwargs 为空
            'job_state': {
   
   'required': False, 'read_only': True}
        }

    def get_test_plan_name(self, obj: DjangoJob):
        """从 job_arguments 中解析 test_plan_id 并获取其名称"""
        if obj.job_arguments:
            try:
                # job_arguments 是一个元组的 JSON 字符串,例如 '["1"]'
                args_list = json.loads(obj.job_arguments)
                if args_list and isinstance(args_list, list) and len(args_list) > 0:
                    test_plan_id = args_list[0]
                    if isinstance(test_plan_id, (int, str)):
                        try:
                            test_plan = TestPlan.objects.get(id=int(test_plan_id))
                            return test_plan.name
                        except TestPlan.DoesNotExist:
                            return f"未知计划 (ID: {
     
     test_plan_id})"
            except (json.JSONDecodeError, IndexError, ValueError):
                pass
        return "N/A"

    def create(self, validated_data: dict):
        # 在创建前,需要将 trigger_type 和 trigger_config 转换为 APScheduler 的 trigger 参数
        # 这部分逻辑通常在 ViewSet 的 create 方法中处理
        return super().create(validated_data)

    def update

网站公告

今日签到

点亮在社区的每一天
去签到