背景

最近在忙着开发别的需求,在等测试结果的期间,看了一下近几个月很火的MCP。在看完对应的介绍之后,打算去找个demo来跑一下。这里参考的demo是Anthropic的Claude,不过因为注册手机需要海外的,于是决定接入到阿里的百炼平台。

MCP是什么?

这里我就不写一些很学术或文绉绉的解释了,我会用自己的理解来进行简单的介绍。

在介绍MCP之前必须先简单介绍一下Agent,在llm领域的Agent可以简单理解成会自己根据问题使用工具(Tools)的llm。以前的做法可以是在Agent项目中自己写tools,然后自己调用。

而MCP的出现则是将tools与作为基座的llm进行解耦,Anthropic通过制定规则,让tools服务化(也就是起了一个tools的server),从而可以被外部的llm进行调用。

以上是我对mcp的简单理解,mcp如果广泛铺开后,就可以集群众的力量来开发tools,从而使llm能实现更多的功能。

MCPdemo

参考自https://mcp-docs.cn/introduction
这里建议使用uv来管理,真的非常方便!!

server代码

可以参考https://mcp-docs.cn/quickstart/server,可以照搬这里的代码。这就是mcp的好处,工具部分规范化。

client代码

参考自https://mcp-docs.cn/quickstart/client,但由于国内使用Claude不方便,故接入了阿里的模型。而因为不同的llm的messages的结构不同,因此对其进行修改,详情可看对应的注释。
需要注意的是,这里并没有使用langchain或langgraph来管理上下文,完全靠自己来拼接管理。

