Celery - 入门(get-started)

发布于:2025-03-14 ⋅ 阅读:(14) ⋅ 点赞:(0)

在这里插入图片描述


本文翻译整理自 Celery 官方文档 get-started 部分:https://docs.celeryq.dev/en/stable/getting-started/index.html



Celery 简介

https://docs.celeryq.dev/en/stable/getting-started/introduction.html


任务队列是什么?

任务队列用作在线程或机器之间分配工作的机制。

任务队列的输入是一个称为任务的工作单元。专门的woker 进程持续监视任务队列,寻找要执行的新工作。

Celery通过消息进行通信,通常使用代理来在客户端和woker 之间进行调解。要启动一个任务,客户端会将消息添加到队列中,然后代理将那条消息发送给woker 。

一个Celery系统可以由多个woker 和代理组成,从而实现高可用性和水平扩展。

Celery是用Python编写的,但协议可以用任何语言实现。除了Python之外,还有用于Node.js的node-celerynode-celery-ts,以及PHP客户端

也可以通过公开HTTP端点并有一个请求它的任务(webhooks)来实现语言互操作性。


我需要什么?

Celery 需要一个消息传输来发送和接收消息。
RabbitMQ 和 Redis 代理传输功能完善,但同时也支持许多其他实验性解决方案,包括使用 SQLite 进行本地开发。

Celery 可以在单台机器上运行,在多台机器上运行,甚至跨越数据中心。


版本要求

Celery 版本 5.3 在

  • Python ❨3.8, 3.9, 3.10, 3.11❩
  • PyPy3.8+ ❨v7.3.11+❩
    Celery 4.x 是最后一个支持 Python 2.7 的版本, Celery 5.x 需要 Python 3.6 或更高版本。
    Celery 5.1.x 也需要 Python 3.6 或更高版本。
    Celery 5.2.x 需要 Python 3.7 或更高版本。

Celery 是一个资金有限的工程,所以我们不支持 Microsoft Windows。请不要为此平台打开任何相关的问题。


开始使用

如果这是您第一次尝试使用 Celery,或者您没有跟上 3.1 版本的开发并来自旧版本,那么您应该阅读我们的入门教程:


Celery 优点

简单

Celery 易于使用和维护,并且它 不需要配置文件

它有一个活跃、友好的社区,你可以与之交谈以获得支持,包括 mailing-list 和一个 IRC channel

这是你可以制作的 最简单的应用程序之一:

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
 return 'hello world'

高度可用

worker和客户端在连接丢失或失败的情况下将自动重试,并且一些代理支持通过 主/主主/副本 复制的方式实现高可用性。


一个单独的Celery进程可以每分钟处理数百万个任务,并且具有 亚毫秒的往返延迟(使用RabbitMQ、librabbitmq和优化设置)。


灵活

几乎 Celery 的每一部分都可以扩展或单独使用,自定义池实现、序列化器、压缩方案、日志记录、调度器、消费者、生产者、代理传输等,还有很多更多。


它支持

  • Brokers
  • Concurrency
    • prefork (multiprocessing),
    • Eventlet, gevent
    • thread (multithreaded)
    • solo (single threaded)
  • Result Stores
    • AMQP, Redis
    • Memcached,
    • SQLAlchemy, Django ORM
    • Apache Cassandra, Elasticsearch, Riak
    • MongoDB, CouchDB, Couchbase, ArangoDB
    • Amazon DynamoDB, Amazon S3
    • Microsoft Azure Block Blob, Microsoft Azure Cosmos DB
    • Google Cloud Storage
    • File system
  • Serialization
    • pickle, json, yaml, msgpack.
    • zlib, bzip2 compression.
    • Cryptographic message signing.

特性

  • 监控

    工作器会发出一系列监控事件流,这些事件流被内置和外部工具用来实时告知您的集群正在做什么。阅读更多…

  • 工作流 可以使用我们称为“画布(canvas)”的一组强大的原语来组成简单和复杂的工作流程,包括分组、链式、分块等。阅读更多…

  • 时间 & 速率限制 您可以控制每秒/分钟/每小时可以执行多少个任务,或者允许任务运行多长时间,这可以设置为默认值、针对特定工作器或针对每种任务类型单独设置。阅读更多…

  • 调度 您可以指定任务运行的秒数或datetimePython v3.14),或者使用基于简单间隔的周期性任务,或者支持分钟、小时、星期几、月份几号和年月份的Crontab表达式。阅读更多…

  • 资源泄漏保护 --max-tasks-per-child参考CLI选项)选项用于处理用户任务泄漏资源,如内存或文件描述符,这些资源完全超出您的控制。阅读更多…

  • 用户组件 每个工作器组件都可以自定义,用户还可以定义额外的组件。工作器是通过“启动步骤”构建的——一个依赖图,它允许对工作器内部进行细致的控制。


框架集成

