a2a-github以及开源框架上的example都支持的是google大模型和openai官方大模型,不支持配置化本地模型能力

尝试使用官方sdk和开源社区流行框架sdk,结合本地大模型,分别实现a2a的python-demo,以下是具体实现,(demo提供源码地址)


5月16日更新:

  • google的sdk迭代迅速,(本文中官方sdk的python-demo,需依赖使用本文分支代码进行sdk install),项目已迁移至独立项目中,应该是听取了我的意见😁.
  • 鉴于官方sdk迭代了,如果要运行文章中介绍的谷歌sdk-demo,请使用文中的该分支版本install sdk.
  • 官方最新地址:https://github.com/google/a2a-python
  • 讨论地址:https://github.com/google/A2A/discussions/491

PS:非常高兴,能成为google a2a协议和a2a-python的 contributor ,分享一下~
在这里插入图片描述


一、前言

有必要在前言中说一些八股文巴拉巴拉…

1.1 A2A 协议是什么?

A2A(Agent-to-Agent)协议 是由 Google 提出的一种用于 AI 智能体之间通信的标准协议。其目标是建立一种通用的、标准化的通信机制,使得不同智能体能够互相理解、协作,并在复杂的多代理系统中高效地完成任务。

🎯 核心目标:
  • 互操作性:不同厂商或框架开发的智能体可以无缝沟通。
  • 模块化设计:每个代理专注于特定技能,通过组合构建复杂系统。
  • 可扩展性:支持多种传输方式(HTTP、WebSocket)、多种模型后端(OpenAI、Anthropic、本地模型等)。
  • 去中心化架构:代理之间直接通信,无需中央调度器。
📦 主要功能:
  • 消息传递:定义了统一的消息格式,包括文本、文件、图像等内容类型。
  • 任务执行:支持异步任务提交与结果返回。
  • 能力发现:代理可通过注册中心发现彼此的能力。
  • 流式响应:支持实时、渐进式的交互体验。

参考 A2A 官方文档


