文章目录
- MCP基础概念
- 文件操作服务器
- 文件操作MCP接入谷歌ADK
- 项目创建
- 多轮对话代码
MCP基础概念
- MCP技术体系中,会将外部工具运行脚本称作服务器,而接入这些外部工具的大模型运行环境称作客户端。
- 一个客户端可以接入多个不同类型的服务器,但都要遵循
MCP
通信协议。 - MCP服务器的输出内容是一种标准格式的内容,只能被
MCP
客户端所识别。在客户端和服务器都遵循MCP
协议的时候,客户端就能够像Function calling
中大模型调用外部工具一样,调用MCP
服务器里面的工具。
- 将
ADK
作为客户端,接入MCP
工具。核心是MCPToolset
类,该类是ADK
连接到MCP
服务器的桥梁。 - 在实际过程中,ADK代理将通过
MCPToolset
执行以下操作:- 连接:与
MCP
服务器进程建立连接。该服务器可以是通过标准输入/输出 (StdioServerParameters
) 进行通信的本地服务器,也可以是使用服务器发送事件 (SseServerParams
) 的远程服务器。 - 发现:查询
MCP
服务器以获取可用工具(list_tools MCP
方法)。 - 适应:将
MCP
工具模式转换为ADK
兼容BaseTool
实例。 - 公开:将这些适配的工具呈现给
ADK Agent
。 - 代理调用:当
Agent
决定使用其中一个工具时,MCPToolset
将调用(call_tool MCP
方法)转发到MCP
服务器并返回结果。 - 管理连接:处理与
MCP
服务器进程的连接的生命周期,通常需要明确清理的周期(如进程结束后清理)。
- 连接:与
文件操作服务器
- Filesystem服务器是一个最基础同时也是最常用的MCP服务器,同时也是官方推荐的服务器,服务器项目地址:https://github.com/modelcontextprotocol/servers/tree/main/src/filesystem 。
文件操作MCP接入谷歌ADK
项目创建
- 项目框架搭建:
# 初始化项目
uv init adk_mcp
# 进入项目目录
cd adk_mcp
# 创建虚拟环境
uv venv
# 激活虚拟环境
.venv\Scripts\activate
# 安装依赖
uv add google-adk litellm
- 创建环境配置文件
.env
:
OPENAI_API_KEY=xxx
OPENAI_API_BASE=https://api.openai-hk.com/v1
MODEL=openai/gpt-4o-mini
- 创建
main.py
"""main.py: attach the Filesystem MCP server to a Google ADK agent.

The full source of ``main.py`` follows. It starts the Filesystem MCP server
over stdio, loads its tools into an ADK agent backed by a LiteLLM model, runs
one hard-coded query and prints every event the runner yields.
"""
# Base imports
import os
import asyncio
from dotenv import load_dotenv
from google.genai import types
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters, StdioConnectionParams

# Read the .env file into the process environment. Without this call the
# os.getenv() lookups below only see variables that were already exported in
# the shell, and the model configuration silently becomes None.
load_dotenv()

DS_API_KEY = os.getenv("OPENAI_API_KEY")
DS_BASE_URL = os.getenv("OPENAI_API_BASE")
DS_MODEL = os.getenv("MODEL")

model = LiteLlm(
    model=DS_MODEL,
    api_base=DS_BASE_URL,
    api_key=DS_API_KEY,
)


# --- Step 1: helper that imports the MCP tools ---
async def get_tools_async():
    """Get the tools exposed by the Filesystem MCP server.

    Returns:
        A ``(tools, mcp_toolset)`` tuple. The toolset is returned alongside
        the tools so that the caller can close it when it is done.

    Raises:
        Exception: whatever the toolset raised while being created or
            queried; the error is printed and re-raised.
    """
    print("Attempting to connect to MCP Filesystem server...")

    # Make sure the directory handed to the server exists.
    test_dir = "D:\\Code\\adk_mcp\\test"
    if not os.path.exists(test_dir):
        os.makedirs(test_dir)
        print(f"Created directory: {test_dir}")

    mcp_toolset = None
    try:
        # Create the MCP toolset; the server is reached over stdio.
        mcp_toolset = MCPToolset(
            connection_params=StdioConnectionParams(
                server_params=StdioServerParameters(
                    command='npx',
                    args=["-y", "@modelcontextprotocol/server-filesystem", test_dir],
                )
            )
        )

        # No fixed start-up sleep is used here: awaiting get_tools() is what
        # waits for the server to answer the tool listing request.
        tools = await mcp_toolset.get_tools()
        print("MCP Toolset created successfully.")
        print(f"Fetched {len(tools)} tools from MCP server.")

        # Return the toolset too so the caller can clean it up.
        return tools, mcp_toolset
    except Exception as e:
        print(f"Error creating MCP tools: {e}")
        # The caller never receives the toolset on failure, so it has to be
        # released here or the server process would be left behind.
        if mcp_toolset is not None:
            try:
                await mcp_toolset.close()
            except Exception as close_error:
                print(f"Error closing MCP toolset: {close_error}")
        raise


