【NLP】使用 LangGraph 构建 RAG 的Research Multi-Agent

本文中,我们介绍了一个使用LangGraph开发的RAG的Research Multi-Agent工具的实际项目。该工具旨在解决需要多个来源和迭代步骤才能得出最终答案的复杂问题。它使用混合搜索和rerank步骤来检索文档,还结合了自我纠正机制,包括幻觉检查过程,以提高响应可靠性,使其成为企业应用程序的理想选择。

一、简介 — Naive 与 Agentic RAG

对于真实的项目的而言,简单的 RAG 方法是不够的,原因如下:

  • 无法理解复杂的查询:无法将复杂的查询分解为多个可管理的子步骤,无法在单一级别处理查询,而不是分析每个步骤并得出统一的结论。
  • 缺乏幻觉或错误处理:简单的 RAG 管道缺乏响应验证步骤和处理幻觉的机制,从而阻止它们通过生成新的响应来纠正错误。
  • 缺乏动态工具使用:Naive RAG 系统不允许使用工具、调用外部 API 或根据工作流条件与数据库交互。

 因此,我们实施了一个多智能体 RAG 系统来解决所有这些问题。事实上,基于智能体的框架可以实现以下功能:

  • 路由和使用工具:路由代理可以对用户的查询进行分类,并将流程引导至适当的节点或工具。这可以实现基于上下文的决策,例如确定文档是否需要完整摘要、是否需要更详细的信息,或者问题是否超出范围。
  • 规划子步骤:复杂的查询通常需要分解成更小、更易于管理的步骤。从查询开始,可以生成一系列步骤,以便在探索查询的不同方面得出结论。例如,如果查询需要比较文档的两个不同部分,基于代理的方法可以识别这种比较需求,分别检索两个源,并将它们合并到最终响应的比较分析中。
  • 反射与纠错:除了简单的响应生成之外,基于代理的方法还可以添加验证步骤,以解决潜在的幻觉、错误或无法准确回答用户查询的响应。这还可以将自我纠正机制与人机交互相结合,将人工输入融入自动化流程。这些功能使基于代理的 RAG 系统成为企业应用更稳健、更可靠的解决方案,因为可靠性是企业应用的首要任务。
  • 共享全局状态:代理 工作流共享全局状态,简化跨多个步骤的状态管理。此共享状态对于维护多代理流程不同阶段的一致性至关重要。

二、项目概述

代理 RAG 图

图表步骤:

  1. 分析和路由查询(自适应 RAG):用户的查询被分类并路由到适当的节点。从那里,系统可以进入下一步(“研究计划生成”),向用户请求更多信息,或者在查询超出范围时立即响应
  2. 研究计划生成:系统会根据请求的复杂程度,生成一个分步研究计划,其中包含一个或多个步骤。然后,系统会返回解决用户问题所需的具体步骤列表。
  3. 研究子图:研究计划生成中定义的每个步骤都会调用一个子图。具体来说,子图首先通过 LLM 生成两个查询。接下来,系统使用集成检索器(使用相似性搜索BM25MMR)检索与这些生成的查询相关的文档。然后,在“重新排序”步骤中应用基于 Cohere 的上下文压缩,最终得出所有步骤中排名前k的相关文档及其相关分数。
  4. 生成步骤:根据相关文档,该工具通过 LLM 生成答案。
  5. 幻觉检查(基于人机交互的自校正 RAG):系统会进行反思步骤,分析生成的答案,以确定其是否与提供的上下文相符,并涵盖所有方面。如果检查失败,则图表工作流程将中断,并提示用户生成修改后的答案或结束流程。

对于向量存储的创建,使用DoclingLangChain实现了基于段落的分块方法,并使用ChromaDB构建了向量数据库。

 

构建向量数据库

文档解析

对于结构复杂的 PDF(包括布局复杂的表格),必须谨慎选择用于解析的工具。许多库在处理页面布局或表格结构复杂的 PDF 时缺乏精确度。

为了解决这个问题,我们使用了开源库Docling 。它能够直接高效地解析文档,并支持导出为所需格式。它可以读取各种常用文档格式,包括 PDF、DOCX、PPTX、XLSX、图像、HTML、AsciiDoc 和 Markdown,并将其导出为 Markdown 和 JSON 格式。Docling 能够全面理解 PDF 文档,包括表格结构、阅读顺序和页面布局。此外,它还支持对扫描 PDF 进行 OCR 识别。

然后将 PDF 中包含的文本转换为 Markdown 格式,这对于遵循基于段落的结构的分块是必要的。

from docling.document_converter import DocumentConverterlogger.info("Starting document processing.")
converter = DocumentConverter()
markdown_document = converter.convert(source).document.export_to_markdown()

提取的文本将具有类似于下图的结构。可以看出,PDF 和表格解析提取的文本保留了原始格式。

根据标题和使用MarkdownHeaderTextSplitter,输出文本随后被分成几块,得到 332 个Document对象的列表(LangChain 文档)。

