Every blog carries the same motto: You can do more than you think.
https://blog.csdn.net/weixin_39190382?type=blog
0. 前言
langgraph 基础
1. Chatbot实现
# !pip install langchain
# !pip install langgraph
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


# Define the graph State.
class State(TypedDict):
    # The state variable `messages` is a list whose update rule is
    # `add_messages`: a built-in reducer that appends newly returned
    # messages to the existing list instead of overwriting it.
    messages: Annotated[list, add_messages]


# Create the Graph builder.
graph_builder = StateGraph(State)
from langchain.chat_models import init_chat_model

# llm = init_chat_model("gpt-4o", model_provider="openai")
llm = init_chat_model("deepseek-chat", model_provider="deepseek")


# Define an execution node.
# Input is the State; output is the system's reply.
def chatbot(state: State):
    # Invoke the LLM and return the reply as a (one-element) message list.
    # The return value triggers the `add_messages` state update.
    return {"messages": [llm.invoke(state["messages"])]}


graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
Adding a node to a graph that has already been compiled. This will not be reflected in the compiled graph.
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[9], line 12
      7 def chatbot(state: State):
      8     # 调用大模型,并返回消息(列表)
      9     # 返回值会触发状态更新 add_messages
     10     return {"messages": [llm.invoke(state["messages"])]}
---> 12 graph_builder.add_node("chatbot", chatbot)
     13 graph_builder.add_edge(START, "chatbot")
     14 graph_builder.add_edge("chatbot", END)

File c:\Users\13010\miniconda3\envs\py12\Lib\site-packages\langgraph\graph\state.py:456, in StateGraph.add_node(self, node, action, defer, metadata, input_schema, retry_policy, cache_policy, destinations, **kwargs)
    454     raise RuntimeError
    455 if node in self.nodes:
--> 456     raise ValueError(f"Node `{node}` already present.")
    457 if node == END or node == START:
    458     raise ValueError(f"Node `{node}` is reserved.")

ValueError: Node `chatbot` already present.
from IPython.display import Image, display

# Visualize this workflow graph as a Mermaid diagram.
try:
    display(Image(data=graph.get_graph().draw_mermaid_png()))
except Exception as e:
    print(e)
from langchain.schema import AIMessage


def stream_graph_updates(user_input: str):
    # Feed one user message into the graph (triggers the add_messages update).
    for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):
        for value in event.values():
            # Only print updates whose latest message is an AI reply.
            if "messages" in value and isinstance(value["messages"][-1], AIMessage):
                print("Assistant:", value["messages"][-1].content)


def run():
    # Drive this workflow as a simple REPL; an empty input exits the loop.
    while True:
        user_input = input("User: ")
        if user_input.strip() == "":
            break
        stream_graph_updates(user_input)


run()
Assistant: 你好!😊 很高兴见到你~有什么我可以帮你的吗?
Assistant: 我是DeepSeek Chat,由深度求索公司(DeepSeek)研发的智能AI助手!🤖✨ 我的使命是帮助你解答问题、提供信息、陪你聊天,甚至帮你处理各种文本和文件。无论是学习、工作,还是日常生活中的疑问,都可以来问我!😊 有什么我可以帮你的吗?
2. RAG
# !pip install -U langchain-community pymupdf
# !pip install dashscope
# !pip install faiss-cpu
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyMuPDFLoader

# Load the document.
loader = PyMuPDFLoader("./data/deepseek-v3-1-4.pdf")
pages = loader.load_and_split()

# Split the document into overlapping chunks.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=200,
    length_function=len,
    add_start_index=True,
)
# Only the first two pages are indexed in this demo.
texts = text_splitter.create_documents(
    [page.page_content for page in pages[:2]]
)

# Embed the chunks and load them into the vector store.
embeddings = DashScopeEmbeddings(model="text-embedding-v1")
db = FAISS.from_documents(texts, embeddings)

# Retrieve the top-5 results per query.
retriever = db.as_retriever(search_kwargs={"k": 5})
from langchain.prompts import ChatPromptTemplate, HumanMessagePromptTemplate

# Prompt template: the retrieved context is injected as {query}; the user's
# question itself is expected to already be in the conversation history.
template = """请根据对话历史和下面提供的信息回答上面用户提出的问题:
{query}
"""
prompt = ChatPromptTemplate.from_messages(
    [
        HumanMessagePromptTemplate.from_template(template),
    ]
)


def retrieval(state: State):
    """Retrieval node: look up documents relevant to the latest user message
    and append them to the conversation as a context-carrying prompt message.

    Returns a state update of the form {"messages": [...]} (consumed by the
    `add_messages` reducer); an empty update when there is no history.
    """
    if len(state["messages"]) >= 1:
        # The last message in the history is the user's latest input.
        last_message = state["messages"][-1]
    else:
        return {"messages": []}
    # Fix: query the retriever with the message *content*, not str(message).
    # str() of a message object embeds its repr (role, metadata) into the
    # query text and degrades the embedding search. Fall back to str() for
    # inputs that carry no `content` attribute, preserving old behavior.
    user_query = getattr(last_message, "content", None) or str(last_message)
    # Retrieve the top matches.
    docs = retriever.invoke(user_query)
    # Fill the prompt template with the concatenated document contents.
    # NOTE(review): `prompt.invoke` is given a plain string for the single
    # {query} variable — confirm this matches the installed LangChain
    # version (a dict {"query": ...} is the conventional form).
    messages = prompt.invoke(
        "\n".join(doc.page_content for doc in docs)
    ).messages
    return {"messages": messages}
