dameng-mcp-server达梦MCP服务

达梦数据库手写MCP服务

文件名称

server.py

源代码

参考mysql-mcp-server写的dameng数据库版本的
点击访问mysql-mcp-server的github仓库
在这里插入图片描述

mcp服务端

import asyncio
import logging
import os
import sys
from dmPython import connect
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl# Configure logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("dameng_mcp_server")def get_db_config():"""Get database configuration from environment variables."""config = {"user": os.getenv("DAMENG_USER","SYSDBA"),"password": os.getenv("DAMENG_PASSWORD","SYSDBA"),"server": os.getenv("DAMENG_HOST", "localhost"),"port": 5236,"schema": os.getenv("DAMENG_DATABASE","")}if not all([config["user"], config["password"]]):logger.error("Missing required database configuration. Please check environment variables:")logger.error("DAMENG_USER and DAMENG_PASSWORD are required")raise ValueError("Missing required database configuration")return config# Initialize server
app = Server("dameng_mcp_server")@app.list_resources()
async def list_resources() -> list[Resource]:"""List Dameng tables as resources."""config = get_db_config()try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute("SELECT TABLE_NAME FROM ALL_TABLES")tables = cursor.fetchall()logger.info(f"Found tables: {tables}")resources = []for table in tables:resources.append(Resource(uri=f"dameng://{table[0]}/data",name=f"Table: {table[0]}",mimeType="text/plain",description=f"Data in table: {table[0]}"))return resourcesexcept Exception as e:logger.error(f"Failed to list resources: {str(e)}")return []@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:"""Read table contents."""config = get_db_config()uri_str = str(uri)logger.info(f"Reading resource: {uri_str}")if not uri_str.startswith("dm://"):raise ValueError(f"Invalid URI scheme: {uri_str}")parts = uri_str[9:].split('/')table = parts[0]try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute(f"SELECT * FROM {table} LIMIT 100")columns = [desc[0] for desc in cursor.description]rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return "\n".join([",".join(columns)] + result)except Exception as e:logger.error(f"Schema error reading resource {uri}: {str(e)}")raise RuntimeError(f"Schema error: {str(e)}")@app.list_tools()
async def list_tools() -> list[Tool]:"""List available Dameng tools."""logger.info("Listing tools...")return [Tool(name="execute_sql",description="Execute an SQL query on the Dameng server",inputSchema={"type": "object","properties": {"query": {"type": "string","description": "The SQL query to execute"}},"required": ["query"]})]@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:"""Execute SQL commands."""config = get_db_config()logger.info(f"Calling tool: {name} with arguments: {arguments}")if name != "execute_sql":raise ValueError(f"Unknown tool: {name}")query = arguments.get("query")if not query:raise ValueError("Query is required")try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute(query)# Handle all other queries that return result sets (SELECT, SHOW, DESCRIBE etc.)if cursor.description is not None:columns = [desc[0] for desc in cursor.description]try:rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]except Exception as e:logger.warning(f"Error fetching results: {str(e)}")return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]# Non-SELECT querieselse:conn.commit()return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]except Exception as e:logger.error(f"Error executing SQL '{query}': {e}")return [TextContent(type="text", text=f"Error executing query: {str(e)}")]async def main():"""Main entry point to run the MCP server."""from mcp.server.stdio import stdio_server# Add additional debug outputprint("Starting Dameng MCP server with config:", file=sys.stderr)config = get_db_config()print(f"Server: {config['server']}", file=sys.stderr)print(f"Port: {config['port']}", file=sys.stderr)print(f"User: {config['user']}", file=sys.stderr)print(f"Schema: {config['schema']}", file=sys.stderr)logger.info("Starting Dameng MCP server...")logger.info(f"Schema config: {config['server']}/{config['schema']} as {config['user']}")async with stdio_server() as (read_stream, write_stream):try:await app.run(read_stream,write_stream,app.create_initialization_options())except Exception as e:logger.error(f"Server error: {str(e)}", exc_info=True)raiseif __name__ == "__main__":asyncio.run(main())

