简介:此部分内容为,在FastAPI + SQLAlchemy (异步版)连接数据库时,需要对保存在数据库中的API_KEY 以及用户密码进行加密时所著:
一、AES-GCM对称加密步骤
1.特点 :
- 单一密钥:加密和解密使用相同的密钥(对称加密),需通过安全方式(如环境变量、密钥管理系统)存储和传输密钥。
- 密钥长度:支持 128、192、256 位密钥(推荐 256 位以获得最高安全性)。
2.加解密步骤:
1.生成密钥:
密钥一般存在环境变量中,使用安全的随机数生成器(如 Python 的 os.urandom
):
generate_aes_gcm_key.py:
import os
import base64
# 1. 生成 AES-256 密钥
def generate_aes_gcm_key(key_size: int = 32) -> bytes:
"""
生成指定长度的 AES-GCM 密钥(字节串)。
:param key_size: 16(AES-128)、24(AES-192)、32(AES-256)
:return: 密钥(bytes)
"""
if key_size not in (16, 24, 32):
raise ValueError("Key size must be 16, 24, or 32 bytes")
return os.urandom(key_size)
# 2. 将密钥转换为 Base64 字符串(便于存储到环境变量)
def b64encode_generated_key(generated_key: bytes) ->str:
key_base64 = base64.b64encode(generated_key).decode()
# print("Base64 编码的密钥:", key_base64)
return key_base64
# 只运行一次,确保全流程中密钥统一
if __name__ == "__main__":
generated_key = generate_aes_gcm_key()
key_base64 = b64encode_generated_key(generated_key)
print("编码后的密钥:",key_base64) # 需要手动保存到环境变量中(.env)
2.将generate_aes_gcm_key.py中生成的密钥 AES_KEY 手动复制添加到 .env文件中:
3.在 config.py 文件中加载环境变量
from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os
# 加载环境变量(仅本地开发)
load_dotenv()
class DifySetting(BaseSettings):
MYSQL_HOST: str
MYSQL_PORT: int = 3306
MYSQL_USER: str
MYSQL_PASSWORD: str
MYSQL_NAME: str
APP_ENV: str = "dev"
AES_KEY: str # AES 密钥(Base64 编码)
AES_GCM_NONCE_SIZE: int = 12 # 注意,环境变量中存在的值,在DifySetting这个类中也必须包含
class Config:
env_file =".env"
env_file_encoding = "utf-8"
# 全局 AES-GCM 实例
dify_settings = DifySetting()
4.编写 aes_gcm_security.py 加密、解密函数:
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from app.core.config import dify_settings # 假设这是你的配置模块(包含 AES_KEY)
import os
from cryptography.exceptions import InvalidTag
# 1. 初始化 AESGCM 对象
def load_aes_key() -> AESGCM:
key_bytes = base64.b64decode(dify_settings.AES_KEY) # Base64 → bytes
return AESGCM(key_bytes) # 创建 AESGCM 实例
aesgcm = load_aes_key()
# 2. 加密函数 - 返回单个组合字符串
def encrypt_aes_gcm_combined(plaintext: str) -> str:
"""
使用 AES-GCM 加密明文字符串,返回组合字符串(IV+密文的Base64编码)
:param plaintext: 明文(字符串)
:return: Base64编码的字符串(前16字符为IV,后面为密文)
"""
plaintext_bytes = plaintext.encode("utf-8")
iv = os.urandom(12) # 生成12字节随机IV
ciphertext = aesgcm.encrypt(iv, plaintext_bytes, None)
# 拼接IV和密文后整体进行Base64编码
combined = iv + ciphertext
return base64.b64encode(combined).decode("utf-8")
# 3. 解密函数 - 从组合字符串解密
def decrypt_aes_gcm_combined(combined_base64: str) -> str:
"""
从组合字符串解密出原始明文
:param combined_base64: Base64编码的组合字符串(IV+密文)
:return: 明文(字符串)
"""
try:
combined = base64.b64decode(combined_base64)
iv = combined[:12] # 前12字节为IV
ciphertext = combined[12:] # 剩余部分为密文
plaintext_bytes = aesgcm.decrypt(iv, ciphertext, None)
return plaintext_bytes.decode("utf-8")
except InvalidTag:
raise ValueError("解密失败:认证标签无效(密钥或数据损坏)")
except Exception as e:
raise ValueError(f"解密失败:{str(e)}")
5. 在编写 FastAPI 时调用加密、解密函数;
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import select, exc
from app.database.database import db_dependency
from app.models.dify_models_ORM import Agent
# 导入优化后的加密函数
from app.core.aes_gcm_security import encrypt_aes_gcm_combined, decrypt_aes_gcm_combined
dify_router = APIRouter()
# 定义 Pydantic 模型
class AgentResponse(BaseModel):
id: int
agent_name: str
agent_describe: str
agent_url: str
agent_Content_Type: str
agent_api_key: str # 返回解密后的API Key
user: str
created_at: datetime
class Config:
from_attributes = True
class CreateAgentRequest(BaseModel):
agent_name: str
agent_describe: str
url: str
api_key: str # 接收明文API Key
content_type: str = "application/json"
user: str
# 创建 Agent - 使用优化后的加密方法
@dify_router.post("/dify_agents", status_code=status.HTTP_201_CREATED)
async def create_agent(request: CreateAgentRequest, db: db_dependency):
try:
# 使用新的组合加密方法
combined_ciphertext = encrypt_aes_gcm_combined(request.api_key)
db_agent = Agent(
agent_name=request.agent_name,
agent_describe=request.agent_describe,
agent_url=request.url,
agent_api_key=combined_ciphertext, # 存储组合密文
agent_Content_Type=request.content_type,
user=request.user,
created_at=datetime.utcnow()
)
db.add(db_agent)
await db.commit()
await db.refresh(db_agent)
return {"agent_id": db_agent.id}
except exc.IntegrityError:
await db.rollback()
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="API Key 已存在")
except Exception as e:
await db.rollback()
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
# 查询单个 Agent - 使用优化后的解密方法
@dify_router.get("/dify_agents/{agent_id}", status_code=status.HTTP_200_OK)
async def read_agent(agent_id: int, db: db_dependency):
try:
result = await db.execute(select(Agent).where(Agent.id == agent_id))
agent = result.scalars().first()
if not agent:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")
# 解密逻辑 - 使用新的组合解密方法
try:
decrypted_key = decrypt_aes_gcm_combined(agent.agent_api_key)
# 创建代理对象副本,避免直接修改ORM对象,即返回为此副本中数据
agent_data = {
"id": agent.id,
"agent_name": agent.agent_name,
"agent_describe": agent.agent_describe,
"agent_url": agent.agent_url,
"agent_Content_Type": agent.agent_Content_Type,
"agent_api_key": decrypted_key, # 使用解密后的密钥
"user": agent.user,
"created_at": agent.created_at
}
return AgentResponse(**agent_data)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"解密失败: {str(e)}"
)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
# 查询所有 Agents - 不返回敏感API Key
@dify_router.get("/dify_agents", status_code=status.HTTP_200_OK)
async def read_agents(db: db_dependency):
try:
result = await db.execute(select(Agent))
agents = result.scalars().all()
# 返回不包含敏感API Key的数据
safe_agents = []
for agent in agents:
safe_agents.append({
"id": agent.id,
"agent_name": agent.agent_name,
"agent_describe": agent.agent_describe,
"agent_url": agent.agent_url,
"agent_Content_Type": agent.agent_Content_Type,
"user": agent.user,
"created_at": agent.created_at
})
return safe_agents
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
# 删除 Agent - 保持不变
@dify_router.delete("/dify_agents/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(agent_id: int, db: db_dependency):
try:
result = await db.execute(select(Agent).where(Agent.id == agent_id))
agent = result.scalar()
if not agent:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")
await db.delete(agent)
await db.commit()
except Exception as e:
await db.rollback()
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
return None
二、密码哈希加密用户密码:
1.特点:
- 密码哈希算法是单向的(不可逆),专门为存储密码(等无需还原原来明文信息所设计);
- 抗暴力破解:通过加盐(Salt)和多次迭代(Work Factor)增加计算成本;
2.加密步骤:
2.1 生成密钥保存在环境变量中(.env文件):
2.2 在 config.py 配置文件中加载环境变量:
config.py:
from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os
# 加载环境变量(仅本地开发)
load_dotenv()
# 注意,环境变量(.env)中存在的值,在DifySetting这个类中也必须包含
class DifySetting(BaseSettings):
MYSQL_HOST: str
MYSQL_PORT: int = 3306
MYSQL_USER: str
MYSQL_PASSWORD: str
MYSQL_NAME: str
APP_ENV: str = "dev"
AES_KEY: str # AES 密钥(Base64 编码)
AES_GCM_NONCE_SIZE: int
Hash_KEY: str
Hash_ALGORITHM: str
ACCESS_TOKEN_EXPIRE_MINUTES: int
class Config:
env_file =".env"
env_file_encoding = "utf-8"
# 全局 AES-GCM 实例
dify_settings = DifySetting()
2.3 编写 hash_security.py 文件哈希加密函数
记得从配置文件 config.py 导入加载的环境变量
# 导入必要的库和模块
from jose import JWTError, jwt # JWT令牌处理库
from passlib.context import CryptContext # 密码哈希库
from datetime import datetime, timedelta # 日期时间处理
from app.core.config import dify_settings # 假设这是你的配置模块(包含 Hash_KEY Hash_ALGORITHM)
# ===== 密码哈希工具 =====
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# CryptContext:这是 passlib 库中用于管理密码哈希策略的核心类,它提供了统一的接口来处理多种密码哈希算法,支持密码的哈希生成、验证和迁移
# schemes=["bcrypt"]: 指定使用的密码哈希算法为 bcrypt,一种专门为密码存储设计的加密算法
# deprecated="auto": 自动检测并标记过时的哈希算法, 当有更好的算法可用时,会自动标记旧算法为不推荐, 允许在验证旧密码后自动升级到新算法
# 生成密码哈希
def get_password_hash(password: str):
return pwd_context.hash(password)
# 验证哈希密码的一致性, 验证明文密码是否与哈希密码匹配
def verify_password(plain_password: str, hashed_password: str) ->bool:
return pwd_context.verify(plain_password, hashed_password)
2.4 调用哈希加密函数
导入包: