说明
今天偶然在别的文章里看到这个功能,突然觉得正好。
CeleryWorker已经搭好了,但是我一直想在用户请求时进行额外的处理会比较影响处理时间,用这个正好可以搭配上。
我设想的一个场景:
- 1 用户发起请求
- 2 接口中进行关键信息的日志记录 logger.info这种,直接记录在文本文件里
- 3 影响请求后在后台进行相关操作:在redis中计数,将数据转发kafka等
这样既不会影响请求时间,又可以把需要做的操作,例如在内存中记录,修改状态;采样数据发送做完。
一个比较笨(现在有在用)的方法是在处理时将数据采样发送到kafka,然后担心kafka服务出问题,又做了try…except。当然,最坏的情况也还好,因为调用大模型通常都是数秒。
当然,在接口中logging和redis这样的操作倒是没关系,因为时间足够短。某种程度上来说,logging+ logstash可能是更好的方案,redis都还有可能挂。
还有一个相对好一点的方法(准备好了还没有启用)。使用WCelery发送任务(甚至可以是复杂任务),并且可以再封装一层异步(async httpx),这样也只是多花一个请求时间。
当然这些都不如直接用FastAPI自带的BackgroundTasks方法,这种服务启动嵌入的方法应该更可靠。(其实在flask时代,就有before和after request装饰器)
以下是一个实验代码(主要 by deepseek)
server.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
import time
from typing import Optional
import logging
app = FastAPI()
# 配置日志
logging.basicConfig(filename='app.log', level=logging.INFO)
# def write_log(message: str):
# """记录日志到文件(模拟耗时操作)"""
# time.sleep(2) # 模拟耗时操作
# with open("log.txt", mode="a") as f:
# f.write(f"{time.ctime()}: {message}\n")
# logging.info(f"日志已记录: {message}")
import aiofiles # 需要先安装: pip3 install aiofiles
import asyncio
async def write_log(message: str):
"""真正的异步日志写入"""
await asyncio.sleep(2) # 正确使用await
async with aiofiles.open("log.txt", mode="a") as f:
await f.write(f"{time.ctime()}: {message}\n")
logging.info(f"日志已记录: {message}") # logging默认是同步的
def send_email(to: str, subject: str, body: Optional[str] = None):
"""模拟发送邮件(带错误处理)"""
try:
time.sleep(3) # 模拟网络延迟
with open("email_logs.txt", "a") as f:
content = f"""
Time: {time.ctime()}
To: {to}
Subject: {subject}
Body: {body or 'No content'}
{'-'*30}
"""
f.write(content)
print(f"邮件已发送给 {to}")
except Exception as e:
logging.error(f"邮件发送失败: {str(e)}")
def cleanup_temp_files():
# aa
"""模拟清理临时文件"""
time.sleep(1)
print("临时文件清理完成")
from pydantic import BaseModel
class RegisterInput(BaseModel):
username: str
email: str
@app.post("/register")
async def register_user(user_input:RegisterInput , background_tasks: BackgroundTasks):
"""用户注册接口(演示多个后台任务)"""
if not user_input.email.endswith("@example.com"):
raise HTTPException(400, "仅支持 example.com 域名")
# 添加多个后台任务
background_tasks.add_task(
write_log,
f"新用户注册: {user_input.username}, 邮箱: {user_input.email}"
)
background_tasks.add_task(
send_email,
to=user_input.email,
subject="欢迎注册",
body=f"尊敬的 {user_input.username},感谢您的注册!"
)
background_tasks.add_task(cleanup_temp_files)
return {
"message": "注册成功",
"details": "激活邮件和日志记录正在后台处理"
}
@app.get("/stats")
async def get_stats(background_tasks: BackgroundTasks):
"""获取统计信息(演示快速响应+后台处理)"""
background_tasks.add_task(
write_log,
"用户查看了统计信息"
)
# 立即返回的简单数据
return {
"active_users": 42,
"note": "详细日志正在后台记录"
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
test.py
resp = httpx.post('http://127.0.0.1:8000/register', json = {'username':'andy', 'email':'andy@example.com'})
对应产生的几个文件,如log.txt
Fri Apr 18 23:27:02 2025: 新用户注册: andy, 邮箱: andy@example.com
Fri Apr 18 23:28:08 2025: 新用户注册: andy, 邮箱: andy@example.com
Fri Apr 18 23:29:52 2025: 新用户注册: andy, 邮箱: andy@example.com
Fri Apr 18 23:30:33 2025: 新用户注册: andy, 邮箱: andy@example.com
Fri Apr 18 23:39:40 2025: 新用户注册: andy001, 邮箱: andy001@example.com
实验成功,感觉还挺好的。
原文有一些错误,说background_tasks只能执行同步任务,事实证明是错误的。某种程度上说,异步的才符合FastAPI的特点。
另外,如果有些同步操作时间特别短是可以不用异步的。例如redis操作。
以上对我有用,希望对你也有用。