吴恩达MCP课程(5):mcp_chatbot_prompt_resource.py

前提条件:
1、吴恩达MCP课程(5):research_server_prompt_resource.py
2、server_config_prompt_resource.json文件

{"mcpServers": {"filesystem": {"command": "npx","args": ["--y","@modelcontextprotocol/server-filesystem","."]},"research": {"command": "uv","args": ["run", "research_server_prompt_resource.py"]},"fetch": {"command": "uvx","args": ["mcp-server-fetch"]}}
}

代码

原课程用的anthropic的,下面改成openai,并用千问模型做测试

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
import json
import asyncio
import osload_dotenv()class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}async def connect_to_server(self, server_name, server_config):try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()try:# List available toolsresponse = await session.list_tools()for tool in response.tools:self.sessions[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})# List available promptsprompts_response = await session.list_prompts()if prompts_response and prompts_response.prompts:for prompt in prompts_response.prompts:self.sessions[prompt.name] = sessionself.available_prompts.append({"name": prompt.name,"description": prompt.description,"arguments": prompt.arguments})# List available resourcesresources_response = await session.list_resources()if resources_response and resources_response.resources:for resource in resources_response.resources:resource_uri = str(resource.uri)self.sessions[resource_uri] = sessionexcept Exception as e:print(f"Error {e}")except Exception as e:print(f"Error connecting to {server_name}: {e}")async def connect_to_servers(self):try:with open("server_config_prompt_resource.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server config: {e}")raiseasync def process_query(self, query):messages = [{"role":"user", "content":query}]while True:response = self.client.chat.completions.create(model="qwen-turbo",tools=self.available_tools,messages=messages)message = response.choices[0].message# 检查是否有普通文本内容if message.content:print(message.content)messages.append({"role": "assistant", "content": message.content})break# 检查是否有工具调用elif message.tool_calls:# 添加助手消息到历史messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 处理每个工具调用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 获取session并调用工具session = self.sessions.get(tool_name)if not session:print(f"Tool '{tool_name}' not found.")breakresult = await session.call_tool(tool_name, arguments=tool_args)# 添加工具结果到消息历史messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})else:breakasync def get_resource(self, resource_uri):session = self.sessions.get(resource_uri)# Fallback for papers URIs - try any papers resource sessionif not session and resource_uri.startswith("papers://"):for uri, sess in self.sessions.items():if uri.startswith("papers://"):session = sessbreakif not session:print(f"Resource '{resource_uri}' not found.")returntry:result = await session.read_resource(uri=resource_uri)if result and result.contents:print(f"\nResource: {resource_uri}")print("Content:")print(result.contents[0].text)else:print("No content available.")except Exception as e:print(f"Error: {e}")async def list_prompts(self):"""List all available prompts."""if not self.available_prompts:print("No prompts available.")returnprint("\nAvailable prompts:")for prompt in self.available_prompts:print(f"- {prompt['name']}: {prompt['description']}")if prompt['arguments']:print(f"  Arguments:")for arg in prompt['arguments']:arg_name = arg.name if hasattr(arg, 'name') else arg.get('name', '')print(f"    - {arg_name}")async def execute_prompt(self, prompt_name, args):"""Execute a prompt with the given arguments."""session = self.sessions.get(prompt_name)if not session:print(f"Prompt '{prompt_name}' not found.")returntry:result = await session.get_prompt(prompt_name, arguments=args)if result and result.messages:prompt_content = result.messages[0].content# Extract text from content (handles different formats)if isinstance(prompt_content, str):text = prompt_contentelif hasattr(prompt_content, 'text'):text = prompt_content.textelse:# Handle list of content itemstext = " ".join(item.text if hasattr(item, 'text') else str(item) for item in prompt_content)print(f"\nExecuting prompt '{prompt_name}'...")await self.process_query(text)except Exception as e:print(f"Error: {e}")async def chat_loop(self):print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")print("Use @folders to see available topics")print("Use @<topic> to search papers in that topic")print("Use /prompts to list available prompts")print("Use /prompt <name> <arg1=value1> to execute a prompt")while True:try:query = input("\nQuery: ").strip()if not query:continueif query.lower() == 'quit':break# Check for @resource syntax firstif query.startswith('@'):# Remove @ sign  topic = query[1:]if topic == "folders":resource_uri = "papers://folders"else:resource_uri = f"papers://{topic}"await self.get_resource(resource_uri)continue# Check for /command syntaxif query.startswith('/'):parts = query.split()command = parts[0].lower()if command == '/prompts':await self.list_prompts()elif command == '/prompt':if len(parts) < 2:print("Usage: /prompt <name> <arg1=value1> <arg2=value2>")continueprompt_name = parts[1]args = {}# Parse argumentsfor arg in parts[2:]:if '=' in arg:key, value = arg.split('=', 1)args[key] = valueawait self.execute_prompt(prompt_name, args)else:print(f"Unknown command: {command}")continueawait self.process_query(query)except Exception as e:print(f"\nError: {str(e)}")async def cleanup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()if __name__ == "__main__":asyncio.run(main())