Celery易于与Web框架集成,其中一些甚至有集成包:

Pyramid https://pypi.org/project/pyramid_celery/
Pylons https://pypi.org/project/celery-pylons/
Flask 不需要
web2py https://pypi.org/project/web2py-celery/
Tornado https://pypi.org/project/tornado-celery/
Tryton https://pypi.org/project/celery_tryton/

对于 Django,请参阅 使用Django的第一步

集成包并不是绝对必要的,但它们可以使开发更加容易,有时它们还会添加重要的钩子,例如在 fork(2) 时关闭数据库连接。


快速跳转

我想要 ⟶


跳转到


安装

您可以通过 Python 包索引(PyPI)或源码安装 Celery。

要使用 pip 安装:

$ pip install -U Celery 

Bundles

Celery 还定义了一系列Bundles,可以用来安装 Celery 以及给定功能的依赖。

您可以在您的需求文件中指定这些Bundles,或者在使用 pip 命令行时通过使用方括号来实现。可以通过逗号分隔来指定多个Bundles。

$ pip install "celery[librabbitmq]"

$ pip install "celery[librabbitmq,redis,auth,msgpack]"

以下Bundles可用:

The following bundles are available:


Serializers
  • celery[auth]: 用于使用 auth 安全序列化器。
  • celery[msgpack]: 用于使用 msgpack 序列化器。
  • celery[yaml]: 用于使用 yaml 序列化器。

Concurrency 并发

传输和后端