from langchain_text_splitters import MarkdownHeaderTextSplitterheaders_to_split_on = [("#", "Header 1"),("##", "Header 2")
]markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
docs_list = markdown_splitter.split_text(markdown_document)
docs_list
# Output example
[Document(metadata={'Header 2': 'A letter from our Chief Sustainability Officer and our Senior Vice President of Learning and Sustainability'}, page_content="...."),
...]# len(docs_list):
332

向量数据库构建

我们构建一个向量数据库,将句子存储为向量嵌入,并在该数据库中进行搜索。在本例中,我们使用Chroma,并在本地目录“ db_vector”中存储一个持久数据库。

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddingsembd = OpenAIEmbeddings()vectorstore_from_documents = Chroma.from_documents(documents=docs_list,collection_name="rag-chroma-google-v1",embedding=embd,persist_directory='db_vector'
)

基于LangGraph主图构建

实施的系统包括两个图表:

  • 研究人员图表作为子图,其任务是生成不同的查询,用于从向量数据库中检索和重新排序前 k 个文档。
  • 主图,包含主要工作流程,例如分析用户的查询、生成完成任务所需的步骤、产生响应以及使用人在环机制检查幻觉。

 LangGraph 图形预览

LangGraph的核心概念之一是状态。每次图执行都会创建一个状态,该状态在图中节点执行时传递,每个节点执行后都会用其返回值更新此内部状态。

让我们从构建图形状态开始这个项目。为了实现这一点,我们定义两个类:

  • Router: 包含将用户查询分类为以下类别之一的结果:“更多信息”,“环境”或“一般”。
  • GradeHallucination:包含一个二进制分数,表示反应中是否存在幻觉。
from pydantic import BaseModel, Fieldclass Router(TypedDict):"""Classify user query."""logic: strtype: Literal["more-info", "environmental", "general"]from pydantic import BaseModel, Fieldclass GradeHallucinations(BaseModel):"""Binary score for hallucination present in generation answer."""binary_score: str = Field(description="Answer is grounded in the facts, '1' or '0'")

定义的图形状态为:
InputState:包括用户和代理之间交换的消息列表。
AgentState:包含Router对用户查询的分类、研究计划中要执行的步骤列表、代理可以引用的检索文档列表以及二元评分Gradehuciation。

from dataclasses import dataclass, field
from typing import Annotated, Literal, TypedDict
from langchain_core.documents import Document
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
from utils.utils import reduce_docs@dataclass(kw_only=True)
class InputState:"""Represents the input state for the agent.This class defines the structure of the input state, which includesthe messages exchanged between the user and the agent. It serves asa restricted version of the full State, providing a narrower interfaceto the outside world compared to what is maintained iternally."""messages: Annotated[list[AnyMessage], add_messages]"""Messages track the primary execution state of the agent.Typically accumulates a pattern of Human/AI/Human/AI messages.Returns:A new list of messages with the messages from `right` merged into `left`.If a message in `right` has the same ID as a message in `left`, themessage from `right` will replace the message from `left`."""# Primary agent state
@dataclass(kw_only=True)
class AgentState(InputState):"""State of the retrieval graph / agent."""router: Router = field(default_factory=lambda: Router(type="general", logic=""))"""The router's classification of the user's query."""steps: list[str] = field(default_factory=list)"""A list of steps in the research plan."""documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)"""Populated by the retriever. This is a list of documents that the agent can reference."""hallucination: GradeHallucinations = field(default_factory=lambda: GradeHallucinations(binary_score="0"))

步骤1:分析和路线查询


函数analyze_and_route_query返回并更新状态AgentState的路由器变量。函数route_query根据之前的查询分类确定下一步
具体来说,此步骤使用Router对象更新状态,该Router对象的类型变量包含以下值之一:“更多信息”、“环境”或“常规”。根据此信息,工作流将被路由到适当的节点(“create_research_plan”、“ask_for_more_info”或“respond_to_general_query”之一)。

async def analyze_and_route_query(state: AgentState, *, config: RunnableConfig
) -> dict[str, Router]:"""Analyze the user's query and determine the appropriate routing.This function uses a language model to classify the user's query and decide how to route itwithin the conversation flow.Args:state (AgentState): The current state of the agent, including conversation history.config (RunnableConfig): Configuration with the model used for query analysis.Returns:dict[str, Router]: A dictionary containing the 'router' key with the classification result (classification type and logic)."""model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True)messages = [{"role": "system", "content": ROUTER_SYSTEM_PROMPT}] + state.messageslogging.info("---ANALYZE AND ROUTE QUERY---")response = cast(Router, await model.with_structured_output(Router).ainvoke(messages))return {"router": response}def route_query(state: AgentState,
) -> Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:"""Determine the next step based on the query classification.Args:state (AgentState): The current state of the agent, including the router's classification.Returns:Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]: The next step to take.Raises:ValueError: If an unknown router type is encountered."""_type = state.router["type"]if _type == "environmental":return "create_research_plan"elif _type == "more-info":return "ask_for_more_info"elif _type == "general":return "respond_to_general_query"else:raise ValueError(f"Unknown router type {_type}")

 问题的输出示例:“Retrieve the data center PUE efficiency value in Dublin in 2019”

