文章目录
一、项目背景
项目地址:FastapiAdmin
项目文档:FastapiAdmin 文档
FastAPI-Vue3-Admin 是一套完全开源的现代化企业级中后台快速开发平台,以高度模块化设计与先进技术架构为核心优势。项目采用前后端分离模式,深度融合 Python 后端框架 FastAPI 与前端主流框架 Vue3,实现多端统一开发体验,为开发者提供开箱即用的一站式解决方案,助力高效搭建高质量企业级管理系统。
二、功能实现流程思考
这里主要是邮箱通过SMIP配置下发给用户验证码并进行验证的流程
1.注册流程(分 2 步)
- 步骤 1:验证邮箱并下发验证码
前端操作:
用户进入注册页面,首先填写「邮箱地址」,点击「获取验证码」按钮。
(前端需先校验邮箱格式,如xxx@xx.com)
后端处理:
校验邮箱是否已被注册(若已注册,返回提示「该邮箱已注册,请直接登录」)。
生成 6 位数字验证码(如123456),设置有效期(如 5 分钟)。
将验证码与邮箱绑定,临时存储(使用Redis 缓存,键为verify_type:操作类型:邮箱,值为验证码,过期时间 10 分钟)。
调用邮件服务发送验证码(邮件内容:你的注册验证码是:123456,5分钟内有效,请勿泄露给他人)。
前端显示倒计时(如 60 秒内不可重复发送),提示「验证码已发送至邮箱,请查收」。 - 步骤 2:提交表单并验证验证码
前端操作:
用户填写完整注册信息(用户名、密码、确认密码),并输入邮箱收到的验证码,点击「注册」按钮。
后端处理:
从 Redis 中获取该邮箱对应的验证码,校验:
验证码是否存在(若不存在,提示「验证码已过期,请重新获取」)。
输入的验证码与存储的是否一致(若不一致,提示「验证码错误」)。
验证码通过后,校验其他表单信息(如用户名是否重复、密码强度是否达标)。
所有校验通过:
密码加密存储
用户信息入库。
清除 Redis 中的验证码(防止重复使用)。
返回注册成功,跳转至登录页。
校验失败:返回具体错误提示(如「用户名已存在」)。
2.找回密码流程(分 2 步)
- 步骤 1:验证用户名及邮箱关联性,下发验证码
前端操作:
用户进入「找回密码」页面,填写「用户名」和「注册时的邮箱」,点击「获取验证码」。
后端处理:
校验用户名与邮箱是否匹配(查询用户表,若username和email对应的数据不存在,返回提示「用户名或邮箱不正确」)。
生成 6 位验证码,设置有效期(如 5 分钟),通过 Redis 临时存储(键为reset✉️{邮箱},值为验证码)。
发送邮件(内容:你的密码重置验证码是:123456,5分钟内有效)。
前端显示倒计时,提示「验证码已发送至邮箱」。 - 步骤 2:提交表单进行邮箱验证(重置密码)
前端操作:
用户输入收到的验证码,填写「新密码」和「确认新密码」,点击「重置密码」。
(前端需校验:新密码强度、两次输入一致性)
后端处理:
校验验证码(同注册流程,检查存在性和正确性)。
验证码通过后,校验新密码合法性。
所有校验通过:
加密新密码,更新用户表中的密码字段。
清除 Redis 中的验证码。
返回「密码重置成功」,跳转至登录页。
校验失败:返回错误提示(如「验证码错误」或「密码格式不符」)。
3.安全性考虑
- 防止在邮箱验证码存活期有人暴力破解验证码
- 处理方法:加入验证次数,一个验证码最多可以尝试5次,超过5次验证需要重新生成验证码,并清除缓存。
三、功能具体实现(主要针对后端)
前端仿照项目作者二开教程,找到注册和忘记密码添加对应的接口及填入框即可,因为博主不太擅长前端并且前端添加部分比较简单,这里就不过多赘述了,遇到问题可以私聊我或者评论区留言。
1.公共部分
.env
配置文件中加入自己邮箱的SMTP配置,可以在网上教程中查找使用方法
# 邮箱SMTP配置
SMTP_SERVER=smtp.example.com
SMTP_PORT=465
SMTP_USERNAME=example@example.com
SMTP_PASSWORD=example_key
SMTP_SENDER_NAME=example-serve
setting
对应的配置项中也要添加上
# ================================================= #
# ******************* 邮件服务配置 ***************** #
# ================================================= #
SMTP_SERVER: Optional[str] = None
SMTP_PORT: Optional[int] = None
SMTP_USERNAME: Optional[str] = None
SMTP_PASSWORD: Optional[str] = None
SMTP_SENDER_NAME: str = "System Notification"
enums
仿照老大的写法,读取写入redis时,不同服务拼接不同的键名,这里加上邮箱验证码
@unique
class RedisInitKeyConfig(Enum):
"""系统内置Redis键名枚举"""
ACCESS_TOKEN = {'key': 'access_token', 'remark': '登录令牌信息'}
REFRESH_TOKEN = {'key': 'refresh_token', 'remark': '刷新令牌信息'}
CAPTCHA_CODES = {'key': 'captcha_codes', 'remark': '图片验证码'}
EMAIL_CODES = {'key': 'email_codes', 'remark': '邮箱验证码'}
SYSTEM_CONFIG = {'key': 'system_config', 'remark': '系统配置'}
SYSTEM_DICT = {'key':'system_dict','remark': '数据字典'}
@property
def key(self) -> str:
"""获取Redis键名"""
return self.value.get('key')
@property
def remark(self) -> str:
"""获取Redis键名说明"""
return self.value.get('remark')
email_schema
处理与邮箱验证码相关的请求数据验证,这个文件我放在了system/schema内
# -*- coding: utf-8 -*-
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field, EmailStr
class EmailSceneEnum(str, Enum):
register = "register"
forget = "forget"
class SendEmailCodeSchema(BaseModel):
email: EmailStr = Field(..., description="邮箱")
scene: EmailSceneEnum = Field(..., description="场景: register | forget")
username: Optional[str] = Field(default=None, description="用户名(找回密码可选,用于校验用户存在)")
class EmailCodeVerifySchema(BaseModel):
email: EmailStr = Field(..., description="邮箱")
scene: EmailSceneEnum = Field(..., description="场景: register | forget")
code: str = Field(..., max_length=10, description="邮箱验证码")
username: Optional[str] = Field(default=None, description="用户名(找回密码可选,用于校验用户存在)")
email_util
负责发送邮件,及一些简单的邮箱验证,具体可以看代码注释
# -*- coding: utf-8 -*-
import smtplib
import re
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
from typing import Dict, Any
from app.config.setting import settings
from app.core.exceptions import CustomException
EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
class EmailUtil:
"""邮件发送工具类:复制 others/emil 逻辑,并统一从 Settings 读取 SMTP 配置"""
@staticmethod
def is_valid_email(email: str) -> bool:
return bool(EMAIL_PATTERN.match(email))
@staticmethod
def build_config_from_settings() -> Dict[str, str]:
return {
"smtp_server": settings.SMTP_SERVER or "",
"smtp_port": str(settings.SMTP_PORT) if settings.SMTP_PORT is not None else "",
"smtp_username": settings.SMTP_USERNAME or "",
"smtp_password": settings.SMTP_PASSWORD or "",
"sender_name": settings.SMTP_SENDER_NAME or "System Notification",
}
@classmethod
def send_email_with_config(
cls,
email_config: Dict[str, str],
recipient: str,
subject: str,
content: str,
is_html: bool = True,
) -> Dict[str, Any]:
# 参考 others/emil.py 的发送逻辑
if not cls.is_valid_email(recipient):
return {"success": False, "error": f"无效的收件人邮箱格式: {recipient}"}
sender_email = email_config.get('smtp_username', '')
if not cls.is_valid_email(sender_email):
return {"success": False, "error": f"无效的发件人邮箱格式: {sender_email}"}
required_configs = ['smtp_server', 'smtp_port', 'smtp_username', 'smtp_password']
for config_key in required_configs:
if config_key not in email_config or not email_config[config_key]:
return {"success": False, "error": f"邮箱配置不完整,缺少: {config_key}"}
try:
mime_type = "html" if is_html else "plain"
message = MIMEText(content, mime_type, "utf-8")
sender_name = email_config.get('sender_name', 'System Notification')
try:
formatted_from = formataddr((Header(sender_name, 'utf-8').encode(), sender_email))
except Exception:
formatted_from = sender_email
message["From"] = formatted_from
message["To"] = recipient
message["Subject"] = Header(subject, "utf-8")
port = int(email_config['smtp_port'])
server = smtplib.SMTP_SSL(email_config['smtp_server'], port)
try:
server.login(sender_email, email_config['smtp_password'])
server.sendmail(sender_email, recipient, message.as_string())
return {"success": True, "message": "邮件发送成功"}
finally:
server.quit()
except smtplib.SMTPAuthenticationError:
raise CustomException(msg="邮箱认证失败,请检查账号和授权码是否正确")
except smtplib.SMTPConnectError:
raise CustomException(msg="无法连接到SMTP服务器,请检查服务器地址和端口是否正确")
except smtplib.SMTPDataError as e:
detail = e.smtp_error.decode() if hasattr(e, 'smtp_error') and isinstance(e.smtp_error, (bytes, bytearray)) else str(e)
raise CustomException(msg=f"邮件数据错误: {getattr(e, 'smtp_code', 'unknown')} - {detail}")
except Exception as e:
raise CustomException(msg=f"发送邮件时发生错误: {str(e)}")
@classmethod
def send_email(
cls,
recipient: str,
subject: str,
content: str,
is_html: bool = True,
) -> Dict[str, Any]:
config = cls.build_config_from_settings()
# 基础配置校验(提前失败,给出明确提示)
required = [config.get('smtp_server'), config.get('smtp_port'), config.get('smtp_username'), config.get('smtp_password')]
if not all(required):
raise CustomException(msg="邮件服务未配置,请先在 .env 配置 SMTP_SERVER/SMTP_PORT/SMTP_USERNAME/SMTP_PASSWORD")
result = cls.send_email_with_config(config, recipient, subject, content, is_html)
if not result.get("success"):
raise CustomException(msg=f"发送邮件失败: {result.get('error')}")
return result
email_servce
这个主要是通过调用email_util里面的工具,实现邮箱验证与其他一些与邮箱相关用到的服务,这个夹在了用户与邮箱工具之间,做一个过渡。
另外这里验证码进行验证与存储时在验证码后面多拼接了一位,用于记录验证次数,验证码本来是123456,那么最初存储的时候是1234560,最后一位用于计数,每验证匹配失败一次就加1,其余失败不变。当超过5次认定有破坏服务器的行为,清理redis并提示重新生成。
class EmailCodeService:
"""邮箱验证码服务"""
@classmethod
async def send_email_code_service(cls, redis: Redis, data: SendEmailCodeSchema, db: AsyncSession | None = None) -> bool:
"""发送邮箱验证码并缓存到Redis"""
# 生成6位数字验证码
import random
code = ''.join([str(random.randint(0, 9)) for _ in range(6)])
# 如果场景为找回密码,且提供了用户名,需要校验用户存在与邮箱一致
if data.scene == EmailSceneEnum.FORGET and data.username:
if db is None:
raise CustomException(msg="服务不可用,请稍后重试")
auth = AuthSchema(db=db)
user = await UserCRUD(auth).get_by_username_crud(username=data.username)
if not user:
raise CustomException(msg="用户不存在")
if user.email and user.email != data.email:
raise CustomException(msg="邮箱与账号不匹配")
# 邮件内容
subject = "验证码通知"
html_content = f"""
<html><body>
<p>您正在进行{data.scene}操作,验证码如下:</p>
<div style='text-align:center;margin:12px 0;'>
<span style='display:inline-block;background:#f8f9fa;padding:12px 20px;border-radius:4px;font-size:24px;font-weight:700;letter-spacing:6px;border:1px solid #e0e0e0'>{code}</span>
<p style='font-size:12px;color:#666;margin-top:8px'>验证码有效期为10分钟</p>
</div>
<p>若非本人操作,请忽略本邮件。</p>
</body></html>
"""
# 发送邮件(统一工具类读取 Settings 配置)
EmailUtil.send_email(recipient=data.email, subject=subject, content=html_content, is_html=True)
# 缓存验证码,缓存的时候缓存一个计数操作,如果查询一定次数将抛出异常,避免有些人进行暴力破解
key = f"{RedisInitKeyConfig.EMAIL_CODES.key}:{data.scene}:{data.email}"
await RedisCURD(redis).set(key=key, value=code+'0', expire=600)
return True
@classmethod
async def verify_email_code_service(cls, redis: Redis, scene: str, email : str, captcha: str) -> bool:
"""校验邮箱验证码,支持注册与找回场景"""
key = f"{RedisInitKeyConfig.EMAIL_CODES.key}:{scene}:{email}"
cache_code = await RedisCURD(redis).get(key)
if not cache_code:
raise CustomException(msg='邮箱验证码不存在或已过期')
try:
cache_code = cache_code.decode() if isinstance(cache_code, (bytes, bytearray)) else str(cache_code)
except Exception:
cache_code = str(cache_code)
if int(cache_code[6]) >= 5:
# 多次验证邮箱,为避免注册时恶意获取验证码,增加一个次数限制
await RedisCURD(redis).delete(key)
raise CustomException(msg='邮箱验证码错误次数过多,请稍后重新获取')
if captcha != cache_code[:6]:
await RedisCURD(redis).set(key=key, value=cache_code[:6]+f'{int(cache_code[6])+1}', expire=300)
raise CustomException(msg='邮箱验证码错误')
# 验证通过后可选择删除验证码,避免复用
await RedisCURD(redis).delete(key)
return True
2.邮箱验证码下发
由于邮箱相关的操作封装到了email_service内,所以这里需要做的处理比较少
@router.post("/get_email_code", summary="发送邮箱验证码", description="发送邮箱验证码")
async def send_email_code_controller(
data: SendEmailCodeSchema,
redis: Redis = Depends(redis_getter),
db: AsyncSession = Depends(db_getter),
) -> JSONResponse:
await EmailCodeService.send_email_code_service(redis=redis, data=data, db=db)
logger.info(f"发送邮箱验证码成功: {data.scene} -> {data.email}")
return SuccessResponse(msg="验证码已发送")
2.注册实现
user_controller
接口地址
@router.post('/register', summary="注册用户", description="注册用户")
async def register_user_controller(
data: UserRegisterSchema,
db: AsyncSession = Depends(db_getter),
redis: Redis = Depends(redis_getter),
) -> JSONResponse:
auth = AuthSchema(db=db)
user_register_result = await UserService.register_user_service(redis=redis, data=data, auth=auth)
logger.info(f"{data.username} 注册用户成功: {user_register_result}")
return SuccessResponse(data=user_register_result, msg='注册用户成功')
user_schema
数据校验模块,这里新添加了一个email与email_code
class UserRegisterSchema(BaseModel):
"""注册"""
name: str = Field(default=None, max_length=15, description="名称")
mobile: Optional[str] = Field(default=None, description="手机号")
username: str = Field(default=None, max_length=15, description="账号")
password: str = Field(default=None, max_length=128, description="密码哈希值")
email: Optional[EmailStr] = Field(default=None, description="邮箱")
email_code: Optional[str] = Field(default=None, max_length=10, description="邮箱验证码")
role_ids: Optional[List[int]] = Field(default=[2], description='角色ID')
creator_id: Optional[int] = Field(default=1, description='创建人ID')
description: Optional[str] = Field(default=f'注册用户{username}', max_length=250, description="备注")
user_service
这里是邮箱验证码的验证,如果通过验证就持久化用户信息,否则就返回相应的提示,dict_data.pop(‘email_code’)是为了适配User model
@classmethod
async def register_user_service(cls, redis: Redis, auth: AuthSchema, data: UserRegisterSchema) -> Dict:
"""用户注册"""
# 检查用户名是否存在
username_ok = await UserCRUD(auth).get_by_username_crud(username=data.username)
if username_ok:
raise CustomException(msg='账号已存在,请去登录')
if not EmailUtil.is_valid_email(data.email):
raise CustomException(msg='邮箱格式不正确')
if not data.email_code:
raise CustomException(msg='邮箱验证码不能为空')
# 验证邮箱及code
await EmailCodeService.verify_email_code_service(redis=redis, scene="register", email=data.email, captcha=data.email_code)
data.password = PwdUtil.set_password_hash(password=data.password)
data.name = data.username
data.creator_id = 1
dict_data = data.model_dump(exclude_unset=True)
# dict_data['creator_id'] = data.creator_id
# dict_data['dept_id'] = data.dept_id
# dict_data['description'] = data.description
dict_data.pop('email_code')
result = await UserCRUD(auth).create(data=dict_data)
await UserCRUD(auth).set_user_roles_crud(user_ids=[result.id], role_ids=data.role_ids)
# await UserCRUD(auth).set_user_positions_crud(user_ids=[result.id], position_ids=data.position_ids)
return UserOutSchema.model_validate(result).model_dump()
3.找回密码实现
user_controller
接口地址
@router.post('/forget/password', summary="忘记密码", description="忘记密码")
async def forget_password_controller(
data: UserForgetPasswordSchema,
db: AsyncSession = Depends(db_getter),
redis: Redis = Depends(redis_getter),
) -> JSONResponse:
auth = AuthSchema(db=db)
user_forget_password_result = await UserService.forget_password_service(redis=redis, data=data, auth=auth)
logger.info(f"{data.username} 重置密码成功: {user_forget_password_result}")
return SuccessResponse(data=user_forget_password_result, msg='重置密码成功')
user_schema
数据校验模块,这里新添加了一个email与email_code
class UserForgetPasswordSchema(BaseModel):
"""忘记密码"""
username: str = Field(default=None, max_length=15, description="用户名")
email: Optional[EmailStr] = Field(default=None, description="邮箱")
email_code: Optional[str] = Field(default=None, max_length=10, description="邮箱验证码")
new_password: str = Field(default=None, max_length=128, description="新密码")
user_service
这块与注册模块类似,比注册模块稍简单。
@classmethod
async def forget_password_service(cls, redis: Redis , auth: AuthSchema, data: UserForgetPasswordSchema) -> Dict:
"""用户忘记密码"""
user = await UserCRUD(auth).get_by_username_crud(username=data.username)
if not user:
raise CustomException(msg="用户不存在")
if not user.status:
raise CustomException(msg="用户已停用")
if not EmailUtil.is_valid_email(data.email):
raise CustomException(msg='邮箱格式不正确')
if not data.email_code:
raise CustomException(msg='邮箱验证码不能为空')
# 验证邮箱及code
await EmailCodeService.verify_email_code_service(redis=redis, scene="forget", email=data.email, captcha=data.email_code)
new_password_hash = PwdUtil.set_password_hash(password=data.new_password)
new_user = await UserCRUD(auth).forget_password_crud(id=user.id, password_hash=new_password_hash)
return UserOutSchema.model_validate(new_user).model_dump()
四、测试
1.注册
可以登录:
2.忘记密码
3.多次验证失败
多次失败,直接重新下发。然后重新验证。不过这种还是有被蒙对的可能,只不过验证码不在固定站在那里等着人找了,
博主的能力还是比较欠缺的,如果有哪些地方有更好的优化欢迎大家提出疑问,合理的话会及时进行改进,希望能够有所帮助!!!