Flask(八)异步任务与后台处理

发布于:2025-04-04 ⋅ 阅读:(26) ⋅ 点赞:(0)

在 Web 应用中,有时需要执行一些耗时操作,比如发送邮件、图片处理、数据分析等。如果这些任务在请求中同步执行,会导致用户体验变差,甚至请求超时。Flask 本身是一个同步框架,但可以结合 Celery 或 Background Tasks(Flask threading & multiprocessing) 实现异步任务和后台处理。

本章内容:

  • Flask 异步任务概述
  • 使用 Celery 处理异步任务
  • 结合 Redis 作为任务队列
  • Flask + Threading 实现简单的后台任务
  • Flask + Multiprocessing 处理 CPU 密集型任务
  • 使用 WebSocket 实时反馈任务进度

8.1 Flask 异步任务概述

8.1.1 什么是异步任务?

异步任务指的是那些不需要用户等待执行结果的任务,通常用于:

  • 发送邮件
  • 处理大文件上传
  • 数据分析和机器学习任务
  • 定时任务(如每天凌晨备份数据库)

8.1.2 解决方案对比

方法

适用场景

难度

备注

Celery

任务队列,高并发任务

中等

需 Redis / RabbitMQ

Threading

简单异步任务

适合轻量任务

Multiprocessing

CPU 密集型任务

中等

适合计算密集任务

8.2 使用 Celery 处理异步任务

8.2.1 安装 Celery 和 Redis

pip install celery redis

8.2.2 配置 Celery

在 celery_config.py 中:

from celery import Celery

def make_celery(app):
    celery = Celery(

        app.import_name,

        backend=app.config['CELERY_RESULT_BACKEND'],

        broker=app.config['CELERY_BROKER_URL']
    )

    celery.conf.update(app.config)

    return celery

在 app.py 中配置 Celery:

from flask import Flask
from celery_config import make_celery

app = Flask(__name__)

app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = make_celery(app)

8.2.3 创建异步任务

@celery.task
def async_task(n):

    import time
    time.sleep(n)

    return f"任务执行完毕,等待了 {n} 秒"

8.2.4 在 Flask 视图中调用任务

@app.route('/run_task/<int:seconds>')
def run_task(seconds):

    task = async_task.apply_async(args=[seconds])

    return f"任务已提交,任务 ID: {task.id}"

8.2.5 启动 Celery Worker

celery -A app.celery worker --loglevel=info

8.3 使用 Flask-Threading 实现简单异步任务

适用于小型异步任务,如:

  • 发送邮件
  • 处理小型任务

8.3.1 安装 Flask-Threading

pip install flask-threading

8.3.2 示例代码

import threading
import time
from flask import Flask

app = Flask(__name__)

def background_task(n):
    print(f"任务开始,耗时 {n} 秒")

    time.sleep(n)

    print("任务完成")

@app.route('/start_task/<int:seconds>')
def start_task(seconds):

    thread = threading.Thread(target=background_task, args=(seconds,))

    thread.start()

    return "后台任务已启动"

8.4 使用 Multiprocessing 处理 CPU 密集型任务

适用于:

  • 数据分析
  • 图像处理
  • 机器学习计算

8.4.1 使用 multiprocessing

import multiprocessing
import time

def cpu_task(n):
    print(f"计算任务开始,耗时 {n} 秒")

    time.sleep(n)

    print("计算任务完成")

@app.route('/cpu_task/<int:seconds>')
def run_cpu_task(seconds):

    process = multiprocessing.Process(target=cpu_task, args=(seconds,))

    process.start()

    return "CPU 任务已提交"

8.5 使用 WebSocket 实时反馈任务进度

对于长时间任务,前端需要实时获取进度,这里使用 Flask-SocketIO

8.5.1 安装 Flask-SocketIO

pip install flask-socketio

8.5.2 在 app.py 中配置 WebSocket

from flask_socketio import SocketIO
import time

app = Flask(__name__)

socketio = SocketIO(app)

@app.route('/start_long_task')
def start_long_task():

    socketio.start_background_task(target=long_running_task)

    return "任务开始,前端可监听进度"

def long_running_task():
    for i in range(1, 11):

        time.sleep(1)

        socketio.emit('progress', {'progress': i * 10})

8.5.3 前端监听 WebSocket 事件

<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script><script>

    var socket = io();

    socket.on('progress', function(data) {

        console.log("任务进度:" + data.progress + "%");

    });
</script>

8.6 结语

本章介绍了 Flask 处理异步任务的方法:

  • Celery + Redis:适合高并发异步任务
  • Flask-Threading:适用于小任务
  • Multiprocessing:用于 CPU 密集型任务
  • WebSocket:实现任务进度反馈

下一章将介绍 Flask 的邮件发送与通知系统