基于ThinkPHP6用户登录逻辑,结合FastAPI框架实现用户登录系统的全流程解析

发布于:2025-03-12 ⋅ 阅读:(17) ⋅ 点赞:(0)

基于ThinkPHP6用户登录逻辑,结合FastAPI框架实现用户登录系统的全流程解析,涵盖路由配置、数据验证、JWT令牌生成与鉴权、中间件依赖等核心环节:


1. 路由配置与请求处理

  • 路由定义:使用APIRouter组织用户认证相关接口(注册、登录),并通过app.include_router()集成到主应用。例如:

    # routers/auth.py
    from fastapi import APIRouter
    router = APIRouter(prefix="/auth", tags=["Authentication"])
    
    @router.post("/register")
    async def register(user: UserRegisterSchema):
        ...
    
    @router.post("/login")
    async def login(user: UserLoginSchema):
        ...
    

    引用2977

  • 请求数据模型:通过Pydantic模型定义请求参数,实现数据校验与自动文档生成:

    from pydantic import BaseModel
    class UserLoginSchema(BaseModel):
        username: str
        password: str
    

    引用62


2. 用户注册与密码安全

  • 密码哈希存储:使用passlibbcrypt算法加密密码,避免明文存储:

    from passlib.context import CryptContext
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    def hash_password(password: str) -> str:
        return pwd_context.hash(password)
    

    引用185

  • 用户唯一性校验:注册时检查用户名是否已存在:

    async def register(user: UserRegisterSchema, db: Session = Depends(get_db)):
        existing_user = db.query(User).filter(User.username == user.username).first()
        if existing_user:
            raise HTTPException(status_code=400, detail="用户已存在")
        hashed_pwd = hash_password(user.password)
        db_user = User(username=user.username, password=hashed_pwd)
        db.add(db_user)
        db.commit()
    

    引用1


3. 用户登录与JWT生成

  • 身份验证:查询数据库并验证密码哈希:

    def verify_password(plain_pwd: str, hashed_pwd: str) -> bool:
        return pwd_context.verify(plain_pwd, hashed_pwd)
    
    async def authenticate_user(username: str, password: str, db: Session):
        user = db.query(User).filter(User.username == username).first()
        if not user or not verify_password(password, user.password):
            return None
        return user
    

    引用7085

  • 生成JWT令牌:使用python-jose生成包含用户ID和过期时间的令牌:

    from jose import jwt
    from datetime import datetime, timedelta
    
    SECRET_KEY = "your-secret-key"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE = 30  # 分钟
    
    def create_access_token(user_id: str) -> str:
        payload = {"sub": user_id, "exp": datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE)}
        return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
    

    引用8085


4. 鉴权中间件与依赖注入

  • 令牌解析:通过OAuth2PasswordBearer定义令牌获取方式,并验证有效性:

    from fastapi.security import OAuth2PasswordBearer
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
    
    async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            user_id = payload.get("sub")
            user = db.query(User).filter(User.id == user_id).first()
            if not user:
                raise HTTPException(status_code=401, detail="无效令牌")
            return user
        except JWTError:
            raise HTTPException(status_code=401, detail="令牌过期或无效")
    

    引用5580

  • 受保护路由:在需要登录的路由中添加Depends(get_current_user)

    @router.get("/profile")
    async def user_profile(current_user: User = Depends(get_current_user)):
        return {"username": current_user.username}
    

5. 响应与错误处理

  • 统一响应格式:返回标准化的JSON响应,包含状态码和消息:
    @router.post("/login")
    async def login(user: UserLoginSchema, db: Session = Depends(get_db)):
        authenticated_user = await authenticate_user(user.username, user.password, db)
        if not authenticated_user:
            raise HTTPException(status_code=401, detail="用户名或密码错误")
        token = create_access_token(str(authenticated_user.id))
        return {"code": 200, "access_token": token, "token_type": "bearer"}
    
    引用70

6. 数据库与异步支持

  • SQLAlchemy集成:使用异步引擎连接数据库(如MySQL/PostgreSQL):
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
    engine = create_async_engine("mysql+aiomysql://user:password@localhost/db")
    async def get_db():
        async with AsyncSession(engine) as session:
            yield session
    
    引用29

关键技术与优化建议

  1. 安全性增强

    • 使用HTTPS传输敏感数据。
    • 设置JWT短期有效期并实现令牌刷新机制。
    • 定期轮换SECRET_KEY
  2. 性能优化

    • 异步数据库操作(如asyncpgaiomysql)。
    • Redis缓存高频访问的用户信息。
  3. 扩展性设计

    • 使用APIRouter拆分业务模块(用户、订单、商品)。
    • 通过中间件实现跨域(CORS)和请求日志记录。

完整代码结构示例

app/
├── main.py              # 主入口
├── routers/
│   ├── auth.py          # 认证路由(注册/登录)
│   └── users.py         # 用户信息路由
├── models/
│   └── user.py          # 用户模型定义
├── schemas/
│   └── user.py          # Pydantic数据模型
├── dependencies.py      # 鉴权依赖项
└── utils/
    ├── security.py      # JWT与密码工具
    └── database.py      # 数据库连接