celery[librabbitmq]: 用于使用 librabbitmq C 库。

  • celery[redis]: 用于使用 Redis 作为消息传输或结果后端。
  • celery[sqs]: 用于使用 Amazon SQS 作为消息传输 (实验性)。
  • celery[tblib]: 用于使用 task_remote_tracebacks 功能 。
  • celery[memcache]: 用于使用 Memcached 作为结果后端(使用 https://pypi.org/project/pylibmc/)。
  • celery[pymemcache]: 用于使用 Memcached 作为结果后端(纯 Python 实现)。
  • celery[cassandra]: 用于使用 Apache Cassandra/Astra DB 作为结果后端,与 DataStax 驱动程序一起使用。
  • celery[couchbase]: 用于使用 Couchbase 作为结果后端。
  • celery[arangodb]: 用于使用 ArangoDB 作为结果后端。
  • celery[elasticsearch]: 用于使用 Elasticsearch 作为结果后端。
  • celery[riak]: 用于使用 Riak 作为结果后端。
  • celery[dynamodb]: 用于使用 AWS DynamoDB 作为结果后端。
  • celery[zookeeper]: 用于使用 Zookeeper 作为消息传输。
  • celery[sqlalchemy]: 用于使用 SQLAlchemy 作为结果后端 (受支持)。
  • celery[pyro]: 用于使用 Pyro4 消息传输 (实验性)。
  • celery[slmq]: 用于使用 SoftLayer Message Queue 传输 (实验性)。
  • celery[consul]: 用于使用 Consul.io Key/Value 存储作为消息传输或结果后端 (实验性)。
  • celery[django]: 指定 Django 支持的最低版本。 你可能不会在需求中使用这个,它只用于信息目的。
  • celery[gcs]: 用于使用 Google Cloud Storage 作为结果后端 (实验性)。

从源代码下载和安装

从 PyPI 下载 Celery 的最新版本:https://pypi.org/project/celery/

您可以通过以下步骤进行安装:

tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py build
python setup.py install

最后一条命令必须以具有权限的用户身份执行,如果你当前没有使用虚拟环境。


使用开发版本


使用 pip

Celery 开发版本还需要 https://pypi.org/project/kombu/https://pypi.org/project/amqp/https://pypi.org/project/billiard/https://pypi.org/project/vine/ 的开发版本。

您可以使用以下 pip 命令安装这些软件的最新快照:

$ pip install https://github.com/celery/celery/zipball/main#egg=celery
$ pip install https://github.com/celery/billiard/zipball/main#egg=billiard
$ pip install https://github.com/celery/py-amqp/zipball/main#egg=amqp
$ pip install https://github.com/celery/kombu/zipball/main#egg=kombu
$ pip install https://github.com/celery/vine/zipball/main#egg=vine  

使用 git

请参阅贡献部分。


后端和代理

https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html

版本:5.4 日期:2024年9月30日

Celery支持多种消息传输替代方案。


Broker 指令


Broker 概述

这是一个不同传输支持的比较表,更多详细信息可以在每个单独传输的文档中找到(见经纪人说明)。

名称 状态 监控 远程控制
RabbitMQ Stable
Redis Stable
Amazon SQS Stable
Zookeeper Experimental

实验性代理可能功能齐全,但没有专门的维护者。

缺少监控支持意味着传输不实现事件,因此Flower、Celery事件、Celerymon和其他基于事件监控工具将无法工作。

远程控制意味着能够使用celery inspect和celery control命令(以及其他使用远程控制API的工具)在运行时检查和管理 worker。


摘要

注意:本节并非全面概述后端和代理.

Celery 能够与许多不同的后端(结果存储)和代理(消息传输)进行通信和存储。


Redis

Redis 可以同时作为后端和代理。

作为代理: Redis 在快速传输小型消息方面表现良好。大型消息可能会导致系统拥堵。

查看文档以获取详细信息


作为后端: Redis 是一个超级快速的键值存储,这使得它在获取任务调用的结果时非常高效。与 Redis 的设计一样,您必须考虑可用于存储数据的内存限制以及您如何处理数据持久化。如果结果持久化很重要,请考虑为您的后端使用另一个数据库。


RabbitMQ

RabbitMQ 是一个代理。

作为代理:与 Redis 相比,RabbitMQ 处理大型消息的能力更强,但如果消息非常快速地连续涌入,扩展性可能会成为一个问题,这时可以考虑使用 Redis 或 SQS,除非 RabbitMQ 运行在非常大的规模下。

查看详细文档


作为后端:RabbitMQ 可以通过 rpc:// 后端存储结果。此后端为每个客户端创建一个单独的临时队列。

注意:RabbitMQ(作为代理)和 Redis(作为后端)非常常用在一起。如果需要更可靠的长期持久化存储,请考虑使用 PostgreSQL 或 MySQL(通过 SQLAlchemy),Cassandra 或自定义定义的后端。


SQS

SQS是一个代理。

如果你已经紧密集成到AWS中,并且熟悉SQS,那么作为一个代理,它是一个很好的选择。它具有极高的可扩展性和完全管理,并且像RabbitMQ一样管理任务委派。但它确实缺少RabbitMQ代理的一些功能,例如worker远程控制命令

查看文档以获取详细信息


SQLAlchemy

SQLAlchemy 是一个后端。

它允许 Celery 与 MySQL、PostgreSQL、SQLite 等数据库进行交互。它是一个对象关系映射(ORM),是 Celery 使用 SQL 数据库作为结果后端的方式。

查看文档获取详细信息


Celery 初步使用步骤

https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html

Celery 是一个包含电池的作业队列。
它易于使用,因此您可以在不学习解决该问题全部复杂性的情况下开始使用。它围绕最佳实践进行设计,以便您的产品可以扩展并与其他语言集成,并且它附带您在生产中运行此类系统所需的工具和支持。

在本教程中,您将学习使用 Celery 的绝对基础知识。

了解以下内容:

  • 选择和安装消息传输(代理)。
  • 安装 Celery 并创建您的第一个任务。
  • 启动工作进程并调用任务。
  • 跟踪任务在不同状态之间的转换。
    关于检查返回值。

Celery 可能一开始看起来有些令人畏惧 - 但别担心 - 这个教程会迅速让你上手。它是故意保持简单的,以免你被高级功能所困惑。
在你完成这个教程之后,浏览其余的文档是个不错的选择。

例如,下一步教程将展示 Celery 的功能。


选择 Broker

Celery需要一种发送和接收消息的解决方案;通常这通过一个名为消息代理的独立服务来实现。

以下是一些可用的选择,包括:


RabbitMQ

RabbitMQ 是功能完善、稳定、持久且易于安装的。它是生产环境中的一个优秀选择。
关于使用 RabbitMQ 与 Celery 的详细信息: 使用 RabbitMQ

如果您正在使用 Ubuntu 或 Debian,可以通过执行以下命令来安装 RabbitMQ:

$ sudo apt-get install rabbitmq-server

或者,如果你想使用 Docker 运行它,请执行以下命令:

$ docker run -d -p 5672:5672 rabbitmq  

当命令完成后,代理程序已经在后台运行,准备好为您移动消息:Starting rabbitmq-server: SUCCESS

如果您的操作系统不是Ubuntu或Debian,无需担心,您可以访问这个网站以找到适用于其他平台(包括Microsoft Windows)的类似简单安装说明: http://www.rabbitmq.com/download.html


Redis

Redis 同样功能齐全,但在突然终止或断电的情况下更容易发生数据丢失。关于使用 Redis 的详细信息:

使用 Redis

如果您想在 Docker 上运行它,请执行以下命令:

$ docker run -d -p 6379:6379 redis

其他 brokers

除了上述内容之外,还有其他实验性传输实现可供选择,包括 Amazon SQS

查看 Broker 概述 获取完整列表。


安装 Celery

Celery 位于 Python 包索引(PyPI),因此可以使用标准的 Python 工具,如 pip 来安装:

pip install celery

$ pip install celery

应用程序

首先,你需要一个 Celery 实例。我们称这个实例为 Celery 应用 或简称为 app

由于这个实例被用作你在 Celery 中想要执行的所有操作(如创建任务和管理 worker)的入口点,因此它必须允许其他模块导入它。

在本教程中,我们将所有内容都包含在一个单独的模块中,但对于更大的项目,你需要创建一个 专用模块


让我们创建文件 tasks.py

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

Celery 的第一个参数,是当前模块名,这是必需的,以便在 main 模块中定义任务时能够自动生成名称。

第二个参数是代理关键字参数,指定您想使用的消息代理的 URL。这里我们使用的是 RabbitMQ(也是默认选项)。

参见 选择一个代理 上面的内容以获取更多选择 –
对于 RabbitMQ,您可以使用 amqp://localhost,或者对于 Redis,您可以使用 redis://localhost

您定义了一个名为 add 的单个任务,返回两个数字的和。


运行 Celery 工作服务器

您现在可以通过执行带有 worker 参数的我们的程序 来运行工作进程:

$ celery -A tasks worker --loglevel=INFO 

注意:如果工作进程没有启动,请参阅故障排除部分。

在生产环境中,您希望以守护进程的方式在后台运行工作进程。为此,您需要使用平台提供的工具,或者类似supervisord(更多信息请参阅守护进程化)的东西。

要查看所有可用的命令行选项的完整列表,请执行:

$  celery worker --help 

还有其他几个命令可用,帮助也可用:

$ celery --help

调用任务

要调用我们的任务,你可以使用 delay() 方法。

这是一个便捷的快捷方式到 apply_async() 方法,它提供了对任务执行更大的控制(见 调用任务):

>>> from tasks import add
>>> add.delay(4, 4)

任务现在已经被您之前启动的woker 处理完毕。
您可以通过查看woker 的控制台输出来验证这一点。

调用任务会返回一个 AsyncResult 实例(celery.result.AsyncResult)。
这可以用来检查任务的状态,等待任务完成,或获取其返回值(如果任务失败,则获取异常和跟踪信息)。

结果默认不启用。为了进行远程过程调用或跟踪数据库中的任务结果,您需要配置 Celery 使用结果后端。这将在下一节中描述。


保持结果

如果您想跟踪任务的状态,Celery需要将状态存储或发送到某个地方。

有几种内置的结果后端可供选择:SQLAlchemy/Django ORM、MongoDBMemcachedRedisRPC (RabbitMQ/AMQP),或者——您也可以定义自己的。

在这个例子中,我们使用rpc结果后端,将状态作为临时消息发送回去。后端通过backend参数指定于Celery(或者如果您选择使用配置模块,也可以通过result_backend设置)。因此,您可以修改tasks.py文件中的这一行来启用rpc://后端:

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,如果您想使用 Redis 作为结果后端,但仍然使用 RabbitMQ 作为消息代理(这是一个流行的组合):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

要了解更多关于结果后端的信息,请参阅结果后端

现在已配置结果后端,请关闭当前的 Python 会话,并再次导入 tasks 模块以使更改生效。这次在调用任务时,您将保留返回的 AsyncResult 实例:

>>> from tasks import add    # close and reopen to get updated 'app'
>>> result = add.delay(4, 4)  

ready() 方法返回任务是否已完成处理:

>>> result.ready()
False

您可以选择等待结果完成,但这很少使用,因为它将异步调用转换为同步调用:

>>> result.get(timeout=1)
8

在任务引发异常的情况下,get() 将重新引发异常,但您可以通过指定 propagate 参数来覆盖此行为:

>>> result.get(propagate=False)

如果任务抛出了异常,您还可以访问原始的跟踪回溯:

>>> result.traceback

警告:后端使用资源来存储和传输结果。为了确保资源被释放,您必须在调用任务后最终在每一个返回的 AsyncResult 实例上调用 get()forget()


配置

Celery,就像一个消费类设备,运行时不需要太多的配置。
它有一个输入和一个输出。输入必须连接到一个代理,而输出可以可选地连接到一个结果后端。然而,如果你仔细观察后面,你会发现一个盖子,下面有大量的滑块、旋钮和按钮:这就是配置。

默认配置对于大多数用例来说应该足够好,但是有很多选项可以被配置,以使Celery按照需要精确地工作。了解可用的选项是一个好主意,以便熟悉可以配置的内容。你可以在配置和默认值参考中了解这些选项。

配置可以直接在应用上设置,或者通过使用专门的配置模块来设置。
作为一个例子,你可以通过更改 task_serializer 设置来配置用于序列化任务负载的默认序列化器:

app.conf.task_serializer = 'json'

如果您一次配置许多设置,可以使用 update

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

对于较大的项目,建议使用专门的配置模块。
不建议硬编码周期性任务间隔和任务路由选项。
将这些配置保存在集中位置会更好。这对于库来说尤其如此,因为它使用户能够控制其任务的行为。
集中式配置还将允许您的系统管理员在系统出现问题时进行简单的更改。

您可以通过调用 app.config_from_object() 方法来告诉您的Celery实例使用配置模块:app.config_from_object()

app.config_from_object('celeryconfig')

这个模块通常被称为“celeryconfig”,但你可以使用任何模块名称。

在上面的例子中,必须有一个名为celeryconfig.py的模块在当前目录或Python路径中可用。它可能看起来像这样:

celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

为了验证您的配置文件是否正常工作且不包含任何语法错误,您可以尝试导入它:

$ python -m celeryconfig

对于完整的配置选项参考,请参阅配置和默认值

为了展示配置文件的力量,以下是如何将一个行为不端的任务路由到专用队列的示例:

celeryconfig.py

task_routes = {
    'tasks.add': 'low-priority',
}

或者,你可以在不进行路由的情况下对任务进行速率限制,这样每分钟只能处理10个此类任务(10/m):

celeryconfig.py:

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

如果您正在使用 RabbitMQ 或 Redis 作为代理,那么您还可以指导工作员在运行时为任务设置新的速率限制:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
    new rate limit set successfully

查看路由任务了解更多关于任务路由的信息,以及关于注解的task_annotations设置,或者监控与管理指南了解更多关于远程控制命令
以及如何监控您的woker 正在做什么。


从这里开始

如果您想了解更多,请继续查看 下一步 教程,之后您可以阅读 用户指南


故障排除

常见问题解答中也有一个故障排除部分。


woker 无法启动:权限错误

  • 如果你正在使用 Debian、Ubuntu 或其他基于 Debian 的发行版:

Debian 最近将 /dev/shm 特殊文件重命名为 /run/shm.

一个简单的解决方案是创建一个符号链接:

# ln -s /run/shm /dev/shm

  • 其他:

如果您提供了任何以下参数:--pidfile--logfile--statedb,那么您必须确保它们指向一个可由启动工作进程的用户写入和读取的文件或目录。


结果后端不工作或任务总是处于 PENDING 状态

所有任务默认都是 PENDING,所以这个状态最好命名为“未知”。Celery在发送任务时不会更新状态,任何没有历史记录的任务都假定是挂起的(毕竟你知道任务 ID)。

  1. 确保任务没有启用 ignore_result
    启用此选项将强制工作器跳过更新状态。
  2. 确保没有启用 task_ignore_result配置文档)设置。
  3. 确保您没有任何仍在运行的旧 worker。

