1. 安装 Celery 及依赖
1.1 安装 Celery
使用 pip
安装 Celery(推荐 Python 3.7+ 环境):
pip install celery
1.2 选择并安装 Broker
Celery 需要一个消息中间件(Broker)来传递任务。以下是两种常用 Broker 的安装方式:
选项一:Redis
Redis 轻量且适合快速上手,安装步骤如下:
# 安装 Redis 服务(Ubuntu/Debian)
sudo apt-get install redis-server
# 启动 Redis
sudo systemctl start redis
# 安装 Python 的 Redis 客户端库
pip install redis
选项二:RabbitMQ
RabbitMQ 是 Celery 官方推荐的 Broker,适合生产环境:
# 安装 RabbitMQ(Ubuntu/Debian)
sudo apt-get install -y erlang
sudo apt-get install rabbitmq-server
# 启动 RabbitMQ
sudo systemctl start rabbitmq-server
# 可选:启用 Web 管理界面
sudo rabbitmq-plugins enable rabbitmq_management
提示:生产环境推荐使用 RabbitMQ,开发环境可用 Redis 快速测试。
2. 最小化示例:编写第一个异步任务
2.1 创建 Celery 应用
新建文件 tasks.py
,编写以下代码:
from celery import Celery
# 创建 Celery 实例,指定 Broker 和 Backend
app = Celery(
'tasks',
broker='redis://localhost:6379/0', # 使用 Redis 作为 Broker
backend='redis://localhost:6379/0' # 使用 Redis 存储结果
)
# 定义一个异步任务
@app.task
def add(x, y):
return x + y
2.2 调用异步任务
在同一目录下打开 Python Shell,测试任务:
from tasks import add
# 异步调用任务(立即返回 AsyncResult 对象)
result = add.delay(4, 6)
# 查看任务是否完成(非阻塞)
print(result.ready()) # 输出 False(任务未完成)
# 获取任务结果(阻塞等待)
print(result.get(timeout=10)) # 输出 10
3. 命令行工具:启动 Worker 与查看状态
3.1 启动 Worker 节点
在项目目录下执行以下命令启动 Worker:
celery -A tasks worker --loglevel=info
-A tasks
:指定 Celery 应用模块(tasks.py
中的app
)。--loglevel=info
:设置日志级别为详细信息。
输出示例:
[config]
.> app: tasks:0x7f8d1c3b3d90
.> transport: redis://localhost:6379/0
.> results: redis://localhost:6379/0
.> concurrency: 4 (prefork)
[queues]
.> celery exchange=celery(direct) key=celery
3.2 查看 Worker 状态
使用 celery inspect
命令检查 Worker 是否存活:
celery -A tasks inspect ping
正常响应示例:
-> celery@ubuntu: OK
pong
3.3 监控任务结果(可选)
如果配置了 backend
(如 Redis),可通过以下命令查看任务结果:
# 进入 Redis CLI
redis-cli
# 查看结果(假设任务 ID 为 7a9d8c3a-...)
127.0.0.1:6379> GET celery-task-meta-7a9d8c3a...
"{\"status\": \"SUCCESS\", \"result\": 10, ...}"
4. 常见问题与解决
4.1 Broker 连接失败
- 现象:Worker 启动时报错
ConnectionRefusedError
。 - 解决:
- 确认 Broker 服务已启动(如
systemctl status redis
)。 - 检查防火墙是否开放 Broker 端口(默认 Redis: 6379,RabbitMQ: 5672)。
- 验证连接 URL 格式(如
redis://:password@host:port/db
)。
- 确认 Broker 服务已启动(如
4.2 任务未执行
- 现象:任务长时间处于
PENDING
状态。 - 解决:
- 确认 Worker 已启动且未卡死。
- 检查任务是否被正确路由到队列(默认队列为
celery
)。
5. 环境验证
通过浏览器访问 RabbitMQ 管理界面(默认地址 http://localhost:15672
,用户名/密码为 guest/guest
)或 Redis CLI,确认消息正常流转。