一、Django 中使用 django-apscheduler 实现定时任务
可以方便地管理周期性任务(如每天清理缓存、定时发送邮件等)
1. 安装
pip install django-apscheduler -i https://pypi.tuna.tsinghua.edu.cn/simple #0.7.0
2.添加到应用,python manage.py migrate
# settings.py
INSTALLED_APPS = [
# ...
'django_apscheduler',
]
python manage.py migrate
执行后,mysql中,多了两个django_apscheduler开头的表。
3. 一个django应用中创建定时任务
可以是一个utils包下,创建一个叫task.py的,我是直接在相关应用下建了一个task.py
也有人也在view.py中, 还有人习惯专门改个app来实现定时任务。
# tasks.py
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
import uuid
# 初始化调度器
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
# --- 定义定时任务函数 ---
def cleanup_temp_data():
print("清理临时数据...")
def send_daily_report():
print("发送日报...")
def generate_job_id( jobname):
return f"{jobname}_{uuid.uuid4().hex[:6]}"
# --- 将任务添加到调度器 ---
scheduler.add_job(
cleanup_temp_data,
"interval", # 间隔性任务
seconds=10, # 每隔 60 秒执行一次
# days=1, # 每天执行一次
id=generate_job_id("cleanup_job"),
# id="cleanup_job",
# replace_existing=True, # 允许覆盖同名任务 (即数据库已有id为 cleanup_job,不添加这个就会报错)
)
scheduler.add_job(
send_daily_report,
"cron", # 定时任务(类似 crontab)
hour=9, # 每天 9 点执行
minute=10, # 加了这行就表示,每天9点 10 分钟
id="report_job",
replace_existing=True, # 允许覆盖同名任务
)
# 注意:这里暂时不启动调度器!在 apps.py 中启动。
# 启动调度器 (这里也启动不了呢)
# scheduler.start()
方式了,在方法上写装饰器@register_job(scheduler,‘interval’,seconds=20,id=‘clenauo_job2’),就不用下方的 scheduler.add_job()部分了
启动项目时的定时任务
4.配置启动
一般定义任务启动是配置在应用中app.py中的,表示启动项目时,来启动定义是任务。
from django.apps import AppConfig
import filelock
class HrunnerConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "hrunner"
# 在 Django 开发环境中,AppConfig.ready() 方法被执行两次是.正常现象,但会导致定时任务重复注册
# 生产环境下是没问题的。
def ready(self):
# 仅在应用加载完成后初始化调度器
if not hasattr(self, 'scheduler_started'):
from .task import scheduler # 导入当前应用的调度器
# 启动调度器
scheduler.start()
self.scheduler_started = True
5.启动观察第三步代码
python managy runserver 因为开发环境,会导致执行了两次ready。有点点问题。
开发环境下,解决办法。
先清空两个表,然后这里就这样写,但是启动时会报。(原因就是ready执行了两次,第二次报错,故意让第二次没有加成功。。就只有一个任务)
生成环境下。还没尝试
好像,必须要 replace_existing,不然测试环境时,项目启动不起来。
定时任务方式,有两种
后续定时任务
接口添加任务。
定时任务应用(没行)
感觉就把定时任务写死,启动时执行就好
django+ celery异步任务配置
1. 安装celery 和redis
pip install celery redis -i https://pypi.tuna.tsinghua.edu.cn/simple
2. 需要有一个redis服务
3. django项目结构
假设项目名为 myproject,应用名为 myapp:
myproject/
├── myproject/
│ ├── init.py
│ ├── celery.py # 新增 Celery 配置文件
│ ├── settings.py
│ └── urls.py
└── myapp/
├── tasks.py # 存放异步任务
└── …
4. 配置celery
创建 myproject/celery.py
# myproject/celery.py
import os
from celery import Celery
# 设置 Django 环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 创建 Celery 实例
app = Celery('myproject')
# 从 Django 配置中读取 Celery 设置(以 CELERY_ 开头的配置)
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现所有 Django 应用中的 tasks.py 文件
app.autodiscover_tasks()
修改 myproject/init.py
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ['celery_app'] # 确保 Celery 应用被正确加载
在settings.py中添加配置
# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0' # 消息代理用 Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 任务结果存储
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区
# 注: TIME_ZONE = 'Asia/Shanghai' # 要和这个时区一致
# 启动celery worker
# celery -A myproject worker --loglevel=info
# windows电脑还要添加这个,不然会有点问题
CELERY_WORKER_POOL = 'solo'
# 启动celery worker
# celery -A your_project worker --loglevel=info -P solo
5. 编写异步任务
在应用 myapp 中创建 tasks.py
# myapp/tasks.py
from celery import shared_task
@shared_task
def send_welcome_email(email):
print(f"模拟发送邮件到 {email}...")
# 这里可以写实际发邮件的代码(如调用 Django 的 send_mail)
return f"邮件已发送至 {email}"
6. 配置异步任务调用
在视图或任何地方调用
# myapp/views.py
from django.http import JsonResponse
from .tasks import send_welcome_email
def register(request):
email = "user@example.com"
# 异步调用任务(.delay() 是快捷方法)
send_welcome_email.delay(email)
return JsonResponse({"status": "邮件发送中..."})
7.启动服务
a.启动celery worker (注意换成你的项目名)
celery -A myproject worker --loglevel=info
# windows电脑用下面的,并且settings中还要加那个solo
celery -A your_project worker --loglevel=info -P solo
b.启动django服务器
python manage.py runserver
8. 测试
访问注册接口后,观察 Celery Worker 的日志输出:
9. 原理理解
异步调用playwright操控浏览器。
任务方法
from celery import shared_task
from playwright.sync_api import sync_playwright
from django.core.cache import cache
@shared_task
def execute_browser_operation( debugger_url, actions):
"""
:param debugger_url: 远程浏览器调试地址(如 http://172.16.1.4:9222)
:param actions: 操作步骤列表(JSON序列化)
"""
try:
with sync_playwright() as p:
# 连接远程浏览器
browser = p.chromium.connect_over_cdp(debugger_url)
page = browser.contexts[0].pages[0]
# 操作浏览器(示例:打开百度)
# page.goto("https://www.baidu.com")
# page.fill("#kw", "自动化测试")
"""action 的结构。
[
{"type": "navigate", "url": "https://www.baidu.com"},
{"type": "fill", "selector": "#kw", "text": "刘亦菲"},
{"type": "click", "selector": "#su"},
{"type": "select_dropdown", "selector": "#country-select", "value": "china"}
]
"""
# 执行操作步骤
result = {'steps': []}
for action in actions:
if action['type'] == 'navigate':
page.goto(action['url'])
result['steps'].append(f"导航至 {action['url']}")
elif action['type'] == 'fill':
page.fill(action['selector'], action['text'])
result['steps'].append(f"填写 {action['selector']}")
elif action['type'] == 'click':
page.click(action['selector'])
elif action['type'] == 'select_dropdown':
page.select_option(action['selector'], action['value'])
# 添加更多操作类型...
# 保存截图到媒体目录
# screenshot_path = f"media/screenshots/{self.request.id}.png"
# page.screenshot(path=screenshot_path)
return {
'status': 'success',
# 'screenshot': screenshot_path,
'details': result
}
except Exception as e:
return {'status': 'error', 'message': str(e)}
finally:
browser.close()
views
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import permissions
from .celery_tasks import execute_browser_operation
class BrowserControlView(APIView):
def post(self, request):
# 验证参数
required_fields = ['debugger_url', 'actions']
if not all(field in request.data for field in required_fields):
return Response({'error': '缺少必要参数'}, status=400)
# 提交异步任务
task = execute_browser_operation.delay(
debugger_url=request.data['debugger_url'],
actions=request.data['actions']
)
return Response({
'task_id': task.id,
'status_endpoint': f'/api/task-status/{task.id}/'
})
# 任务状态查询接口
# myapp/views.py
from celery.result import AsyncResult
class TaskStatusView(APIView):
def get(self, request, task_id):
task = AsyncResult(task_id)
return Response({
'task_id': task_id,
'status': task.status,
'result': task.result if task.ready() else None
})
url.py
path('control/', views.BrowserControlView.as_view(), name='browser-control'),
path('task-status/<str:task_id>/', views.TaskStatusView.as_view(), name='task-status'),
效果
调接口,可以执行本地浏览器
执行端开放端口方式:
开放成功后,在打开的浏览器访问
Chrome:
访问 http://0.0.0.0:9222/json/version,若返回浏览器信息则成功
http://127.0.0.1:9222/json/version 这样是不对的 (0.0.0.0)才能是ip+端口访问
可能需要
# 将本地9222端口暴露到所有网络接口(需管理员权限)
netsh interface portproxy add v4tov4 listenaddress=0.0.0.0 listenport=9222 connectaddress=127.0.0.1 connectport=9222
#验证
netsh interface portproxy show all
#删除
netsh interface portproxy delete v4tov4 listenaddress=0.0.0.0 listenport=9222
# 启动浏览器
cd "C:\Program Files\Google\Chrome\Application"
chrome.exe --remote-debugging-port=9222 --remote-debugging-address=0.0.0.0 --user-data-dir="C:\playwright_debug"
# 验证 需要看到 (tcp 0.0.0.0:9222别人才能通过ip访问到你)
netstat -ano | findstr :9222
开端口
# 查看有没有
netsh advfirewall firewall show rule name=all | findstr "9222"
# 给一个
netsh advfirewall firewall add rule name="Playwright_9222" dir=in action=allow protocol=TCP localport=9222
# 删除
netsh advfirewall firewall delete rule name="Playwright_9222"
bat文件(参考-未成熟1)
@echo off
setlocal enabledelayedexpansion
:: 检查管理员权限
net session >nul 2>&1
if %errorlevel% neq 0 (
echo 请右键点击此脚本,选择"以管理员身份运行"
pause
exit /b 1
)
:: 1. 设置防火墙规则
echo 正在配置防火墙规则...
netsh advfirewall firewall show rule name="Playwright_9222" >nul 2>&1
if !errorlevel! equ 0 (
echo [跳过] 防火墙规则 Playwright_9222 已存在
) else (
netsh advfirewall firewall add rule name="Playwright_9222" dir=in action=allow protocol=TCP localport=9222
if !errorlevel! neq 0 (
echo [错误] 无法创建防火墙规则
exit /b 1
)
echo [成功] 防火墙规则已添加
)
:: 2. 配置端口转发
echo 正在设置端口代理...
netsh interface portproxy show v4tov4 | findstr "0.0.0.0:9222" >nul
if !errorlevel! equ 0 (
echo [跳过] 端口代理规则已存在
) else (
netsh interface portproxy add v4tov4 listenaddress=0.0.0.0 listenport=9222 connectaddress=127.0.0.1 connectport=9222
if !errorlevel! neq 0 (
echo [错误] 无法创建端口代理规则
exit /b 1
)
echo [成功] 端口代理已配置
)
:: 3. 启动 Chrome
echo 正在启动 Chrome 浏览器...
cd /d "C:\Program Files\Google\Chrome\Application"
if not exist "chrome.exe" (
echo [错误] 未找到 Chrome 安装路径
echo 请检查路径: "C:\Program Files\Google\Chrome\Application"
exit /b 1
)
if not exist "C:\playwright_debug" (
mkdir "C:\playwright_debug"
)
start "" chrome.exe --remote-debugging-port=9222 --remote-debugging-address=0.0.0.0 --user-data-dir="C:\playwright_debug"
echo 所有操作已完成
pause
bat2,供参考
@echo off
setlocal enabledelayedexpansion
:: 强制管理员权限
if not "%1"=="admin" (powershell start -verb runas '%0' admin & exit)
:: 清理旧进程和规则
taskkill /im chrome.exe /f >nul 2>&1
netsh interface portproxy delete v4tov4 listenport=9222 >nul 2>&1
:: 启动 Chrome(核心修改)
echo 启动 Chrome 调试实例...
cd /d "C:\Program Files\Google\Chrome\Application"
start "" chrome.exe ^
--remote-debugging-port=9222 ^
--remote-debugging-address=0.0.0.0 ^
--user-data-dir="C:\playwright_debug" ^
--no-first-run --disable-extensions
:: 延迟等待服务启动
timeout /t 5 /nobreak >nul
:: 验证监听状态
echo 当前端口监听状态:
netstat -ano | findstr :9222
pause
palywright操作别人电脑调试
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
# 替换为同事电脑的IP地址(如192.168.1.100)
# browser = p.chromium.connect_over_cdp("http://127.0.0.1:9222")
browser = p.chromium.connect_over_cdp("http://172.16.1.4:9222")
default_context = browser.contexts[0] # 获取默认上下文
page = default_context.pages[0] # 获取已打开的页面
# 操作浏览器(示例:打开百度)
page.goto("https://www.baidu.com")
page.fill("#kw", "自动化测试")
# from playwright.sync_api import sync_playwright
#
# with sync_playwright() as p:
# # 替换为同事电脑的实际IP(如192.168.1.100)
# browser = p.chromium.connect_over_cdp("http://192.168.1.100:9222")
# default_context = browser.contexts[0] # 获取默认上下文
# page = default_context.pages[0] # 获取已打开的页面
#
# # 操作浏览器示例:访问百度并搜索
# page.goto("https://www.baidu.com")
# page.fill("#kw", "自动化测试")
# page.click("#su")
# page.wait_for_timeout(3000) # 等待3秒观察结果
前端端vue代码和效果
点击执行,会操作本地电脑浏览器
uicasedemo.vue
<template>
<div class="container">
<h1 class="title">UI自动化Demo</h1>
<!-- 操作按钮区域 -->
<div class="action-bar">
<!-- 新增执行端地址输入框 -->
<el-input
v-model="form.debugger_url"
placeholder="请输入执行端IP端口"
style="width: 300px; margin-right: 20px"
/>
<el-button type="primary" @click="dialogVisible = true">新增步骤</el-button>
<el-button type="success" @click="handleExecute">执行测试</el-button>
</div>
<!-- 步骤列表 -->
<div class="step-list">
<div v-for="(step, index) in steps" :key="index" class="step-item">
<span class="step-index">步骤 {{ index + 1 }}</span>
<div class="step-content">
<div class="step-name">{{ step.action }}操作</div>
<div class="step-params" v-if="step.params">{{ step.params }}</div>
</div>
<div class="step-actions">
<el-button size="small" @click="handleEditStep(index)">编辑</el-button>
<el-button size="small" type="danger" @click="handleDeleteStep(index)">删除</el-button>
</div>
</div>
</div>
<!-- 新增步骤弹窗 -->
<el-dialog v-model="dialogVisible" title="新增操作步骤" width="30%">
<el-form :model="formData">
<el-form-item label="操作类型">
<el-select
v-model="formData.action"
placeholder="请选择操作"
@change="handleActionChange"
>
<el-option label="点击" value="click" />
<el-option label="输入" value="fill" />
<el-option label="访问地址" value="goto" />
<el-option label="等待" value="wait" />
</el-select>
</el-form-item>
<!-- 动态参数区域 -->
<template v-if="formData.action === 'goto'">
<el-form-item label="访问地址">
<el-input v-model="formData.params.url" placeholder="请输入URL" />
</el-form-item>
</template>
<template v-if="formData.action === 'click'">
<el-form-item label="元素定位">
<el-input v-model="formData.params.selector" placeholder="请输入元素选择器" />
</el-form-item>
</template>
<template v-if="formData.action === 'fill'">
<el-form-item label="元素定位">
<el-input v-model="formData.params.selector" placeholder="请输入元素选择器" />
</el-form-item>
<el-form-item label="输入内容">
<el-input v-model="formData.params.text" placeholder="请输入要输入的内容" />
</el-form-item>
</template>
<!-- 新增等待参数区域 -->
<template v-if="formData.action === 'wait'">
<el-form-item label="等待时间(毫秒)">
<el-input
v-model="formData.params.time"
type="number"
placeholder="请输入等待时间"
min="1"
/>
</el-form-item>
</template>
</el-form>
<template #footer>
<el-button @click="dialogVisible = false">取消</el-button>
<el-button type="primary" @click="handleAddStep">确认</el-button>
</template>
</el-dialog>
</div>
</template>
<script setup>
import { ref, reactive } from 'vue'
import { uidemosend } from '@/utils/api.js'
// 步骤数据
const steps = ref([
{
action: 'goto',
params: {
type: 'goto',
url: 'https://www.baidu.com',
selector: '',
text: ''
}
},
{
action: 'fill',
params: {
type: 'fill',
url: '',
selector: '#kw',
text: '刘亦菲'
}
},
{
action: 'click',
params: {
type: 'click',
url: '',
selector: '#su',
text: ''
}
}
])
const dialogVisible = ref(false)
const formData = ref({
editingIndex: -1,
action: '',
params: {
type: '',
url: '',
selector: '',
text: '',
time: 0
}
})
// 切换操作类型时清空参数
const handleActionChange = (value) => {
formData.value.params = {
type: value,
url: '',
selector: '',
text: '',
time: value === 'wait' ? 2000 : 0
}
}
// 处理编辑步骤
const handleEditStep = (index) => {
const step = steps.value[index]
formData.value = {
editingIndex: index,
action: step.action,
params: {
type: step.params.type,
url: step.params.url || '',
selector: step.params.selector || '',
text: step.params.text || '',
time: Number(step.params.time) || 0
}
}
dialogVisible.value = true
}
// 处理删除步骤
const handleDeleteStep = (index) => {
steps.value.splice(index, 1)
}
// 修改后的添加步骤方法
const handleAddStep = () => {
if (formData.value.action) {
const newStep = {
action: formData.value.action,
params: {
type: formData.value.params.type,
url: formData.value.params.url || '',
selector: formData.value.params.selector || '',
text: formData.value.params.text || '',
time: Number(formData.value.params.time) || 0
}
}
if (formData.value.editingIndex >= 0) {
// 更新现有步骤
steps.value.splice(formData.value.editingIndex, 1, newStep)
} else {
// 添加新步骤
steps.value.push(newStep)
}
// 重置表单
formData.value = {
editingIndex: -1,
action: '',
params: {
type: '',
url: '',
selector: '',
text: '',
time: 0
}
}
dialogVisible.value = false
}
}
const form = reactive({
debugger_url: 'http://127.0.0.1:9222',
actions: []
})
const handleExecute = () => {
console.log('执行')
// 将steps中的params提取到form.actions
form.actions = steps.value.map((step) => ({ ...step.params }))
console.log(form)
uidemosend(form)
}
</script>
<style lang="scss" scoped>
.container {
padding: 20px;
.title {
text-align: center;
margin-bottom: 30px;
}
.action-bar {
margin-bottom: 20px;
display: flex;
justify-content: flex-end;
}
.step-list {
border: 1px solid #ebeef5;
border-radius: 4px;
.step-item {
padding: 12px 20px;
border-bottom: 1px solid #ebeef5;
display: flex;
align-items: center;
&:last-child {
border-bottom: none;
}
.step-index {
width: 80px;
color: #909399;
}
.step-content {
flex: 1;
.step-name {
font-weight: 500;
margin-bottom: 4px;
}
.step-params {
color: #666;
font-size: 0.9em;
}
}
.step-actions {
margin-left: auto;
display: flex;
align-items: center;
.el-button {
margin-left: 10px;
}
}
}
}
}
</style>