不小心启动多个 worker很容易,所以确保在启动新的 worker之前,先正确关闭前一个 worker。

如果一个旧 worker没有配置为预期的结果后端,它可能会运行并劫持任务。

可以将 --pidfile 参数设置为绝对路径,以确保这种情况不会发生。
4. 确保客户端已配置正确的后端。

如果由于某种原因,客户端被配置为使用与工作进程不同的后端,您将无法收到结果。
确保后端配置正确:

>>> result = task.delay()
>>> print(result.backend)

下一步

https://docs.celeryq.dev/en/stable/getting-started/next-steps.html

上面 Celery 的第一步 指南是故意简化版的。在这个指南中,我将更详细地展示 Celery 提供的功能,包括如何为您的应用程序和库添加 Celery 支持。

此文档并未记录 Celery 的所有功能和最佳实践,因此建议您也阅读 用户指南


在您的应用程序中使用Celery


我们的项目

项目布局:

src/
    proj/__init__.py
        /celery.py
        /tasks.py

proj/celery.py
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='rpc://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

在本模块中,您创建了我们的 Celery 实例(有时被称为 app)。要在您的项目中使用 Celery,您只需导入此实例即可。

  • broker 参数指定要使用的代理的 URL。
    查看选择一个代理获取更多信息。
  • backend 参数指定要使用的返回后端。
    它用于跟踪任务状态和结果。
    虽然结果默认是禁用的,但我在这里使用 RPC 结果后端,因为我要演示如何检索结果。您可能想为您的应用程序使用不同的后端。它们都有不同的优势和劣势。如果您不需要结果,最好禁用它们。也可以通过设置 @task(ignore_result=True) 选项来为单个任务禁用结果。
    更多信息请参阅 保留结果
  • include 参数是一个要导入的模块列表,当
    工作进程开始。您需要在这里添加我们的任务模块,以便工作进程能够找到我们的任务。