代码解释

这段代码实现了一个基于MCP(Model Context Protocol)的聊天机器人,能够连接到多个MCP服务器,使用它们提供的工具、提示和资源。下面是对代码的详细解释:

类结构与初始化

class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}
  • AsyncExitStack:用于管理多个异步上下文管理器,确保资源正确释放
  • openai.OpenAI:创建OpenAI客户端,使用环境变量中的API密钥和基础URL
  • available_tools:存储可用工具列表,用于OpenAI API的工具调用功能
  • available_prompts:存储可用提示列表,用于快速显示
  • sessions:字典,将工具名称、提示名称或资源URI映射到对应的MCP客户端会话

服务器连接

async def connect_to_server(self, server_name, server_config):# 创建服务器参数、建立连接并初始化会话# 获取可用工具、提示和资源

这个方法负责:

  1. 使用提供的配置创建StdioServerParameters
  2. 通过stdio_client建立与服务器的连接
  3. 创建并初始化ClientSession
  4. 获取服务器提供的工具、提示和资源
  5. 将它们添加到相应的列表和字典中
async def connect_to_servers(self):# 从配置文件加载服务器信息并连接到每个服务器

这个方法从server_config_prompt_resource.json文件加载服务器配置,并为每个服务器调用connect_to_server方法。

查询处理

async def process_query(self, query):# 处理用户查询,使用OpenAI API和可用工具

这个方法是聊天机器人的核心,它:

  1. 创建包含用户查询的消息列表
  2. 调用OpenAI API,传递消息和可用工具
  3. 处理API的响应:
    • 如果有普通文本内容,打印并添加到消息历史
    • 如果有工具调用,执行每个工具调用并将结果添加到消息历史

工具调用的处理流程:

  1. 从响应中提取工具ID、名称和参数
  2. sessions字典中获取对应的会话
  3. 调用工具并获取结果
  4. 将结果添加到消息历史中

资源和提示管理

async def get_resource(self, resource_uri):# 获取并显示指定URI的资源内容

这个方法用于获取和显示资源内容,特别是论文资源。它会:

  1. sessions字典中获取对应的会话
  2. 对于以papers://开头的URI,如果找不到对应会话,会尝试使用任何处理papers的会话
  3. 调用read_resource方法获取资源内容
  4. 打印资源内容
async def list_prompts(self):# 列出所有可用的提示

这个方法列出所有可用的提示及其描述和参数。

async def execute_prompt(self, prompt_name, args):# 执行指定的提示,并处理结果

这个方法执行指定的提示,并将结果传递给process_query方法进行处理。它处理不同格式的提示内容,确保能够正确提取文本。

聊天循环

async def chat_loop(self):# 主聊天循环,处理用户输入

这个方法是聊天机器人的主循环,它:

  1. 打印欢迎信息和使用说明
  2. 循环接收用户输入
  3. 根据输入的不同格式执行不同操作:
    • 如果输入是quit,退出循环
    • 如果输入以@开头,调用get_resource方法获取资源
    • 如果输入以/开头,执行命令(如列出提示或执行提示)
    • 否则,调用process_query方法处理普通查询