{"logic":"This is a specific question about the environmental efficiency of a data center in Dublin in 2019, which relates to the Environmental Report.","type":"environmental"
}

步骤1.1超出范围/需要更多信息


然后,我们定义了函数ask_for_more_info和respond_to_general_query,它们通过调用LLM直接为用户生成响应:如果路由器确定需要用户提供更多信息,则将执行第一个函数,而第二个函数则生成对与我们的主题无关的一般查询的响应。在这种情况下,有必要将生成的响应与消息列表连接起来,更新状态中的消息变量。

async def ask_for_more_info(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:"""Generate a response asking the user for more information.This node is called when the router determines that more information is needed from the user.Args:state (AgentState): The current state of the agent, including conversation history and router logic.config (RunnableConfig): Configuration with the model used to respond.Returns:dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response."""model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)system_prompt = MORE_INFO_SYSTEM_PROMPT.format(logic=state.router["logic"])messages = [{"role": "system", "content": system_prompt}] + state.messagesresponse = await model.ainvoke(messages)return {"messages": [response]}async def respond_to_general_query(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:"""Generate a response to a general query not related to environmental.This node is called when the router classifies the query as a general question.Args:state (AgentState): The current state of the agent, including conversation history and router logic.config (RunnableConfig): Configuration with the model used to respond.Returns:dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response."""model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)system_prompt = GENERAL_SYSTEM_PROMPT.format(logic=state.router["logic"])logging.info("---RESPONSE GENERATION---")messages = [{"role": "system", "content": system_prompt}] + state.messagesresponse = await model.ainvoke(messages)return {"messages": [response]}

输出示例:“What’s the weather like in Altamura?”

{"logic":"What's the weather like in Altamura?","type":"general"
}
# ---RESPONSE GENERATION---
"I appreciate your question, but I'm unable to provide information about the weather. My focus is on Environmental Reports. If you have any questions related to that topic, please let me know, and I'll be happy to help!"

步骤2:制定research 计划


如果查询分类返回值“environmental”,则用户的请求在文档的范围内,工作流将到达create_research_plan节点,该节点的功能为回答与环境相关的查询创建一个逐步的研究计划。

async def create_research_plan(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:"""Create a step-by-step research plan for answering a environmental-related query.Args:state (AgentState): The current state of the agent, including conversation history.config (RunnableConfig): Configuration with the model used to generate the plan.Returns:dict[str, list[str]]: A dictionary with a 'steps' key containing the list of research steps."""class Plan(TypedDict):"""Generate research plan."""steps: list[str]model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)messages = [{"role": "system", "content": RESEARCH_PLAN_SYSTEM_PROMPT}] + state.messageslogging.info("---PLAN GENERATION---")response = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))return {"steps": response["steps"], "documents": "delete"}

Output example to the question “Retrieve the data center PUE efficiency value in Dublin in 2019”:

{"steps":["Look up the PUE (Power Usage Effectiveness) efficiency value for data centers specifically in Dublin for the year 2019 using statistical data sources."]
}

 在这种情况下,用户的请求只需要一步即可检索信息。


步骤3:进行research


此功能从研究计划中迈出第一步,并使用它进行研究。对于研究,该函数调用子图researcher_graph,它返回一个块列表,我们将在下一节中探索。最后,我们通过删除刚刚执行的步骤来更新状态中的steps变量。

async def conduct_research(state: AgentState) -> dict[str, Any]:"""Execute the first step of the research plan.This function takes the first step from the research plan and uses it to conduct research.Args:state (AgentState): The current state of the agent, including the research plan steps.Returns:dict[str, list[str]]: A dictionary with 'documents' containing the research results and'steps' containing the remaining research steps.Behavior:- Invokes the researcher_graph with the first step of the research plan.- Updates the state with the retrieved documents and removes the completed step."""result = await researcher_graph.ainvoke({"question": state.steps[0]}) #graph call directlydocs = result["documents"]step = state.steps[0]logging.info(f"\n{len(docs)} documents retrieved in total for the step: {step}.")return {"documents": result["documents"], "steps": state.steps[1:]}

步骤4:Researcher子图构建

 如上图所示,该图由查询生成步骤和相关块的检索步骤组成,查询生成步骤从主图传递的步骤开始。正如我们对主图所做的那样,让我们继续定义状态QueryState(研究员图中retrieve_documents节点的私有状态)和ResearcherState(researcher图的状态)。

"""States for the researcher subgraph.This module defines the state structures used in the researcher subgraph.
"""from dataclasses import dataclass, field
from typing import Annotated
from langchain_core.documents import Document
from utils.utils import reduce_docs@dataclass(kw_only=True)
class QueryState:"""Private state for the retrieve_documents node in the researcher graph."""query: str@dataclass(kw_only=True)
class ResearcherState:"""State of the researcher graph / agent."""question: str"""A step in the research plan generated by the retriever agent."""queries: list[str] = field(default_factory=list)"""A list of search queries based on the question that the researcher generates."""documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)"""Populated by the retriever. This is a list of documents that the agent can reference."""