proj/tasks.py
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

启动工作进程

celebre程序可用于启动worker(您需要在上面的目录中运行worker proj,根据示例项目布局,目录为src):

$ celery -A proj worker -l INFO

当工作进程启动时,你应该会看到一个横幅和一些消息:

--------------- celery@halcyon.local v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

  • broker 是你在 celery 模块的 broker 参数中指定的 URL。你也可以通过使用命令行中的 -b 选项来指定不同的 broker。
  • 并发性 是用于并行处理你的任务的前置工作进程的数量。当所有这些进程都在忙于工作,新的任务必须等待其中一个任务完成,然后才能被处理。

默认的并发数是那台机器上的 CPU 数量(包括核心)。你可以使用 celery worker -c 选项指定一个自定义的数量。没有推荐的值,因为最佳数量取决于许多因素,但如果你的任务主要是 I/O 密集型,那么你可以尝试增加它。实验表明,增加超过 CPU 数量两倍以上的并发数很少有效,并且可能会降低性能。

包括默认的 prefork 池,Celery 还支持使用 Eventlet、Gevent,以及以单线程运行(见 并发性)。

  • 事件 是一个选项,它会使 Celery 发送监视消息(事件),以监视在 worker 中发生的操作。这些可以被像 celery events 和 Flower(实时 Celery 监视器)这样的监视程序使用,你可以在 监视和管理指南 中了解更多信息。
  • 队列 是 worker 将从中消费任务的队列列表。worker 可以被指示一次性从多个队列中消费,这被用来将消息路由到特定的 worker,作为服务质量、关注点分离和优先级排序的手段,所有这些都在 路由指南 中描述。

