首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

【LLM】MCP(Python):实现 stdio 通信的Client与Server

  • 25-04-25 10:41
  • 3340
  • 9512
blog.csdn.net

本文将详细介绍如何使用 Model Context Protocol (MCP) 在 Python 中实现基于 STDIO 通信的 Client 与 Server。MCP 是一个开放协议,它使 LLM 应用与外部数据源和工具之间的无缝集成成为可能。无论你是构建 AI 驱动的 IDE、改善 chat 交互,还是构建自定义的 AI 工作流,MCP 提供了一种标准化的方式,将 LLM 与它们所需的上下文连接起来。部分灵感来源:Se7en。

提示
在开始前,请确保你已经安装了必要的依赖包:

pip install openai mcp
  • 1

本文中,我们将介绍如何配置环境、编写 MCP Server 以及实现 MCP Client。


环境配置

在使用 MCP 之前,需要先配置相关环境变量,以便 Client 与 Server 都能正确加载所需的参数。你可以在项目根目录下创建一个 .env 文件,并写入以下内容:

此外,创建一个 .env 文件来存储您的配置:

MODEL_NAME=deepseek-chat
BASE_URL=https://api.deepseek.com/v1
API_KEY=your_api_key_here
  • 1
  • 2
  • 3

上述配置中,MODEL_NAME 表示使用的 OpenAI 模型名称(例如 “deepseek-chat”),BASE_URL 指向 OpenAI API 的基础地址,而 API_KEY 则为访问 API 所需的密钥。


书写 Server 的规范

构建 MCP Server(特别是基于 stdio 通信)时,推荐遵循统一规范,提升可读性、可维护性与复用性。

  • 服务命名统一
    使用 MCP_SERVER_NAME 作为唯一名称,贯穿日志、初始化等环节。
  • 日志配置清晰
    统一使用 logging 模块,推荐 INFO 级别,便于调试和追踪。
  • 工具注册规范
    通过 @mcp.tool() 装饰器注册工具函数,要求:
    • 命名清晰
    • 参数有类型注解
    • 注释说明参数与返回值(推荐中文)
    • 加入边界检查或异常处理
  • 使用标准 stdio 启动方式
    通过 async with stdio_server() 获取输入输出流,统一调用 _mcp_server.run(...) 启动服务。
  • 初始化选项规范
    使用 InitializationOptions 设置服务名、版本及能力声明(通常由 FastMCP 提供)。
  • 通用模板

    import asyncio
    import logging
    from mcp.server.fastmcp import FastMCP
    from mcp.server import InitializationOptions, NotificationOptions
    from mcp.server.stdio import stdio_server  # STDIO 通信方式
    
    # 定义唯一服务名称
    MCP_SERVER_NAME = "your-stdio-server-name"
    
    # 配置日志输出
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    logger = logging.getLogger(MCP_SERVER_NAME)
    
    # 创建 FastMCP 实例
    mcp = FastMCP(MCP_SERVER_NAME)
    
    # 定义工具
    @mcp.tool()
    def your_tool_name(param1: type, param2: type) -> return_type:
        """
        工具描述。
    
        参数:
        - param1 (type): 描述
        - param2 (type): 描述
    
        返回:
        - return_type: 描述
        """
        # 工具实现
        pass
    
    # 启动 MCP Server 主函数
    async def main():
        # 创建 stdio 通信通道
        async with stdio_server() as (read_stream, write_stream):
            # 构建初始化选项
            init_options = InitializationOptions(
                server_name=MCP_SERVER_NAME,
                server_version="1.0.0",
                capabilities=mcp._mcp_server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={}
                )
            )
            logger.info("MCP Server 以 STDIO 模式启动中...")
            # 启动 Server
            await mcp._mcp_server.run(read_stream, write_stream, init_options)
    
    # 主程序入口
    if __name__ == "__main__":
        asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

编写 MCP Server

MCP Server 的实现主要基于标准输入输出(STDIO)进行通信。服务端通过注册工具,向外界提供如加法、减法、乘法以及除法等计算功能。下面简述服务端的主要实现步骤:

  1. 初始化 FastMCP 实例
    服务端首先创建一个 FastMCP 实例,并为其命名(例如 “math-stdio-server”)。
  2. 工具注册
    使用装饰器的方式注册加法、减法、乘法、除法等工具,每个工具均包含详细的参数说明和返回值说明。
  3. 日志配置
    通过 Python 标准日志模块对服务端进行日志配置,以便记录服务运行状态和错误信息。
  4. 建立 STDIO 通信
    使用 stdio_server() 函数建立基于 STDIO 的通信,并构造初始化选项,包含服务器名称、版本以及能力说明。随后,调用 MCP 内部的服务启动函数开始监听和处理来自 Client 的请求。
import asyncio
import logging
from mcp.server.fastmcp import FastMCP
from mcp.server import InitializationOptions, NotificationOptions
from mcp.server.stdio import stdio_server  # 直接导入 stdio_server 函数