# Build the RAG workflow: START -> retrieval -> chatbot -> END.
graph_builder = StateGraph(State)
graph_builder.add_node("retrieval", retrieval)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "retrieval")
graph_builder.add_edge("retrieval", "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
from IPython.display import Image, display

# Visualize this workflow graph as a Mermaid diagram.
try:
    display(Image(data=graph.get_graph().draw_mermaid_png()))
except Exception as e:
    print(e)

run()
Assistant: DeepSeek-V3 是一个大型混合专家模型(Mixture-of-Experts, MoE),总参数量为 **6710 亿(671B)**,其中每个 token 激活的参数量为 **370 亿(37B)**。
3. 加入分支:若找不到答案转人工
from langchain.schema import HumanMessage
from typing import Literal
from langgraph.types import interrupt, Command


# Verification node: decide whether the retrieved context can answer the
# user's question; route to "chatbot" if yes, otherwise to "ask_human".
def verify(state: State) -> Literal["chatbot", "ask_human"]:
    message = HumanMessage("请根据对话历史和上面提供的信息判断,已知的信息是否能够回答用户的问题。直接输出你的判断'Y'或'N'")
    ret = llm.invoke(state["messages"] + [message])
    # The model is instructed to reply with a bare 'Y' or 'N'.
    if 'Y' in ret.content:
        return "chatbot"
    else:
        return "ask_human"


# Human hand-off node.
def ask_human(state: State):
    # messages[-1] is the context prompt appended by the retrieval node,
    # so the user's original question is the second-to-last message.
    user_query = state["messages"][-2].content
    # `interrupt` pauses the graph until a human resumes it with an answer.
    human_response = interrupt({"question": user_query})
    # Update the state with the human's input or route the graph based on the input.
    return {"messages": [AIMessage(human_response)]}
from langgraph.checkpoint.memory import MemorySaver

# Persistent storage for the graph state (simulated in memory here).
# Production systems would typically use Redis or another fast cache.
memory = MemorySaver()

graph_builder = StateGraph(State)
graph_builder.add_node("retrieval", retrieval)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("ask_human", ask_human)
graph_builder.add_edge(START, "retrieval")
# "retrieval" routes conditionally via `verify` to "chatbot" or "ask_human".
graph_builder.add_conditional_edges("retrieval", verify)
graph_builder.add_edge("ask_human", END)
graph_builder.add_edge("chatbot", END)

# The run may be interrupted mid-way for human hand-off, so a checkpointer
# is required to persist the state across the interruption.
graph = graph_builder.compile(checkpointer=memory)
from langchain.schema import AIMessage

# When a checkpointer is used, a thread_id is required to address the stored
# state — comparable to an OpenAI Assistants API thread, or a Redis key.
thread_config = {"configurable": {"thread_id": "my_thread_id"}}


def stream_graph_updates(user_input: str):
    # Feed one user message into the graph (triggers the add_messages update).
    for event in graph.stream(
        {"messages": [{"role": "user", "content": user_input}]},
        thread_config,
    ):
        for value in event.values():
            if isinstance(value, tuple):
                # An interrupt surfaces as a tuple of Interrupt objects; hand
                # the pending question back to the caller for a human answer.
                return value[0].value["question"]
            elif "messages" in value and isinstance(value["messages"][-1], AIMessage):
                print("Assistant:", value["messages"][-1].content)
                return None
    return None


def resume_graph_updates(human_input: str):
    # Resume the interrupted graph run with the human's answer.
    for event in graph.stream(Command(resume=human_input), thread_config, stream_mode="updates"):
        for value in event.values():
            if "messages" in value and isinstance(value["messages"][-1], AIMessage):
                print("Assistant:", value["messages"][-1].content)
def run():
    # Drive this workflow as a REPL; an empty input exits the loop.
    while True:
        user_input = input("User: ")
        if user_input.strip() == "":
            break
        # A non-None return means the graph interrupted with a question
        # that needs a human answer.
        question = stream_graph_updates(user_input)
        if question:
            human_answer = input("Ask Human: "+question+"\nHuman: ")
            resume_graph_updates(human_answer)
from IPython.display import Image, display

# Visualize this workflow graph as a Mermaid diagram.
try:
    display(Image(data=graph.get_graph().draw_mermaid_png()))
except Exception as e:
    print(e)

run()
Assistant: DeepSeek-V3 是一个大型混合专家模型(MoE),总参数量为 **6710亿(671B)**,其中每个 token 激活的参数量为 **370亿(37B)**。 (根据论文 arXiv:2412.19437v2 提供的信息,DeepSeek-V3 的架构明确标注为 671B 参数规模。)
Assistant: 090
LangGraph 还支持:
- 工具调用
- 并行处理
- 状态持久化
- 对话历史管理
- 历史动作回放(用于调试与测试)
- 子图管理
- 多智能体协作
- …
更多关于 LangGraph 的 HowTo,参考官方文档:https://langchain-ai.github.io/langgraph/how-tos