# --- Step 2: create the ADK agent ---
async def get_agent_async():
    """Create an ADK Agent equipped with tools from the MCP server.

    Returns:
        A ``(root_agent, mcp_toolset)`` tuple; the toolset must be closed by
        the caller once the agent is no longer needed.

    Raises:
        Exception: any error from tool loading or agent construction; the
            error is printed and re-raised.
    """
    try:
        tools, mcp_toolset = await get_tools_async()
    except Exception as e:
        print(f"Error creating agent: {e}")
        raise

    try:
        root_agent = Agent(
            model=model,
            name='filesystem_assistant',
            instruction='Help user interact with the local filesystem using available tools.',
            tools=tools,  # load the MCP tools into the agent
        )
    except Exception as e:
        print(f"Error creating agent: {e}")
        # The toolset is already connected at this point; release it because
        # the caller will not get a handle to it.
        try:
            await mcp_toolset.close()
        except Exception as close_error:
            print(f"Error closing MCP toolset: {close_error}")
        raise

    return root_agent, mcp_toolset


# --- Step 3: main logic ---
async def async_main():
    """Run one hard-coded query through the agent and print every event.

    Raises:
        Exception: any error raised while setting up or running the agent;
            the error is printed and re-raised after cleanup.
    """
    mcp_toolset = None
    try:
        # Create the session service and a session for this run.
        session_service = InMemorySessionService()
        session = await session_service.create_session(
            state={}, app_name='mcp_filesystem_app', user_id='user_fs'
        )

        # User input
        query = "请帮我查找目前文件夹里都有哪些文件?"
        print(f"User Query: '{query}'")
        content = types.Content(role='user', parts=[types.Part(text=query)])

        root_agent, mcp_toolset = await get_agent_async()
        runner = Runner(
            app_name='mcp_filesystem_app',
            agent=root_agent,
            session_service=session_service,
        )

        print("Running agent...")
        events_async = runner.run_async(
            session_id=session.id, user_id=session.user_id, new_message=content
        )
        async for event in events_async:
            print(f"Event received: {event}")
    except Exception as e:
        print(f"Error during main execution: {e}")
        raise
    finally:
        # Close the MCP server connection once the run is over.
        if mcp_toolset:
            print("Closing MCP server connection...")
            await mcp_toolset.close()
            print("Cleanup complete.")


if __name__ == '__main__':
    try:
        asyncio.run(async_main())
    except Exception as e:
        print(f"An error occurred: {e}")
