Celery + redis 异步分布式任务队列安装测试

发布于:2024-05-09 ⋅ 阅读:(32) ⋅ 点赞:(0)

Celery 异步分布式任务队列

Celery 5.4.0 官方文档

环境:3台 centos7.9 普通用户

redis Scheduler worker
dp95 1
dp96 1 1 1
dp97 1

1、Celery 介绍

Celery是一个简单、灵活、可靠的分布式系统,基于python开发,可以处理大量的消息,同时提供维护这样一个系统所需的工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度

使用场景

  • 定时任务:定时爬虫、算法模型定时输出
  • 异步任务:I/O密集型任务,消息推送、邮件发送、ai客服
  • 分布式调度:airflow + celery 大数据ETL调度

优点:使用后再说

架构:AMQP(Advanced Message Queuing Protocol)高级消息队列协议

2、安装部署

2.1 安装消息中间件(broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker)。官方推荐RabbitMQ和Redis。

RabbitMQ:

  • 优点:支持AMQP(高级消息队列协议),可靠性高,支持消息持久化,有丰富的功能特性(如消息确认、重试、超时、死信队列等)。
  • 缺点:学习曲线较陡峭,配置复杂,性能可能较低。

Redis:

  • 优点:配置简单,性能高,可以用作消息队列用于简单的场景。
  • 缺点:不支持AMQP,不适合重载的消息队列处理,不支持消息的持久化和异步确认。

本次测试安装 redis http://download.redis.io/releases/

# redis 安装
# 解压
[dp96]$ tar -zxvf redis-7.2.4.tar.gz
[dp96]$ cd redis-7.2.4
# 编译
[dp96]$ make
# 安装
[dp96]$ make PREFIX= ~/redis install

# 修改配置文件
[dp96]$ vim ./redis.conf
'''
bind * -::*           # 绑定主机地址
protected-mode no     # 保护模式设置为 no,允许外网连接
port 6379             # 监听端口
timeout 0             # 当客户端闲置多长时间后关闭连接,如果指定为 0,表示关闭该功能
daemonize yes         # yes表示启用守护进程,默认是no即不以守护进程方式运行
loglevel notice       # 日志级别
logfile ./redis-server.log  # 指定 Redis 服务器的日志文件路径
dir ./                      # Redis 服务器的工作目录,即数据库文件的存放路径
pidfile /tmp/redis_6379.pid # 进程文件
'''

# 启动redis
[dp96]$ cd ~/redis/bin
[dp96]$ ./redis-server ~/opt/redis/redis-7.2.4/redis.conf

# 连接验证
[dp96]$ ./redis-cli
127.0.0.1:6379> CONFIG GET *

在这里插入图片描述

# redis 常用操作
# Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)

# 切换数据库0-15
127.0.0.1:6379> select 1 
127.0.0.1:6379> scan 0 /keys *       # 查看0号数据库所有的key

# 字符串类型
127.0.0.1:6379> set a 'test' 
127.0.0.1:6379> get a           # "test"
127.0.0.1:6379> del a

# 哈希类型,适合存储对象
127.0.0.1:6379> hmset a field1 'Hello' field2 'world' 
127.0.0.1:6379> hget a            # (error) ERR wrong number of arguments for 'hget' command        
127.0.0.1:6379> hget a field2     # "world"
127.0.0.1:6379> del a             # 重复key 会报错

# 列表
127.0.0.1:6379> lpush a redis
127.0.0.1:6379> lpush a rabbitmq
127.0.0.1:6379> lpush a 1
127.0.0.1:6379> lrange a 0 10
'''
1) "1"
2) "rabbitmq"
3) "redis"
'''

# 集合
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 2
127.0.0.1:6379> smembers a
'''
1) "1"
2) "2"
'''

# 有序集合 zadd key score member 
127.0.0.1:6379> zadd a 1 张三
127.0.0.1:6379> zadd a 1 李四
127.0.0.1:6379> zadd a 2 王五
127.0.0.1:6379> zrangebyscore a 0 1   # 选去0-1分的集合元素

'''
1) "\xe5\xbc\xa0\xe4\xb8\x89"
2) "\xe5\xe6\x9d\x8e\xe5\x9b\x9b"
'''

2.2 安装Celery

# 安装celery
(airflow)[dp96]$ pip install celery -i https://pypi.tuna.tsinghua.edu.cn/simple

(airflow)[dp95]$ pip install celery
(airflow)[dp97]$ pip install celery

# 安装 redis Celery 远程连接redis服务时使用
(airflow)[dp96]$ pip install redis
(airflow)[dp95]$ pip install redis
(airflow)[dp97]$ pip install redis

3、功能测试

3.1 创建任务

3.1.1 在dp96 机器上创建应用文件 tasks.py
# 创建 tasks.py 
from celery import Celery
import time

app = Celery('tasks', broker='redis://10.18.18.96:6379/0',backend='redis://10.18.18.96:6379/1') # broker 任务队列;backend 结果存储数据库

@app.task
def time_sleep(n):
    time.sleep(n) 
    return f'延时{n}s函数'
3.1.2 分发应用到dp95、dp97 机器相同位置
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp95:~/celery
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp97:~/celery

3.2 启动worker服务

3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
# 以下是Celery命令入口点的选项解释:
-A, --app APPLICATION: 指定Celery应用的模块名或路径。
-b, --broker TEXT: 指定消息代理的地址,例如RabbitMQ或Redis。
--result-backend TEXT: 指定任务结果的后端存储地址。
--loader TEXT: 指定Celery加载器的类型。
--config TEXT: 指定配置文件的路径。
--workdir PATH: 指定Celery工作目录。
-C, --no-color: 禁用彩色输出。
-q, --quiet: 静默模式,减少输出。
--version: 显示Celery版本号。
--skip-checks: 跳过配置检查。
# tasks 是我们任务所在的文件名,workdir tasks.py任务文件所在目录 worker 表示启动的是 worker 程序
(airflow)[dp96 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info

(airflow)[dp95 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info
(airflow)[dp97 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果

在这里插入图片描述
在这里插入图片描述

3.3 执行分布式异步任务

3.3.1 在dp96另一个shell界面,在task.py文件同目录下,创建test.py文件发布任务

from tasks import time_sleep
import time

def test():
    start = time.time()
    res = []
    for i in range(10):
        res_ = time_sleep.delay(5)  # 发布任务到broker (redis),正常同步任务执行时间10*5=50s
    stop = time.time()
    print(f'运行时间:{stop-start}')

在这里插入图片描述

3.4 查看异步任务结果

dp95:
在这里插入图片描述

dp96:

在这里插入图片描述

dp97:

在这里插入图片描述

如图所示,3台机器分别从队列中取出了3、4、3个睡眠5秒的任务,并在各自机器上并行(异步)运行,3台机器累计运行任务共花费5s

redis 后台结果数据库:

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到