1.2 官方 SDK 介绍(GitHub 地址

官方提供的 Python SDK 是 A2A 协议的实现基础库,为开发者提供了一套完整的工具链来创建、连接和运行 AI 智能体。

🧱 主要功能模块:
模块 功能描述
a2a/client 提供客户端 API,用于向其他代理发送请求、获取响应
a2a/server 提供服务器端 API,用于定义代理行为、处理请求
a2a/models 包含消息、任务、内容等数据模型定义
a2a/discovery 支持代理之间的自动发现与注册
a2a/utils 工具函数,如序列化、日志、网络辅助等
a2a/workflow 支持多个代理之间协同完成复杂任务
📁 SDK 结构概览:
a2a/
├── client/          # 客户端相关
├── discovery/       # 代理发现服务
├── models/          # 数据模型定义
├── server/          # 服务端接口与实现
├── utils/           # 工具函数
├── workflow/        # 多代理工作流支持
└── __init__.py      # 初始化入口

该 SDK 采用轻量级设计,便于集成到各种应用中,并支持自定义扩展,适合用于构建企业级或多代理系统。


1.3 python-a2a 项目介绍(GitHub 地址

最新版本功能

python-a2a 是一个基于 A2A 协议的完整 Python 实现,提供了比官方 SDK 更丰富的示例和工具集,旨在帮助开发者快速搭建自己的 AI 代理系统。

🌟 项目核心功能:
  • ✅ 支持 OpenAI、Anthropic、AWS Bedrock 等主流 LLM 接入
  • ✅ 内置 Web UI 和 RESTful 接口,方便可视化调试
  • ✅ 示例丰富:从 Hello World 到旅行规划助手、天气助手应有尽有
  • ✅ 支持本地模型部署(llama.cpp、Ollama 等)
  • ✅ 集成 LangChain 和 MCP(Model Control Protocol),增强代理控制能力
  • ✅ 支持流式输出、任务调度、多代理路由等功能
🗂️ 项目目录结构解析:
python-a2a/
├── docs/                # 文档说明
├── examples/            # 各类代理示例
│   ├── ai_powered_agents # 基于大模型的代理
│   ├── applications     # 应用场景示例(旅行规划、天气助手)
│   ├── workflows        # 多代理流程示例
│   └── streaming        # 流式输出支持
├── python_a2a/         # 核心库代码
│   ├── client/          # 客户端相关模块
│   ├── server/          # 服务端与 LLM 接口
│   ├── models/          # 数据模型
│   ├── discovery/       # 代理发现机制
│   └── utils/          # 工具函数
├── notebooks/           # Jupyter Notebook 教程
└── tests/              # 单元测试

二,基于官方sdk运行结合本地模型实现

demo源码地址:https://github.com/Linux2010/A2A/tree/feat-examples/a2a-python-sdk/examples/travel_planner

  • 基于官方helloworld-demo魔改,steam流式模式

2.1 运行效果

  • server端(steam模式交互)

  • client端(steam模式交互)

2.2 client

  • 修改为这是一个通用的client,理论上支持官方sdk协议下的各种server.py的连接
from a2a.client import A2AClient
from typing import Any
import httpx
from uuid import uuid4
import asyncio

def print_welcome_message() -> None:
    print("欢迎使用通用A2A客户端!")
    print("请输入您的查询(输入 'exit' 退出):")

def get_user_query() -> str:
    return input("\n> ")

async def interact_with_server(client: A2AClient) -> None:
    while True:
        user_input = get_user_query()
        if user_input.lower() == 'exit':
            print("再见!")
            break

        send_message_payload: dict[str, Any] = {
            'message': {
                'role': 'user',
                'parts': [
                    {'type': 'text', 'text': user_input}
                ],
                'messageId': uuid4().hex,
            },
        }

        try:
            stream_response = client.send_message_streaming(
                payload=send_message_payload
            )
            # print as steam type
            async for chunk in stream_response:
                data = chunk.model_dump(mode='json', exclude_none=True)
                for part in data.get('result', {}).get('parts', []):
                    if part.get('type') == 'text':
                        print(part.get('text'), end='',flush=True)
                        await asyncio.sleep(0.05)
        except Exception as e:
            print(f"发生错误: {e}")

async def main() -> None:
    print_welcome_message()
    async with httpx.AsyncClient() as httpx_client:
        client = await A2AClient.get_client_from_agent_card_url(
            httpx_client, 'http://localhost:9999'
        )
        await interact_with_server(client)


if __name__ == '__main__':
    asyncio.run(main())

2.3 server

  • main.py
import click

from agent_executor import HelloWorldAgentExecutor

from a2a.server import A2AServer
from a2a.server.request_handlers import DefaultA2ARequestHandler
from a2a.types import (
    AgentAuthentication,
    AgentCapabilities,
    AgentCard,
    AgentSkill,
)


@click.command()
@click.option('--host', 'host', default='localhost')
@click.option('--port', 'port', default=9999)
def main(host: str, port: int):
    skill = AgentSkill(
        id='travel_planner',
        name='travel planner agent',
        description='travel planner',
        tags=['travel planner'],
        examples=['hello', 'nice to meet you!'],
    )

    agent_card = AgentCard(
        name='travel planner Agent',
        description='travel planner',
        url='http://localhost:9999/',
        version='1.0.0',
        defaultInputModes=['text'],
        defaultOutputModes=['text'],
        capabilities=AgentCapabilities(),
        skills=[skill],
        authentication=AgentAuthentication(schemes=['public']),
    )

    request_handler = DefaultA2ARequestHandler(
        agent_executor=HelloWorldAgentExecutor()
    )

    server = A2AServer(agent_card=agent_card, request_handler=request_handler)
    server.start(host=host, port=port)


if __name__ == '__main__':
    main()

  • agent_executor.py
from uuid import uuid4
from agent import TravelPlannerAgent

from typing_extensions import override

from a2a.server.agent_execution import BaseAgentExecutor
from a2a.server.events import EventQueue
from a2a.types import (
    MessageSendParams,
    Message,
    Part,
    Role,
    SendMessageRequest,
    SendStreamingMessageRequest,
    Task,
    TextPart,
)



class HelloWorldAgentExecutor(BaseAgentExecutor):
    """Test AgentProxy Implementation."""

    def __init__(self):
        self.agent = TravelPlannerAgent()

    @override
    async def on_message_send(
        self,
        request: SendMessageRequest,
        event_queue: EventQueue,
        task: Task | None,
    ) -> None:
        params: MessageSendParams = request.params
        query = self._get_user_query(params)
        result = await self.agent.invoke(query)

        message: Message = Message(
            role=Role.agent,
            parts=[Part(TextPart(text=result))],
            messageId=str(uuid4()),
        )
        print(message)
        event_queue.enqueue_event(message)

    @override
    async def on_message_stream(
        self,
        request: SendStreamingMessageRequest,
        event_queue: EventQueue,
        task: Task | None,
    ) -> None:
        params: MessageSendParams = request.params
        query = self._get_user_query(params)

        async for chunk in self.agent.stream(query):
            message: Message = Message(
                role=Role.agent,
                parts=[Part(TextPart(text=chunk['content']))],
                messageId=str(uuid4()),
                final=chunk['done'],
            )
            print(chunk['content'])
            event_queue.enqueue_event(message)

    def _get_user_query(self, task_send_params: MessageSendParams) -> str:
        """Helper to get user query from task send params."""
        part = task_send_params.message.parts[0].root
        if not isinstance(part, TextPart):
            raise ValueError('Only text parts are supported')
        return part.text

2.4 agent

  • 使用 LangChain 的 ChatOpenAI 来实现本地大模型交互
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json
from collections.abc import AsyncGenerator

class TravelPlannerAgent:
    """ travel planner Agent."""

    def __init__(self):
        """初始化对话模型"""
        try:
            with open("config.json") as f:
                config = json.load(f)
            self.model = ChatOpenAI(
                model=config["model_name"],
                base_url=config["base_url"],
                api_key=config["api_key"],
                temperature=0.7  # 控制生成随机性(0-2,越大越随机)
            )
        except FileNotFoundError:
            print("错误:找不到配置文件 config.json")
            exit()
        except KeyError as e:
            print(f"配置文件缺少必要字段:{e}")
            exit()

    async def stream(self, query: str) -> AsyncGenerator[str, None]:

        """流式返回大模型的响应给客户端."""
        try:
            # 初始化对话历史(可添加系统消息)
            messages = [
                SystemMessage(
                    content="""
                你是一个专业的旅行助手,专长于旅行规划、目的地信息和旅行推荐。
                你的目标是根据用户的偏好和约束,帮助他们规划愉快、安全和现实的旅行。
                
                提供信息时:
                - 建议要具体且实用
                - 考虑季节性、预算限制和旅行物流
                - 强调文化体验和真实的当地活动
                - 包括目的地相关的实用旅行小贴士
                - 适当的时候用标题和项目符号格式清晰展示信息
                
                对于行程:
                - 根据景点之间的旅行时间创建现实的每日安排
                - 平衡热门旅游景点和鲜为人知的体验
                - 包含大约的时间和实际的后勤安排
                - 建议突出当地美食的用餐选择
                - 考虑天气、当地事件和开放时间进行规划
                
                始终保持有帮助、热情但现实的语气,并在适当时候承认自己的知识有限。
                """
                )
            ]

            # 添加用户消息到历史
            messages.append(HumanMessage(content=query))

            # 流式调用模型生成回复
            for chunk in self.model.stream(messages):
                # 返回文本内容块
                if hasattr(chunk, 'content') and chunk.content:
                    yield  {'content': chunk.content, 'done': False}
            yield {'content': '\n', 'done': True}

        except Exception as e:
            print(f"发生错误:{str(e)}")
            yield "对不起,处理您的请求时发生了错误。"

2.5 修改配置信息并运行

  • 支持配置自定义模型地址和模型名称
  • 修改config.json
{
  "model_name":"Qwen/QwQ-32B",
  "api_key": "sk-xxxxx",
  "base_url": "https://api.siliconflow.cn/v1"
}
  • 查看readme安装依赖并运行server
uv run . # 依赖也可以通过pip安装依赖,server运行: python __main__.py
  • 运行client
uv run common_client.py # client运行也可以用过: python common_client.py

三,基于python-a2a结合本地模型实现

个人实现demo源码地址:https://github.com/Linux2010/python-a2a/tree/feat-examples-job/examples/applications_local_model

3.1 效果展示

  • server端

python-a2a有关于server路由下的agent的卡片介绍:http://127.0.0.1:5001/

  • client端

3.2 client

#!/usr/bin/env python

import sys
import argparse
import socket
import time
import threading
import multiprocessing

def check_dependencies():
    """Check if required dependencies are installed"""
    missing_deps = []

    try:
        import python_a2a
    except ImportError:
        missing_deps.append("python-a2a")

    try:
        import flask
    except ImportError:
        missing_deps.append("flask")

    if missing_deps:
        print("❌ Missing dependencies:")
        for dep in missing_deps:
            print(f"  - {dep}")

        print("\nPlease install the required dependencies:")
        print("    pip install \"python-a2a[server]\"")
        print("\nThen run this example again.")
        return False

    print("✅ All dependencies are installed correctly!")
    return True

def find_available_port(start_port=5000, max_tries=10):
    """Find an available port starting from start_port"""
    for port in range(start_port, start_port + max_tries):
        try:
            # Try to create a socket on the port
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.bind(('localhost', port))
            sock.close()
            return port
        except OSError:
            # Port is already in use, try the next one
            continue

    # If we get here, no ports were available
    print(f"⚠️  Could not find an available port in range {start_port}-{start_port + max_tries - 1}")
    return start_port  # Return the start port as default

def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description="Simple A2A Client Example")
    parser.add_argument(
        "--port", type=int, default=None,
        help="Port to run the server on (default: auto-select an available port)"
    )
    parser.add_argument(
        "--external", type=str, default=None,
        help="External A2A endpoint URL (if provided, won't start local server)"
    )
    return parser.parse_args()

def start_local_server(port):
    """Start a local A2A server on the specified port"""
    from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState

    # Create an agent card with skills information
    agent_card = AgentCard(
        name="Demo Agent",
        description="A simple A2A server for demonstration purposes",
        url=f"http://localhost:{port}",
        version="1.0.0",
        skills=[
            AgentSkill(
                name="Echo",
                description="Echoes back your message",
                examples=["Hello!", "How are you?"]
            ),
    AgentSkill(
                name="Greeting",
                description="Greets you with a friendly message",
                examples=["Greet me", "Say hello"]
            )
        ]
    )
    
    # Create a simple A2A server
    class DemoServer(A2AServer):
        def __init__(self):
            # Initialize with our agent card
            super().__init__(agent_card=agent_card)
        
        def handle_task(self, task):
            # Extract message text
            message_data = task.message or {}
            content = message_data.get("content", {})
            text = content.get("text", "") if isinstance(content, dict) else ""
            
            # Prepare response based on message content
            text_lower = text.lower()
            
            if "greet" in text_lower or "hello" in text_lower or "hi" in text_lower:
                response = f"👋 Hello there! I'm the Demo Agent. How can I help you today?"
            else:
                response = f"You said: {text}\n\nI'm a simple demo agent for the A2A protocol. Try saying 'Hello' or 'Greet me'!"
            
            # Create response artifact
            task.artifacts = [{
                "parts": [{"type": "text", "text": response}]
            }]
            
            task.status = TaskStatus(state=TaskState.COMPLETED)
            return task
    
    # Create and run the server
    server = DemoServer()
    run_server(server, host="localhost", port=port)

def main():
    # First, check dependencies
    if not check_dependencies():
        return 1
    
    # Parse command line arguments
    args = parse_arguments()
    
    # Import after checking dependencies
    from python_a2a import A2AClient
    
    # Handle external or local server
    if args.external:
        endpoint_url = args.external
        print(f"\n🚀 Connecting to external A2A agent at: {endpoint_url}")
        server_process = None
    else:
        # Find an available port if none was specified
        if args.port is None:
            port = find_available_port()
            print(f"🔍 Auto-selected port: {port}")
        else:
            port = args.port
            print(f"🔍 Using specified port: {port}")
        
        endpoint_url = f"http://localhost:{port}"
        
        # Start a local server in a separate process
        print(f"\n🚀 Starting a local demo A2A server on port {port}...")
        server_process = multiprocessing.Process(target=start_local_server, args=(port,))
        server_process.start()
        
        # Give the server some time to start
        print("⏳ Waiting for server to start...")
        time.sleep(2)
    
    try:
        # Create an A2A client and connect to the server
        print(f"🔌 Connecting to A2A agent at: {endpoint_url}")
        client = A2AClient(endpoint_url)
        
        # Try to get agent information
        try:
            print("\n=== Agent Information ===")
            print(f"Name: {client.agent_card.name}")
            print(f"Description: {client.agent_card.description}")
            print(f"Version: {client.agent_card.version}")
            
            if client.agent_card.skills:
                print("\nAvailable Skills:")
                for skill in client.agent_card.skills:
                    print(f"- {skill.name}: {skill.description}")
                    if skill.examples:
                        print(f"  Examples: {', '.join(skill.examples)}")
        except Exception as e:
            print(f"\n⚠️ Could not retrieve agent card: {e}")
            print("The agent may not support the A2A discovery protocol.")
            print("You can still send messages to the agent.")
        
        # Interactive message sending loop
        print("\n=== Send Messages to the Agent ===")
        print("Type your messages (or 'exit' to quit):")
        
        while True:
            try:
                user_input = input("\n> ")
                if user_input.lower() in ["exit", "quit", "q"]:
                    break
                
                # Send the message and get the response
                print("\nSending message to agent...")
                response = client.ask(user_input)
                
                # Print the response
                print("\nAgent response:")
                print(f"{response}")
                
            except KeyboardInterrupt:
                print("\nExiting...")
                break
            except Exception as e:
                print(f"\n❌ Error: {e}")
                print("Try sending a different message or check your connection.")
    
    except Exception as e:
        print(f"\n❌ Error connecting to agent: {e}")
        print("\nPossible reasons:")
        print("- The endpoint URL is incorrect")
        print("- The agent server is not running")
        print("- Network connectivity issues")
        print("\nPlease check the URL and try again.")
        return 1
    finally:
        # Clean up the server process if we started one
        if server_process:
            print("\n🛑 Stopping local server...")
            server_process.terminate()
            server_process.join(timeout=2)
            print("✅ Local server stopped")
    
    print("\n=== What's Next? ===")
    print("1. Try 'simple_server.py' to create your own A2A server")
    print("2. Try 'function_calling.py' to use function calling with A2A")
    print("3. Try the other examples to explore more A2A features")
    
    print("\n🎉 You've successfully used the A2A client! 🎉")
    return 0

if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print("\n✅ Program interrupted by user")
        sys.exit(0)

3.3 server&agent

#!/usr/bin/env python
"""
OpenAI 旅行规划助手(中文版)

一个使用 OpenAI 和 A2A 构建的完整旅行规划系统。
此示例演示了如何构建实用的旅行规划器,结合 OpenAI 的能力与专用代理。

运行方式:
    export OPENAI_API_KEY=your_api_key
    python openai_travel_planner_zh.py

依赖安装:
    pip install "python-a2a[openai,server]"
"""

import sys
import time
import argparse
import socket
import multiprocessing
import re
import json



# 检查依赖项
try:
    import python_a2a
    import openai
    import flask
except ImportError as e:
    module_name = getattr(e, 'name', str(e))
    print(f"❌ 缺少依赖: {module_name}")
    print("请安装所需的包:")
    print("    pip install \"python-a2a[openai,server]\"")
    sys.exit(1)

# 导入所有必要的 A2A 组件
# 装饰器可能在主包中
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState
from python_a2a import OpenAIA2AServer, A2AClient

# 尝试从主包获取装饰器
try:
    from python_a2a import skill, agent
except ImportError:
    # 如果不可用,则定义简化版本
    print("⚠️ 无法导入 skill 和 agent 装饰器,使用简化版本")
    
    def skill(name=None, description=None, tags=None, examples=None):
        """简化版技能装饰器"""
        def decorator(func):
            func._skill_info = {
                "name": name or func.__name__,
                "description": description or func.__doc__ or "",
                "tags": tags or [],
                "examples": examples or []
            }
            return func
        return decorator
    
    def agent(name=None, description=None, version=None, url=None):
        """简化版代理装饰器"""
        def decorator(cls):
            return cls
        return decorator

def find_available_port(start_port=5000, max_tries=10):
    """从指定端口开始查找可用端口"""
    for port in range(start_port, start_port + max_tries):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.bind(('localhost', port))
            sock.close()
            return port
        except OSError:
            continue
    
    return start_port + 100  # 返回一个较高的备用端口


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description="OpenAI Travel Planner Example")
    parser.add_argument(
        "--port", type=int, default=None,
        help="Port for the Travel Planner (default: auto-select)"
    )
    parser.add_argument(
        "--no-test", action="store_true",
        help="Don't run test queries automatically"
    )
    return parser.parse_args()