步骤4.1:生成查询


此步骤基于问题生成搜索查询(research计划中的一个步骤)。此函数使用LLM生成各种搜索查询来帮助回答问题。

async def generate_queries(state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:"""Generate search queries based on the question (a step in the research plan).This function uses a language model to generate diverse search queries to help answer the question.Args:state (ResearcherState): The current state of the researcher, including the user's question.config (RunnableConfig): Configuration with the model used to generate queries.Returns:dict[str, list[str]]: A dictionary with a 'queries' key containing the list of generated search queries."""class Response(TypedDict):queries: list[str]logger.info("---GENERATE QUERIES---")model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)messages = [{"role": "system", "content": GENERATE_QUERIES_SYSTEM_PROMPT},{"role": "human", "content": state.question},]response = cast(Response, await model.with_structured_output(Response).ainvoke(messages))queries = response["queries"]queries.append(state.question)logger.info(f"Queries: {queries}")return {"queries": response["queries"]}

Output example to the question “Retrieve the data center PUE efficiency value in Dublin in 2019”:

{"queries":["Look up the PUE (Power Usage Effectiveness) efficiency value for data centers specifically in Dublin for the year 2019 using statistical data sources.""PUE efficiency value data centers Dublin 2019","Power Usage Effectiveness statistics data centers Dublin 2019"]
}

生成查询后,我们可以使用前面定义的持久数据库定义向量库。

def _setup_vectorstore() -> Chroma:"""Set up and return the Chroma vector store instance."""embeddings = OpenAIEmbeddings()return Chroma(collection_name=VECTORSTORE_COLLECTION,embedding_function=embeddings,persist_directory=VECTORSTORE_DIRECTORY)

在RAG系统中,最关键的部分是文档检索过程。因此,人们对所使用的技术给予了极大的关注:具体来说,选择了一个集合检索器作为混合搜索和Coherer进行重新排序。

混合搜索是“关键字风格”搜索和“矢量风格”搜索的组合。它具有进行关键字搜索的优点,以及从嵌入和向量搜索中获得的语义搜索的优点。Ensemble Retriever是一种检索算法,旨在通过结合多个单独检索器的优势来提高信息检索的性能。这种方法被称为“集成检索”,它使用一种称为往复式秩融合的方法来重新排序和合并来自不同检索器的结果,从而提供比单独使用任何单个检索器更准确和相关的结果。

# Create base retrievers
retriever_bm25 = BM25Retriever.from_documents(documents, search_kwargs={"k": TOP_K})
retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K})ensemble_retriever = EnsembleRetriever(retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25],weights=ENSEMBLE_WEIGHTS,)

重新排序是一种可用于提高RAG管道性能的技术。这是一种非常强大的方法,可以显著提升搜索系统。简而言之,重新排序需要一个查询和一个响应,并输出它们之间的相关性得分。通过这种方式,可以使用任何搜索系统来显示可能包含查询答案的多个文档,然后使用Rerank端点对其进行排序。

但是:为什么我们需要重新排名?

为了解决准确性方面的挑战,使用了两阶段检索作为提高搜索质量的一种手段。在这些两阶段系统中,第一阶段模型(集成检索器)从更大的数据集中检索一组候选文档。然后,使用第二阶段模型(重新排序器)对第一阶段模型检索到的文档进行重新排序。此外,重新排名模型,如Cohere Rerank,是一种在给定查询和文档对时输出相似性分数的模型。此分数可用于对与搜索查询最相关的文档进行重新排序。在重新排序方法中,Cohere-Rerank模型因其显著提高搜索准确性的能力而脱颖而出。该模型与传统的嵌入模型不同,它采用深度学习来直接评估每个文档和查询之间的对齐情况。Cohere-Rerank通过协同处理查询和文档来输出相关性得分,从而产生更细致的文档选择过程。

在这种情况下,检索到的文档会被重新排序,并返回前两个最相关的文档。

from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank
from langchain_community.llms import Cohere# Set up Cohere re-ranking
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")# Build compression retriever
compression_retriever = ContextualCompressionRetriever(base_compressor=compressor,base_retriever=ensemble_retriever,
)compression_retriever.invoke("Retrieve the data center PUE efficiency in Dublin in 2019"
)

Output example to the question “Retrieve the data center PUE efficiency value in Dublin in 2019”:

[Document(metadata={'Header 2': 'Endnotes', 'relevance_score': 0.27009502}, page_content="- 1 This calculation is based on..."),Document(metadata={'Header 2': 'DATA CENTER GRID REGION CFE', 'relevance_score': 0.20593424}, page_content="2023  \n| Country..." )]

步骤4.2:检索和重新排序文档功能