from openai import OpenAI
class MCPClient:
    def __init__(self):
        # 初始化会话和客户端对象
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client_openai = OpenAI(
    api_key="xxx", 
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
  )
   
    async def connect_to_server(self, server_script_path: str):
        """连接到 MCP 服务器

        Args:
            server_script_path: 服务器脚本的路径 (.py 或 .js)
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("服务器脚本必须是 .py 或 .js 文件")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # 列出可用的工具
        response = await self.session.list_tools()
        tools = response.tools
        print("\n已连接到服务器,工具包括:", [tool.name for tool in tools])
    
    async def process_query(self, query: str) -> str:
        """使用 Claude 和可用的工具处理查询"""
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]
        response = await self.session.list_tools()

        available_tools = [
            {
                "type": "function",  # 添加 type 字段
                "function": {       # 将 name, description, parameters 嵌套在 function 下
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema  
                }
            }
            for tool in response.tools
        ]
        response = self.client_openai.chat.completions.create(
            model="你的model名",
            max_tokens=1000,
            messages=messages,
            tools=available_tools
        )
        # 处理响应并处理工具调用
        final_text = []
        
    # --- 处理响应和可能的工具调用循环 ---
        while True: # 使用循环处理可能的连续工具调用
            if not response.choices:
                final_text.append("[模型未返回有效响应]")
                break

            choice = response.choices[0]
            message = choice.message # ChatCompletionMessage 对象

            # 总是将助手的回复(无论是内容还是工具调用请求)添加到历史记录中
            messages.append(message.model_dump()) # 使用 .model_dump() 转换为正确的字典格式

            # 检查是否有工具调用请求
            if message.tool_calls:
                print("Tool calls requested:", message.tool_calls)
                # (可选) 如果模型可能一次返回多个 tool_calls,你需要遍历它们
                # for tool_call in message.tool_calls:
                #     # ... 执行并添加每个工具的结果 ...
                # 为了简单起见,我们先处理第一个(与你原始代码一致)
                tool_call = message.tool_calls[0] # 获取第一个工具调用对象

                if tool_call.type == 'function':
                    tool_name = tool_call.function.name
                    tool_id = tool_call.id # 获取 tool_call 的 ID

                    try:
                        tool_args = json.loads(tool_call.function.arguments)
                        print(f"Executing tool: {tool_name} with args: {tool_args}")
                    except json.JSONDecodeError:
                        print(f"Error parsing tool arguments: {tool_call.function.arguments}")
                        # 处理错误,例如返回错误信息给模型
                        tool_result_content_str = f"Error: Invalid arguments format for tool {tool_name}"
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_id,
                            "content": tool_result_content_str,
                        })
                        # 可以选择继续循环或中断
                        continue # 跳过此工具调用,继续循环(或根据需要调整)

                    final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]") # 记录到最终文本

                    # --- 执行工具调用 ---
                    try:
                        result = await self.session.call_tool(tool_name, tool_args)

                        # --- 从 result 中提取 *字符串* 内容 ---
                        # **你需要根据 call_tool 的实际返回值调整这部分**
                        tool_result_content_str = ""
                        if hasattr(result, 'content') and isinstance(result.content, list) and len(result.content) > 0 and hasattr(result.content[0], 'text'):
                            # 情况:result.content 是 [TextContent(text='...')]
                            tool_result_content_str = result.content[0].text
                        elif hasattr(result, 'text'):
                            # 情况:result 本身是 TextContent(text='...')
                            tool_result_content_str = result.text
                        elif isinstance(result, str):
                            # 情况:result 直接是字符串
                            tool_result_content_str = result
                        else:
                            # 其他情况:尝试转换为字符串,或者记录错误
                            print(f"Warning: Unexpected tool result type: {type(result)}. Converting to string.")
                            tool_result_content_str = str(result)
                        # --- 提取结束 ---

                        # print(f"Tool {tool_name} result (string):", tool_result_content_str)

                        # --- 构建正确的 tool result 消息 ---
                        tool_message = {
                            "role": "tool",
                            "tool_call_id": tool_id, # 使用正确的键名和 ID
                            "content": tool_result_content_str # 使用提取的字符串结果
                        }
                        messages.append(tool_message)

                    except Exception as e:
                        print(f"Error executing tool {tool_name}: {e}")
                        # 向模型报告工具执行失败
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_id,
                            "content": f"Error executing tool {tool_name}: {e}"
                        })

                    # --- 工具调用和结果添加完毕,准备下一次 API 调用 ---
                    # print("Messages before next API call:", messages)
                    response = self.client_openai.chat.completions.create(
                        model="qwen-max",
                        max_tokens=1000,
                        messages=messages,
                        tools=available_tools
                    )
                    # 继续循环,让下一次迭代处理新的响应

                else:
                    # 如果 tool_call 类型不是 'function',可能需要处理或忽略
                    print(f"Ignoring non-function tool call: {tool_call.type}")
                    break # 或者根据需要处理

            # 如果没有工具调用,说明助手返回了最终内容
            elif message.content:
                print("Assistant returned final content:", message.content)
                final_text.append(message.content)
                break # 结束循环

            # 如果既没有工具调用也没有内容 (例如 stop_reason='length')
            else:
                print("Assistant finished without content or tool calls.")
                final_text.append("[模型未返回内容]")
                break # 结束循环

        # 循环结束后,组合最终文本
        return "\n".join(final_text)

    async def chat_loop(self):
        """运行交互式聊天循环"""
        print("\nMCP 客户端已启动!")
        print("输入你的查询或输入 'quit' 退出。")
        # query = "how is the weather today in LA"
        # response = await self.process_query(query)
        # print("\n" + response)
        while True:
            try:
                query = input("\n查询: ").strip()

                if query.lower() == 'quit':
                    break

                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\n错误: {str(e)}")

    async def cleanup(self):
        """清理资源"""
        await self.exit_stack.aclose()
    
async def main():
    if len(sys.argv) < 2:
        print("使用方法: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    import sys
    asyncio.run(main())

在构建完对应的server,和上面的client后,运行以下代码即可运行了

uv run client.py path/to/server.py # python 服务器
Logo

一站式 AI 云服务平台

更多推荐