零 说明&仓库地址
一 需求分析
- 本项目旨在构建一个本地智能舆情分析系统,通过自然语言处理与多工具协作,实现用户查询意图的自动理解、新闻检获取新闻索、情绪分析、结构化输出与邮件推送。

- 系统整体采用C/S架构:客户端作为用户的直接交互入口,负责接受输入,调用大语言模型进行语义解析与任务规划,并根据规划结果协调各类工具的执行流程;服务端作为工具能力的提供者,内置多种独立功能模块,响应客户端的调用请求,完成实际的数据处理任务。
1.1 项目执行流程分析
- 运行时,客户端会先加载本地模型配置,与服务器建立连接,并动态获取其可用工具列表。
- 用户输入查询后,客户端会自动调用大语言模型,将自然语言请求转化为结构化的“工具调用链”
- 客户端依次驱动服务器端工具完成如:关键词搜索、新闻采集、情绪倾向分析、报告生成与邮件发送等操作。
- 这一过程中,所有中间结果与最终输出都会自动保存,并反馈给用户。
1.2 项目特点
- 整个系统运行于本地环境,通过标准输入输出通道进行进程间通信,无需依赖远程服务部署,确保了数据处理的私密性与可控性,适合用于敏感舆情监测、本地文本分析和低延迟的信息响应场景。
二 MCP开发环境准备
- MCP的开发需要借助uv进行虚拟环境创建和依赖管理
- uvx是 uv 提供的命令行工具,用于快速运行 Python 脚本。
- 前往Python官网,下载并安装 Python 3.8 或更高版本。
- 安装完成后,在终端中执行以下命令确认是否安装成功。若安装成功,终端中会输出已安装的 Python 的版本号。
python --version
- 执行以下命令,安装 uv(包含 uvx)。安装脚本会自动下载 uv 并放置到系统默认路径中。
curl -LsSf https://astral.sh/uv/install.sh | sh
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
- 执行以下命令,验证是否安装成功。若安装成功,终端中会输出已安装的 uvx 的版本号。
uvx --version
三 创建MCP项目
- 在需要存储项目的目录,进入终端,执行创建项目的命令
D:\code>uv init opinion_analysis_system
Initialized project `opinion-analysis-system` at `D:\code\opinion_analysis_system`
- 进入项目目录,新建三个文件
client.py
、server.py
、.env

