1. 项目概述
项目目标
构建一个本地智能舆论分析系统。
利用自然语言处理和多工具协作,实现用户查询意图的自动理解。
进行新闻检索、情绪分析、结构化输出和邮件推送。
系统流程
用户查询:用户输入查询请求。
提取关键词:从用户查询中提取关键词。
使用Google Serper API搜索:利用API获取新闻前5篇文章。
分析情感倾向:对获取的文章进行情绪分析。
保存为Markdown文件:将分析结果保存为Markdown格式。
发送邮件:将结果通过邮件发送给用户。
系统架构
Client-Server架构:
客户端(Client):用户交互入口,负责接收输入和调用大语言模型进行任务规划。
服务器端(Server):工具能力提供者,处理数据和响应客户端请求。
项目执行流程
客户端加载模型:加载本地模型配置,与服务器建立连接。
用户输入查询:客户端自动调用大语言模型,将自然语言请求转化为结构化工具调用。
客户端驱动服务器端:完成关键词搜索、新闻采集、情绪倾向分析、报告生成和邮件发送。
2. MCP的环境准备
MCP的开发需要借助uv
(虚拟环境管理工具)进行虚拟环境创建和依赖的管理。
2.1 安装uv
提供了两种安装
uv
的方法:使用
pip
安装:pip install uv
使用
conda
安装(针对已安装Anaconda环境的用户):conda install uv
2.2 创建MCP项目
通过
cd
命令进入要创建项目的空间,然后使用以下命令创建一个空的MCP项目uv init mcp-project
这将在指定目录下创建一个名为
mcp-project
的文件夹,其中包含初始化的项目结构。在
mcp-project目录下
,创建两个Python文件,分别是client.py
和server.py
:client.py
是客户端,用户与客户端进行交互。server.py
是服务端,其中包含了多种工具函数,客户端会调用其中的工具函数进行操作。
这样,MCP项目的创建便完成了。
3. 代码实现
3.1 确定大模型参数
创建一个
.env
文件,在该文件中添加相关的环境变量,这些变量分别代表阿里百炼平台的URL、选择的模型名称、个人的百炼平台API。BASE_URL:
指定用于API请求的基础URL,例如它可以是阿里云的DashScope服务的兼容模式地址:
https://dashscope.aliyuncs.com/compatible-mode/v1
。
MODEL:
指定要使用的模型名称。
DASHSCOPE_API_KEY:
DashScope服务的API密钥,用于认证和授权访问DashScope平台的API。
SERPER_API_KEY:
Serper服务的API密钥,Serper是一个提供搜索引擎结果页面(SERP)数据的API服务,允许开发者通过HTTP请求获取搜索引擎的结果。
SMTP_SERVER:
指定用于发送电子邮件的SMTP服务器地址。在您的例子中,它是:
smtp.163.com
,这是163邮箱的SMTP服务器。
SMTP_PORT:
指定SMTP服务器的端口号。在您的例子中,端口号是:
465
,这是一个常用的SMTP服务端口,通常用于SSL加密连接。
EMAIL_USER:
用于SMTP认证的电子邮件用户名,通常是您的电子邮件地址。。
EMAIL_PASS:
用于SMTP认证的电子邮件密码。
3.2 client.py的构建
3.2.1 功能分析
首先从客户端入手,进行
client.py
的构建。其总体架构如下:[配置初始化] [连接工具服务器(MCP Server)] [用户提问] -> chat_loop() [[LLM 规划工具调用链]] [顺序执行工具链] [保存分析结果 & 最终回答]
运行过程中有以下几个关键步骤:
客户端从本地配置文件中读取必要的信息,完成大模型参数的设定(见3.2.2 确定大模型参数),并初始化所需的运行环境(见3.2.2 初始化客户端配置)。
程序启动服务端脚本并与其建立通信,获取可用的工具信息(见3.2.3 启动MCP工具服务连接)。
完成连接后,客户端将根据用户输入的请求,协调内部调度器对工具链任务进行统一管理(见3.2.4 工具链任务调度器)。
在与用户交互的过程中,系统会持续监听用户输入(见3.2.5 用户交互循环),并调用大模型对任务进行智能拆解,规划合适的工具链执行顺序(见3.2.6 智能规划工具链)。
每次任务执行完毕后,客户端将自动释放相关资源,确保系统稳定运行与退出(见3.2.7 关闭资源)。
整个流程由主函数串联驱动,形成完整的一条执行主线(见3.2.8 主流程函数)。
3.2.2 初始化客户端配置
在client.py
中创建一个MCPClient
类,用于封装和管理与MCP协议相关的所有客户端逻辑,随后在里面编写各种相关函数。
class MCPClient:
def __init__(self):
# 创建 AsyncExitStack, 用于托管所有异步资源释放,这是为了后续连接 MCP Server 时使用 'async with' 语法自动管理上下文。
self.exit_stack = AsyncExitStack()
# 从环境中读取配置项
self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")
self.base_url = os.getenv("BASE_URL")
self.model = os.getenv("MODEL")
# 对 LLM 相关配置进行初始化
if not self.openai_api_key:
raise ValueError("未找到 OpenAI API Key, 请在 .env 文件中设置 DASHSCOPE_API_KEY")
# 初始化 OpenAI 客户端对象
self.client = OpenAI(api_key=self.openai_api_key,
base_url=self.base_url)
# 初始化 MCP Session(用于延迟赋值),等待连接 MCP Server 后再初始化它
self.session: Optional[ClientSession] = None
3.2.3 启动MCP工具服务连接
connect_to_server
函数的作用是连接并启动本地的服务器脚本。它会先判断脚本类型(必须是 .py
或 .js
),再根据类型选择对应的启动方式(Python或Node.js)。接着,它会通过MCP提供的方式启动服务端脚本,并建立起与服务端的通信通道。建立连接后,客户端会初始化会话,并获取服务器上有哪些工具可以使用,方便后续根据任务调用这些工具。整个过程相当于“把工具服务开起来,并准备好对话”。
async def connect_to_server(self, server_script_path: str):
# 对服务器脚本进行判断,只允许是 .py 或 .js
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")
# 确定启动命令,.py 用 python,.js 用 node
command = "python" if is_python else "node"
# 构造 MCP 所需的服务器参数,包括启动参数、脚本路径参数、环境变量(为 None 表示默认)
server_params = StdioServerParameters(command=command, args=(server_script_path,), env=None)
# 启动 MCP 工具服务进程(并建立 stdio 通信)
self.stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
# 封装通信通道,读取服务器返回的数据,并向服务器发送请求
self.stdio, self.write = stdio.transport
# 创建 MCP 客户端会话对象
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
# 初始化会话
await self.session.initialize()
# 获取工具列表并打印
response = await self.session.list_tools()
if not ("MC是服务器,支持以下工具:", {tool_name for tool in tools})
详细步骤
判断脚本类型:
检查
server_script_path
是否以.py
或.js
结尾,否则抛出ValueError
。
确定启动命令:
如果是
.py
文件,使用python
命令;如果是.js
文件,使用node
命令。
构造服务器参数:
使用
StdioServerParameters
构造服务器参数,包括命令、脚本路径和环境变量。
启动 MCP 工具服务进程:
使用
stdio_client
启动 MCP 工具服务进程,并建立stdio
通信。
封装通信通道:
读取服务器返回的数据,并向服务器发送请求。
创建 MCP 客户端会话对象:
使用
ClientSession
创建 MCP 客户端会话对象。
初始化会话:
调用
session.initialize()
初始化会话。
获取工具列表并打印:
调用
session.list_tools()
获取工具列表,并打印支持的工具。
3.2.4 工具链任务调度器
process_query
函数是客户端处理用户提问的核心部分,负责从接收问题到规划任务、调用工具、生成回复,再到保存结果的整个闭环。
功能步骤
获取支持的工具列表
向服务器请求当前支持的工具列表,例如“新闻搜索”、“情感分析”、“发送邮件”等。
提取关键词
从用户问题中提取关键词,生成统一的文件名,后续所有工具都会使用这个名字保存或读取文件,保证流程一致。
工具链规划
将问题交给大语言模型,决定如何使用这些工具(如先查新闻,再分析情感,再发邮件)。
调用服务器上的工具
按顺序调用服务器上的工具,并在调用前动态地填入一些信息(如文件名或路径)。
收集执行结果
收集所有工具执行完毕后的结果,程序会再调用一次大模型,让它根据整个过程总结一个回答。
保存对话记录
将对话记录(包括用户的提问和模型的回答)自动保存成一个
.txt
文件,方便后续查阅。
async def process_query(self, query: str) -> str:
# 准备初始消息和获取工具列表
messages = {"role": "user", "content": query}
response = await self.session.list_tools()
available_tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema,
},
} for tool in response.tools
]
# 提取问题的关键词,对文件名进行生成
keyword_match = re.search(r"(关于|分析|查询|搜索|查看)(.+?)(\n|$)", query)
keyword = keyword_match.group(2) if keyword_match else "分析对象"
safe_keyword = re.sub(r'[\\/*?:"<>|]', '', keyword)[:20]
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
md_filename = f"Sentiment_{safe_keyword}_{timestamp}.md"
md_path = os.path.join("./sentiment-reports/", md_filename)
# 更新查询,将文件名添加到原始查询中,使大模型在调用工具链时可以识别到该信息
messages = {"role": "user", "content": query}
md_path = (query.strip() + f" [md_filename={md_filename}]")[
md_path + md_path
]
messages = {"role": "user", "content": query}
tool_plan = await self.plan_tool_usage(query, available_tools)
tool_outputs = {}
messages = [{"role": "user", "content": query}]
# 依次执行工具调用,并收集结果
for step in tool_plan:
tool_name = step["name"]
tool_args = step["arguments"]
for key, val in tool_args.items():
if isinstance(val, str) and val.startswith("{{") and val.endswith("}})"):
ref_key = val.strip("{{").strip("}}")
resolved_val = tool_outputs.get(ref_key, val)
tool_args[key] = resolved_val
# 注入统一的文件名或路径(用于分析和邮件)
if tool_name == "analyze_sentiment" and "filename" not in tool_args:
tool_args["filename"] = md_filename
result = await self.session.call_tool(tool_name, tool_args)
tool_output[tool_name] = result.content[0].text
messages.append({
"role": "tool",
"tool_called": tool_name,
"content": result.content[0].text
})
# 调用大模型生成回复信息,并输出保存结果
final_response = self.client.chat_completions.create(
model=self.model,
messages=messages
)
final_output = final_response.choices[0].message.content
# 对辅助函数进行定义,目的是把文本清理成合法的文件名
def clean_filename(text: str) -> str:
text = text.strip().replace("\n", "").replace("\r", "")
return text[:50]
# 使用清理函数处理用户查询,生成用于文件命名的前缀,并添加时间戳、设置输出目录
safe_filename = clean_filename(query)
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
filename = f"{safe_filename}_{timestamp}.txt"
output_dir = "./lin_outputs"
os.makedirs(output_dir, exist_ok=True)
file_path = os.path.join(output_dir, filename)
# 将对话内容写入 md 文档,其中包含用户的初始提问以及模型的最终回复结果
with open(file_path, 'w', encoding='utf-8') as f:
f.write(f"* 用户提问:{query}\n\n")
f.write(f"* 模型回复:\n{final_output}\n")
print(f"📄 对话记录已保存为:{file_path}")
return final_output
3.2.5 用户交互循环笔记
概述
chat_loop
函数是客户端的“对话主入口”,负责程序和用户之间的交互。它是一个无限循环,不断等待用户输入问题,并处理这些输入。
主要功能
提示用户输入:
程序启动时,打印提示信息,告知用户系统已启动,可以开始提问(输入
quit
可退出)。
无限循环等待输入:
进入一个无限循环,不断等待用户输入问题。
处理用户输入:
每当用户输入一句话,程序会将这个问题传递给
process_query()
函数,自动规划任务、调用工具、生成回复。
打印结果:
处理完毕后,将结果打印出来。
错误处理:
如果在运行过程中出现错误(如连接失败、参数出错等),程序会捕获错误信息并打印出来,而不会直接崩溃。
async def chat_loop(self):
# 初始化提示信息
print("\n💬 MCP 客户端已启动!输入 'quit' 退出")
while True:
try:
# 进入主循环中等待用户输入
query = input("\n你:").strip()
if query.lower() == 'quit':
break
# 处理用户的提问,并返回结果
response = await self.process_query(query)
print(f"\n🤖 AI:{response}")
except Exception as e:
print(f"\n⚠️ 发生错误:{str(e)}")
3.2.6 智能规划工具链
概述
plan_tool_usage
函数的作用是让大模型根据用户的问题,自动规划出一组需要使用的工具和调用顺序。这个过程确保了用户的问题可以自动转化为结构化的工具执行步骤,方便后续依次调用处理。
主要功能
整理当前可用的工具列表:
将可用的工具整理为列表,并写入系统提示中,引导模型只能从这些工具中选择。
发送提示内容给大模型:
将提示内容和用户的问题一起发送给大模型,请求模型生成一个工具调用计划。
解析大模型的回复:
从大模型的回复中提取出合法的 JSON 内容,并进行解析。如果解析成功,就将结果作为工具调用链返回;如果解析失败,则打印错误信息并返回一个空的计划。
代码实现
async def plan_tool_usage(self, query: str, tools: list[dict]) -> List[dict]:
# 构造系统提示词 system_prompt
# 将所有可用工具组织为文本列表输入提示中,并明确指出工具名。
# 限定使用格式是 JSON,防止大模型输出错误格式的数据。
print("\n🤖 正在生成工具调用计划...")
print(json.dumps(tools, ensure_ascii=False, indent=2))
tool_list_text = "\n".join([
f"{{'function': {{'name': '{tool['function']['name']}', 'description': '{tool['function']['description']}'}}}}"
for tool in tools
])
system_prompt = {
"role": "system",
"content": f"""
你是一个智能任务规划助手,用户会给出一句自然语言请求。\n
你只能从以下工具中选择(严格使用工具名称):\n{tool_list_text}\n
如果多个工具需要串联,后续步骤中可以使用【下一步工具名】占位。\n
返回格式:JSON 数组,每个对象包含 name 和 arguments 字段。\n
不要返回自然语言,不要使用未列出的工具。
"""
}
# 构造对话上下文并调用模型
planning_messages = [
system_prompt,
{"role": "user", "content": query}
]
response = self.client.chat_completions.create(
model=self.model,
messages=planning_messages,
tool=tools,
tool_choice="none"
)
# 提取出模型返回的 JSON 内容
content = response.choices[0].message.content.strip()
match = re.search(r"(?<json\)\[(.*)\]\s*\)(\s*\)\s*content\)", content)
if match:
json_text = match.group(1)
else:
json_text = content
# 在解析 JSON 之后返回调用计划
try:
plan = json.loads(json_text)
return plan if isinstance(plan, list) else []
except Exception as e:
print(f"❌ 工具调用链规划失败:{e}\n原始返回:{content}")
return []
3.2.7 关闭资源
概述: 该函数用于在程序结束时关闭并清理所有已打开的资源,确保程序收尾干净、退出彻底。
功能:
调用之前创建的
AsyncExitStack
,这个工具会自动管理在程序运行过程中建立的连接,如与服务器的通信通道。通过调用
aclose()
,可以确保所有资源都被优雅地释放,避免出现内存泄漏或卡住进程的问题。
代码实现:
async def cleanup(self):
await self.exit_stack.aclose()
3.2.8 主流程函数
概述: 这是程序的主入口,控制整个客户端的运行流程。
功能:
程序一开始会创建一个
MCPClient
实例,也就是之前封装的客户端对象。然后指定服务端脚本的位置,并尝试连接服务器。
一旦连接成功,就进入对话循环,开始等待用户输入并处理问题。
无论程序中途正常退出还是出错,最后都会执行
cleanup()
,确保所有资源都被安全关闭。
代码实现:
async def main():
server_script_path = "F:\\mcp-project\\server.py"
client = MCPClient()
try:
await client.connect_to_server(server_script_path)
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
3.3 server.py 的构建
概述: server.py
是服务器端的主要脚本,负责提供新闻搜索、情感分析、邮件发送等基础工具能力,供客户端调用。
3.3.1 功能分析
核心工具:
search_google_news:
用于在 Google 上搜索相关新闻。
analyze_sentiment:
用于对语句进行情感分析。
send_email_with_attachment:
用于将本地的文件发送至目标邮箱。
核心功能剖析:
启动时加载环境变量:
Server 会首先加载环境变量,配置必要的 API 密钥和服务信息。
注册功能模块:
注册一组功能模块,包括:
调用 Server API 搜索新闻内容。
基于大模型分析文本情感。
发送带有分析报告的邮件(对应各自的工具函数)。
工具接口暴露:
每个工具均以标准接口形式暴露,客户端可以根据任务需要按需调用。
标准输入输出 (stdio) 模式运行:
程序以标准输入输出 (stdio) 模式运行,确保与客户端实现稳定、实时的交互。
3.3.2 search_google_news()
函数
概述
该函数通过 Serper API 使用关键词从 Google 上搜索获取新闻,返回前五条新闻并保存到本地文件中。
主要内容
申请 Serper API:
需要先申请 Serper 的 API,访问 Serper 官网 注册并获取 API Key。
配置环境变量:
在
.env
文件中配置 Serper 的 API Key。
函数作用:
search_google_news()
函数的作用是根据用户提供的关键词,调用 Serper API 搜索 Google 新闻,并返回前 5 条结果。
执行过程:
读取 API 密钥:从环境变量中获取用于访问 Serper API 的密钥。
向新闻搜索接口发起请求:将用户输入的关键词打包成请求体,发送给 Serper 提供的 Google News 接口。
提取新闻信息:从返回的数据中提取前 5 条新闻的标题、简介和链接。
保存为 JSON 文件:将这些新闻内容保存成一个本地
.json
文件,文件名带有时间戳,方便归档。返回内容与保存路径:最后,工具会将获取到的新闻数据、提示信息和保存路径一起返回,供客户端展示或传递给下一个工具使用。
代码实现
@mcp.tool()
async def search_google_news(keyword: str) -> str:
# 从环境中获取 API 密钥并进行检查
api_key = os.getenv("SERPER_API_KEY")
if not api_key:
return "❌ 未配置 SERPER_API_KEY,请在 .env 文件中设置"
# 设置请求参数并发送请求
url = "https://google.serper.dev/news"
headers = {
"X-API-KEY": api_key,
"Content-Type": "application/json"
}
payload = {"q": keyword}
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=payload)
json_response = response.json()
# 检查数据,并按照格式提取新闻,返回前五条新闻
if "news" not in data:
return "❌ 未获取到搜索结果"
articles = [
{
"title": item.get("title"),
"desc": item.get("snippet"),
"url": item.get("link")
} for item in data["news"][:5]
]
# 将新闻结果以带有时间戳命名的 .JSON 格式文件的形式保存在本地指定的路径
output_dir = "/google_news"
os.makedirs(output_dir, exist_ok=True)
filename = f"google_news_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}.json"
file_path = os.path.join(output_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
return (
f"📰 已获取与 {keyword} 相关的前5条 Google 新闻。\n"
f"📄 已保存到: {file_path}"
)
详细步骤
获取 API 密钥:
从环境变量中获取 Serper API 的密钥。
设置请求参数:
构造请求 URL、请求头和请求体。
发送请求:
使用
httpx.AsyncClient()
发送 POST 请求。
解析响应:
将响应内容解析为 JSON 格式。
提取新闻信息:
从解析后的 JSON 中提取前 5 条新闻的标题、简介和链接。
保存为 JSON 文件:
将提取的新闻信息保存为一个本地
.json
文件,文件名带有时间戳。
返回结果:
返回保存路径和新闻信息。
3.3.3 analyze_sentiment()
函数
概述
analyze_sentiment()
函数用于对一段新闻文本或任意内容进行情感倾向分析,并将分析结果保存为 Markdown 格式的报告文件。
主要内容
功能流程:
读取大模型配置:从环境变量中加载大模型的 API 密钥、模型名称和服务器地址,用于后续调用语言模型。
构造分析指令:将用户输入的文本内容整理成标准格式,调用大模型进行情感分析。
获取模型回复:调用大模型,发送分析指令并获取分析结果。
生成 Markdown 报告:将原始文本与分析结果整理成结构化的 Markdown 报告,包含时间戳、原文、分析结果。
保存到本地文件:将生成的报告保存到本地,文件名由用户指定,或默认由程序生成。
返回报告路径:返回生成的报告文件路径,方便后续工具(如邮件发送)使用。
代码实现
@mcp.tool()
async def analyze_sentiment(text: str, filename: str) -> str:
# 读取大模型配置
openai_key = os.getenv("DASHSCOPE_API_KEY")
client = OpenAI(api_key=openai_key, base_url=os.getenv("BASE_URL"))
# 构造情感分析的提示词
prompt = f"请对以下新闻内容进行情感倾向分析,并说明原因。\n\n{text}"
# 向模型发送请求,并处理返回的结果
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
result = response.choices[0].message.content.strip()
# 生成 Markdown 格式的分析报告,并指定是设置好的输出目录
markdown = f"**分析时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n**原文**:\n\n{text}\n\n**分析结果**:\n\n{result}\n"
# 创建输出目录
output_dir = "./sentiment_report"
os.makedirs(output_dir, exist_ok=True)
# 生成文件名
filename = f"sentiment_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}.md"
file_path = os.path.join(output_dir, filename)
# 将分析结果写入文件
with open(file_path, "w", encoding="utf-8") as f:
f.write(markdown)
return file_path
详细步骤
读取 API 密钥:
从环境变量中获取 OpenAI 的 API 密钥。
构造提示词:
将用户输入的文本内容构造成提示词,发送给大模型进行情感分析。
发送请求并获取结果:
使用 OpenAI 客户端发送请求,并获取情感分析结果。
生成 Markdown 报告:
将原始文本和分析结果整理成 Markdown 格式的报告。
创建输出目录:
创建用于保存报告文件的输出目录。
生成文件名:
生成带有时间戳的文件名。
保存报告文件:
将生成的 Markdown 报告保存到本地文件中。
返回文件路径:
返回生成的报告文件路径。
3.3.4 send_email_with_attachment()函数
概述
send_email_with_attachment()
是一个工具类,用于通过获取本地路径下的文件,然后将其发送给指定的邮箱。
主要功能
读取 SMTP 配置:
从环境变量中读取 SMTP 服务器地址、端口、发件人邮箱和授权码。
拼接附件路径并检查是否存在:
程序会在指定的
sentiment_reports
文件夹中查找附件,如果找不到文件,就会提示失败。
构造邮件内容:
创建邮件对象,设置主题、正文、收件人等基本信息。
添加附件:
将 Markdown 报告文件读取为二进制,并以附件形式加入邮件中。
连接 SMTP 服务器并发送邮件:
通过 SSL 安全连接登录邮箱服务器,并发送邮件。如果发送成功会返回确认信息,如果失败则返回错误说明。
执行流程
读取发件邮箱配置:
从环境变量中读取 SMTP 服务器地址、端口、发件人邮箱和授权码,这些信息是发送邮件的基础。
拼接附件路径并检查是否存在:
程序会在默认的
sentiment_reports
文件夹中查找附件,如果找不到文件,就会提示失败。
构造邮件内容:
创建邮件对象,设置主题、正文、收件人等基本信息。
添加附件:
将 Markdown 报告文件读取为二进制,并以附件形式加入邮件中。
连接 SMTP 服务器并发送邮件:
通过 SSL 安全连接登录邮箱服务器,并发送邮件。如果发送成功会返回确认信息,如果失败则返回错误说明。
代码实现
@mcp.tool()
async def send_email_with_attachment(to: str, subject: str, body: str, filename: str) -> str:
# 读取并配置 SMTP 相关信息
smtp_server = os.getenv("SMTP_SERVER") # 例如 smtp.qq.com
smtp_port = int(os.getenv("SMTP_PORT", 465))
sender_email = os.getenv("EMAIL_USER")
sender_pass = os.getenv("EMAIL_PASS")
# 获取附件文件的路径,并进行检查是否存在
full_path = os.path.abspath(os.path.join("./sentiment_reports", filename))
if not os.path.exists(full_path):
return f"❌ 附件路径无效,未找到文件:{full_path}"
# 创建邮件并设置内容
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = sender_email
msg['To'] = to
msg.set_content(body)
# 添加附件并发送邮件
try:
with open(full_path, "rb") as f:
file_name = os.path.basename(full_path)
msg.add_attachment(file_data, maintype="application", subtype="octet-stream", filename=file_name)
except Exception as e:
return f"❌ 附件读取失败:{str(e)}"
try:
with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
server.login(sender_email, sender_pass)
server.send_message(msg)
return f"✅ 邮件已成功发送给 {to},附件路径:{full_path}"
except Exception as e:
return f"❌ 邮件发送失败:{str(e)}"
4、测试
在运行的时候只需要运行client.py就可以运行整个项目了。