class TravelKnowledgeBase:
    """补充 OpenAI 知识的旅行信息知识库"""
    
    def __init__(self):
        # 主要目的地的基本签证规则
        self.visa_rules = {
            "usa": "大多数国家需要旅游签证(B-2)。签证免签计划国家需要ESTA。",
            "uk": "许多国家需要标准访客签证。脱欧前欧盟公民享有免签待遇。",
            "eu": "许多非欧盟公民需要申根签证。180天内最多停留90天。",
            "japan": "许多国家享受最长90天的免签待遇。",
            "australia": "短途访问需要eVisitor或ETA。更长时间需要正式签证。",
            "canada": "免签外国公民飞往加拿大需要eTA。其他人需要签证。",
            "china": "几乎所有外国人入境前都需要签证。",
            "thailand": "许多国家享受30天免签。更长时间需要签证。",
            "uae": "许多国籍可落地签。其他人需要预先申请签证。",
            "singapore": "许多国家享受30-90天免签入境。"
        }
        
        # 旅行健康建议
        self.health_advisories = {
            "general": "出行前确保常规疫苗接种是最新的。",
            "tropical": "前往热带地区时考虑接种甲肝、伤寒和黄热病疫苗。",
            "malaria": "前往疟疾流行地区需服用抗疟药。",
            "covid": "检查每个目的地当前的新冠要求,包括检测和疫苗接种。",
            "water": "在许多发展中国家,请只饮用瓶装水或开水,并避免食用冰块。",
            "altitude": "前往高海拔地区时,留出时间适应环境以防止高山症。",
            "insurance": "出国旅行时应始终购买全面的旅行健康保险。"
        }
        
        # 一般旅行贴士
        self.travel_tips = [
            "制作重要文件如护照和身份证的电子版和纸质副本。",
            "在目的地国家注册你所在国家的大使馆或领事馆。",
            "出发前研究当地习俗和法律。",
            "学习目的地语言的一些基本短语。",
            "告知你的银行和信用卡公司你的旅行计划。",
            "打包一个包含基本急救用品的小急救箱以备紧急情况。",
            "使用国际通用的交通应用,如 Uber 或本地等效应用。",
            "考虑在当地购买SIM卡或国际数据套餐。",
            "检查目的地是否有任何安全问题或限制区域。",
            "研究针对游客的典型骗局。"
        ]
    
    def get_visa_info(self, country):
        """获取国家签证信息"""
        country = country.lower()
        # 尝试直接匹配
        if country in self.visa_rules:
            return self.visa_rules[country]
        
        # 尝试部分匹配
        for key, value in self.visa_rules.items():
            if key in country or country in key:
                return value
        
        return "请咨询目的地国家的大使馆或领事馆了解具体签证要求。"
    
    def get_health_advisory(self, region=None):
        """获取特定地区的健康建议"""
        if region and region.lower() in self.health_advisories:
            return self.health_advisories[region.lower()]
        
        # 返回一般建议
        return self.health_advisories["general"]
    
    def get_travel_tips(self, count=3):
        """获取随机旅行贴士"""
        import random
        return random.sample(self.travel_tips, min(count, len(self.travel_tips)))