- client.py是客户端,用户与客户端交互
- server.py是服务端,其中包含多个工具函数,客户端会对其中的工具函数进行调用。
四 代码实现
4.1 模型配置
- 创建
.env
文件,在.env
文件中添加相关环境变量,分别代表阿里百炼平台的url,选择的模型名称、个人API key。
- 可以选择硅基流动或阿里云百炼平台
BASE_URL="https://api.siliconflow.cn/v1/chat/completions"
MODEL=Qwen/Qwen3-32B
DASHSCOPE_API_KEY="sk-xxx"
4.2 客户端开发
4.2.1 功能分析
- 客户端从本地配置文件读取必要信息,完成模型参数的设定,并初始化所需的运行环境
- 程序启动服务端脚本并与其建立通信,获取可用的工具信息。
- 完成连接后,客户端将根据用户输入的请求,协调内部调度器对工具链任务进行统一管理。
- 在与用户交互的过程中,系统会持续监听用户输入,并调用大模型对任务进行只能拆解,规划合适的工具链执行顺序。
- 每次任务执行完毕后,客户端将自动释放相关的资源,确保系统稳定运行与退出
- 整个流程由主函数串联驱动,形成完整的一条执行主线。
4.2.2 初始化客户端配置
- 在client.py中创建一个MCP Client类,用于分装和管理与MCP协议相关的所有客户端逻辑。
- 初始化函数中,准备好资源管理工具,方便后续自动管理连接,然后从配置文件中读取大模型的相关信息,并创建一个OpenAI客户端,方便后续调用模型,最后,预留与服务器通信的会话对象,等真正建立连接后使用。
class MCPClient:
def __init__(self):
self.exit_stack = AsyncExitStack()
self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")
self.base_url = os.getenv("BASE_URL")
self.model = os.getenv("MODEL")
if not self.openai_api_key:
raise ValueError("❌ 未找到 OpenAI API Key,请在 .env 文件中设置 DASHSCOPE_API_KEY")
self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
self.session: Optional[ClientSession] = None
4.2.3 启动MCP工具服务连接
connect_to_server
函数用于连接并启动本地服务脚本。它会先判断脚本类型,再根据类型选择对应的启动方式。接着,它会通过mcp提供的方式启动服务端脚本,并建立起与服务端的通信通道。建立连接后,客户端会初始化会话,并获取服务器上的工具,方便后续根据任务调用工具。整个过程相当于“把工具服务开起来,并准备好对话”。
async def connect_to_server(self, server_script_path: str):
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")
command = "python" if is_python else "node"
server_params = StdioServerParameters(command=command, args=[server_script_path], env=None)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
response = await self.session.list_tools()
tools = response.tools
print("\n已连接到服务器,支持以下工具:", [tool.name for tool in tools])
4.2.4 工具链任务调度器
async def process_query(self, query: str) -> str:
messages = [{"role": "user", "content": query}]
response = await self.session.list_tools()
available_tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}
} for tool in response.tools
]
keyword_match = re.search(r'(关于|分析|查询|搜索|查看)([^的\s,。、?\n]+)', query)
keyword = keyword_match.group(2) if keyword_match else "分析对象"
safe_keyword = re.sub(r'[\\/:*?"<>|]', '', keyword)[:20]
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
md_filename = f"sentiment_{safe_keyword}_{timestamp}.md"
md_path = os.path.join("./sentiment_reports", md_filename)
query = query.strip() + f" [md_filename={md_filename}] [md_path={md_path}]"
messages = [{"role": "user", "content": query}]
tool_plan = await self.plan_tool_usage(query, available_tools)
tool_outputs = {}
messages = [{"role": "user", "content": query}]
for step in tool_plan:
tool_name = step["name"]
tool_args = step["arguments"]
for key, val in tool_args.items():
if isinstance(val, str) and val.startswith("{{") and val.endswith("}}"):
ref_key = val.strip("{} ")
resolved_val = tool_outputs.get(ref_key, val)
tool_args[key] = resolved_val
if tool_name == "analyze_sentiment" and "filename" not in tool_args:
tool_args["filename"] = md_filename
if tool_name == "send_email_with_attachment" and "attachment_path" not in tool_args:
tool_args["attachment_path"] = md_path
result = await self.session.call_tool(tool_name, tool_args)
tool_outputs[tool_name] = result.content[0].text
messages.append({
"role": "tool",
"tool_call_id": tool_name,
"content": result.content[0].text
})
final_response = self.client.chat.completions.create(
model=self.model,
messages=messages
)
final_output = final_response.choices[0].message.content
def clean_filename(text: str) -> str:
text = text.strip()
text = re.sub(r'[\\/:*?\"<>|]', '', text)
return text[:50]
safe_filename = clean_filename(query)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{safe_filename}_{timestamp}.txt"
output_dir = "./llm_outputs"
os.makedirs(output_dir, exist_ok=True)
file_path = os.path.join(output_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
f.write(f"🗣 用户提问:{query}\n\n")
f.write(f"🤖 模型回复:\n{final_output}\n")
print(f"📄 对话记录已保存为:{file_path}")
return final_output
4.2.5 用户循环交互
async def chat_loop(self):
print("\n🤖 MCP 客户端已启动!输入 'quit' 退出")
while True:
try:
query = input("\n你: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print(f"\n🤖 AI: {response}")
except Exception as e:
print(f"\n⚠️ 发生错误: {str(e)}")
4.2.6 只能规划工具链
plan_tool_usage
函数的作用是让大模型根据用户的提问,自动规划出一组需要使用的工具和调用顺序。
- 首先,程序会整理当前可用的工具列表,并将它们写入系统提示中,引导模型只能从这些工具中选择。提示中还明确要求模型使用固定的JSON格式输出结果,避免生成多余的自然语言。然后,程序将提示内容和用户的问题一起发送给大模型请求模型生成一个工具调用计划。计划的内容通常包括每一步要使用的工具名称,以及对应的输入参数。
- 接收到模型的回复后,程序会尝试从中提取出合法的JSON内容,并进行解析。如果解析成功,就把结果作为工具调用链返回;如果解析失败,则打印错误信息并返回一个空的计划。这个过程确保了用户的问题可以自动转化为结构化的工具执行步骤,方便后续依次调用处理。
async def plan_tool_usage(self, query: str, tools: List[dict]) -> List[dict]:
print("\n📤 提交给大模型的工具定义:")
print(json.dumps(tools, ensure_ascii=False, indent=2))
tool_list_text = "\n".join([
f"- {tool['function']['name']}: {tool['function']['description']}"
for tool in tools
])
system_prompt = {
"role": "system",
"content": (
"你是一个智能任务规划助手,用户会给出一句自然语言请求。\n"
"你只能从以下工具中选择(严格使用工具名称):\n"
f"{tool_list_text}\n"
"如果多个工具需要串联,后续步骤中可以使用 {{上一步工具名}} 占位。\n"
"返回格式:JSON 数组,每个对象包含 name 和 arguments 字段。\n"
"不要返回自然语言,不要使用未列出的工具名。"
)
}
planning_messages = [
system_prompt,
{"role": "user", "content": query}
]
response = self.client.chat.completions.create(
model=self.model,
messages=planning_messages,
tools=tools,
tool_choice="none"
)
content = response.choices[0].message.content.strip()
match = re.search(r"```(?:json)?\\s*([\s\S]+?)\\s*```", content)
if match:
json_text = match.group(1)
else:
json_text = content
# 在解析 JSON 之后返回调用计划
try:
plan = json.loads(json_text)
return plan if isinstance(plan, list) else []
except Exception as e:
print(f"❌ 工具调用链规划失败: {e}\n原始返回: {content}")
return []
async def cleanup(self):
await self.exit_stack.aclose()
4.2.7 关闭资源
4.2.8 主流程函数
- 程序的入口,控制整个客户端的运行流程。程序一开始会创建一个
McpClient
实例,也就是之前封装的客户端对象,然后指定服务端脚本的位置,并尝试连接服务器。一旦连接成功,就进入对话循环,开始等待用户输入并处理问题。无论程序中途正常退出或出错,最后都会执行cleanup
,确保所有资源都被安全关闭。
async def main():
server_script_path = "./server.py"
client = MCPClient()
try:
await client.connect_to_server(server_script_path)
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
4.3 服务端开发
4.3.1 功能分析
- 服务器端主要负责提供新闻搜索、情感分析、邮件发送等基础工具能力,供客户端调用。分别对应着如下的三个工具:
search_google_news
用于在Goggle上搜寻相关新闻。
analyze_sentiment
用于对语句进行行情
sendemail_with_attachment
角于将本地的文件发送至目标邮箱
- 核心功能分析:
- 启动时,Server会首先加载环境变量,配置必要的API密钥和服务信息。
- 注册一组功能模块,包括:调用 SerperAPI搜索新闻内容、基于大模型分析文本情感、以及发送带有分析报告的邮件(对应各自的工具函数)。
- 每个工具均以标准接口形式暴露,客户端可以根据任务需要按需调用。
- 程序以标准输入输出(stdio)模式运行,确保与客户端实现稳定、实时的交互。
4.3.2 配置信息
SERPER_API_KEY="xx"
SMTP_SERVER=smtp.xxx.com
SMTP_PORT=465
EMAIL_USER=xxx@163.com
EMAIL_PASS=xxx
4.3.3 搜索信息函数
- 这个工具方法是通过Serper API 使用关键词从Google上去搜索获取新闻,返回前五条新闻并保存到本地文件中。
- 首先要申请Serper的API。简单注册后点击API key即可查看自己的API Key.
search_google_news
根据用户提供的关键词,调用Serper API搜索Google新闻,并返回前5条结果。执行流程如下:
- 读取API密钥:程序从环境变量中获取用于访问Serper API的密钥。
- 向新闻搜索接口发送请求:将用户输入的关键词打包成请求体,发送给Serper提供的接口。
- 提取新闻信息:从返回的数据中提取前5条新闻的标题、简介和链接,并整理成标准格式。
- 保存为json文件:将这些新闻内容保存为一个本地
.json
文件,文件名带有时间戳,方便归档。
- 返回内容与保存路径:最后,工具会将获取到的新闻数据、提示信息和保存路径返回,提供给客户端展示或传递给下一个工具使用。
load_dotenv()
mcp = FastMCP("NewsServer")
@mcp.tool()
async def search_google_news(keyword: str) -> str:
"""
使用 Serper API(Google Search 封装)根据关键词搜索新闻内容,返回前5条标题、描述和链接。
参数:
keyword (str): 关键词,如 "小米汽车"
返回:
str: JSON 字符串,包含新闻标题、描述、链接
"""
api_key = os.getenv("SERPER_API_KEY")
if not api_key:
return "❌ 未配置 SERPER_API_KEY,请在 .env 文件中设置"
url = "https://google.serper.dev/news"
headers = {
"X-API-KEY": api_key,
"Content-Type": "application/json"
}
payload = {"q": keyword}
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=payload)
data = response.json()
if "news" not in data:
return "❌ 未获取到搜索结果"
articles = [
{
"title": item.get("title"),
"desc": item.get("snippet"),
"url": item.get("link")
} for item in data["news"][:5]
]
output_dir = "./google_news"
os.makedirs(output_dir, exist_ok=True)
filename = f"google_news_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
file_path = os.path.join(output_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
return (
f"✅ 已获取与 [{keyword}] 相关的前5条 Google 新闻:\n"
f"{json.dumps(articles, ensure_ascii=False, indent=2)}\n"
f"📄 已保存到:{file_path}"
)
4.3.4 文本分析函数
analyze_sentiment()
函数用于对一段新闻文本或任意内容进行情绪倾向分析,并将分析结果保存为markdown格式的报告文件。它通过@mcp.tool
注册为mcp工具,支持被客户端自动调用。
- 功能流程如下:
- 读取大模型配置:加载环境变量中的API密钥、模型名称、服务地址,用于后续调用语言模型
- 构造分析指令:将用户输入文本包装为标准化提示。示例:“请分析以下内容的情感倾向:[用户文本]”
- 获取模型回答:通过API获取结构化结果。输出示例:{“sentiment”: “positive”, “confidence”: 0.92, “reason”: “使用积极词汇”}
- 生成markdown报告:Markdown模板包含▸ 分析时间戳 ▸ 原始文本引用 ▸ 情感分类标签(建议用💚/🟡/❤️可视化) ▸ 详细解释段落
- 保存文件到本地:将生成的报告保存到本地的
./sentiment_reposts
文件夹中,文件名由用户指定,或默认自动生成包含时间戳的名称。
- 返回报告路径:最终返回生成的报告文件的路径,方便后续工具使用。
@mcp.tool()
async def analyze_sentiment(text: str, filename: str) -> str:
"""
对传入的一段文本内容进行情感分析,并保存为指定名称的 Markdown 文件。
参数:
text (str): 新闻描述或文本内容
filename (str): 保存的 Markdown 文件名(不含路径)
返回:
str: 完整文件路径(用于邮件发送)
"""
openai_key = os.getenv("DASHSCOPE_API_KEY")
model = os.getenv("MODEL")
client = OpenAI(api_key=openai_key, base_url=os.getenv("BASE_URL"))
prompt = f"请对以下新闻内容进行情绪倾向分析,并说明原因:\n\n{text}"
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
result = response.choices[0].message.content.strip()
markdown = f"""# 舆情分析报告
**分析时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
## 📥 原始文本
{text}
---
## 📊 分析结果
{result}
"""
output_dir = "./sentiment_reports"
os.makedirs(output_dir, exist_ok=True)
if not filename:
filename = f"sentiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
file_path = os.path.join(output_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
f.write(markdown)
return file_path
4.3.5 邮件发送函数
send_email_with_attachment
通过获取本地路径下的文件,然后捡起发送给指定的邮箱。
- 邮箱需要开启
POP3/SMTP
服务,开启方法,请出门右转bing。.env
注意配置如下的部分:
SMTP_SERVER=smtp.xxx.com
SMTP_PORT=465
EMAIL_USER=xxx@163.com
EMAIL_PASS=xxx
@mcp.tool()
async def send_email_with_attachment(to: str, subject: str, body: str, filename: str) -> str:
"""
发送带附件的邮件。
参数:
to: 收件人邮箱地址
subject: 邮件标题
body: 邮件正文
filename (str): 保存的 Markdown 文件名(不含路径)
返回:
邮件发送状态说明
"""
smtp_server = os.getenv("SMTP_SERVER")
smtp_port = int(os.getenv("SMTP_PORT", 465))
sender_email = os.getenv("EMAIL_USER")
sender_pass = os.getenv("EMAIL_PASS")
full_path = os.path.abspath(os.path.join("./sentiment_reports", filename))
if not os.path.exists(full_path):
return f"❌ 附件路径无效,未找到文件: {full_path}"
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = sender_email
msg["To"] = to
msg.set_content(body)
try:
with open(full_path, "rb") as f:
file_data = f.read()
file_name = os.path.basename(full_path)
msg.add_attachment(file_data, maintype="application", subtype="octet-stream", filename=file_name)
except Exception as e:
return f"❌ 附件读取失败: {str(e)}"
try:
with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
server.login(sender_email, sender_pass)
server.send_message(msg)
return f"✅ 邮件已成功发送给 {to},附件路径: {full_path}"
except Exception as e:
return f"❌ 邮件发送失败: {str(e)}"
4.3.6 主流程函数
if __name__ == "__main__":
mcp.run(transport='stdio')