创建一个简单的工作流:Start ——> 节点1(固定输入输出) ——> End
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Annotated
history_messages = [
SystemMessage(content="你是一个聊天助理"),
HumanMessage(content="你好,你能介绍下你自己吗?"),
AIMessage(content="我是一个小模型聊天助理"),
]
messages = history_messages + [HumanMessage(content="今天天气怎么样?")]
class MyState(TypedDict):
messages:Annotated[list,""]
def node1(state:MyState):
return {"messages": AIMessage(content="今天天气很好")}
graph_builder = StateGraph(MyState)
graph_builder.add_node("node1", node1)
graph_builder.add_edge(START, "node1")
graph_builder.add_edge("node1", END)
graph = graph_builder.compile()
response = graph.stream({"messages": messages})
for event in response:
print(event)
创建一个简单的工作流:Start ——> 节点1(固定输入输出) ——> 节点2(固定输入输出) ——> End
同时数据存到Mastate的数组里传递给下一个节点
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph.message import add_messages
history_messages = [
SystemMessage(content="你是一个聊天助理"),
HumanMessage(content="你好,你能介绍下你自己吗?"),
AIMessage(content="我是一个小模型聊天助理"),
]
messages = history_messages + [HumanMessage(content="今天天气怎么样?")]
class MyState(TypedDict):
messages:Annotated[list,add_messages]
def node1(state:MyState):
return {"messages": AIMessage(content="今天天气很好")}
def node2(state:MyState):
return {"messages": AIMessage(content="明天天气也很好")}
graph_builder = StateGraph(MyState)
graph_builder.add_node("node1", node1)
graph_builder.add_node("node2", node2)
graph_builder.add_edge("node1", "node2")
graph_builder.add_edge(START, "node1")
graph_builder.add_edge("node2", END)
graph = graph_builder.compile()
response = graph.stream({"messages": messages})
for event in response:
print(event)
创建一个简单的会话工作流:Start ——> 会话节点1 ——> End
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph.message import add_messages
class MyState(TypedDict):
messages: Annotated[list, add_messages]
llm = init_chat_model(
"deepseek-r1:7b",
model_provider="ollama")
def chatbot(state: MyState):
return {"messages": llm.invoke(state["messages"])}
graph_builder = StateGraph(MyState)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
response = graph.stream({"messages": "你好"})
for event in response:
if "chatbot" in event:
value = event["chatbot"]
if "messages" in value and isinstance(value["messages"], AIMessage):
print(event["chatbot"]["messages"].content)
用langChain创建一个简单的Rag :
需要再安装两个包:
pip install pypdf
pip install dashscope
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 加载文档
loader = PyPDFLoader("./data/你的PDF文档.pdf")
pages = loader.load_and_split()
# 文档切片
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=200,
length_function=len,
add_start_index=True,
)
texts = text_splitter.create_documents(
[page.page_content for page in pages[:2]],
)
# 灌库
embeddings = DashScopeEmbeddings(model="text-embedding-v1",dashscope_api_key="你的阿里云百炼平台APIkey")
db = FAISS.from_documents(texts, embeddings)
# 检索 top-5 结果
retriever = db.as_retriever(search_kwargs={"k": 5})
docs=retriever.invoke("文档相关的问题")
print(docs)
作为一个条件节点放到会话工作流会话节点前:Start——> 条件Rag节点 ——> 会话节点1 ——> End
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
llm = init_chat_model(
"deepseek-r1:7b",
model_provider="ollama")
def chatbot(state: MessagesState):
return {"messages": llm.invoke(state["messages"])}
def retriever(state: MessagesState):
question = state["messages"][-1].content
messages = "请参照一下上下:"
# 加载文档
loader = PyPDFLoader("./data/你的PDF文档.pdf")
pages = loader.load_and_split()
# 文档切片
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=200,
length_function=len,
add_start_index=True,
)
texts = text_splitter.create_documents(
[page.page_content for page in pages[:2]],
)
# 灌库
embeddings = DashScopeEmbeddings(model="text-embedding-v1", dashscope_api_key="你的apikey")
db = FAISS.from_documents(texts, embeddings)
# 检索 top-5 结果
dbRetriever = db.as_retriever(search_kwargs={"k": 5})
docs = dbRetriever.invoke(question)
for doc in docs:
messages = messages + doc.page_content
messages = messages + "回答问题:" + question
return {"messages": HumanMessage(content=messages)}
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("retriever", retriever)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "retriever")
graph_builder.add_edge("retriever", "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
response = graph.stream({"messages": "你的问题?"})
for event in response:
if "chatbot" in event:
value = event["chatbot"]
if "messages" in value and isinstance(value["messages"], AIMessage):
print(event["chatbot"]["messages"].content)