async def retrieve_and_rerank_documents(state: QueryState, *, config: RunnableConfig
) -> dict[str, list[Document]]:"""Retrieve documents based on a given query.This function uses a retriever to fetch relevant documents for a given query.Args:state (QueryState): The current state containing the query string.config (RunnableConfig): Configuration with the retriever used to fetch documents.Returns:dict[str, list[Document]]: A dictionary with a 'documents' key containing the list of retrieved documents."""logger.info("---RETRIEVING DOCUMENTS---")logger.info(f"Query for the retrieval process: {state.query}")response = compression_retriever.invoke(state.query)return {"documents": response}

步骤4.3 :构建子图

builder = StateGraph(ResearcherState)
builder.add_node(generate_queries)
builder.add_node(retrieve_and_rerank_documents)
builder.add_edge(START, "generate_queries")
builder.add_conditional_edges("generate_queries",retrieve_in_parallel,  # type: ignorepath_map=["retrieve_and_rerank_documents"],
)
builder.add_edge("retrieve_and_rerank_documents", END)
researcher_graph = builder.compile()

步骤5:检查完成


使用conditional_edge,我们构建一个循环,其结束条件由check_finished返回的值决定。此函数检查create_research_plan节点创建的步骤列表中是否没有更多步骤需要处理。完成所有步骤后,流程将继续到响应节点。

def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:"""Determine if the research process is complete or if more research is needed.This function checks if there are any remaining steps in the research plan:- If there are, route back to the `conduct_research` node- Otherwise, route to the `respond` nodeArgs:state (AgentState): The current state of the agent, including the remaining research steps.Returns:Literal["respond", "conduct_research"]: The next step to take based on whether research is complete."""if len(state.steps or []) > 0:return "conduct_research"else:return "respond

步骤6:响应


根据所进行的research生成对用户查询的最终响应。此函数使用对话历史和research代理检索到的文档来制定全面的答案。

async def respond(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:"""Generate a final response to the user's query based on the conducted research.This function formulates a comprehensive answer using the conversation history and the documents retrieved by the researcher.Args:state (AgentState): The current state of the agent, including retrieved documents and conversation history.config (RunnableConfig): Configuration with the model used to respond.Returns:dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response."""print("--- RESPONSE GENERATION STEP ---")model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)context = format_docs(state.documents)prompt = RESPONSE_SYSTEM_PROMPT.format(context=context)messages = [{"role": "system", "content": prompt}] + state.messagesresponse = await model.ainvoke(messages)return {"messages": [response]}

步骤7:检查幻觉


此步骤检查基于检索到的文档的事实集是否支持LLM在上一步骤中生成的响应,并给出二进制分数。

async def check_hallucinations(state: AgentState, *, config: RunnableConfig
) -> dict[str, Any]:"""Analyze the user's query and checks if the response is supported by the set of facts based on the document retrieved,providing a binary score result.This function uses a language model to analyze the user's query and gives a binary score result.Args:state (AgentState): The current state of the agent, including conversation history.config (RunnableConfig): Configuration with the model used for query analysis.Returns:dict[str, Router]: A dictionary containing the 'router' key with the classification result (classification type and logic)."""model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)system_prompt = CHECK_HALLUCINATIONS.format(documents=state.documents,generation=state.messages[-1])messages = [{"role": "system", "content": system_prompt}] + state.messageslogging.info("---CHECK HALLUCINATIONS---")response = cast(GradeHallucinations, await model.with_structured_output(GradeHallucinations).ainvoke(messages))return {"hallucination": response}

步骤8:人工审批(人工参与)


如果LLM的回应没有得到一系列事实的支持,那么它很可能包含幻觉。在这种情况下,图形会中断,用户可以控制下一步:只重试上一代步骤,而不重新启动整个工作流或结束流程。这种人在循环步骤确保了用户的控制,同时避免了意外的循环或不希望的操作。

LangGraph中的中断函数通过在特定节点暂停图形、向人类呈现信息以及用他们的输入恢复图形,实现了人类在循环中的工作流程。此功能对于审批、编辑或收集其他输入等任务非常有用。中断函数与Command对象结合使用,以使用人类提供的值恢复图形。

def human_approval(state: AgentState,
):_binary_score = state.hallucination.binary_scoreif _binary_score == "1":return "END"else:retry_generation = interrupt({"question": "Is this correct?","llm_output": state.messages[-1]})if retry_generation == "y":print("voglio continuare")return "respond"else:return "END"

构建主图

from langgraph.graph import END, START, StateGraph
from langgraph.checkpoint.memory import MemorySavercheckpointer = MemorySaver()builder = StateGraph(AgentState, input=InputState)
builder.add_node(analyze_and_route_query)
builder.add_edge(START, "analyze_and_route_query")
builder.add_conditional_edges("analyze_and_route_query", route_query)
builder.add_node(create_research_plan)
builder.add_node(ask_for_more_info)
builder.add_node(respond_to_general_query)
builder.add_node(conduct_research)
builder.add_node("respond", respond)
builder.add_node(check_hallucinations)
builder.add_conditional_edges("check_hallucinations", human_approval, {"END": END, "respond": "respond"})
builder.add_edge("create_research_plan", "conduct_research")
builder.add_conditional_edges("conduct_research", check_finished)
builder.add_edge("respond", "check_hallucinations")graph = builder.compile(checkpointer=checkpointer)