# 定义服务器名称
MCP_SERVER_NAME = "math-stdio-server"

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)

# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)

# 注册加法工具
@mcp.tool()
def add(a: float, b: float) -> float:
    """
    加法工具

    参数:
      - a (float): 第一个数字(必填)
      - b (float): 第二个数字(必填)

    返回:
      - float: a 与 b 的和
    """
    return a + b

# 注册减法工具
@mcp.tool()
def subtract(a: float, b: float) -> float:
    """
    减法工具

    参数:
      - a (float): 被减数(必填)
      - b (float): 减数(必填)

    返回:
      - float: a 与 b 的差
    """
    return a - b

# 注册乘法工具
@mcp.tool()
def multiply(a: float, b: float) -> float:
    """
    乘法工具

    参数:
      - a (float): 第一个数字(必填)
      - b (float): 第二个数字(必填)

    返回:
      - float: a 与 b 的积
    """
    return a * b

# 注册除法工具
@mcp.tool()
def divide(a: float, b: float) -> float:
    """
    除法工具

    参数:
      - a (float): 分子(必填)
      - b (float): 分母(必填,且不能为零)

    返回:
      - float: a 与 b 的商
    """
    if b == 0:
        raise ValueError("除数不能为零")
    return a / b

async def main():
    # 使用 stdio_server 建立 STDIO 通信
    async with stdio_server() as (read_stream, write_stream):
        # 构造初始化选项
        init_options = InitializationOptions(
            server_name=MCP_SERVER_NAME,
            server_version="1.0.0",
            capabilities=mcp._mcp_server.get_capabilities(
                notification_options=NotificationOptions(),
                experimental_capabilities={}
            )
        )
        logger.info("通过 STDIO 模式启动 MCP Server ...")
        # 使用内部的 _mcp_server 运行服务
        await mcp._mcp_server.run(read_stream, write_stream, init_options)

if __name__ == "__main__":
    asyncio.run(main())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

编写 MCP Client

MCP Client 主要实现与多个基于 STDIO 的服务器建立连接,并通过 OpenAI API 对用户的自然语言查询进行处理,调用相应工具获得最终结果。客户端的主要逻辑可以分为以下部分:

  1. 初始化客户端
    在 MCPClient 类的构造函数中,传入所需的模型名称、OpenAI API 基础地址、API 密钥以及包含服务端脚本路径的列表。客户端将使用这些参数初始化 OpenAI 异步客户端,同时准备一个 AsyncExitStack 来管理所有异步上下文。
  2. 建立多个 STDIO 连接
    通过遍历服务器脚本列表,为每个脚本生成唯一标识符(如 server0、server1 等),然后依次调用 stdio_client 函数建立连接,并通过 ClientSession 完成初始化。在连接成功后,从每个服务器获取可用工具列表,并将工具名称加上前缀(例如 server0_add)保存到映射表中,避免工具名称冲突。
  3. 处理用户查询
    在 process_query 方法中,客户端首先根据用户的输入构造消息,然后汇总所有连接服务器提供的工具,传递给 OpenAI API 进行处理。当 API 返回调用工具的请求时,客户端根据工具名称找到对应服务器会话,并执行相应的工具调用,收集返回结果后再交由 API 生成后续回复,直至所有工具调用处理完成。
  4. 交互式对话循环
    客户端提供一个简单的命令行交互循环,用户输入查询后,调用 process_query 方法获取最终回复,并打印在终端上。如果用户输入 quit 或使用 Ctrl+C 中断,则客户端将平滑退出并释放所有资源。
  5. 资源清理
    最后,在退出前,通过 AsyncExitStack 统一关闭所有连接,确保资源不会泄露。
import asyncio
import json
import os
import sys
from typing import List
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI


class MCPClient:
    def __init__(self, model_name: str, base_url: str, api_key: str, server_scripts: List[str]):
        """
        初始化 MCP 客户端,支持多个 stdio 服务器。

        :param model_name: OpenAI 模型名称,例如 "deepseek-chat"。
        :param base_url: OpenAI API 基础地址,例如 "https://api.deepseek.com/v1"。
        :param api_key: OpenAI API 密钥。
        :param server_scripts: stdio 服务脚本路径列表。
        """
        self.model_name = model_name
        self.base_url = base_url
        self.api_key = api_key
        self.server_scripts = server_scripts

        self.sessions = {}         # server_id -> (session, session_ctx, stdio_ctx)
        self.tool_mapping = {}     # 带前缀的工具名 -> (session, 原始工具名)
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.exit_stack = AsyncExitStack()

    async def initialize_sessions(self):
        """初始化所有 stdio 服务器连接,并收集工具映射。"""
        for i, script in enumerate(self.server_scripts):
            if not (os.path.exists(script) and script.endswith(".py")):
                print(f"脚本 {script} 不存在或不是 .py 文件,跳过。")
                continue

            server_id = f"server{i}"
            params = StdioServerParameters(command="python", args=[script], env=None)
            try:
                stdio_ctx = stdio_client(params)
                stdio = await self.exit_stack.enter_async_context(stdio_ctx)
                session_ctx = ClientSession(*stdio)
                session = await self.exit_stack.enter_async_context(session_ctx)
                await session.initialize()
                self.sessions[server_id] = (session, session_ctx, stdio_ctx)

                response = await session.list_tools()
                for tool in response.tools:
                    self.tool_mapping[f"{server_id}_{tool.name}"] = (session, tool.name)
                print(f"已连接到 {script},工具:{[tool.name for tool in response.tools]}")
            except Exception as e:
                print(f"连接 {script} 失败:{e}")

    async def cleanup(self):
        """释放所有资源。"""
        try:
            await self.exit_stack.aclose()
            print("所有连接资源已释放")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"清理资源时异常:{e}")

    async def _gather_available_tools(self):
        """汇总所有服务器的工具列表。"""
        tools = []
        for server_id, (session, _, _) in self.sessions.items():
            response = await session.list_tools()
            for tool in response.tools:
                tools.append({
                    "type": "function",
                    "function": {
                        "name": f"{server_id}_{tool.name}",
                        "description": tool.description,
                        "parameters": tool.inputSchema,
                    }
                })
        return tools

    async def process_query(self, query: str) -> str:
        """处理查询,调用 OpenAI API 和相应工具后返回结果。"""
        messages = [{"role": "user", "content": query}]
        available_tools = await self._gather_available_tools()

        try:
            response = await self.client.chat.completions.create(
                model=self.model_name, messages=messages, tools=available_tools
            )
        except Exception as e:
            return f"调用 OpenAI API 失败:{e}"

        final_text = [response.choices[0].message.content or ""]
        message = response.choices[0].message

        # 当有工具调用时循环处理
        while message.tool_calls:
            for call in message.tool_calls:
                tool_name = call.function.name
                if tool_name not in self.tool_mapping:
                    final_text.append(f"未找到工具:{tool_name}")
                    continue

                session, original_tool = self.tool_mapping[tool_name]
                tool_args = json.loads(call.function.arguments)
                try:
                    result = await session.call_tool(original_tool, tool_args)
                    final_text.append(f"[调用 {tool_name} 参数: {tool_args}]")
                    final_text.append(f"工具结果: {result.content}")
                except Exception as e:
                    final_text.append(f"调用 {tool_name} 出错:{e}")
                    continue

                messages += [
                    {"role": "assistant", "tool_calls": [{
                        "id": call.id,
                        "type": "function",
                        "function": {"name": tool_name, "arguments": json.dumps(tool_args)}
                    }]},
                    {"role": "tool", "tool_call_id": call.id, "content": str(result.content)}
                ]
            try:
                response = await self.client.chat.completions.create(
                    model=self.model_name, messages=messages, tools=available_tools
                )
            except Exception as e:
                final_text.append(f"调用 OpenAI API 失败:{e}")
                break
            message = response.choices[0].message
            if message.content:
                final_text.append(message.content)
        return "\n".join(final_text)

    async def chat_loop(self):
        """交互式对话循环,捕获中断平滑退出。"""
        print("MCP 客户端已启动,输入问题,输入 'quit' 退出。")
        while True:
            try:
                query = input("问题: ").strip()
                if query.lower() == "quit":
                    break
                result = await self.process_query(query)
                print("\n" + result)
            except KeyboardInterrupt:
                print("\n检测到中断信号,退出。")
                break
            except Exception as e:
                print(f"发生错误:{e}")


async def main():
    model_name = os.getenv("MODEL_NAME", "deepseek-chat")
    base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")
    api_key = os.getenv("API_KEY")
    if not api_key:
        print("未设置 API_KEY 环境变量")
        sys.exit(1)

    # 示例:使用两个 stdio 脚本
    server_scripts = ["server.py"]
    client = MCPClient(model_name, base_url, api_key, server_scripts)
    try:
        await client.initialize_sessions()
        await client.chat_loop()
    except KeyboardInterrupt:
        print("\n收到中断信号")
    finally:
        await client.cleanup()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("程序已终止。")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177

总结

通过本文的介绍,我们了解了如何使用 MCP 协议在 Python 中构建基于 STDIO 通信的 Client 与 Server。服务端通过注册多个工具为外部应用提供计算能力,而客户端则利用 OpenAI API 和工具调用的方式,将自然语言查询转化为对具体工具的调用,最终将结果反馈给用户。

这种基于 STDIO 的通信方式不仅简化了服务端与客户端之间的连接,还能方便地支持多服务器同时运行,为构建灵活高效的 LLM 应用提供了坚实的基础。

注:本文转载自blog.csdn.net的T0uken的文章"https://blog.csdn.net/2303_80346267/article/details/147011176"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

139
资讯
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top