Code
from pocketflow import Node, Flow
from openai import OpenAI
import yaml

def call_llm(messages):
    client = OpenAI(
        api_key="your api key",
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
    response = client.chat.completions.create(
        model="qwen-turbo",
        messages=messages,
        temperature=0.7
    )
    return response.choices[0].message.content

class UserInputNode(Node):
    def prep(self, shared):
        # Initialize messages if this is the first run
        if "messages" not in shared:
            shared["messages"] = []
            print("Welcome to the Travel Advisor Chat! Type 'exit' to end the conversation.")
        return None

    def exec(self, _):
        # Get user input
        user_input = input("\nYou: ")
        return user_input

    def post(self, shared, prep_res, exec_res):
        user_input = exec_res
        # Check if user wants to exit
        if user_input and user_input.lower() == 'exit':
            print("\nGoodbye! Safe travels!")
            return None  # End the conversation
        # Store user input in shared
        shared["user_input"] = user_input
        # Move to guardrail validation
        return "validate"

class GuardrailNode(Node):
    def prep(self, shared):
        # Get the user input from shared data
        user_input = shared.get("user_input", "")
        return user_input

    def exec(self, user_input):
        # Basic validation checks
        if not user_input or user_input.strip() == "":
            return False, "Your query is empty. Please provide a travel-related question."
        if len(user_input.strip()) < 3:
            return False, "Your query is too short. Please provide more details about your travel question."

        # LLM-based validation for travel topics
        prompt = f"""
Evaluate if the following user query is related to travel advice, destinations, planning, or other travel topics.
The chat should ONLY answer travel-related questions and reject any off-topic, harmful, or inappropriate queries.
User query: {user_input}
Return your evaluation in YAML format:
```yaml
valid: true/false
reason: [Explain why the query is valid or invalid]
```"""

        # Call LLM with the validation prompt
        messages = [{"role": "user", "content": prompt}]
        response = call_llm(messages)

        # Extract YAML content from the response
        yaml_content = response.split("```yaml")[1].split("```")[0].strip() if "```yaml" in response else response
        result = yaml.safe_load(yaml_content)
        assert result is not None, "Error: Invalid YAML format"
        assert "valid" in result and "reason" in result, "Error: Invalid YAML format"

        is_valid = result.get("valid", False)
        reason = result.get("reason", "Missing reason in YAML response")
        return is_valid, reason

    def post(self, shared, prep_res, exec_res):
        is_valid, message = exec_res
        if not is_valid:
            # Display error message to user
            print(f"\nTravel Advisor: {message}")
            # Skip the LLM call and go back to user input
            return "retry"
        # Valid input, add to message history
        shared["messages"].append({"role": "user", "content": shared["user_input"]})
        # Proceed to LLM processing
        return "process"

class LLMNode(Node):
    def prep(self, shared):
        # Add system message if not present
        if not any(msg.get("role") == "system" for msg in shared["messages"]):
            shared["messages"].insert(0, {
                "role": "system",
                "content": "You are a helpful travel advisor that provides information about destinations, travel planning, accommodations, transportation, activities, and other travel-related topics. Only respond to travel-related queries and keep responses informative and friendly. Keep your responses concise, within 100 words."
            })
        # Return all messages for the LLM
        return shared["messages"]

    def exec(self, messages):
        # Call LLM with the entire conversation history
        response = call_llm(messages)
        return response

    def post(self, shared, prep_res, exec_res):
        # Print the assistant's response
        print(f"\nTravel Advisor: {exec_res}")
        # Add assistant message to history
        shared["messages"].append({"role": "assistant", "content": exec_res})
        # Loop back to continue the conversation
        return "continue"

# Create the flow with nodes and connections
user_input_node = UserInputNode()
guardrail_node = GuardrailNode()
llm_node = LLMNode()
# Create flow connections
user_input_node - "validate" >> guardrail_node
guardrail_node - "retry" >> user_input_node # Loop back if input is invalid
guardrail_node - "process" >> llm_node
llm_node - "continue" >> user_input_node # Continue conversation
flow = Flow(start=user_input_node)
shared = {}
flow.run(shared)
Code Explanation
This example shows how to build a travel-advisor chatbot with a guardrail (safety) mechanism on top of the PocketFlow framework. The system consists of three core nodes and uses conditional flow control to manage the conversation.
1. System Architecture
User Input → Guardrail Validation → LLM Processing → User Input
    ↑               │
    └───────────────┘
    (retry when validation fails)
2. Core Components
2.1 The LLM Call Function
def call_llm(messages):
    client = OpenAI(
        api_key="your api key",
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
- Uses Alibaba Cloud DashScope's OpenAI-compatible API endpoint
- Configures the qwen-turbo model with temperature=0.7 for a degree of creativity in the answers
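A quick way to sanity-check this setup is a one-off call (a minimal sketch; the prompt text is just an illustration and assumes the API key above has been filled in):
# Illustrative smoke test for the client configuration.
reply = call_llm([{"role": "user", "content": "Recommend one city for a weekend trip."}])
print(reply)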
2.2 UserInputNode (User Input Node)
class UserInputNode(Node):
    def prep(self, shared):
        if "messages" not in shared:
            shared["messages"] = []
Responsibilities:
- prep: initializes the conversation history and prints a welcome message on the first run
- exec: reads the user's input
- post: checks for the exit condition, stores the input in the shared store, and returns "validate" to enter the validation step
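Every node in this example follows PocketFlow's three-step lifecycle, which is easiest to see in isolation. A minimal sketch (EchoNode is a made-up node for illustration, not part of the chatbot):
from pocketflow import Node

class EchoNode(Node):
    def prep(self, shared):
        # prep: read whatever this node needs from the shared store
        return shared.get("text", "")

    def exec(self, prep_res):
        # exec: pure computation on prep's result, no shared-store access
        return prep_res.upper()

    def post(self, shared, prep_res, exec_res):
        # post: write results back, then return an action string that
        # selects the outgoing edge (or None to end the flow)
        shared["echo"] = exec_res
        return "done"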
2.3 GuardrailNode (Safety Guard Node)
This is the system's core safety component; it applies multiple layers of validation.
Basic validation:
if not user_input or user_input.strip() == "":
    return False, "Your query is empty..."
if len(user_input.strip()) < 3:
    return False, "Your query is too short..."
LLM-based validation:
prompt = f"""
Evaluate if the following user query is related to travel advice...
Return your evaluation in YAML format:
```yaml
valid: true/false
reason: [Explain why the query is valid or invalid]
```"""
Validation flow:
- Basic format checks (empty input, minimum length)
- An LLM call to judge whether the query is travel-related
- Parsing the YAML-formatted verdict (see the parsing sketch below)
- Routing on the result:
  - Failure: return "retry" to ask the user for new input
  - Success: return "process" to hand off to the LLM node
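The example parses the verdict with string splitting plus assertions, which raises on malformed output. A slightly more defensive variant of that step might look like this (a sketch; parse_validation is a hypothetical helper, not part of the original code):
import yaml

def parse_validation(response):
    # Pull out the fenced YAML block if the model included one.
    if "```yaml" in response:
        response = response.split("```yaml")[1].split("```")[0]
    try:
        data = yaml.safe_load(response.strip())
    except yaml.YAMLError:
        data = None
    # Fail closed: treat anything unparseable as an invalid query.
    if not isinstance(data, dict) or "valid" not in data or "reason" not in data:
        return False, "Could not parse the validation response."
    return bool(data["valid"]), str(data["reason"])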
2.4 LLMNode (LLM Processing Node)
def prep(self, shared):
    if not any(msg.get("role") == "system" for msg in shared["messages"]):
        shared["messages"].insert(0, {
            "role": "system",
            "content": "You are a helpful travel advisor..."
        })
Responsibilities:
- prep: ensures the system prompt is present, which defines the assistant's role
- exec: calls the LLM with the full conversation history to generate a reply
- post: prints the reply, appends it to the history, and returns "continue" to keep the conversation going
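After one full turn, shared["messages"] holds everything the next LLM call will see. Its shape looks roughly like this (the contents below are illustrative):
# Illustrative state of the shared store after one complete turn:
shared = {
    "user_input": "When is the best time to visit Kyoto?",
    "messages": [
        {"role": "system", "content": "You are a helpful travel advisor..."},
        {"role": "user", "content": "When is the best time to visit Kyoto?"},
        {"role": "assistant", "content": "Late March to early April, for the cherry blossoms..."},
    ],
}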
3. Flow Control
user_input_node - "validate" >> guardrail_node
guardrail_node - "retry" >> user_input_node
guardrail_node - "process" >> llm_node
llm_node - "continue" >> user_input_node
How the flow runs:
- Normal path: user input → validation passes → LLM reply → back to user input
- Safety interception: user input → validation fails → back to user input
- Exit: the user types "exit", post returns None, and the flow ends (see the sketch below)
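All of this routing is driven by the strings returned from each node's post method: returning a string follows the matching edge, while returning None (as UserInputNode does on "exit") matches no edge and ends the flow. A condensed sketch of the same pattern with two made-up nodes:
from pocketflow import Node, Flow

class AskNode(Node):
    def post(self, shared, prep_res, exec_res):
        # An action string follows the matching edge; None ends the
        # flow here because no edge is registered for it.
        return None if shared.get("done") else "answer"

class AnswerNode(Node):
    def post(self, shared, prep_res, exec_res):
        return "ask"  # loop back to the question node

ask, answer = AskNode(), AnswerNode()
ask - "answer" >> answer
answer - "ask" >> ask
Flow(start=ask).run({"done": True})  # ends immediately: post returned None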