构建main方法(app.py)
“每个函数都被定义为异步,以在生成步骤中启用流行为。

from subgraph.graph_states import ResearcherState
from main_graph.graph_states import AgentState
from utils.utils import config, new_uuid
from subgraph.graph_builder import researcher_graph
from main_graph.graph_builder import InputState, graph
from langgraph.types import Command
import asyncio
import uuidimport asyncio
import time
import builtinsthread = {"configurable": {"thread_id": new_uuid()}}async def process_query(query):inputState = InputState(messages=query)async for c, metadata in graph.astream(input=inputState, stream_mode="messages", config=thread):if c.additional_kwargs.get("tool_calls"):print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="", flush=True)if c.content:time.sleep(0.05)print(c.content, end="", flush=True)if len(graph.get_state(thread)[-1]) > 0:if len(graph.get_state(thread)[-1][0].interrupts) > 0:response = input("\nThe response may contain uncertain information. Retry the generation? If yes, press 'y': ")if response.lower() == 'y':async for c, metadata in graph.astream(Command(resume=response), stream_mode="messages", config=thread):if c.additional_kwargs.get("tool_calls"):print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="")if c.content:time.sleep(0.05)print(c.content, end="", flush=True)async def main():input = builtins.inputprint("Enter your query (type '-q' to quit):")while True:query = input("> ")if query.strip().lower() == "-q":print("Exiting...")breakawait process_query(query)if __name__ == "__main__":asyncio.run(main())

在第一次调用后,检查图形状态是否有中断。如果找到任何图形,可以使用以下命令再次调用该图形:

graph.astream(Command(resume=response), stream_mode="messages", config=thread)

三、结果

现场测试

作为第一个测试,执行以下查询以从不同的表中提取不同的值,结合多步骤方法的功能并利用 Docling 库的解析功能。

复杂问题:Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023

完整结果正确幻觉检查成功通过

聊天机器人生成的步骤:

  • “Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.”,
  • ”Find the regional average CFE for the Asia Pacific region in 2023.”

生成的文本:“- The Power Usage Effectiveness (PUE) for the Singapore 2nd facility in 2019 is not available, as the data for that year is not provided. However, the PUE for 2022 is 1.21

2023年亚太地区平均无碳能源(CFE)为12%。

完整输出

Enter your query (type '-q' to quit):
> Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 
2025-01-10 20:39:53,381 - INFO - ---ANALYZE AND ROUTE QUERY---
2025-01-10 20:39:53,381 - INFO - MESSAGES: [HumanMessage(content='Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 ', additional_kwargs={}, response_metadata={}, id='351a00e9-ecda-49e2-b069-19196348a82a')]
{"logic":"Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023","type":"environmental"}2025-01-10 20:39:55,586 - INFO - ---PLAN GENERATION---
{"steps":["Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.","Find the regional average CFE for the Asia Pacific region in 2023."]}2025-01-10 20:39:57,323 - INFO - ---GENERATE QUERIES---
{"queries":["PUE efficiency values Singapore 2nd facility 2019","PUE efficiency values Singapore 2nd facility 2022"]}2025-01-10 20:39:58,285 - INFO - Queries: ['PUE efficiency values Singapore 2nd facility 2019', 'PUE efficiency values Singapore 2nd facility 2022', 'Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.']
2025-01-10 20:39:58,288 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:58,288 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2019
2025-01-10 20:39:59,568 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:59,568 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2022
2025-01-10 20:40:00,891 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:00,891 - INFO - Query for the retrieval process: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.
2025-01-10 20:40:01,820 - INFO - 
4 documents retrieved in total for the step: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022..
2025-01-10 20:40:01,825 - INFO - ---GENERATE QUERIES---
{"queries":["Asia Pacific regional average CFE 2023","CFE statistics Asia Pacific 2023"]}2025-01-10 20:40:02,778 - INFO - Queries: ['Asia Pacific regional average CFE 2023', 'CFE statistics Asia Pacific 2023', 'Find the regional average CFE for the Asia Pacific region in 2023.']
2025-01-10 20:40:02,780 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:02,780 - INFO - Query for the retrieval process: Asia Pacific regional average CFE 2023
2025-01-10 20:40:03,757 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:03,757 - INFO - Query for the retrieval process: CFE statistics Asia Pacific 2023
2025-01-10 20:40:04,885 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:04,885 - INFO - Query for the retrieval process: Find the regional average CFE for the Asia Pacific region in 2023.
2025-01-10 20:40:06,526 - INFO - 
4 documents retrieved in total for the step: Find the regional average CFE for the Asia Pacific region in 2023..
2025-01-10 20:40:06,530 - INFO - --- RESPONSE GENERATION STEP ---
- The Power Usage Effectiveness (PUE) for the Singapore 2nd facility in 2019 is not available, as the data for that year is not provided. However, the PUE for 2022 is 1.21 [e048d08a-4ef6-77b5-20d3-352dcec590b7].- The regional average Carbon-Free Energy (CFE) in the Asia Pacific for 2023 is 12% [9c489d2f-f16f-572b-abed-ee1d5d0ed379].2025-01-10 20:40:14,918 - INFO - ---CHECK HALLUCINATIONS---
{"binary_score":"1"}> 

