第2篇:数据持久化实战

发布于:2025-09-13 ⋅ 阅读:(18) ⋅ 点赞:(0)

在上一篇中,我们构建了一个基于内存存储的食谱助手。说实话,内存存储虽然简单,但有个致命问题:程序一重启,数据就全没了。

所以这篇我们要解决数据持久化的问题,将食谱助手从内存存储升级到SQLite数据库。

项目结构重组

先把项目结构整理得更清晰一些。之前我们所有代码都在一个文件里,现在按功能分模块:

mkdir -p recipe-assistant/app recipe-assistant/data
cd recipe-assistant
touch app/__init__.py app/main.py app/models.py

新的目录结构:

recipe-assistant/
 app/
    __init__.py
    main.py          # 主应用入口(包含所有MCP组件)
    models.py        # 数据模型和数据库管理
 data/
    recipes.db       # SQLite数据库文件bb

重要说明:FastMCP不支持模块化的方式(如include_router),所有的资源、工具和提示词组件都必须定义在同一个FastMCP实例上。因此我们将所有MCP组件都放在main.py中。

数据库设计

创建 app/models.py,定义数据模型和数据库操作:

# app/models.py
from pydantic import BaseModel
from typing import List, Optional, Dict
import sqlite3
import json
import os

# 数据模型定义(与第1篇保持一致)
class Ingredient(BaseModel):
    name: str
    quantity: str

class Recipe(BaseModel):
    id: str
    name: str
    cuisine: str
    description: str
    ingredients: List[Ingredient]
    steps: List[str]
    difficulty: str

class UserPreference(BaseModel):
    user_id: str
    favorite_cuisines: List[str]
    dietary_restrictions: List[str]
    cooking_skill: str

