【FastapiAdmin 二开教程】如何在FastAPl-Vue3-Admin添加注册、找回密码邮箱验证

发布于:2025-08-11 ⋅ 阅读:(19) ⋅ 点赞:(0)

一、项目背景

项目地址:FastapiAdmin
项目文档:FastapiAdmin 文档

FastAPI-Vue3-Admin 是一套完全开源的现代化企业级中后台快速开发平台,以高度模块化设计与先进技术架构为核心优势。项目采用前后端分离模式,深度融合 Python 后端框架 FastAPI 与前端主流框架 Vue3,实现多端统一开发体验,为开发者提供开箱即用的一站式解决方案,助力高效搭建高质量企业级管理系统。
在这里插入图片描述

二、功能实现流程思考

这里主要是邮箱通过SMIP配置下发给用户验证码并进行验证的流程

1.注册流程(分 2 步)

在这里插入图片描述

  • 步骤 1:验证邮箱并下发验证码
    前端操作:
    用户进入注册页面,首先填写「邮箱地址」,点击「获取验证码」按钮。
    (前端需先校验邮箱格式,如xxx@xx.com)
    后端处理:
    校验邮箱是否已被注册(若已注册,返回提示「该邮箱已注册,请直接登录」)。
    生成 6 位数字验证码(如123456),设置有效期(如 5 分钟)。
    将验证码与邮箱绑定,临时存储(使用Redis 缓存,键为verify_type:操作类型:邮箱,值为验证码,过期时间 10 分钟)。
    调用邮件服务发送验证码(邮件内容:你的注册验证码是:123456,5分钟内有效,请勿泄露给他人)。
    前端显示倒计时(如 60 秒内不可重复发送),提示「验证码已发送至邮箱,请查收」。
  • 步骤 2:提交表单并验证验证码
    前端操作:
    用户填写完整注册信息(用户名、密码、确认密码),并输入邮箱收到的验证码,点击「注册」按钮。
    后端处理:
    从 Redis 中获取该邮箱对应的验证码,校验:
    验证码是否存在(若不存在,提示「验证码已过期,请重新获取」)。
    输入的验证码与存储的是否一致(若不一致,提示「验证码错误」)。
    验证码通过后,校验其他表单信息(如用户名是否重复、密码强度是否达标)。
    所有校验通过:
    密码加密存储
    用户信息入库。
    清除 Redis 中的验证码(防止重复使用)。
    返回注册成功,跳转至登录页。
    校验失败:返回具体错误提示(如「用户名已存在」)。
2.找回密码流程(分 2 步)

在这里插入图片描述

  • 步骤 1:验证用户名及邮箱关联性,下发验证码
    前端操作:
    用户进入「找回密码」页面,填写「用户名」和「注册时的邮箱」,点击「获取验证码」。
    后端处理:
    校验用户名与邮箱是否匹配(查询用户表,若username和email对应的数据不存在,返回提示「用户名或邮箱不正确」)。
    生成 6 位验证码,设置有效期(如 5 分钟),通过 Redis 临时存储(键为reset✉️{邮箱},值为验证码)。
    发送邮件(内容:你的密码重置验证码是:123456,5分钟内有效)。
    前端显示倒计时,提示「验证码已发送至邮箱」。
  • 步骤 2:提交表单进行邮箱验证(重置密码)
    前端操作:
    用户输入收到的验证码,填写「新密码」和「确认新密码」,点击「重置密码」。
    (前端需校验:新密码强度、两次输入一致性)
    后端处理:
    校验验证码(同注册流程,检查存在性和正确性)。
    验证码通过后,校验新密码合法性。
    所有校验通过:
    加密新密码,更新用户表中的密码字段。
    清除 Redis 中的验证码。
    返回「密码重置成功」,跳转至登录页。
    校验失败:返回错误提示(如「验证码错误」或「密码格式不符」)。
3.安全性考虑
  • 防止在邮箱验证码存活期有人暴力破解验证码
  • 处理方法:加入验证次数,一个验证码最多可以尝试5次,超过5次验证需要重新生成验证码,并清除缓存。

三、功能具体实现(主要针对后端)

