目录
- 完整代码
- 代码解释
- 1. 导入和初始化
- 2. 类型定义
- 3. MCP_ChatBot 类初始化
- 4. 查询处理 (process_query)
- 5. 服务器连接管理
- 6. 核心特性总结
- 示例
完整代码
原课程代码是用 Anthropic 的 SDK 写的,下面代码是用 OpenAI 兼容接口改写的,模型则用阿里巴巴的千问(Qwen)模型做测试
.env 文件为:
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1
另外,课程代码只是单轮对话,下面代码修改为多轮对话,更适合千问模型的调用方式
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import os

load_dotenv()


class ToolDefinition(TypedDict):
    """OpenAI function-calling style description of one MCP tool."""
    name: str
    description: str
    input_schema: dict


class MCP_ChatBot:
    """Multi-turn chatbot that routes OpenAI-style tool calls to MCP servers.

    Connects to every server listed in ``server_config.json``, registers each
    server's tools in OpenAI function-calling format, and routes each tool
    call back to the session of the server that provides it.
    """

    def __init__(self):
        # One ClientSession per connected MCP server.
        self.sessions: List[ClientSession] = []
        # Owns the lifetime of every stdio transport / session we open.
        self.exit_stack = AsyncExitStack()
        self.client = openai.OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE"),
        )
        # Tools in OpenAI "function" format, aggregated across all servers.
        self.available_tools: List[ToolDefinition] = []
        # Maps a tool name to the session of the server that provides it.
        self.tool_to_session: Dict[str, ClientSession] = {}
        # Full multi-turn history (user / assistant / tool messages).
        self.messages = []

    def _chat(self):
        """Call the model once with the current history and registered tools."""
        return self.client.chat.completions.create(
            model='qwen-turbo',
            # max_tokens=2024,
            tools=self.available_tools,
            messages=self.messages,
        )

    @staticmethod
    def _tool_call_to_dict(tool_call):
        """Convert an SDK tool-call object into a JSON-safe plain dict.

        FIX: the original stored raw SDK objects in the history, which is not
        guaranteed to serialize on the next API request.
        """
        return {
            "id": tool_call.id,
            "type": "function",
            "function": {
                "name": tool_call.function.name,
                "arguments": tool_call.function.arguments,
            },
        }

    @staticmethod
    def _result_to_text(result):
        """Flatten an MCP tool result into plain text for the message history.

        MCP returns a list of content parts; join the textual ones, falling
        back to ``str()`` so the history always stays serializable.
        """
        text = "".join(
            part.text for part in result.content if hasattr(part, "text")
        )
        return text or str(result.content)

    async def process_query(self, query):
        """Run one user query, looping until the model stops requesting tools.

        Raises whatever the OpenAI client or MCP session raises; the caller
        (chat_loop) catches and reports per-query errors.
        """
        self.messages.append({'role': 'user', 'content': query})
        response = self._chat()

        while True:
            message = response.choices[0].message

            # FIX: check tool_calls FIRST — some models return text alongside
            # tool calls, and the original (content-first) check silently
            # dropped the tool calls in that case.
            if message.tool_calls:
                if message.content:
                    print(message.content)
                self.messages.append({
                    "role": "assistant",
                    "content": message.content,
                    "tool_calls": [
                        self._tool_call_to_dict(tc) for tc in message.tool_calls
                    ],
                })

                for tool_call in message.tool_calls:
                    tool_name = tool_call.function.name
                    tool_args = json.loads(tool_call.function.arguments)
                    print(f"Calling tool {tool_name} with args {tool_args}")

                    # Route the call to the MCP server that owns this tool.
                    session = self.tool_to_session[tool_name]
                    result = await session.call_tool(tool_name, arguments=tool_args)

                    self.messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": self._result_to_text(result),
                    })

                # Ask the model again now that tool results are in the history.
                response = self._chat()
            else:
                # Final text answer: record it in the history (FIX: the
                # original never appended the no-tool answer, so later turns
                # lost context), print it, and stop.
                if message.content:
                    print(message.content)
                self.messages.append(
                    {"role": "assistant", "content": message.content}
                )
                break

    async def chat_loop(self):
        """Run an interactive chat loop until the user types 'quit'."""
        print("\nMCP Chatbot Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                await self.process_query(query)
                print("\n")
            except Exception as e:
                # Keep the REPL alive on any per-query failure.
                print(f"\nError: {str(e)}")

    async def connect_to_server(self, server_name: str, server_config: dict) -> None:
        """Connect to a single MCP server and register its tools."""
        try:
            server_params = StdioServerParameters(**server_config)
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.sessions.append(session)

            # List available tools for this session.
            response = await session.list_tools()
            tools = response.tools
            print(f"\nConnected to {server_name} with tools:",
                  [t.name for t in tools])

            for tool in tools:
                self.tool_to_session[tool.name] = session
                self.available_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema,
                    },
                })
        except Exception as e:
            # Best-effort: one bad server must not stop the others loading.
            print(f"Failed to connect to {server_name}: {e}")

    async def connect_to_servers(self):
        """Connect to all MCP servers configured in server_config.json."""
        try:
            with open("server_config.json", "r") as file:
                data = json.load(file)
            servers = data.get("mcpServers", {})
            for server_name, server_config in servers.items():
                await self.connect_to_server(server_name, server_config)
        except Exception as e:
            print(f"Error loading server configuration: {e}")
            raise

    async def cleanup(self):
        """Close every session/transport opened through the exit stack."""
        await self.exit_stack.aclose()

    # FIX: the method was originally misspelled 'clenup'; keep the old name
    # as a backward-compatible alias.
    clenup = cleanup


