FastAPI + SQLAlchemy (异步版)连接数据库时,对数据进行加密

发布于:2025-07-13 ⋅ 阅读:(15) ⋅ 点赞:(0)

简介:此部分内容为,在FastAPI + SQLAlchemy (异步版)连接数据库时,需要对保存在数据库中的API_KEY 以及用户密码进行加密时所著:

一、AES-GCM对称加密步骤

1.特点 :

  • 单一密钥:加密和解密使用相同的密钥(对称加密),需通过安全方式(如环境变量、密钥管理系统)存储和传输密钥。
  • 密钥长度:支持 128、192、256 位密钥(推荐 256 位以获得最高安全性)。

2.加解密步骤: 

1.生成密钥:

密钥一般存在环境变量中,使用安全的随机数生成器(如 Python 的 os.urandom):

generate_aes_gcm_key.py:

import os
import base64


# 1. 生成 AES-256 密钥
def generate_aes_gcm_key(key_size: int = 32) -> bytes:
    """
    生成指定长度的 AES-GCM 密钥(字节串)。
    :param key_size: 16(AES-128)、24(AES-192)、32(AES-256)
    :return: 密钥(bytes)
    """
    if key_size not in (16, 24, 32):
        raise ValueError("Key size must be 16, 24, or 32 bytes")
    return os.urandom(key_size)


# 2. 将密钥转换为 Base64 字符串(便于存储到环境变量)
def b64encode_generated_key(generated_key: bytes) ->str:
    key_base64 = base64.b64encode(generated_key).decode()
    # print("Base64 编码的密钥:", key_base64)
    return key_base64


# 只运行一次,确保全流程中密钥统一
if __name__ == "__main__":
    generated_key = generate_aes_gcm_key()
    key_base64 = b64encode_generated_key(generated_key)
    print("编码后的密钥:",key_base64) # 需要手动保存到环境变量中(.env)

2.将generate_aes_gcm_key.py中生成的密钥 AES_KEY 手动复制添加到 .env文件中:

3.在 config.py 文件中加载环境变量

from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os

# 加载环境变量(仅本地开发)
load_dotenv()

class DifySetting(BaseSettings):
    MYSQL_HOST: str
    MYSQL_PORT: int = 3306
    MYSQL_USER: str
    MYSQL_PASSWORD: str
    MYSQL_NAME: str
    
    APP_ENV: str = "dev"
    AES_KEY: str  # AES 密钥(Base64 编码)
    AES_GCM_NONCE_SIZE: int = 12  # 注意,环境变量中存在的值,在DifySetting这个类中也必须包含

    class Config:
        env_file =".env"
        env_file_encoding = "utf-8"

# 全局 AES-GCM 实例
dify_settings = DifySetting()

4.编写 aes_gcm_security.py 加密、解密函数:

import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from app.core.config import dify_settings  # 假设这是你的配置模块(包含 AES_KEY)
import os
from cryptography.exceptions import InvalidTag

# 1. 初始化 AESGCM 对象
def load_aes_key() -> AESGCM:
    key_bytes = base64.b64decode(dify_settings.AES_KEY)  # Base64 → bytes
    return AESGCM(key_bytes)  # 创建 AESGCM 实例

aesgcm = load_aes_key()

# 2. 加密函数 - 返回单个组合字符串
def encrypt_aes_gcm_combined(plaintext: str) -> str:
    """
    使用 AES-GCM 加密明文字符串,返回组合字符串(IV+密文的Base64编码)
    :param plaintext: 明文(字符串)
    :return: Base64编码的字符串(前16字符为IV,后面为密文)
    """
    plaintext_bytes = plaintext.encode("utf-8")
    iv = os.urandom(12)  # 生成12字节随机IV
    ciphertext = aesgcm.encrypt(iv, plaintext_bytes, None)
    # 拼接IV和密文后整体进行Base64编码
    combined = iv + ciphertext
    return base64.b64encode(combined).decode("utf-8")

# 3. 解密函数 - 从组合字符串解密
def decrypt_aes_gcm_combined(combined_base64: str) -> str:
    """
    从组合字符串解密出原始明文
    :param combined_base64: Base64编码的组合字符串(IV+密文)
    :return: 明文(字符串)
    """
    try:
        combined = base64.b64decode(combined_base64)
        iv = combined[:12]  # 前12字节为IV
        ciphertext = combined[12:]  # 剩余部分为密文
        plaintext_bytes = aesgcm.decrypt(iv, ciphertext, None)
        return plaintext_bytes.decode("utf-8")
    except InvalidTag:
        raise ValueError("解密失败:认证标签无效(密钥或数据损坏)")
    except Exception as e:
        raise ValueError(f"解密失败:{str(e)}")

 5. 在编写 FastAPI 时调用加密、解密函数;

from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import select, exc
from app.database.database import db_dependency
from app.models.dify_models_ORM import Agent
# 导入优化后的加密函数
from app.core.aes_gcm_security import encrypt_aes_gcm_combined, decrypt_aes_gcm_combined

dify_router = APIRouter()

# 定义 Pydantic 模型
class AgentResponse(BaseModel):
    id: int
    agent_name: str
    agent_describe: str
    agent_url: str
    agent_Content_Type: str
    agent_api_key: str  # 返回解密后的API Key
    user: str
    created_at: datetime

    class Config:
        from_attributes = True

class CreateAgentRequest(BaseModel):
    agent_name: str
    agent_describe: str
    url: str
    api_key: str  # 接收明文API Key
    content_type: str = "application/json"
    user: str