前端仿照项目作者二开教程,找到注册和忘记密码添加对应的接口及填入框即可,因为博主不太擅长前端并且前端添加部分比较简单,这里就不过多赘述了,遇到问题可以私聊我或者评论区留言。

1.公共部分
.env

配置文件中加入自己邮箱的SMTP配置,可以在网上教程中查找使用方法

# 邮箱SMTP配置
SMTP_SERVER=smtp.example.com
SMTP_PORT=465
SMTP_USERNAME=example@example.com
SMTP_PASSWORD=example_key
SMTP_SENDER_NAME=example-serve
setting

对应的配置项中也要添加上

    # ================================================= #
    # ******************* 邮件服务配置 ***************** #
    # ================================================= #
    SMTP_SERVER: Optional[str] = None
    SMTP_PORT: Optional[int] = None
    SMTP_USERNAME: Optional[str] = None
    SMTP_PASSWORD: Optional[str] = None
    SMTP_SENDER_NAME: str = "System Notification"
enums

仿照老大的写法,读取写入redis时,不同服务拼接不同的键名,这里加上邮箱验证码

@unique
class RedisInitKeyConfig(Enum):
    """系统内置Redis键名枚举"""

    ACCESS_TOKEN = {'key': 'access_token', 'remark': '登录令牌信息'}
    REFRESH_TOKEN = {'key': 'refresh_token', 'remark': '刷新令牌信息'}
    CAPTCHA_CODES = {'key': 'captcha_codes', 'remark': '图片验证码'}
    EMAIL_CODES = {'key': 'email_codes', 'remark': '邮箱验证码'}
    SYSTEM_CONFIG = {'key': 'system_config', 'remark': '系统配置'}
    SYSTEM_DICT = {'key':'system_dict','remark': '数据字典'}
    
    @property
    def key(self) -> str:
        """获取Redis键名"""
        return self.value.get('key')

    @property 
    def remark(self) -> str:
        """获取Redis键名说明"""
        return self.value.get('remark')
email_schema

处理与邮箱验证码相关的请求数据验证,这个文件我放在了system/schema内

# -*- coding: utf-8 -*-

from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field, EmailStr


class EmailSceneEnum(str, Enum):
    register = "register"
    forget = "forget"


class SendEmailCodeSchema(BaseModel):
    email: EmailStr = Field(..., description="邮箱")
    scene: EmailSceneEnum = Field(..., description="场景: register | forget")
    username: Optional[str] = Field(default=None, description="用户名(找回密码可选,用于校验用户存在)")


class EmailCodeVerifySchema(BaseModel):
    email: EmailStr = Field(..., description="邮箱")
    scene: EmailSceneEnum = Field(..., description="场景: register | forget")
    code: str = Field(..., max_length=10, description="邮箱验证码")
    username: Optional[str] = Field(default=None, description="用户名(找回密码可选,用于校验用户存在)")



email_util

负责发送邮件,及一些简单的邮箱验证,具体可以看代码注释

# -*- coding: utf-8 -*-

import smtplib
import re
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
from typing import Dict, Any

from app.config.setting import settings
from app.core.exceptions import CustomException


EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')


class EmailUtil:
    """邮件发送工具类:复制 others/emil 逻辑,并统一从 Settings 读取 SMTP 配置"""

    @staticmethod
    def is_valid_email(email: str) -> bool:
        return bool(EMAIL_PATTERN.match(email))

    @staticmethod
    def build_config_from_settings() -> Dict[str, str]:
        return {
            "smtp_server": settings.SMTP_SERVER or "",
            "smtp_port": str(settings.SMTP_PORT) if settings.SMTP_PORT is not None else "",
            "smtp_username": settings.SMTP_USERNAME or "",
            "smtp_password": settings.SMTP_PASSWORD or "",
            "sender_name": settings.SMTP_SENDER_NAME or "System Notification",
        }

    @classmethod
    def send_email_with_config(
        cls,
        email_config: Dict[str, str],
        recipient: str,
        subject: str,
        content: str,
        is_html: bool = True,
    ) -> Dict[str, Any]:
        # 参考 others/emil.py 的发送逻辑
        if not cls.is_valid_email(recipient):
            return {"success": False, "error": f"无效的收件人邮箱格式: {recipient}"}

        sender_email = email_config.get('smtp_username', '')
        if not cls.is_valid_email(sender_email):
            return {"success": False, "error": f"无效的发件人邮箱格式: {sender_email}"}

        required_configs = ['smtp_server', 'smtp_port', 'smtp_username', 'smtp_password']
        for config_key in required_configs:
            if config_key not in email_config or not email_config[config_key]:
                return {"success": False, "error": f"邮箱配置不完整,缺少: {config_key}"}

        try:
            mime_type = "html" if is_html else "plain"
            message = MIMEText(content, mime_type, "utf-8")

            sender_name = email_config.get('sender_name', 'System Notification')
            try:
                formatted_from = formataddr((Header(sender_name, 'utf-8').encode(), sender_email))
            except Exception:
                formatted_from = sender_email

            message["From"] = formatted_from
            message["To"] = recipient
            message["Subject"] = Header(subject, "utf-8")

            port = int(email_config['smtp_port'])
            server = smtplib.SMTP_SSL(email_config['smtp_server'], port)
            try:
                server.login(sender_email, email_config['smtp_password'])
                server.sendmail(sender_email, recipient, message.as_string())
                return {"success": True, "message": "邮件发送成功"}
            finally:
                server.quit()

        except smtplib.SMTPAuthenticationError:
            raise CustomException(msg="邮箱认证失败,请检查账号和授权码是否正确")
        except smtplib.SMTPConnectError:
            raise CustomException(msg="无法连接到SMTP服务器,请检查服务器地址和端口是否正确")
        except smtplib.SMTPDataError as e:
            detail = e.smtp_error.decode() if hasattr(e, 'smtp_error') and isinstance(e.smtp_error, (bytes, bytearray)) else str(e)
            raise CustomException(msg=f"邮件数据错误: {getattr(e, 'smtp_code', 'unknown')} - {detail}")
        except Exception as e:
            raise CustomException(msg=f"发送邮件时发生错误: {str(e)}")

    @classmethod
    def send_email(
        cls,
        recipient: str,
        subject: str,
        content: str,
        is_html: bool = True,
    ) -> Dict[str, Any]:
        config = cls.build_config_from_settings()
        # 基础配置校验(提前失败,给出明确提示)
        required = [config.get('smtp_server'), config.get('smtp_port'), config.get('smtp_username'), config.get('smtp_password')]
        if not all(required):
            raise CustomException(msg="邮件服务未配置,请先在 .env 配置 SMTP_SERVER/SMTP_PORT/SMTP_USERNAME/SMTP_PASSWORD")

        result = cls.send_email_with_config(config, recipient, subject, content, is_html)
        if not result.get("success"):
            raise CustomException(msg=f"发送邮件失败: {result.get('error')}")
        return result
email_servce

这个主要是通过调用email_util里面的工具,实现邮箱验证与其他一些与邮箱相关用到的服务,这个夹在了用户与邮箱工具之间,做一个过渡。

另外这里验证码进行验证与存储时在验证码后面多拼接了一位,用于记录验证次数,验证码本来是123456,那么最初存储的时候是1234560,最后一位用于计数,每验证匹配失败一次就加1,其余失败不变。当超过5次认定有破坏服务器的行为,清理redis并提示重新生成。

