基础逻辑
一般来说这个验证码登录分为手机号、以及邮箱登录
手机号短信验证,以腾讯云SMS 服务为例:
这个操作无非对后端来说就是两个接口:
一个是获取验证码,这块后端生成6位数字+expire_time 去推送到腾讯云sdk ,腾讯云sdk再去推送运营商,运营商再去推送到用户
一个就是验证验证码,根据phone+ expire 去redis查就行了
注意 第一个用户 使用手机号获取验证码的时候,不要插库,因为没有经过验证,所有涉及到手机号相关的,特别是绑定、登录等必须要验证手机号
腾讯云短信sms 服务sdk 封装,注意python只有同步线程版,故使用asyncio.to_thread 封装
import asyncio
import json
from tencentcloud.common import credential
from tencentcloud.sms.v20210111 import sms_client, models
from producer.utils.custom_exception_utils import CustomException
def send_sms_sync(phone, code, expiration_date):
try:
cred = credential.Credential(settings.TENCENT_SMS_SECRET_ID, settings.TENCENT_SMS_SECRET_KEY)
client = sms_client.SmsClient(cred, settings.TENCENT_SMS_REGION)
req = models.SendSmsRequest()
req.SmsSdkAppId = settings.TENCENT_SMS_APP_ID
req.SignName = settings.TENCENT_SMS_SIGN_NAME
req.TemplateId = settings.TENCENT_SMS_TEMPLATE_ID
req.TemplateParamSet = [code, expiration_date]
req.PhoneNumberSet = [phone]
resp = client.SendSms(req)
logger.info(resp.to_json_string(indent=2))
# 将 JSON 字符串转换为字典
r = json.loads(resp.to_json_string())
if r["SendStatusSet"][0]["Code"] == "Ok":
return r
else:
error_message = r["SendStatusSet"][0]["Message"]
logger.error(f"短信SMS服务失败:{error_message}")
raise CustomException(message=f"{phone}短信SMS服务失败\n原因:{error_message}")
except Exception as e:
logger.error(f"发送短信失败: {str(e)}")
raise CustomException(message=str(e))
async def send_sms_async(phone, code, expiration_date):
result = await asyncio.to_thread(send_sms_sync, phone, code, str(expiration_date))
return result
注意先插redis 再发送,因为 发送可能有异常,但是能保证测试的时候redis 有数据
一般来说会以redis 作为expire处理:
async def get_code(self, telephone: str):
"""
获取验证码
"""
code = await self.login_code_services.create_login_code(telephone)
await send_sms_async(phone=telephone, code=code, expiration_date=settings.SMS_VALIDATION_CODE_EXPIRATION)
创建六位数字的逻辑
create_login_code的逻辑:
async def create_login_code(self, telephone: str) -> str:
"""
根据手机号生成登录验证码并存储到 Redis 中
:param telephone: 用户手机号
:return: 生成的验证码
"""
# 生成验证码
# 生成验证码 generate_code 则是加密函数
code = self.generate_code(telephone)
# Redis 键名(可用手机号做区分)
redis_key = f"{self.login_core_prefix}:{telephone}"
# 使用 Redis 存储验证码,有效时间 10 分钟
async with redis_client.get_client() as client:
try:
# 删除已有的验证码
await client.delete(redis_key)
logger.info(f"Existing login code for {telephone} deleted from Redis.")
# 设置新的验证码
await client.set(redis_key, code, ex=self.login_code_expiration)
logger.info(f"New login code for {telephone} stored in Redis: {code}")
except Exception as e:
logger.error(f"Failed to store login code in Redis: {e}")
raise
return code
以上展示的是最简单的验证登录(设计sdk + 简单的expire 处理)
高并发请求限制
1、问题:假设你的系统面对高并发用户时,短信验证码的请求频率可能会非常高。如何防止恶意用户利用暴力破解或刷验证码的方式发起过多请求?
考察点:
• 防止频繁请求(限流)。
• 防止滥用验证码接口。
解决方案:
• 限流机制:使用 Redis 或类似工具实现用户请求频率限制。比如,使用令牌桶算法(Token Bucket)或者漏桶算法(Leaky Bucket)来限制每个手机号每分钟的验证码请求次数。
令牌桶
Token Bucket 令牌桶 [适用于需要平滑请求速率的场景,特别是在高并发的情况下,它可以平衡请求流量。]
令牌桶 本质:
先查redis 对应的键里面的值 的长度 是否超标了
超标了 就refuse 否则就是pass
插入redis的逻辑: 使用zset (有序集合)在[0, time] 插入值
import redis
import time
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
# 配置
max_tokens = 5 # 最大令牌数
rate = 1 # 每秒生成一个令牌,每秒只能插入一个新的令牌时间戳(即每秒最多允许一个请求),
# 如果超出这个速率,后续的请求就会被拒绝。
interval = 60 # 限流时间窗口,单位为秒
def generate_token(phone_number):
# 构造桶键
bucket_key = f"sms_rate_limit:{phone_number}"
# 获取当前时间(秒),这个就是rate 内容
current_time = int(time.time())
# 清除过期的令牌
# • 这行代码用来移除掉超过时间窗口 interval 的过期令牌。
# 如果设置的是 interval=60,那么每次请求时都会移除掉超过 60 秒的令牌,
# 确保令牌桶中只包含当前时间窗口内的令牌。
# 删除 score 在 [0, current_time - interval] 之间的所有元素。
redis_client.zremrangebyscore(bucket_key, 0, current_time - interval)
# 获取当前桶内的令牌数(即时间戳数)
tokens = redis_client.zrange(bucket_key, 0, -1)
# 如果桶里有足够的令牌,则拒绝请求
if len(tokens) >= max_tokens:
return False # 限流拒绝
else:
# 向桶中添加当前时间戳作为新的令牌
redis_client.zadd(bucket_key, {current_time: current_time})
return True # 允许请求
def check_rate_limit(phone_number):
# 获取桶的令牌数量
bucket_key = f"sms_rate_limit:{phone_number}"
tokens = redis_client.llen(bucket_key)
# 如果桶内令牌超过最大容量,表示请求超限
if tokens >= max_tokens:
return False # 拒绝请求
else:
return True # 允许请求
漏桶算法
漏桶算法 (Leaky Bucket)
它的水流速率是固定的,水桶有固定容量。当水桶满了,任何新的请求都会被丢弃。[适用于流量稳定的场景,具有更强的固定速率处理能力。]
import redis
import time
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
# 配置
bucket_key = "sms_rate_limit:phone_number"
max_capacity = 5 # 桶的容量,最大允许的请求数
rate = 1 # 处理请求的速率(每秒处理一个请求)
def process_request(phone_number):
bucket_key = f"sms_rate_limit:{phone_number}"
# 当前时间
current_time = int(time.time())
# 清理过期请求(处理漏桶)
redis_client.zremrangebyscore(bucket_key, 0, current_time - 60)
# 判断请求是否超出容量限制
if redis_client.zcard(bucket_key) >= max_capacity:
return False # 超出请求限制,拒绝请求
# 添加当前请求时间戳
redis_client.zadd(bucket_key, {current_time: current_time})
return True # 允许请求
,但漏桶着重于处理速率限制和固定容量控制,而令牌桶则关注请求的流量速率和生成令牌的动态过程。
滑动窗口限流
• 验证码有效期:设置合理的验证码过期时间(通常是 3-5 分钟),避免用户在很长时间内尝试。
• 滑动窗口限流:每次请求时,记录用户请求的时间戳,在每次请求时检查用户在最近一分钟内的请求次数,如果超过限制,则拒绝该请求。
# 假设我们用 Redis 记录每个手机号的请求次数
import redis
from time import time
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
def check_sms_limit(phone_number):
key = f"sms_request_count:{phone_number}"
current_time = int(time())
expire_time = 60 # 1 minute
max_requests = 5 # 最大请求次数
# 检查手机号最近一段时间内的请求次数
requests = redis_client.lrange(key, 0, -1)
requests = [int(r) for r in requests]
# 删除过期请求(超过1分钟的请求)
requests = [r for r in requests if r > current_time - expire_time]
if len(requests) >= max_requests:
return False # 超过最大请求次数
# 添加当前请求时间
redis_client.rpush(key, current_time)
redis_client.expire(key, expire_time)
return True
验证码泄漏
2、问题:用户的手机可能存在被盗的风险,如何结合其他认证机制提高安全性?比如通过 动态密码 或 生物识别 进一步加强登录安全性。
考察点:
• 多因素认证(MFA)。
• 双重验证的实现。
双重验证(2FA)方案,结合了 一次性密码(OTP) 和 二维码生成 来增强系统的安全性。这个过程通常分为两个步骤:
生成一次性密码:通过一个标准的算法(如 TOTP)生成一次性密码。
二维码展示与扫描:将 OTP 所需的密钥(通常是一个随机生成的密钥)通过二维码的方式呈现给用户,用户可以使用 TOTP 兼容的应用(如 Google Authenticator 或 Authy)来生成验证码。
使用 TOTP (基于时间的一次性密码) 生成 OTP
TOTP(Time-based One-Time Password)算法基于时间生成一次性密码,通常使用 HMAC-SHA1 算法。这个算法确保了每隔一段时间生成一个新的密码。
Time-Based One-Time Password
import pyotp
import qrcode
from hashlib import sha256
# 假设用户的手机号是唯一标识符
user_phone_number = "13800000000"
# 使用手机号作为种子生成唯一的 secret
# 使用 hashlib 将手机号进行哈希处理,生成一个固定的 secret
# 这样即使服务重启,每次生成的 secret 都是一样的
secret = pyotp.random_base32() # 你可以先生成一个固定的 secret, 并保存到数据库
# 如果希望生成基于手机号的 secret,可以使用 hashlib 和手机号
hashed_phone = sha256(user_phone_number.encode()).hexdigest()
secret = hashed_phone[:16] # 使用手机号的哈希值的一部分作为 secret
# 将用户手机号与生成的 secret 绑定存储(此处示例,实际应存数据库)
user = {
"phone_number": user_phone_number,
"2fa_enabled": True,
"2fa_secret": secret
}
# 生成二维码URL
totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri(user_phone_number, issuer_name="YourApp")
# 生成二维码(可以通过Web页面显示)
qr = qrcode.make(uri)
qr.show() # 展示二维码
# 用户扫码后,输入验证码(假设用户输入了 '123456')
user_input_otp = "123456"
# 验证用户输入的OTP是否有效
if totp.verify(user_input_otp):
print("2FA Verification Success")
else:
print("Invalid OTP")
验证码频繁失效
问题:验证码的过期时间通常比较短,但某些场景下可能会发生验证码失效,用户却没有及时看到短信,如何处理这种情况?
延长验证码的过期时间虽然能够解决部分问题,但带来的一些安全和性能隐患也是不容忽视的。最好的解决方案是在验证码过期之前提供验证码重发、动态刷新或者适当的过期提醒等方式来保障用户的体验,同时确保系统的安全性和效率。
跨设备验证码
问题:同一个用户在不同设备上登录时,可能会因为设备间验证码的同步问题导致登录失败或需要重复输入验证码。如何在跨设备的场景下保证一致性?
考察点:
• 跨设备的一致性。
• 设备间验证码的共享和同步。
. 生成设备标识(Device ID)
每次用户登录时,前端可以生成一个唯一的设备标识(Device ID),并将其作为参数与验证码一起发送到后端。这个设备标识可以基于设备的硬件信息、安装的应用ID,或者生成一个唯一的UUID。比如可以通过浏览器的 localStorage、sessionStorage 或者移动端的设备ID生成。
2. 发送验证码时包含设备标识
后端在发送验证码时,将设备标识与手机号和验证码一起存储在 Redis 或数据库中。这样,无论用户在哪个设备上获取验证码,都能根据设备标识进行有效的关联。