你可以通过传递 --help 标志来获取完整的命令行参数列表:

$ celery worker --help

这些选项在 worker指南中描述得更加详细。


停止 worker

要停止工作进程,只需按下 Control-c。工作进程支持的信号列表详见工作进程指南


在后台

在生产环境中,您会希望以后台模式运行工作进程,详情请参考守护进程教程

守护进程脚本使用celery multi命令以后台模式启动一个或多个工作进程:

$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
Starting nodes...
    > w1.halcyon.local: OK

您也可以重新启动它:

$ celery  multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
Waiting for 1 node.....
    > w1.halcyon.local: OK
Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

或者停止它:

$ celery multi stop w1 -A proj -l INFO

stop 命令是异步的,所以它不会等待工作进程关闭。你可能想要使用 stopwait 命令代替,它确保在退出之前完成所有当前正在执行的任务:

$ celery multi stopwait w1 -A proj -l INFO

注意:celery multi 不存储有关 worker的信息,因此您需要在重新启动时使用相同的命令行参数。

在停止时,必须使用相同的 pidfile 和 logfile 参数。

默认情况下,它将在当前目录中创建 pid 和日志文件。

为了防止多个 worker同时启动,建议将这些文件放在一个专用目录中:

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

使用多命令,您可以启动多个工作进程,并且有强大的命令行语法来指定不同工作进程的参数,例如:

$ celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug

更多示例请参阅 API 参考中的multi模块。


关于 --app 参数

The --app argument specifies the Celery app instance to use, in the form of module.path:attribute

But it also supports a shortcut form. If only a package name is specified, it’ll try to search for the app instance, in the following order:

With --app=proj:

  1. 一个名为 proj.app 的属性,或者
  2. 一个名为 proj.celery 的属性,或者
  3. 模块 proj 中任何 值是 Celery app 的属性,或

如果没有找到这些,它将尝试一个名为 proj.celery 的子模块:

  1. 一个名为 proj.celery.app 的属性,或者
  2. 一个名为 proj.celery.celery 的属性,或者
  3. 模块 proj.celery 中任何值为 Celery app 的属性

这个方案模仿了文档中使用的实践——即,对于单个包含的模块使用 proj:app,而对于更大的项目使用 proj.celery:app


调用任务

您可以使用 delay() 方法调用一个任务:

>>> from proj.tasks import add

>>> add.delay(2, 2)

此方法实际上是一个到另一个名为 apply_async() 方法的星号参数快捷方式:

>>> add.apply_async((2, 2))

后者使您能够指定执行选项,如运行时间(倒计时)、应发送到的队列等:

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

在上述示例中,任务将被发送到名为 lopri 的队列中,并且任务将在消息发送后最早10秒执行。

直接应用任务将在当前进程中执行任务,因此不会发送任何消息:

>>> add(2, 2)
4

这三个方法 - delay(), apply_async() 和应用 (__call__),构成了 Celery 调用 API,它也被用于签名。

调用用户指南 中可以找到对调用 API 的更详细概述。

每个任务调用都将获得一个唯一的标识符(一个 UUID)——这是任务 ID。

delayapply_async 方法返回一个 AsyncResult 实例,可以用来跟踪任务执行状态。但为了做到这一点,你需要启用一个 结果后端 以便将状态存储在某个地方。

默认情况下结果是被禁用的,因为没有适合每个应用的结果后端;为了选择一个,你需要考虑每个后端的缺点。对于许多任务,保留返回值甚至可能并不非常有用,所以这是一个合理的默认设置。另外请注意,结果后端不用于监控任务和 worker:为此,Celery 使用专用的事件消息(见 监控和管理指南)。

如果你已经配置了结果后端,你可以检索任务的返回值:

>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4

您可以查看 id 属性来找到任务的 id:

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

