1.定时任务的两种实现方式
1.1 用@scheduler.task装饰任务
安装插件:
pip install Flask-APScheduler
pip install apscheduler
脚本实现:
###app.py
##导入依赖库
from flask import Flask
import datetime
import config
from flask_apscheduler import APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#创建Flask对象
app = Flask(__name__)
#配置执行器
app.config['SCHEDULER_EXECUTORS'] = {'default': {'type': 'threadpool', 'max_workers': 10}}
#配置作业存储器
app.config['SCHEDULER_JOBSTORES'] = {'default': {'type': 'memory'}}
app.config.from_object(config.Config)
#创建APScheduler对象,调度器是BackgroundScheduler
scheduler=APScheduler(scheduler=BackgroundScheduler(daemon=True))
scheduler.init_app(app)
scheduler.start()
#以装饰器@scheduler.task定义任务,触发器为interval,每1秒执行一次
@scheduler.task(id='do_job_1',trigger='interval', minutes=1)
def task():
current_time=datetime.datetime.now()
print("装饰器定时任务开始执行时间:{}".format(current_time))
if __name__ == '__main__':
app.run()
定时任务测试结果:
1.2 用add_job的方式
脚本实现:
###app.py
##导入依赖库
from flask import Flask
import datetime
import config
from flask_apscheduler import APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#创建Flask对象
app = Flask(__name__)
#配置执行器
app.config['SCHEDULER_EXECUTORS'] = {'default': {'type': 'threadpool', 'max_workers': 10}}
#配置作业存储器
app.config['SCHEDULER_JOBSTORES'] = {'default': {'type': 'memory'}}
app.config.from_object(config.Config)
#创建APScheduler对象,调度器是BackgroundScheduler
scheduler=APScheduler(scheduler=BackgroundScheduler(daemon=True))
scheduler.init_app(app)
scheduler.start()
#定义任务
def api_task():
current_time = datetime.datetime.now()
print("API调用定时任务开始执行时间:{}".format(current_time))
#以调用API的方式定义定时任务
scheduler.add_job(id='do_job_2',func=api_task,trigger='interval', minutes=1)
if __name__ == '__main__':
app.run()
定时任务测试结果:
在上面的实现方式中,都涉及4个组件:调度器(scheduler),执行器(executor),作业存储器(job stores),触发器(trigger)。接下来详细介绍一下四个组件各自又包含哪些分类以及各分类又适合哪些场景。
2.调度器(scheduler)
Flask-APScheduler调度器支持七种,分别如下:
- BlockingScheduler:启动后在当前进程的主线程中运行,所以会阻塞当前正在运行的线程,直到调度任务执行完成或手动终止才会释放,适合只需要运行一次的任务
from apscheduler.schedulers.blocking import BlockingScheduler
def task_job():
print("xxxx")
if __name__ == "__main__":
# 创建 BlockingScheduler 实例
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', seconds=5, id='my_job')
scheduler.start()
- BackgroundScheduler:独立启动一个线程执行,不会阻塞主线程,适合高频或者需要长时间运行的任务
from flask_apscheduler import APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = APScheduler(scheduler=BackgroundScheduler(daemon=True))
scheduler.init_app(app)
scheduler.start()
- AsyncIOScheduler:异步调度器,任务函数需为 async 协程(一种特殊的函数,允许任务在等待 I/O 操作时释放 CPU 资源,避免阻塞主线程,函数的定义方法:async def func())。适合高并发 I/O 密集型任务
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = APScheduler(scheduler=AsyncIOScheduler())
scheduler.init_app(app)
scheduler.start()
- GeventScheduler:高并发调度器,利用 Gevent(高性能python并发网络库) 的协程实现并发,需使用 gevent.monkey.patch_all() 打补丁,适合大量并发但无需复杂异步代码的任务
from gevent import monkey
monkey.patch_all()
from apscheduler.schedulers.gevent import GeventScheduler
scheduler = APScheduler(scheduler=GeventScheduler())
scheduler.init_app(app)
scheduler.start()
- TornadoScheduler:异步web调度器,适合高并发的实时通信web服务、长连接服务或自定义协议服务的任务
from apscheduler.schedulers.tornado import TornadoScheduler
scheduler = APScheduler(scheduler=TornadoScheduler())
scheduler.init_app(app)
scheduler.start()
- TwistedScheduler:事件驱动异步网络调度器,适合高度定制化异步流程的任务,比如游戏服务,网络客户端,分布式系统。
from apscheduler.schedulers.twisted import TwistedScheduler
scheduler = APScheduler(scheduler=TwistedScheduler())
scheduler.init_app(app)
scheduler.start()
- QtScheduler:任务在 Qt 主线程中执行,适合桌面应用的定时任务
from apscheduler.schedulers.qt import QtScheduler
scheduler = APScheduler(scheduler=QtScheduler())
scheduler.init_app(app)
3.触发器(trigger)
触发器分为三种,分别如下:
- date触发器:日期触发器,在指定时间点执行一次任务,参数:
run_date:任务执行的具体时间,格式:datetime
/str
timezone:指定时区,不指定的话,默认用全局配置的时区,如果全局也没有配置,则用UTC时区。
- interval触发器:按固定时间间隔重复执行任务,参数:
weeks:间隔几周执行一次
days:间隔几天执行一次
hours:间隔几小时执行一次
minutes:间隔几分钟执行一次
seconds:间隔几秒执行一次
start_date:首次执行时间,可选,默认立即执行
end_date:停止执行时间,可选,默认无限制
timezone:时区,可选,默认使用调度器的全局时区
jitter:延迟秒数,可选,防止多个任务同时触发
- Cron 触发器:cron表达式触发器,表达式参数:
year:年,4位,如2025
month:月,1~12,*/5表示每5个月
day:日,1~31,*/5表示每5天
week:一年的周数
day_of_week:周几,0~6
hour:小时,0~23,*/5表示每5小时
minute:分钟,0~59,*/15表示每15分钟
second:秒,0~59
start_date:首次执行时间,可选,默认立即执行
end_date:停止执行时间,可选,默认无限制
timezone:时区,可选,默认使用配置的全局时区
jitter:延迟秒数,可选,防止多个任务同时触发
4.执行器(executor)
APScheduler提供的执行器有7种,分别如下:
- ThreadPoolExecutor:线程池执行器
type为:threadpool;
参数:max_workers:线程池里的最大线程数,默认为10
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'threadpool', 'max_workers': 30}
}
- ProcessPoolExecutor:进程池执行器
type为:processpool
参数:max_workers:进程池的最大进程数,默认 5,不过受CPU核数限制
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'processpool', 'max_workers': 4}
}
- GeventExecutor:Gevent协程执行器
type为:gevent
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'gevent'}
}
- AsyncIOExecutor:Asyncio异步执行器
type为:asyncio
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'asyncio'}
}
- TornadoExecutor:Tornado执行器
type为:tornado
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'tornado'}
}
- TwistedExecutor:Twisted执行器
type为:twisted
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'twisted'}
}
- DebugExecutor:调试执行器
type为:debug
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'debug'}
}
5.作业存储器(job stores)
作业存储器(job stores)负责持久化存储定时任务的信息,如任务 ID、执行状态、触发器规则等,Flask-APScheduler支持的作业存储器类型有5种,分别是:
- MemoryJobStore:任务信息存储在内存中,暂停重启工程任务信息会丢失
- SQLAlchemyJobStore:任务信息存储在SQLite、MySQL、PostgreSQL数据库中,任务信息持久不会丢失
- RedisJobStore:任务信息存储在Redis中,支持分布式环境,适合高并发、分布式部署的任务
- MongoDBJobStore:任务信息存储在MongoDB中,适合非结构化数据
- ZooKeeperJobStore:使用 Apache ZooKeeper 存储任务,适合高可用性要求的环境