资源清理

async def cleanup(self):# 清理资源

这个方法调用exit_stack.aclose()来清理所有资源。

主函数

async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()

主函数创建MCP_ChatBot实例,连接到服务器,运行聊天循环,并确保在结束时清理资源。

代码运行结果

uv run mcp_chatbot_prompt_resource.py

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/82767.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】Linux基础指令3

1. which指令 功能&#xff1a;搜索系统指定的命令 2. whereis指令 功能&#xff1a;⽤于找到程序的源、⼆进制⽂件或⼿册 3. grep指令 语法&#xff1a; grep [ 选项 ] 搜寻字符串 ⽂件 功能&#xff1a;在⽂件中搜索字符串&#xff0c;将找到的⾏打印出来 常⽤选项&…

李沐《动手学深度学习》d2l安装教程

文章目录 最新回答报错提醒安装对应版本安装C工具和Windows SDK 最新回答 安装旧版本即可 pip install d2l0.17.0 WARNING: Ignoring invalid distribution -pencv-python (e:\python3.10\lib\site-packages) Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple C…

CMake 为 Debug 版本的库或可执行文件添加 d 后缀

在使用 CMake 构建项目时,我们经常需要区分 Debug 和 Release 构建版本。一个常见的做法是为 Debug 版本的库或可执行文件添加后缀(如 d),例如 libmylibd.so 或 myappd.exe。 本文将介绍几种在 CMake 中实现为 Debug 版本自动添加 d 后缀的方法。 方法一:使用 CMAKE_DEBU…

echarts树状图与vue3

父组件 - 使用RadialTreeView <template><div class"page-container"><div class"header-title">美国产品图谱 (ECharts Radial Tree)</div><RadialTreeView :chart-data"radialData" background-color"#E6E6F…

C# 日志管理功能代码

一、功能概述 本应用通过 AsyncFileLogger 类提供了灵活的日志控制功能&#xff0c;可在运行时通过 UI 界面启用或禁用日志记录。日志系统具有以下特点&#xff1a; 可控制开关&#xff1a;通过按钮随时启用或禁用日志&#xff0c;无需重启应用异步写入&#xff1a;日志记录采…

CSS 性能优化

目录 CSS 性能优化CSS 提高性能的方法1. 选择器优化1.1 选择器性能原则1.2 选择器优化示例 2. 重排&#xff08;Reflow&#xff09;和重绘&#xff08;Repaint&#xff09;优化2.1 重排和重绘的概念2.2 触发重排的操作2.3 触发重绘的操作2.4 优化重排和重绘的方法 3. 资源优化3…

【JJ斗地主-注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

SON.stringify()和JSON.parse()之间的转换

1.JSON.stringify() 作用&#xff1a;将对象、数组转换成字符串 const obj {code: "500",message: "出错了", }; const jsonString JSON.stringify(obj); console.log(jsonString);//"{"code":"Mark Lee","message"…

MongoDB $type 操作符详解

MongoDB $type 操作符详解 引言 MongoDB 是一款流行的开源文档型数据库,它提供了丰富的查询操作符来满足不同的数据查询需求。在 MongoDB 中,$type 操作符是一个非常有用的查询操作符,它允许用户根据文档中字段的类型来查询文档。本文将详细介绍 MongoDB 的 $type 操作符,…

RagFlow优化代码解析(一)

引子 前文写到RagFlow的环境搭建&推理测试&#xff0c;感兴趣的童鞋可以移步&#xff08;RagFlow环境搭建&推理测试-CSDN博客&#xff09;。前文也写过RagFLow参数配置&测试的文档&#xff0c;详见&#xff08;RagFlow环境搭建&推理测试-CSDN博客&#xff09;…

永磁同步电机控制算法--模糊PI转速控制器

