Celery: asynchronous and scheduled tasks

Published: 2022-12-17

Introduction to Celery

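Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.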

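Celery's architecture has three parts: the message broker, the task execution units (workers), and the task result store.

  • Message broker

Celery does not provide a messaging service itself, but it integrates easily with third-party message brokers such as RabbitMQ and Redis.

  • Task execution unit

A worker is the unit that executes tasks in Celery. Workers run concurrently on the nodes of a distributed system.

  • Task result store

The task result store holds the results of the tasks that workers execute. Celery can store results in several backends, including AMQP (via the rpc:// backend) and Redis.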
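To make the three roles concrete, here is a toy, single-process analogy using only the Python standard library. It is not how Celery is implemented: a `queue.Queue` stands in for the broker, two threads stand in for workers, and a dict stands in for the result store. `delay` is a hypothetical helper that only borrows the name of Celery's `Task.delay()`.

```python
import queue
import threading

# Toy stand-ins, not Celery: a Queue plays the broker, a dict plays the result store.
broker = queue.Queue()
result_store = {}
store_lock = threading.Lock()


def worker() -> None:
    """Take task messages off the 'broker', run them, and record the result."""
    while True:
        message = broker.get()
        if message is None:  # sentinel telling this worker to stop
            broker.task_done()
            break
        task_id, func, args = message
        value = func(*args)
        with store_lock:
            result_store[task_id] = ("SUCCESS", value)
        broker.task_done()


def delay(task_id, func, *args) -> None:
    """Producer side: mark the task PENDING, then publish a message."""
    with store_lock:
        result_store[task_id] = ("PENDING", None)
    broker.put((task_id, func, args))


workers = [threading.Thread(target=worker) for _ in range(2)]
for t in workers:
    t.start()

delay("t1", lambda name: f"email sent to {name}", "one")
delay("t2", lambda a, b: a + b, 2, 3)

broker.join()  # wait until every published message has been processed
for _ in workers:
    broker.put(None)  # one stop signal per worker
for t in workers:
    t.join()

print(result_store)
```

The producer never waits for the work itself; it only records a task id, and anyone holding that id can look the result up later, which is the same split Celery makes between `delay()` and the result backend.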

Installing Celery

pip install celery
pip install redis

Quick start

Project file structure

![Project file structure](https://img-blog.csdnimg.cn/img_convert/9c74bcb530e2fa279c30122d6e72cf14.png)

Define the workers (consumers)

  • celery_task.py
import celery
import time
# Result backend: where task results are stored
backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1'
# Message broker
broker = 'redis://:CuiLiang@0302@127.0.0.1:6379/2'
cel = celery.Celery('test', backend=backend, broker=broker)


@cel.task
def send_email(name):
    print("向%s发送邮件..." % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "ok"


@cel.task
def send_sms(name):
    print("向%s发送短信..." % name)
    time.sleep(10)
    print("向%s发送短信完成" % name)
    return "ok"

Start Celery

celery -A celery_task worker -l info
-------------- celery@cuiliangdeAir v5.2.3 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-12.2.1-x86_64-i386-64bit 2022-03-13 12:41:53
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         test:0x10bb43730
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/2
- ** ---------- .> results:     redis://:**@127.0.0.1:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
-------------- [queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
. celery_task.send_email
. celery_task.send_sms

[2022-03-13 12:41:53,699: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/2
[2022-03-13 12:41:53,710: INFO/MainProcess] mingle: searching for neighbors
[2022-03-13 12:41:54,752: INFO/MainProcess] mingle: all alone
[2022-03-13 12:41:54,810: INFO/MainProcess] celery@cuiliangdeAir ready.

Define the producer (sends task messages to the broker)

from celery_task import send_email, send_sms

result = send_email.delay("one")
print(result.id)
result2 = send_sms.delay("two")
print(result2.id)
  • After running the producer script, the worker log shows:
[2022-03-13 12:42:03,125: INFO/MainProcess] Task celery_task.send_email[bfdfce02-79d3-4fa7-92b7-36e02a6a824a] received
[2022-03-13 12:42:03,130: WARNING/ForkPoolWorker-2] 向one发送邮件...
[2022-03-13 12:42:03,139: INFO/MainProcess] Task celery_task.send_sms[26862066-88a3-4721-b50e-00da5e5690d0] received
[2022-03-13 12:42:03,142: WARNING/ForkPoolWorker-4] 向two发送短信...
[2022-03-13 12:42:08,137: WARNING/ForkPoolWorker-2] 向one发送邮件完成
[2022-03-13 12:42:08,182: INFO/ForkPoolWorker-2] Task celery_task.send_email[bfdfce02-79d3-4fa7-92b7-36e02a6a824a] succeeded in 5.052875665000101s: 'ok'
[2022-03-13 12:42:13,145: WARNING/ForkPoolWorker-4] 向two发送短信完成
[2022-03-13 12:42:13,175: INFO/ForkPoolWorker-4] Task celery_task.send_sms[26862066-88a3-4721-b50e-00da5e5690d0] succeeded in 10.03281639099987s: 'ok'

Define result (fetch a task's result)

from celery.result import AsyncResult
from celery_task import cel


def get_result(task):
    async_result = AsyncResult(id=task, app=cel)
    if async_result.successful():
        result = async_result.get()
        print(result)
        # async_result.forget()  # delete the stored result
    elif async_result.failed():
        print('Task failed')
    elif async_result.status == 'PENDING':
        print('Task is waiting to be executed')
    elif async_result.status == 'RETRY':
        print('Task raised an error and is being retried')
    elif async_result.status == 'STARTED':
        print('Task has started')


if __name__ == '__main__':
    task_id = 'bfdfce02-79d3-4fa7-92b7-36e02a6a824a'
    get_result(task_id)
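The if/elif chain above can also be written as a lookup table. This is a minimal sketch that assumes you pass in `async_result.status`; the state names are Celery's standard ones, while `STATE_MESSAGES` and `describe_state` are hypothetical names. Two details are worth knowing: Celery reports PENDING for any task id it has no record of, so a mistyped id looks the same as a queued task, and STARTED is only recorded when the `task_track_started` setting is enabled (it is off by default).

```python
# Plain state strings, as reported by AsyncResult.status in Celery.
STATE_MESSAGES = {
    "PENDING": "waiting to be executed (or the task id is unknown)",
    "STARTED": "started (only reported when task_track_started is enabled)",
    "RETRY": "raised an error and is being retried",
    "FAILURE": "failed",
    "SUCCESS": "succeeded",
    "REVOKED": "revoked",
}


def describe_state(status: str) -> str:
    """Return a readable message for a Celery task state string."""
    return STATE_MESSAGES.get(status, f"unrecognized state: {status}")


print(describe_state("PENDING"))
```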

Multi-module task structure

Project file structure

[Screenshot: project file structure]

Define the workers

  • celery_tasks/celery.py (Celery initialization)
from celery import Celery
cel = Celery('celery_demo',
             # Modules to search for tasks; splitting tasks across modules keeps them organized
             include=['celery_tasks.task01',
                      'celery_tasks.task02'
                      ])

# Load the configuration module into the Celery instance
cel.config_from_object('celery_tasks.celery_config')
  • celery_tasks/celery_config.py (Celery configuration)
# Official configuration docs: look up what each setting means.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html
# broker: the message broker that receives and delivers task messages
broker_url = 'redis://:CuiLiang@0302@127.0.0.1:6379/2'
# backend: stores the results of tasks executed by workers
result_backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1'
# Time zone used for scheduling; UTC is used if this is not set
timezone = 'Asia/Shanghai'
# Whether to use UTC
enable_utc = False
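The URLs above embed the password `CuiLiang@0302`, which itself contains `@`. It happens to work because Python's `urllib.parse`, which kombu uses to parse broker URLs, splits the user info at the last `@`; percent-encoding the password (`@` becomes `%40`) removes the ambiguity altogether. A minimal sketch, where `redis_url` is a hypothetical helper rather than part of Celery:

```python
from urllib.parse import quote, unquote, urlsplit


def redis_url(password: str, host: str = "127.0.0.1", port: int = 6379, db: int = 0) -> str:
    """Build a redis:// URL with the password percent-encoded."""
    # safe="" makes quote() encode "@", ":" and "/" too
    return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"


broker_url = redis_url("CuiLiang@0302", db=2)
result_backend = redis_url("CuiLiang@0302", db=1)
print(broker_url)
# Decoding the parsed password gives back the original string
print(unquote(urlsplit(broker_url).password))
```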
  • celery_tasks/task01.py
import time
from celery_tasks.celery import cel


@cel.task
def send_email(name):
    print("向%s发送邮件..." % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "邮件发送成功"

  • celery_tasks/task02.py
import time
from celery_tasks.celery import cel


@cel.task
def send_sms(name):
    print("向%s发送短信..." % name)
    time.sleep(10)
    print("向%s发送短信完成" % name)
    return "短信发送成功"

Start Celery

 celery -A celery_tasks worker -l info

Define the producer

from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_sms

result = send_email.delay("张三")
print(result.id)
result2 = send_sms.delay("李四")
print(result2.id)

Define result

from celery.result import AsyncResult
from celery_tasks.celery import cel


def get_result(task):
    async_result = AsyncResult(id=task, app=cel)
    if async_result.successful():
        result = async_result.get()
        print(result)
        # async_result.forget()  # delete the stored result
    elif async_result.failed():
        print('Task failed')
    elif async_result.status == 'PENDING':
        print('Task is waiting to be executed')
    elif async_result.status == 'RETRY':
        print('Task raised an error and is being retried')
    elif async_result.status == 'STARTED':
        print('Task has started')


if __name__ == '__main__':
    task_id = 'f008d807-7fd6-4094-b2e2-97c2d154dc1d'
    get_result(task_id)

Scheduled tasks

Scheduling from the producer (apply_async with eta)

  • produce_task.py
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_sms

# Run at a specific time
run_time1 = datetime(2022, 3, 13, 14, 7, 00, tzinfo=ZoneInfo("Asia/Shanghai"))
print(run_time1)
result1 = send_email.apply_async(args=["Alex", ], eta=run_time1)
print(result1.id)
# Run 10 seconds from now (timezone-aware, like run_time1)
run_time2 = datetime.now(tz=ZoneInfo("Asia/Shanghai")) + timedelta(seconds=10)
print(run_time2)
result2 = send_sms.apply_async(args=["Alisa", ], eta=run_time2)
print(result2.id)
  • Check the Celery worker log
[2022-03-13 14:06:35,931: INFO/MainProcess] Task celery_tasks.task01.send_email[506d6747-4b7b-49cd-8e1d-56ae05a31a3b] received
[2022-03-13 14:06:35,933: WARNING/ForkPoolWorker-2] 向Alex发送邮件...
[2022-03-13 14:06:35,939: INFO/MainProcess] Task celery_tasks.task02.send_sms[73ad47fd-ef1b-45a7-9ef9-6ea121ea9254] received
[2022-03-13 14:06:40,936: WARNING/ForkPoolWorker-2] 向Alex发送邮件完成
[2022-03-13 14:06:40,946: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[506d6747-4b7b-49cd-8e1d-56ae05a31a3b] succeeded in 5.0126269580014196s: '邮件发送成功'
[2022-03-13 14:06:46,292: WARNING/ForkPoolWorker-2] 向Alisa发送短信...
[2022-03-13 14:06:56,295: WARNING/ForkPoolWorker-2] 向Alisa发送短信完成
[2022-03-13 14:06:56,303: INFO/ForkPoolWorker-2] Task celery_tasks.task02.send_sms[73ad47fd-ef1b-45a7-9ef9-6ea121ea9254] succeeded in 10.01082549000057s: '短信发送成功'
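A note on `eta`: `datetime.utcnow()` returns a naive datetime (no tzinfo) and is deprecated as of Python 3.12. Whether a naive eta lands at the intended moment depends on how Celery interprets it under `enable_utc = False`, so passing a timezone-aware datetime is the safer choice. Here is a minimal sketch. `eta_after` is a hypothetical helper, not a Celery API, and a fixed +08:00 offset stands in for Asia/Shanghai. For relative delays, `apply_async` also accepts `countdown=10`, which lets Celery compute the eta itself.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Asia/Shanghai has no daylight saving time, so a fixed +08:00 offset stands in
# for ZoneInfo("Asia/Shanghai") here and keeps the sketch free of tz data.
SHANGHAI = timezone(timedelta(hours=8))


def eta_after(seconds: float, now: Optional[datetime] = None) -> datetime:
    """Return a timezone-aware ETA `seconds` after `now` (default: current UTC time)."""
    if now is None:
        now = datetime.now(timezone.utc)
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return now + timedelta(seconds=seconds)


start = datetime(2022, 3, 13, 14, 6, 35, tzinfo=SHANGHAI)
eta = eta_after(10, now=start)
print(eta.isoformat())                           # 2022-03-13T14:06:45+08:00
print(eta.astimezone(timezone.utc).isoformat())  # 2022-03-13T06:06:45+00:00
```

Aware datetimes compare correctly across zones, so the same instant expressed in UTC or in +08:00 is treated as equal.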

Scheduling from the worker side (Celery Beat)

  • celery_tasks/celery_config.py (adds the periodic task configuration)
from datetime import timedelta
from celery.schedules import crontab

# Official configuration docs: look up what each setting means.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html
# broker: the message broker that receives and delivers task messages
broker_url = 'redis://:CuiLiang@0302@127.0.0.1:6379/2'
# backend: stores the results of tasks executed by workers
result_backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1'
# Time zone used for scheduling; UTC is used if this is not set
timezone = 'Asia/Shanghai'
# Whether to use UTC
enable_utc = False
# Periodic task schedule
beat_schedule = {
    'task1': {
        'task': 'celery_tasks.task01.send_email',
        # 'schedule': crontab(minute="*/1"),  # once every minute
        'schedule': timedelta(seconds=10),  # send a task message every 10 seconds
        'args': ('张三',)
    },
    'task2': {
        'task': 'celery_tasks.task02.send_sms',
        'schedule': crontab(hour=20, minute=46),  # send a task message every day at 20:46
        'args': ('李四',)
    }
}
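To check which wall-clock time `crontab(hour=20, minute=46)` refers to, here is a minimal sketch of the next fire time for a once-a-day entry. It uses naive local datetimes and ignores DST. `next_daily_run` is a hypothetical name, and this is not how Celery computes schedules internally. With `timezone = 'Asia/Shanghai'` and `enable_utc = False`, the hour and minute should be read as Shanghai time, which matches the send_sms message arriving at 20:46:00 in the log.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Next moment at or after `now` when the clock reads hour:minute:00."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate < now:
        # Today's slot has passed, so the next run is tomorrow
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2022, 3, 13, 20, 45, 45), 20, 46))  # 2022-03-13 20:46:00
print(next_daily_run(datetime(2022, 3, 13, 20, 47), 20, 46))      # 2022-03-14 20:46:00
```

The `timedelta(seconds=10)` entry, by contrast, is relative: it fires every 10 seconds from whenever beat starts, which is why task1 appears at 20:45:45, 20:45:55, 20:46:05 and so on.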

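  • Start the Celery worker to consume tasks
 celery -A celery_tasks worker -l info
  • Start the Celery Beat process. It reads the schedule from the configuration and periodically sends the tasks that are due to the task queue
 celery -A celery_tasks beat

  • Watch the worker console output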
[2022-03-13 20:45:45,497: INFO/MainProcess] Task celery_tasks.task01.send_email[54ce4bf2-f746-4c46-83b7-7842e105f449] received
[2022-03-13 20:45:45,498: WARNING/ForkPoolWorker-2] 向张三发送邮件...
[2022-03-13 20:45:50,501: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:45:50,513: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[54ce4bf2-f746-4c46-83b7-7842e105f449] succeeded in 5.014935314000468s: '邮件发送成功'
[2022-03-13 20:45:55,491: INFO/MainProcess] Task celery_tasks.task01.send_email[79800192-fca7-4391-9d66-1eef4a265a17] received
[2022-03-13 20:45:55,493: WARNING/ForkPoolWorker-2] 向张三发送邮件...
[2022-03-13 20:46:00,009: INFO/MainProcess] Task celery_tasks.task02.send_sms[cb5c22b5-cd17-42d1-b35d-cdf1ffef2054] received
[2022-03-13 20:46:00,012: WARNING/ForkPoolWorker-4] 向李四发送短信...
[2022-03-13 20:46:00,494: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:46:00,503: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[79800192-fca7-4391-9d66-1eef4a265a17] succeeded in 5.010498132000066s: '邮件发送成功'
[2022-03-13 20:46:05,492: INFO/MainProcess] Task celery_tasks.task01.send_email[f018aebb-32ca-4f47-9a32-81bc24af8b17] received
[2022-03-13 20:46:05,494: WARNING/ForkPoolWorker-2] 向张三发送邮件...
[2022-03-13 20:46:10,016: WARNING/ForkPoolWorker-4] 向李四发送短信完成
[2022-03-13 20:46:10,043: INFO/ForkPoolWorker-4] Task celery_tasks.task02.send_sms[cb5c22b5-cd17-42d1-b35d-cdf1ffef2054] succeeded in 10.031732094998006s: '短信发送成功'
[2022-03-13 20:46:10,496: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:46:10,508: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[f018aebb-32ca-4f47-9a32-81bc24af8b17] succeeded in 5.0141525609979s: '邮件发送成功'
[2022-03-13 20:46:15,496: INFO/MainProcess] Task celery_tasks.task01.send_email[5748ef00-ccfb-4d15-9499-4c879667fe6d] received
[2022-03-13 20:46:15,498: WARNING/ForkPoolWorker-2] 向张三发送邮件...
^C
worker: Hitting Ctrl+C again will terminate all running tasks!

worker: Warm shutdown (MainProcess)
[2022-03-13 20:46:20,499: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:46:20,511: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[5748ef00-ccfb-4d15-9499-4c879667fe6d] succeeded in 5.013111708001816s: '邮件发送成功'

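Note: start the consumer (the worker) before the producer (beat). Otherwise beat keeps publishing tasks while no worker is running, the messages pile up in the queue, and the worker has to work through the whole backlog as soon as it starts.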
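If beat may run while workers are down, one way to limit that backlog is to give each periodic message an expiry. Below is a sketch of the same task1 entry. It assumes a 10-second expiry that matches the 10-second interval, so adjust the value as needed. `options` is the beat entry key that is passed through to `apply_async()`, and `expires` is a standard `apply_async()` option. A worker that receives the message after it has expired revokes it instead of running it.

```python
from datetime import timedelta

beat_schedule = {
    'task1': {
        'task': 'celery_tasks.task01.send_email',
        'schedule': timedelta(seconds=10),
        'args': ('张三',),
        # Forwarded to apply_async(); a message not picked up within
        # 10 seconds is revoked instead of executed late.
        'options': {'expires': 10},
    },
}
```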

Using Celery with DRF

Project file structure

[Screenshot: project file structure]

Add the configuration to settings

  • DrfTest/settings.py
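# Celery configuration
# broker: the message broker that receives and delivers task messages
CELERY_BROKER_URL = 'redis://:CuiLiang@0302@127.0.0.1:6379/3'
# backend: stores the results of tasks executed by workers
CELERY_RESULT_BACKEND = 'redis://:CuiLiang@0302@127.0.0.1:6379/4'
# Time zone used for scheduling; UTC is used if this is not set
CELERY_TIMEZONE = 'Asia/Shanghai'
# Whether to use UTC
CELERY_ENABLE_UTC = False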
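With `namespace='CELERY'`, Celery reads only Django settings whose names start with `CELERY_`, drops that prefix, and lower-cases the rest, so `CELERY_BROKER_URL` becomes `broker_url`. The stdlib sketch below approximates that mapping. `celery_settings_from` is a hypothetical helper for illustration and is not Celery's code. It also shows why spelling matters: a misspelled name such as `CELERY_ENABLE_TUC` just produces a key Celery does not recognize, likely without an error, and the intended setting keeps its default.

```python
def celery_settings_from(django_settings: dict, namespace: str = "CELERY") -> dict:
    """Approximate how namespace='CELERY' maps Django setting names to Celery keys."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in django_settings.items()
        if key.startswith(prefix)
    }


settings = {
    "CELERY_BROKER_URL": "redis://127.0.0.1:6379/3",  # password omitted in this example
    "CELERY_TIMEZONE": "Asia/Shanghai",
    "CELERY_ENABLE_TUC": False,  # typo: becomes 'enable_tuc', not 'enable_utc'
    "DEBUG": True,               # no CELERY_ prefix, so it is ignored
}
print(celery_settings_from(settings))
```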

Create the Celery entry module

  • DrfTest/celery.py
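# Main Celery module
import os
from celery import Celery

# Point Celery at the Django settings module so it can read Django's configuration
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DrfTest.settings')
# Create the Celery application
app = Celery("mycelery")
# Load configuration from Django settings; only names prefixed with CELERY_ are used
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks: by default, the tasks.py module of every installed app
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

# Command to start Celery
# Run it from the project root
# celery -A DrfTest worker -l INFO
  • DrfTest/__init__.py (import the app in this module. This ensures the app is loaded when Django starts, so that the @shared_task decorator used later will use it)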
from .celery import app as celery_app

__all__ = ('celery_app',)

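Create the tasks in an app

  • public/tasks.py (by default autodiscover_tasks() only imports each app's tasks.py, so tasks defined in modules with other names are not discovered)
# By default autodiscover_tasks() only looks for tasks in each app's tasks.py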
from celery import shared_task
import time
import logging

log = logging.getLogger("django")


@shared_task  # pass name='...' to set the task name; by default it is the module path plus the function name, e.g. public.tasks.send_sms
def send_sms(mobile):
    """发送短信"""
    print("向%s发送短信.." % mobile)
    time.sleep(10)
    print("向%s发送短信完成" % mobile)
    return "send_email OK"


@shared_task
def send_email(email):
    print("向%s发送邮件..." % email)
    time.sleep(5)
    print("向%s发送邮件完成" % email)
    return "send_sms ok"

Start the Celery worker

celery -A DrfTest worker --loglevel=info
 -------------- celery@cuiliangdeAir v5.2.3 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-12.2.1-x86_64-i386-64bit 2022-03-14 11:01:48
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         mycelery:0x103b30760
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/3
- ** ---------- .> results:     redis://:**@127.0.0.1:6379/4
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . DrfTest.celery.debug_task
  . public.tasks.send_email
  . public.tasks.send_sms

[2022-03-14 11:01:49,103: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/3
[2022-03-14 11:01:49,118: INFO/MainProcess] mingle: searching for neighbors
[2022-03-14 11:01:50,162: INFO/MainProcess] mingle: all alone
[2022-03-14 11:01:50,251: INFO/MainProcess] celery@cuiliangdeAir ready.
  • The console output shows that Celery found the asynchronous tasks in the public app and registered them automatically.

Create the DRF producer endpoints

  • public/urls.py (routes)
from rest_framework import routers
from public import views
from django.urls import path

app_name = "public"
urlpatterns = [
    # Send an SMS asynchronously
    path('sendSms/', views.SendSmsAPIView.as_view()),
    # Send an email at a scheduled time
    path('sendEmail/', views.SendEmailAPIView.as_view()),
    # Get the result of an asynchronous task
    path('getTaskResult/<str:task_id>/', views.GetTaskResultAPIView.as_view()),
]
router = routers.DefaultRouter()
urlpatterns += router.urls
  • public/views.py (views)
from datetime import datetime
import zoneinfo
from celery.result import AsyncResult
from django.shortcuts import render
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from public.tasks import send_sms, send_email
from django.conf import settings

tz = zoneinfo.ZoneInfo(settings.TIME_ZONE)

def apiDoc(request):
    """
    API documentation page
    """
    return render(request, 'doc.html')


class SendSmsAPIView(APIView):
    """
    Send an SMS asynchronously with Celery
    """

    @staticmethod
    def post(request):
        phone = request.data.get('phone')
        print(phone)
        result = send_sms.delay(phone)
        return Response({'msg': 'success', 'task_id': result.id}, status=status.HTTP_200_OK)


class SendEmailAPIView(APIView):
    """
    Send an email at a scheduled time with Celery
    """

    @staticmethod
    def post(request):
        time_str = request.data.get('time')
        email = request.data.get('email')
        # Interpret the submitted wall-clock time in settings.TIME_ZONE
        run_time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
        result = send_email.apply_async(args=[email, ], eta=run_time)
        return Response({'msg': 'success', 'task_id': result.id}, status=status.HTTP_200_OK)


class GetTaskResultAPIView(APIView):
    """
    Get the result of a Celery task
    """

    @staticmethod
    def get(request, task_id):
        async_result = AsyncResult(id=task_id)
        msg = ''
        if async_result.successful():
            result = async_result.get()
            print(result)
            msg = 'Task succeeded'
            # async_result.forget()  # delete the stored result
        elif async_result.failed():
            msg = 'Task failed'
        elif async_result.status == 'PENDING':
            msg = 'Task is waiting to be executed'
        elif async_result.status == 'RETRY':
            msg = 'Task raised an error and is being retried'
        elif async_result.status == 'STARTED':
            msg = 'Task has started'
        return Response({'msg': msg}, status=status.HTTP_200_OK)
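When the submitted time string is turned into an `eta`, keep in mind that calling `astimezone(tz)` on a naive datetime first assumes the server's own local time zone. The result is therefore only right when the server runs in the same zone as `TIME_ZONE`. Attaching the zone with `replace(tzinfo=...)` keeps the wall-clock time as submitted. Here is a minimal sketch of that approach. `parse_run_time` is a hypothetical helper, and a fixed +08:00 offset stands in for `ZoneInfo(settings.TIME_ZONE)`, assumed to be Asia/Shanghai.

```python
from datetime import datetime, timedelta, timezone

# Fixed +08:00 offset standing in for ZoneInfo(settings.TIME_ZONE) (assumed Asia/Shanghai).
TZ = timezone(timedelta(hours=8))


def parse_run_time(time_str: str, tz=TZ) -> datetime:
    """Interpret 'YYYY-MM-DD HH:MM:SS' as wall-clock time in tz."""
    # replace() attaches tz without shifting the clock; astimezone() on a naive
    # value would first treat it as the server's local time.
    return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)


run_time = parse_run_time("2022-03-14 12:20:00")
print(run_time.isoformat())  # 2022-03-14T12:20:00+08:00
```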

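Request tests

  • Run the asynchronous SMS task

[Screenshot]

  • Get the result of the asynchronous task

[Screenshot]

  • Run the scheduled task

[Screenshot]

  • Query the scheduled task result (before the scheduled time)

[Screenshot]

  • Query the scheduled task result (after the scheduled time)

[Screenshot]

  • Check the log in the Celery console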
[2022-03-14 12:14:51,076: INFO/MainProcess] Task public.tasks.send_sms[09d390f2-1447-4395-aca9-3f6a4ceb0fb3] received
[2022-03-14 12:14:51,077: WARNING/ForkPoolWorker-2] 向110发送短信..
[2022-03-14 12:15:01,079: WARNING/ForkPoolWorker-2] 向110发送短信完成
[2022-03-14 12:15:01,097: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[09d390f2-1447-4395-aca9-3f6a4ceb0fb3] succeeded in 10.02033851700071s: 'send_email OK'
[2022-03-14 12:18:06,084: INFO/MainProcess] Task public.tasks.send_email[f33b2a22-2b3f-4409-ae3c-692ccea78bc7] received
[2022-03-14 12:20:00,203: WARNING/ForkPoolWorker-2] 向admin@qq.com发送邮件...
[2022-03-14 12:20:05,206: WARNING/ForkPoolWorker-2] 向admin@qq.com发送邮件完成
[2022-03-14 12:20:05,224: INFO/ForkPoolWorker-2] Task public.tasks.send_email[f33b2a22-2b3f-4409-ae3c-692ccea78bc7] succeeded in 5.025896257000568s: 'send_sms ok'

django-celery-results

Installation and configuration

  • Install the package
pip install django-celery-results
  • Register the app in settings.py
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
# Use the Django database as the result backend
CELERY_RESULT_BACKEND = 'django-db'
  • Run the migrations to create the tables
python manage.py migrate

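Start and verify

  • Start the Celery worker
celery -A DrfTest worker -l INFO
  • View the task result records in the Django admin

[Screenshot]

Extending django-celery-results

Because django-celery-results stores the result of every asynchronous task, we can build extra features on top of it, for example an API endpoint that returns the results of all asynchronous tasks.

  • Source code analysis

Look at the model definition in django-celery-results (venv/lib/python3.10/site-packages/django_celery_results/models.py):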

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(
        max_length=getattr(
            settings,
            'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
            255
        ),
        unique=True,
        verbose_name=_('Task ID'),
        help_text=_('Celery ID for the Task that was run'))
    periodic_task_name = models.CharField(
        null=True, max_length=255,
        verbose_name=_('Periodic Task Name'),
        help_text=_('Name of the Periodic Task which was run'))
    task_name = models.CharField(
        null=True, max_length=getattr(
            settings,
            'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
            255
        ),
        verbose_name=_('Task Name'),
        help_text=_('Name of the Task which was run'))
    task_args = models.TextField(
        null=True,
        verbose_name=_('Task Positional Arguments'),
        help_text=_('JSON representation of the positional arguments '
                    'used with the task'))
    task_kwargs = models.TextField(
        null=True,
        verbose_name=_('Task Named Arguments'),
        help_text=_('JSON representation of the named arguments '
                    'used with the task'))
    status = models.CharField(
        max_length=50, default=states.PENDING,
        choices=TASK_STATE_CHOICES,
        verbose_name=_('Task State'),
        help_text=_('Current state of the task being run'))
    worker = models.CharField(
        max_length=100, default=None, null=True,
        verbose_name=_('Worker'), help_text=_('Worker that executes the task')
    )
    content_type = models.CharField(
        max_length=128,
        verbose_name=_('Result Content Type'),
        help_text=_('Content type of the result data'))
    content_encoding = models.CharField(
        max_length=64,
        verbose_name=_('Result Encoding'),
        help_text=_('The encoding used to save the task result data'))
    result = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Result Data'),
        help_text=_('The data returned by the task.  '
                    'Use content_encoding and content_type fields to read.'))
    date_created = models.DateTimeField(
        auto_now_add=True,
        verbose_name=_('Created DateTime'),
        help_text=_('Datetime field when the task result was created in UTC'))
    date_done = models.DateTimeField(
        auto_now=True,
        verbose_name=_('Completed DateTime'),
        help_text=_('Datetime field when the task was completed in UTC'))
    traceback = models.TextField(
        blank=True, null=True,
        verbose_name=_('Traceback'),
        help_text=_('Text of the traceback if the task generated one'))
    meta = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Task Meta Information'),
        help_text=_('JSON meta information about the task, '
                    'such as information on child tasks'))

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

        # Explicit names to solve https://code.djangoproject.com/ticket/33483
        indexes = [
            models.Index(fields=['task_name'],
                         name='django_cele_task_na_08aec9_idx'),
            models.Index(fields=['status'],
                         name='django_cele_status_9b6201_idx'),
            models.Index(fields=['worker'],
                         name='django_cele_worker_d54dd8_idx'),
            models.Index(fields=['date_created'],
                         name='django_cele_date_cr_f04a50_idx'),
            models.Index(fields=['date_done'],
                         name='django_cele_date_do_f59aad_idx'),
        ]

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
            'worker': self.worker
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)

The model stores every detail of a task's result, but it has more fields than we need, so we expose only a few key ones.

  • public/serializers.py (custom serializer that exposes only a few key fields)
from rest_framework import serializers
from django_celery_results.models import TaskResult


class CeleryTaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = TaskResult
        fields = ['task_id', 'task_name', 'date_done', 'status', 'worker']
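The serializer exposes `task_name` and `worker`. As far as we can tell, django-celery-results 2.x fills those columns (along with the task arguments) only when Celery's extended result mode is on. That is the `result_extended` setting, which becomes `CELERY_RESULT_EXTENDED` under the `CELERY` namespace. Without it, these fields may come back as null. Here is a minimal settings fragment, assuming that version:

```python
# DrfTest/settings.py (addition; assumes django-celery-results 2.x)
# Also record the task name, args, kwargs and worker in each TaskResult row.
CELERY_RESULT_EXTENDED = True
```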

  • public/urls.py (route; the endpoint only reads the whole model, so a read-only viewset is enough)
from rest_framework import routers
from public import views
from django.urls import path

app_name = "public"
urlpatterns = [
    # Get the results of all asynchronous tasks
    path('getTaskResultAll/', views.GetTaskResultReadOnlyModelViewSet.as_view({'get': 'list'})),
]
router = routers.DefaultRouter()
urlpatterns += router.urls

  • public/views.py (view based on ReadOnlyModelViewSet)
from rest_framework import viewsets
from public.serializers import CeleryTaskSerializer
from django_celery_results.models import TaskResult


class GetTaskResultReadOnlyModelViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Get the results of all Celery tasks
    """
    queryset = TaskResult.objects.all()
    serializer_class = CeleryTaskSerializer

  • Verify the endpoint

[Screenshot]

django-celery-beat

Installation and configuration

  • Install the package
pip install django-celery-beat
  • Update the settings
INSTALLED_APPS = [
    'django_celery_beat',
]
# Use django_celery_beat's database scheduler so periodic tasks can be configured dynamically
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
  • Run the migrations to create the tables
python manage.py migrate

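Start and verify

  • Start the Celery worker
celery -A DrfTest worker -l INFO
  • Start the beat scheduler
celery -A DrfTest beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
  • The models that django-celery-beat adds to the Django admin are used as follows
Clocked         # tasks that run at one specific time
Crontabs        # schedules in the same format as Linux crontab
Intervals       # schedules that run at a fixed interval (e.g. every 5 seconds)
Periodic tasks  # list of all periodic tasks
Solar events    # schedules based on sunrise and sunset
  • Create a crontab-type periodic task in the Django admin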

[Screenshot]

[Screenshot]

  • Check the console log
[2022-03-14 17:40:00,103: INFO/MainProcess] Task public.tasks.send_email[a2a98f8f-2fb9-4620-a660-a106e863f106] received
[2022-03-14 17:40:00,122: WARNING/ForkPoolWorker-2]123发送邮件...
[2022-03-14 17:40:05,127: WARNING/ForkPoolWorker-2]123发送邮件完成
[2022-03-14 17:40:05,223: INFO/ForkPoolWorker-2] Task public.tasks.send_email[a2a98f8f-2fb9-4620-a660-a106e863f106] succeeded in 5.107552270999804s: 'send_sms ok'
  • Create an interval-type periodic task

[Screenshot]

  • Check the console log
[2022-03-14 21:19:06,010: INFO/MainProcess] Task public.tasks.send_sms[24975126-0a5a-4ed9-a499-fc1e706cbcee] received
[2022-03-14 21:19:06,034: WARNING/ForkPoolWorker-2] 向王麻子发送短信..
[2022-03-14 21:19:15,898: INFO/MainProcess] Task public.tasks.send_sms[c6346cec-c464-4e5d-ae07-db3ac83fea78] received
[2022-03-14 21:19:15,911: WARNING/ForkPoolWorker-4] 向王麻子发送短信..
[2022-03-14 21:19:16,040: WARNING/ForkPoolWorker-2] 向王麻子发送短信完成
[2022-03-14 21:19:16,093: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[24975126-0a5a-4ed9-a499-fc1e706cbcee] succeeded in 10.067840424999304s: 'send_email OK'
[2022-03-14 21:19:25,898: INFO/MainProcess] Task public.tasks.send_sms[05943468-b7ca-4575-9000-9a2bd973770e] received
[2022-03-14 21:19:25,899: WARNING/ForkPoolWorker-2] 向王麻子发送短信..
[2022-03-14 21:19:25,917: WARNING/ForkPoolWorker-4] 向王麻子发送短信完成
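  • Check the execution records in django-celery-results

[Screenshot]

Extending django-celery-beat

Besides letting you create and edit periodic tasks in the Django admin, django-celery-beat supports hot reloading: after a periodic task's configuration is updated, the change takes effect without restarting beat or the workers, which makes dynamic management of periodic tasks possible.
Reading the django-celery-beat source shows that dynamic management only requires managing three models: PeriodicTask (the periodic tasks), IntervalSchedule (interval schedules) and CrontabSchedule (crontab schedules). The model file is venv/lib/python3.10/site-packages/django_celery_beat/models.py.
The overall design: the backend provides endpoints to query and create interval and crontab expressions, plus create/read/update/delete endpoints for periodic tasks. To add a periodic task, the user first submits the schedule expression and the frontend displays the returned id; the user then fills in the task details to create a periodic task record. To pause or modify a periodic task, call the PUT endpoint; to delete one, call the DELETE endpoint.

Crontab-based periodic tasks

To manage crontab tasks dynamically, first create the crontab expression when adding a task and get its id, then create the periodic task and link it to that crontab id through the foreign key. For CrontabSchedule, the create and query endpoints could be written with either a ViewSet or an APIView; this example uses an APIView. PeriodicTask needs full create/read/update/delete endpoints, so it uses a ModelViewSet.

  • public/urls.py (routes: one for crontab expressions, one for periodic tasks)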
from rest_framework import routers
from public import views
from django.urls import path

app_name = "public"
urlpatterns = [
    # crontab expressions
    path('crontab/', views.CrontabAPIView.as_view()),
]
router = routers.DefaultRouter()
# periodic tasks
router.register('task', views.TaskModelViewSet, 'task')
urlpatterns += router.urls
  • public/serializers.py (model serializer that selects the PeriodicTask fields to expose)
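from rest_framework import serializers
from django_celery_beat.models import PeriodicTask


class PeriodicTaskSerializer(serializers.ModelSerializer):
    """
    Serializer for periodic tasks
    """
    last_run_at = serializers.DateTimeField(read_only=True)

    class Meta:
        model = PeriodicTask
        fields = ['id', 'name', 'task', 'args', 'last_run_at', 'enabled', 'crontab', 'interval']
  • public/views.py (views: CrontabAPIView uses APIView with query and create endpoints; TaskModelViewSet uses ModelViewSet with create/read/update/delete endpoints)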
from zoneinfo import ZoneInfo
from django.shortcuts import render
from django_celery_beat.models import PeriodicTask, CrontabSchedule
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from public.serializers import PeriodicTaskSerializer


def apiDoc(request):
    """
    API documentation page
    """
    return render(request, 'doc.html')


class CrontabAPIView(APIView):
    """
    crontab expressions
    """

    @staticmethod
    def get(request):
        # Query a crontab expression by id
        crontab_id = request.query_params.get('crontab_id')
        c = CrontabSchedule.objects.get(id=crontab_id)
        # Read the five fields in Linux crontab order instead of parsing __str__()
        expression = [c.minute, c.hour, c.day_of_month, c.month_of_year, c.day_of_week]
        return Response({'id': crontab_id, 'expression': expression}, status=status.HTTP_200_OK)

    @staticmethod
    def post(request):
        # Create a crontab expression, e.g. "30 9 * * *"
        minute, hour, day_of_month, month_of_year, day_of_week = request.data.get('expression').split()
        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month_of_year=month_of_year,
            day_of_week=day_of_week,
            timezone=ZoneInfo("Asia/Shanghai")
        )
        return Response({'crontab_id': schedule.id}, status=status.HTTP_200_OK)


class TaskModelViewSet(viewsets.ModelViewSet):
    """
    Create, read, update and delete periodic tasks
    """
    queryset = PeriodicTask.objects.all()
    serializer_class = PeriodicTaskSerializer
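The crontab endpoint trusts that the submitted expression has exactly five space-separated fields. Anything else raises an exception inside the view, and the client gets an HTTP 500. Here is a minimal stdlib sketch of a validating parser. `parse_crontab_expression` is a hypothetical name. It returns keyword arguments in the shape `CrontabSchedule.objects.get_or_create(**fields, timezone=...)` expects, and in a view you would turn its `ValueError` into a 400 response. It only checks the field count, not the syntax of each field.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month_of_year", "day_of_week")


def parse_crontab_expression(expression: str) -> dict:
    """Split a 5-field Linux crontab string into CrontabSchedule keyword arguments."""
    parts = expression.split()  # split() also tolerates repeated spaces
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


print(parse_crontab_expression("30 9 * * *"))
```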
  • Start Celery
celery -A DrfTest worker -l INFO
celery -A DrfTest beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
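  • Test creating a crontab expression (Linux crontab format; run the task every day at 9:30)

[Screenshot]

  • Query the crontab expression by id

[Screenshot]

  • Create a periodic task

[Screenshot]

  • View the new periodic task in the admin

[Screenshot]

  • At 9:30, check the console log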
[2022-03-15 09:30:00,071: INFO/MainProcess] Task public.tasks.send_sms[a4e35e52-d763-46b8-b3fe-b44501d6a63a] received
[2022-03-15 09:30:00,084: WARNING/ForkPoolWorker-2] 向adc发送短信..
[2022-03-15 09:30:10,088: WARNING/ForkPoolWorker-2] 向adc发送短信完成
[2022-03-15 09:30:10,125: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[a4e35e52-d763-46b8-b3fe-b44501d6a63a] succeeded in 10.045384181999907s: 'send_email OK'
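  • Query the task details through the API

[Screenshot]

This completes the demonstration of creating and querying tasks through the API. The update and delete endpoints are not demonstrated here for reasons of space.

Interval-based periodic tasks

Besides Linux crontab expressions, django-celery-beat can also create tasks from interval expressions. The approach is the same as for crontab tasks: when adding a task, first create the interval expression and get its id, then create the periodic task and link it to that interval id through the foreign key. For IntervalSchedule, the create and query endpoints could be written with either a ViewSet or an APIView; this example uses an APIView. PeriodicTask keeps using the code above unchanged.

  • public/urls.py (routes: crontab expressions, interval expressions and periodic tasks)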
from rest_framework import routers
from public import views
from django.urls import path

app_name = "public"
urlpatterns = [
    # crontab expressions
    path('crontab/', views.CrontabAPIView.as_view()),
    # interval expressions
    path('interval/', views.IntervalAPIView.as_view()),
]
router = routers.DefaultRouter()
# periodic tasks
router.register('task', views.TaskModelViewSet, 'task')
urlpatterns += router.urls
  • public/views.py (views: IntervalAPIView uses APIView with query and create endpoints; TaskModelViewSet is unchanged)
from zoneinfo import ZoneInfo
from django.shortcuts import render
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from public.serializers import PeriodicTaskSerializer


def apiDoc(request):
    """
    API documentation page
    """
    return render(request, 'doc.html')

class IntervalAPIView(APIView):
    """
    Interval expressions
    """

    @staticmethod
    def get(request):
        # Query an interval expression by id
        interval_id = request.query_params.get('interval_id')
        expression = str(IntervalSchedule.objects.get(id=interval_id))
        return Response({'id': interval_id, 'expression': expression}, status=status.HTTP_200_OK)

    @staticmethod
    def post(request):
        # Create an interval expression
        every = request.data.get('every')
        kind = request.data.get('kind')
        periods = {
            'DAYS': IntervalSchedule.DAYS,  # fixed number of days
            'HOURS': IntervalSchedule.HOURS,  # fixed number of hours
            'MINUTES': IntervalSchedule.MINUTES,  # fixed number of minutes
            'SECONDS': IntervalSchedule.SECONDS,  # fixed number of seconds
            'MICROSECONDS': IntervalSchedule.MICROSECONDS,  # fixed number of microseconds
        }
        period = periods.get(kind)
        if period is None:
            # Reject unknown kinds instead of silently falling back to microseconds
            return Response({'msg': f'unsupported kind: {kind}'}, status=status.HTTP_400_BAD_REQUEST)
        schedule, created = IntervalSchedule.objects.get_or_create(every=every, period=period)
        return Response({'interval_id': schedule.id}, status=status.HTTP_200_OK)
    

class CrontabAPIView(APIView):
    ……


class TaskModelViewSet(viewsets.ModelViewSet):
    ……
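IntervalAPIView maps the request's `kind` to one of django-celery-beat's period constants. An interval measured in microseconds would fire almost continuously, so it is worth validating both `kind` and `every` strictly before calling `get_or_create`. Here is a minimal stdlib sketch. `interval_from_request` and `PERIODS` are hypothetical names, and the lower-case values are assumed to match `IntervalSchedule.DAYS` through `IntervalSchedule.MICROSECONDS`. Building `timedelta(**{period: every})` mirrors, as far as we know, how the model turns the pair into a schedule, so the returned timedelta shows the effective interval.

```python
from datetime import timedelta

# Assumed to match the string values of IntervalSchedule.DAYS, HOURS, MINUTES,
# SECONDS and MICROSECONDS in django-celery-beat.
PERIODS = {
    "DAYS": "days",
    "HOURS": "hours",
    "MINUTES": "minutes",
    "SECONDS": "seconds",
    "MICROSECONDS": "microseconds",
}


def interval_from_request(every, kind):
    """Validate request values; return (every, period, equivalent timedelta)."""
    period = PERIODS.get(str(kind).upper())
    if period is None:
        raise ValueError(f"unsupported kind: {kind!r}")
    every = int(every)  # request data may arrive as a string such as "10"
    if every <= 0:
        raise ValueError("every must be a positive integer")
    return every, period, timedelta(**{period: every})


print(interval_from_request("10", "SECONDS"))
```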
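  • Create an interval expression

[Screenshot]

  • Query the interval expression

[Screenshot]

  • Create an interval-type periodic task

[Screenshot]

  • View the periodic task in the admin

[Screenshot]

  • Check the periodic task log in Celery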
[2022-03-15 09:39:05,918: INFO/MainProcess] Task public.tasks.send_email[bc042f63-81c0-42a5-959c-b05083923c80] received
[2022-03-15 09:39:05,919: WARNING/ForkPoolWorker-2] 向mvp发送邮件...
[2022-03-15 09:39:10,923: WARNING/ForkPoolWorker-2] 向mvp发送邮件完成
[2022-03-15 09:39:10,931: INFO/ForkPoolWorker-2] Task public.tasks.send_email[bc042f63-81c0-42a5-959c-b05083923c80] succeeded in 5.01182283199978s: 'send_sms ok'
[2022-03-15 09:39:25,921: INFO/MainProcess] Task public.tasks.send_email[4d20b737-60d9-4a19-8de2-234f1425e7b1] received
[2022-03-15 09:39:25,923: WARNING/ForkPoolWorker-2] 向mvp发送邮件...
[2022-03-15 09:39:30,929: WARNING/ForkPoolWorker-2] 向mvp发送邮件完成
[2022-03-15 09:39:30,960: INFO/ForkPoolWorker-2] Task public.tasks.send_email[4d20b737-60d9-4a19-8de2-234f1425e7b1] succeeded in 5.037057360999825s: 'send_sms ok'

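The interval-based periodic task has been created and verified. You can test the update and delete endpoints yourself.

Using RabbitMQ

For large-scale deployments, RabbitMQ is the recommended broker: it is feature-complete and stable, and it is the message queue Celery itself recommends.

Installation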

docker pull rabbitmq:management
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=CuiLiang@0302 rabbitmq:management
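  • --hostname sets the host name. RabbitMQ stores its data under the so-called node name, which defaults to the host name, so a fixed host name keeps the data in the mounted volume usable across container restarts.

Using RabbitMQ with Celery

  • Celery talks to RabbitMQ through kombu and py-amqp, which are installed as Celery dependencies, so no extra client such as pika is needed
  • Use RabbitMQ as the Celery broker (user, password and vhost as set in the docker run command above; the "@" in the password is percent-encoded as %40)
broker_url = "amqp://admin:CuiLiang%400302@127.0.0.1:5672/my_vhost"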

References

  • Celery documentation

https://docs.celeryproject.org/en/stable/index.html

  • django-celery-results documentation:

https://github.com/celery/django-celery-results

  • django-celery-beat documentation:

https://github.com/celery/django-celery-beat

See more

Cui Liang's blog: DevOps automation and IT operations articles. For more original DevOps development articles, visit https://www.cuiliangblog.cn