- 执行结果如下:
(adk_mcp) D:\Code\adk_mcp>uv run main.py D:\Code\adk_mcp\.venv\Lib\site-packages\pydantic\_internal\_fields.py:198: UserWarning: Field name "config_type" in "SequentialAgent" shadows an attribute in parent "BaseAgent"warnings.warn( User Query: '请帮我查找目前文件夹里都有哪些文件?' Attempting to connect to MCP Filesystem server... D:\Code\adk_mcp\.venv\Lib\site-packages\google\adk\tools\mcp_tool\mcp_tool.py:87: UserWarning: [EXPERIMENTAL] BaseAuthenticatedTool: This feature is experimental and may change or be removed in future versions without notice. It may introduce breaking changes at any time.super().__init__( auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required. MCP Toolset created successfully. Fetched 14 tools from MCP server. Running agent... Event received: content=Content(parts=[Part(text="""首先,我需要确定您当前的工作目录位置。由于您没有指定具体路径,我将先获取允许访问的目录列表:"""),Part(function_call=FunctionCall(args={},id='call_5201be00eeb24a989be18f',name='list_allowed_directories')),],role='model' ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=627,prompt_token_count=1806,total_token_count=2433 ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=set() branch=None id='71634100-b10a-4a81-b779-f34f52df12eb' timestamp=1754979735.932518 Event received: content=Content(parts=[Part(function_response=FunctionResponse(id='call_5201be00eeb24a989be18f',name='list_allowed_directories',response={'result': CallToolResult(content=[<... 
Max depth ...>,],isError=False)})),],role='user' ) grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=None live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='00b7f823-5a66-493d-95bd-55ee93466fc2' timestamp=1754979761.37467 Event received: content=Content(parts=[Part(text="""根据系统配置,允许访问的目录是 `D:\Code\adk_mcp\test`。我将列出该目录下的文件:"""),Part(function_call=FunctionCall(args={'path': 'D:\\Code\\adk_mcp\\test'},id='call_2f66909ac2054847b6f9d3',name='list_directory')),],role='model' ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=1105,prompt_token_count=1906,total_token_count=3011 ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=set() branch=None id='39358200-064d-478e-90f6-ab0855f13ec4' timestamp=1754979761.380695 Event received: content=Content(parts=[Part(function_response=FunctionResponse(id='call_2f66909ac2054847b6f9d3',name='list_directory',response={'result': CallToolResult(content=[<... 
Max depth ...>,],isError=False)})),],role='user' ) grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=None live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='2d1197f7-c81d-41a5-9fc5-15652c50fa7e' timestamp=1754979803.754635 Event received: content=Content(parts=[Part(text="""当前目录 `D:\Code\adk_mcp\test` 中的文件如下:- 📄 **系统架构设计师教程_带目录高清版.pdf**"""),],role='model' ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=128,prompt_token_count=2021,total_token_count=2149 ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='15fdd91c-e645-46fe-a613-76e1f28323c5' timestamp=1754979803.761915 Closing MCP server connection... Cleanup complete.
多轮对话代码
- 以下代码是
main.py
"""Multi-turn conversation version of main.py.

Same setup as the single-query script (Filesystem MCP server over stdio,
ADK agent backed by a LiteLLM model). After the initial hard-coded query it
enters an interactive loop that reads queries from the terminal.
"""
# Base imports
import os
import asyncio
from dotenv import load_dotenv
from google.genai import types
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters, StdioConnectionParams

# Read the .env file into the process environment. Without this call the
# os.getenv() lookups below only see variables that were already exported in
# the shell, and the model configuration silently becomes None.
load_dotenv()

DS_API_KEY = os.getenv("OPENAI_API_KEY")
DS_BASE_URL = os.getenv("OPENAI_API_BASE")
DS_MODEL = os.getenv("MODEL")

model = LiteLlm(
    model=DS_MODEL,
    api_base=DS_BASE_URL,
    api_key=DS_API_KEY,
)


# --- Step 1: helper that imports the MCP tools ---
async def get_tools_async():
    """Get the tools exposed by the Filesystem MCP server.

    Returns:
        A ``(tools, mcp_toolset)`` tuple. The toolset is returned alongside
        the tools so that the caller can close it when it is done.

    Raises:
        Exception: whatever the toolset raised while being created or
            queried; the error is printed and re-raised.
    """
    print("Attempting to connect to MCP Filesystem server...")

    # Make sure the directory handed to the server exists.
    test_dir = "D:\\Code\\adk_mcp\\test"
    if not os.path.exists(test_dir):
        os.makedirs(test_dir)
        print(f"Created directory: {test_dir}")

    mcp_toolset = None
    try:
        # Create the MCP toolset; the server is reached over stdio.
        mcp_toolset = MCPToolset(
            connection_params=StdioConnectionParams(
                server_params=StdioServerParameters(
                    command='npx',  # command that runs the server
                    args=[
                        "-y",  # arguments for the command
                        "@modelcontextprotocol/server-filesystem",
                        test_dir,
                    ],
                )
            )
        )

        # No fixed start-up sleep is used here: awaiting get_tools() is what
        # waits for the server to answer the tool listing request.
        tools = await mcp_toolset.get_tools()
        print("MCP Toolset created successfully.")
        print(f"Fetched {len(tools)} tools from MCP server.")

        # Return the toolset too so the caller can clean it up.
        return tools, mcp_toolset
    except Exception as e:
        print(f"Error creating MCP tools: {e}")
        # The caller never receives the toolset on failure, so it has to be
        # released here or the server process would be left behind.
        if mcp_toolset is not None:
            try:
                await mcp_toolset.close()
            except Exception as close_error:
                print(f"Error closing MCP toolset: {close_error}")
        raise


# --- Step 2: create the ADK agent ---
async def get_agent_async():
    """Create an ADK Agent equipped with tools from the MCP server.

    Returns:
        A ``(root_agent, mcp_toolset)`` tuple; the toolset must be closed by
        the caller once the agent is no longer needed.

    Raises:
        Exception: any error from tool loading or agent construction; the
            error is printed and re-raised.
    """
    try:
        tools, mcp_toolset = await get_tools_async()
    except Exception as e:
        print(f"Error creating agent: {e}")
        raise

    try:
        root_agent = Agent(
            model=model,
            name='filesystem_assistant',
            instruction='Help user interact with the local filesystem using available tools.',
            tools=tools,  # load the MCP tools into the agent
        )
    except Exception as e:
        print(f"Error creating agent: {e}")
        # The toolset is already connected at this point; release it because
        # the caller will not get a handle to it.
        try:
            await mcp_toolset.close()
        except Exception as close_error:
            print(f"Error closing MCP toolset: {close_error}")
        raise

    return root_agent, mcp_toolset


# Multi-turn conversation loop
async def chat_loop(runner, user_id, session_id) -> None:
    """Read queries from the terminal and print the agent's final answer.

    The loop ends when the user types ``quit`` (case-insensitive) or when
    standard input is closed. Errors raised while a query is being processed
    are printed and the loop continues with the next query.

    Args:
        runner: Runner whose ``run_async`` is called for every query.
        user_id: User id passed to ``run_async``.
        session_id: Session id passed to ``run_async``; reusing it on every
            turn is what keeps the turns in one conversation.
    """
    print("\n🤖ADK + MCP对话已启动!输入'quit'退出。")
    while True:
        try:
            query = input("\n你: ").strip()
        except EOFError:
            # stdin was closed (piped input, Ctrl+D/Ctrl+Z): end the chat
            # instead of reporting it as an execution error.
            break

        if query.lower() == "quit":
            break
        if not query:
            # Nothing typed; do not send an empty message to the model.
            continue

        try:
            print(f"\n>>> User Query: {query}")

            # Prepare the user's message in ADK format.
            content = types.Content(role='user', parts=[types.Part(text=query)])
            final_response_text = "Agent did not produce a final response."  # default

            # run_async executes the agent logic and yields events; iterate
            # until the event that carries the final answer for this turn.
            async for event in runner.run_async(
                user_id=user_id, session_id=session_id, new_message=content
            ):
                if not event.is_final_response():
                    continue

                if event.content and event.content.parts:
                    # Collect every text part: the first part is not
                    # necessarily text, in which case its .text is None.
                    texts = [part.text for part in event.content.parts if part.text]
                    if texts:
                        final_response_text = "".join(texts)
                elif event.actions and event.actions.escalate:
                    # Escalation without content: surface the error message.
                    final_response_text = (
                        f"Agent escalated:{event.error_message or 'No specific message.'}"
                    )
                break

            print(f"<<< Agent Response: {final_response_text}")
        except Exception as e:
            print(f"\n⚠调用过程出错: {e}")


# --- Step 3: main logic ---
async def async_main():
    """Run the initial query, then hand over to the interactive chat loop.

    Raises:
        Exception: any error raised while setting up or running the agent;
            the error is printed and re-raised after cleanup.
    """
    mcp_toolset = None
    try:
        # Create the session service and a session for this run.
        session_service = InMemorySessionService()
        session = await session_service.create_session(
            state={}, app_name='mcp_filesystem_app', user_id='user_fs'
        )

        # User input
        query = "请帮我查找目前文件夹里都有哪些文件?"
        print(f"User Query: '{query}'")
        content = types.Content(role='user', parts=[types.Part(text=query)])

        root_agent, mcp_toolset = await get_agent_async()
        runner = Runner(
            app_name='mcp_filesystem_app',
            agent=root_agent,
            session_service=session_service,
        )

        print("Running agent...")
        events_async = runner.run_async(
            session_id=session.id, user_id=session.user_id, new_message=content
        )
        async for event in events_async:
            print(f"Event received: {event}")

        # Start the multi-turn conversation.
        await chat_loop(runner, session.user_id, session.id)
    except Exception as e:
        print(f"Error during main execution: {e}")
        raise
    finally:
        # Close the MCP server connection once the run is over.
        if mcp_toolset:
            print("Closing MCP server connection...")
            await mcp_toolset.close()
            print("Cleanup complete.")


if __name__ == '__main__':
    try:
        asyncio.run(async_main())
    except Exception as e:
        print(f"An error occurred: {e}")