Python Full-Stack Series 253: Revisiting the flask-celery Setup

Published: 2024-06-17

Overview

I ran a few experiments recently and am writing up the conclusions here for easy reference later.

  • 1 flask-celery is mainly used for synchronous data-pipeline tasks. Execution is triggered by flask-aps (Flask-APScheduler), and IO-based concurrency keeps resource utilization high while meeting the business requirements (a sketch of this trigger flow follows this list).
  • 2 The current deployment environments are the compute-network host (算网机) and anygpu rented machines.
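
A hypothetical sketch of that trigger flow, for reference. The interval, port, endpoint, and payload are illustrative (they mirror the test at the end of this post), not the production job:

import requests as req
from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

# Every 5 minutes, fire-and-forget: the flask-celery service queues the real work
@scheduler.task('interval', id='sync_job', minutes=5)
def sync_job():
    req.post('http://127.0.0.1:24104/sum_post/', json={'arg1': 1, 'arg2': 2})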

Content

1 Environment

Looking at it now, deploying flask-celery directly on the host machine is viable. I usually install miniconda3 first; some containers already come with anaconda3, and those I leave alone.

Upgrading apt packages is optional:

sudo apt update
sudo apt upgrade -y

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

# run the installer downloaded in the previous step (bash cannot execute a URL)
bash Miniconda3-latest-Linux-x86_64.sh

~/miniconda3/bin/conda init

source ~/.bashrc

conda --version
conda 24.4.0

which conda
/root/miniconda3/bin/conda

After that, packages normally go into the base environment. If the machine is already a container environment (some rentals don't allow installing system software), just install the relevant Python packages directly:

pip3 install ipython -i https://mirrors.aliyun.com/pypi/simple/
pip3 install requests -i https://mirrors.aliyun.com/pypi/simple/
wget -O Basefuncs-1.10-py3-none-any.whl http://YOURS/downup/view008_download_from_folder/pys.Basefuncs-1.10-py3-none-any.whl
pip3 install Basefuncs-1.10-py3-none-any.whl
pip3 install clickhouse_driver -i https://mirrors.aliyun.com/pypi/simple/
pip3 install pandas -i https://mirrors.aliyun.com/pypi/simple/
pip3 install numpy -i https://mirrors.aliyun.com/pypi/simple/
pip3 install redis -i https://mirrors.aliyun.com/pypi/simple/
pip3 install pydantic -i https://mirrors.aliyun.com/pypi/simple/
pip3 install nest_asyncio -i https://mirrors.aliyun.com/pypi/simple/
pip3 install aiohttp -i https://mirrors.aliyun.com/pypi/simple/
pip3 install Flask -i https://mirrors.aliyun.com/pypi/simple/
pip3 install Flask-APScheduler -i https://mirrors.aliyun.com/pypi/simple/
pip3 install celery -i https://mirrors.aliyun.com/pypi/simple/
pip3 install gunicorn -i https://mirrors.aliyun.com/pypi/simple/
pip3 install mongoengine -i https://mirrors.aliyun.com/pypi/simple/
pip3 install apscheduler -i https://mirrors.aliyun.com/pypi/simple/
pip3 install tornado -i https://mirrors.aliyun.com/pypi/simple/
pip3 install Pillow -i https://mirrors.aliyun.com/pypi/simple/
pip3 install markdown -i https://mirrors.aliyun.com/pypi/simple/
pip3 install pymysql -i https://mirrors.aliyun.com/pypi/simple/
pip3 install gevent -i https://mirrors.aliyun.com/pypi/simple/
pip3 install akshare -i https://mirrors.aliyun.com/pypi/simple/
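
After the installs, a quick sanity check (the module list just mirrors the packages above):

python3 -c "import flask, celery, redis, pandas, aiohttp; print('imports ok')"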

Next, create the project files in the appropriate location:

  • 1 On the compute-network host, pull the project files directly; they usually live under /opt/project_notes/flask_celery.
  • 2 On a rented machine, they usually go under /opt/flask_celery, created with vim plus copy-paste.

systemd

At the host level you can usually manage this as a service, but many compute providers don't give you that either; you only rent the container (rather like a performer who sells the show but not themselves - -!).
Without systemd, you have to switch to the project directory and start everything in the background directly.

To start with plain commands, first cd into the project directory:

cd  /opt/project_notes/flask_celery 
# or
cd /opt/flask_celery

vim server_single_v2.py
# If you use a non-standard Redis setup, edit the broker address inside the script
celery_broker = 'redis://:YOURS@127.0.0.1:24008/1'

# Foreground start commands
gunicorn server_single_v2:app -b 0.0.0.0:24104
celery -A server_single_v2.celery_ worker
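
The actual server_single_v2.py is not included in this post. For reference, here is a minimal hypothetical sketch that is consistent with the commands above and with the test at the end: a celery_ object in the module, a /sum_post/ endpoint that queues a task and returns its id, and a /get_result/ endpoint that fetches the result.

import time

from celery import Celery
from flask import Flask, request

celery_broker = 'redis://:YOURS@127.0.0.1:24008/1'

app = Flask(__name__)
celery_ = Celery(app.name, broker=celery_broker, backend=celery_broker)

@celery_.task
def sum_task(arg1, arg2):
    time.sleep(10)  # stand-in for the 10-second job used in the test below
    return arg1 + arg2

@app.route('/sum_post/', methods=['POST'])
def sum_post():
    para = request.get_json()
    task = sum_task.delay(para['arg1'], para['arg2'])
    return task.id  # the client polls /get_result/ with this id

@app.route('/get_result/<task_id>')
def get_result(task_id):
    res = celery_.AsyncResult(task_id)
    return str(res.get(timeout=30))  # blocks until the worker finishes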

The moment it came up while I was writing this, a pile of backlogged tasks came flooding in.
I then realized that celery's concurrency here is not coroutine-level: the default pool is prefork, which forks one worker process per CPU core, and sure enough the worker numbers tracked my core count.
Fine, I'll let that go for now and just run it for a while; later I can build a coroutine-level service of my own for routing and scheduling. My main application is really asynchronous API requests, and a large future refactor will be moving all the data-fetch/query services over to async. The various ways tornado converts sync to async are a good reference, starting from a basic async tornado setup.
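
That said, celery already ships coroutine-level pools. Since gevent is in the pip list above, switching the worker over is a one-flag change (--pool and --concurrency are standard celery options; the concurrency value here is illustrative):

celery -A server_single_v2.celery_ worker --pool=gevent --concurrency=100

Tasks then run as greenlets rather than forked processes, which suits IO-bound work like API calls; CPU-bound tasks are better off on the default prefork pool.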

Edit the startup script: vim ~/start_flask_celery.sh. In theory you need to source conda.sh before you can activate. The services actually run even without activating, but I suspect that is because the conda environment is written into the service's PATH, so it lands in base by default.

#!/bin/bash
# Activate the base environment (or whichever environment you created)
source /root/miniconda3/etc/profile.d/conda.sh
conda activate base
# On an anaconda3 machine, source /root/anaconda3/etc/profile.d/conda.sh instead

# Run the Python service scripts
# compute-network host
cd /opt/project_notes/flask_celery
# rented machine
# cd /opt/flask_celery

nohup gunicorn server_single_v2:app -b 0.0.0.0:24104 >/dev/null 2>&1 &
nohup celery -A server_single_v2.celery_ worker >/dev/null 2>&1 &
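
After running the script, a quick way to confirm both processes came up:

ps -ef | grep -E 'gunicorn|celery' | grep -v grep
# While debugging, consider swapping >/dev/null in the script for a real log
# file (e.g. >>/tmp/flask_celery.log) so worker errors stay visible.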

Without systemd, run the script by hand after each boot. When possible, though, register it as a systemd service, minding the two different environment configurations.
First make the script executable with chmod +x ~/start_flask_celery.sh, then create the unit file with vim /lib/systemd/system/flask_celery.service:

[Unit]
Description=flask_celery_service
After=network.target network-online.target syslog.target
Wants=network.target network-online.target

[Service]
# Command that starts the service
Type=forking
ExecStart=/root/start_flask_celery.sh
Restart=always
RestartSec=5
# miniconda
Environment="PATH=/root/miniconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
# anaconda
#Environment="PATH=/root/anaconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"

[Install]
WantedBy=multi-user.target

Then reload the daemon, start the service, and enable it at boot. Note that Type=forking matches the script above: it backgrounds both processes with nohup and exits immediately.

systemctl daemon-reload
systemctl start flask_celery
systemctl enable flask_celery
systemctl status flask_celery

┌─root@m7:~
└─ $ systemctl status  flask_celery
● flask_celery.service - flask_celery_service
   Loaded: loaded (/lib/systemd/system/flask_celery.service; enabled; vendor preset: enabled)
   Active: active (running) since Sat 2024-06-15 13:03:51 CST; 1s ago
  Process: 428 ExecStart=/root/start_flask_celery.sh (code=exited, status=0/SUCCESS)
    Tasks: 38 (limit: 4915)
   CGroup: /system.slice/flask_celery.service
           ├─475 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
           ├─476 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
           ├─556 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
           ├─864 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
           ├─924 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
           ├─949 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
           └─975 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker

Jun 15 13:03:51 m7 systemd[1]: Starting flask_celery_service...
Jun 15 13:03:51 m7 systemd[1]: Started flask_celery_service.

Test

import requests as req 

para_dict = {'arg1':111,
             'arg2':222}

resp = req.post('http://127.0.0.1:24104/sum_post/',json = para_dict )
print('Each task takes 10 seconds')
import time 
tick1 = time.time()
print(req.get('http://127.0.0.1:24104/get_result/%s' % resp.text).text)
tick2 = time.time()
print('actually takes %.2f' % (tick2 - tick1 ))

Each task takes 10 seconds
333
actually takes 10.02
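
As a pointer toward the async refactor mentioned above, here is a hypothetical sketch of the same test fired three times concurrently with aiohttp (already in the pip list). Note the server side still needs enough gunicorn workers, or an async worker class, for the polling calls to actually overlap:

import asyncio

import aiohttp

async def run_task(session, arg1, arg2):
    async with session.post('http://127.0.0.1:24104/sum_post/',
                            json={'arg1': arg1, 'arg2': arg2}) as resp:
        task_id = await resp.text()
    async with session.get('http://127.0.0.1:24104/get_result/%s' % task_id) as resp:
        return await resp.text()

async def main():
    async with aiohttp.ClientSession() as session:
        # three 10-second tasks, overlapped client-side instead of serialized
        print(await asyncio.gather(*(run_task(session, i, i) for i in range(3))))

asyncio.run(main())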