class EmailCodeService:
    """邮箱验证码服务"""

    @classmethod
    async def send_email_code_service(cls, redis: Redis, data: SendEmailCodeSchema, db: AsyncSession | None = None) -> bool:
        """发送邮箱验证码并缓存到Redis"""
        # 生成6位数字验证码
        import random
        code = ''.join([str(random.randint(0, 9)) for _ in range(6)])

        # 如果场景为找回密码,且提供了用户名,需要校验用户存在与邮箱一致
        if data.scene == EmailSceneEnum.FORGET and data.username:
            if db is None:
                raise CustomException(msg="服务不可用,请稍后重试")
            auth = AuthSchema(db=db)
            user = await UserCRUD(auth).get_by_username_crud(username=data.username)
            if not user:
                raise CustomException(msg="用户不存在")
            if user.email and user.email != data.email:
                raise CustomException(msg="邮箱与账号不匹配")

        # 邮件内容
        subject = "验证码通知"
        html_content = f"""
        <html><body>
        <p>您正在进行{data.scene}操作,验证码如下:</p>
        <div style='text-align:center;margin:12px 0;'>
          <span style='display:inline-block;background:#f8f9fa;padding:12px 20px;border-radius:4px;font-size:24px;font-weight:700;letter-spacing:6px;border:1px solid #e0e0e0'>{code}</span>
          <p style='font-size:12px;color:#666;margin-top:8px'>验证码有效期为10分钟</p>
        </div>
        <p>若非本人操作,请忽略本邮件。</p>
        </body></html>
        """

        # 发送邮件(统一工具类读取 Settings 配置)
        EmailUtil.send_email(recipient=data.email, subject=subject, content=html_content, is_html=True)

        # 缓存验证码,缓存的时候缓存一个计数操作,如果查询一定次数将抛出异常,避免有些人进行暴力破解
        key = f"{RedisInitKeyConfig.EMAIL_CODES.key}:{data.scene}:{data.email}"
        await RedisCURD(redis).set(key=key, value=code+'0', expire=600)
        return True

    @classmethod
    async def verify_email_code_service(cls, redis: Redis, scene: str, email : str, captcha: str) -> bool:
        """校验邮箱验证码,支持注册与找回场景"""

        key = f"{RedisInitKeyConfig.EMAIL_CODES.key}:{scene}:{email}"
        cache_code = await RedisCURD(redis).get(key)
        if not cache_code:
            raise CustomException(msg='邮箱验证码不存在或已过期')
        try:
            cache_code = cache_code.decode() if isinstance(cache_code, (bytes, bytearray)) else str(cache_code)
        except Exception:
            cache_code = str(cache_code)
            
        if int(cache_code[6]) >= 5:
            # 多次验证邮箱,为避免注册时恶意获取验证码,增加一个次数限制
            await RedisCURD(redis).delete(key)
            raise CustomException(msg='邮箱验证码错误次数过多,请稍后重新获取')
        if captcha != cache_code[:6]:
            await RedisCURD(redis).set(key=key, value=cache_code[:6]+f'{int(cache_code[6])+1}', expire=300)
            raise CustomException(msg='邮箱验证码错误')
        # 验证通过后可选择删除验证码,避免复用
        await RedisCURD(redis).delete(key)
        return True
2.邮箱验证码下发

由于邮箱相关的操作封装到了email_service内,所以这里需要做的处理比较少

@router.post("/get_email_code", summary="发送邮箱验证码", description="发送邮箱验证码")
async def send_email_code_controller(
    data: SendEmailCodeSchema,
    redis: Redis = Depends(redis_getter),
    db: AsyncSession = Depends(db_getter),
) -> JSONResponse:
    await EmailCodeService.send_email_code_service(redis=redis, data=data, db=db)
    logger.info(f"发送邮箱验证码成功: {data.scene} -> {data.email}")
    return SuccessResponse(msg="验证码已发送")
2.注册实现
user_controller

接口地址

@router.post('/register', summary="注册用户", description="注册用户")
async def register_user_controller(
    data: UserRegisterSchema, 
    db: AsyncSession = Depends(db_getter),
    redis: Redis = Depends(redis_getter),
) -> JSONResponse:
    auth = AuthSchema(db=db)
    user_register_result = await UserService.register_user_service(redis=redis, data=data, auth=auth)
    logger.info(f"{data.username} 注册用户成功: {user_register_result}")
    return SuccessResponse(data=user_register_result, msg='注册用户成功')
user_schema

数据校验模块,这里新添加了一个email与email_code

class UserRegisterSchema(BaseModel):
    """注册"""
    name: str = Field(default=None, max_length=15, description="名称")
    mobile: Optional[str] = Field(default=None, description="手机号")
    username: str = Field(default=None, max_length=15, description="账号")
    password: str = Field(default=None, max_length=128, description="密码哈希值")
    email: Optional[EmailStr] = Field(default=None, description="邮箱")
    email_code: Optional[str] = Field(default=None, max_length=10, description="邮箱验证码")
    role_ids: Optional[List[int]] = Field(default=[2], description='角色ID')
    creator_id: Optional[int] = Field(default=1, description='创建人ID')
    description: Optional[str] = Field(default=f'注册用户{username}', max_length=250, description="备注")
user_service