def test_client(port):
    """对旅行规划器运行测试查询"""
    # 等待服务器启动
    time.sleep(3)
    
    print("\n🧪 正在测试旅行规划器...")
    client = A2AClient(f"http://localhost:{port}")
    
    # 测试查询
    test_queries = [
        "规划一个三天的东京之旅",
        "访问法国的签证要求是什么?",
        "为家庭伦敦度假提供建议"
    ]
    
    for query in test_queries:
        try:
            print(f"\n💬 查询: {query}")
            response = client.ask(query)
            print(f"✈️ 响应: {response}")
            time.sleep(1)
        except Exception as e:
            print(f"❌ 错误: {e}")
    
    print("\n✅ 测试完成!你的旅行规划器已准备就绪。")
    print(f"💻 服务运行地址: http://localhost:{port}")
    print("📝 可尝试提问如: '规划巴黎周末游','泰国的最佳旅行时间',或'海滩度假需要带什么'")
    print("🛑 在服务器终端按 Ctrl+C 停止服务。")


def main():
    # 解析命令行参数
    args = parse_arguments()
    with open("config.json") as f:
        config = json.load(f)
    model_name = config["model_name"]
    base_url = config["base_url"]
    api_key = config["api_key"]

    # 如果未指定则自动选择端口
    if args.port is None:
        port = find_available_port()
        print(f"🔍 自动选择端口: {port}")
    else:
        port = args.port
        print(f"🔍 使用指定端口: {port}")
    
    print("\n✈️ OpenAI 旅行规划器 ✈️")
    print(f"一个完整的旅行规划系统,由 OpenAI {model_name} 提供支持")
    
    # 初始化知识库
    kb = TravelKnowledgeBase()
    
    # 创建基于 OpenAI 的旅行规划器
    class TravelPlanner(A2AServer):
        """
        一个由 OpenAI 驱动的旅行规划器,用于创建行程、
        提供旅行信息以及提供推荐。
        """
        def __init__(self, knowledge_base, openai_model,base_url,api_key):
            # 使用我们的代理卡片初始化
            super().__init__(AgentCard(
                name="旅行规划师",
                description="帮助规划旅行、查找信息和提供推荐的 AI 旅行助手",
                url=f"http://localhost:{port}",
                version="1.0.0",
                skills=[
                    AgentSkill(
                        name="行程规划",
                        description="根据偏好和持续时间创建详细旅行行程",
                        examples=["规划一个三天的东京之旅", "规划一个带孩子的巴黎周末游"]
                    ),
                    AgentSkill(
                        name="旅行信息",
                        description="提供特定旅行信息如签证要求、健康建议等",
                        examples=["日本的签证要求", "泰国的健康建议"]
                    ),
                    AgentSkill(
                        name="推荐",
                        description="提供活动、住宿和餐厅的定制化推荐",
                        examples=["伦敦的事情要做", "罗马适合家庭的餐厅"]
                    )
                ]
            ))
            
            # 存储知识库
            self.kb = knowledge_base
            
            # 初始化带有旅行专用系统提示的 OpenAI 后端
            self.openai = OpenAIA2AServer(
                api_key=api_key,
                model=openai_model,
                base_url=base_url,
                temperature=0.7,
                system_prompt="""
                你是一个专业的旅行助手,专长于旅行规划、目的地信息和旅行推荐。
                你的目标是根据用户的偏好和约束,帮助他们规划愉快、安全和现实的旅行。
                
                提供信息时:
                - 建议要具体且实用
                - 考虑季节性、预算限制和旅行物流
                - 强调文化体验和真实的当地活动
                - 包括目的地相关的实用旅行小贴士
                - 适当的时候用标题和项目符号格式清晰展示信息
                
                对于行程:
                - 根据景点之间的旅行时间创建现实的每日安排
                - 平衡热门旅游景点和鲜为人知的体验
                - 包含大约的时间和实际的后勤安排
                - 建议突出当地美食的用餐选择
                - 考虑天气、当地事件和开放时间进行规划
                
                始终保持有帮助、热情但现实的语气,并在适当时候承认自己的知识有限。
                """
            )
        
        def plan_trip(self, destination, duration=3, interests=None, budget=None):
            """
            创建自定义旅行行程。
            
            参数:
                destination: 旅行目的地城市/国家
                duration: 旅行天数(默认:3天)
                interests: 旅行者兴趣列表(可选)
                budget: 预算水平(低、中、高)(可选)
                
            返回:
                详细的日程安排
            """
            # 格式化 OpenAI 查询
            if interests and budget:
                query = f"为{destination}创建一个详细的{duration}-天行程。兴趣: {interests}。预算: {budget}."
            elif interests:
                query = f"为{destination}创建一个详细的{duration}-天行程。兴趣: {interests}."
            elif budget:
                query = f"为{destination}创建一个详细的{duration}-天行程。预算: {budget}."
            else:
                query = f"为{destination}创建一个详细的{duration}-天行程。"
            
            # 获取 OpenAI 响应
            from python_a2a import Message, TextContent, MessageRole
            message = Message(content=TextContent(text=query), role=MessageRole.USER)
            response = self.openai.handle_message(message)
            
            # 添加一些来自我们知识库的旅行贴士
            tips = self.kb.get_travel_tips(3)
            tips_text = "\n\n实用旅行贴士:\n" + "\n".join(f"- {tip}" for tip in tips)
            
            return response.content.text + tips_text
        
        def get_travel_info(self, country, topic):
            """
            获取目的地的特定旅行信息。
            
            参数:
                country: 目的地国家
                topic: 信息主题(签证、健康、安全等)
                
            返回:
                相关旅行信息
            """
            # 检查签证信息
            if "visa" in topic.lower():
                kb_info = self.kb.get_visa_info(country)
                # 格式化 OpenAI 查询以扩展我们的知识
                query = f"访问{country}的签证要求是什么?添加更多关于此信息的内容: {kb_info}"
            elif "health" in topic.lower():
                kb_info = self.kb.get_health_advisory()
                query = f"访问{country}时应该知道哪些健康注意事项?参考这个建议: {kb_info}"
            else:
                # 其他主题的一般查询
                query = f"关于{topic},去{country}旅行时我应该知道什么?"
            
            # 获取 OpenAI 响应
            from python_a2a import Message, TextContent, MessageRole
            message = Message(content=TextContent(text=query), role=MessageRole.USER)
            response = self.openai.handle_message(message)
            
            return response.content.text
        
        def get_recommendations(self, destination, category, preferences=None):
            """
            获取个性化的旅行推荐。
            
            参数:
                destination: 旅行目的地
                category: 推荐类型(活动、餐厅、酒店等)
                preferences: 更详细的偏好的可选参数
                
            返回:
                推荐列表及其描述
            """
            # 格式化 OpenAI 查询
            if preferences:
                query = f"推荐适合喜欢{preferences}的人在{destination}{category}。"
            else:
                query = f"推荐{destination}最好的{category}。"
            
            # 获取 OpenAI 响应
            from python_a2a import Message, TextContent, MessageRole
            message = Message(content=TextContent(text=query), role=MessageRole.USER)
            response = self.openai.handle_message(message)
            
            return response.content.text
        
        def handle_task(self, task):
            """通过识别意图并路由到适当的技能来处理传入的任务"""
            try:
                # 从任务中提取消息文本
                message_data = task.message or {}
                content = message_data.get("content", {})
                text = content.get("text", "") if isinstance(content, dict) else ""
                
                # 确定消息的意图
                intent, params = self._analyze_intent(text)
                
                # 根据意图路由到合适的技能
                if intent == "trip_planning":
                    response_text = self.plan_trip(**params)
                elif intent == "travel_info":
                    response_text = self.get_travel_info(**params)
                elif intent == "recommendations":
                    response_text = self.get_recommendations(**params)
                else:
                    # 对于一般查询,直接传递给 OpenAI
                    from python_a2a import Message, TextContent, MessageRole
                    message = Message(content=TextContent(text=text), role=MessageRole.USER)
                    response = self.openai.handle_message(message)
                    response_text = response.content.text
                
                # 创建工件与响应
                task.artifacts = [{
                    "parts": [{"type": "text", "text": response_text}]
                }]
                
                # 标记为已完成
                task.status = TaskStatus(state=TaskState.COMPLETED)
                
                return task
            
            except Exception as e:
                # 处理错误
                error_message = f"抱歉,我遇到了一个错误: {str(e)}"
                task.artifacts = [{
                    "parts": [{"type": "text", "text": error_message}]
                }]
                task.status = TaskStatus(state=TaskState.FAILED)
                return task
        
        def _analyze_intent(self, text):
            """
            分析用户的消息以确定意图并提取参数。
            这里使用简单的模式匹配,在实际系统中可以增强 NLP 功能。
            """
            text_lower = text.lower()
            
            # 提取潜在的目的地
            destination_match = re.search(r"(?:in|to|for|visit(?:ing)?)(?:\\s+|\\b)([A-Z][a-zA-Z\\s]+)(?:\\.|\\?|$|\\s+)", text_lower)
            destination = destination_match.group(1).strip() if destination_match else None
            
            # 提取潜在的持续时间
            duration_match = re.search(r"(\d+)(?:-day|天)", text_lower)
            duration = int(duration_match.group(1)) if duration_match else 3
            
            # 提取潜在的兴趣
            interests_match = re.search(r"兴趣: ([^\\n]+)", text_lower)
            interests = interests_match.group(1).strip() if interests_match else None
            
            # 提取潜在的预算
            budget_match = re.search(r"预算: ([^\\n]+)", text_lower)
            budget = budget_match.group(1).strip() if budget_match else None
            
            # 提取潜在的主题
            topic_match = re.search(r"(签证|健康|安全|天气|保险|其他)", text_lower)
            topic = topic_match.group(1).strip() if topic_match else None
            
            # 提取潜在的类别
            category_match = re.search(r"(活动|餐厅|酒店|景点|娱乐)", text_lower)
            category = category_match.group(1).strip() if category_match else None
            
            # 提取潜在的偏好
            preferences_match = re.search(r"喜欢([^\\n]+)", text_lower)
            preferences = preferences_match.group(1).strip() if preferences_match else None
            
            # 确定意图
            if destination and duration:
                return "trip_planning", {
                    "destination": destination,
                    "duration": duration,
                    "interests": interests,
                    "budget": budget
                }
            elif destination and topic:
                return "travel_info", {
                    "country": destination,
                    "topic": topic
                }
            elif destination and category:
                return "recommendations", {
                    "destination": destination,
                    "category": category,
                    "preferences": preferences
                }
            # 默认为一般查询
            return "general", {}
    
    # 创建旅行规划器
    travel_planner = TravelPlanner(kb, model_name,base_url,api_key)
    
    # 打印代理信息
    print("\n=== 旅行规划器信息 ===")
    print(f"名称: {travel_planner.agent_card.name}")
    print(f"描述: {travel_planner.agent_card.description}")
    print(f"网址: {travel_planner.agent_card.url}")
    print(f"OpenAI 模型: {model_name}")
    
    print("\n=== 可用技能 ===")
    for skill in travel_planner.agent_card.skills:
        print(f"- {skill.name}: {skill.description}")
    
    # 如果启用测试,则在单独的进程中启动测试客户端
    client_process = None
    if not args.no_test:
        client_process = multiprocessing.Process(target=test_client, args=(port,))
        client_process.start()
    
    # 启动服务器
    print(f"\n🚀 在 http://localhost:{port} 上启动旅行规划器")
    print("按 Ctrl+C 停止服务器")
    
    try:
        run_server(travel_planner, host="0.0.0.0", port=port)
    except KeyboardInterrupt:
        print("\n✅ 服务器已停止")
    except Exception as e:
        print(f"\n❌ 启动服务器时出错: {e}")
        if "Address already in use" in str(e):
            print(f"\n端口 {port} 已被占用。尝试使用其他端口:")
            print(f"    python openai_travel_planner_zh.py --port {port + 1}")
        return 1
    finally:
        # 清理客户端进程
        if client_process:
            client_process.terminate()
            client_process.join()
    
    print("\n=== 下一步 ===")
    print("1. 尝试 'openai_mcp_agent.py' 示例添加工具功能")
    print("2. 尝试 'knowledge_base.py' 示例创建自己的数据系统")
    
    return 0

