Table of Contents
- LangGraph Cloud: Runs Concepts
- Runs
- LangGraph Cloud: Background Runs Guide
- How to kick off background runs
- LangGraph Cloud: Same-Thread Guide
- How to run multiple agents on the same thread
- Using cron jobs
- Stateless Runs
- LangGraph Cloud: Configurable Headers Guide
- Configurable Headers
- Streaming
- LangGraph Cloud: Streaming Guide
- Stream outputs
- LangGraph Cloud: How to Add Human-in-the-Loop
- Human-in-the-Loop
- Breakpoints
- Time Travel
- MCP Endpoint
- Double texting
- How to use the interrupt option
- How to use the rollback option
- Reject
- Enqueue
- Webhooks
- LangGraph Cloud: Webhooks Guide
- Using webhooks
- Cron jobs
- Using cron jobs
- LangGraph Custom Lifespan Guide
- How to add custom lifespan events
- How to add custom middleware
- How to add custom routes
LangGraph Cloud: Runs Concepts
https://langchain-ai.github.io/langgraph/cloud/concepts/runs/
Runs
A run is an invocation of an assistant. Each run may have its own input, configuration, and metadata, which may affect the execution and output of the underlying graph. A run can optionally be executed on a thread.
The LangGraph Platform API provides several endpoints for creating and managing runs. See the API reference for more details.
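For example, here is the difference between a run executed on a thread and a stateless run (a minimal sketch using the SDK calls covered in the guides below):
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# A run on a thread: state is checkpointed, so later runs can build on it
thread = await client.threads.create()
run = await client.runs.create(thread["thread_id"], "agent", input={"messages": []})
# A stateless run: pass None instead of a thread_id (see "Stateless Runs" below)
result = await client.runs.wait(None, "agent", input={"messages": []})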
LangGraph Cloud: Background Runs Guide
https://langchain-ai.github.io/langgraph/cloud/how-tos/background_run/
How to kick off background runs
This guide covers how to kick off background runs for your agent. This can be useful for long-running jobs.
Setup
First let's set up our client and thread:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create thread
thread = await client.threads.create()
print(thread)
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create thread
const thread = await client.threads.create();
console.log(thread);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Output:
{
'thread_id': '5cb1e8a1-34b3-4a61-a34e-71a9799bd00d',
'created_at': '2024-08-30T20:35:52.062934+00:00',
'updated_at': '2024-08-30T20:35:52.062934+00:00',
'metadata': {},
'status': 'idle',
'config': {},
'values': None
}
Check runs on thread
If we list the current runs on this thread, we will see that it's empty:
Python
runs = await client.runs.list(thread["thread_id"])
print(runs)
JavaScript
let runs = await client.runs.list(thread['thread_id']);
console.log(runs);
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs
Output:
[]
Start runs on thread
Now let's kick off a run:
Python
input = {"messages": [{"role": "user", "content": "what's the weather in sf"}]}
run = await client.runs.create(thread["thread_id"], assistant_id, input=input)
JavaScript
let input = {"messages": [{"role": "user", "content": "what's the weather in sf"}]};
let run = await client.runs.create(thread["thread_id"], assistantID, { input });
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data '{
"assistant_id": <ASSISTANT_ID>
}'
The first time we poll it, we can see status=pending:
Python
print(await client.runs.get(thread["thread_id"], run["run_id"]))
JavaScript
console.log(await client.runs.get(thread["thread_id"], run["run_id"]));
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>
Output:
{
"run_id": "1ef6a5f8-bd86-6763-bbd6-bff042db7b1b",
"thread_id": "7885f0cf-94ad-4040-91d7-73f7ba007c8a",
"assistant_id": "fe096781-5601-53d2-b2f6-0d3403f7e9ca",
"created_at": "2024-09-04T01:46:47.244887+00:00",
"updated_at": "2024-09-04T01:46:47.244887+00:00",
"metadata": {},
"status": "pending",
"kwargs": {
"input": {
"messages": [
{
"role": "user",
"content": "what's the weather in sf"
}
]
},
"config": {
"metadata": {
"created_by": "system"
},
"configurable": {
"run_id": "1ef6a5f8-bd86-6763-bbd6-bff042db7b1b",
"user_id": "",
"graph_id": "agent",
"thread_id": "7885f0cf-94ad-4040-91d7-73f7ba007c8a",
"assistant_id": "fe096781-5601-53d2-b2f6-0d3403f7e9ca",
"checkpoint_id": null
}
},
"webhook": null,
"temporary": false,
"stream_mode": [
"values"
],
"feedback_keys": null,
"interrupt_after": null,
"interrupt_before": null
},
"multitask_strategy": "reject"
}
Now we can join the run, wait for it to finish, and check the status again:
Python
await client.runs.join(thread["thread_id"], run["run_id"])
print(await client.runs.get(thread["thread_id"], run["run_id"]))
JavaScript
await client.runs.join(thread["thread_id"], run["run_id"]);
console.log(await client.runs.get(thread["thread_id"], run["run_id"]));
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>/join &&
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>
Output:
{
"run_id": "1ef6a5f8-bd86-6763-bbd6-bff042db7b1b",
"thread_id": "7885f0cf-94ad-4040-91d7-73f7ba007c8a",
"assistant_id": "fe096781-5601-53d2-b2f6-0d3403f7e9ca",
"created_at": "2024-09-04T01:46:47.244887+00:00",
"updated_at": "2024-09-04T01:46:47.244887+00:00",
"metadata": {},
"status": "success",
"kwargs": {
"input": {
"messages": [
{
"role": "user",
"content": "what's the weather in sf"
}
]
},
"config": {
"metadata": {
"created_by": "system"
},
"configurable": {
"run_id": "1ef6a5f8-bd86-6763-bbd6-bff042db7b1b",
"user_id": "",
"graph_id": "agent",
"thread_id": "7885f0cf-94ad-4040-91d7-73f7ba007c8a",
"assistant_id": "fe096781-5601-53d2-b2f6-0d3403f7e9ca",
"checkpoint_id": null
}
},
"webhook": null,
"temporary": false,
"stream_mode": [
"values"
],
"feedback_keys": null,
"interrupt_after": null,
"interrupt_before": null
},
"multitask_strategy": "reject"
}
Perfect! The run completed as we would expect. We can double-check that it ran correctly by printing out the final state:
Python
final_result = await client.threads.get_state(thread["thread_id"])
print(final_result)
JavaScript
let finalResult = await client.threads.getState(thread["thread_id"]);
console.log(finalResult);
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state
Output:
{
"values": {
"messages": [
{
"content": "what's the weather in sf",
"additional_kwargs": {},
"response_metadata": {},
"type": "human",
"name": null,
"id": "beba31bf-320d-4125-9c37-cadf526ac47a",
"example": false
},
{
"content": [
{
"id": "toolu_01AaNPSPzqia21v7aAKwbKYm",
"input": {},
"name": "tavily_search_results_json",
"type": "tool_use",
"index": 0,
"partial_json": "{\"query\": \"weather in san francisco\"}"
}
],
"additional_kwargs": {},
"response_metadata": {
"stop_reason": "tool_use",
"stop_sequence": null
},
"type": "ai",
"name": null,
"id": "run-f220faf8-1d27-4f73-ad91-6bb3f47e8639",
"example": false,
"tool_calls": [
{
"name": "tavily_search_results_json",
"args": {
"query": "weather in san francisco"
},
"id": "toolu_01AaNPSPzqia21v7aAKwbKYm",
"type": "tool_call"
}
],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 273,
"output_tokens": 61,
"total_tokens": 334
}
},
{
"content": "[{\"url\": \"https://www.weatherapi.com/\", \"content\": \"{'location': {'name': 'San Francisco', 'region': 'California', 'country': 'United States of America', 'lat': 37.78, 'lon': -122.42, 'tz_id': 'America/Los_Angeles', 'localtime_epoch': 1725052131, 'localtime': '2024-08-30 14:08'}, 'current': {'last_updated_epoch': 1725051600, 'last_updated': '2024-08-30 14:00', 'temp_c': 21.1, 'temp_f': 70.0, 'is_day': 1, 'condition': {'text': 'Partly cloudy', 'icon': '//cdn.weatherapi.com/weather/64x64/day/116.png', 'code': 1003}, 'wind_mph': 11.9, 'wind_kph': 19.1, 'wind_degree': 290, 'wind_dir': 'WNW', 'pressure_mb': 1018.0, 'pressure_in': 30.07, 'precip_mm': 0.0, 'precip_in': 0.0, 'humidity': 59, 'cloud': 25, 'feelslike_c': 21.1, 'feelslike_f': 70.0, 'windchill_c': 18.6, 'windchill_f': 65.5, 'heatindex_c': 18.6, 'heatindex_f': 65.5, 'dewpoint_c': 12.2, 'dewpoint_f': 54.0, 'vis_km': 16.0, 'vis_miles': 9.0, 'uv': 5.0, 'gust_mph': 15.0, 'gust_kph': 24.2}}\"}]",
"additional_kwargs": {},
"response_metadata": {},
"type": "tool",
"name": "tavily_search_results_json",
"id": "686b2487-f332-4e58-9508-89b3a814cd81",
"tool_call_id": "toolu_01AaNPSPzqia21v7aAKwbKYm",
"artifact": {
"query": "weather in san francisco",
"follow_up_questions": null,
"answer": null,
"images": [],
"results": [
{
"title": "Weather in San Francisco",
"url": "https://www.weatherapi.com/",
"content": "{'location': {'name': 'San Francisco', 'region': 'California', 'country': 'United States of America', 'lat': 37.78, 'lon': -122.42, 'tz_id': 'America/Los_Angeles', 'localtime_epoch': 1725052131, 'localtime': '2024-08-30 14:08'}, 'current': {'last_updated_epoch': 1725051600, 'last_updated': '2024-08-30 14:00', 'temp_c': 21.1, 'temp_f': 70.0, 'is_day': 1, 'condition': {'text': 'Partly cloudy', 'icon': '//cdn.weatherapi.com/weather/64x64/day/116.png', 'code': 1003}, 'wind_mph': 11.9, 'wind_kph': 19.1, 'wind_degree': 290, 'wind_dir': 'WNW', 'pressure_mb': 1018.0, 'pressure_in': 30.07, 'precip_mm': 0.0, 'precip_in': 0.0, 'humidity': 59, 'cloud': 25, 'feelslike_c': 21.1, 'feelslike_f': 70.0, 'windchill_c': 18.6, 'windchill_f': 65.5, 'heatindex_c': 18.6, 'heatindex_f': 65.5, 'dewpoint_c': 12.2, 'dewpoint_f': 54.0, 'vis_km': 16.0, 'vis_miles': 9.0, 'uv': 5.0, 'gust_mph': 15.0, 'gust_kph': 24.2}}",
"score": 0.976148,
"raw_content": null
}
],
"response_time": 3.07
},
"status": "success"
},
{
"content": [
{
"text": "\n\nThe search results provide the current weather conditions in San Francisco. According to the data, as of 2:00 PM on August 30, 2024, the temperature in San Francisco is 70\u00b0F (21.1\u00b0C) with partly cloudy skies. The wind is blowing from the west-northwest at around 12 mph (19 km/h). The humidity is 59% and visibility is 9 miles (16 km). Overall, it looks like a nice late summer day in San Francisco with comfortable temperatures and partly sunny conditions.",
"type": "text",
"index": 0
}
],
"additional_kwargs": {},
"response_metadata": {
"stop_reason": "end_turn",
"stop_sequence": null
},
"type": "ai",
"name": null,
"id": "run-8fecc61d-3d9f-4e16-8e8a-92f702be498a",
"example": false,
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 837,
"output_tokens": 124,
"total_tokens": 961
}
}
]
},
"next": [],
"tasks": [],
"metadata": {
"step": 3,
"run_id": "1ef67140-eb23-684b-8253-91d4c90bb05e",
"source": "loop",
"writes": {
"agent": {
"messages": [
{
"id": "run-8fecc61d-3d9f-4e16-8e8a-92f702be498a",
"name": null,
"type": "ai",
"content": [
{
"text": "\n\nThe search results provide the current weather conditions in San Francisco. According to the data, as of 2:00 PM on August 30, 2024, the temperature in San Francisco is 70\u00b0F (21.1\u00b0C) with partly cloudy skies. The wind is blowing from the west-northwest at around 12 mph (19 km/h). The humidity is 59% and visibility is 9 miles (16 km). Overall, it looks like a nice late summer day in San Francisco with comfortable temperatures and partly sunny conditions.",
"type": "text",
"index": 0
}
],
"example": false,
"tool_calls": [],
"usage_metadata": {
"input_tokens": 837,
"total_tokens": 961,
"output_tokens": 124
},
"additional_kwargs": {},
"response_metadata": {
"stop_reason": "end_turn",
"stop_sequence": null
},
"invalid_tool_calls": []
}
]
}
},
"user_id": "",
"graph_id": "agent",
"thread_id": "5cb1e8a1-34b3-4a61-a34e-71a9799bd00d",
"created_by": "system",
"assistant_id": "fe096781-5601-53d2-b2f6-0d3403f7e9ca"
},
"created_at": "2024-08-30T21:09:00.079909+00:00",
"checkpoint_id": "1ef67141-3ca2-6fae-8003-fe96832e57d6",
"parent_checkpoint_id": "1ef67141-2129-6b37-8002-61fc3bf69cb5"
}
We can also just print the content of the last AIMessage:
Python
print(final_result['values']['messages'][-1]['content'][0]['text'])
JavaScript
console.log(finalResult['values']['messages'][finalResult['values']['messages'].length-1]['content'][0]['text']);
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state | jq -r '.values.messages[-1].content[0].text'
Output:
The search results provide the current weather conditions in San Francisco. According to the data, as of 2:00 PM on August 30, 2024, the temperature in San Francisco is 70°F (21.1°C) with partly cloudy skies. The wind is blowing from the west-northwest at around 12 mph (19 km/h). The humidity is 59% and visibility is 9 miles (16 km). Overall, it looks like a nice late summer day in San Francisco with comfortable temperatures and partly sunny conditions.
LangGraph Cloud: Same-Thread Guide
https://langchain-ai.github.io/langgraph/cloud/how-tos/same-thread/
How to run multiple agents on the same thread
In LangGraph Platform, a thread is not explicitly associated with a particular agent. This means that you can run multiple agents on the same thread, allowing a subsequent agent to continue from an earlier agent's progress.
This example shows how to create two agents and then call them both on the same thread. You'll see that the second agent responds using the thread state produced by the first agent's checkpoints as context.
Setup
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
openai_assistant = await client.assistants.create(
graph_id="agent", config={"configurable": {"model_name": "openai"}}
)
# There should always be a default assistant with no configuration
assistants = await client.assistants.search()
default_assistant = [a for a in assistants if not a["config"]][0]
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
const openAIAssistant = await client.assistants.create(
{ graphId: "agent", config: {"configurable": {"model_name": "openai"}}}
);
const assistants = await client.assistants.search();
const defaultAssistant = assistants.find(a => !a.config);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/assistants \
--header 'Content-Type: application/json' \
--data '{
"graph_id": "agent",
"config": { "configurable": { "model_name": "openai" } }
}' && \
curl --request POST \
--url <DEPLOYMENT_URL>/assistants/search \
--header 'Content-Type: application/json' \
--data '{
"limit": 10,
"offset": 0
}' | jq -c 'map(select(.config == null or .config == {})) | .[0]'
We can see that these agents are different:
Python
print(openai_assistant)
JavaScript
console.log(openAIAssistant);
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/assistants/<OPENAI_ASSISTANT_ID>
Output:
{
"assistant_id": "db87f39d-b2b1-4da8-ac65-cf81beb3c766",
"graph_id": "agent",
"created_at": "2024-08-30T21:18:51.850581+00:00",
"updated_at": "2024-08-30T21:18:51.850581+00:00",
"config": {
"configurable": {
"model_name": "openai"
}
},
"metadata": {}
}
Python
print(default_assistant)
JavaScript
console.log(defaultAssistant);
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/assistants/<DEFAULT_ASSISTANT_ID>
Output:
{
"assistant_id": "fe096781-5601-53d2-b2f6-0d3403f7e9ca",
"graph_id": "agent",
"created_at": "2024-08-08T22:45:24.562906+00:00",
"updated_at": "2024-08-08T22:45:24.562906+00:00",
"config": {},
"metadata": {
"created_by": "system"
}
}
Run assistants on thread
Run the OpenAI assistant
We can now run the OpenAI assistant on the thread first.
Python
thread = await client.threads.create()
input = {"messages": [{"role": "user", "content": "who made you?"}]}
async for event in client.runs.stream(
thread["thread_id"],
openai_assistant["assistant_id"],
input=input,
stream_mode="updates",
):
print(f"Receiving event of type: {event.event}")
print(event.data)
print("\n\n")
JavaScript
const thread = await client.threads.create();
let input = {"messages": [{"role": "user", "content": "who made you?"}]}
const streamResponse = client.runs.stream(
thread["thread_id"],
openAIAssistant["assistant_id"],
{
input,
streamMode: "updates"
}
);
for await (const event of streamResponse) {
console.log(`Receiving event of type: ${event.event}`);
console.log(event.data);
console.log("\n\n");
}
cURL
thread_id=$(curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}' | jq -r '.thread_id') && \
curl --request POST \
--url "<DEPLOYMENT_URL>/threads/${thread_id}/runs/stream" \
--header 'Content-Type: application/json' \
--data '{
"assistant_id": <OPENAI_ASSISTANT_ID>,
"input": {
"messages": [
{
"role": "user",
"content": "who made you?"
}
]
},
"stream_mode": [
"updates"
]
}' | \
sed 's/\r$//' | \
awk '
/^event:/ {
if (data_content != "") {
print data_content "\n"
}
sub(/^event: /, "Receiving event of type: ", $0)
printf "%s...\n", $0
data_content = ""
}
/^data:/ {
sub(/^data: /, "", $0)
data_content = $0
}
END {
if (data_content != "") {
print data_content "\n\n"
}
}
'
Output:
Receiving event of type: metadata
{'run_id': '1ef671c5-fb83-6e70-b698-44dba2d9213e'}
Receiving event of type: updates
{'agent': {'messages': [{'content': 'I was created by OpenAI, a research organization focused on developing and advancing artificial intelligence technology.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-f5735b86-b80d-4c71-8dc3-4782b5a9c7c8', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]}}
Run the default assistant
Now we can run it with the default assistant and see that the second assistant is aware of the initial question, and can answer the question, "and you?":
Python
input = {"messages": [{"role": "user", "content": "and you?"}]}
async for event in client.runs.stream(
thread["thread_id"],
default_assistant["assistant_id"],
input=input,
stream_mode="updates",
):
print(f"Receiving event of type: {event.event}")
print(event.data)
print("\n\n")
JavaScript
let input = {"messages": [{"role": "user", "content": "and you?"}]}
const streamResponse = client.runs.stream(
thread["thread_id"],
defaultAssistant["assistant_id"],
{
input,
streamMode: "updates"
}
);
for await (const event of streamResponse) {
console.log(`Receiving event of type: ${event.event}`);
console.log(event.data);
console.log("\n\n");
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data '{
"assistant_id": <DEFAULT_ASSISTANT_ID>,
"input": {
"messages": [
{
"role": "user",
"content": "and you?"
}
]
},
"stream_mode": [
"updates"
]
}' | \
sed 's/\r$//' | \
awk '
/^event:/ {
if (data_content != "") {
print data_content "\n"
}
sub(/^event: /, "Receiving event of type: ", $0)
printf "%s...\n", $0
data_content = ""
}
/^data:/ {
sub(/^data: /, "", $0)
data_content = $0
}
END {
if (data_content != "") {
print data_content "\n\n"
}
}
'
Output:
Receiving event of type: metadata
{'run_id': '1ef6722d-80b3-6fbb-9324-253796b1cd13'}
Receiving event of type: updates
{'agent': {'messages': [{'content': [{'text': 'I am an artificial intelligence created by Anthropic, not by OpenAI. I should not have stated that OpenAI created me, as that is incorrect. Anthropic is the company that developed and trained me using advanced language models and AI technology. I will be more careful about providing accurate information regarding my origins in the future.', 'type': 'text', 'index': 0}], 'additional_kwargs': {}, 'response_metadata': {'stop_reason': 'end_turn', 'stop_sequence': None}, 'type': 'ai', 'name': None, 'id': 'run-ebaacf62-9dd9-4165-9535-db432e4793ec', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': {'input_tokens': 302, 'output_tokens': 72, 'total_tokens': 374}}]}}
Using cron jobs
https://langchain-ai.github.io/langgraph/cloud/how-tos/cron_jobs/
Sometimes you don't want to run your workflow based on user interaction, but would rather schedule it to run on a timetable — for example, if you want your workflow to compose and send a team to-do email every week. LangGraph Platform makes this possible with the Crons client, without you having to write your own script. To schedule a workflow, you pass a cron expression to the client to tell it when to run. Cron jobs run in the background and do not interfere with normal invocations of the workflow.
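For reference, a cron expression has five fields. The schedule used in the examples below, "27 15 * * *", reads as:
27 15 * * *
│  │  │ │ └─ day of week (0-6, Sunday = 0)
│  │  │ └─── month (1-12)
│  │  └───── day of month (1-31)
│  └──────── hour (0-23)
└─────────── minute (0-59)
i.e. every day at 15:27.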
Setup
First, let's set up our SDK client, assistant, and thread:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create thread
thread = await client.threads.create()
print(thread)
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantId = "agent";
// create thread
const thread = await client.threads.create();
console.log(thread);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/assistants/search \
--header 'Content-Type: application/json' \
--data '{
"limit": 10,
"offset": 0
}' | jq -c 'map(select(.config == null or .config == {})) | .[0].graph_id' && \
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Output:
{
'thread_id': '9dde5490-2b67-47c8-aa14-4bfec88af217',
'created_at': '2024-08-30T23:07:38.242730+00:00',
'updated_at': '2024-08-30T23:07:38.242730+00:00',
'metadata': {},
'status': 'idle',
'config': {},
'values': None
}
Cron jobs on threads
To create a cron job associated with a specific thread, you can write:
Python
# This schedules a job to run at 15:27 (3:27PM) every day
cron_job = await client.crons.create_for_thread(
thread["thread_id"],
assistant_id,
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "What time is it?"}]},
)
JavaScript
// This schedules a job to run at 15:27 (3:27PM) every day
const cronJob = await client.crons.createForThread(
thread["thread_id"],
assistantId,
{
schedule: "27 15 * * *",
input: { messages: [{ role: "user", content: "What time is it?" }] }
}
);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/crons \
--header 'Content-Type: application/json' \
--data '{
        "assistant_id": <ASSISTANT_ID>,
        "schedule": "27 15 * * *"
    }'
Note that it is important to delete cron jobs that are no longer useful. Otherwise you could rack up unwanted API charges for LLM calls! You can delete a cron job using the following code:
Python
await client.crons.delete(cron_job["cron_id"])
JavaScript
await client.crons.delete(cronJob["cron_id"]);
cURL
curl --request DELETE \
--url <DEPLOYMENT_URL>/runs/crons/<CRON_ID>
Stateless cron jobs
You can also create stateless cron jobs by using the following code:
Python
# This schedules a job to run at 15:27 (3:27PM) every day
cron_job_stateless = await client.crons.create(
assistant_id,
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "What time is it?"}]},
)
JavaScript
// This schedules a job to run at 15:27 (3:27PM) every day
const cronJobStateless = await client.crons.create(
assistantId,
{
schedule: "27 15 * * *",
input: { messages: [{ role: "user", content: "What time is it?" }] }
}
);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/runs/crons \
--header 'Content-Type: application/json' \
--data '{
        "assistant_id": <ASSISTANT_ID>,
        "schedule": "27 15 * * *"
    }'
Again, remember to delete your job once you're done with it!
Python
await client.crons.delete(cron_job_stateless["cron_id"])
JavaScript
await client.crons.delete(cronJobStateless["cron_id"]);
cURL
curl --request DELETE \
--url <DEPLOYMENT_URL>/runs/crons/<CRON_ID>
Stateless Runs
https://langchain-ai.github.io/langgraph/cloud/how-tos/stateless_runs/
Most of the time, you provide a thread_id to the client when you run your graph, in order to keep track of prior runs through the persistent state implemented by LangGraph Platform. However, if you don't need to persist the runs, you don't need to use the built-in persistent state and can create stateless runs.
Setup
First, let's set up our client:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create thread
thread = await client.threads.create()
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantId = "agent";
// create thread
const thread = await client.threads.create();
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/assistants/search \
--header 'Content-Type: application/json' \
--data '{
"limit": 10,
"offset": 0
}' | jq -c 'map(select(.config == null or .config == {})) | .[0].graph_id' && \
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Stateless streaming
We can stream the results of a stateless run in an almost identical fashion to how we stream from a run with the state attribute, but instead of passing a value to the thread_id parameter, we pass None:
Python
input = {
"messages": [
{"role": "user", "content": "Hello! My name is Bagatur and I am 26 years old."}
]
}
async for chunk in client.runs.stream(
# Don't pass in a thread_id and the stream will be stateless
None,
assistant_id,
input=input,
stream_mode="updates",
):
if chunk.data and "run_id" not in chunk.data:
print(chunk.data)
JavaScript
let input = {
messages: [
{ role: "user", content: "Hello! My name is Bagatur and I am 26 years old." }
]
};
const streamResponse = client.runs.stream(
// Don't pass in a thread_id and the stream will be stateless
null,
assistantId,
{
input,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
if (chunk.data && !("run_id" in chunk.data)) {
console.log(chunk.data);
}
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"Hello! My name is Bagatur and I am 26 years old.\"}]},
\"stream_mode\": [
\"updates\"
]
}" | jq -c 'select(.data and (.data | has("run_id") | not)) | .data'
Output:
{'agent': {'messages': [{'content': "Hello Bagatur! It's nice to meet you. Thank you for introducing yourself and sharing your age. Is there anything specific you'd like to know or discuss? I'm here to help with any questions or topics you're interested in.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-489ec573-1645-4ce2-a3b8-91b391d50a71', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]}}
Waiting for stateless results
In addition to streaming, you can also wait for a stateless result by using the .wait function as follows:
Python
stateless_run_result = await client.runs.wait(
None,
assistant_id,
input=input,
)
print(stateless_run_result)
JavaScript
let statelessRunResult = await client.runs.wait(
null,
assistantId,
{ input: input }
);
console.log(statelessRunResult);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/runs/wait \
--header 'Content-Type: application/json' \
--data '{
        "assistant_id": <ASSISTANT_ID>,
        "input": {"messages": [{"role": "human", "content": "Hello! My name is Bagatur and I am 26 years old."}]}
    }'
Output:
{
'messages': [
{
'content': 'Hello! My name is Bagatur and I am 26 years old.',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': '5e088543-62c2-43de-9d95-6086ad7f8b48',
'example': False}
,
{
'content': "Hello Bagatur! It's nice to meet you. Thank you for introducing yourself and sharing your age. Is there anything specific you'd like to know or discuss? I'm here to help with any questions or topics you'd like to explore.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-d6361e8d-4d4c-45bd-ba47-39520257f773',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
LangGraph Cloud: Configurable Headers Guide
https://langchain-ai.github.io/langgraph/cloud/how-tos/configurable_headers/
Configurable Headers
LangGraph allows runtime configuration to dynamically modify agent behavior and permissions. When using LangGraph Platform, you can pass this configuration in the request body (config) or in specific request headers, letting you adjust behavior based on user identity or other request data.
For privacy, control which headers are passed to the runtime configuration via the http.configurable_headers section of your langgraph.json file.
Here's how to customize the included and excluded headers:
{
"http": {
"configurable_headers": {
"include": ["x-user-id", "x-organization-id", "my-prefix-*"],
"exclude": ["authorization", "x-api-key"]
}
}
}
The include and exclude lists accept exact header names, or patterns using * to match any number of characters. For security reasons, no other regex patterns are supported.
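The matching behaves like shell-style globbing. As a rough illustration of the rule's semantics (this is not the platform's actual implementation):
Python
from fnmatch import fnmatch
assert fnmatch("my-prefix-tenant-a", "my-prefix-*")  # wildcard match
assert fnmatch("x-user-id", "x-user-id")             # exact match
assert not fnmatch("authorization", "x-*")           # no match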
Using in your graph
You can access the included headers in your graph via the config argument of any node:
def my_node(state, config):
organization_id = config["configurable"].get("x-organization-id")
...
Or by fetching from the context (useful in tools and other nested functions):
from langgraph.config import get_config
def search_everything(query: str):
organization_id = get_config()["configurable"].get("x-organization-id")
...
You can even use it to compile your graph dynamically:
# my_graph.py.
import contextlib
@contextlib.asynccontextmanager
async def generate_agent(config):
organization_id = config["configurable"].get("x-organization-id")
if organization_id == "org1":
graph = ...
yield graph
else:
graph = ...
yield graph
{
"graphs": {"agent": "my_graph.py:generate_agent"}
}
Opting out of configurable headers
If you'd like to opt out of configurable headers, set a wildcard pattern in the exclude list:
{
"http": {
"configurable_headers": {
"exclude": ["*"]
}
}
}
This will exclude all headers from being added to your run's configuration.
Note that exclusions take precedence over inclusions.
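For example, in this hypothetical configuration, my-prefix-tenant is forwarded while my-prefix-secret is still dropped, because the exclusion wins:
{
  "http": {
    "configurable_headers": {
      "include": ["my-prefix-*"],
      "exclude": ["my-prefix-secret"]
    }
  }
}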
Streaming
https://langchain-ai.github.io/langgraph/cloud/concepts/streaming/
Streaming is critical for making LLM applications feel responsive to end users.
When creating a streaming run, the streaming mode determines what kinds of data are streamed back to the API client.
Supported streaming modes
LangGraph Platform supports the following streaming modes:
Mode | Description | LangGraph Library Method
---|---|---
values | Stream the full graph state after each super-step. Guide | .stream() / .astream() with stream_mode="values"
updates | Stream only the updates to the graph state after each node. Guide | .stream() / .astream() with stream_mode="updates"
messages-tuple | Stream LLM tokens for any messages generated in the graph (useful for chat apps). Guide | .stream() / .astream() with stream_mode="messages"
debug | Stream debug information throughout graph execution. Guide | .stream() / .astream() with stream_mode="debug"
custom | Stream custom data. Guide | .stream() / .astream() with stream_mode="custom"
events | Stream all events (including graph state); mainly useful when migrating large LCEL apps. Guide | .astream_events()
✅ You can also combine multiple streaming modes. See the how-to guide for configuration details.
Stateless runs
If you don't want to persist the outputs of a streaming run in the checkpointer DB, you can create a stateless run without creating a thread:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
async for chunk in client.runs.stream(
None, # (1)!
assistant_id,
input=inputs,
stream_mode="updates"
):
print(chunk.data)
1. We pass None instead of a thread_id UUID.
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL>, apiKey: <API_KEY> });
// create a streaming run
const streamResponse = client.runs.stream(
null, // (1)!
assistantID,
{
input,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
1. We pass null instead of a thread_id UUID.
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/runs/stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data "{
\"assistant_id\": \"agent\",
\"input\": <inputs>,
\"stream_mode\": \"updates\"
}"
Join and stream
LangGraph Platform allows you to join an active background run and stream outputs from it. To do so, you can use the LangGraph SDK's client.runs.join_stream method:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
async for chunk in client.runs.join_stream(
thread_id,
run_id, # (1)!
):
print(chunk)
1. This is the run_id of an existing run you want to join.
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL>, apiKey: <API_KEY> });
const streamResponse = client.runs.joinStream(
threadID,
runId // (1)!
);
for await (const chunk of streamResponse) {
console.log(chunk);
}
1. This is the run_id of an existing run you want to join.
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>/stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>'
Outputs not buffered
When using .join_stream, output is not buffered, so any output produced before joining will not be received.
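For instance, a typical pattern is to start a background run and join its stream later (a minimal sketch combining the background-run and join APIs shown above):
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
# Start a background run (see the background runs guide above)
run = await client.runs.create(thread_id, assistant_id, input=inputs)
# Join it later; only output produced after this point is received
async for chunk in client.runs.join_stream(thread_id, run["run_id"]):
    print(chunk)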
API reference
For API usage and implementation, see the API reference.
LangGraph Cloud: Streaming Guide
https://langchain-ai.github.io/langgraph/cloud/how-tos/streaming/
Stream outputs
Streaming API
The LangGraph SDK allows you to stream outputs from the LangGraph API server.
Basic usage example:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
# create a streaming run
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input=inputs,
stream_mode="updates"
):
print(chunk.data)
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL>, apiKey: <API_KEY> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// create a streaming run
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
cURL
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Create a streaming run:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data "{
\"assistant_id\": \"agent\",
\"input\": <inputs>,
\"stream_mode\": \"updates\"
}"
Extended example: streaming updates
This is an example graph you can run in the LangGraph API server.
See the LangGraph Platform quickstart for more details.
# graph.py
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
Once you have a running LangGraph API server, you can interact with it using:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
# create a streaming run
async for chunk in client.runs.stream( # (1)!
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="updates" # (2)!
):
print(chunk.data)
1. The client.runs.stream() method returns an iterator that yields streamed outputs.
2. Set stream_mode="updates" to stream only the updates to the graph state after each node. Other stream modes are also available; see supported stream modes for details.
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// create a streaming run
const streamResponse = client.runs.stream( // (1)!
threadID,
assistantID,
{
input: { topic: "ice cream" },
streamMode: "updates" // (2)!
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
1. The client.runs.stream() method returns an iterator that yields streamed outputs.
2. Set streamMode: "updates" to stream only the updates to the graph state after each node. Other stream modes are also available; see supported stream modes for details.
cURL
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Create a streaming run:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"topic\": \"ice cream\"},
\"stream_mode\": \"updates\"
}"
Output:
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
Supported stream modes
Mode | Description | LangGraph Library Method
---|---|---
values | Stream the full graph state after each super-step. | .stream() / .astream() with stream_mode="values"
updates | Stream the state updates after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately. | .stream() / .astream() with stream_mode="updates"
messages-tuple | Stream LLM tokens and metadata for graph nodes where an LLM is invoked (useful for chat apps). | .stream() / .astream() with stream_mode="messages"
debug | Stream as much debug information as possible throughout graph execution. | .stream() / .astream() with stream_mode="debug"
custom | Stream custom data from inside your graph. | .stream() / .astream() with stream_mode="custom"
events | Stream all events (including graph state); mainly useful when migrating large LCEL apps. | .astream_events()
Stream multiple modes
You can pass a list to the stream_mode parameter to stream multiple modes at once.
The streamed outputs will be tuples of (mode, chunk), where mode is the name of the stream mode and chunk is the data streamed by that mode.
Python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input=inputs,
stream_mode=["updates", "custom"]
):
print(chunk)
JavaScript
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input,
streamMode: ["updates", "custom"]
}
);
for await (const chunk of streamResponse) {
console.log(chunk);
}
cURL
curl --request POST \
    --url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
    --header 'Content-Type: application/json' \
    --data "{
        \"assistant_id\": \"agent\",
        \"input\": <inputs>,
        \"stream_mode\": [
            \"updates\",
            \"custom\"
        ]
    }"
Stream graph state
Use the stream modes updates and values to stream the graph's state as it executes.
The updates mode streams the updates to the state after each step of the graph. The values mode streams the full value of the state after each step.
Example graph
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
Stateful runs
The examples below assume that you want to persist the outputs of a streaming run in the checkpointer DB and have created a thread. To create a thread:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"]
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
If you don't need to persist the outputs of a run, you can pass None instead of thread_id when streaming.
updates
Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
Python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="updates"
):
print(chunk.data)
JavaScript
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input: { topic: "ice cream" },
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"topic\": \"ice cream\"},
\"stream_mode\": \"updates\"
}"
values
Use this to stream the full state of the graph after each step.
Python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="values"
):
print(chunk.data)
JavaScript
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input: { topic: "ice cream" },
streamMode: "values"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"topic\": \"ice cream\"},
\"stream_mode\": \"values\"
}"
Subgraphs
To include outputs from subgraphs in the streamed outputs, you can set subgraphs=True in the .stream() method of the parent graph. This will stream outputs from both the parent graph and all subgraphs.
for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"foo": "foo"},
stream_subgraphs=True, # (1)!
stream_mode="updates",
):
print(chunk)
1. Set stream_subgraphs=True to stream outputs from subgraphs.
Extended example: streaming from subgraphs
This is an example graph you can run in the LangGraph API server.
See the LangGraph Platform quickstart for more details.
# graph.py
from langgraph.graph import START, StateGraph
from typing import TypedDict
# Define subgraph
class SubgraphState(TypedDict):
foo: str # note that this key is shared with the parent graph state
bar: str
def subgraph_node_1(state: SubgraphState):
return {"bar": "bar"}
def subgraph_node_2(state: SubgraphState):
return {"foo": state["foo"] + state["bar"]}
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()
# Define parent graph
class ParentState(TypedDict):
foo: str
def node_1(state: ParentState):
return {"foo": "hi! " + state["foo"]}
builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
Once you have a running LangGraph API server, you can interact with it using:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"foo": "foo"},
stream_subgraphs=True, # (1)!
stream_mode="updates",
):
print(chunk)
1. Set stream_subgraphs=True to stream outputs from subgraphs.
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// create a streaming run
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input: { foo: "foo" },
streamSubgraphs: true, // (1)!
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console.log(chunk);
}
1. Set streamSubgraphs: true to stream outputs from subgraphs.
cURL
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Create a streaming run:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"foo\": \"foo\"},
\"stream_subgraphs\": true,
\"stream_mode\": [
\"updates\"
]
}"
Note that we receive not only the node updates, but also the namespaces, which tell us what graph (or subgraph) we are streaming from.
Debug
Use the debug stream mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
Python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="debug"
):
print(chunk.data)
JavaScript
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input: { topic: "ice cream" },
streamMode: "debug"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"topic\": \"ice cream\"},
\"stream_mode\": \"debug\"
}"
LLM tokens
Use the messages-tuple streaming mode to stream LLM outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.
The streamed output from messages-tuple mode is a tuple (message_chunk, metadata), where:
- message_chunk: the token or message segment from the LLM.
- metadata: a dictionary containing details about the graph node and the LLM invocation.
Example graph
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState:
topic: str
joke: str = ""
llm = init_chat_model(model="openai:gpt-4o-mini")
def call_model(state: MyState):
"""Call the LLM to generate a joke about a topic"""
llm_response = llm.invoke( # (1)!
[
{"role": "user", "content": f"Generate a joke about {state.topic}"}
]
)
return {"joke": llm_response.content}
graph = (
StateGraph(MyState)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
1. Note that message events are emitted even when the LLM is run using .invoke rather than .stream.
Python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="messages-tuple",
):
if chunk.event != "messages":
continue
message_chunk, metadata = chunk.data # (1)!
if message_chunk["content"]:
print(message_chunk["content"], end="|", flush=True)
1、"messages-tuple"流模式返回一个由元组(message_chunk, metadata)
组成的迭代器,其中message_chunk
是LLM流式输出的token,metadata
是一个字典,包含调用LLM的图节点信息及其他相关信息。
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input: { topic: "ice cream" },
streamMode: "messages-tuple"
}
);
for await (const chunk of streamResponse) {
if (chunk.event !== "messages") {
continue;
}
console.log(chunk.data[0]["content"]); // (1)!
}
1、"messages-tuple"流模式返回一个由元组(message_chunk, metadata)
组成的迭代器,其中message_chunk
是LLM流式输出的令牌,metadata
是一个字典,包含调用LLM的图节点信息及其他相关信息。
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"topic\": \"ice cream\"},
\"stream_mode\": \"messages-tuple\"
}"
Filter LLM tokens
- To filter the streamed tokens from LLM invocations, associate tags with the LLM invocations.
- To receive only the tokens generated by specific nodes, use stream_mode="messages" and filter the outputs by the langgraph_node field in the streamed metadata, as in the sketch below.
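For example, to keep only the tokens emitted from a particular node, you can check the metadata of each chunk (a minimal sketch; "call_model" is the node name from the example graph above):
Python
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue
    message_chunk, metadata = chunk.data
    # Drop tokens that did not originate from the node we care about
    if metadata.get("langgraph_node") == "call_model" and message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)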
Stream custom data
To send custom user-defined data:
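Custom data is emitted from inside the graph and consumed on the client with stream_mode="custom". A minimal sketch of the emitting side, assuming LangGraph's get_stream_writer helper (the node name and payload here are illustrative):
Python
from langgraph.config import get_stream_writer
def my_node(state):
    writer = get_stream_writer()  # writer for the current run's "custom" stream
    writer({"progress": "fetching results"})  # arrives as chunk.data on the client
    return state
The examples below show the consuming side: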
Python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"query": "example"},
stream_mode="custom"
):
print(chunk.data)
JavaScript
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input: { query: "example" },
streamMode: "custom"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"query\": \"example\"},
\"stream_mode\": \"custom\"
}"
Stream events
To stream all events, including the graph state:
Python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="events"
):
print(chunk.data)
JavaScript
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input: { topic: "ice cream" },
streamMode: "events"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"topic\": \"ice cream\"},
\"stream_mode\": \"events\"
}"
LangGraph Cloud: How to Add Human-in-the-Loop
https://langchain-ai.github.io/langgraph/cloud/how-tos/add-human-in-the-loop/
Human-in-the-Loop
LangGraph supports robust **human-in-the-loop (HIL)** workflows, allowing human intervention at any point in an automated process. This is especially useful in LLM-driven applications, where model output may require validation, correction, or additional context.
For more information, see the LangGraph human-in-the-loop overview.
interrupt
The interrupt function in LangGraph enables human-in-the-loop workflows by pausing graph execution at a specific node, presenting information to a human, and resuming the graph with their input. It is useful for approvals, edits, or gathering additional context.
The graph is resumed using a Command object that carries the human's response.
A graph node using interrupt:
from langgraph.types import interrupt, Command
def human_node(state: State):
value = interrupt( # (1)!
{
"text_to_revise": state["some_text"] # (2)!
}
)
return {
"some_text": value # (3)!
}
1. interrupt(...) pauses execution at human_node, surfacing the given payload to a human.
2. Any JSON-serializable value can be passed to the interrupt function. Here, a dict containing the text to revise.
3. Once resumed, the return value of interrupt(...) is the human-provided input, which is used to update the state.
Invoking and resuming via the LangGraph API:
Python
from langgraph_sdk import get_client
from langgraph_sdk.schema import Command
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
# Run the graph until the interrupt is hit.
result = await client.runs.wait(
thread_id,
assistant_id,
input={"some_text": "original text"} # (1)!
)
print(result['__interrupt__']) # (2)!
# > [
# > {
# > 'value': {'text_to_revise': 'original text'},
# > 'resumable': True,
# > 'ns': ['human_node:fc722478-2f21-0578-c572-d9fc4dd07c3b'],
# > 'when': 'during'
# > }
# > ]
# Resume the graph
print(await client.runs.wait(
thread_id,
assistant_id,
command=Command(resume="Edited text") # (3)!
))
# > {'some_text': 'Edited text'}
1. The graph is invoked with some initial state.
2. When the graph hits the interrupt, it returns an interrupt object with the payload and metadata.
3. The graph is resumed with Command(resume=...), injecting the human's input and continuing execution.
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// Run the graph until the interrupt is hit.
const result = await client.runs.wait(
threadID,
assistantID,
{ input: { "some_text": "original text" } } // (1)!
);
console.log(result['__interrupt__']); // (2)!
// > [
// > {
// > 'value': {'text_to_revise': 'original text'},
// > 'resumable': True,
// > 'ns': ['human_node:fc722478-2f21-0578-c572-d9fc4dd07c3b'],
// > 'when': 'during'
// > }
// > ]
// Resume the graph
console.log(await client.runs.wait(
threadID,
assistantID,
{ command: { resume: "Edited text" }} // (3)!
));
// > {'some_text': 'Edited text'}
1. The graph is invoked with some initial state.
2. When the graph hits the interrupt, it returns an interrupt object with the payload and metadata.
3. The graph is resumed by passing a { resume: ... } command object, injecting the human's input and continuing execution.
cURL
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Run the graph until the interrupt is hit:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"some_text\": \"original text\"}
}"
Resume the graph:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"command\": {
\"resume\": \"Edited text\"
}
}"
Extended example: using interrupt
This is an example graph you can run in the LangGraph API server.
See the LangGraph Platform quickstart for more details.
from typing import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
some_text: str
def human_node(state: State):
value = interrupt( # (1)!
{
"text_to_revise": state["some_text"] # (2)!
}
)
return {
"some_text": value # (3)!
}
# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")
graph = graph_builder.compile()
1. interrupt(...) pauses execution at human_node, surfacing the given payload to a human.
2. Any JSON-serializable value can be passed to the interrupt function. Here, a dict containing the text to revise.
3. Once resumed, the return value of interrupt(...) is the human-provided input, which is used to update the state.
Once you have a running LangGraph API server, you can interact with it using:
Python
from langgraph_sdk import get_client
from langgraph_sdk.schema import Command
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
# Run the graph until the interrupt is hit.
result = await client.runs.wait(
thread_id,
assistant_id,
input={"some_text": "original text"} # (1)!
)
print(result['__interrupt__']) # (2)!
# > [
# > {
# > 'value': {'text_to_revise': 'original text'},
# > 'resumable': True,
# > 'ns': ['human_node:fc722478-2f21-0578-c572-d9fc4dd07c3b'],
# > 'when': 'during'
# > }
# > ]
# Resume the graph
print(await client.runs.wait(
thread_id,
assistant_id,
command=Command(resume="Edited text") # (3)!
))
# > {'some_text': 'Edited text'}
1. The graph is invoked with some initial state.
2. When the graph hits the interrupt, it returns an interrupt object with the payload and metadata.
3. The graph is resumed with Command(resume=...), injecting the human's input and continuing execution.
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// Run the graph until the interrupt is hit.
const result = await client.runs.wait(
threadID,
assistantID,
{ input: { "some_text": "original text" } } // (1)!
);
console.log(result['__interrupt__']); // (2)!
// > [
// > {
// > 'value': {'text_to_revise': 'original text'},
// > 'resumable': True,
// > 'ns': ['human_node:fc722478-2f21-0578-c572-d9fc4dd07c3b'],
// > 'when': 'during'
// > }
// > ]
// Resume the graph
console.log(await client.runs.wait(
threadID,
assistantID,
{ command: { resume: "Edited text" }} // (3)!
));
// > {'some_text': 'Edited text'}
1. The graph is invoked with some initial state.
2. When the graph hits the interrupt, it returns an interrupt object with the payload and metadata.
3. The graph is resumed by passing a { resume: ... } command object, injecting the human's input and continuing execution.
cURL
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Run the graph until the interrupt is hit:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"some_text\": \"original text\"}
}"
Resume the graph:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"command\": {
\"resume\": \"Edited text\"
}
}"
Learn more
- LangGraph human-in-the-loop overview: learn more about LangGraph's human-in-the-loop features.
- Design patterns: learn how to implement patterns like approving/rejecting actions and requesting user input.
- How to review tool calls: a detailed example of reviewing, approving/editing tool calls, or giving feedback to a tool-calling LLM.
https://langchain-ai.github.io/langgraph/cloud/how-tos/human_in_the_loop_breakpoint/
Breakpoints
Breakpoints pause graph execution at defined points and let you step through each stage. They rely on LangGraph's persistence layer, which saves the graph state after each step.
With breakpoints, you can inspect the graph's state and node inputs at any point. Execution pauses indefinitely until you resume, as the checkpoint preserves the state.
Set breakpoints
Compile time
graph = graph_builder.compile( # (1)!
interrupt_before=["node_a"], # (2)!
interrupt_after=["node_b", "node_c"], # (3)!
)
1. Breakpoints are set at compile time.
2. interrupt_before specifies the nodes where execution should pause before the node executes.
3. interrupt_after specifies the nodes where execution should pause after the node executes.
Run time
Python
await client.runs.wait( # (1)!
thread_id,
assistant_id,
inputs=inputs,
interrupt_before=["node_a"], # (2)!
interrupt_after=["node_b", "node_c"] # (3)!
)
1. client.runs.wait is called with the interrupt_before and interrupt_after parameters. This is run-time configuration and can be changed for every invocation.
2. interrupt_before specifies the nodes where execution should pause before the node executes.
3. interrupt_after specifies the nodes where execution should pause after the node executes.
JavaScript
await client.runs.wait( // (1)!
threadID,
assistantID,
{
input: input,
interruptBefore: ["node_a"], // (2)!
interruptAfter: ["node_b", "node_c"] // (3)!
}
)
1. client.runs.wait is called with the interruptBefore and interruptAfter parameters. This is run-time configuration and can be changed for every invocation.
2. interruptBefore specifies the nodes where execution should pause before the node executes.
3. interruptAfter specifies the nodes where execution should pause after the node executes.
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"interrupt_before\": [\"node_a\"],
\"interrupt_after\": [\"node_b\", \"node_c\"],
\"input\": <INPUT>
}"
Tip: This example shows how to add static breakpoints. See this guide for more options for adding breakpoints.
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
# Run the graph until the breakpoint
result = await client.runs.wait(
thread_id,
assistant_id,
input=inputs # (1)!
)
# Resume the graph
await client.runs.wait(
thread_id,
assistant_id,
input=None # (2)!
)
1. The graph runs until the first breakpoint is hit.
2. The graph is resumed by passing None for the input. It will run until it hits the next breakpoint.
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// Run the graph until the breakpoint
const result = await client.runs.wait(
threadID,
assistantID,
{ input: input } // (1)!
);
// Resume the graph
await client.runs.wait(
threadID,
assistantID,
{ input: null } // (2)!
);
1. The graph runs until the first breakpoint is hit.
2. The graph is resumed by passing null for the input. It will run until it hits the next breakpoint.
cURL
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Run the graph until the breakpoint:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": <INPUT>
}"
Resume the graph:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\"
}"
Learn more
- LangGraph breakpoints guide: learn more about adding breakpoints in LangGraph.
https://langchain-ai.github.io/langgraph/cloud/how-tos/human_in_the_loop_time_travel/
Time Travel
LangGraph provides time-travel functionality: execution can resume from a prior checkpoint, either replaying the same state or modifying it to explore alternatives. In either case, resuming a past execution creates a new fork in the history.
Using time travel
To use time travel in LangGraph:
1. Run the graph: use the LangGraph SDK's client.runs.wait API with an initial input to run the graph.
2. Identify a checkpoint in an existing thread: use client.threads.get_history to fetch the execution history for a specific thread_id and locate the desired checkpoint_id.
Alternatively, set a breakpoint before the node where you want execution to pause; you can then find the most recent checkpoint recorded before that breakpoint.
3. (Optional) Modify the graph state: use client.threads.update_state to modify the graph's state at the checkpoint and resume execution from the modified state.
4. Resume execution from the checkpoint: use the client.runs.wait API with None as the input, along with the appropriate thread_id and checkpoint_id.
Example
Example graph
from typing_extensions import TypedDict, NotRequired
from langgraph.graph import StateGraph, START, END
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaver
class State(TypedDict):
topic: NotRequired[str]
joke: NotRequired[str]
llm = init_chat_model(
"anthropic:claude-3-7-sonnet-latest",
temperature=0,
)
def generate_topic(state: State):
"""LLM call to generate a topic for the joke"""
msg = llm.invoke("Give me a funny topic for a joke")
return {"topic": msg.content}
def write_joke(state: State):
"""LLM call to write a joke based on the topic"""
msg = llm.invoke(f"Write a short joke about {state['topic']}")
return {"joke": msg.content}
# Build workflow
builder = StateGraph(State)
# Add nodes
builder.add_node("generate_topic", generate_topic)
builder.add_node("write_joke", write_joke)
# Add edges to connect nodes
builder.add_edge(START, "generate_topic")
builder.add_edge("generate_topic", "write_joke")
# Compile
graph = builder.compile()
1. Run the graph
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
# Run the graph
result = await client.runs.wait(
thread_id,
assistant_id,
input={}
)
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantID = "agent";
// create a thread
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// Run the graph
const result = await client.runs.wait(
threadID,
assistantID,
{ input: {}}
);
cURL
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Run the graph:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {}
}"
2. Identify a checkpoint
Python
# The states are returned in reverse chronological order.
states = await client.threads.get_history(thread_id)
selected_state = states[1]
print(selected_state)
JavaScript
// The states are returned in reverse chronological order.
const states = await client.threads.getHistory(threadID);
const selectedState = states[1];
console.log(selectedState);
cURL
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/history \
--header 'Content-Type: application/json'
3. Update the state (optional)
update_state creates a new checkpoint. The new checkpoint is associated with the same thread, but with a new checkpoint ID.
Python
new_config = await client.threads.update_state(
thread_id,
{"topic": "chickens"},
checkpoint_id=selected_state["checkpoint_id"]
)
print(new_config)
JavaScript
const newConfig = await client.threads.updateState(
threadID,
{
values: { "topic": "chickens" },
checkpointId: selectedState["checkpoint_id"]
}
);
console.log(newConfig);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"checkpoint_id\": <CHECKPOINT_ID>,
\"values\": {\"topic\": \"chickens\"}
}"
4. Resume execution from the checkpoint
Python
await client.runs.wait(
thread_id,
assistant_id,
input=None,
checkpoint_id=new_config["checkpoint_id"]
)
JavaScript
await client.runs.wait(
threadID,
assistantID,
{
input: null,
checkpointId: newConfig["checkpoint_id"]
}
);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"checkpoint_id\": <CHECKPOINT_ID>
}"
Learn more
- LangGraph time travel guide: learn more about using time travel in LangGraph.
MCP Endpoint
https://langchain-ai.github.io/langgraph/concepts/server-mcp
The Model Context Protocol (MCP) is an open protocol for describing tools and data sources in a model-agnostic format, allowing large language models (LLMs) to discover and use them through a structured API.
LangGraph Server implements MCP using the streamable HTTP transport. This allows LangGraph agents to be exposed as MCP tools, usable by any MCP-compliant client that supports streamable HTTP.
The MCP endpoint is available at:
/mcp
on the LangGraph Server.
Requirements
To use MCP, make sure the following dependencies are installed:
langgraph-api >= 0.2.3
langgraph-sdk >= 0.1.61
Install them with:
pip install "langgraph-api>=0.2.3" "langgraph-sdk>=0.1.61"
Exposing an agent as an MCP tool
Once deployed, your agent appears as a tool on the MCP endpoint with the following configuration:
- Tool name: the agent's name.
- Tool description: the agent's description.
- Tool input schema: the agent's input schema.
Setting the name and description
You can configure the agent's name and description in langgraph.json:
{
"graphs": {
"my_agent": {
"path": "./my_agent/agent.py:graph",
"description": "A description of what the agent does"
}
},
"env": ".env"
}
Once deployed, you can also update the name and description with the LangGraph SDK, as sketched below.
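For example, here is a minimal sketch using the Python SDK's assistants client (whether assistants.update accepts the name and description fields depends on your langgraph-sdk version, so treat this as illustrative and check the SDK reference):
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>)

# Illustrative values; replace with your assistant's ID and metadata
updated = await client.assistants.update(
    <ASSISTANT_ID>,
    name="my_agent",
    description="A description of what the agent does",
)
print(updated)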
Schema definitions
Define clear, minimal input and output schemas to avoid exposing unnecessary internal complexity to the LLM.
The default MessagesState uses the AnyMessage type. While it supports many message types, it is too broad to expose to an LLM directly.
Instead, define custom agents or workflows that use explicitly typed input and output structures.
For example, a workflow that answers questions about documents could look like this:
API reference: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# Define input schema
class InputState(TypedDict):
question: str
# Define output schema
class OutputState(TypedDict):
answer: str
# Combine input and output
class OverallState(InputState, OutputState):
pass
# Define the processing node
def answer_node(state: InputState):
# Replace with actual logic and do something useful
return {"answer": "bye", "question": state["question"]}
# Build the graph with explicit schemas
builder = StateGraph(OverallState, input=InputState, output=OutputState)
builder.add_node(answer_node)
builder.add_edge(START, "answer_node")
builder.add_edge("answer_node", END)
graph = builder.compile()
# Run the graph
print(graph.invoke({"question": "hi"}))
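Because the graph is compiled with output=OutputState, invoking it returns only the keys declared in the output schema; the call above prints {'answer': 'bye'}.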
For more details, see the underlying concepts guide.
Usage overview
To enable MCP:
- Upgrade to langgraph-api>=0.2.3. If you are deploying on LangGraph Platform, this happens automatically when you create a new revision.
- MCP tools (agents) are exposed automatically.
- Connect with any MCP-compliant client that supports streamable HTTP.
Clients
Connect to the LangGraph server with an MCP-compliant client. The following examples show how to connect from different languages.
JavaScript/TypeScript
npm install @modelcontextprotocol/sdk
Note
Replace serverUrl with your LangGraph server URL and configure authentication headers as needed.
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
// Connects to the LangGraph MCP endpoint
async function connectClient(url) {
const baseUrl = new URL(url);
const client = new Client({
name: 'streamable-http-client',
version: '1.0.0'
});
const transport = new StreamableHTTPClientTransport(baseUrl);
await client.connect(transport);
console.log("Connected using Streamable HTTP transport");
console.log(JSON.stringify(await client.listTools(), null, 2));
return client;
}
const serverUrl = "http://localhost:2024/mcp";
connectClient(serverUrl)
.then(() => {
console.log("Client connected successfully");
})
.catch(error => {
console.error("Failed to connect client:", error);
});
Python
Install the adapters with:
pip install langchain-mcp-adapters
The following example shows how to connect to a remote MCP endpoint and use an agent as a tool:
# Connect to the MCP endpoint over streamable HTTP
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
server_params = {
"url": "https://mcp-finance-agent.xxx.us.langgraph.app/mcp",
"headers": {
"X-Api-Key":"lsv2_pt_your_api_key"
}
}
async def main():
async with streamablehttp_client(**server_params) as (read, write, _):
async with ClientSession(read, write) as session:
# Initialize the connection
await session.initialize()
# Load the remote graph as if it was a tool
tools = await load_mcp_tools(session)
# Create and run a react agent with the tools
agent = create_react_agent("openai:gpt-4.1", tools)
# Invoke the agent with a message
agent_response = await agent.ainvoke({"messages": "What can the finance agent do for me?"})
print(agent_response)
if __name__ == "__main__":
asyncio.run(main())
Session behavior
The current LangGraph MCP implementation does not support sessions. Each /mcp request is stateless and independent.
Authentication
The /mcp endpoint uses the same authentication as the rest of the LangGraph API. See the authentication guide for setup details.
Disabling MCP
To disable the MCP endpoint, set disable_mcp to true in your langgraph.json configuration file:
{
"http": {
"disable_mcp": true
}
}
This prevents the server from exposing the /mcp endpoint.
Double texting
https://langchain-ai.github.io/langgraph/concepts/double_texting/
Users may interact with your graph in unexpected ways. For instance, a user may send one message and, before the graph has finished running, send a second one. More generally, a user may invoke the graph a second time while the first run has not yet completed. We call this "double texting".
Currently, LangGraph only addresses this in LangGraph Platform, not in the open source library. Handling it requires knowing how the graph is deployed, and since LangGraph Platform handles deployment, that logic needs to live there. If you don't want to use LangGraph Platform, the approaches we have implemented are described in detail below.
Reject
This is the simplest option: it just rejects any follow-up runs and does not allow double texting.
See the how-to guide for configuring the reject double-text option.
Enqueue
This is a relatively simple option that continues the first run until it completes the whole run, then sends the new input as a separate run. See the how-to guide for configuring the enqueue double-text option.
Interrupt
This option interrupts the current execution but saves all the work done up until that point. It then inserts the user input and continues from there.
If you enable this option, your graph should be able to handle weird edge cases that may arise. For example, you could have called a tool but not yet received a result from running it; you may then need to remove that tool call to avoid a dangling tool call, as sketched below.
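For illustration, here is a minimal sketch (not from the official guide) of pruning AI messages whose tool calls never received a response, using RemoveMessage from langchain_core:
from langchain_core.messages import AIMessage, RemoveMessage

def prune_dangling_tool_calls(messages):
    # IDs of tool calls that did receive a ToolMessage response
    answered = {m.tool_call_id for m in messages if m.type == "tool"}
    # Emit a RemoveMessage for every AI message left with an unanswered tool call
    return [
        RemoveMessage(id=m.id)
        for m in messages
        if isinstance(m, AIMessage)
        and m.tool_calls
        and any(tc["id"] not in answered for tc in m.tool_calls)
    ]

# Inside a graph node, return the removals to update the "messages" channel:
# return {"messages": prune_dangling_tool_calls(state["messages"])}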
See the how-to guide for configuring the interrupt double-text option.
Rollback
This option interrupts the current execution and rolls back all work done up until that point, including the original run input. It then sends the new user input in, essentially as if it were the original input.
See the how-to guide for configuring the rollback double-text option.
How to use the interrupt option
https://langchain-ai.github.io/langgraph/cloud/how-tos/interrupt_concurrent/
This guide assumes you know what double texting is; see the double texting conceptual guide for background.
This guide covers the interrupt option for double texting, which interrupts the previous run of the graph and starts a new one with the double text. This option does not delete the first run; it keeps it in the database but sets its status to interrupted. Below is a quick example of using the interrupt option.
Setup
First, we will define a quick helper function for printing out the JS and CURL model outputs (you can skip this if using Python):
JavaScript
function prettyPrint(m) {
const padded = " " + m['type'] + " ";
const sepLen = Math.floor((80 - padded.length) / 2);
const sep = "=".repeat(sepLen);
const secondSep = sep + (padded.length % 2 ? "=" : "");
console.log(`${sep}${padded}${secondSep}`);
console.log("\n\n");
console.log(m.content);
}
cURL
# PLACE THIS IN A FILE CALLED pretty_print.sh
pretty_print() {
local type="$1"
local content="$2"
local padded=" $type "
local total_width=80
local sep_len=$(( (total_width - ${#padded}) / 2 ))
local sep=$(printf '=%.0s' $(eval "echo {1.."${sep_len}"}"))
local second_sep=$sep
if (( (total_width - ${#padded}) % 2 )); then
second_sep="${second_sep}="
fi
echo "${sep}${padded}${second_sep}"
echo
echo "$content"
}
Now, let's import our required packages and instantiate our client, assistant, and thread.
Python
import asyncio
from langchain_core.messages import convert_to_messages
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
thread = await client.threads.create()
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantId = "agent";
const thread = await client.threads.create();
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Create runs
Now we can start two runs, waiting until the second one completes:
Python
# the first run will be interrupted
interrupted_run = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [{"role": "user", "content": "what's the weather in sf?"}]},
)
# sleep a bit to get partial outputs from the first run
await asyncio.sleep(2)
run = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [{"role": "user", "content": "what's the weather in nyc?"}]},
multitask_strategy="interrupt",
)
# wait until the second run completes
await client.runs.join(thread["thread_id"], run["run_id"])
JavaScript
// the first run will be interrupted
let interruptedRun = await client.runs.create(
thread["thread_id"],
assistantId,
{ input: { messages: [{ role: "human", content: "what's the weather in sf?" }] } }
);
// sleep a bit to get partial outputs from the first run
await new Promise(resolve => setTimeout(resolve, 2000));
let run = await client.runs.create(
thread["thread_id"],
assistantId,
{
input: { messages: [{ role: "human", content: "what's the weather in nyc?" }] },
multitaskStrategy: "interrupt"
}
);
// wait until the second run completes
await client.runs.join(thread["thread_id"], run["run_id"]);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in sf?\"}]}
}" && sleep 2 && curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in nyc?\"}]},
\"multitask_strategy\": \"interrupt\"
}" && curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>/join
View run results
We can see that the thread has partial data from the first run plus data from the second run
Python
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
JavaScript
const state = await client.threads.getState(thread["thread_id"]);
for (const m of state['values']['messages']) {
prettyPrint(m);
}
cURL
source pretty_print.sh && curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state | \
jq -c '.values.messages[]' | while read -r element; do
type=$(echo "$element" | jq -r '.type')
content=$(echo "$element" | jq -r '.content | if type == "array" then tostring else . end')
pretty_print "$type" "$content"
done
Output:
================================ Human Message =================================
what's the weather in sf?
================================== Ai Message ==================================
[{'id': 'toolu_01MjNtVJwEcpujRGrf3x6Pih', 'input': {'query': 'weather in san francisco'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
tavily_search_results_json (toolu_01MjNtVJwEcpujRGrf3x6Pih)
Call ID: toolu_01MjNtVJwEcpujRGrf3x6Pih
Args:
query: weather in san francisco
================================= Tool Message =================================
Name: tavily_search_results_json
[{"url": "https://www.wunderground.com/hourly/us/ca/san-francisco/KCASANFR2002/date/2024-6-18", "content": "High 64F. Winds W at 10 to 20 mph. A few clouds from time to time. Low 49F. Winds W at 10 to 20 mph. Temp. San Francisco Weather Forecasts. Weather Underground provides local & long-range weather ..."}]
================================ Human Message =================================
what's the weather in nyc?
================================== Ai Message ==================================
[{'id': 'toolu_01KtE1m1ifPLQAx4fQLyZL9Q', 'input': {'query': 'weather in new york city'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
tavily_search_results_json (toolu_01KtE1m1ifPLQAx4fQLyZL9Q)
Call ID: toolu_01KtE1m1ifPLQAx4fQLyZL9Q
Args:
query: weather in new york city
================================= Tool Message =================================
Name: tavily_search_results_json
[{"url": "https://www.accuweather.com/en/us/new-york/10021/june-weather/349727", "content": "Get the monthly weather forecast for New York, NY, including daily high/low, historical averages, to help you plan ahead."}]
================================== Ai Message ==================================
The search results provide weather forecasts and information for New York City. Based on the top result from AccuWeather, here are some key details about the weather in NYC:
- This is a monthly weather forecast for New York City for the month of June.
- It includes daily high and low temperatures to help plan ahead.
- Historical averages for June in NYC are also provided as a reference point.
- More detailed daily or hourly forecasts with precipitation chances, humidity, wind, etc. can be found by visiting the AccuWeather page.
So in summary, the search provides a convenient overview of the expected weather conditions in New York City over the next month to give you an idea of what to prepare for if traveling or making plans there. Let me know if you need any other details!
Verify that the original, interrupted run was indeed interrupted
Python
print((await client.runs.get(thread["thread_id"], interrupted_run["run_id"]))["status"])
JavaScript
console.log((await client.runs.get(thread['thread_id'], interruptedRun["run_id"]))["status"])
Output:
'interrupted'
How to use the rollback option
https://langchain-ai.github.io/langgraph/cloud/how-tos/rollback_concurrent/
This guide assumes you know what double texting is; see the double texting conceptual guide for background.
This guide covers the rollback option for double texting, which interrupts the previous run of the graph and starts a new one with the double text. This option is very similar to the interrupt option, except in this case the first run is completely deleted from the database and cannot be restarted. Below is a quick example of using the rollback option.
Setup
First, we will define a quick helper function for printing out the JS and CURL model outputs (you can skip this if using Python):
JavaScript
function prettyPrint(m) {
const padded = " " + m['type'] + " ";
const sepLen = Math.floor((80 - padded.length) / 2);
const sep = "=".repeat(sepLen);
const secondSep = sep + (padded.length % 2 ? "=" : "");
console.log(`${sep}${padded}${secondSep}`);
console.log("\n\n");
console.log(m.content);
}
cURL
# PLACE THIS IN A FILE CALLED pretty_print.sh
pretty_print() {
local type="$1"
local content="$2"
local padded=" $type "
local total_width=80
local sep_len=$(( (total_width - ${#padded}) / 2 ))
local sep=$(printf '=%.0s' $(eval "echo {1.."${sep_len}"}"))
local second_sep=$sep
if (( (total_width - ${#padded}) % 2 )); then
second_sep="${second_sep}="
fi
echo "${sep}${padded}${second_sep}"
echo
echo "$content"
}
Now, let's import our required packages and instantiate our client, assistant, and thread.
Python
import asyncio
import httpx
from langchain_core.messages import convert_to_messages
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
thread = await client.threads.create()
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantId = "agent";
const thread = await client.threads.create();
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Create runs
Now let's run a thread with the multitask parameter set to "rollback":
Python
# the first run will be rolled back
rolled_back_run = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [{"role": "user", "content": "what's the weather in sf?"}]},
)
run = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [{"role": "user", "content": "what's the weather in nyc?"}]},
multitask_strategy="rollback",
)
# wait until the second run completes
await client.runs.join(thread["thread_id"], run["run_id"])
JavaScript
// the first run will be rolled back
let rolledBackRun = await client.runs.create(
thread["thread_id"],
assistantId,
{ input: { messages: [{ role: "human", content: "what's the weather in sf?" }] } }
);
let run = await client.runs.create(
thread["thread_id"],
assistantId,
{
input: { messages: [{ role: "human", content: "what's the weather in nyc?" }] },
multitaskStrategy: "rollback"
}
);
// wait until the second run completes
await client.runs.join(thread["thread_id"], run["run_id"]);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in sf?\"}]}
}" && curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in nyc?\"}]},
\"multitask_strategy\": \"rollback\"
}" && curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>/join
View run results
We can see that the thread only has data from the second run
Python
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
JavaScript
const state = await client.threads.getState(thread["thread_id"]);
for (const m of state['values']['messages']) {
prettyPrint(m);
}
cURL
source pretty_print.sh && curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state | \
jq -c '.values.messages[]' | while read -r element; do
type=$(echo "$element" | jq -r '.type')
content=$(echo "$element" | jq -r '.content | if type == "array" then tostring else . end')
pretty_print "$type" "$content"
done
Output:
================================ Human Message =================================
what's the weather in nyc?
================================== Ai Message ==================================
[{'id': 'toolu_01JzPqefao1gxwajHQ3Yh3JD', 'input': {'query': 'weather in nyc'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
tavily_search_results_json (toolu_01JzPqefao1gxwajHQ3Yh3JD)
Call ID: toolu_01JzPqefao1gxwajHQ3Yh3JD
Args:
query: weather in nyc
================================= Tool Message =================================
Name: tavily_search_results_json
[{"url": "https://www.weatherapi.com/", "content": "{'location': {'name': 'New York', 'region': 'New York', 'country': 'United States of America', 'lat': 40.71, 'lon': -74.01, 'tz_id': 'America/New_York', 'localtime_epoch': 1718734479, 'localtime': '2024-06-18 14:14'}, 'current': {'last_updated_epoch': 1718733600, 'last_updated': '2024-06-18 14:00', 'temp_c': 29.4, 'temp_f': 84.9, 'is_day': 1, 'condition': {'text': 'Sunny', 'icon': '//cdn.weatherapi.com/weather/64x64/day/113.png', 'code': 1000}, 'wind_mph': 2.2, 'wind_kph': 3.6, 'wind_degree': 158, 'wind_dir': 'SSE', 'pressure_mb': 1025.0, 'pressure_in': 30.26, 'precip_mm': 0.0, 'precip_in': 0.0, 'humidity': 63, 'cloud': 0, 'feelslike_c': 31.3, 'feelslike_f': 88.3, 'windchill_c': 28.3, 'windchill_f': 82.9, 'heatindex_c': 29.6, 'heatindex_f': 85.3, 'dewpoint_c': 18.4, 'dewpoint_f': 65.2, 'vis_km': 16.0, 'vis_miles': 9.0, 'uv': 7.0, 'gust_mph': 16.5, 'gust_kph': 26.5}}"}]
================================== Ai Message ==================================
The weather API results show that the current weather in New York City is sunny with a temperature of around 85°F (29°C). The wind is light at around 2-3 mph from the south-southeast. Overall it looks like a nice sunny summer day in NYC.
Verify that the original, rolled-back run was deleted
Python
try:
await client.runs.get(thread["thread_id"], rolled_back_run["run_id"])
except httpx.HTTPStatusError as _:
print("Original run was correctly deleted")
JavaScript
try {
await client.runs.get(thread["thread_id"], rolledBackRun["run_id"]);
} catch (e) {
console.log("Original run was correctly deleted");
}
Output:
Original run was correctly deleted
How to use the reject option
https://langchain-ai.github.io/langgraph/cloud/how-tos/reject_concurrent/
This guide assumes you know what double texting is; see the double texting conceptual guide for background.
This guide covers the reject option for double texting, which rejects the new run of the graph by throwing an error and continues the original run until completion. Below is a quick example of using the reject option.
Setup
First, we will define a quick helper function for printing out the JS and CURL model outputs (you can skip this if using Python):
JavaScript
function prettyPrint(m) {
const padded = " " + m['type'] + " ";
const sepLen = Math.floor((80 - padded.length) / 2);
const sep = "=".repeat(sepLen);
const secondSep = sep + (padded.length % 2 ? "=" : "");
console.log(`${sep}${padded}${secondSep}`);
console.log("\n\n");
console.log(m.content);
}
cURL
# PLACE THIS IN A FILE CALLED pretty_print.sh
pretty_print() {
local type="$1"
local content="$2"
local padded=" $type "
local total_width=80
local sep_len=$(( (total_width - ${#padded}) / 2 ))
local sep=$(printf '=%.0s' $(eval "echo {1.."${sep_len}"}"))
local second_sep=$sep
if (( (total_width - ${#padded}) % 2 )); then
second_sep="${second_sep}="
fi
echo "${sep}${padded}${second_sep}"
echo
echo "$content"
}
Now, let's import our required packages and instantiate our client, assistant, and thread.
Python
import httpx
from langchain_core.messages import convert_to_messages
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
thread = await client.threads.create()
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantId = "agent";
const thread = await client.threads.create();
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Create runs
Now we can start a run on the thread, then attempt to start a second run with the "reject" option, which should fail because a run is already in progress:
Python
run = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [{"role": "user", "content": "what's the weather in sf?"}]},
)
try:
await client.runs.create(
thread["thread_id"],
assistant_id,
input={
"messages": [{"role": "user", "content": "what's the weather in nyc?"}]
},
multitask_strategy="reject",
)
except httpx.HTTPStatusError as e:
print("Failed to start concurrent run", e)
JavaScript
const run = await client.runs.create(
thread["thread_id"],
assistantId,
{ input: { messages: [{ role: "user", content: "what's the weather in sf?" }] } }
);
try {
await client.runs.create(
thread["thread_id"],
assistantId,
{
input: {"messages": [{"role": "user", "content": "what's the weather in nyc?"}]},
multitaskStrategy: "reject"
},
);
} catch (e) {
console.error("Failed to start concurrent run", e);
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in sf?\"}]}
}" && curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in nyc?\"}]},
\"multitask_strategy\": \"reject\"
}" || { echo "Failed to start concurrent run"; echo "Error: $?" >&2; }
Output:
Failed to start concurrent run Client error '409 Conflict' for url 'http://localhost:8123/threads/f9e7088b-8028-4e5c-88d2-9cc9a2870e50/runs'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409
View run results
We can verify that the original thread finished executing:
Python
# wait until the original run completes
await client.runs.join(thread["thread_id"], run["run_id"])
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
JavaScript
await client.runs.join(thread["thread_id"], run["run_id"]);
const state = await client.threads.getState(thread["thread_id"]);
for (const m of state["values"]["messages"]) {
prettyPrint(m);
}
cURL
source pretty_print.sh && curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>/join && \
curl --request GET --url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state | \
jq -c '.values.messages[]' | while read -r element; do
type=$(echo "$element" | jq -r '.type')
content=$(echo "$element" | jq -r '.content | if type == "array" then tostring else . end')
pretty_print "$type" "$content"
done
Output:
================================ Human Message =================================
what's the weather in sf?
================================== Ai Message ==================================
[{'id': 'toolu_01CyewEifV2Kmi7EFKHbMDr1', 'input': {'query': 'weather in san francisco'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
tavily_search_results_json (toolu_01CyewEifV2Kmi7EFKHbMDr1)
Call ID: toolu_01CyewEifV2Kmi7EFKHbMDr1
Args:
query: weather in san francisco
================================= Tool Message =================================
Name: tavily_search_results_json
[{"url": "https://www.accuweather.com/en/us/san-francisco/94103/june-weather/347629", "content": "Get the monthly weather forecast for San Francisco, CA, including daily high/low, historical averages, to help you plan ahead."}]
================================== Ai Message ==================================
According to the search results from Tavily, the current weather in San Francisco is:
The average high temperature in San Francisco in June is around 65°F (18°C), with average lows around 54°F (12°C). June tends to be one of the cooler and foggier months in San Francisco due to the marine layer of fog that often blankets the city during the summer months.
Some key points about the typical June weather in San Francisco:
- Mild temperatures with highs in the 60s F and lows in the 50s F
- Foggy mornings that often burn off to sunny afternoons
- Little to no rainfall, as June falls in the dry season
- Breezy conditions, with winds off the Pacific Ocean
- Layers are recommended for changing weather conditions
So in summary, you can expect mild, foggy mornings giving way to sunny but cool afternoons in San Francisco this time of year. The marine layer keeps temperatures moderate compared to other parts of California in June.
How to use the enqueue option
https://langchain-ai.github.io/langgraph/cloud/how-tos/enqueue_concurrent/
This guide assumes you know what double texting is; see the double texting conceptual guide for background.
This guide covers the enqueue option for double texting, which adds the new runs to a queue and executes them in the order they are received by the client. Below is a quick example of using the enqueue option.
Setup
First, we will define a quick helper function for printing out the JS and CURL model outputs (you can skip this if using Python):
JavaScript
function prettyPrint(m) {
const padded = " " + m['type'] + " ";
const sepLen = Math.floor((80 - padded.length) / 2);
const sep = "=".repeat(sepLen);
const secondSep = sep + (padded.length % 2 ? "=" : "");
console.log(`${sep}${padded}${secondSep}`);
console.log("\n\n");
console.log(m.content);
}
cURL
# PLACE THIS IN A FILE CALLED pretty_print.sh
pretty_print() {
local type="$1"
local content="$2"
local padded=" $type "
local total_width=80
local sep_len=$(( (total_width - ${#padded}) / 2 ))
local sep=$(printf '=%.0s' $(eval "echo {1.."${sep_len}"}"))
local second_sep=$sep
if (( (total_width - ${#padded}) % 2 )); then
second_sep="${second_sep}="
fi
echo "${sep}${padded}${second_sep}"
echo
echo "$content"
}
Then, let's import our required packages and instantiate our client, assistant, and thread.
Python
import asyncio
import httpx
from langchain_core.messages import convert_to_messages
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
thread = await client.threads.create()
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantId = "agent";
const thread = await client.threads.create();
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Create runs
Now let's start two runs, with the second run queued behind the first using the "enqueue" multitask strategy:
Python
first_run = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [{"role": "user", "content": "what's the weather in sf?"}]},
)
second_run = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [{"role": "user", "content": "what's the weather in nyc?"}]},
multitask_strategy="enqueue",
)
JavaScript
const firstRun = await client.runs.create(
thread["thread_id"],
assistantId,
{ input: { messages: [{ role: "user", content: "what's the weather in sf?" }] } }
);
const secondRun = await client.runs.create(
thread["thread_id"],
assistantId,
{
input: { messages: [{ role: "user", content: "what's the weather in nyc?" }] },
multitaskStrategy: "enqueue"
}
);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in sf?\"}]}
}" && curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs \
--header 'Content-Type: application/json' \
--data "{
\"assistant_id\": \"agent\",
\"input\": {\"messages\": [{\"role\": \"human\", \"content\": \"what's the weather in nyc?\"}]},
\"multitask_strategy\": \"enqueue\"
}"
View run results
Verify that the thread has data from both runs:
Python
# wait until the second run completes
await client.runs.join(thread["thread_id"], second_run["run_id"])
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
JavaScript
await client.runs.join(thread["thread_id"], secondRun["run_id"]);
const state = await client.threads.getState(thread["thread_id"]);
for (const m of state["values"]["messages"]) {
prettyPrint(m);
}
cURL
source pretty_print.sh && curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/<RUN_ID>/join && \
curl --request GET --url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state | \
jq -c '.values.messages[]' | while read -r element; do
type=$(echo "$element" | jq -r '.type')
content=$(echo "$element" | jq -r '.content | if type == "array" then tostring else . end')
pretty_print "$type" "$content"
done
Output:
================================ Human Message =================================
what's the weather in sf?
================================== Ai Message ==================================
[{'id': 'toolu_01Dez1sJre4oA2Y7NsKJV6VT', 'input': {'query': 'weather in san francisco'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
tavily_search_results_json (toolu_01Dez1sJre4oA2Y7NsKJV6VT)
Call ID: toolu_01Dez1sJre4oA2Y7NsKJV6VT
Args:
query: weather in san francisco
================================= Tool Message =================================
Name: tavily_search_results_json
[{"url": "https://www.accuweather.com/en/us/san-francisco/94103/weather-forecast/347629", "content": "Get the current and future weather conditions for San Francisco, CA, including temperature, precipitation, wind, air quality and more. See the hourly and 10-day outlook, radar maps, alerts and allergy information."}]
================================== Ai Message ==================================
According to AccuWeather, the current weather conditions in San Francisco are:
Temperature: 57°F (14°C)
Conditions: Mostly Sunny
Wind: WSW 10 mph
Humidity: 72%
The forecast for the next few days shows partly sunny skies with highs in the upper 50s to mid 60s F (14-18°C) and lows in the upper 40s to low 50s F (9-11°C). Typical mild, dry weather for San Francisco this time of year.
Some key details from the AccuWeather forecast:
Today: Mostly sunny, high of 62°F (17°C)
Tonight: Partly cloudy, low of 49°F (9°C)
Tomorrow: Partly sunny, high of 59°F (15°C)
Saturday: Mostly sunny, high of 64°F (18°C)
Sunday: Partly sunny, high of 61°F (16°C)
So in summary, expect seasonable spring weather in San Francisco over the next several days, with a mix of sun and clouds and temperatures ranging from the upper 40s at night to the low 60s during the days. Typical dry conditions with no rain in the forecast.
================================ Human Message =================================
what's the weather in nyc?
================================== Ai Message ==================================
[{'text': 'Here are the current weather conditions and forecast for New York City:', 'type': 'text'}, {'id': 'toolu_01FFft5Sx9oS6AdVJuRWWcGp', 'input': {'query': 'weather in new york city'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Tool Calls:
tavily_search_results_json (toolu_01FFft5Sx9oS6AdVJuRWWcGp)
Call ID: toolu_01FFft5Sx9oS6AdVJuRWWcGp
Args:
query: weather in new york city
================================= Tool Message =================================
Name: tavily_search_results_json
[{"url": "https://www.weatherapi.com/", "content": "{'location': {'name': 'New York', 'region': 'New York', 'country': 'United States of America', 'lat': 40.71, 'lon': -74.01, 'tz_id': 'America/New_York', 'localtime_epoch': 1718734479, 'localtime': '2024-06-18 14:14'}, 'current': {'last_updated_epoch': 1718733600, 'last_updated': '2024-06-18 14:00', 'temp_c': 29.4, 'temp_f': 84.9, 'is_day': 1, 'condition': {'text': 'Sunny', 'icon': '//cdn.weatherapi.com/weather/64x64/day/113.png', 'code': 1000}, 'wind_mph': 2.2, 'wind_kph': 3.6, 'wind_degree': 158, 'wind_dir': 'SSE', 'pressure_mb': 1025.0, 'pressure_in': 30.26, 'precip_mm': 0.0, 'precip_in': 0.0, 'humidity': 63, 'cloud': 0, 'feelslike_c': 31.3, 'feelslike_f': 88.3, 'windchill_c': 28.3, 'windchill_f': 82.9, 'heatindex_c': 29.6, 'heatindex_f': 85.3, 'dewpoint_c': 18.4, 'dewpoint_f': 65.2, 'vis_km': 16.0, 'vis_miles': 9.0, 'uv': 7.0, 'gust_mph': 16.5, 'gust_kph': 26.5}}"}]
================================== Ai Message ==================================
According to the weather data from WeatherAPI:
Current Conditions in New York City (as of 2:00 PM local time):
- Temperature: 85°F (29°C)
- Conditions: Sunny
- Wind: 2 mph (4 km/h) from the SSE
- Humidity: 63%
- Heat Index: 85°F (30°C)
The forecast shows sunny and warm conditions persisting over the next few days:
Today: Sunny, high of 85°F (29°C)
Tonight: Clear, low of 68°F (20°C)
Tomorrow: Sunny, high of 88°F (31°C)
Thursday: Mostly sunny, high of 90°F (32°C)
Friday: Partly cloudy, high of 87°F (31°C)
So New York City is experiencing beautiful sunny weather with seasonably warm temperatures in the mid-to-upper 80s Fahrenheit (around 30°C). Humidity is moderate in the 60% range. Overall, ideal late spring/early summer conditions for being outdoors in the city over the next several days.
Webhooks
https://langchain-ai.github.io/langgraph/cloud/concepts/webhooks/
Webhooks enable event-driven communication from your LangGraph Platform application to external services. For example, you may want to issue an update to a separate service once an API call to LangGraph Platform has finished running.
Many LangGraph Platform endpoints accept a webhook parameter. If this parameter is specified by an endpoint that can accept POST requests, LangGraph Platform will send a request at the completion of a run.
See the corresponding how-to guide for more details.
LangGraph Cloud webhooks guide
https://langchain-ai.github.io/langgraph/cloud/how-tos/webhooks/
Using webhooks
When working with LangGraph Platform, you may want to use webhooks to receive updates after an API call completes. Webhooks are useful for triggering actions in your service once a run has finished processing. To do so, you need to expose an endpoint that can accept POST requests and pass this endpoint as the webhook parameter in your API request.
Currently, the SDK does not provide built-in support for defining webhook endpoints, but you can specify them manually via API requests.
Supported endpoints
The following API endpoints accept a webhook parameter:

Operation | HTTP Method | Endpoint
--- | --- | ---
Create run | POST | /threads/{thread_id}/runs
Create thread cron | POST | /threads/{thread_id}/runs/crons
Stream run | POST | /threads/{thread_id}/runs/stream
Wait run | POST | /threads/{thread_id}/runs/wait
Create cron | POST | /runs/crons
Stream run (stateless) | POST | /runs/stream
Wait run (stateless) | POST | /runs/wait
This guide shows how to trigger a webhook after a streaming run.
Set up your assistant and thread
Before making API calls, you need to set up your assistant and thread.
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
assistant_id = "agent"
thread = await client.threads.create()
print(thread)
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
const assistantID = "agent";
const thread = await client.threads.create();
console.log(thread);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/assistants/search \
--header 'Content-Type: application/json' \
--data '{ "limit": 10, "offset": 0 }' | jq -c 'map(select(.config == null or .config == {})) | .[0]' && \
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Example response:
{
"thread_id": "9dde5490-2b67-47c8-aa14-4bfec88af217",
"created_at": "2024-08-30T23:07:38.242730+00:00",
"updated_at": "2024-08-30T23:07:38.242730+00:00",
"metadata": {},
"status": "idle",
"config": {},
"values": null
}
Use a webhook with a graph run
To use a webhook, specify the webhook parameter in your API request. When the run completes, LangGraph Platform sends a POST request to the specified webhook URL.
For example, if your server listens for webhook events at https://my-server.app/my-webhook-endpoint, you can include the following in your request:
Python
input = { "messages": [{ "role": "user", "content": "Hello!" }] }
async for chunk in client.runs.stream(
thread_id=thread["thread_id"],
assistant_id=assistant_id,
input=input,
stream_mode="events",
webhook="https://my-server.app/my-webhook-endpoint"
):
pass
JavaScript
const input = { messages: [{ role: "human", content: "Hello!" }] };
const streamResponse = client.runs.stream(
thread["thread_id"],
assistantID,
{
input: input,
webhook: "https://my-server.app/my-webhook-endpoint"
}
);
for await (const chunk of streamResponse) {
// Handle stream output
}
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/stream \
--header 'Content-Type: application/json' \
--data '{
"assistant_id": <ASSISTANT_ID>,
"input": {"messages": [{"role": "user", "content": "Hello!"}]},
"webhook": "https://my-server.app/my-webhook-endpoint"
}'
Webhook payload
LangGraph Platform sends webhook notifications in the format of a run. See the API reference for details. The request payload includes the run's input, configuration, and other metadata in the kwargs field.
Securing webhooks
To ensure only authorized requests reach your webhook endpoint, consider adding a security token as a query parameter:
https://my-server.app/my-webhook-endpoint?token=YOUR_SECRET_TOKEN
Your server should extract and validate this token before processing requests, as sketched below.
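For example, here is a minimal sketch of a receiving endpoint (the FastAPI app, route path, and token handling are illustrative assumptions, not part of the LangGraph API):
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()
EXPECTED_TOKEN = "YOUR_SECRET_TOKEN"  # in practice, read this from an environment variable

@app.post("/my-webhook-endpoint")
async def handle_webhook(request: Request, token: str = ""):
    # Reject requests that don't carry the expected query-string token
    if token != EXPECTED_TOKEN:
        raise HTTPException(status_code=403, detail="Invalid token")
    payload = await request.json()
    # The run's input and configuration arrive under the "kwargs" field
    print(payload.get("status"), payload.get("kwargs", {}).get("input"))
    return {"ok": True}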
Testing webhooks
You can test your webhooks with online services such as:
- Beeceptor - quickly create test endpoints and inspect incoming webhook payloads.
- Webhook.site - view, debug, and log incoming webhook requests in real time.
These tools help you verify that LangGraph Platform is correctly triggering and delivering webhooks to your service.
Cron jobs
It is often useful to run an assistant on a schedule. For example, say you're building an assistant that runs daily and sends an email summary of the day's news. You could use a cron job to run the assistant every day at 8:00 PM.
LangGraph Platform supports cron jobs, which run on a user-defined schedule. The user specifies a schedule, an assistant, and some input; the server will then, on that schedule:
- Create a new thread with the specified assistant
- Send the specified input to that thread
Note that the same input is sent to the thread each time. See the how-to guide for creating cron jobs.
The LangGraph Platform API provides several endpoints for creating and managing cron jobs. See the API reference for more details.
Using cron jobs
https://langchain-ai.github.io/langgraph/cloud/how-tos/cron_jobs/
Sometimes you don't want to run your graph based on user interaction, but on a schedule instead, for example if you want your graph to compose and send out a weekly to-do email for your team. LangGraph Platform makes this possible with the Crons client, so you don't have to write your own script. To schedule a graph job, you pass a cron expression to the client telling it when to run. A cron expression has five fields (minute, hour, day of month, month, day of week), so "27 15 * * *" means 15:27 every day. Cron jobs run in the background and do not interfere with normal invocations of the graph.
Setup
First, let's set up the SDK client, assistant, and thread:
Python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)
# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create thread
thread = await client.threads.create()
print(thread)
JavaScript
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });
// Using the graph deployed with the name "agent"
const assistantId = "agent";
// create thread
const thread = await client.threads.create();
console.log(thread);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/assistants/search \
--header 'Content-Type: application/json' \
--data '{
"limit": 10,
"offset": 0
}' | jq -c 'map(select(.config == null or .config == {})) | .[0].graph_id' && \
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'
Output:
{
'thread_id': '9dde5490-2b67-47c8-aa14-4bfec88af217',
'created_at': '2024-08-30T23:07:38.242730+00:00',
'updated_at': '2024-08-30T23:07:38.242730+00:00',
'metadata': {},
'status': 'idle',
'config': {},
'values': None
}
Cron jobs on threads
To create a cron job associated with a specific thread, you can write:
Python
# This schedules a job to run at 15:27 (3:27PM) every day
cron_job = await client.crons.create_for_thread(
thread["thread_id"],
assistant_id,
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "What time is it?"}]},
)
JavaScript
// This schedules a job to run at 15:27 (3:27PM) every day
const cronJob = await client.crons.createForThread(
thread["thread_id"],
assistantId,
{
schedule: "27 15 * * *",
input: { messages: [{ role: "user", content: "What time is it?" }] }
}
);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/crons \
--header 'Content-Type: application/json' \
--data '{
"assistant_id": <ASSISTANT_ID>,
"schedule": "27 15 * * *",
"input": {"messages": [{"role": "user", "content": "What time is it?"}]}
}'
Note that it is important to delete Cron jobs that are no longer useful. Otherwise you could rack up unwanted API charges from LLM calls! You can delete a Cron job with the following code:
Python
await client.crons.delete(cron_job["cron_id"])
JavaScript
await client.crons.delete(cronJob["cron_id"]);
cURL
curl --request DELETE \
--url <DEPLOYMENT_URL>/runs/crons/<CRON_ID>
Stateless cron jobs
You can also create stateless cron jobs with the following code:
Python
# This schedules a job to run at 15:27 (3:27PM) every day
cron_job_stateless = await client.crons.create(
assistant_id,
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "What time is it?"}]},
)
JavaScript
// This schedules a job to run at 15:27 (3:27PM) every day
const cronJobStateless = await client.crons.create(
assistantId,
{
schedule: "27 15 * * *",
input: { messages: [{ role: "user", content: "What time is it?" }] }
}
);
cURL
curl --request POST \
--url <DEPLOYMENT_URL>/runs/crons \
--header 'Content-Type: application/json' \
--data '{
"assistant_id": <ASSISTANT_ID>,
"schedule": "27 15 * * *",
"input": {"messages": [{"role": "user", "content": "What time is it?"}]}
}'
Again, remember to delete your job once you're done with it!
Python
await client.crons.delete(cron_job_stateless["cron_id"])
JavaScript
await client.crons.delete(cronJobStateless["cron_id"]);
cURL
curl --request DELETE \
--url <DEPLOYMENT_URL>/runs/crons/<CRON_ID>
LangGraph custom lifespan guide
https://langchain-ai.github.io/langgraph/how-tos/http/custom_lifespan/
How to add custom lifespan events
When deploying agents to LangGraph Platform, you often need to initialize resources like database connections when your server starts, and ensure they are properly released on shutdown. Lifespan events let you hook into your server's startup and shutdown sequence to handle this critical setup and teardown.
This works the same way as adding custom routes: you simply provide your own Starlette app (including FastAPI, FastHTML, and other compatible apps).
Below is an example using FastAPI.
Python only
We currently only support custom lifespan events in Python deployments with langgraph-api>=0.0.26.
Create the app
Starting from an existing LangGraph Platform application, add the following lifespan code to your webapp.py file. If you are starting from scratch, you can create a new app from a template using the CLI:
langgraph new --template=new-langgraph-project-python my_new_project
Once you have a LangGraph project, add the following app code:
# ./src/agent/webapp.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
@asynccontextmanager
async def lifespan(app: FastAPI):
# for example...
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
# Create reusable session factory
async_session = sessionmaker(engine, class_=AsyncSession)
# Store in app state
app.state.db_session = async_session
yield
# Clean up connections
await engine.dispose()
app = FastAPI(lifespan=lifespan)
# ... can add custom routes if needed.
Configure langgraph.json
Add the following to your langgraph.json configuration file. Make sure the path points to the webapp.py file you created above.
{
"dependencies": ["."],
"graphs": {
"agent": "./src/agent/graph.py:graph"
},
"env": ".env",
"http": {
"app": "./src/agent/webapp.py:app"
}
// Other configuration options like auth, store, etc.
}
Start the server
Test the server locally:
langgraph dev --no-browser
When the server boots, the startup logic in your lifespan function runs; when you stop the server with Ctrl+C, the cleanup code after the yield runs.
Deployment
You can deploy the app as-is to LangGraph Platform or to your self-hosted platform.
Next steps
Now that you've added lifespan events to your deployment, you can use similar techniques to add custom routes or custom middleware to further customize your server's behavior.
How to add custom middleware
https://langchain-ai.github.io/langgraph/how-tos/http/custom_middleware/
When deploying agents to LangGraph Platform, you can add custom middleware to your server to handle concerns like logging request metrics, injecting or checking headers, and enforcing security policies, all without modifying the core server logic. This works the same way as adding custom routes: you simply provide your own Starlette app (including FastAPI, FastHTML, and other compatible apps).
Adding middleware lets you intercept and modify requests and responses globally, whether they hit your custom endpoints or the built-in LangGraph Platform APIs.
Below is an example using FastAPI.
Python only
We currently only support custom middleware in Python deployments with langgraph-api>=0.0.26.
Create the app
Starting from an existing LangGraph Platform application, add the following middleware code to your webapp.py file. If you are starting from scratch, you can create a new app from a template using the CLI:
langgraph new --template=new-langgraph-project-python my_new_project
Once you have a LangGraph project, add the following app code:
# ./src/agent/webapp.py
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
app = FastAPI()
class CustomHeaderMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
response.headers['X-Custom-Header'] = 'Hello from middleware!'
return response
# Add the middleware to the app
app.add_middleware(CustomHeaderMiddleware)
Configure langgraph.json
Add the following to your langgraph.json configuration file. Make sure the path points to the webapp.py file you created above.
{
"dependencies": ["."],
"graphs": {
"agent": "./src/agent/graph.py:graph"
},
"env": ".env",
"http": {
"app": "./src/agent/webapp.py:app"
}
// Other configuration options like auth, store, etc.
}
Start the server
Test the server locally:
langgraph dev --no-browser
Now any request to your server will include the custom header X-Custom-Header in its response.
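To verify, inspect the response headers of any request; since the middleware runs globally, even a 404 response carries the header (the path below is arbitrary):
curl -i http://localhost:2024/some-path
# ...
# x-custom-header: Hello from middleware!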
部署
您可以将此应用直接部署到 LangGraph 平台,或部署到您自主托管的平台上。
后续步骤
现在您已为部署添加了自定义中间件,可以使用类似技术来添加自定义路由或定义自定义生命周期事件,进一步定制服务器行为。
How to add custom routes
https://langchain-ai.github.io/langgraph/how-tos/http/custom_routes/
When deploying agents to LangGraph Platform, your server automatically exposes routes for creating runs and threads, interacting with the long-term memory store, managing configurable assistants, and other core functionality (see all default API endpoints).
You can add custom routes by providing your own Starlette app (including FastAPI, FastHTML, and other compatible apps). You make LangGraph Platform aware of them by specifying the path to the app in your langgraph.json configuration file.
Defining a custom app object lets you add any routes you want, so you can do anything from adding a /login endpoint to writing an entire full-stack web app, all deployed in a single LangGraph server.
Below is an example using FastAPI.
Create the app
Starting from an existing LangGraph Platform application, add the following custom route code to your webapp.py file. If you are starting from scratch, you can create a new app from a template using the CLI:
langgraph new --template=new-langgraph-project-python my_new_project
Once you have a LangGraph project, add the following app code:
# ./src/agent/webapp.py
from fastapi import FastAPI
app = FastAPI()
@app.get("/hello")
def read_root():
return {"Hello": "World"}
Configure langgraph.json
Add the following to your langgraph.json configuration file. Make sure the path points to the webapp.py file you created above.
{
"dependencies": ["."],
"graphs": {
"agent": "./src/agent/graph.py:graph"
},
"env": ".env",
"http": {
"app": "./src/agent/webapp.py:app"
}
// Other configuration options like auth, store, etc.
}
Start the server
Test the server locally:
langgraph dev --no-browser
Navigate to localhost:2024/hello in your browser (2024 is the default development port); you should see the /hello endpoint return {"Hello": "World"}.
Overriding default endpoints
Routes you create in your app take precedence over the system defaults, meaning you can override and redefine the behavior of any default endpoint, as sketched below.
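For instance, here is a minimal sketch that shadows an assumed default route (the /info path is illustrative; check your deployment's default API endpoints for the actual paths):
# ./src/agent/webapp.py
from fastapi import FastAPI

app = FastAPI()

# A route registered with the same path as a default endpoint replaces it
@app.get("/info")
def custom_info():
    return {"service": "my-agent-server", "custom": True}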
Deployment
You can deploy this app as-is to LangGraph Platform or to your self-hosted platform.
Next steps
Now that you've added custom routes to your deployment, you can use the same techniques to further customize your server's behavior, such as defining custom middleware and custom lifespan events.