Celery Overview
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.
Celery's architecture consists of three parts: the message broker, the task execution units (workers), and the task result store.
- Message broker
Celery does not provide a messaging service itself, but it integrates easily with third-party message brokers such as RabbitMQ and Redis.
- Task execution unit
A worker is Celery's unit of task execution; workers run concurrently on the nodes of a distributed system.
- Task result store
The task result store holds the results of tasks executed by workers. Celery supports several result backends, including AMQP and Redis.
Installing Celery
pip install celery
pip install redis
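The Redis dependency can also be pulled in through Celery's bundle syntax, which is equivalent to the two commands above:
pip install "celery[redis]"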
Quick Start
Project layout
![image.png](https://img-blog.csdnimg.cn/img_convert/9c74bcb530e2fa279c30122d6e72cf14.png)
Defining workers (consumers)
- celery_task.py
import celery
import time
# Result backend for asynchronous task results
backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1'
# Message broker
broker = 'redis://:CuiLiang@0302@127.0.0.1:6379/2'
cel = celery.Celery('test', backend=backend, broker=broker)
@cel.task
def send_email(name):
print("向%s发送邮件..." % name)
time.sleep(5)
print("向%s发送邮件完成" % name)
return "ok"
@cel.task
def send_sms(name):
print("向%s发送短信..." % name)
time.sleep(10)
print("向%s发送短信完成" % name)
return "ok"
Starting the Celery worker
celery -A celery_task worker -l info
-------------- celery@cuiliangdeAir v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- macOS-12.2.1-x86_64-i386-64bit 2022-03-13 12:41:53
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test:0x10bb43730
- ** ---------- .> transport: redis://:**@127.0.0.1:6379/2
- ** ---------- .> results: redis://:**@127.0.0.1:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_task.send_email
. celery_task.send_sms
[2022-03-13 12:41:53,699: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/2
[2022-03-13 12:41:53,710: INFO/MainProcess] mingle: searching for neighbors
[2022-03-13 12:41:54,752: INFO/MainProcess] mingle: all alone
[2022-03-13 12:41:54,810: INFO/MainProcess] celery@cuiliangdeAir ready.
Defining the producer
from celery_task import send_email, send_sms
result = send_email.delay("one")
print(result.id)
result2 = send_sms.delay("two")
print(result2.id)
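If the caller needs the return value immediately, AsyncResult.get() can block until the worker finishes; a small sketch (the argument "three" is just an example):
from celery_task import send_email

result = send_email.delay("three")
# Blocks until the worker stores the result in the backend, then prints "ok".
# Only do this when waiting synchronously is acceptable; otherwise keep result.id
# and look the result up later, as shown in the next section.
print(result.get(timeout=20))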
- After running the producer script, the following entries appear in the Celery worker console:
[2022-03-13 12:42:03,125: INFO/MainProcess] Task celery_task.send_email[bfdfce02-79d3-4fa7-92b7-36e02a6a824a] received
[2022-03-13 12:42:03,130: WARNING/ForkPoolWorker-2] 向one发送邮件...
[2022-03-13 12:42:03,139: INFO/MainProcess] Task celery_task.send_sms[26862066-88a3-4721-b50e-00da5e5690d0] received
[2022-03-13 12:42:03,142: WARNING/ForkPoolWorker-4] 向two发送短信...
[2022-03-13 12:42:08,137: WARNING/ForkPoolWorker-2] 向one发送邮件完成
[2022-03-13 12:42:08,182: INFO/ForkPoolWorker-2] Task celery_task.send_email[bfdfce02-79d3-4fa7-92b7-36e02a6a824a] succeeded in 5.052875665000101s: 'ok'
[2022-03-13 12:42:13,145: WARNING/ForkPoolWorker-4] 向two发送短信完成
[2022-03-13 12:42:13,175: INFO/ForkPoolWorker-4] Task celery_task.send_sms[26862066-88a3-4721-b50e-00da5e5690d0] succeeded in 10.03281639099987s: 'ok'
Retrieving task results
from celery.result import AsyncResult
from celery_task import cel
def get_result(task):
async_result = AsyncResult(id=task, app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# async_result.forget()  # remove the result from the backend
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
if __name__ == '__main__':
task_id = 'bfdfce02-79d3-4fa7-92b7-36e02a6a824a'
get_result(task_id)
Multi-task project structure
Project layout
Defining workers
- celery_tasks/celery.py (Celery initialization)
from celery import Celery
cel = Celery('celery_demo',
# Include the following two task modules; Celery looks up tasks in these files, so tasks can be grouped by module
include=['celery_tasks.task01',
'celery_tasks.task02'
])
# Load the configuration module through the Celery instance
cel.config_from_object('celery_tasks.celery_config')
- celery_tasks/celery_config.py (Celery configuration)
# Official configuration reference (meaning of each setting):
# http://docs.celeryproject.org/en/latest/userguide/configuration.html
# Broker (message transport used to receive and deliver task messages)
broker_url = 'redis://:CuiLiang@0302@127.0.0.1:6379/2'
# Backend (stores the results returned by workers)
result_backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1'
# Timezone; UTC is used by default if this is not set
timezone = 'Asia/Shanghai'
# Whether to use UTC
enable_utc = False
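A few other commonly used settings could be added to the same file; they are optional and not required for the rest of this walkthrough (illustrative values):
task_serializer = 'json'      # serialize task messages as JSON
result_serializer = 'json'    # serialize task results as JSON
accept_content = ['json']     # reject messages in any other format
result_expires = 3600         # stored results expire after one hour (seconds)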
- celery_tasks/task01.py
import time
from celery_tasks.celery import cel
@cel.task
def send_email(name):
print("向%s发送邮件..." % name)
time.sleep(5)
print("向%s发送邮件完成" % name)
return "邮件发送成功"
- celery_tasks/task02.py
import time
from celery_tasks.celery import cel
@cel.task
def send_sms(name):
print("向%s发送短信..." % name)
time.sleep(10)
print("向%s发送短信完成" % name)
return "短信发送成功"
Starting the Celery worker
celery -A celery_tasks worker -l info
Defining the producer
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_sms
result = send_email.delay("张三")
print(result.id)
result2 = send_sms.delay("李四")
print(result2.id)
Retrieving task results
from celery.result import AsyncResult
from celery_tasks.celery import cel
def get_result(task):
async_result = AsyncResult(id=task, app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# async_result.forget()  # remove the result from the backend
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
if __name__ == '__main__':
task_id = 'f008d807-7fd6-4094-b2e2-97c2d154dc1d'
get_result(task_id)
Scheduled tasks
Triggering scheduled tasks from the producer
- produce_task.py
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_sms
# Run at a specific time
run_time1 = datetime(2022, 3, 13, 14, 7, 00, tzinfo=ZoneInfo("Asia/Shanghai"))
print(run_time1)
result1 = send_email.apply_async(args=["Alex", ], eta=run_time1)
print(result1.id)
# Run 10 seconds from now (an aware datetime keeps the eta unambiguous regardless of the UTC setting)
run_time2 = datetime.now(ZoneInfo("Asia/Shanghai")) + timedelta(seconds=10)
print(run_time2)
result2 = send_sms.apply_async(args=["Alisa", ], eta=run_time2)
print(result2.id)
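apply_async also accepts countdown, which avoids building a datetime by hand; a variant of the call above (not part of the run shown in the log below):
# Run roughly 10 seconds from now without computing an eta
result3 = send_sms.apply_async(args=["Alisa"], countdown=10)
print(result3.id)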
- Celery worker output:
[2022-03-13 14:06:35,931: INFO/MainProcess] Task celery_tasks.task01.send_email[506d6747-4b7b-49cd-8e1d-56ae05a31a3b] received
[2022-03-13 14:06:35,933: WARNING/ForkPoolWorker-2] 向Alex发送邮件...
[2022-03-13 14:06:35,939: INFO/MainProcess] Task celery_tasks.task02.send_sms[73ad47fd-ef1b-45a7-9ef9-6ea121ea9254] received
[2022-03-13 14:06:40,936: WARNING/ForkPoolWorker-2] 向Alex发送邮件完成
[2022-03-13 14:06:40,946: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[506d6747-4b7b-49cd-8e1d-56ae05a31a3b] succeeded in 5.0126269580014196s: '邮件发送成功'
[2022-03-13 14:06:46,292: WARNING/ForkPoolWorker-2] 向Alisa发送短信...
[2022-03-13 14:06:56,295: WARNING/ForkPoolWorker-2] 向Alisa发送短信完成
[2022-03-13 14:06:56,303: INFO/ForkPoolWorker-2] Task celery_tasks.task02.send_sms[73ad47fd-ef1b-45a7-9ef9-6ea121ea9254] succeeded in 10.01082549000057s: '短信发送成功'
Scheduling tasks with Celery Beat
- celery_tasks/celery_config.py (add the beat schedule)
from datetime import timedelta
from celery.schedules import crontab
# Official configuration reference (meaning of each setting):
# http://docs.celeryproject.org/en/latest/userguide/configuration.html
# Broker (message transport used to receive and deliver task messages)
broker_url = 'redis://:CuiLiang@0302@127.0.0.1:6379/2'
# Backend (stores the results returned by workers)
result_backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1'
# Timezone; UTC is used by default if this is not set
timezone = 'Asia/Shanghai'
# Whether to use UTC
enable_utc = False
# Beat schedule (periodic tasks)
beat_schedule = {
'task1': {
'task': 'celery_tasks.task01.send_email',
# 'schedule': crontab(minute="*/1"),  # publish once a minute
'schedule': timedelta(seconds=10),  # publish the task every 10 seconds
'args': ('张三',)
},
'task2': {
'task': 'celery_tasks.task02.send_sms',
'schedule': crontab(hour=20, minute=46),  # publish once a day at 20:46
'args': ('李四',)
}
}
- Start the Celery worker to consume tasks
celery -A celery_tasks worker -l info
- Start the Celery beat process; it reads the schedule from the configuration and periodically publishes tasks that are due to the task queue
celery -A celery_tasks beat
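For local development only, the beat scheduler can also be embedded in the worker with -B instead of running two processes (never run more than one scheduler in production):
celery -A celery_tasks worker -B -l info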
- Worker console output:
[2022-03-13 20:45:45,497: INFO/MainProcess] Task celery_tasks.task01.send_email[54ce4bf2-f746-4c46-83b7-7842e105f449] received
[2022-03-13 20:45:45,498: WARNING/ForkPoolWorker-2] 向张三发送邮件...
[2022-03-13 20:45:50,501: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:45:50,513: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[54ce4bf2-f746-4c46-83b7-7842e105f449] succeeded in 5.014935314000468s: '邮件发送成功'
[2022-03-13 20:45:55,491: INFO/MainProcess] Task celery_tasks.task01.send_email[79800192-fca7-4391-9d66-1eef4a265a17] received
[2022-03-13 20:45:55,493: WARNING/ForkPoolWorker-2] 向张三发送邮件...
[2022-03-13 20:46:00,009: INFO/MainProcess] Task celery_tasks.task02.send_sms[cb5c22b5-cd17-42d1-b35d-cdf1ffef2054] received
[2022-03-13 20:46:00,012: WARNING/ForkPoolWorker-4] 向李四发送短信...
[2022-03-13 20:46:00,494: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:46:00,503: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[79800192-fca7-4391-9d66-1eef4a265a17] succeeded in 5.010498132000066s: '邮件发送成功'
[2022-03-13 20:46:05,492: INFO/MainProcess] Task celery_tasks.task01.send_email[f018aebb-32ca-4f47-9a32-81bc24af8b17] received
[2022-03-13 20:46:05,494: WARNING/ForkPoolWorker-2] 向张三发送邮件...
[2022-03-13 20:46:10,016: WARNING/ForkPoolWorker-4] 向李四发送短信完成
[2022-03-13 20:46:10,043: INFO/ForkPoolWorker-4] Task celery_tasks.task02.send_sms[cb5c22b5-cd17-42d1-b35d-cdf1ffef2054] succeeded in 10.031732094998006s: '短信发送成功'
[2022-03-13 20:46:10,496: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:46:10,508: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[f018aebb-32ca-4f47-9a32-81bc24af8b17] succeeded in 5.0141525609979s: '邮件发送成功'
[2022-03-13 20:46:15,496: INFO/MainProcess] Task celery_tasks.task01.send_email[5748ef00-ccfb-4d15-9499-4c879667fe6d] received
[2022-03-13 20:46:15,498: WARNING/ForkPoolWorker-2] 向张三发送邮件...
^C
worker: Hitting Ctrl+C again will terminate all running tasks!
worker: Warm shutdown (MainProcess)
[2022-03-13 20:46:20,499: WARNING/ForkPoolWorker-2] 向张三发送邮件完成
[2022-03-13 20:46:20,511: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[5748ef00-ccfb-4d15-9499-4c879667fe6d] succeeded in 5.013111708001816s: '邮件发送成功'
Note: start the consumer (worker) before the producer. Otherwise the producer piles up a large number of tasks in the queue, and once the worker is finally started it has to work through the whole backlog at once.
Using Celery with DRF
Project layout
Add the configuration to settings
- DrfTest/settings.py
# Celery configuration
# Broker (message transport used to receive and deliver task messages)
CELERY_BROKER_URL = 'redis://:CuiLiang@0302@127.0.0.1:6379/3'
# Backend (stores the results returned by workers)
CELERY_RESULT_BACKEND = 'redis://:CuiLiang@0302@127.0.0.1:6379/4'
# Timezone; UTC is used by default if this is not set
CELERY_TIMEZONE = 'Asia/Shanghai'
# Whether to use UTC
CELERY_ENABLE_UTC = False
Create the Celery entry point
- DrfTest/celery.py
# Celery entry point
import os
from celery import Celery
# Integrate Celery with Django: point Celery at the Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DrfTest.settings')
# Create the Celery application instance
app = Celery("mycelery")
# Load configuration from Django settings; only keys prefixed with CELERY_ are read
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks (each installed app's tasks must live in its tasks.py)
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
# Command to start Celery
# (run from the project root)
# celery -A DrfTest worker -l INFO
- DrfTest/__init__.py (import the Celery app here so that it is loaded when Django starts, ensuring the @shared_task decorator used later binds to this app)
from .celery import app as celery_app
__all__ = ('celery_app',)
Define tasks in the app
- public/tasks.py (by default, Celery tasks must live in a file named tasks.py; other file names are not auto-discovered)
# By default, autodiscover_tasks() only picks up tasks defined in each app's tasks.py
from celery import shared_task
import time
import logging
log = logging.getLogger("django")
@shared_task  # a name argument sets the task name; if omitted, the function name is used
def send_sms(mobile):
"""发送短信"""
print("向%s发送短信.." % mobile)
time.sleep(10)
print("向%s发送短信完成" % mobile)
return "send_email OK"
@shared_task
def send_email(email):
print("向%s发送邮件..." % email)
time.sleep(5)
print("向%s发送邮件完成" % email)
return "send_sms ok"
Start the Celery worker
celery -A DrfTest worker -l INFO
-------------- celery@cuiliangdeAir v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- macOS-12.2.1-x86_64-i386-64bit 2022-03-14 11:01:48
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: mycelery:0x103b30760
- ** ---------- .> transport: redis://:**@127.0.0.1:6379/3
- ** ---------- .> results: redis://:**@127.0.0.1:6379/4
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. DrfTest.celery.debug_task
. public.tasks.send_email
. public.tasks.send_sms
[2022-03-14 11:01:49,103: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/3
[2022-03-14 11:01:49,118: INFO/MainProcess] mingle: searching for neighbors
[2022-03-14 11:01:50,162: INFO/MainProcess] mingle: all alone
[2022-03-14 11:01:50,251: INFO/MainProcess] celery@cuiliangdeAir ready.
- The console output above shows that Celery has discovered the async tasks under the public app and registered them automatically.
Creating the DRF producer (API endpoints)
- public/urls.py (routes)
from rest_framework import routers
from public import views
from django.urls import path
app_name = "public"
urlpatterns = [
path('sendSms/', views.SendSmsAPIView.as_view()),
# asynchronous SMS endpoint
path('sendEmail/', views.SendEmailAPIView.as_view()),
# scheduled email endpoint
path('getTaskResult/<str:task_id>/', views.GetTaskResultAPIView.as_view()),
# endpoint for fetching an async task's result
]
router = routers.DefaultRouter()
urlpatterns += router.urls
- public/views.py (views)
import time
from datetime import datetime
import zoneinfo
from celery.result import AsyncResult
from django.shortcuts import render
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from public.tasks import send_sms, send_email
from django.conf import settings
tz = zoneinfo.ZoneInfo(settings.TIME_ZONE)
def apiDoc(request):
"""
API documentation page
"""
return render(request, 'doc.html')
class SendSmsAPIView(APIView):
"""
Send an SMS asynchronously via Celery
"""
@staticmethod
def post(request):
phone = request.data.get('phone')
print(phone)
result = send_sms.delay(phone)
return Response({'msg': 'success', 'task_id': result.id}, status=status.HTTP_200_OK)
class SendEmailAPIView(APIView):
"""
Send an email at a scheduled time via Celery
"""
@staticmethod
def post(request):
time_str = request.data.get('time')
email = request.data.get('email')
date_time = time.strptime(time_str, "%Y-%m-%d %H:%M:%S")
run_time = datetime(date_time.tm_year, date_time.tm_mon, date_time.tm_mday, date_time.tm_hour, date_time.tm_min,
date_time.tm_sec).astimezone(tz)
result = send_email.apply_async(args=[email, ], eta=run_time)
return Response({'msg': 'success', 'task_id': result.id}, status=status.HTTP_200_OK)
class GetTaskResultAPIView(APIView):
"""
Fetch the execution result of a Celery task
"""
@staticmethod
def get(request, task_id):
async_result = AsyncResult(id=task_id)
msg = ''
if async_result.successful():
result = async_result.get()
print(result)
msg = '任务执行成功!'
# async_result.forget()  # remove the result from the backend
elif async_result.failed():
msg = '执行失败'
elif async_result.status == 'PENDING':
msg = '任务等待被执行'
elif async_result.status == 'RETRY':
msg = '任务异常后正在重试'
elif async_result.status == 'STARTED':
msg = '任务已经开始被执行'
return Response({'msg': msg}, status=status.HTTP_200_OK)
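With the views wired up, the endpoints can be exercised as follows; the host, port, and the /public/ URL prefix are assumptions about how the app is mounted in the project's root urls.py:
# Trigger the async SMS task
curl -X POST http://127.0.0.1:8000/public/sendSms/ -d "phone=110"
# Schedule the email task for a specific local time
curl -X POST http://127.0.0.1:8000/public/sendEmail/ \
     -d "email=admin@qq.com" -d "time=2022-03-14 12:20:00"
# Poll a task's result by id
curl http://127.0.0.1:8000/public/getTaskResult/<task_id>/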
Request tests
- Trigger the async SMS task
- Fetch the async task result
- Trigger the scheduled (eta) task
- Query the scheduled task result (before the scheduled time)
- Query the scheduled task result (after the scheduled time)
- Celery console log:
[2022-03-14 12:14:51,076: INFO/MainProcess] Task public.tasks.send_sms[09d390f2-1447-4395-aca9-3f6a4ceb0fb3] received
[2022-03-14 12:14:51,077: WARNING/ForkPoolWorker-2] 向110发送短信..
[2022-03-14 12:15:01,079: WARNING/ForkPoolWorker-2] 向110发送短信完成
[2022-03-14 12:15:01,097: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[09d390f2-1447-4395-aca9-3f6a4ceb0fb3] succeeded in 10.02033851700071s: 'send_email OK'
[2022-03-14 12:18:06,084: INFO/MainProcess] Task public.tasks.send_email[f33b2a22-2b3f-4409-ae3c-692ccea78bc7] received
[2022-03-14 12:20:00,203: WARNING/ForkPoolWorker-2] 向admin@qq.com发送邮件...
[2022-03-14 12:20:05,206: WARNING/ForkPoolWorker-2] 向admin@qq.com发送邮件完成
[2022-03-14 12:20:05,224: INFO/ForkPoolWorker-2] Task public.tasks.send_email[f33b2a22-2b3f-4409-ae3c-692ccea78bc7] succeeded in 5.025896257000568s: 'send_sms ok'
django-celery-results
Installation and configuration
- Install the package
pip install django-celery-results
- Register the app in settings.py
INSTALLED_APPS = (
...,
'django_celery_results',
)
# Use the Django database as the result backend
CELERY_RESULT_BACKEND = 'django-db'
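Optionally, Celery's result_extended flag can be enabled as well so that the task name, arguments, and worker are stored alongside each result (otherwise those TaskResult columns stay empty):
# Also store task name, args, kwargs and worker on each TaskResult row
CELERY_RESULT_EXTENDED = True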
- Run the database migrations to create the tables
python manage.py migrate
Start and verify
- Start the Celery worker
celery -A DrfTest worker -l INFO
- View the async task records in the Django admin
Extending django-celery-results
Since django-celery-results stores the result of every async task, extensions can be built on top of it, for example an API endpoint that returns the execution results of all async tasks.
- Source analysis
Looking at the model definition in the django-celery-results package (venv/lib/python3.10/site-packages/django_celery_results/models.py):
class TaskResult(models.Model):
"""Task result/status."""
task_id = models.CharField(
max_length=getattr(
settings,
'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
255
),
unique=True,
verbose_name=_('Task ID'),
help_text=_('Celery ID for the Task that was run'))
periodic_task_name = models.CharField(
null=True, max_length=255,
verbose_name=_('Periodic Task Name'),
help_text=_('Name of the Periodic Task which was run'))
task_name = models.CharField(
null=True, max_length=getattr(
settings,
'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
255
),
verbose_name=_('Task Name'),
help_text=_('Name of the Task which was run'))
task_args = models.TextField(
null=True,
verbose_name=_('Task Positional Arguments'),
help_text=_('JSON representation of the positional arguments '
'used with the task'))
task_kwargs = models.TextField(
null=True,
verbose_name=_('Task Named Arguments'),
help_text=_('JSON representation of the named arguments '
'used with the task'))
status = models.CharField(
max_length=50, default=states.PENDING,
choices=TASK_STATE_CHOICES,
verbose_name=_('Task State'),
help_text=_('Current state of the task being run'))
worker = models.CharField(
max_length=100, default=None, null=True,
verbose_name=_('Worker'), help_text=_('Worker that executes the task')
)
content_type = models.CharField(
max_length=128,
verbose_name=_('Result Content Type'),
help_text=_('Content type of the result data'))
content_encoding = models.CharField(
max_length=64,
verbose_name=_('Result Encoding'),
help_text=_('The encoding used to save the task result data'))
result = models.TextField(
null=True, default=None, editable=False,
verbose_name=_('Result Data'),
help_text=_('The data returned by the task. '
'Use content_encoding and content_type fields to read.'))
date_created = models.DateTimeField(
auto_now_add=True,
verbose_name=_('Created DateTime'),
help_text=_('Datetime field when the task result was created in UTC'))
date_done = models.DateTimeField(
auto_now=True,
verbose_name=_('Completed DateTime'),
help_text=_('Datetime field when the task was completed in UTC'))
traceback = models.TextField(
blank=True, null=True,
verbose_name=_('Traceback'),
help_text=_('Text of the traceback if the task generated one'))
meta = models.TextField(
null=True, default=None, editable=False,
verbose_name=_('Task Meta Information'),
help_text=_('JSON meta information about the task, '
'such as information on child tasks'))
objects = managers.TaskResultManager()
class Meta:
"""Table information."""
ordering = ['-date_done']
verbose_name = _('task result')
verbose_name_plural = _('task results')
# Explicit names to solve https://code.djangoproject.com/ticket/33483
indexes = [
models.Index(fields=['task_name'],
name='django_cele_task_na_08aec9_idx'),
models.Index(fields=['status'],
name='django_cele_status_9b6201_idx'),
models.Index(fields=['worker'],
name='django_cele_worker_d54dd8_idx'),
models.Index(fields=['date_created'],
name='django_cele_date_cr_f04a50_idx'),
models.Index(fields=['date_done'],
name='django_cele_date_do_f59aad_idx'),
]
def as_dict(self):
return {
'task_id': self.task_id,
'task_name': self.task_name,
'task_args': self.task_args,
'task_kwargs': self.task_kwargs,
'status': self.status,
'result': self.result,
'date_done': self.date_done,
'traceback': self.traceback,
'meta': self.meta,
'worker': self.worker
}
def __str__(self):
return '<Task: {0.task_id} ({0.status})>'.format(self)
The model stores the full details of every async task result, but it has many fields; only a few key ones need to be exposed.
- public/serializers.py (custom serializer exposing only the key fields)
from rest_framework import serializers
from django_celery_results.models import TaskResult
class CeleryTaskSerializer(serializers.ModelSerializer):
class Meta:
model = TaskResult
fields = ['task_id', 'task_name', 'date_done', 'status', 'worker']
- public/urls.py (routes; since this is read-only access to the model, a read-only viewset is sufficient)
from rest_framework import routers
from public import views
from django.urls import path
from public.views import MyTokenObtainPairView
app_name = "public"
urlpatterns = [
path('getTaskResultAll/', views.GetTaskResultReadOnlyModelViewSet.as_view({'get': 'list'}))
# list all async task execution results
]
router = routers.DefaultRouter()
urlpatterns += router.urls
- public/views.py (view based on ReadOnlyModelViewSet)
from public.serializers import CeleryTaskSerializer
from django_celery_results.models import TaskResult
class GetTaskResultReadOnlyModelViewSet(viewsets.ReadOnlyModelViewSet):
"""
List all Celery task execution results
"""
queryset = TaskResult.objects.all()
serializer_class = CeleryTaskSerializer
- Verify by calling the endpoint
django-celery-beat
Installation and configuration
- Install the package
pip install django-celery-beat
- Update the settings
INSTALLED_APPS = [
'django_celery_beat',
]
# Use django_celery_beat's database scheduler so tasks can be configured dynamically
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
- Run the database migrations to create the tables
python manage.py migrate
Start and verify
- Start the Celery worker
celery -A DrfTest worker -l INFO
- Start the beat scheduler
celery -A DrfTest beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
- The models django-celery-beat adds to the Django admin serve the following purposes:
Clocked          # tasks scheduled to run at one specific time
Crontabs         # schedules in the same format as Linux crontab entries
Intervals        # schedules that run at a fixed interval (e.g. every 5 seconds)
Periodic tasks   # the list of all currently configured periodic tasks
Solar events     # schedules based on solar events such as sunrise and sunset
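The same kind of periodic task can also be created from code via the django-celery-beat models; a minimal sketch (the task path, name, and argument are illustrative):
import json
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Schedule: every day at 09:30 (the remaining crontab fields default to '*')
schedule, _ = CrontabSchedule.objects.get_or_create(minute='30', hour='9')
# Periodic task bound to that schedule
PeriodicTask.objects.get_or_create(
    name='daily-morning-sms',          # unique, human-readable name
    task='public.tasks.send_sms',      # dotted path of a registered task
    crontab=schedule,                  # FK to the schedule created above
    args=json.dumps(['adc']),          # positional args, stored as JSON text
)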
- Create a crontab-style periodic task in the Django admin
- Console log:
[2022-03-14 17:40:00,103: INFO/MainProcess] Task public.tasks.send_email[a2a98f8f-2fb9-4620-a660-a106e863f106] received
[2022-03-14 17:40:00,122: WARNING/ForkPoolWorker-2] 向123发送邮件...
[2022-03-14 17:40:05,127: WARNING/ForkPoolWorker-2] 向123发送邮件完成
[2022-03-14 17:40:05,223: INFO/ForkPoolWorker-2] Task public.tasks.send_email[a2a98f8f-2fb9-4620-a660-a106e863f106] succeeded in 5.107552270999804s: 'send_sms ok'
- Create an interval-based periodic task
- Console log:
[2022-03-14 21:19:06,010: INFO/MainProcess] Task public.tasks.send_sms[24975126-0a5a-4ed9-a499-fc1e706cbcee] received
[2022-03-14 21:19:06,034: WARNING/ForkPoolWorker-2] 向王麻子发送短信..
[2022-03-14 21:19:15,898: INFO/MainProcess] Task public.tasks.send_sms[c6346cec-c464-4e5d-ae07-db3ac83fea78] received
[2022-03-14 21:19:15,911: WARNING/ForkPoolWorker-4] 向王麻子发送短信..
[2022-03-14 21:19:16,040: WARNING/ForkPoolWorker-2] 向王麻子发送短信完成
[2022-03-14 21:19:16,093: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[24975126-0a5a-4ed9-a499-fc1e706cbcee] succeeded in 10.067840424999304s: 'send_email OK'
[2022-03-14 21:19:25,898: INFO/MainProcess] Task public.tasks.send_sms[05943468-b7ca-4575-9000-9a2bd973770e] received
[2022-03-14 21:19:25,899: WARNING/ForkPoolWorker-2] 向王麻子发送短信..
[2022-03-14 21:19:25,917: WARNING/ForkPoolWorker-4] 向王麻子发送短信完成
- View the execution records via django-celery-results
Extending django-celery-beat
Besides creating and editing periodic tasks in the Django admin, django-celery-beat supports hot reloading: once a periodic task's configuration is updated, the change takes effect without restarting the Celery workers, which makes dynamic management of scheduled tasks possible.
Reading the django-celery-beat source shows that managing three models is enough to manage periodic tasks dynamically: PeriodicTask (the periodic task list), IntervalSchedule (interval schedules), and CrontabSchedule (crontab schedules). The model file is venv/lib/python3.10/site-packages/django_celery_beat/models.py.
The overall design: the backend exposes query and create endpoints for interval and crontab expressions, plus full CRUD endpoints for periodic tasks. To add a periodic task, the user first submits the schedule expression, the frontend displays the returned id, and the user then fills in the task details to create the periodic task record. To pause or modify a task, the PUT endpoint is called; deleting works the same way through the DELETE endpoint.
Crontab-based periodic tasks
The dynamic management flow for crontab tasks: when adding a periodic task, first create the crontab expression and obtain its id, then create the periodic task with a foreign key to that crontab. For CrontabSchedule, either a ViewSet or an APIView works for the create and query endpoints; an APIView is used here. For PeriodicTask, which needs full CRUD, a ModelViewSet is used.
- public/urls.py (routes: one for crontab expressions, plus the registered periodic task routes)
from rest_framework import routers
from public import views
from django.urls import path
app_name = "public"
urlpatterns = [
path('crontab/', views.CrontabAPIView.as_view()),
# crontab expressions
]
router = routers.DefaultRouter()
router.register('task', views.TaskModelViewSet, 'userInfo')
# periodic tasks
urlpatterns += router.urls
- public/serializers.py (model serializer that exposes only selected PeriodicTask fields)
class PeriodicTaskSerializer(serializers.ModelSerializer):
"""
Periodic task serializer
"""
last_run_at = serializers.DateTimeField(read_only=True)
class Meta:
model = PeriodicTask
fields = ['id', 'name', 'task', 'args', 'last_run_at', 'enabled', 'crontab', 'interval']
- public/views.py (views: CrontabAPIView provides query and create endpoints; TaskModelViewSet provides full CRUD for periodic tasks)
import time
from datetime import datetime
from zoneinfo import ZoneInfo
from celery.result import AsyncResult
from django.db import transaction
from django.shortcuts import render
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.views import TokenObtainPairView
from public.models import UserDemo
from public.serializers import UserDemoSerializer, MyTokenObtainPairSerializer, CeleryTaskSerializer, \
PeriodicTaskSerializer
from public.tasks import send_sms, send_email
from django_celery_results.models import TaskResult
def apiDoc(request):
"""
API documentation page
"""
return render(request, 'doc.html')
class CrontabAPIView(APIView):
"""
Crontab expression endpoints
"""
@staticmethod
def get(request):
# Query a crontab expression by id
crontab_id = request.query_params.get('crontab_id')
expression = CrontabSchedule.objects.get(id=crontab_id).__str__().split(' ')[:5]
return Response({'id': crontab_id, 'expression': expression}, status=status.HTTP_200_OK)
@staticmethod
def post(request):
# Create a crontab expression from a standard five-field crontab string
expression = request.data.get('expression')
fields = expression.split(' ')
schedule, _ = CrontabSchedule.objects.get_or_create(
minute=fields[0],
hour=fields[1],
day_of_month=fields[2],
month_of_year=fields[3],
day_of_week=fields[4],
timezone=ZoneInfo("Asia/Shanghai")
)
return Response({'crontab_id': schedule.id}, status=status.HTTP_200_OK)
class TaskModelViewSet(viewsets.ModelViewSet):
"""
CRUD for periodic tasks
"""
queryset = PeriodicTask.objects.all()
serializer_class = PeriodicTaskSerializer
- Start Celery (worker and beat)
celery -A DrfTest worker -l INFO
celery -A DrfTest beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
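With both processes running, the new endpoints can be exercised like this; the host, port, the /public/ prefix, and the payloads are illustrative assumptions, not output from the original test run:
# Create a crontab expression (daily at 09:30); the response contains its crontab_id
curl -X POST http://127.0.0.1:8000/public/crontab/ -d "expression=30 9 * * *"
# Query the expression back by id
curl "http://127.0.0.1:8000/public/crontab/?crontab_id=1"
# Create the periodic task, referencing the crontab id returned above
curl -X POST http://127.0.0.1:8000/public/task/ \
     -H "Content-Type: application/json" \
     -d '{"name": "daily-sms", "task": "public.tasks.send_sms", "args": "[\"adc\"]", "crontab": 1, "enabled": true}'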
- Create a crontab expression (Linux crontab format; run every day at 09:30)
- Query the crontab expression by id
- Create the periodic task
- View the new periodic task in the admin
- At 09:30 the console log shows:
[2022-03-15 09:30:00,071: INFO/MainProcess] Task public.tasks.send_sms[a4e35e52-d763-46b8-b3fe-b44501d6a63a] received
[2022-03-15 09:30:00,084: WARNING/ForkPoolWorker-2] 向adc发送短信..
[2022-03-15 09:30:10,088: WARNING/ForkPoolWorker-2] 向adc发送短信完成
[2022-03-15 09:30:10,125: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[a4e35e52-d763-46b8-b3fe-b44501d6a63a] succeeded in 10.045384181999907s: 'send_email OK'
- Query task details through the API
That completes the demonstration of creating and querying tasks through the API; for brevity, the update and delete endpoints are not shown here.
Interval-based periodic tasks
In addition to Linux crontab expressions, django-celery-beat supports interval schedules. The dynamic management flow is the same as for crontab tasks: first create the interval schedule and obtain its id, then create the periodic task with a foreign key to that interval. For IntervalSchedule, either a ViewSet or an APIView works for the create and query endpoints; an APIView is used here. PeriodicTask reuses the code above unchanged.
- public/urls.py (routes: crontab and interval expression endpoints, plus the registered periodic task routes)
from rest_framework import routers
from public import views
from django.urls import path
app_name = "public"
urlpatterns = [
path('crontab/', views.CrontabAPIView.as_view()),
# crontab expressions
path('interval/', views.IntervalAPIView.as_view())
# interval expressions
]
router = routers.DefaultRouter()
router.register('task', views.TaskModelViewSet, 'userInfo')
# periodic tasks
urlpatterns += router.urls
- public/views.py (views: IntervalAPIView provides query and create endpoints; the crontab view and the task viewset are unchanged)
import time
from datetime import datetime
from zoneinfo import ZoneInfo
from celery.result import AsyncResult
from django.db import transaction
from django.shortcuts import render
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.views import TokenObtainPairView
from public.models import UserDemo
from public.serializers import UserDemoSerializer, MyTokenObtainPairSerializer, CeleryTaskSerializer, \
PeriodicTaskSerializer
from public.tasks import send_sms, send_email
from django_celery_results.models import TaskResult
def apiDoc(request):
"""
API documentation page
"""
return render(request, 'doc.html')
class IntervalAPIView(APIView):
"""
Interval expression endpoints
"""
@staticmethod
def get(request):
# Query an interval schedule by id
interval_id = request.query_params.get('interval_id')
expression = IntervalSchedule.objects.get(id=interval_id).__str__()
return Response({'id': interval_id, 'expression': expression}, status=status.HTTP_200_OK)
@staticmethod
def post(request):
# Create an interval schedule
every = request.data.get('every')
kind = request.data.get('kind')
period = None
if kind == 'DAYS':  # interval in days
period = IntervalSchedule.DAYS
elif kind == 'HOURS':  # interval in hours
period = IntervalSchedule.HOURS
elif kind == 'MINUTES':  # interval in minutes
period = IntervalSchedule.MINUTES
elif kind == 'SECONDS':  # interval in seconds
period = IntervalSchedule.SECONDS
else:  # interval in microseconds
period = IntervalSchedule.MICROSECONDS
schedule, created = IntervalSchedule.objects.get_or_create(every=every, period=period)
return Response({'interval_id': schedule.id}, status=status.HTTP_200_OK)
class CrontabAPIView(APIView):
……
class TaskModelViewSet(viewsets.ModelViewSet):
……
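Usage mirrors the crontab endpoints; the host, port, /public/ prefix, and payloads are again illustrative assumptions:
# Create an interval schedule (every 20 seconds); the response contains its interval_id
curl -X POST http://127.0.0.1:8000/public/interval/ -d "every=20" -d "kind=SECONDS"
# Query the schedule back by id
curl "http://127.0.0.1:8000/public/interval/?interval_id=1"
# Create an interval-based periodic task, referencing the interval id returned above
curl -X POST http://127.0.0.1:8000/public/task/ \
     -H "Content-Type: application/json" \
     -d '{"name": "email-every-20s", "task": "public.tasks.send_email", "args": "[\"mvp\"]", "interval": 1, "enabled": true}'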
- Create an interval schedule
- Query the interval schedule
- Create an interval-based periodic task
- View the periodic task in the admin
- Celery log for the periodic task:
[2022-03-15 09:39:05,918: INFO/MainProcess] Task public.tasks.send_email[bc042f63-81c0-42a5-959c-b05083923c80] received
[2022-03-15 09:39:05,919: WARNING/ForkPoolWorker-2] 向mvp发送邮件...
[2022-03-15 09:39:10,923: WARNING/ForkPoolWorker-2] 向mvp发送邮件完成
[2022-03-15 09:39:10,931: INFO/ForkPoolWorker-2] Task public.tasks.send_email[bc042f63-81c0-42a5-959c-b05083923c80] succeeded in 5.01182283199978s: 'send_sms ok'
[2022-03-15 09:39:25,921: INFO/MainProcess] Task public.tasks.send_email[4d20b737-60d9-4a19-8de2-234f1425e7b1] received
[2022-03-15 09:39:25,923: WARNING/ForkPoolWorker-2] 向mvp发送邮件...
[2022-03-15 09:39:30,929: WARNING/ForkPoolWorker-2] 向mvp发送邮件完成
[2022-03-15 09:39:30,960: INFO/ForkPoolWorker-2] Task public.tasks.send_email[4d20b737-60d9-4a19-8de2-234f1425e7b1] succeeded in 5.037057360999825s: 'send_sms ok'
That completes interval-based periodic task creation; the update and delete endpoints are left for you to test on your own.
Using RabbitMQ
For large-scale deployments, RabbitMQ is the recommended broker: it is feature-complete, stable, and the broker that Celery itself recommends.
Installation
docker pull rabbitmq:management
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=CuiLiang@0302 rabbitmq:management
- --hostname sets the host name (an important RabbitMQ caveat: it stores data under the so-called "node name", which defaults to the host name).
Using RabbitMQ with Celery
- No separate driver needs to be installed: Celery talks to RabbitMQ through kombu/py-amqp, which are installed together with Celery (pika is only needed when using RabbitMQ directly, outside Celery)
- Configure RabbitMQ as Celery's broker
broker_url = "amqp://guest:guest@127.0.0.1:5672"
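To point Celery at the container started above (user admin, vhost my_vhost), the credentials and vhost go into the URL; since the password contains '@', it is safest to percent-encode it as %40:
broker_url = "amqp://admin:CuiLiang%400302@127.0.0.1:5672/my_vhost"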
References
- Celery documentation
https://docs.celeryproject.org/en/stable/index.html
- django-celery-results documentation:
https://github.com/celery/django-celery-results
- django-celery-beat documentation:
https://github.com/celery/django-celery-beat
See more
Cui Liang's blog focuses on DevOps and automated operations and shares quality IT operations articles. For more original operations and development posts, visit https://www.cuiliangblog.cn