async def main():
    chatbot = MCP_ChatBot()
    try:
        await chatbot.connect_to_servers()
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

"""
1、Fetch the content of this website: https://modelcontextprotocol.io/docs/concepts/architecture.
2、save the content in the file "mcp_summary.md"
"""
代码解释
1. 导入和初始化
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import os

load_dotenv()
- 导入必要的库,包括OpenAI客户端、MCP协议相关模块、异步处理模块等
load_dotenv()
加载环境变量配置
2. 类型定义
class ToolDefinition(TypedDict):
    name: str
    description: str
    input_schema: dict
定义工具的类型结构,用于类型提示。
3. MCP_ChatBot 类初始化
class MCP_ChatBot:
    def __init__(self):
        self.sessions: List[ClientSession] = []  # 存储多个MCP会话
        self.exit_stack = AsyncExitStack()  # 管理异步资源
        self.client = openai.OpenAI(  # OpenAI客户端
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE")
        )
        self.available_tools: List[ToolDefinition] = []  # 可用工具列表
        self.tool_to_session: Dict[str, ClientSession] = {}  # 工具名到会话的映射
        self.messages = []  # 对话历史
关键特性:
- 多会话支持:
sessions
列表存储多个MCP服务器会话 - 工具映射:
tool_to_session
将工具名映射到对应的会话,实现工具路由 - 资源管理:使用
AsyncExitStack
管理异步资源的生命周期
4. 查询处理 (process_query)
async def process_query(self, query):
    self.messages.append({'role': 'user', 'content': query})
    response = self.client.chat.completions.create(
        model='qwen-turbo',
        tools=self.available_tools,
        messages=self.messages
    )
核心处理逻辑:
- 消息循环处理:使用
while process_query
循环处理多轮对话 - 工具调用处理:检测并执行工具调用,通过
tool_to_session
路由到正确的MCP服务器 - 结果整合:将工具执行结果添加到对话历史中
5. 服务器连接管理
async def connect_to_server(self, server_name: str, server_config: dict) -> None:
    # 建立单个服务器连接
    server_params = StdioServerParameters(**server_config)
    stdio_transport = await self.exit_stack.enter_async_context(
        stdio_client(server_params)
    )
    # ... 获取工具并建立映射
    for tool in tools:
        self.tool_to_session[tool.name] = session
        self.available_tools.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        })
async def connect_to_servers(self):
    # 连接所有配置的服务器
    with open("server_config.json", "r") as file:
        data = json.load(file)
    servers = data.get("mcpServers", {})
    for server_name, server_config in servers.items():
        await self.connect_to_server(server_name, server_config)
6. 核心特性总结
多服务器支持:
- 可以同时连接多个MCP服务器
- 每个服务器的工具都被统一管理
- 通过工具名自动路由到正确的服务器
OpenAI格式兼容:
- 工具定义使用OpenAI的函数调用格式
- 支持完整的工具调用流程
异步处理:
- 全异步设计,支持并发处理
- 使用
AsyncExitStack
管理资源生命周期
配置化管理:
- 通过
server_config.json
配置多个服务器 - 支持动态加载服务器配置
这个实现相比单服务器版本的主要优势是可以整合多个不同功能的MCP服务器,为用户提供更丰富的工具集合。
示例
uv run connect_server_map_chatbot.py