吴恩达MCP课程(4):connect_server_mcp_chatbot

目录

    • 完整代码
    • 代码解释
      • 1. 导入和初始化
      • 2. 类型定义
      • 3. MCP_ChatBot 类初始化
      • 4. 查询处理 (process_query)
      • 5. 服务器连接管理
      • 6. 核心特性总结
    • 示例

完整代码

原课程代码是用Anthropic写的,下面代码是用OpenAI改写的,模型则用阿里巴巴的模型做测试
.env 文件为:

OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1

另外,课程代码只是单轮对话,下面代码修改为多轮对话,更适合千问模型的调用方式

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import osload_dotenv()class ToolDefinition(TypedDict):name: strdescription: strinput_schema: dictclass MCP_ChatBot:def __init__(self):# Initialize session and client objectsself.sessions: List[ClientSession] = []  # newself.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[ToolDefinition] = []  # newself.tool_to_session: Dict[str, ClientSession] = {}self.messages = []async def process_query(self, query):self.messages.append({'role':'user', 'content':query})response = self.client.chat.completions.create(model='qwen-turbo',# max_tokens=2024,tools=self.available_tools,messages=self.messages )process_query = Truewhile process_query:# 获取助手的回复message = response.choices[0].message# 检查是否有普通文本内容if message.content:print(message.content)process_query = False# 检查是否有工具调用elif message.tool_calls:# 添加助手消息到历史self.messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 处理每个工具调用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 执行工具调用session = self.tool_to_session[tool_name]result = await session.call_tool(tool_name, arguments=tool_args)# 添加工具结果到消息历史self.messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})# 获取下一个回复response = self.client.chat.completions.create(model='qwen-turbo',# max_tokens=2024,tools=self.available_tools,messages=self.messages )self.messages.append({"role": "assistant", "content": response.choices[0].message.content})# 如果只有文本回复,则结束处理if response.choices[0].message.content and not response.choices[0].message.tool_calls:print(response.choices[0].message.content)process_query = Falseasync def chat_loop(self):"""Run an interactive chat loop"""print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\nQuery: ").strip()if query.lower() == 'quit':breakawait self.process_query(query)print("\n")except Exception as e:print(f"\nError: {str(e)}")async def connect_to_server(self, server_name: str, server_config: dict) -> None:"""Connect to a single MCP server."""try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()self.sessions.append(session)# List available tools for this sessionresponse = await session.list_tools()tools = response.toolsprint(f"\nConnected to {server_name} with tools:", [t.name for t in tools])for tool in tools:  # newself.tool_to_session[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})except Exception as e:print(f"Failed to connect to {server_name}: {e}")async def connect_to_servers(self):  # new"""Connect to all configured MCP servers."""try:with open("server_config.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server configuration: {e}")raise           async def clenup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.clenup()if __name__ == "__main__":asyncio.run(main())"""
1、Fetch the content of this website: https://modelcontextprotocol.io/docs/concepts/architecture. 
2、save the content in the file "mcp_summary.md"
"""

代码解释

1. 导入和初始化

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import osload_dotenv()
  • 导入必要的库,包括OpenAI客户端、MCP协议相关模块、异步处理模块等
  • load_dotenv() 加载环境变量配置

2. 类型定义

class ToolDefinition(TypedDict):name: strdescription: strinput_schema: dict

定义工具的类型结构,用于类型提示。

3. MCP_ChatBot 类初始化

class MCP_ChatBot:def __init__(self):self.sessions: List[ClientSession] = []  # 存储多个MCP会话self.exit_stack = AsyncExitStack()  # 管理异步资源self.client = openai.OpenAI(  # OpenAI客户端api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[ToolDefinition] = []  # 可用工具列表self.tool_to_session: Dict[str, ClientSession] = {}  # 工具名到会话的映射self.messages = []  # 对话历史

关键特性:

  • 多会话支持sessions 列表存储多个MCP服务器会话
  • 工具映射tool_to_session 将工具名映射到对应的会话,实现工具路由
  • 资源管理:使用 AsyncExitStack 管理异步资源的生命周期

4. 查询处理 (process_query)

async def process_query(self, query):self.messages.append({'role':'user', 'content':query})response = self.client.chat.completions.create(model='qwen-turbo',tools=self.available_tools,messages=self.messages )

核心处理逻辑:

  1. 消息循环处理:使用 while process_query 循环处理多轮对话
  2. 工具调用处理:检测并执行工具调用,通过 tool_to_session 路由到正确的MCP服务器
  3. 结果整合:将工具执行结果添加到对话历史中

5. 服务器连接管理

async def connect_to_server(self, server_name: str, server_config: dict) -> None:# 建立单个服务器连接server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))# ... 获取工具并建立映射for tool in tools:self.tool_to_session[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})
async def connect_to_servers(self):# 连接所有配置的服务器with open("server_config.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)

6. 核心特性总结

多服务器支持

  • 可以同时连接多个MCP服务器
  • 每个服务器的工具都被统一管理
  • 通过工具名自动路由到正确的服务器

OpenAI格式兼容

  • 工具定义使用OpenAI的函数调用格式
  • 支持完整的工具调用流程

异步处理

  • 全异步设计,支持并发处理
  • 使用 AsyncExitStack 管理资源生命周期

配置化管理

  • 通过 server_config.json 配置多个服务器
  • 支持动态加载服务器配置

这个实现相比单服务器版本的主要优势是可以整合多个不同功能的MCP服务器,为用户提供更丰富的工具集合。

示例

uv run connect_server_map_chatbot.py

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/907751.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++内存学习

引入 在实例化对象时,不管是编译器还是我们自己,会使用构造函数给成员变量一个合适的初始值。 但是经过构造函数之后,我们还不能将其称为成员变量的初始化: 构造函数中的语句只能称为赋初值,而不能称作初始化 因为初…

MySQL 大战 PostgreSQL

一、底层架构对比 ​​维度​​​​MySQL​​​​PostgreSQL​​​​存储引擎​​多引擎支持(InnoDB、MyISAM等)单一存储引擎(支持扩展如Zheap、Zedstore)​​事务实现​​基于UNDO日志的MVCC基于堆表(Heap)的MVCC​​锁机制​​…

基于FPGA的二叉决策树cart算法verilog实现,训练环节采用MATLAB仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 (完整程序运行后无水印) MATLAB训练结果 上述决策树判决条件&#xff1a; 分类的决策树1 if x21<17191.5 then node 2 elseif x21>17191…

【RAG】RAG综述|一文了解RAG|从零开始(下)

文章目录 5. RAG的架构5.1 Naive RAG5.2 Advanced RAG5.2.1 检索前处理和数据索引技术5.2.2 知识分片技术5.2.3 分层索引5.2.4 检索技术5.2.4.1 优化用户查询5.2.4.2 通过假想文档嵌入修复查询和文档不对称5.2.4.3 Routing5.2.4.5 自查询检索5.2.4.6 混合搜索5.2.4.7 图检索5.2…

山东大学软件学院项目实训-基于大模型的模拟面试系统-面试官和面试记录的分享功能(2)

本文记录在发布文章时&#xff0c;可以添加自己创建的面试官和面试记录到文章中这一功能的实现。 前端 首先是在原本的界面的底部添加了两个多选框&#xff08;后期需要美化调整&#xff09; 实现的代码&#xff1a; <el-col style"margin-top: 1rem;"><e…

FPGA纯verilog实现MIPI-DSI视频编码输出,提供工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的所有工程源码总目录----方便你快速找到自己喜欢的项目我这里已有的 MIPI 编解码方案 3、设计思路框架工程设计原理框图FPGA内部彩条RGB数据位宽转换RGB数据缓存MIPI-DSI协议层编码MIPI-DPHY物理层串化MIPI-LVDS显示屏工程…

LXQt修改开始菜单高亮

开始菜单红色高亮很难看 mkdir -p ~/.local/share/lxqt/palettes/ mkdir -p ~/.local/share/lxqt/themes/ cp /usr/share/lxqt/palettes/Dark ~/.local/share/lxqt/palettes/Darker cp -p /usr/share/lxqt/themes/dark ~/.local/share/lxqt/themes/darker lxqt-panel.qss L…

DeepSeek-R1-0528-Qwen3-8B 本地ollama离线运行使用和llamafactory lora微调

参考: https://huggingface.co/deepseek-ai/DeepSeek-R1-0528-Qwen3-8B 量化版本: https://huggingface.co/unsloth/DeepSeek-R1-0528-Qwen3-8B-GGUF https://docs.unsloth.ai/basics/deepseek-r1-0528-how-to-run-locally 1、ollama运行 升级ollama版本到0.9.0 支持直接…

vue3 + WebSocket + Node 搭建前后端分离项目 开箱即用

[TOC](vue3 WebSocket Node 搭建前后端分离项目) 开箱即用 前言 top1&#xff1a;vue3.5搭建前端H5 top2&#xff1a;Node.js koa搭建后端服务接口 top3&#xff1a;WebSocket 长连接实现用户在线聊天 top4&#xff1a;接口实现模块化 Mysql 自定义 top5&#xff1a;文件上…

Vue 前端代码规范实战:ESLint v9、Prettier 与 Stylelint 集成指南与最佳实践

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall ︱vue3-element-admin︱youlai-boot︱vue-uniapp-template &#x1f33a; 仓库主页&#xff1a; GitCode︱ Gitee ︱ Github &#x1f496; 欢迎点赞 &#x1f44d; 收藏 ⭐评论 …

docker docker-ce docker.io

Ubuntu安装 ​​更新软件包列表​​ 首先确保软件包列表是最新的&#xff1a; sudo apt-get update 使用正确的卸载命令​​ 替换 docker-engine 为 docker-ce 或 docker.io&#xff1a; sudo apt-get remove docker docker-ce docker.io containerd runc ​​检查已安装的 Do…

C++ 初阶 | 类和对象易错知识点(下)

目录 0.引言 1.初始化列表 2.static 静态成员变量&#xff1a; 静态成员函数&#xff1a; 3.友元函数 4.内部类 定义&#xff1a; 特点&#xff1a; 应用&#xff1a; 5.优化写法 6.例题 求和12...n (不能用for/while/if/else等关键字) 7.总结 0.引言 今天&…

使用yocto搭建qemuarm64环境

环境 yocto下载 # 源码下载 git clone git://git.yoctoproject.org/poky git reset --hard b223b6d533a6d617134c1c5bec8ed31657dd1268 构建 # 编译镜像 export MACHINE"qemuarm64" . oe-init-build-env bitbake core-image-full-cmdline 运行 # 跑虚拟机 export …

AWS WebRTC:获取ICE服务地址(part 3):STUN服务和TURN服务的作用

STUN服务和TURN服务的作用&#xff1a; 服务全称作用是否中继流量适用场景STUNSession Traversal Utilities for NAT 协助设备发现自己的公网地址&#xff08;srflx candidate&#xff09; ❌ 不中继&#xff0c;仅辅助NAT 穿透成功时使用TURNTraversal Using Relays around N…

分析XSSstrike源码

#用于学习web安全自动化工具# 我能收获什么&#xff1f; 1.XSS漏洞检测机制 学习如何构造和发送XSS payload如何识别响应中的回显&#xff0c;WAF&#xff0c;过滤规则等如何使用词典&#xff0c;编码策略&#xff0c;上下文探测等绕过过滤器 2.Python安全工具开发技巧 使…

npm run build 报错:Some chunks are larger than 500 KB after minification

当我们的 Vue 项目太大&#xff0c;使用 npm run build 打包项目的时候&#xff0c;就有可能会遇到以下报错&#xff1a; (!) Some chunks are larger than 500 kB after minification. Consider: - Using dynamic import() to code-split the application - Use build.rollup…

【LLM相关知识点】关于LLM项目实施流程的简单整理(一)

【LLM相关知识点】关于LLM项目实施流程的简单整理&#xff08;一&#xff09; 文章目录 【LLM相关知识点】关于LLM项目实施流程的简单整理&#xff08;一&#xff09;零、学习计划梳理&#xff1a;结合ChatGPT从零开始学习LLM & 多模态大模型一、大模型相关应用场景和头部企…

海上石油钻井平台人员安全管控解决方案

一、行业挑战与需求分析 海上钻井平台面临复杂环境风险&#xff08;如易燃易爆、金属干扰、极端气象&#xff09;和人员管理难题&#xff08;如定位模糊、应急响应延迟&#xff09;。传统RFID或蓝牙定位技术存在精度不足&#xff08;1-5米&#xff09;、抗干扰能力差等问题&am…

@Docker Compose 部署 Pushgateway

文章目录 Docker Compose 部署 Pushgateway1. 目的2. 适用范围3. 先决条件4. 部署步骤4.1 创建项目目录4.2 创建 docker-compose.yml 文件4.3 启动 Pushgateway 服务4.4 验证服务运行状态4.5 测试 Pushgateway 访问 5. 配置 Prometheus 采集 Pushgateway 数据6. 日常维护6.1 查…

项目 react+taro 编写的微信 小程序,什么命令,可以减少console的显示

在 Taro 项目中&#xff0c;为了减少 console 的显示&#xff08;例如 console.log、console.info 等&#xff09;&#xff0c;可以通过配置 terser-webpack-plugin 来移除生产环境中的 console 调用。 配置步骤&#xff1a; 修改 index.js 文件 在 mini.webpackChain 中添加 …