引用整合1295570778085以下是如何基于ThinkPHP6用户登录逻辑,结合FastAPI框架实现用户登录系统的全流程解析,涵盖路由配置、数据验证、JWT令牌生成与鉴权、中间件依赖等核心环节:


1. 路由配置与请求处理

  • 路由定义:使用APIRouter组织用户认证相关接口(注册、登录),并通过app.include_router()集成到主应用。例如:

    # routers/auth.py
    from fastapi import APIRouter
    router = APIRouter(prefix="/auth", tags=["Authentication"])
    
    @router.post("/register")
    async def register(user: UserRegisterSchema):
        ...
    
    @router.post("/login")
    async def login(user: UserLoginSchema):
        ...
    

    引用2977

  • 请求数据模型:通过Pydantic模型定义请求参数,实现数据校验与自动文档生成:

    from pydantic import BaseModel
    class UserLoginSchema(BaseModel):
        username: str
        password: str
    

    引用62


2. 用户注册与密码安全

  • 密码哈希存储:使用passlibbcrypt算法加密密码,避免明文存储:

    from passlib.context import CryptContext
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    def hash_password(password: str) -> str:
        return pwd_context.hash(password)
    

    引用185

  • 用户唯一性校验:注册时检查用户名是否已存在:

    async def register(user: UserRegisterSchema, db: Session = Depends(get_db)):
        existing_user = db.query(User).filter(User.username == user.username).first()
        if existing_user:
            raise HTTPException(status_code=400, detail="用户已存在")
        hashed_pwd = hash_password(user.password)
        db_user = User(username=user.username, password=hashed_pwd)
        db.add(db_user)
        db.commit()
    

    引用1


3. 用户登录与JWT生成

  • 身份验证:查询数据库并验证密码哈希:

    def verify_password(plain_pwd: str, hashed_pwd: str) -> bool:
        return pwd_context.verify(plain_pwd, hashed_pwd)
    
    async def authenticate_user(username: str, password: str, db: Session):
        user = db.query(User).filter(User.username == username).first()
        if not user or not verify_password(password, user.password):
            return None
        return user
    

    引用7085

  • 生成JWT令牌:使用python-jose生成包含用户ID和过期时间的令牌:

    from jose import jwt
    from datetime import datetime, timedelta
    
    SECRET_KEY = "your-secret-key"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE = 30  # 分钟
    
    def create_access_token(user_id: str) -> str:
        payload = {"sub": user_id, "exp": datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE)}
        return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
    

    引用8085


4. 鉴权中间件与依赖注入

  • 令牌解析:通过OAuth2PasswordBearer定义令牌获取方式,并验证有效性:

    from fastapi.security import OAuth2PasswordBearer
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
    
    async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            user_id = payload.get("sub")
            user = db.query(User).filter(User.id == user_id).first()
            if not user:
                raise HTTPException(status_code=401, detail="无效令牌")
            return user
        except JWTError:
            raise HTTPException(status_code=401, detail="令牌过期或无效")
    

    引用5580

  • 受保护路由:在需要登录的路由中添加Depends(get_current_user)

    @router.get("/profile")
    async def user_profile(current_user: User = Depends(get_current_user)):
        return {"username": current_user.username}
    

5. 响应与错误处理

  • 统一响应格式:返回标准化的JSON响应,包含状态码和消息:
    @router.post("/login")
    async def login(user: UserLoginSchema, db: Session = Depends(get_db)):
        authenticated_user = await authenticate_user(user.username, user.password, db)
        if not authenticated_user:
            raise HTTPException(status_code=401, detail="用户名或密码错误")
        token = create_access_token(str(authenticated_user.id))
        return {"code": 200, "access_token": token, "token_type": "bearer"}
    
    引用70

6. 数据库与异步支持

  • SQLAlchemy集成:使用异步引擎连接数据库(如MySQL/PostgreSQL):
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
    engine = create_async_engine("mysql+aiomysql://user:password@localhost/db")
    async def get_db():
        async with AsyncSession(engine) as session:
            yield session
    
    引用29

关键技术与优化建议

  1. 安全性增强

    • 使用HTTPS传输敏感数据。
    • 设置JWT短期有效期并实现令牌刷新机制。
    • 定期轮换SECRET_KEY
  2. 性能优化

    • 异步数据库操作(如asyncpgaiomysql)。
    • Redis缓存高频访问的用户信息。
  3. 扩展性设计

    • 使用APIRouter拆分业务模块(用户、订单、商品)。
    • 通过中间件实现跨域(CORS)和请求日志记录。

完整代码结构示例

app/
├── main.py              # 主入口
├── routers/
│   ├── auth.py          # 认证路由(注册/登录)
│   └── users.py         # 用户信息路由
├── models/
│   └── user.py          # 用户模型定义
├── schemas/
│   └── user.py          # Pydantic数据模型
├── dependencies.py      # 鉴权依赖项
└── utils/
    ├── security.py      # JWT与密码工具
    └── database.py      # 数据库连接

网站公告

今日签到

点亮在社区的每一天
去签到