# agent.py (modify get_tools_async and other parts as needed)
import os
import asyncio
from dotenv import load_dotenv
from google.genai import types
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService # Optional
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, SseServerParams, StdioServerParameters
from qwen import llm
# Load environment variables from the .env file; replace the path with the
# location of your own .env file.
load_dotenv('ADK/.env')
# API key handed to the Amap (Gaode Maps) MCP server; os.getenv returns None
# when the variable is not set. (A `global` statement is a no-op at module
# level, so a plain assignment is all that is needed.)
Amaps_api_key = os.getenv('AMAP_MAPS_API_KEY')
# Use the Amap (Gaode Maps) MCP server.
async def get_tools_async():
  """Step 1: Fetch the tools exposed by the Amap (Gaode Maps) MCP server.

  Starts ``@amap/amap-maps-mcp-server`` through ``npx`` as a stdio server and
  passes the API key to it in the ``AMAP_MAPS_API_KEY`` environment variable.

  Returns:
    The ``(tools, exit_stack)`` pair produced by ``MCPToolset.from_server``.
    The caller is expected to ``await exit_stack.aclose()`` when finished.

  Raises:
    ValueError: If the module-level ``Amaps_api_key`` is missing or empty.
  """
  # Fail fast with an actionable message instead of handing None (or an empty
  # key) to the subprocess environment.
  if not Amaps_api_key:
    raise ValueError(
        "AMAP_MAPS_API_KEY is not set; add it to your .env file or environment."
    )

  print("Attempting to connect to MCP 高德 Maps server...")
  tools, exit_stack = await MCPToolset.from_server(
      connection_params=StdioServerParameters(
          command='npx',
          args=[
              "-y",
              "@amap/amap-maps-mcp-server",
          ],
          # Pass the API key as an environment variable to the npx process
          env={
              "AMAP_MAPS_API_KEY": Amaps_api_key,
          },
      )
  )
  print("MCP Toolset created successfully.")
  return tools, exit_stack

# --- Step 2: Agent Definition ---
async def get_agent_async():
  """Create an ADK agent equipped with the tools from the MCP server.

  Returns:
    A ``(root_agent, exit_stack)`` pair. The caller is responsible for
    awaiting ``exit_stack.aclose()`` once the agent is no longer needed.

  Raises:
    Exception: Whatever ``get_tools_async`` or ``LlmAgent`` raises; if agent
      construction fails, the already-opened exit stack is closed first.
  """
  tools, exit_stack = await get_tools_async()
  try:
    print(f"Fetched {len(tools)} tools from MCP server.")
    root_agent = LlmAgent(
        model=llm,  # Adjust if needed
        name='maps_assistant',
        instruction='Help user with mapping and directions using available tools.',
        tools=tools,
    )
  except Exception:
    # On failure the caller never receives exit_stack, so it must be closed
    # here or the MCP connection would be leaked.
    await exit_stack.aclose()
    raise
  return root_agent, exit_stack

# --- Step 3: Main Execution Logic (modify query) ---
async def async_main():
  """Run one hard-coded query through the maps agent and print the answer.

  Creates in-memory session and artifact services, builds the agent, iterates
  over the events yielded by ``Runner.run_async`` and prints the text of every
  final-response event. If no final response arrives, a fallback message is
  printed instead. The MCP exit stack is closed even when the run raises.
  """
  session_service = InMemorySessionService()
  artifacts_service = InMemoryArtifactService()  # Optional
  session = session_service.create_session(
      state={}, app_name='mcp_maps_app', user_id='user_maps'
  )

  # TODO: Use specific addresses for reliable results with this server
  query = "杭州站的位置在哪里"
  print(f"User Query: '{query}'")
  content = types.Content(role='user', parts=[types.Part(text=query)])

  root_agent, exit_stack = await get_agent_async()
  try:
    runner = Runner(
        app_name='mcp_maps_app',
        agent=root_agent,
        artifact_service=artifacts_service,  # Optional
        session_service=session_service,
    )

    print("Running agent...")
    final_response_content = "No final response received."
    got_final_response = False
    events_async = runner.run_async(
        session_id=session.id, user_id=session.user_id, new_message=content
    )
    # Print the text of each final response as it arrives.
    async for event in events_async:
      if event.is_final_response() and event.content and event.content.parts:
        # For output_schema, the content is the JSON string itself
        final_response_content = event.content.parts[0].text
        got_final_response = True
        print(final_response_content)

    if not got_final_response:
      # Surface the fallback text rather than finishing with no output.
      print(final_response_content)
  finally:
    # Always shut the MCP connection down, even if the run above failed.
    print("Closing MCP server connection...")
    await exit_stack.aclose()
    print("Cleanup complete.")

if __name__ == '__main__':
  import sys

  try:
    asyncio.run(async_main())
  except Exception as e:
    # Top-level boundary: report the failure on stderr and exit non-zero so
    # that shells and other callers can detect that the run failed.
    print(f"An error occurred: {e}", file=sys.stderr)
    sys.exit(1)

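# Run output: (heading from the source article; the output itself is not
# included here)
#
# Web-page residue from the source article, kept as comments so the file
# remains valid Python:
#   Logo
#   One-stop AI cloud service platform
#   More recommendations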