flask+celery处理异步任务

发布于:2024-11-02 ⋅ 阅读:(7) ⋅ 点赞:(0)

celery是一个强大的分布式任务队列,在这里我们介绍一下它最基本的处理异步任务的功能,包含以下几个:

  • 创建Celery实例

  • 创建一个异步任务

  • 查询异步任务的信息

  • 取消异步任务

使用的环境是flask3.0+Celery5.4

1. 创建Celery实例

celery通过中间件获取任务,再将任务结果写入backend,所以创建实例时需要传递这两个的url,其支持多种协议(比如redis、rabbitmq等),这里我们以redis为例。最简单的一种是创建对象时传递:

from celery import Celery

celery_app = Celery(__name__, broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

另外一种是使用配置文件,按官方文档的例子,在同级目录创建一个celeryconfig.py文件,写入配置信息,然后更新celery对象的配置:

celery_app = Celery(__name__)
apcelery_appp.config_from_object('celeryconfig')

但可能是由于Celery版本问题,配置字段改变了(大小写改变),这种方法连接backend一直报错,暂时还没找到原因

2. 创建一个异步任务

类似于flask,celery也使用装饰器来创建任务处理函数,比如官方例子:

@celery_app.task
def add(x, y):
    return x + y

但这种方式无法访问任务实例的信息,需要修改一下装饰器的参数,将任务实例作为第一个参数引入:

@celery_app.task(bind=True)
def add(self, x, y):
    print(self.request.id) # 打印任务
    return x + y

返回值会写到backend,存储在result字段。结合flask的例子如下:

from flask import Flask

# 其它代码
# ...

app = Flask(__name__)

@app.route('/task', methods=['POST'])
def add_task():
    id = add.delay(1, 2) # 返回的是异步任务的
    return {'code': 0, 'data': id}

注意:Celery是要单独启动的,然后再启动flask,启动命令类似于:

Celery -A app worker --loglevel=info

这个地方也有比较多的坑,经常报模块不存在这样的错

3. 查询异步任务的信息

大部分异步任务的耗时都比较长,过程中用户可能想知道任务执行的情况,比如进度等,这时前面引入的第一个参数(self)就能发挥作用了,我们修改一下demo, 每隔1秒计算一次,并将进度更新到backend:

import time

@celery_app.task(bind=True)
def add(self, x, y):
    for i in range(1, 100):
        print(x + y)
        self.update_state(state="PROGRESS", meta={'index': i})
        time.sleep(1)
    return x + y

而对应的flask查询代码如下:

from celery.result import AsyncResult

@app.route('/task/<string:id>', methods=['GET'])
def get_task(id):
    return {'code': 0, 'data': celery_app.AsyncResult(id)}

4. 取消异步任务

取消任务要借助Celery的Control类:

@app.route('/task/<string:id>', methods=['DELETE'])
def remove_task(id):
    ctrl = Control(celery_app) 
    ctrl.revoke(id)
    return {'code': 0}