您也可以检查异常和跟踪回溯,如果任务抛出了异常,实际上 result.get() 默认会传播任何错误:

>>> res = add.delay(2, '2')
>>> res.get(timeout=1)

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/result.py", line 221, in get
    return self.backend.wait_for_pending(
  File "celery/backends/asynchronous.py", line 195, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "celery/result.py", line 333, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "celery/result.py", line 326, in throw
    self.on_ready.throw(*args, **kwargs)
  File "vine/promises.py", line 244, in throw
    reraise(type(exc), exc, tb)
  File "vine/five.py", line 195, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'

如果你不希望错误传播,可以通过传递 propagate 来禁用它:

>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")

在这种情况下,它将返回引发的异常实例——
因此,要检查任务是否成功或失败,您必须使用结果实例上的相应方法:

>>> res.failed()
True

>>> res.successful()
False

所以它是如何知道任务是否失败的?它可以通过查看任务的 状态 来找出答案。

>>> res.state
'FAILURE'

一个任务只能处于单一状态,但它可以经过几个状态。一个典型任务的阶段可以是:

PENDING -> STARTED -> SUCCESS

起始状态是一个特殊状态,只有当启用task_track_started设置,或者为任务设置了@task(track_started=True)选项时,才会记录该状态。

挂起状态实际上并不是一个记录的状态,而是任何未知任务ID的默认状态:您可以从以下示例中看出这一点:

>>> from proj.celery import app

>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

如果任务需要重试,阶段可能会变得更加复杂。

为了演示,对于重试了两次的任务,阶段将如下所示:

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

关于任务状态,您应该查看用户指南中的状态部分。

调用指南中详细描述了调用任务。


Canvas : 设计工作流程

您刚刚学习了如何使用任务的 delay 方法来调用任务,这通常就足够了。但有时您可能希望将任务调用的签名传递给另一个进程或作为另一个函数的参数,Celery 使用一种称为 signatures 的机制来实现这一点。

签名以某种方式封装了单个任务调用的参数和执行选项,使其可以被传递给函数,甚至可以序列化并通过网络发送。

您可以使用参数 (2, 2) 和 10 秒的倒计时来为 add 任务创建一个签名,如下所示:

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)

也有使用星号参数的快捷方式:

>>> add.s(2, 2)
tasks.add(2, 2)

又是调用API的时候了…

签名实例也支持调用API,这意味着它们有 delayapply_async 方法。

但是有一点区别是,签名可能已经指定了参数签名。add 任务接受两个参数,所以指定两个参数的签名将构成一个完整的签名:

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

但是,您也可以创建不完整的签名来创建我们所说的
部分签名

# incomplete partial: add(?, 2)
>>> s2 = add.s(2)

s2 现在是一个部分签名,需要另一个参数才能完整,这在调用签名时可以解决:

# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10

这里添加了参数8,它被附加到现有的参数2上,形成了完整的签名 add(8, 2)

关键字参数也可以稍后添加;这些参数将与任何现有的关键字参数合并,但新参数将具有优先级:

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

如所述,签名支持调用API:意味着

  • sig.apply_async(args=(), kwargs={}, **options)

调用具有可选部分参数和部分关键字参数的签名。也支持部分执行选项。

  • sig.delay(*args, **kwargs)
    Star 参数版本的apply_async。任何参数都将被添加在前面。对于签名中的参数,关键字参数将与任何现有密钥合并。

所以这一切看起来都很有用,但你能用这些做什么呢?
为了达到这个目的,我必须介绍画布图元…


原语

这些原语本身就是签名对象,因此它们可以以任何数量的方式组合,以构成复杂的工作流程。


注意:以下示例检索结果,因此要尝试它们,您需要配置一个结果后端。上面的示例项目已经做到了这一点(参见 Celery 的后端参数Celery)。

让我们看看一些示例:


Groups

一个 group 调用一系列任务并行执行,并返回一个特殊的结果实例,让您可以作为一个组来检查结果,并按顺序检索返回值。

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

  • 部分组
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

任务可以被链接在一起,以便在第一个任务返回后,其他任务被调用:

Tasks can be linked together so that after one task returns the other is called:
>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

或者是一个部分链:

>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

链也可以这样编写:

>>> (add.s(4, 4) | mul.s(8))().get()
64

Chords

一个和弦是一个具有回调函数的组:

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

一个与另一个任务链式连接的组将被自动转换为和弦:

>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90

由于这些原语都是签名类型,因此它们可以几乎以任何方式组合,例如:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

确保阅读更多关于 Canvas 用户指南中的工作流程。


路由

Celery 支持所有由 AMQP 提供的路由功能,但它还支持简单路由,其中消息被发送到命名队列。

task_routes 设置允许您通过名称路由任务,并将所有内容集中在一个位置:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

您也可以使用 apply_async 函数的 queue 参数在运行时指定队列:

You can also specify the queue at runtime
with the `queue` argument to `apply_async`:
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

