在 Web 应用中,有时需要执行一些耗时操作,比如发送邮件、图片处理、数据分析等。如果这些任务在请求中同步执行,会导致用户体验变差,甚至请求超时。Flask 本身是一个同步框架,但可以结合 Celery 或 Background Tasks(Flask threading & multiprocessing) 实现异步任务和后台处理。
本章内容:
- Flask 异步任务概述
- 使用 Celery 处理异步任务
- 结合 Redis 作为任务队列
- Flask + Threading 实现简单的后台任务
- Flask + Multiprocessing 处理 CPU 密集型任务
- 使用 WebSocket 实时反馈任务进度
8.1 Flask 异步任务概述
8.1.1 什么是异步任务?
异步任务指的是那些不需要用户等待执行结果的任务,通常用于:
- 发送邮件
- 处理大文件上传
- 数据分析和机器学习任务
- 定时任务(如每天凌晨备份数据库)
8.1.2 解决方案对比
方法 |
适用场景 |
难度 |
备注 |
Celery |
任务队列,高并发任务 |
中等 |
需 Redis / RabbitMQ |
Threading |
简单异步任务 |
低 |
适合轻量任务 |
Multiprocessing |
CPU 密集型任务 |
中等 |
适合计算密集任务 |
8.2 使用 Celery 处理异步任务
8.2.1 安装 Celery 和 Redis
pip install celery redis
8.2.2 配置 Celery
在 celery_config.py 中:
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
return celery
在 app.py 中配置 Celery:
from flask import Flask
from celery_config import make_celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = make_celery(app)
8.2.3 创建异步任务
@celery.task
def async_task(n):
import time
time.sleep(n)
return f"任务执行完毕,等待了 {n} 秒"
8.2.4 在 Flask 视图中调用任务
@app.route('/run_task/<int:seconds>')
def run_task(seconds):
task = async_task.apply_async(args=[seconds])
return f"任务已提交,任务 ID: {task.id}"
8.2.5 启动 Celery Worker
celery -A app.celery worker --loglevel=info
8.3 使用 Flask-Threading 实现简单异步任务
适用于小型异步任务,如:
- 发送邮件
- 处理小型任务
8.3.1 安装 Flask-Threading
pip install flask-threading
8.3.2 示例代码
import threading
import time
from flask import Flask
app = Flask(__name__)
def background_task(n):
print(f"任务开始,耗时 {n} 秒")
time.sleep(n)
print("任务完成")
@app.route('/start_task/<int:seconds>')
def start_task(seconds):
thread = threading.Thread(target=background_task, args=(seconds,))
thread.start()
return "后台任务已启动"
8.4 使用 Multiprocessing 处理 CPU 密集型任务
适用于:
- 数据分析
- 图像处理
- 机器学习计算
8.4.1 使用 multiprocessing
import multiprocessing
import time
def cpu_task(n):
print(f"计算任务开始,耗时 {n} 秒")
time.sleep(n)
print("计算任务完成")
@app.route('/cpu_task/<int:seconds>')
def run_cpu_task(seconds):
process = multiprocessing.Process(target=cpu_task, args=(seconds,))
process.start()
return "CPU 任务已提交"
8.5 使用 WebSocket 实时反馈任务进度
对于长时间任务,前端需要实时获取进度,这里使用 Flask-SocketIO。
8.5.1 安装 Flask-SocketIO
pip install flask-socketio
8.5.2 在 app.py 中配置 WebSocket
from flask_socketio import SocketIO
import time
app = Flask(__name__)
socketio = SocketIO(app)
@app.route('/start_long_task')
def start_long_task():
socketio.start_background_task(target=long_running_task)
return "任务开始,前端可监听进度"
def long_running_task():
for i in range(1, 11):
time.sleep(1)
socketio.emit('progress', {'progress': i * 10})
8.5.3 前端监听 WebSocket 事件
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script><script>
var socket = io();
socket.on('progress', function(data) {
console.log("任务进度:" + data.progress + "%");
});
</script>
8.6 结语
本章介绍了 Flask 处理异步任务的方法:
- Celery + Redis:适合高并发异步任务
- Flask-Threading:适用于小任务
- Multiprocessing:用于 CPU 密集型任务
- WebSocket:实现任务进度反馈
下一章将介绍 Flask 的邮件发送与通知系统。