首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

机器学习|MCP(Model Context Protocol)实战

  • 25-04-17 23:13
  • 2239
  • 6809
juejin.cn

最近 MCP 这么火,了解了一段时间也该写篇总结,那就开始吧。

 1. 什么是 MCP 

MCP(Model Context Protocol,模型上下文协议) ,2024年11月底,由 Anthropic 推出的一种开放标准,旨在统一大型语言模型(LLM)与外部数据源和工具之间的通信协议。
官网的介绍: modelcontextprotocol.io/introductio…

图片

MCP 包括几个核心功能:

  • Resources 是允许服务器公开可由客户端读取并用作 LLM 交互上下文的数据和内容,包括文件内容,数据库,API,图片等;
  • Prompts 方便定义的 Prompt 模板,支持动态参数等;
  • Tools 类似 function call;
  • Sampling 主要是在完成某个事项前的中间代理,保护数据隐私;
  • Roots 根目录,它定义了服务器可以运行的边界,它们为客户端提供了一种方式,可以告知服务器相关资源及其位置;
  • Transports MCP 使用JSON-RPC 2.0 作为其传输格式,Transports 负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息,目前支持两种协议:stdio(标准输入输出),SSE(服务端发送协议);

 2. 开发 MCP Server 

假设我们提供 web 搜索功能,那么怎么通过 MCP 对接到大模型上呢?通过开发 MCP Server,于是我基于 duckduckgo 提供了文本,图片和视频搜索的 API,参考如下:

python
代码解读
复制代码
class DuckDuckGoSearch:     """DuckDuckGo 搜索功能封装"""          def __init__(self):         self.ddgs = DDGS()          def search(self, keywords: str, max_results: int = 10, safesearch: str = 'Off',                timelimit: str = 'y') -> Dict[str, List[Dict[str, Any]]]:         """通用文本搜索                  Args:             keywords: 搜索关键词             max_results: 最大结果数量             safesearch: 安全搜索选项 ('On' or 'Off')             timelimit: 时间限制 ('d', 'w', 'm', 'y')                      Returns:             包含搜索结果的字典         """         try:             results = []             ddgs_gen = self.ddgs.text(                 keywords,                  safesearch=safesearch,                 timelimit=timelimit,                 backend="lite"             )             for r in islice(ddgs_gen, max_results):                 results.append(r)             return {'results': results}         except Exception as e:             return {'results': [], 'error': str(e)}     def search_answers(self, keywords: str, max_results: int = 5) -> Dict[str, List[Dict[str, Any]]]:         """问答搜索                  Args:             keywords: 搜索关键词             max_results: 最大结果数量                      Returns:             包含答案的字典         """         try:             results = []             # 使用 text 方法替代 answers 方法             ddgs_gen = self.ddgs.text(                 keywords,                 safesearch='Off',                 timelimit='y',                 backend="lite",                 region='wt-wt'# 使用全球区域             )             for r in islice(ddgs_gen, max_results):                 results.append(r)             return {'results': results}         except Exception as e:             return {'results': [], 'error': str(e)}     def search_images(self, keywords: str, max_results: int = 10,                       safesearch: str = 'Off') -> Dict[str, List[Dict[str, Any]]]:         """图片搜索                  Args:             keywords: 搜索关键词             max_results: 最大结果数量             safesearch: 安全搜索选项 ('On' or 'Off')                      Returns:             包含图片信息的字典         """         try:             results = []             ddgs_gen = self.ddgs.images(                 keywords,                 safesearch=safesearch,                 timelimit=None             )             for r in islice(ddgs_gen, max_results):                 results.append(r)             return {'results': results}         except Exception as e:             return {'results': [], 'error': str(e)}     def search_videos(self, keywords: str, max_results: int = 10,                       safesearch: str = 'Off', resolution: str = "high") -> Dict[str, List[Dict[str, Any]]]:         """视频搜索                  Args:             keywords: 搜索关键词             max_results: 最大结果数量             safesearch: 安全搜索选项 ('On' or 'Off')             resolution: 视频分辨率 ("high" or "standard")                      Returns:             包含视频信息的字典         """         try:             results = []             ddgs_gen = self.ddgs.videos(                 keywords,                 safesearch=safesearch,                 timelimit=None,                 resolution=resolution             )             for r in islice(ddgs_gen, max_results):                 results.append(r)             return {'results': results}         except Exception as e:             return {'results': [], 'error': str(e)}