# 创建 Agent - 使用优化后的加密方法
@dify_router.post("/dify_agents", status_code=status.HTTP_201_CREATED)
async def create_agent(request: CreateAgentRequest, db: db_dependency):
    try:
        # 使用新的组合加密方法
        combined_ciphertext = encrypt_aes_gcm_combined(request.api_key)
        
        db_agent = Agent(
            agent_name=request.agent_name,
            agent_describe=request.agent_describe,
            agent_url=request.url,
            agent_api_key=combined_ciphertext,  # 存储组合密文
            agent_Content_Type=request.content_type,
            user=request.user,
            created_at=datetime.utcnow()
        )
        db.add(db_agent)
        await db.commit()
        await db.refresh(db_agent)
        return {"agent_id": db_agent.id}
    except exc.IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="API Key 已存在")
    except Exception as e:
        await db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))

# 查询单个 Agent - 使用优化后的解密方法
@dify_router.get("/dify_agents/{agent_id}", status_code=status.HTTP_200_OK)
async def read_agent(agent_id: int, db: db_dependency):
    try:
        result = await db.execute(select(Agent).where(Agent.id == agent_id))
        agent = result.scalars().first()
        if not agent:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")
        
        # 解密逻辑 - 使用新的组合解密方法
        try:
            decrypted_key = decrypt_aes_gcm_combined(agent.agent_api_key)
            # 创建代理对象副本,避免直接修改ORM对象,即返回为此副本中数据
            agent_data = {
                "id": agent.id,
                "agent_name": agent.agent_name,
                "agent_describe": agent.agent_describe,
                "agent_url": agent.agent_url,
                "agent_Content_Type": agent.agent_Content_Type,
                "agent_api_key": decrypted_key,  # 使用解密后的密钥
                "user": agent.user,
                "created_at": agent.created_at
            }
            return AgentResponse(**agent_data)
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"解密失败: {str(e)}"
            )
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))

# 查询所有 Agents - 不返回敏感API Key
@dify_router.get("/dify_agents", status_code=status.HTTP_200_OK)
async def read_agents(db: db_dependency):
    try:
        result = await db.execute(select(Agent))
        agents = result.scalars().all()
        
        # 返回不包含敏感API Key的数据
        safe_agents = []
        for agent in agents:
            safe_agents.append({
                "id": agent.id,
                "agent_name": agent.agent_name,
                "agent_describe": agent.agent_describe,
                "agent_url": agent.agent_url,
                "agent_Content_Type": agent.agent_Content_Type,
                "user": agent.user,
                "created_at": agent.created_at
            })
        
        return safe_agents
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))

# 删除 Agent - 保持不变
@dify_router.delete("/dify_agents/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(agent_id: int, db: db_dependency):
    try:
        result = await db.execute(select(Agent).where(Agent.id == agent_id))
        agent = result.scalar()
        if not agent:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")
        
        await db.delete(agent)
        await db.commit()
    except Exception as e:
        await db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
    return None

二、密码哈希加密用户密码:

1.特点:

  • 密码哈希算法是​​单向的​​(不可逆),专门为存储密码(等无需还原原来明文信息所设计);
  • 抗暴力破解​​:通过加盐(Salt)和多次迭代(Work Factor)增加计算成本;

2.加密步骤: 

2.1 生成密钥保存在环境变量中(.env文件):

 2.2 在 config.py 配置文件中加载环境变量:

 config.py:

from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os

# 加载环境变量(仅本地开发)
load_dotenv()


# 注意,环境变量(.env)中存在的值,在DifySetting这个类中也必须包含
class DifySetting(BaseSettings):
    MYSQL_HOST: str
    MYSQL_PORT: int = 3306
    MYSQL_USER: str
    MYSQL_PASSWORD: str
    MYSQL_NAME: str
    
    APP_ENV: str = "dev"
    AES_KEY: str  # AES 密钥(Base64 编码)
    AES_GCM_NONCE_SIZE: int  
    
    Hash_KEY: str
    Hash_ALGORITHM: str
    ACCESS_TOKEN_EXPIRE_MINUTES: int

    class Config:
        env_file =".env"
        env_file_encoding = "utf-8"

# 全局 AES-GCM 实例
dify_settings = DifySetting()

2.3 编写 hash_security.py 文件哈希加密函数

记得从配置文件 config.py 导入加载的环境变量

# 导入必要的库和模块
from jose import JWTError, jwt  # JWT令牌处理库
from passlib.context import CryptContext  # 密码哈希库
from datetime import datetime, timedelta  # 日期时间处理
from app.core.config import dify_settings  # 假设这是你的配置模块(包含 Hash_KEY Hash_ALGORITHM)


# ===== 密码哈希工具 =====
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# CryptContext:这是 passlib 库中用于管理密码哈希策略的核心类,它提供了统一的接口来处理多种密码哈希算法,支持密码的哈希生成、验证和迁移
# schemes=["bcrypt"]: 指定使用的密码哈希算法为 bcrypt,一种专门为密码存储设计的加密算法
# deprecated="auto": 自动检测并标记过时的哈希算法, 当有更好的算法可用时,会自动标记旧算法为不推荐, 允许在验证旧密码后自动升级到新算法


#  生成密码哈希
def get_password_hash(password: str):
    return pwd_context.hash(password) 

# 验证哈希密码的一致性, 验证明文密码是否与哈希密码匹配
def verify_password(plain_password: str, hashed_password: str) ->bool:
    return pwd_context.verify(plain_password, hashed_password)

2.4 调用哈希加密函数

导入包:


网站公告

今日签到

点亮在社区的每一天
去签到