记一次mcp初体验(带可运行demo)
最近在忙着开发别的需求,在等测试结果的期间,看了一下近几个月很火的MCP。在看完对应的介绍之后,打算去找个demo来跑一下。这里参考的demo是Anthropic的Claude,不过因为注册手机需要海外的,于是决定接入到阿里的百炼平台。
背景
最近在忙着开发别的需求,在等测试结果的期间,看了一下近几个月很火的MCP。在看完对应的介绍之后,打算去找个demo来跑一下。这里参考的demo是Anthropic的Claude,不过因为注册手机需要海外的,于是决定接入到阿里的百炼平台。
MCP是什么?
这里我就不写一些很学术或文绉绉的解释了,我会用自己的理解来进行简单的介绍。
在介绍MCP之前必须先简单介绍一下Agent,在llm领域的Agent可以简单理解成会自己根据问题使用工具(Tools)的llm。以前的做法可以是在Agent项目中自己写tools,然后自己调用。
而MCP的出现则是将tools与作为基座的llm进行解耦,Anthropic通过制定规则,让tools服务化(也就是起了一个tools的server),从而可以被外部的llm进行调用。
以上是我对mcp的简单理解,mcp如果广泛铺开后,就可以集群众的力量来开发tools,从而使llm能实现更多的功能。
MCPdemo
参考自https://mcp-docs.cn/introduction
这里建议使用uv来管理,真的非常方便!!
server代码
可以参考https://mcp-docs.cn/quickstart/server,可以照搬这里的代码。这就是mcp的好处,工具部分规范化。
client代码
参考自https://mcp-docs.cn/quickstart/client,但由于国内使用Claude不方便,故接入了阿里的模型。而因为不同的llm的messages的结构不同,因此对其进行修改,详情可看对应的注释。
需要注意的是,这里并没有使用langchain或langgraph来管理上下文,完全靠自己来拼接管理。
from openai import OpenAI
class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client_openai = OpenAI(
api_key="xxx",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
async def connect_to_server(self, server_script_path: str):
"""连接到 MCP 服务器
Args:
server_script_path: 服务器脚本的路径 (.py 或 .js)
"""
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")
command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command,
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools
print("\n已连接到服务器,工具包括:", [tool.name for tool in tools])
async def process_query(self, query: str) -> str:
"""使用 Claude 和可用的工具处理查询"""
messages = [
{
"role": "user",
"content": query
}
]
response = await self.session.list_tools()
available_tools = [
{
"type": "function", # 添加 type 字段
"function": { # 将 name, description, parameters 嵌套在 function 下
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
}
for tool in response.tools
]
response = self.client_openai.chat.completions.create(
model="你的model名",
max_tokens=1000,
messages=messages,
tools=available_tools
)
# 处理响应并处理工具调用
final_text = []
# --- 处理响应和可能的工具调用循环 ---
while True: # 使用循环处理可能的连续工具调用
if not response.choices:
final_text.append("[模型未返回有效响应]")
break
choice = response.choices[0]
message = choice.message # ChatCompletionMessage 对象
# 总是将助手的回复(无论是内容还是工具调用请求)添加到历史记录中
messages.append(message.model_dump()) # 使用 .model_dump() 转换为正确的字典格式
# 检查是否有工具调用请求
if message.tool_calls:
print("Tool calls requested:", message.tool_calls)
# (可选) 如果模型可能一次返回多个 tool_calls,你需要遍历它们
# for tool_call in message.tool_calls:
# # ... 执行并添加每个工具的结果 ...
# 为了简单起见,我们先处理第一个(与你原始代码一致)
tool_call = message.tool_calls[0] # 获取第一个工具调用对象
if tool_call.type == 'function':
tool_name = tool_call.function.name
tool_id = tool_call.id # 获取 tool_call 的 ID
try:
tool_args = json.loads(tool_call.function.arguments)
print(f"Executing tool: {tool_name} with args: {tool_args}")
except json.JSONDecodeError:
print(f"Error parsing tool arguments: {tool_call.function.arguments}")
# 处理错误,例如返回错误信息给模型
tool_result_content_str = f"Error: Invalid arguments format for tool {tool_name}"
messages.append({
"role": "tool",
"tool_call_id": tool_id,
"content": tool_result_content_str,
})
# 可以选择继续循环或中断
continue # 跳过此工具调用,继续循环(或根据需要调整)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]") # 记录到最终文本
# --- 执行工具调用 ---
try:
result = await self.session.call_tool(tool_name, tool_args)
# --- 从 result 中提取 *字符串* 内容 ---
# **你需要根据 call_tool 的实际返回值调整这部分**
tool_result_content_str = ""
if hasattr(result, 'content') and isinstance(result.content, list) and len(result.content) > 0 and hasattr(result.content[0], 'text'):
# 情况:result.content 是 [TextContent(text='...')]
tool_result_content_str = result.content[0].text
elif hasattr(result, 'text'):
# 情况:result 本身是 TextContent(text='...')
tool_result_content_str = result.text
elif isinstance(result, str):
# 情况:result 直接是字符串
tool_result_content_str = result
else:
# 其他情况:尝试转换为字符串,或者记录错误
print(f"Warning: Unexpected tool result type: {type(result)}. Converting to string.")
tool_result_content_str = str(result)
# --- 提取结束 ---
# print(f"Tool {tool_name} result (string):", tool_result_content_str)
# --- 构建正确的 tool result 消息 ---
tool_message = {
"role": "tool",
"tool_call_id": tool_id, # 使用正确的键名和 ID
"content": tool_result_content_str # 使用提取的字符串结果
}
messages.append(tool_message)
except Exception as e:
print(f"Error executing tool {tool_name}: {e}")
# 向模型报告工具执行失败
messages.append({
"role": "tool",
"tool_call_id": tool_id,
"content": f"Error executing tool {tool_name}: {e}"
})
# --- 工具调用和结果添加完毕,准备下一次 API 调用 ---
# print("Messages before next API call:", messages)
response = self.client_openai.chat.completions.create(
model="qwen-max",
max_tokens=1000,
messages=messages,
tools=available_tools
)
# 继续循环,让下一次迭代处理新的响应
else:
# 如果 tool_call 类型不是 'function',可能需要处理或忽略
print(f"Ignoring non-function tool call: {tool_call.type}")
break # 或者根据需要处理
# 如果没有工具调用,说明助手返回了最终内容
elif message.content:
print("Assistant returned final content:", message.content)
final_text.append(message.content)
break # 结束循环
# 如果既没有工具调用也没有内容 (例如 stop_reason='length')
else:
print("Assistant finished without content or tool calls.")
final_text.append("[模型未返回内容]")
break # 结束循环
# 循环结束后,组合最终文本
return "\n".join(final_text)
async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP 客户端已启动!")
print("输入你的查询或输入 'quit' 退出。")
# query = "how is the weather today in LA"
# response = await self.process_query(query)
# print("\n" + response)
while True:
try:
query = input("\n查询: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\n错误: {str(e)}")
async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose()
async def main():
if len(sys.argv) < 2:
print("使用方法: python client.py <path_to_server_script>")
sys.exit(1)
client = MCPClient()
try:
await client.connect_to_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
import sys
asyncio.run(main())
在构建完对应的server,和上面的client后,运行以下代码即可运行了
uv run client.py path/to/server.py # python 服务器
更多推荐


所有评论(0)