Celery在Django中的作用

发布于:2025-03-13 ⋅ 阅读:(13) ⋅ 点赞:(0)

在 Django 中使用 Celery 主要是为了处理耗时或需要异步执行的任务,避免阻塞 Web 请求的响应,提升用户体验和系统性能。 

安装很简单,打开终端

pip install celery

先来了解一下

async tasks——异步任务

  • 用途:将耗时的操作从 HTTP 请求-响应周期中剥离,放到后台异步执行。

  • 典型场景

    • 发送邮件或短信验证码(避免用户等待网络延迟)。

    • 处理图片/视频(如缩略图生成、格式转换)。

    • 调用第三方 API(如支付接口、地图服务)。

    • 大数据处理或复杂计算(如生成报表、数据分析)。

异步实现发送验证码流程

流程就是,把一些方法“摘”出来,放在tasks.py文件下,在视图函数中调用这个方法,这里举例发送邮箱验证码。
先看下目录结构,一目了然,有人问tasks.py放app里还是哪?tasks.py 文件的最佳实践是 直接放在相关功能的应用目录下(例如这里验证码流程在 Login 应用,tasks就放在Login下)

book_demo/
├── book_demo/              # 项目根目录
│   ├── __init__.py
│   ├── settings.py         # Django 配置
│   ├── urls.py
│   └── celery.py           # Celery 初始化配置(关键!)
│
└── Login/                  # 你的登录/注册功能应用
    ├── __init__.py
    ├── tasks.py            # Celery 任务定义(核心文件)
    ├── views.py            # 处理验证码的视图函数
    ├── models.py
    └── ...

0.环境准备

不用担心,总共需要安装Redis\Celery两个就行了

先安装一些配置才能启动Celery:也就是安装Celery 架构的核心组成部分。

常看到一个描述是:使用Redis作为Broker  ?????一头雾水,下面用人话讲讲,了解一下在开始安装:

1. 什么是 Broker?
  • Broker(消息代理):可以理解为一个「任务中转站」,负责在 Celery 的各个组件之间传递消息。

  • 类比:想象你去快递站寄包裹,Broker 就像快递公司的「分拣中心」——它负责接收你要寄的包裹(任务),并分发给快递员(Worker)去处理。

  • 核心作用:当你在代码中调用 .delay() 提交异步任务时,这个任务会被发送到 Broker,再由 Broker 转发给空闲的 Worker 执行。


2. 什么是 Redis?
  • Redis:是一个开源的内存数据库,支持高性能的键值存储,常用于缓存、消息队列等场景。

  • 在 Celery 中的角色:Redis 可以作为 Broker 的具体实现之一(即用 Redis 来充当任务中转站)。

  • 其他常见 Broker

    • RabbitMQ:专为消息队列设计的服务,稳定性更高(企业级首选)。

    • Amazon SQS:云服务商提供的队列服务。


3. Celery 的完整工作流程
  1. 任务提交:你在 Django 代码中调用 task.delay(),提交任务到 Broker。

  2. 任务存储:Broker(如 Redis)将任务存入队列。

  3. 任务分发:Celery Worker 进程从 Broker 拉取任务。

  4. 任务执行:Worker 执行任务(如发送验证码)。

  5. 结果存储(可选):任务结果可存入 Result Backend(如另一个 Redis 数据库)。

[Django] --> [Broker(Redis)] --> [Celery Worker] --> [执行任务]

4. 为什么需要 Broker?
  • 解耦生产者和消费者:Django(生产者)无需关心任务如何执行,Worker(消费者)也无需知道任务是谁提交的。

  • 缓冲任务:当任务瞬间激增时,Broker 作为缓冲区,避免任务丢失。

  • 支持分布式:多个 Worker 可以同时从 Broker 拉取任务,实现负载均衡。

了解完毕,我这里使用的是Redis充当Broker

安装Redis

1.下载

2. 运行安装程序

  • 双击 .msi 文件启动安装向导。

  • 选择安装路径

    • 在安装界面中,点击 "Browse...",选择 D:\Redis(或其他自定义路径)。

  • 勾选 "Add Redis installation folder to PATH"(将 Redis 添加到环境变量)。

  • 完成安装。

3.验证安装

  • 打开 cmd 或 PowerShell

# 检查 Redis 是否安装成功
redis-server --version
# 输出示例:Redis server v=5.0.14.1

# 启动 Redis 服务端
redis-server

 出现下面的图案代表启动成功:

这里如果报错出现下图,不用担心
 

可参考另一篇博客~


安装完Celery和Redis之后,可开始下面的: 

1.配置 Celery

book_deml/settings.py下添加:

CACHES = {

    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',  # Redis 地址和数据库编号
    }
}

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

确保项目根目录的 celery.py 和 __init__.py 已正确配置:

# book_demo/book_demo/celery.py
import os
from celery import Celery

# 设置 Django 环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'book_demo.settings')

app = Celery('book_demo')

# 从 Django 配置中读取 Celery 设置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现所有应用中的 tasks.py
app.autodiscover_tasks()
# book_demo/book_demo/__init__.py
from .celery import app as celery_app

__all__ = ['celery_app']

2.运行 Celery Worker

