说在前面
上一篇文章发布了一个mcp-server
,具体的server
是否能被正确的访问到?是否能够得到正常的返回? 在github
上找到一个客户端的代码实现,我把里面的大模型调用换成了支持国内大模型的方式,一起来验证一下吧~
主要功能
- 连接
mcp-server
- 获取
mcp
工具列表 - 调用大模型明确需要调用的方法以及参数
- 执行工具获取返回值
- 调用大模型进行问题总结
- 详尽的日志信息,帮助你更好的了解整个过程
一些说明
关于大模型的选择
文章里用的是open-ai sdk
,但是因为万能的阿里云连接国际,所以阿里百炼的api-key
也是通用的。百炼给新用户都是有免费的大模型调用额度的,放心使用。
大模型的使用
可以参考阿里百炼的api说明,里面有详细的参数,有兴趣的可以自行拼接尝试。
代码
import asyncio
import json
import os
import sys
from typing import Optional, List, Dict
from contextlib import AsyncExitStackfrom mcp import ClientSession
from mcp.client.sse import sse_clientfrom openai import OpenAI
from dotenv import load_dotenvload_dotenv() # load environment variables from .envclass MCPClient:def __init__(self):# Initialize session and client objects# 表示对象可以是None,也可以是ClientSession类型self.session: Optional[ClientSession] = Noneself.exit_stack = AsyncExitStack()self.openai = OpenAI(api_key="your key your key your key",base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",)async def connect_to_sse_server(self, server_url: str):"""Connect to an MCP server running with SSE transport"""# Store the context managers so they stay aliveself._streams_context = sse_client(url=server_url)streams = await self._streams_context.__aenter__()# * 表示将streams解包,将其作为参数传递给ClientSession,假设streams是一个元组,# 那么*streams就等价于ClientSession(stream1, stream2, stream3)self._session_context = ClientSession(*streams)self.session: ClientSession = await self._session_context.__aenter__()# Initializeawait self.session.initialize()# List available tools to verify connectionprint("Initialized SSE client...")print("Listing tools...")response = await self.session.list_tools()tools = response.toolsprint("工具列表:", json.dumps(response.model_dump(), ensure_ascii=False, indent=2))print("\nConnected to server with tools:", [tool.name for tool in tools])async def cleanup(self):"""Properly clean up the session and streams"""if self._session_context:await self._session_context.__aexit__(None, None, None)if self._streams_context:await self._streams_context.__aexit__(None, None, None)async def process_query(self, query: str) -> str:"""Process a query using OpenAI and available tools"""messages = [{"role": "user","content": query}]response = await self.session.list_tools()# 转换工具格式以适应OpenAI的函数调用要求available_tools: List[Dict] = [{ "type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema # 假设inputSchema符合OpenAI的参数格式要求}} for tool in response.tools]print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")# 初始OpenAI API调用response = self.openai.chat.completions.create(model="qwen3-32b", # 使用OpenAI模型max_tokens=1000,messages=messages,tools=available_tools,tool_choice="auto", # 自动决定是否调用工具extra_body={"enable_thinking": False,})# 打印大模型决策过程print("\n===== 大模型工具调用决策 =====")print(f"原始响应: {json.dumps(response.model_dump(), ensure_ascii=False, indent=2)}")# 处理响应和工具调用tool_results = []final_text = []response_message = response.choices[0].messagetool_calls = response_message.tool_callsprint(f"是否调用工具: {'是' if tool_calls else '否'}")# 打印工具调用详情if tool_calls:print(f"调用工具数量: {len(tool_calls)}")for i, call in enumerate(tool_calls):print(f"工具 {i+1}: {call.function.name}, 参数: {call.function.arguments}")# 如果有工具调用if tool_calls:for tool_call in tool_calls:tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)# 执行工具调用print(f"\n===== 调用工具 '{tool_name}' =====")print(f"参数: {json.dumps(tool_args, ensure_ascii=False, indent=2)}")result = await self.session.call_tool(tool_name, tool_args)print(f"返回结果: {json.dumps(str(result.content), ensure_ascii=False, indent=2)}")tool_results.append({"call": tool_name, "result": result})final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")# 将工具调用结果添加到对话历史messages.append({"role": "assistant","content": None,"tool_calls": [tool_call.model_dump()]})messages.append({"role": "tool","tool_call_id": tool_call.id,"content": str(result.content)})print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")# 获取OpenAI的下一次响应response = self.openai.chat.completions.create(model="qwen3-32b",max_tokens=1000,messages=messages,extra_body={"enable_thinking": False,})final_response = response.choices[0].message.contentif final_response:final_text.append(final_response)else:# 没有工具调用,直接使用响应内容if response_message.content:final_text.append(response_message.content)return "\n".join(final_text)async def chat_loop(self):"""Run an interactive chat loop"""print("\nMCP Client Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\nQuery: ").strip()if query.lower() == 'quit':breakresponse = await self.process_query(query)print("\n" + response)except Exception as e:print(f"\nError: {str(e)}")async def main():# if len(sys.argv) < 2:# print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")# sys.exit(1)client = MCPClient()try:# await client.connect_to_sse_server(server_url=sys.argv[1])await client.connect_to_sse_server(server_url="http://localhost:8080/sse")await client.chat_loop()finally:print("Cleaning up...")# await client.cleanup()if __name__ == "__main__":asyncio.run(main())
说到最后
以上。