客户端

这段引用尚硅谷宋红康老师手写的客户端

import asyncio
import os
import json
from typing import Optional, List
from contextlib import AsyncExitStack
from datetime import datetime
import re
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_clientload_dotenv()class MCPClient:def __init__(self):self.exit_stack = AsyncExitStack()self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")self.base_url = os.getenv("BASE_URL")self.model = os.getenv("MODEL")if not self.openai_api_key:raise ValueError("❌ 未找到 OpenAI API Key,请在 .env 文件中设置 DASHSCOPE_API_KEY")self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)self.session: Optional[ClientSession] = Noneasync def connect_to_server(self, server_script_path: str):# 对服务器脚本进行判断,只允许是 .py 或 .jsis_python = server_script_path.endswith('.py')is_js = server_script_path.endswith('.js')if not (is_python or is_js):raise ValueError("服务器脚本必须是 .py 或 .js 文件")# 确定启动命令,.py 用 python,.js 用 nodecommand = "python" if is_python else "node"# 构造 MCP 所需的服务器参数,包含启动命令、脚本路径参数、环境变量(为 None 表示默认)server_params = StdioServerParameters(command=command, args=[server_script_path], env=None)# 启动 MCP 工具服务进程(并建立 stdio 通信)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))# 拆包通信通道,读取服务端返回的数据,并向服务端发送请求self.stdio, self.write = stdio_transport# 创建 MCP 客户端会话对象self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))# 初始化会话await self.session.initialize()# 获取工具列表并打印response = await self.session.list_tools()tools = response.toolsprint("\n已连接到服务器,支持以下工具:", [tool.name for tool in tools])async def process_query(self, query: str) -> str:# 准备初始消息和获取工具列表messages = [{"role": "user", "content": query}]response = await self.session.list_tools()available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"input_schema": tool.inputSchema}} for tool in response.tools]# 提取问题的关键词,对文件名进行生成。# 在接收到用户提问后就应该生成出最后输出的 md 文档的文件名,# 因为导出时若再生成文件名会导致部分组件无法识别该名称。keyword_match = re.search(r'(关于|分析|查询|搜索|查看)([^的\s,。、?\n]+)', query)keyword = keyword_match.group(2) if keyword_match else "分析对象"safe_keyword = re.sub(r'[\\/:*?"<>|]', '', keyword)[:20]timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')md_filename = f"sentiment_{safe_keyword}_{timestamp}.md"md_path = os.path.join("./sentiment_reports", md_filename)# 更新查询,将文件名添加到原始查询中,使大模型在调用工具链时可以识别到该信息# 然后调用 plan_tool_usage 获取工具调用计划query = query.strip() + f" [md_filename={md_filename}] [md_path={md_path}]"messages = [{"role": "user", "content": query}]tool_plan = await self.plan_tool_usage(query, available_tools)tool_outputs = {}messages = [{"role": "user", "content": query}]# 依次执行工具调用,并收集结果for step in tool_plan:tool_name = step["name"]tool_args = step["arguments"]for key, val in tool_args.items():if isinstance(val, str) and val.startswith("{{") and val.endswith("}}"):ref_key = val.strip("{} ")resolved_val = tool_outputs.get(ref_key, val)tool_args[key] = resolved_val# 注入统一的文件名或路径(用于分析和邮件)if tool_name == "analyze_sentiment" and "filename" not in tool_args:tool_args["filename"] = md_filenameif tool_name == "send_email_with_attachment" and "attachment_path" not in tool_args:tool_args["attachment_path"] = md_pathresult = await self.session.call_tool(tool_name, tool_args)tool_outputs[tool_name] = result.content[0].textmessages.append({"role": "tool","tool_call_id": tool_name,"content": result.content[0].text})# 调用大模型生成回复信息,并输出保存结果final_response = self.client.chat.completions.create(model=self.model,messages=messages)final_output = final_response.choices[0].message.content# 对辅助函数进行定义,目的是把文本清理成合法的文件名def clean_filename(text: str) -> str:text = text.strip()text = re.sub(r'[\\/:*?\"<>|]', '', text)return text[:50]# 使用清理函数处理用户查询,生成用于文件命名的前缀,并添加时间戳、设置输出目录# 最后构建出完整的文件路径用于保存记录safe_filename = clean_filename(query)timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')filename = f"{safe_filename}_{timestamp}.txt"output_dir = "./llm_outputs"os.makedirs(output_dir, exist_ok=True)file_path = os.path.join(output_dir, filename)# 将对话内容写入 md 文档,其中包含用户的原始提问以及模型的最终回复结果with open(file_path, "w", encoding="utf-8") as f:f.write(f"🗣 用户提问:{query}\n\n")f.write(f"🤖 模型回复:\n{final_output}\n")print(f"📄 对话记录已保存为:{file_path}")return final_outputasync def chat_loop(self):# 初始化提示信息print("\n🤖 MCP 客户端已启动!输入 'quit' 退出")# 进入主循环中等待用户输入while True:try:query = input("\n你: ").strip()if query.lower() == 'quit':break# 处理用户的提问,并返回结果response = await self.process_query(query)print(f"\n🤖 AI: {response}")except Exception as e:print(f"\n⚠️ 发生错误: {str(e)}")async def plan_tool_usage(self, query: str, tools: List[dict]) -> List[dict]:# 构造系统提示词 system_prompt。# 将所有可用工具组织为文本列表插入提示中,并明确指出工具名,# 限定返回格式是 JSON,防止其输出错误格式的数据。print("\n📤 提交给大模型的工具定义:")print(json.dumps(tools, ensure_ascii=False, indent=2))tool_list_text = "\n".join([f"- {tool['function']['name']}: {tool['function']['description']}"for tool in tools])system_prompt = {"role": "system","content": ("你是一个智能任务规划助手,用户会给出一句自然语言请求。\n""你只能从以下工具中选择(严格使用工具名称):\n"f"{tool_list_text}\n""如果多个工具需要串联,后续步骤中可以使用 {{上一步工具名}} 占位。\n""返回格式:JSON 数组,每个对象包含 name 和 arguments 字段。\n""不要返回自然语言,不要使用未列出的工具名。")}# 构造对话上下文并调用模型。# 将系统提示和用户的自然语言一起作为消息输入,并选用当前的模型。planning_messages = [system_prompt,{"role": "user", "content": query}]response = self.client.chat.completions.create(model=self.model,messages=planning_messages,tools=tools,tool_choice="none")# 提取出模型返回的 JSON 内容content = response.choices[0].message.content.strip()match = re.search(r"```(?:json)?\\s*([\s\S]+?)\\s*```", content)if match:json_text = match.group(1)else:json_text = content# 在解析 JSON 之后返回调用计划try:plan = json.loads(json_text)return plan if isinstance(plan, list) else []except Exception as e:print(f"❌ 工具调用链规划失败: {e}\n原始返回: {content}")return []async def cleanup(self):await self.exit_stack.aclose()async def main():server_script_path = "D:\\soft\\PythonProject\\mcp-project\\dmserver.py"# server_script_path = "D:\\soft\\SoftProject\\mysql_mcp_server\\src\mysql_mcp_server\\server.py"# server_script_path = "D:\\soft\\SoftProject\\mysql_mcp_server\\src\\dameng\\server.py"client = MCPClient()try:await client.connect_to_server(server_script_path)await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":asyncio.run(main())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/77964.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/77964.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IntelliJ IDEA 内存优化