启动 Celery Worker 处理异步任务,说人话就是打开终端输入命令启动celery:

celery -A book_demo worker --loglevel=info -P eventlet

 这里关于这个命令另一篇处理启动Celery的博客中有详细介绍。eventlet记得也先pip install 安装

出现下图则代表安装成功: 

测试缓存功能

在 Django Shell 中手动测试缓存功能:

1.输入:
python manage.py shell
2.输入:
from django.core.cache import cache

# 设置缓存
cache.set('test_key', 'test_value', timeout=300)
# 获取缓存
value = cache.get('test_key')
print(value)  # 输出:test_value

成功就长这样: 

 


 3.创建 tasks.py

在 Login/tasks.py 中定义发送验证码的异步任务:

# book_demo/Login/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings
import random

@shared_task
def send_email_captcha_task(email):
    # 生成 6 位随机验证码
    captcha = ''.join(random.choices('0123456789', k=6))

    # 存储到缓存(有效期 5 分钟)这里我用的Redis
    cache_key = f'verification_code_{email}'
    cache.set(cache_key, captcha, timeout=300)
    stored_captcha = cache.get(cache_key)  # 从缓存中获取验证码

    # 打印缓存键和验证码
    logger.info(f"缓存键:{cache_key}, 缓存值:{stored_captcha}")

    try:
        send_mail(
            subject='您的验证码',
            message=f'您的验证码是:{captcha},有效期 5 分钟。',
            from_email=None,  # 使用配置的默认发件人
            recipient_list=[email],
            fail_silently=False,
        )
        return True  # 可选:返回任务执行状态
    except Exception as e:
        # 记录错误日志
        logger.error(f'邮件发送失败: {str(e)}')
        return False  # 可选:返回任务执行状态

这里需要注意几点:

  • 在 tasks.py 文件中不推荐直接使用 print,而是使用日志记录器(logger)来记录信息。因为celery本质就是分布式任务队列,任务可能在不同的进程、机器甚至服务器上运行。直接使用 print 输出的内容很难被集中管理和监控。还因为print 输出的内容通常不会被持久化到日志文件中,而是直接显示在终端上。一旦任务执行完成,这些信息就会丢失,不利于后续的排查和分析。
  • 如果logger.info无法正常打印,看一下是不是启动celery时没有添加:--loglevel=info

4.在视图函数中调用异步任务

修改 book_demo/Login/views.py,触发异步任务:

# book_demo/Login/views.py
from django.shortcuts import render, HttpResponse
from .tasks import send_verification_code_task  # 从当前应用导入任务
from rest_framework.views import APIView

class send_email_captcha(APIView):
    def post(self, request):
        email = request.POST.get('email')
        
        ##这里一般校验邮箱格式、是否为空、是否已经注册等

        try:
            # 异步触发发送验证码任务
            send_email_captcha_task.delay(email)  # 关键:使用 .delay()
            # 使用.delay()确保任务在CeleryWorker中执行,避免阻塞请求。
            return JsonResponse(
                {'success': True, 'message': '验证码已发至您的邮箱'},
                status=status.HTTP_200_OK
            )
        except Exception as e:
            return JsonResponse(
                {'success': False, 'message': f'服务器错误:{str(e)}'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

从缓存中获取验证码是否正确:

class verify_email_captcha(APIView):
    def post(self, request):
        try:
            email = request.data.get('email')
            user_input_captcha = request.data.get('emailCaptcha')
            cache_key = f'verification_code_{email}'
            stored_captcha = cache.get(cache_key)  # 从缓存中获取验证码

            # 打印缓存键和验证码
            print(f"缓存键:{cache_key}, 缓存值:{stored_captcha}")

            if not stored_captcha:
                return Response({'success': False, 'message': '验证码已过期或无效'}, status=status.HTTP_400_BAD_REQUEST)

            # 比较用户输入的验证码和存储的验证码
            if user_input_captcha == stored_captcha:
                return Response({'success': True, 'message': '验证码验证成功'}, status=status.HTTP_200_OK)
            else:
                return Response({'success': False, 'message': '验证码错误'}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            return Response({'success': False, 'message': f'服务器错误:{str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

总结 

 好了到这正常异步处理发验证码的流程就搞好了 ~

回顾一下Celery 的核心作用:

  • 异步执行:将耗时的操作(如发邮件、短信、复杂计算)从 Django 的 HTTP 请求-响应周期中剥离,交给 Celery Worker 在后台执行。

  • 解耦:Django 负责接收请求和快速返回响应,Celery 负责处理耗时任务,二者通过 消息队列(如 Redis) 通信。

Redis 的双重角色:

  1. 消息队列(Broker)

    • Celery 使用 Redis 作为任务传递的中转站(Django 提交任务到 Redis,Celery Worker 从 Redis 拉取任务)。

    • 这是通过 CELERY_BROKER_URL 配置的。

  2. 缓存(Cache)

    • 你手动将验证码存储到 Redis(通过 cache.set()),用于后续验证。

    • 这是通过 CACHES 配置的(Django 缓存框架)。

人话就是:
Celery就是单独把发验证码的这个方法摘出来,异步执行(就是把方法写在task.py下),结果呢(比如验证码)就存在Redis里,在用到的时候就再从Redis中获取。