现在让我们在ChatGPT上尝试一下。将pdf文件上传到web应用程序后,也进行了相同的查询。

如图所示,ChatGPT返回的值不正确,模型出现幻觉。在这种情况下,幻觉检查步骤将允许反应再生(Self-Reflective RAG)。

四、结论

Agentic RAG:技术挑战和注意事项

尽管性能有所提高,但实施 Agentic RAG 仍存在挑战:

  • 延迟:代理交互的复杂性增加通常会导致更长的响应时间。在速度和准确性之间取得平衡是一项关键挑战。
  • 评估和可观察性:随着 Agentic RAG 系统变得越来越复杂,持续评估和可观察性变得必要。

总而言之,Agentic RAG 标志着人工智能领域的重大突破。通过将大型语言模型的功能与自主推理和信息检索相结合,Agentic RAG 引入了智能和灵活性的全新标准。随着人工智能的不断发展,Agentic RAG 将在各行各业发挥重要作用,彻底改变我们使用技术的方式。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/86321.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/86321.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Docker基础】Docker容器管理:docker restart详解

目录 1 docker restart命令概述 1.1 命令作用 1.2 与相关命令对比 2 命令语法详解 2.1 基础语法 2.2 核心参数说明 3 核心原理深度解析 3.1 信号传递机制 3.2 状态转换 4 典型应用场景 4.1 服务配置更新 4.2 故障恢复流程 5 进阶使用技巧 5.1 组合命令应用 5.2 …

mongoDB服务本地化部署

mongoDB服务本地化部署 前言mongoDB下载选择版本安装 前言 mongoDB数据库解释 MongoDB 是由C语言编写的,是一个基于分布式文件存储的开源数据库系统;在高负载的情况下,添加更多的节点,可以保证服务器性能;MongoDB 旨在…

YOLOv10tensorRT推理代码C++

最近实现了YOLOv10的tensorRT推理代码除了后处理部分只适合YOLOv10之外&#xff0c;其余部分基本可以在yolo系列通用~学习记录~。 #include <fstream> #include <iostream> #include <vector> #include <opencv2/opencv.hpp> #include "NvInfer.…

软件定时器详解:RTOS 中的“软时钟”机制与源码解析

在嵌入式实时系统开发中&#xff0c;定时器是不可或缺的工具。软件定时器&#xff08;Software Timer&#xff09; 提供了一种无需创建独立任务、便可在特定延时后执行回调函数的机制。它适用于那些不要求高精度、但需要周期性或一次性延时执行操作的场景。 一、什么是软件定时…

从Yocto中获取源码用传统的方式单独编译

