FastAPI用户认证系统开发指南:从零构建安全API

发布于:2025-04-17 ⋅ 阅读:(30) ⋅ 点赞:(0)

前言

在现代Web应用开发中,用户认证系统是必不可少的功能。本文将带你使用FastAPI框架构建一个完整的用户认证系统,包含注册、登录、信息更新和删除等功能。我们将采用JWT(JSON Web Token)进行身份验证,并使用SQLite作为数据库存储用户信息。

一、项目概述

1.1 功能列表

  • 用户注册
  • 用户登录(获取JWT令牌)
  • 用户信息更新
  • 用户删除
  • 用户信息查询(单个/全部)

1.2 技术栈

  • FastAPI:高性能Python Web框架
  • SQLAlchemy:Python SQL工具包和ORM
  • JWT:JSON Web Token身份验证
  • Passlib:密码哈希处理
  • SQLite:轻量级数据库

二、环境准备

2.1 安装依赖

pip install fastapi uvicorn sqlalchemy passlib python-jose[cryptography]

 2.2 项目结构

fastapi-auth/
├── main.py            # 主应用文件
└── Login.db           # SQLite数据库文件(运行后自动生成)

三、核心代码解析

3.1 配置与初始化

from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel, ConfigDict
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# OAuth2配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 密码加密配置
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT配置
SECRET_KEY = "ninglegedongdong"  # 生产环境应使用更安全的密钥
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 创建FastAPI应用
app = FastAPI()

# 数据库配置
my_database = 'sqlite:///Login.db'
engine = create_engine(my_database, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()

 3.2 数据模型定义

数据库模型

class User(Base):
    __tablename__ = 'login'
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String)

Pydantic模型(用于请求/响应验证)

class UserCreate(BaseModel):
    username: str
    password: str

class UserResponse(BaseModel):
    id: int
    username: str
    model_config = ConfigDict(from_attributes=True)

class UserUpdate(BaseModel):
    username: str
    password: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

3.3 工具函数

def get_password_hash(password: str):
    """生成密码哈希"""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str):
    """验证密码"""
    return pwd_context.verify(plain_password, hashed_password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """创建JWT令牌"""
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """获取当前用户(依赖项)"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="身份凭证验证失败",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.username == token_data.username).first()
        if user is None:
            raise credentials_exception
        return user
    finally:
        db.close()

3.4 路由处理

用户注册
@app.post('/register', response_model=UserResponse)
def create_user(user: UserCreate):
    db = SessionLocal()
    try:
        # 检查用户名是否已存在
        existing_user = db.query(User).filter(User.username == user.username).first()
        if existing_user:
            raise HTTPException(status_code=400, detail="用户名已存在")
        
        # 创建新用户
        hashed_password = get_password_hash(user.password)
        new_user = User(username=user.username, hashed_password=hashed_password)
        db.add(new_user)
        db.commit()
        db.refresh(new_user)
        return new_user
    finally:
        db.close()
用户登录(获取JWT令牌)
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.username == form_data.username).first()
        if not user or not verify_password(form_data.password, user.hashed_password):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户名或密码错误",
                headers={"WWW-Authenticate": "Bearer"},
            )
        
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            data={"sub": user.username},
            expires_delta=access_token_expires
        )
        return {"access_token": access_token, "token_type": "bearer"}
    finally:
        db.close()
用户信息更新
@app.put('/login/{username}', response_model=UserResponse)
def update_user(
    username: str,
    user_update: UserUpdate,
    current_user: User = Depends(get_current_user)
):
    db = SessionLocal()
    try:
        existing_user = db.query(User).filter(User.username == username).first()
        if not existing_user:
            raise HTTPException(status_code=404, detail="用户不存在")

        existing_user.username = user_update.username
        existing_user.hashed_password = get_password_hash(user_update.password)
        db.commit()
        db.refresh(existing_user)
        return existing_user
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
用户删除
@app.delete('/login/{username}', response_model=dict)
def delete_user(
    username: str,
    current_user: User = Depends(get_current_user)
):
    db = SessionLocal()
    try:
        existing_user = db.query(User).filter(User.username == username).first()
        if not existing_user:
            raise HTTPException(status_code=404, detail="用户不存在")
        
        db.delete(existing_user)
        db.commit()
        return {'message': f'用户 {username} 删除成功'}
    finally:
        db.close()
用户查询
# 查询单个用户
@app.get('/login/{username}', response_model=UserResponse)
def get_user_name(username: str, current_user: User = Depends(get_current_user)):
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.username == username).first()
        if not user:
            raise HTTPException(status_code=404, detail="用户不存在")
        return user
    finally:
        db.close()

# 查询所有用户
@app.get('/login', response_model=list[UserResponse])
def get_all_users(current_user: User = Depends(get_current_user)):
    db = SessionLocal()
    try:
        return db.query(User).all()
    finally:
        db.close()

3.5 启动应用

if __name__ == '__main__':
    import uvicorn
    uvicorn.run("main:app", host='127.0.0.1', port=8000, reload=True)

四、API测试指南

4.1 启动服务

python main.py

服务启动后,访问 http://127.0.0.1:8000/docs 可查看Swagger UI界面。

4.2 测试流程

1.注册用户

  • 方法:POST /register
  • 请求体
{
  "username": "testuser",
  "password": "testpass"
}

2.登录获取令牌

  • 方法:POST /token
  • 表单数据:
    • username: testuser
    • password: testpass

3.访问受保护端点

  • 在请求头中添加:
Authorization: Bearer <your_token>

4.更新用户信息

  • 方法:PUT /login/testuser
  • 请求体:
{
  "username": "newusername",
  "password": "newpassword"
}

5.查询用户信息

  • 方法:GET /login/testuser 或 GET /login

6.删除用户

  • 方法:DELETE /login/testuser

五、安全注意事项

  1. 生产环境配置

    • 使用环境变量存储敏感信息(如SECRET_KEY)
    • 使用HTTPS加密通信
    • 设置更强的密码哈希算法参数
  2. JWT安全

    • 设置合理的令牌过期时间
    • 考虑实现令牌刷新机制
    • 避免在令牌中存储敏感信息
  3. 数据库安全

    • 定期备份数据库
    • 使用更安全的数据库如PostgreSQL或MySQL
    • 实施数据库连接池

六、总结

本文详细介绍了如何使用FastAPI构建一个完整的用户认证系统,涵盖了从数据库模型设计到API实现的各个方面。通过这个项目,你可以学习到:

  1. FastAPI的基本使用和路由定义
  2. SQLAlchemy ORM的配置和操作
  3. JWT身份验证的实现
  4. 密码安全处理的最佳实践
  5. RESTful API的设计原则

这个项目可以作为你开发更复杂Web应用的基础模板,根据实际需求进行扩展和优化。希望这篇指南对你的学习有所帮助!


网站公告

今日签到

点亮在社区的每一天
去签到