在上一篇中,我们构建了一个基于内存存储的食谱助手。说实话,内存存储虽然简单,但有个致命问题:程序一重启,数据就全没了。
所以这篇我们要解决数据持久化的问题,将食谱助手从内存存储升级到SQLite数据库。
项目结构重组
先把项目结构整理得更清晰一些。之前我们所有代码都在一个文件里,现在按功能分模块:
mkdir -p recipe-assistant/app recipe-assistant/data
cd recipe-assistant
touch app/__init__.py app/main.py app/models.py
新的目录结构:
recipe-assistant/
app/
__init__.py
main.py # 主应用入口(包含所有MCP组件)
models.py # 数据模型和数据库管理
data/
recipes.db # SQLite数据库文件bb
重要说明:FastMCP不支持模块化的方式(如include_router
),所有的资源、工具和提示词组件都必须定义在同一个FastMCP实例上。因此我们将所有MCP组件都放在main.py
中。
数据库设计
创建 app/models.py
,定义数据模型和数据库操作:
# app/models.py
from pydantic import BaseModel
from typing import List, Optional, Dict
import sqlite3
import json
import os
# 数据模型定义(与第1篇保持一致)
class Ingredient(BaseModel):
name: str
quantity: str
class Recipe(BaseModel):
id: str
name: str
cuisine: str
description: str
ingredients: List[Ingredient]
steps: List[str]
difficulty: str
class UserPreference(BaseModel):
user_id: str
favorite_cuisines: List[str]
dietary_restrictions: List[str]
cooking_skill: str
# 数据库管理类
class DatabaseManager:
def __init__(self, db_path="data/recipes.db"):
self.db_path = db_path
self.conn = None
self.initialize_db()
def get_connection(self):
if self.conn is None:
# 确保数据目录存在
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
self.conn = sqlite3.connect(self.db_path)
self.conn.execute("PRAGMA foreign_keys = ON")
self.conn.row_factory = sqlite3.Row
return self.conn
def initialize_db(self):
"""创建数据库表并导入第1篇的示例数据"""
conn = self.get_connection()
cursor = conn.cursor()
# 创建食谱表
cursor.execute('''
CREATE TABLE IF NOT EXISTS recipes (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
cuisine TEXT NOT NULL,
description TEXT,
ingredients TEXT NOT NULL,
steps TEXT NOT NULL,
difficulty TEXT NOT NULL
)
''')
# 创建用户偏好表
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_preferences (
user_id TEXT PRIMARY KEY,
favorite_cuisines TEXT NOT NULL,
dietary_restrictions TEXT NOT NULL,
cooking_skill TEXT NOT NULL
)
''')
conn.commit()
# 导入第1篇的示例数据
self.import_data()
def import_data(self):
"""导入第1篇的示例数据"""
conn = self.get_connection()
cursor = conn.cursor()
# 检查是否已有数据
cursor.execute("SELECT COUNT(*) FROM recipes")
count = cursor.fetchone()[0]
if count == 0:
# 第1篇的示例食谱数据
recipes = [
{
"id": "recipe_001",
"name": "宫保鸡丁",
"cuisine": "川菜",
"description": "经典川菜,麻辣鲜香",
"ingredients": [
{"name": "鸡胸肉", "quantity": "300g"},
{"name": "花生米", "quantity": "50g"},
{"name": "干辣椒", "quantity": "10个"},
{"name": "花椒", "quantity": "1茶匙"},
{"name": "葱", "quantity": "2根"},
{"name": "姜", "quantity": "3片"},
{"name": "蒜", "quantity": "3瓣"}
],
"steps": [
"鸡胸肉切丁,用料酒、生抽、淀粉腌制15分钟",
"热锅凉油,放入花椒和干辣椒爆香",
"加入鸡丁翻炒至变色",
"加入葱姜蒜继续翻炒",
"加入调好的宫保汁炒匀",
"最后加入花生米炒匀即可"
],
"difficulty": "中等"
},
{
"id": "recipe_002",
"name": "番茄炒蛋",
"cuisine": "家常菜",
"description": "简单易做的家常菜",
"ingredients": [
{"name": "番茄", "quantity": "2个"},
{"name": "鸡蛋", "quantity": "3个"},
{"name": "葱", "quantity": "适量"},
{"name": "盐", "quantity": "适量"},
{"name": "糖", "quantity": "少许"}
],
"steps": [
"番茄切块,鸡蛋打散",
"热锅倒油,倒入鸡蛋炒熟盛出",
"锅中再倒少许油,放入番茄翻炒",
"番茄出汁后加入盐和糖调味",
"倒入炒好的鸡蛋翻炒均匀",
"撒上葱花即可"
],
"difficulty": "简单"
}
]
# 插入食谱数据
for recipe in recipes:
cursor.execute('''
INSERT INTO recipes (id, name, cuisine, description, ingredients, steps, difficulty)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
recipe["id"],
recipe["name"],
recipe["cuisine"],
recipe["description"],
json.dumps(recipe["ingredients"], ensure_ascii=False),
json.dumps(recipe["steps"], ensure_ascii=False),
recipe["difficulty"]
))
# 插入示例用户偏好
preferences = {
"user_001": {
"favorite_cuisines": ["川菜"],
"dietary_restrictions": ["少油", "少盐"],
"cooking_skill": "初级"
},
"user_002": {
"favorite_cuisines": ["家常菜"],
"dietary_restrictions": ["健康"],
"cooking_skill": "初级"
}
}
for user_id, prefs in preferences.items():
cursor.execute('''
INSERT INTO user_preferences (user_id, favorite_cuisines, dietary_restrictions, cooking_skill)
VALUES (?, ?, ?, ?)
''', (
user_id,
json.dumps(prefs["favorite_cuisines"], ensure_ascii=False),
json.dumps(prefs["dietary_restrictions"], ensure_ascii=False),
prefs["cooking_skill"]
))
conn.commit()
print("示例数据导入完成")
# 食谱相关操作
def get_all_recipes(self):
"""获取所有食谱"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM recipes")
rows = cursor.fetchall()
recipes = []
for row in rows:
recipe = {
"id": row["id"],
"name": row["name"],
"cuisine": row["cuisine"],
"description": row["description"],
"ingredients": json.loads(row["ingredients"]),
"steps": json.loads(row["steps"]),
"difficulty": row["difficulty"]
}
recipes.append(recipe)
return recipes
def get_recipe_by_id(self, recipe_id: str):
"""根据ID获取食谱"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
row = cursor.fetchone()
if row:
return {
"id": row["id"],
"name": row["name"],
"cuisine": row["cuisine"],
"description": row["description"],
"ingredients": json.loads(row["ingredients"]),
"steps": json.loads(row["steps"]),
"difficulty": row["difficulty"]
}
return None
def search_recipes_by_ingredient(self, ingredient: str):
"""根据食材搜索食谱"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM recipes WHERE ingredients LIKE ?", (f'%{ingredient}%',))
rows = cursor.fetchall()
recipes = []
for row in rows:
recipe = {
"id": row["id"],
"name": row["name"],
"cuisine": row["cuisine"],
"description": row["description"],
"ingredients": json.loads(row["ingredients"]),
"steps": json.loads(row["steps"]),
"difficulty": row["difficulty"]
}
recipes.append(recipe)
return recipes
# 用户偏好相关操作
def get_user_preferences(self, user_id: str):
"""获取用户偏好"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,))
row = cursor.fetchone()
if row:
return {
"user_id": row["user_id"],
"favorite_cuisines": json.loads(row["favorite_cuisines"]),
"dietary_restrictions": json.loads(row["dietary_restrictions"]),
"cooking_skill": row["cooking_skill"]
}
return None
# 全局数据库实例
db = DatabaseManager()
def _get_all_recipes() -> Dict:
"""获取所有食谱数据"""
try:
recipes = db.get_all_recipes()
return {
"success": True,
"recipes": recipes,
"count": len(recipes)
}
except Exception as e:
return {
"success": False,
"error": f"获取食谱数据失败: {str(e)}"
}
def _get_recipe_by_id(recipe_id: str) -> Dict:
"""根据ID获取特定食谱"""
try:
recipe = db.get_recipe_by_id(recipe_id)
if recipe:
return {
"success": True,
"recipe": recipe
}
else:
return {
"success": False,
"error": f"未找到ID为{recipe_id}的食谱"
}
except Exception as e:
return {
"success": False,
"error": f"获取食谱失败: {str(e)}"
}
def _get_user_preferences(user_id: str) -> Dict:
"""获取用户偏好数据"""
try:
preferences = db.get_user_preferences(user_id)
if preferences:
return {
"success": True,
"preferences": preferences
}
else:
return {
"success": False,
"error": f"未找到ID为{user_id}的用户"
}
except Exception as e:
return {
"success": False,
"error": f"获取用户偏好失败: {str(e)}"
}
def _search_recipes_by_ingredient(ingredient: str) -> Dict:
"""根据食材查询食谱"""
try:
if not ingredient or not ingredient.strip():
return {
"success": False,
"error": "请提供有效的食材名称"
}
recipes = db.search_recipes_by_ingredient(ingredient.strip())
if recipes:
return {
"success": True,
"message": f"找到了{len(recipes)}个包含{ingredient}的食谱",
"recipes": recipes
}
else:
return {
"success": True,
"message": f"抱歉,没有找到包含{ingredient}的食谱",
"recipes": []
}
except Exception as e:
return {
"success": False,
"error": f"查询食谱时出错: {str(e)}"
}
def _recommend_recipes(user_id: str, available_ingredients: List[str] = None) -> Dict:
"""根据用户偏好推荐食谱"""
try:
# 获取用户偏好
user_prefs = db.get_user_preferences(user_id)
if not user_prefs:
return {
"success": False,
"error": f"未找到ID为{user_id}的用户偏好"
}
# 获取所有食谱
all_recipes = db.get_all_recipes()
recommended_recipes = []
for recipe in all_recipes:
# 根据用户喜好的菜系过滤
if recipe["cuisine"] in user_prefs["favorite_cuisines"]:
# 如果提供了可用食材,检查是否匹配
if available_ingredients:
recipe_ingredients = [ing["name"] for ing in recipe["ingredients"]]
if any(avail_ing in recipe_ingredients for avail_ing in available_ingredients):
recommended_recipes.append(recipe)
else:
recommended_recipes.append(recipe)
return {
"success": True,
"message": f"为您推荐了{len(recommended_recipes)}道菜",
"recipes": recommended_recipes[:5] # 限制返回数量
}
except Exception as e:
return {
"success": False,
"error": f"推荐食谱时出错: {str(e)}"
}
这个数据库管理类把之前内存中的操作都移到了SQLite里。
主应用入口
现在创建 app/main.py
,将所有MCP组件集中在一个文件中:
# app/main.py
from fastmcp import FastMCP
from typing import Dict, List
from .models import (
_get_all_recipes,
_get_recipe_by_id,
_get_user_preferences,
_search_recipes_by_ingredient,
_recommend_recipes
)
# 创建主应用实例
mcp = FastMCP("RecipeAssistant")
# 资源组件
@mcp.resource("recipes://all")
def get_all_recipes() -> Dict:
"""
获取所有食谱数据
Returns:
包含所有食谱的字典
"""
return _get_all_recipes()
@mcp.resource("recipes://{recipe_id}")
def get_recipe_by_id(recipe_id: str) -> Dict:
"""
根据ID获取特定食谱
Args:
recipe_id: 食谱ID
Returns:
食谱详细信息
"""
return _get_recipe_by_id(recipe_id)
@mcp.resource("users://{user_id}/preferences")
def get_user_preferences(user_id: str) -> Dict:
"""
获取用户偏好数据
Args:
user_id: 用户ID
Returns:
用户偏好数据
"""
return _get_user_preferences(user_id)
# 工具组件
@mcp.tool()
def search_recipes_by_ingredient(ingredient: str) -> Dict:
"""
根据食材查询食谱
Args:
ingredient: 食材名称
Returns:
包含匹配食谱的字典
"""
return _search_recipes_by_ingredient(ingredient)
@mcp.tool()
def recommend_recipes(user_id: str, available_ingredients: List[str] = None) -> Dict:
"""
根据用户偏好和可用食材推荐食谱
Args:
user_id: 用户ID
available_ingredients: 可用食材列表(可选)
Returns:
包含推荐食谱的字典
"""
return _recommend_recipes(user_id, available_ingredients)
# 提示词组件
@mcp.prompt()
def generate_recipe_search_response(ingredient: str) -> str:
"""
生成食谱查询的回复
Args:
ingredient: 食材名称
Returns:
格式化的回复文本
"""
search_result = _search_recipes_by_ingredient(ingredient)
if not search_result["success"]:
return f"抱歉,查询食谱时出现了问题:{search_result.get('error', '未知错误')}"
recipes = search_result["recipes"]
if not recipes:
return f"抱歉,我没有找到包含{ingredient}的食谱。请尝试其他食材。"
# 生成回复文本
response = f"我找到了{len(recipes)}个包含{ingredient}的食谱:\n\n"
for i, recipe in enumerate(recipes, 1):
response += f"{i}. {recipe['name']} - {recipe['description']}\n"
response += f" 难度:{recipe['difficulty']}\n"
response += f" 主要食材:{', '.join(ing['name'] for ing in recipe['ingredients'][:3])}\n\n"
response += f"想了解某个食谱的详细做法,请告诉我食谱的编号。"
return response
@mcp.prompt()
def generate_recipe_details(recipe_id: str) -> str:
"""
生成食谱详细信息的回复
Args:
recipe_id: 食谱ID
Returns:
格式化的食谱详情
"""
recipe_result = _get_recipe_by_id(recipe_id)
if not recipe_result["success"]:
return f"抱歉,无法获取食谱详情:{recipe_result.get('error', '未知错误')}"
recipe = recipe_result["recipe"]
# 生成详细食谱信息
response = f"# {recipe['name']}\n\n"
response += f"{recipe['description']}\n\n"
response += "## 食材准备\n\n"
for ing in recipe["ingredients"]:
response += f"- {ing['name']}: {ing['quantity']}\n"
response += "\n## 烹饪步骤\n\n"
for i, step in enumerate(recipe["steps"], 1):
response += f"{i}. {step}\n"
response += f"\n难度:{recipe['difficulty']}\n"
response += f"菜系:{recipe['cuisine']}\n"
return response
if __name__ == "__main__":
mcp.run()
测试验证
现在来测试我们升级后的应用。
使用MCP Inspector测试
首先,启动我们的应用:
cd recipe-assistant
python -m app.main
在另一个终端启动MCP Inspector测试,可以按照我们之前的方式进行测试:
npx @modelcontextprotocol/inspector stdio python -m app.main
下一篇预告:我们将通过实际代码对比,学习如何将一个FastMCP工具改写成原生SDK版本,直观理解两者的差异和使用场景。