这里是邮箱验证码的验证,如果通过验证就持久化用户信息,否则就返回相应的提示,dict_data.pop(‘email_code’)是为了适配User model

    @classmethod
    async def register_user_service(cls, redis: Redis, auth: AuthSchema, data: UserRegisterSchema) -> Dict:
        """用户注册"""
        # 检查用户名是否存在
        username_ok = await UserCRUD(auth).get_by_username_crud(username=data.username)
        if username_ok:
            raise CustomException(msg='账号已存在,请去登录')
        if not EmailUtil.is_valid_email(data.email):
            raise CustomException(msg='邮箱格式不正确')
        if not data.email_code:
            raise CustomException(msg='邮箱验证码不能为空')
        
        # 验证邮箱及code
        await EmailCodeService.verify_email_code_service(redis=redis, scene="register", email=data.email, captcha=data.email_code)

        data.password = PwdUtil.set_password_hash(password=data.password)
        data.name = data.username
        data.creator_id = 1
        dict_data = data.model_dump(exclude_unset=True)
        # dict_data['creator_id'] = data.creator_id
        # dict_data['dept_id'] = data.dept_id
        # dict_data['description'] = data.description
        dict_data.pop('email_code')
        result = await UserCRUD(auth).create(data=dict_data)
        await UserCRUD(auth).set_user_roles_crud(user_ids=[result.id], role_ids=data.role_ids)
        # await UserCRUD(auth).set_user_positions_crud(user_ids=[result.id], position_ids=data.position_ids)
        return UserOutSchema.model_validate(result).model_dump()
3.找回密码实现
user_controller

接口地址

@router.post('/forget/password', summary="忘记密码", description="忘记密码")
async def forget_password_controller(
    data: UserForgetPasswordSchema, 
    db: AsyncSession = Depends(db_getter),
    redis: Redis = Depends(redis_getter),
) -> JSONResponse:
    auth = AuthSchema(db=db)
    user_forget_password_result = await UserService.forget_password_service(redis=redis, data=data, auth=auth)
    logger.info(f"{data.username} 重置密码成功: {user_forget_password_result}")
    return SuccessResponse(data=user_forget_password_result, msg='重置密码成功')
user_schema

数据校验模块,这里新添加了一个email与email_code

class UserForgetPasswordSchema(BaseModel):
    """忘记密码"""
    username: str = Field(default=None, max_length=15, description="用户名")
    email: Optional[EmailStr] = Field(default=None, description="邮箱")
    email_code: Optional[str] = Field(default=None, max_length=10, description="邮箱验证码")
    new_password: str = Field(default=None, max_length=128, description="新密码")
user_service

这块与注册模块类似,比注册模块稍简单。

    @classmethod
    async def forget_password_service(cls, redis: Redis , auth: AuthSchema, data: UserForgetPasswordSchema) -> Dict:
        """用户忘记密码"""
        user = await UserCRUD(auth).get_by_username_crud(username=data.username)
        if not user:
            raise CustomException(msg="用户不存在")
        if not user.status:
            raise CustomException(msg="用户已停用")
        if not EmailUtil.is_valid_email(data.email):
            raise CustomException(msg='邮箱格式不正确')
        if not data.email_code:
            raise CustomException(msg='邮箱验证码不能为空')
        # 验证邮箱及code
        await EmailCodeService.verify_email_code_service(redis=redis, scene="forget", email=data.email, captcha=data.email_code)
        new_password_hash = PwdUtil.set_password_hash(password=data.new_password)
        new_user = await UserCRUD(auth).forget_password_crud(id=user.id, password_hash=new_password_hash)
        return UserOutSchema.model_validate(new_user).model_dump()

四、测试

1.注册

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
可以登录:
在这里插入图片描述

2.忘记密码

在这里插入图片描述

在这里插入图片描述

3.多次验证失败

多次失败,直接重新下发。然后重新验证。不过这种还是有被蒙对的可能,只不过验证码不在固定站在那里等着人找了,
在这里插入图片描述


博主的能力还是比较欠缺的,如果有哪些地方有更好的优化欢迎大家提出疑问,合理的话会及时进行改进,希望能够有所帮助!!!


网站公告

今日签到

点亮在社区的每一天
去签到