# 数据库管理类
class DatabaseManager:
    def __init__(self, db_path="data/recipes.db"):
        self.db_path = db_path
        self.conn = None
        self.initialize_db()
    
    def get_connection(self):
        if self.conn is None:
            # 确保数据目录存在
            os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
            self.conn = sqlite3.connect(self.db_path)
            self.conn.execute("PRAGMA foreign_keys = ON")
            self.conn.row_factory = sqlite3.Row
        return self.conn
    
    def initialize_db(self):
        """创建数据库表并导入第1篇的示例数据"""
        conn = self.get_connection()
        cursor = conn.cursor()
        
        # 创建食谱表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS recipes (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            cuisine TEXT NOT NULL,
            description TEXT,
            ingredients TEXT NOT NULL,
            steps TEXT NOT NULL,
            difficulty TEXT NOT NULL
        )
        ''')
        
        # 创建用户偏好表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS user_preferences (
            user_id TEXT PRIMARY KEY,
            favorite_cuisines TEXT NOT NULL,
            dietary_restrictions TEXT NOT NULL,
            cooking_skill TEXT NOT NULL
        )
        ''')
        
        conn.commit()
        
        # 导入第1篇的示例数据
        self.import_data()
    
    def import_data(self):
        """导入第1篇的示例数据"""
        conn = self.get_connection()
        cursor = conn.cursor()
        
        # 检查是否已有数据
        cursor.execute("SELECT COUNT(*) FROM recipes")
        count = cursor.fetchone()[0]
        
        if count == 0:
            # 第1篇的示例食谱数据
            recipes = [
                {
                    "id": "recipe_001",
                    "name": "宫保鸡丁",
                    "cuisine": "川菜",
                    "description": "经典川菜,麻辣鲜香",
                    "ingredients": [
                        {"name": "鸡胸肉", "quantity": "300g"},
                        {"name": "花生米", "quantity": "50g"},
                        {"name": "干辣椒", "quantity": "10个"},
                        {"name": "花椒", "quantity": "1茶匙"},
                        {"name": "葱", "quantity": "2根"},
                        {"name": "姜", "quantity": "3片"},
                        {"name": "蒜", "quantity": "3瓣"}
                    ],
                    "steps": [
                        "鸡胸肉切丁,用料酒、生抽、淀粉腌制15分钟",
                        "热锅凉油,放入花椒和干辣椒爆香",
                        "加入鸡丁翻炒至变色",
                        "加入葱姜蒜继续翻炒",
                        "加入调好的宫保汁炒匀",
                        "最后加入花生米炒匀即可"
                    ],
                    "difficulty": "中等"
                },
                {
                    "id": "recipe_002",
                    "name": "番茄炒蛋",
                    "cuisine": "家常菜",
                    "description": "简单易做的家常菜",
                    "ingredients": [
                        {"name": "番茄", "quantity": "2个"},
                        {"name": "鸡蛋", "quantity": "3个"},
                        {"name": "葱", "quantity": "适量"},
                        {"name": "盐", "quantity": "适量"},
                        {"name": "糖", "quantity": "少许"}
                    ],
                    "steps": [
                        "番茄切块,鸡蛋打散",
                        "热锅倒油,倒入鸡蛋炒熟盛出",
                        "锅中再倒少许油,放入番茄翻炒",
                        "番茄出汁后加入盐和糖调味",
                        "倒入炒好的鸡蛋翻炒均匀",
                        "撒上葱花即可"
                    ],
                    "difficulty": "简单"
                }
            ]
            
            # 插入食谱数据
            for recipe in recipes:
                cursor.execute('''
                INSERT INTO recipes (id, name, cuisine, description, ingredients, steps, difficulty)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', (
                    recipe["id"],
                    recipe["name"],
                    recipe["cuisine"],
                    recipe["description"],
                    json.dumps(recipe["ingredients"], ensure_ascii=False),
                    json.dumps(recipe["steps"], ensure_ascii=False),
                    recipe["difficulty"]
                ))
            
            # 插入示例用户偏好
            preferences = {
                "user_001": {
                    "favorite_cuisines": ["川菜"],
                    "dietary_restrictions": ["少油", "少盐"],
                    "cooking_skill": "初级"
                },
                "user_002": {
                    "favorite_cuisines": ["家常菜"],
                    "dietary_restrictions": ["健康"],
                    "cooking_skill": "初级"
                }
            }
            
            for user_id, prefs in preferences.items():
                cursor.execute('''
                INSERT INTO user_preferences (user_id, favorite_cuisines, dietary_restrictions, cooking_skill)
                VALUES (?, ?, ?, ?)
                ''', (
                    user_id,
                    json.dumps(prefs["favorite_cuisines"], ensure_ascii=False),
                    json.dumps(prefs["dietary_restrictions"], ensure_ascii=False),
                    prefs["cooking_skill"]
                ))
            
            conn.commit()
            print("示例数据导入完成")
    
    # 食谱相关操作
    def get_all_recipes(self):
        """获取所有食谱"""
        conn = self.get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM recipes")
        rows = cursor.fetchall()
        
        recipes = []
        for row in rows:
            recipe = {
                "id": row["id"],
                "name": row["name"],
                "cuisine": row["cuisine"],
                "description": row["description"],
                "ingredients": json.loads(row["ingredients"]),
                "steps": json.loads(row["steps"]),
                "difficulty": row["difficulty"]
            }
            recipes.append(recipe)
        
        return recipes
    
    def get_recipe_by_id(self, recipe_id: str):
        """根据ID获取食谱"""
        conn = self.get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
        row = cursor.fetchone()
        
        if row:
            return {
                "id": row["id"],
                "name": row["name"],
                "cuisine": row["cuisine"],
                "description": row["description"],
                "ingredients": json.loads(row["ingredients"]),
                "steps": json.loads(row["steps"]),
                "difficulty": row["difficulty"]
            }
        return None
    
    def search_recipes_by_ingredient(self, ingredient: str):
        """根据食材搜索食谱"""
        conn = self.get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM recipes WHERE ingredients LIKE ?", (f'%{ingredient}%',))
        rows = cursor.fetchall()
        
        recipes = []
        for row in rows:
            recipe = {
                "id": row["id"],
                "name": row["name"],
                "cuisine": row["cuisine"],
                "description": row["description"],
                "ingredients": json.loads(row["ingredients"]),
                "steps": json.loads(row["steps"]),
                "difficulty": row["difficulty"]
            }
            recipes.append(recipe)
        
        return recipes
    
    # 用户偏好相关操作
    def get_user_preferences(self, user_id: str):
        """获取用户偏好"""
        conn = self.get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,))
        row = cursor.fetchone()
        
        if row:
            return {
                "user_id": row["user_id"],
                "favorite_cuisines": json.loads(row["favorite_cuisines"]),
                "dietary_restrictions": json.loads(row["dietary_restrictions"]),
                "cooking_skill": row["cooking_skill"]
            }
        return None

# 全局数据库实例
db = DatabaseManager()

def _get_all_recipes() -> Dict:
    """获取所有食谱数据"""
    try:
        recipes = db.get_all_recipes()
        return {
            "success": True,
            "recipes": recipes,
            "count": len(recipes)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"获取食谱数据失败: {str(e)}"
        }

def _get_recipe_by_id(recipe_id: str) -> Dict:
    """根据ID获取特定食谱"""
    try:
        recipe = db.get_recipe_by_id(recipe_id)
        if recipe:
            return {
                "success": True,
                "recipe": recipe
            }
        else:
            return {
                "success": False,
                "error": f"未找到ID为{recipe_id}的食谱"
            }
    except Exception as e:
        return {
            "success": False,
            "error": f"获取食谱失败: {str(e)}"
        }

def _get_user_preferences(user_id: str) -> Dict:
    """获取用户偏好数据"""
    try:
        preferences = db.get_user_preferences(user_id)
        if preferences:
            return {
                "success": True,
                "preferences": preferences
            }
        else:
            return {
                "success": False,
                "error": f"未找到ID为{user_id}的用户"
            }
    except Exception as e:
        return {
            "success": False,
            "error": f"获取用户偏好失败: {str(e)}"
        }

def _search_recipes_by_ingredient(ingredient: str) -> Dict:
    """根据食材查询食谱"""
    try:
        if not ingredient or not ingredient.strip():
            return {
                "success": False,
                "error": "请提供有效的食材名称"
            }
        
        recipes = db.search_recipes_by_ingredient(ingredient.strip())
        
        if recipes:
            return {
                "success": True,
                "message": f"找到了{len(recipes)}个包含{ingredient}的食谱",
                "recipes": recipes
            }
        else:
            return {
                "success": True,
                "message": f"抱歉,没有找到包含{ingredient}的食谱",
                "recipes": []
            }
    except Exception as e:
        return {
            "success": False,
            "error": f"查询食谱时出错: {str(e)}"
        }

def _recommend_recipes(user_id: str, available_ingredients: List[str] = None) -> Dict:
    """根据用户偏好推荐食谱"""
    try:
        # 获取用户偏好
        user_prefs = db.get_user_preferences(user_id)
        if not user_prefs:
            return {
                "success": False,
                "error": f"未找到ID为{user_id}的用户偏好"
            }
        
        # 获取所有食谱
        all_recipes = db.get_all_recipes()
        recommended_recipes = []
        
        for recipe in all_recipes:
            # 根据用户喜好的菜系过滤
            if recipe["cuisine"] in user_prefs["favorite_cuisines"]:
                # 如果提供了可用食材,检查是否匹配
                if available_ingredients:
                    recipe_ingredients = [ing["name"] for ing in recipe["ingredients"]]
                    if any(avail_ing in recipe_ingredients for avail_ing in available_ingredients):
                        recommended_recipes.append(recipe)
                else:
                    recommended_recipes.append(recipe)
        
        return {
            "success": True,
            "message": f"为您推荐了{len(recommended_recipes)}道菜",
            "recipes": recommended_recipes[:5]  # 限制返回数量
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"推荐食谱时出错: {str(e)}"
        }

这个数据库管理类把之前内存中的操作都移到了SQLite里。

主应用入口

现在创建 app/main.py,将所有MCP组件集中在一个文件中:

# app/main.py
from fastmcp import FastMCP
from typing import Dict, List
from .models import (
    _get_all_recipes, 
    _get_recipe_by_id, 
    _get_user_preferences,
    _search_recipes_by_ingredient,
    _recommend_recipes
)

# 创建主应用实例
mcp = FastMCP("RecipeAssistant")

# 资源组件
@mcp.resource("recipes://all")
def get_all_recipes() -> Dict:
    """
    获取所有食谱数据
    
    Returns:
        包含所有食谱的字典
    """
    return _get_all_recipes()

@mcp.resource("recipes://{recipe_id}")
def get_recipe_by_id(recipe_id: str) -> Dict:
    """
    根据ID获取特定食谱
    
    Args:
        recipe_id: 食谱ID
    
    Returns:
        食谱详细信息
    """
    return _get_recipe_by_id(recipe_id)

@mcp.resource("users://{user_id}/preferences")
def get_user_preferences(user_id: str) -> Dict:
    """
    获取用户偏好数据
    
    Args:
        user_id: 用户ID
    
    Returns:
        用户偏好数据
    """
    return _get_user_preferences(user_id)

# 工具组件
@mcp.tool()
def search_recipes_by_ingredient(ingredient: str) -> Dict:
    """
    根据食材查询食谱
    
    Args:
        ingredient: 食材名称
    
    Returns:
        包含匹配食谱的字典
    """
    return _search_recipes_by_ingredient(ingredient)

@mcp.tool()
def recommend_recipes(user_id: str, available_ingredients: List[str] = None) -> Dict:
    """
    根据用户偏好和可用食材推荐食谱
    
    Args:
        user_id: 用户ID
        available_ingredients: 可用食材列表(可选)
    
    Returns:
        包含推荐食谱的字典
    """
    return _recommend_recipes(user_id, available_ingredients)

# 提示词组件
@mcp.prompt()
def generate_recipe_search_response(ingredient: str) -> str:
    """
    生成食谱查询的回复
    
    Args:
        ingredient: 食材名称
    
    Returns:
        格式化的回复文本
    """
    search_result = _search_recipes_by_ingredient(ingredient)
    
    if not search_result["success"]:
        return f"抱歉,查询食谱时出现了问题:{search_result.get('error', '未知错误')}"
    
    recipes = search_result["recipes"]
    
    if not recipes:
        return f"抱歉,我没有找到包含{ingredient}的食谱。请尝试其他食材。"
    
    # 生成回复文本
    response = f"我找到了{len(recipes)}个包含{ingredient}的食谱:\n\n"
    
    for i, recipe in enumerate(recipes, 1):
        response += f"{i}. {recipe['name']} - {recipe['description']}\n"
        response += f"   难度:{recipe['difficulty']}\n"
        response += f"   主要食材:{', '.join(ing['name'] for ing in recipe['ingredients'][:3])}\n\n"
    
    response += f"想了解某个食谱的详细做法,请告诉我食谱的编号。"
    
    return response

@mcp.prompt()
def generate_recipe_details(recipe_id: str) -> str:
    """
    生成食谱详细信息的回复
    
    Args:
        recipe_id: 食谱ID
    
    Returns:
        格式化的食谱详情
    """
    recipe_result = _get_recipe_by_id(recipe_id)
    
    if not recipe_result["success"]:
        return f"抱歉,无法获取食谱详情:{recipe_result.get('error', '未知错误')}"
    
    recipe = recipe_result["recipe"]
    
    # 生成详细食谱信息
    response = f"# {recipe['name']}\n\n"
    response += f"{recipe['description']}\n\n"
    
    response += "## 食材准备\n\n"
    for ing in recipe["ingredients"]:
        response += f"- {ing['name']}: {ing['quantity']}\n"
    
    response += "\n## 烹饪步骤\n\n"
    for i, step in enumerate(recipe["steps"], 1):
        response += f"{i}. {step}\n"
    
    response += f"\n难度:{recipe['difficulty']}\n"
    response += f"菜系:{recipe['cuisine']}\n"
    
    return response

if __name__ == "__main__":
    mcp.run()

测试验证

现在来测试我们升级后的应用。

使用MCP Inspector测试

首先,启动我们的应用:

cd recipe-assistant
python -m app.main

在另一个终端启动MCP Inspector测试,可以按照我们之前的方式进行测试:

npx @modelcontextprotocol/inspector stdio python -m app.main

下一篇预告:我们将通过实际代码对比,学习如何将一个FastMCP工具改写成原生SDK版本,直观理解两者的差异和使用场景。