一、原理介绍 在常规的PID控制系统的基础上提出了一种模糊PID以及矢量变换方法相结合的控制系统&#xff0c;经过仿真分析对比证明&#xff1a; 模糊PID控制系统能够有效的提高永磁同步电机的转速响应速度&#xff0c;降低转矩脉动&#xff0c;增强了整体控制系统的抗干扰能力…

MySQL基本操作(续)

第3章&#xff1a;MySQL基本操作&#xff08;续&#xff09; 3.3 表操作 表是关系型数据库中存储数据的基本结构&#xff0c;由行和列组成。在MySQL中&#xff0c;表操作包括创建表、查看表结构、修改表和删除表等。本节将详细介绍这些操作。 3.3.1 创建表 在MySQL中&#…

探索未知惊喜,盲盒抽卡机小程序系统开发新启航

在消费市场不断追求新鲜感与惊喜体验的当下&#xff0c;盲盒抽卡机以其独特的魅力&#xff0c;迅速成为众多消费者热衷的娱乐与消费方式。我们紧跟这一潮流趋势&#xff0c;专注于盲盒抽卡机小程序系统的开发&#xff0c;致力于为商家和用户打造一个充满趣味与惊喜的数字化平台…

89.实现添加收藏的功能的后端实现

实现完查看收藏列表之后&#xff0c;实现的是添加收藏的功能 我的设想是&#xff1a;在对话界面中&#xff0c;如果用户认为AI的回答非常好&#xff0c;可以通过点击该回答对应的气泡中的图标&#xff0c;对该内容进行添加 所以后端实现为&#xff1a; service类中添加&…

OD 算法题 B卷【猴子吃桃】

文章目录 猴子吃桃 猴子吃桃 猴子喜欢吃桃&#xff0c;桃园有N棵桃树&#xff0c;第i棵桃树上有Ni个桃&#xff0c;看守将在H(>N)小时后回来&#xff1b;猴子可以决定吃桃的速度K(个/小时)&#xff0c;每个小时他会选择一棵桃树&#xff0c;从中吃掉K个桃&#xff0c;如果这…

ubuntu 端口复用

需求描述&#xff1a;复用服务器的 80端口&#xff0c;同时处理 ssh 和 http 请求&#xff0c;也就是 ssh 连接和 http 访问服务器的时候都可以指定 80 端口&#xff0c;然后服务器可以正确分发请求给 ssh 或者 http。 此时&#xff0c;ssh 监听的端口为 22&#xff0c;而 htt…

Hive中ORC存储格式的优化方法

优化Hive中的ORC(Optimized Row Columnar)存储格式可显著提升查询性能、降低存储成本。以下是详细的优化方法,涵盖参数配置、数据组织、写入优化及监控调优等维度: 一、ORC核心参数优化 1. 存储与压缩参数 SET orc.block.size=268435456; -- 块大小(默认256MB)…

Vim 设置搜索高亮底色

在 Vim 中&#xff0c;默认搜索命中会高亮显示&#xff0c;方便用户快速定位关键字。但有些用户希望自定义搜索匹配的底色或前景色&#xff0c;以适应不同的配色方案或提高可读性。本文将详细介绍如何修改 Vim 的搜索高亮颜色。 一、Vim 搜索高亮机制 Vim 用内置的高亮组&…

【计算机网络】非阻塞IO——poll实现多路转接

&#x1f525;个人主页&#x1f525;&#xff1a;孤寂大仙V &#x1f308;收录专栏&#x1f308;&#xff1a;计算机网络 &#x1f339;往期回顾&#x1f339;&#xff1a;【计算机网络】非阻塞IO——select实现多路转接 &#x1f516;流水不争&#xff0c;争的是滔滔不息 一、…

vscode使用系列之快速生成html模板

一.欢迎来到我的酒馆 vscode&#xff0c;yyds! 目录 一.欢迎来到我的酒馆二.vscode下载安装1.关于vscode你需要知道2.开始下载安装 三.vscode快速创建html模板 二.vscode下载安装 1.关于vscode你需要知道 Q&#xff1a;为什么使用vscode? A&#xff1a;使用vscode写…