要获取 Yocto 构建后的 Linux 内核和 U-Boot 源码,并进行独立编译,需获取完整的源码树(包含所有应用补丁和配置)及原始配置信息。以下是具体步骤: 获取最终源码路径确定构建目录位置: 内核工作目录 KERNEL_WORKDIR=$(bitbake -e virtual/kernel | grep ^WORKDIR= | cut…

【记录】服务器|常见的八种硬盘接口的简介和清晰的接口图片(2025年6月)

硬盘接口很多&#xff0c;在管服务器的时候总是要买&#xff0c;但是偶尔会忘记自己的服务器支持什么接口&#xff0c;此时就需要看引脚。 如果没插满&#xff0c;就可以直接拍接口的图片&#xff0c;与下面这些图片对照一下【文字介绍是AI直接生成的&#xff0c;图片是我到处…

在一个成熟产品中,如何设计数据库架构以应对客户字段多样化,确保系统的可维护性、可扩展性和高性能。

在SaaS系统、平台型应用或高度可配置的企业级软件中&#xff0c;我们常常会遇到一个现实问题&#xff1a;不同客户对同一个业务表存在差异化字段需求。例如&#xff0c;A客户需要一个“业务员等级”字段&#xff0c;B客户不需要&#xff1b;C客户希望订单表中增加“海外仓编码”…

社群营销应该怎么玩

现在做营销&#xff0c;光靠打广告可不行了。大家都喜欢扎堆儿&#xff0c;找志同道合的人一起玩&#xff0c;这就是社群的力量。那怎么用好这股力量呢&#xff1f;咱们慢慢聊。 首先得明白&#xff0c;社群不是拉个群就完事了。关键是要让大家觉得这里有意思&#xff0c;有收…

【论文阅读笔记】TransparentGS:当高斯溅射学会“看穿”玻璃,如何攻克透明物体重建难题?

文章目录 TransparentGS: Fast Inverse Rendering of Transparent Objects with GaussiansInfoAbstractIntroductionMethod预备知识3D GS的概念不再赘述渲染方程透明高斯Gaussian Light Field Probes多阶段重建实验结果和评估消融实验应用讨论和限制结论TransparentGS: Fast In…

某视频网站运维工程师面试题

某视频网站运维工程师面试题 1、 简单写下Xeon和Itanium这两个产品的本质区别&#xff1f; 2、 ECC内存每Bank的颗粒数是单数还是双数的&#xff1f; 3、 假如有5块1T的硬盘&#xff0c;要求组合成尽量多的实际使用空间并至少容忍坏2盘而不影响raid组工作。请问有几种模式来组…

Java底层原理:深入理解JVM性能调优与监控

一、JVM性能调优概述 JVM性能调优是Java应用优化的重要环节&#xff0c;通过合理配置JVM参数&#xff0c;可以提高Java应用的性能和稳定性。JVM性能调优的主要目标是减少垃圾回收的频率和时间&#xff0c;提高线程的运行效率&#xff0c;优化内存的使用。 &#xff08;一&…

Joblib库多进程/线程使用(一):使用generator参数实现边响应边使用

进程与线程的基本概念 特性进程 (Process)线程 (Thread)定义 操作系统分配资源的基本单位&#xff08;独立的内存空间&#xff09; 多进程可真正并行&#xff08;利用多核 CPU&#xff09; 进程内的执行单元&#xff08;共享进程资源&#xff09;独立性完全独立&#xff0c;崩…

css上下滚动文字

效果图 取得是数组里的数据 上下滚动切换 css .notice-new {background: #222222;border-radius: 19rpx;margin-top: 28rpx;font-size: 24rpx;color: white;font-weight: 500;padding: 0 20rpx;height: 55rpx;line-height: 55rpx;overflow: hidden;.notice-scroll-wrapper {pos…

概念篇: 01-带你认识Dockerfile

在本篇文章中&#xff0c;我们将带你认识 Dockerfile —— 构建 Docker 镜像的"蓝图"。我们会介绍它的基本概念和常用指令&#xff0c;帮助你理解如何使用它来打包你的应用。 简单了解 Docker&#xff08;背景知识&#xff09; 在我们深入 Dockerfile 之前&#xf…

技术伦理之争:OpenAI陷抄袭风波,法院强制下架宣传视频

在AI巨头OpenAI宣布以65亿美元天价收购苹果前设计总监Jony Ive的硬件公司IO仅一个月后&#xff0c;一场抄袭指控将这家科技明星企业推上风口浪尖。 源自谷歌X实验室的初创企业IYO将OpenAI告上法庭&#xff0c;指控其窃取智能耳塞核心技术&#xff0c;并通过巨额收购试图掩盖抄袭…

前沿解读:缺陷如何操控二维半导体中的电子摩擦耗散超快动力学

摩擦能耗约占全球一次能源损耗的1/3&#xff0c;在微纳器件中尤为突出。二维半导体&#xff08;如WS₂&#xff09;因其独特的电子特性成为研究热点&#xff0c;但电子摩擦的动态机制因电子行为的超快特性长期难以捕捉。近期清华团队在Nature Communications发表的研究[1]&…

什么是物联网 (IoT)?

你家是否安装了智能恒温器&#xff1f;或者你属于三分之一的美国健身追踪器用户&#xff0c;通过设备记录运动习惯&#xff1f;如果是&#xff0c;你已在使用物联网技术。这项技术不仅融入日常生活&#xff0c;更深刻改变着组织的运营方式。物联网通过多种技术连接数字与物理世…

[特殊字符] Windows 查看端口占用及服务来源教程(以 9018 端口为例)

下面是一份详细的 Windows 系统中排查 某端口&#xff08;如 9018&#xff09;被哪个程序占用 并确定其具体服务来源的完整教程&#xff0c;适合用于日常运维、开发部署排障等场景。 &#x1f3af; Windows 查看端口占用及服务来源教程&#xff08;以 9018 端口为例&#xff09…

异步爬虫 原理与解析

先遍历100遍一个程序 import requests import logging import timelogging.basicConfig(levellogging.INFO, format%(asctime)s - %(levelname)s: %(message)s) TOTAL_NUMBER 100 BASE_URL https://ssr4.scrape.center/start_time time.time() for id in range(1,TOTAL_NUM…

vscode管理go多个版本

#1.下载go安装包 https://developer.aliyun.com/mirror/golang/?spma2c6h.25603864.0.0.55ea7c45IsI4GM # 2.创建 sdk 目录&#xff08;如果不存在&#xff09; mkdir -p ~/sdk # 3.解压下载的 go1.16.15 到 ~/sdk/ tar -C ~/sdk -xzf go1.16.15.linux-amd64.tar.gz # 4.重…