以上是对于 duckduckgo 封装,除了提供搜索以外,我们需要按照规范开发 MCP Server,代码如下:

python
代码解读
复制代码
# 初始化 FastMCP 服务器 app = FastMCP('web-search') @app.tool() async def web_search(query: str) -> str:     """     搜索互联网内容     Args:         query: 要搜索内容     Returns:         搜索结果的总结     """     ddg = DuckDuckGoSearch()     return ddg.search(query)      if __name__ == "__main__":     app.run(transport='stdio')
  • 创建 FastMCP
  • 提供 app.tool,web_search 的接口和文档信息
  • 启动 FastMCP

最终引入库如下:

python
代码解读
复制代码
# !pip install duckduckgo-search # !pip install mcp from itertools import islice from typing import List, Dict, Any, Optional from mcp.server import FastMCP from duckduckgo_search import DDGS

 3. 调试 MCP Server 

开发完上述的 MCP Server,通常我们是需要调试功能,使用官方的 Inspector 可视化工具来执行(首先需要安装 nodejs,确保 npx 命令可以使用),命令如下:

xml
代码解读
复制代码
npx -y @modelcontextprotocol/inspector <command> <arg1> <arg2>

按照我上述文件名为 mcp_server.py,启动:npx -y @modelcontextprotocol/inspector python3.11 mcp_server.py,执行界面如下:

图片

然后打开本地浏览器:http://127.0.0.1:6274,就可以进入调试界面:

图片

 4. 开发 MCP Client 

上面开发了 MCP Server,那么怎么让大模型调用 MCP Server 呢?步骤如下:

  • 首先将支持本地的 MCP Tools 列表提供给大模型
  • 其次约束大模型在回答某一类问题,或者不能获取知识时让系统调用 MCP Server
  • 最后将 MCP Server 返回的内容提供给大模型总结

代码如下(注意这里需要通过环境变量配置 OPENAI_API_KEY 和 OPENAI_API_BASE):

