前言
在现代Web应用开发中,用户认证系统是必不可少的功能。本文将带你使用FastAPI框架构建一个完整的用户认证系统,包含注册、登录、信息更新和删除等功能。我们将采用JWT(JSON Web Token)进行身份验证,并使用SQLite作为数据库存储用户信息。
一、项目概述
1.1 功能列表
- 用户注册
- 用户登录(获取JWT令牌)
- 用户信息更新
- 用户删除
- 用户信息查询(单个/全部)
1.2 技术栈
- FastAPI:高性能Python Web框架
- SQLAlchemy:Python SQL工具包和ORM
- JWT:JSON Web Token身份验证
- Passlib:密码哈希处理
- SQLite:轻量级数据库
二、环境准备
2.1 安装依赖
pip install fastapi uvicorn sqlalchemy passlib python-jose[cryptography]
2.2 项目结构
fastapi-auth/
├── main.py # 主应用文件
└── Login.db # SQLite数据库文件(运行后自动生成)
三、核心代码解析
3.1 配置与初始化
from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel, ConfigDict
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# OAuth2配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 密码加密配置
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT配置
SECRET_KEY = "ninglegedongdong" # 生产环境应使用更安全的密钥
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 创建FastAPI应用
app = FastAPI()
# 数据库配置
my_database = 'sqlite:///Login.db'
engine = create_engine(my_database, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()
3.2 数据模型定义
数据库模型
class User(Base):
__tablename__ = 'login'
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
Pydantic模型(用于请求/响应验证)
class UserCreate(BaseModel):
username: str
password: str
class UserResponse(BaseModel):
id: int
username: str
model_config = ConfigDict(from_attributes=True)
class UserUpdate(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
3.3 工具函数
def get_password_hash(password: str):
"""生成密码哈希"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str):
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""创建JWT令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""获取当前用户(依赖项)"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="身份凭证验证失败",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if not username:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
db = SessionLocal()
try:
user = db.query(User).filter(User.username == token_data.username).first()
if user is None:
raise credentials_exception
return user
finally:
db.close()
3.4 路由处理
用户注册
@app.post('/register', response_model=UserResponse)
def create_user(user: UserCreate):
db = SessionLocal()
try:
# 检查用户名是否已存在
existing_user = db.query(User).filter(User.username == user.username).first()
if existing_user:
raise HTTPException(status_code=400, detail="用户名已存在")
# 创建新用户
hashed_password = get_password_hash(user.password)
new_user = User(username=user.username, hashed_password=hashed_password)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
finally:
db.close()
用户登录(获取JWT令牌)
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
db = SessionLocal()
try:
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
finally:
db.close()
用户信息更新
@app.put('/login/{username}', response_model=UserResponse)
def update_user(
username: str,
user_update: UserUpdate,
current_user: User = Depends(get_current_user)
):
db = SessionLocal()
try:
existing_user = db.query(User).filter(User.username == username).first()
if not existing_user:
raise HTTPException(status_code=404, detail="用户不存在")
existing_user.username = user_update.username
existing_user.hashed_password = get_password_hash(user_update.password)
db.commit()
db.refresh(existing_user)
return existing_user
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
用户删除
@app.delete('/login/{username}', response_model=dict)
def delete_user(
username: str,
current_user: User = Depends(get_current_user)
):
db = SessionLocal()
try:
existing_user = db.query(User).filter(User.username == username).first()
if not existing_user:
raise HTTPException(status_code=404, detail="用户不存在")
db.delete(existing_user)
db.commit()
return {'message': f'用户 {username} 删除成功'}
finally:
db.close()
用户查询
# 查询单个用户
@app.get('/login/{username}', response_model=UserResponse)
def get_user_name(username: str, current_user: User = Depends(get_current_user)):
db = SessionLocal()
try:
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
finally:
db.close()
# 查询所有用户
@app.get('/login', response_model=list[UserResponse])
def get_all_users(current_user: User = Depends(get_current_user)):
db = SessionLocal()
try:
return db.query(User).all()
finally:
db.close()
3.5 启动应用
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app", host='127.0.0.1', port=8000, reload=True)
四、API测试指南
4.1 启动服务
python main.py
服务启动后,访问 http://127.0.0.1:8000/docs
可查看Swagger UI界面。
4.2 测试流程
1.注册用户:
- 方法:POST
/register
- 请求体
{
"username": "testuser",
"password": "testpass"
}
2.登录获取令牌:
- 方法:POST
/token
- 表单数据:
- username: testuser
- password: testpass
3.访问受保护端点:
- 在请求头中添加:
Authorization: Bearer <your_token>
4.更新用户信息:
- 方法:PUT
/login/testuser
- 请求体:
{
"username": "newusername",
"password": "newpassword"
}
5.查询用户信息:
- 方法:GET
/login/testuser
或 GET/login
6.删除用户:
- 方法:DELETE
/login/testuser
五、安全注意事项
生产环境配置:
- 使用环境变量存储敏感信息(如SECRET_KEY)
- 使用HTTPS加密通信
- 设置更强的密码哈希算法参数
JWT安全:
- 设置合理的令牌过期时间
- 考虑实现令牌刷新机制
- 避免在令牌中存储敏感信息
数据库安全:
- 定期备份数据库
- 使用更安全的数据库如PostgreSQL或MySQL
- 实施数据库连接池
六、总结
本文详细介绍了如何使用FastAPI构建一个完整的用户认证系统,涵盖了从数据库模型设计到API实现的各个方面。通过这个项目,你可以学习到:
- FastAPI的基本使用和路由定义
- SQLAlchemy ORM的配置和操作
- JWT身份验证的实现
- 密码安全处理的最佳实践
- RESTful API的设计原则
这个项目可以作为你开发更复杂Web应用的基础模板,根据实际需求进行扩展和优化。希望这篇指南对你的学习有所帮助!