您可以通过指定 celery worker -Q 选项来使工作进程从该队列中消费:

您可以通过指定 `*celery worker -Q*` 选项来使工作进程从该队列中消费:

$ celery -A proj worker -Q hipri

您可以通过逗号分隔的列表来指定多个队列。
例如,您可以让工作器同时从默认队列和 hipri 队列中消费,其中
默认队列名为 celery 是由于历史原因:

$ celery -A proj worker -Q hipri,celery

队列的顺序不重要,因为工作器将对队列给予相同的权重。

要了解更多关于路由的信息,包括充分利用AMQP路由的完整功能,请参阅路由指南


远程控制

如果您使用 RabbitMQ (AMQP)、Redis 或 Qpid 作为代理,那么您可以在运行时控制并检查工作进程。

例如,您可以看到工作进程目前正在处理哪些任务:

$ celery -A proj inspect active

这是通过使用广播消息实现的,因此所有远程控制命令都会被集群中的每个工作节点接收。

您还可以使用--destination选项指定一个或多个工作节点来处理请求。
这是一个用逗号分隔的工作节点主机名列表:

$ celery -A proj inspect active --destination=celery@example.com

如果未提供目标,则每个工作进程都将行动并对请求做出回应。

celery inspect 命令包含一些不会在工作进程中更改任何内容的命令;它只返回关于工作进程中正在发生的事情的信息和统计信息。
要执行以下 inspect 命令列表:

$ celery -A proj inspect --help

然后是 celery控制 命令,它包含一些在运行时真正改变工作进程内容的命令:

$ celery -A proj control --help

例如,您可以强制工作器启用事件消息(用于监控任务和工作器):

$ celery -A proj control enable_events

当事件启用时,您就可以启动事件转储器,以查看 worker正在做什么:

$ celery -A proj events --dump

或者您可以选择启动 curses 接口:

$ celery -A proj events

当你完成监控后,可以再次禁用事件:

$ celery -A proj control disable_events

The celery status 命令也使用远程控制命令,并显示集群中在线 worker的列表:

$ celery -A proj status

您可以在监控指南中了解更多关于 celery 命令和监控的信息。


时区

所有时间和日期,在内部和消息中,都使用 UTC 时区。

当工作进程收到消息时,例如设置了倒计时,它会将那个 UTC 时间转换为本地时间。如果您希望使用与系统时区不同的时区,则必须使用 timezone 设置(配置 timezone)来配置:

app.conf.timezone = 'Europe/London'

优化

默认配置不是针对吞吐量优化的。默认情况下,它试图在许多短任务和较少长任务之间走中间路线,这是吞吐量和公平调度之间的折衷。

如果您有严格的公平调度需求,或者想要优化吞吐量,那么您应该阅读优化指南


现在该做什么?

现在你已经阅读了这份文档,你应该继续阅读用户指南

如果你有兴趣,还可以查看API参考



资源


获得帮助


邮件列表

关于 Celery 的使用、开发和未来的讨论,请加入 celery-users 邮件列表。


IRC

来和我们在IRC上聊天吧。#celery频道位于Libera Chat网络。


错误追踪器

如果您有任何建议、错误报告或烦恼,请向我们的问题跟踪器报告,链接为 https://github.com/celery/celery/issues/


维基

https://github.com/celery/celery/wiki


贡献

Celery 的开发在 GitHub 上进行:https://github.com/celery/celery

强烈鼓励您参与 celery 的开发。如果您不喜欢 GitHub(出于某种原因),您也可以发送常规补丁。

请务必阅读文档中的 向 Celery 贡献 部分。


许可证

此软件根据新BSD许可证授权。请参阅顶级分发目录中的LICENSE文件,以获取完整的许可证文本。


获取帮助


邮件列表

关于 Celery 的使用、开发和未来的讨论,请加入 celery-users 邮件列表。


IRC

来和我们在IRC上聊天吧。#celery频道位于Libera Chat网络。


邮件列表

关于Celery的使用、开发和未来的讨论,请加入celery-users邮件列表。


IRC

来我们这里用IRC聊天吧。#celery 频道位于 Libera Chat 网络。


Bug tracker

如果您有任何建议、错误报告或烦恼,请将它们报告给我们的问题跟踪器:https://github.com/celery/celery/issues/


维基

https://github.com/celery/celery/wiki


贡献

Celery 的开发在 GitHub 进行:https://github.com/celery/celery

强烈鼓励您参与 celery 的开发。如果您不喜欢 GitHub(出于某种原因),欢迎您发送常规补丁。

请务必阅读文档中的 为 Celery 贡献 部分。


许可

本软件根据新BSD许可证授权。请参阅顶层分发目录中的 LICENSE 文件以获取完整的许可证文本。


2025-03-13(四)


网站公告

今日签到

点亮在社区的每一天
去签到