python
代码解读
复制代码
import json import asyncio import os from typing import Optional from contextlib import AsyncExitStack from openai import OpenAI from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client class MCPClient:     def __init__(self):         self.session: Optional[ClientSession] = None         self.exit_stack = AsyncExitStack()         self.client = OpenAI(             api_key=os.getenv("OPENAI_API_KEY"),              base_url=os.getenv("OPENAI_API_BASE"),         )         self.mode_name = "gpt-4o-mini"     asyncdef connect_to_server(self):         server_params = StdioServerParameters(             # 服务器执行的命令             command='python3.11',             # 运行的参数             args=['mcp_server.py'],             # 环境变量,默认为 None,表示使用当前环境变量             # env=None         )         stdio_transport = await self.exit_stack.enter_async_context(             stdio_client(server_params))         stdio, write = stdio_transport         self.session = await self.exit_stack.enter_async_context(             ClientSession(stdio, write))         await self.session.initialize()     asyncdef process_query(self, query: str) -> str:         system_prompt = (             "You are a helpful assistant."             "You have the function of online search. "             "Please MUST call web_search tool to search the Internet content before answering."             "Please do not lose the user's question information when searching,"             "and try to maintain the completeness of the question content as much as possible."             "When there is a date related question in the user's question,"             "please use the search function directly to search and PROHIBIT inserting specific time."         )                  messages = [             {"role": "system", "content": system_prompt},             {"role": "user", "content": query}         ]         # 获取所有 mcp 服务器 工具列表信息         response = await self.session.list_tools()         # 生成 function call 的描述信息         available_tools = [{             "type": "function",             "function": {                 "name": tool.name,                 "description": tool.description,                 "input_schema": tool.inputSchema             }         } for tool in response.tools]         print(f"\n\n ========> Available tools:\n{response}\n")         # 请求 function call 的描述信息通过 tools 参数传入         response = self.client.chat.completions.create(             model=self.mode_name,             messages=messages,             tools=available_tools,         )         # 处理返回的内容         content = response.choices[0]         if content.finish_reason == "tool_calls":             # 如何是需要使用工具,就解析工具             tool_call = content.message.tool_calls[0]             tool_name = tool_call.function.name             tool_args = json.loads(tool_call.function.arguments)             # 执行工具             result = await self.session.call_tool(tool_name, tool_args)             print(f"\n\nCalling tool [{tool_name}] with args [{tool_args}]\nCalling tool response: [{result}]\n\n")                          # 将返回的调用哪个工具数据和工具执行完成后的数据都存入messages中             messages.append(content.message.model_dump())             messages.append({                 "role": "tool",                 "content": result.content[0].text,                 "tool_call_id": tool_call.id,             })             # 将上面的结果再返回给模型用于生产最终的结果             response = self.client.chat.completions.create(                 model=self.mode_name,                 messages=messages,             )             return response.choices[0].message.content         return content.message.content     asyncdef chat(self):         whileTrue:             try:                 query = input("\nQuery: ").strip()                 if query.lower() == 'quit':                     break                 response = await self.process_query(query)                 print("\n" + response)             except Exception as e:                 import traceback                 traceback.print_exc()     asyncdef cleanup(self):         """Clean up resources"""         await self.exit_stack.aclose() asyncdef main():     client = MCPClient()     try:         await client.connect_to_server()         await client.chat()     finally:         await client.cleanup() if __name__ == "__main__":     asyncio.run(main())

图片

 5. Sampling 

Sampling 是采样,就是允许服务器通过客户端请求 LLM 完成,从而实现复杂的代理行为,同时保持安全性和隐私性,通俗的讲就是可以确认某个流程是否可以继续执行,执行顺序如下:

  • MCP 服务器向 MCP 客户端发送sampling/createMessage请求
  • MCP 客户端审查该请求,并可以进行修改
  • MCP 客户端从 LLM 中生成一个结果
  • MCP 客户端审查生成的结果
  • MCP 客户端将结果返回给 MCP 服务器图片

代码如下:

python
代码解读
复制代码
@app.tool() asyncdef shell(cmd: str) -> str:     """     执行 shell 脚本     Args:         cmd: 要执行的 shell 命令     Returns:         获取返回的结果     """     # 创建 SamplingMessage 用于触发 sampling callback 函数     result = await app.get_context().session.create_message(         messages=[             SamplingMessage(                 role='user', content=TextContent(                     type='text', text=f'是否可以执行当前命令: {cmd} (Y/N)')             )         ],         max_tokens=1024     )     print(f"result.content: {result.content}")     # 获取到 sampling callback 函数的返回值,并根据返回值进行处理     if result.content.text == 'Y':         print(f'执行命令: {cmd}')         import subprocess         result = subprocess.run(cmd, shell=True, capture_output=True, text=True)         return result.stdout     else:         print(f'拒绝执行命令: {cmd}')         returnf'命令执行被拒绝, content: {result.content}'

可以在调试界面中确认是否继续往下执行:

图片

 6. Prompts 

MCP 中提供了 Prompts 的功能,通过传入参数可以自定义 Prompt 模板,主要是方便后续可以动态生成,或者根据输入逻辑控制 LLM,样例代码如下:

python
代码解读
复制代码
@app.prompt("代码专家") def ask_review(code_snippet: str) -> str:     return f"Please review the following code snippet for potential bugs and style issues:\n```python\n{code_snippet}\n```" if __name__ == "__main__":     app.run(transport='stdio')

调试工具中可以直接使用:

图片

 7. Resources 

MCP 中提供了可以使用的资源列表,允许服务器公开可由客户端读取并用作 LLM 交互上下文的数据和内容,其中资源协议格式:[protocol]://[host]/[path],比如可以提供文件,数据库等。

  • 文件:file:///home/user/aaa.txt
  • 数据库:postgres://database/customers/schema
  • 屏幕:screen://localhost/display1

样例代码如下:

python
代码解读
复制代码
@app.resource("db://users/{user_id}/email") async def get_user_email(user_id: str) -> str:     """Retrieves the email address for a given user ID."""     # Replace with actual database lookup     emails = {"123": "[email protected]", "456": "[email protected]"}     return emails.get(user_id, "[email protected]")

调试工具中可以直接使用:

图片

 8. 生命周期 

MCP Server 本身是没有生命周期,但是 FastMCP 为了能结合业务本身的逻辑,提供了生命周期的控制,分别是:初始化,交互通信中,服务被关闭,那么在代码中怎么控制呢?

less
代码解读
复制代码
@dataclass class AppContext:     histories: dict          def __init__(self, histories: dict):         self.histories = histories         print(f"初始化 AppContext: {self.histories}") @asynccontextmanager asyncdef app_lifespan(server):     # 在 MCP 初始化时执行     histories = {}     try:         yield AppContext(histories=histories)     finally:         print(f"关闭服务器:{histories}")          # 初始化 FastMCP 服务器 app = FastMCP(     'mcp-server',     lifespan=app_lifespan, )

 9. LangChain 中使用 MCP Server 

做 LLM 应用开发,基本上所有的工具都集成到 LangChain,MCP 也不例外,如下是如何在 LangChain 中使用的代码:

ini
代码解读
复制代码
# !pip install langchain_mcp_adapters # !pip install langgraph # !pip install langchain_openai from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from langchain_mcp_adapters.tools import load_mcp_tools from langgraph.prebuilt import create_react_agent from langchain_openai import ChatOpenAI import os import asyncio model = ChatOpenAI(     openai_api_base=os.getenv("OPENAI_API_BASE"),     openai_api_key=os.getenv("OPENAI_API_KEY"),     model="gpt-4o", ) server_params = StdioServerParameters(     # 服务器执行的命令     command='python3.11',     # 运行的参数     args=['mcp_server.py'],     # 环境变量,默认为 None,表示使用当前环境变量     # env=None ) asyncdef main():     asyncwith stdio_client(server_params) as (read, write):         asyncwith ClientSession(read, write) as session:             await session.initialize()             # 获取工具列表             tools = await load_mcp_tools(session)             # 创建并使用 ReAct agent             agent = create_react_agent(model, tools)             agent_response = await agent.ainvoke({'messages': '深圳天气如何?'})             print(f"agent_response: {agent_response}") if __name__ == "__main__":     asyncio.run(main())

 10. 其他 

(1)配置 Cursor

图片

打开 mcp.json 可以手动配置:

json
代码解读
复制代码
{   "mcpServers": {     "mcp-server": {       "command": "python3.11",       "args": ["/Volumes/my/mpserver/blog/机器学习/code/mcp/mcp-server.py"]     }   } }

也可以参考官方配置 SSE 协议:

json
代码解读
复制代码
{   "mcpServers": {     "server-name": {       "url": "http://localhost:3000/sse",       "env": {         "API_KEY": "value"       }     }   } }

(2)开源的 MCP 资源或者项目

MCP 官方提供了很多服务,可以参考:mcp.so/。
另外也有一些开源项目,有兴趣可以看看:github.com/yzfly/Aweso…

 参考 

(1)modelcontextprotocol.io/tutorials/b…
(2)github.com/yzfly/Aweso…

注:本文转载自juejin.cn的周末程序猿的文章"https://juejin.cn/post/7492091303384465423"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top