优化插件使用 1&#xff09;卸载不必要插件&#xff1a;进入 “设置”→“插件”→“已安装”&#xff0c;查看并卸载不常用的插件&#xff0c;如代码分析、代码覆盖率等不常用的插件&#xff0c;只保留必要的插件。2&#xff09;定期清理与更新插件&#xff1a;定期检查插件更…

TCL中环深化全球布局,技术迭代应对行业调整

在全球能源转型加速与光伏行业深度调整的双重背景下,TCL中环凭借前瞻性的全球化布局与核心技术突破,持续巩固行业领先地位。2024年年报显示,报告期内实现营业收入284.19亿元,净利润为-108.06亿元。面对行业周期性亏损与产业链价格非理性竞争带来的业绩压力,公司自2024年下半年起…

dubbo 异步化实践

DubboService public class AsyncOrderFacadeImpl implements AsyncOrderFacade {private Logger logger LoggerFactory.getLogger(AsyncOrderFacadeImpl.class);// 构建线程池ThreadPoolExecutor threadPoolExecutor new ThreadPoolExecutor(1000, 1000, 10, TimeUnit.SECOND…

CSS3布局方式介绍

CSS3布局方式介绍 CSS3布局(Layout)系统是现代网页设计中用于构建页面结构和控制元素排列的一组强大工具。CSS3提供了多种布局方式,每种方式都有其适用场景,其中最常用的是Flexbox和CSS Grid。 先看传统上几种布局方式,再较详细的介绍现代布局方式Flexbox和CSS Grid。 传…

MoonBit支持国产芯片开发--性能媲美C

MoonBit支持国产芯片开发–性能媲美C 在 ESP32-C3 上实现生命游戏 过去&#xff0c;我们曾在文章《硬件实现&#xff1a;在ESP32-C6单片机上运行MoonBit WASM-4小游戏》中&#xff0c;展示了如何通过 WebAssembly (WASM) 将 MoonBit 程序移植到物理硬件&#xff0c;初步探索其…

【RAG 框架部署】LangChain-Chatchat (原 Langchain-ChatGLM) + Ollama

目录 前言 一、什么是RAG&#xff1f; 二、环境准备和Ollama搭建 1、conda虚拟环境配置 2、Ollama搭建 三、LangChain-Chatchat搭建 1、框架安装 2、文件配置 3、初始化知识库 4、启动Langchan-Chatchat 前言 由于LangChain-Chatchat的 0.3.0 版本已修改为支持不同模…

python对接马来西亚股票完整代码

StockTV全球股票数据API对接实战&#xff1a;构建智能金融分析系统 一、StockTV API核心功能解析 StockTV作为覆盖200国家证券市场的数据平台&#xff0c;其API提供三大核心模块的对接能力&#xff1a; 市场列表查询 - 获取指定国家的股票基础数据个股详情检索 - 查询实时行情…

普通IT的股票交易成长史--20250430晚

声明&#xff1a;本文章的内容只是自己学习的总结&#xff0c;不构成投资建议。文中观点基本来自yt站Andylee&#xff0c;美股Alpha姐&#xff0c;综合自己的观点得出。感谢他们的无私分享。 送给自己的话&#xff1a; 仓位就是生命&#xff0c;绝对不能满仓&#xff01;&…

windows 下 oracle 数据库的备份与还原

1、备份 创建备份出来的文件存放的位置。 创建目录对象&#xff0c;在数据库中创建一个目录对象&#xff0c;该对象指向文件系统中用于存储导出文件的实际目录&#xff08; sql 命令&#xff0c;可以在 plsql 中执行&#xff09;。 -- 创建目录对象&#xff0c;\D:\Oracle19c\…

基于单片机的智能药盒系统

标题:基于单片机的智能药盒系统 内容:1.摘要 本文聚焦于基于单片机的智能药盒系统。背景方面&#xff0c;随着人口老龄化加剧&#xff0c;老年人按时准确服药问题愈发凸显&#xff0c;同时现代快节奏生活也使人们容易遗忘服药时间。目的是设计并实现一个能帮助人们按时、按量服…

“100% 成功的 PyTorch CUDA GPU 支持” 安装攻略

#工作记录 一、总述 在深度学习领域&#xff0c;PyTorch 凭借其灵活性和强大的功能&#xff0c;成为了众多开发者和研究者的首选框架。而 CUDA GPU 支持能够显著加速 PyTorch 的计算过程&#xff0c;大幅提升训练和推理效率。然而&#xff0c;安装带有 CUDA GPU 支持的 PyTor…

图数据库榜单网站

图数据库榜单 https://db-engines.com/en/ranking/graphdbms点击跳转

Android Jetpack Compose 面试题大全(2025最新整理)

基础概念 什么是 Jetpack Compose&#xff1f;它与传统 Android UI 开发有何不同&#xff1f; Compose 是 Android 的现代声明式 UI 工具包&#xff0c;使用 Kotlin 编写不同于传统的基于 View 和 XML 的 imperative 方式&#xff0c;Compose 使用声明式范式主要区别&#xff1…

添加了addResourceHandlers 但没用

B站黑马的视频 public class WebMvcConfig extends WebMvcConfigurationSupport { /** * 设置静态资源映射 * param registry */ Override protected void addResourceHandlers(ResourceHandlerRegistry registry) { log.info("开始进…

STM32实现simpleFOC控制无刷电机

一、FOC基础知识学习 使用simpleFOC控制无刷电机前&#xff0c;需要大概了解一下相关知识&#xff0c;包括力矩控制、速度控制、位置控制的原理和它们之间的联系。 推荐学习资料&#xff1a; 教你写一个比SimpleFOC更好的电机库_哔哩哔哩_bilibili 《灯哥手把手教你写FOC算…

【数据结构】快慢指针

一、快慢指针的原理 定义&#xff1a; 快指针&#xff1a;每次移动两步 慢指针&#xff1a;每次移动一步 终止条件&#xff1a; 当快指针到达链表末尾时停止 事件复杂度&#xff1a; 始终为O(n),仅需依次遍历 空间复杂度&#xff1a; …

毕业论文 | 基于STM32的自动烟雾报警系统设计

基于STM32的烟雾报警系统 一、系统设计原理1. **系统架构**2. **工作原理**二、核心公式与算法1. **MQ-2传感器浓度计算**2. **温度传感器数据处理**3. **校准与滤波**三、关键代码实现1. **ADC初始化与数据读取(以MQ-2为例)**2. **报警逻辑与阈值设置**3. **EEPROM存储阈值*…

Android Gradle插件开发

文章目录 1. Gradle插件是什么2. 为什么需要插件3. 编写插件位置4. 编写插件5. 自定义插件扩展5.1 订阅扩展对象5.2 把扩展添加给Plugin并使用5.3 配置参数5.4 嵌套扩展5.4.1 定义扩展5.4.2 获取扩展属性5.4.3 使用5.4.4 执行5.4.5 输出 6. 编写在单独项目里6.1 新建Module6.2 …

PPIO X OWL:一键开启任务自动化的高效革命

2024年&#xff0c;仅凭一PPIO X OWL&#xff1a;一键开启任务自动化的高效革命篇技术论文&#xff0c;OWL的Github仓库便在24小时斩获了15k Star&#xff0c;成为2024年增速最快的多智能体协作框架&#xff0c;重新定义了任务自动化的效率边界。Camel AI团队开源全栈方案&…

分布式事务,事务失效,TC事务协调者

1. 概述 本方案书旨在解决分布式系统中事务一致性问题&#xff0c;重点阐述全局事务标识&#xff08;XID&#xff09;的传递与存储机制、事务协调者&#xff08;TC&#xff09;的设计与部署&#xff0c;以及分布式事务失效场景的应对策略。基于业界成熟框架&#xff08;如Seat…