if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print("\n✅ 程序被用户中断")
        sys.exit(0)

3.4 修改配置并运行

可以查看readme: https://github.com/Linux2010/python-a2a/blob/feat-examples-job/examples/applications_local_model/readme.md

  • 修改config.json
{
  "model_name":"Qwen/QwQ-32B",
  "api_key": "sk-xxxxx",
  "base_url": "https://api.siliconflow.cn/v1"
}
  • 依赖安装
  pip install python-a2a  # Includes LangChain, MCP, and other integrations
  • server和client运行
# server运行
python openai_travel_planner_zh.py --port 5001 --no-test

# client运行
python app_client.py --external http://127.0.0.1:5001

四、总结

4.1 google-a2a-sdk

google官方的a2a的python实现sdk,当前在迅速迭代,使用体感比较差

  • 当前是基于a2a的标准实现python-demo,但没有使用到function-call和mcp的tools调用,后续持续优化

4.2 python-a2a-sdk

开源社区基于a2a协议的python-sdk,功能相对完善,思路完整,使用体验较好

```bash
# Send a message to an agent
a2a send http://localhost:5000 "What is artificial intelligence?"

# Stream a response in real-time
a2a stream http://localhost:5000 "Generate a step-by-step tutorial for making pasta"

# Start the Agent Flow UI
a2a ui

# Start the Agent Flow UI with custom options
a2a ui --port 9000 --host 0.0.0.0 --storage-dir ~/.my_workflows --debug --no-browser

# Start an OpenAI-powered A2A server
a2a openai --model gpt-4 --system-prompt "You are a helpful coding assistant"

# Start an Anthropic-powered A2A server
a2a anthropic --model claude-3-opus-20240229 --system-prompt "You are a friendly AI teacher"

# Start an MCP server with tools
a2a mcp-serve --name "Data Analysis MCP" --port 5001 --script analysis_tools.py

# Start an MCP-enabled A2A agent
a2a mcp-agent --servers data=http://localhost:5001 calc=http://localhost:5002

# Call an MCP tool directly
a2a mcp-call http://localhost:5001 analyze_csv --params file=data.csv columns=price,date

# Manage agent networks
a2a network --add weather=http://localhost:5001 travel=http://localhost:5002 --save network.json

# Run a workflow from a script
a2a workflow --script research_workflow.py --context initial_data.json





+ 重点介绍: Agent Network(agent flow能力)

```bash
a2a ui

Logo

一站式 AI 云服务平台

更多推荐