2 celery环境搭建

发布于:2025-04-19 ⋅ 阅读:(26) ⋅ 点赞:(0)

1. 安装 Celery 及依赖

1.1 安装 Celery

使用 pip 安装 Celery(推荐 Python 3.7+ 环境):

pip install celery

1.2 选择并安装 Broker

Celery 需要一个消息中间件(Broker)来传递任务。以下是两种常用 Broker 的安装方式:

选项一:Redis

Redis 轻量且适合快速上手,安装步骤如下:

# 安装 Redis 服务(Ubuntu/Debian)
sudo apt-get install redis-server

# 启动 Redis
sudo systemctl start redis

# 安装 Python 的 Redis 客户端库
pip install redis
选项二:RabbitMQ

RabbitMQ 是 Celery 官方推荐的 Broker,适合生产环境:

# 安装 RabbitMQ(Ubuntu/Debian)
sudo apt-get install -y erlang
sudo apt-get install rabbitmq-server

# 启动 RabbitMQ
sudo systemctl start rabbitmq-server

# 可选:启用 Web 管理界面
sudo rabbitmq-plugins enable rabbitmq_management

提示:生产环境推荐使用 RabbitMQ,开发环境可用 Redis 快速测试。


2. 最小化示例:编写第一个异步任务

2.1 创建 Celery 应用

新建文件 tasks.py,编写以下代码:

from celery import Celery

# 创建 Celery 实例,指定 Broker 和 Backend
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',  # 使用 Redis 作为 Broker
    backend='redis://localhost:6379/0'   # 使用 Redis 存储结果
)

# 定义一个异步任务
@app.task
def add(x, y):
    return x + y

2.2 调用异步任务

在同一目录下打开 Python Shell,测试任务:

from tasks import add

# 异步调用任务(立即返回 AsyncResult 对象)
result = add.delay(4, 6)

# 查看任务是否完成(非阻塞)
print(result.ready())  # 输出 False(任务未完成)

# 获取任务结果(阻塞等待)
print(result.get(timeout=10))  # 输出 10

3. 命令行工具:启动 Worker 与查看状态

3.1 启动 Worker 节点

在项目目录下执行以下命令启动 Worker:

celery -A tasks worker --loglevel=info
  • -A tasks:指定 Celery 应用模块(tasks.py 中的 app)。
  • --loglevel=info:设置日志级别为详细信息。

输出示例

[config]
.> app:         tasks:0x7f8d1c3b3d90
.> transport:   redis://localhost:6379/0
.> results:     redis://localhost:6379/0
.> concurrency: 4 (prefork)
[queues]
.> celery           exchange=celery(direct) key=celery

3.2 查看 Worker 状态

使用 celery inspect 命令检查 Worker 是否存活:

celery -A tasks inspect ping

正常响应示例:

-> celery@ubuntu: OK
    pong

3.3 监控任务结果(可选)

如果配置了 backend(如 Redis),可通过以下命令查看任务结果:

# 进入 Redis CLI
redis-cli

# 查看结果(假设任务 ID 为 7a9d8c3a-...)
127.0.0.1:6379> GET celery-task-meta-7a9d8c3a...
"{\"status\": \"SUCCESS\", \"result\": 10, ...}"

4. 常见问题与解决

4.1 Broker 连接失败

  • 现象:Worker 启动时报错 ConnectionRefusedError
  • 解决
    1. 确认 Broker 服务已启动(如 systemctl status redis)。
    2. 检查防火墙是否开放 Broker 端口(默认 Redis: 6379,RabbitMQ: 5672)。
    3. 验证连接 URL 格式(如 redis://:password@host:port/db)。

4.2 任务未执行

  • 现象:任务长时间处于 PENDING 状态。
  • 解决
    1. 确认 Worker 已启动且未卡死。
    2. 检查任务是否被正确路由到队列(默认队列为 celery)。

5. 环境验证

通过浏览器访问 RabbitMQ 管理界面(默认地址 http://localhost:15672,用户名/密码为 guest/guest)或 Redis CLI,确认消息正常流转。


网站公告

今日签到

点亮在社区的每一天
去签到