【A2A】使用自定义配置大模型,基于A2A协议的python代码实现
本文介绍了如何通过官方SDK和开源框架结合本地大模型实现A2A协议的Python示例。A2A(Agent-to-Agent)协议是由Google提出的智能体间通信标准,旨在实现互操作性、模块化设计和可扩展性。官方SDK提供了客户端、服务器端、数据模型等核心模块,而python-a2a项目则在此基础上扩展了更多功能,如支持主流LLM接入、本地模型部署等。文章详细展示了基于官方SDK的旅行规划示例,并
a2a-github以及开源框架上的example都支持的是google大模型和openai官方大模型,不支持配置化本地模型能力
尝试使用官方sdk和开源社区流行框架sdk,结合本地大模型,分别实现a2a的python-demo,以下是具体实现,(demo提供源码地址)
-
谷歌sdk demo源码地址:https://github.com/Linux2010/A2A/tree/feat-examples/a2a-python-sdk/examples/travel_planner
-
python-a2a-sdk demo源码地址:https://github.com/Linux2010/python-a2a/tree/feat-examples-job/examples/applications_local_model
5月16日更新:
- google的sdk迭代迅速,(本文中官方sdk的python-demo,需依赖使用本文分支代码进行sdk install),项目已迁移至独立项目中,应该是听取了我的意见😁.
- 鉴于官方sdk迭代了,如果要运行文章中介绍的谷歌sdk-demo,请使用文中的该分支版本install sdk.
- 官方最新地址:https://github.com/google/a2a-python
- 讨论地址:https://github.com/google/A2A/discussions/491
PS:非常高兴,能成为google a2a协议和a2a-python的 contributor ,分享一下~
一、前言
有必要在前言中说一些八股文巴拉巴拉…
1.1 A2A 协议是什么?
A2A(Agent-to-Agent)协议 是由 Google 提出的一种用于 AI 智能体之间通信的标准协议。其目标是建立一种通用的、标准化的通信机制,使得不同智能体能够互相理解、协作,并在复杂的多代理系统中高效地完成任务。
🎯 核心目标:
- 互操作性:不同厂商或框架开发的智能体可以无缝沟通。
- 模块化设计:每个代理专注于特定技能,通过组合构建复杂系统。
- 可扩展性:支持多种传输方式(HTTP、WebSocket)、多种模型后端(OpenAI、Anthropic、本地模型等)。
- 去中心化架构:代理之间直接通信,无需中央调度器。
📦 主要功能:
- 消息传递:定义了统一的消息格式,包括文本、文件、图像等内容类型。
- 任务执行:支持异步任务提交与结果返回。
- 能力发现:代理可通过注册中心发现彼此的能力。
- 流式响应:支持实时、渐进式的交互体验。
参考 A2A 官方文档
1.2 官方 SDK 介绍(GitHub 地址)
官方提供的 Python SDK 是 A2A 协议的实现基础库,为开发者提供了一套完整的工具链来创建、连接和运行 AI 智能体。
🧱 主要功能模块:
| 模块 | 功能描述 |
|---|---|
a2a/client |
提供客户端 API,用于向其他代理发送请求、获取响应 |
a2a/server |
提供服务器端 API,用于定义代理行为、处理请求 |
a2a/models |
包含消息、任务、内容等数据模型定义 |
a2a/discovery |
支持代理之间的自动发现与注册 |
a2a/utils |
工具函数,如序列化、日志、网络辅助等 |
a2a/workflow |
支持多个代理之间协同完成复杂任务 |
📁 SDK 结构概览:
a2a/
├── client/ # 客户端相关
├── discovery/ # 代理发现服务
├── models/ # 数据模型定义
├── server/ # 服务端接口与实现
├── utils/ # 工具函数
├── workflow/ # 多代理工作流支持
└── __init__.py # 初始化入口
该 SDK 采用轻量级设计,便于集成到各种应用中,并支持自定义扩展,适合用于构建企业级或多代理系统。
1.3 python-a2a 项目介绍(GitHub 地址)
最新版本功能

python-a2a 是一个基于 A2A 协议的完整 Python 实现,提供了比官方 SDK 更丰富的示例和工具集,旨在帮助开发者快速搭建自己的 AI 代理系统。
🌟 项目核心功能:
- ✅ 支持 OpenAI、Anthropic、AWS Bedrock 等主流 LLM 接入
- ✅ 内置 Web UI 和 RESTful 接口,方便可视化调试
- ✅ 示例丰富:从 Hello World 到旅行规划助手、天气助手应有尽有
- ✅ 支持本地模型部署(llama.cpp、Ollama 等)
- ✅ 集成 LangChain 和 MCP(Model Control Protocol),增强代理控制能力
- ✅ 支持流式输出、任务调度、多代理路由等功能
🗂️ 项目目录结构解析:
python-a2a/
├── docs/ # 文档说明
├── examples/ # 各类代理示例
│ ├── ai_powered_agents # 基于大模型的代理
│ ├── applications # 应用场景示例(旅行规划、天气助手)
│ ├── workflows # 多代理流程示例
│ └── streaming # 流式输出支持
├── python_a2a/ # 核心库代码
│ ├── client/ # 客户端相关模块
│ ├── server/ # 服务端与 LLM 接口
│ ├── models/ # 数据模型
│ ├── discovery/ # 代理发现机制
│ └── utils/ # 工具函数
├── notebooks/ # Jupyter Notebook 教程
└── tests/ # 单元测试
二,基于官方sdk运行结合本地模型实现
demo源码地址:https://github.com/Linux2010/A2A/tree/feat-examples/a2a-python-sdk/examples/travel_planner
- 基于官方helloworld-demo魔改,steam流式模式
2.1 运行效果
- server端(steam模式交互)

- client端(steam模式交互)


2.2 client
- 修改为这是一个通用的client,理论上支持官方sdk协议下的各种server.py的连接
from a2a.client import A2AClient
from typing import Any
import httpx
from uuid import uuid4
import asyncio
def print_welcome_message() -> None:
print("欢迎使用通用A2A客户端!")
print("请输入您的查询(输入 'exit' 退出):")
def get_user_query() -> str:
return input("\n> ")
async def interact_with_server(client: A2AClient) -> None:
while True:
user_input = get_user_query()
if user_input.lower() == 'exit':
print("再见!")
break
send_message_payload: dict[str, Any] = {
'message': {
'role': 'user',
'parts': [
{'type': 'text', 'text': user_input}
],
'messageId': uuid4().hex,
},
}
try:
stream_response = client.send_message_streaming(
payload=send_message_payload
)
# print as steam type
async for chunk in stream_response:
data = chunk.model_dump(mode='json', exclude_none=True)
for part in data.get('result', {}).get('parts', []):
if part.get('type') == 'text':
print(part.get('text'), end='',flush=True)
await asyncio.sleep(0.05)
except Exception as e:
print(f"发生错误: {e}")
async def main() -> None:
print_welcome_message()
async with httpx.AsyncClient() as httpx_client:
client = await A2AClient.get_client_from_agent_card_url(
httpx_client, 'http://localhost:9999'
)
await interact_with_server(client)
if __name__ == '__main__':
asyncio.run(main())
2.3 server
- main.py
import click
from agent_executor import HelloWorldAgentExecutor
from a2a.server import A2AServer
from a2a.server.request_handlers import DefaultA2ARequestHandler
from a2a.types import (
AgentAuthentication,
AgentCapabilities,
AgentCard,
AgentSkill,
)
@click.command()
@click.option('--host', 'host', default='localhost')
@click.option('--port', 'port', default=9999)
def main(host: str, port: int):
skill = AgentSkill(
id='travel_planner',
name='travel planner agent',
description='travel planner',
tags=['travel planner'],
examples=['hello', 'nice to meet you!'],
)
agent_card = AgentCard(
name='travel planner Agent',
description='travel planner',
url='http://localhost:9999/',
version='1.0.0',
defaultInputModes=['text'],
defaultOutputModes=['text'],
capabilities=AgentCapabilities(),
skills=[skill],
authentication=AgentAuthentication(schemes=['public']),
)
request_handler = DefaultA2ARequestHandler(
agent_executor=HelloWorldAgentExecutor()
)
server = A2AServer(agent_card=agent_card, request_handler=request_handler)
server.start(host=host, port=port)
if __name__ == '__main__':
main()
- agent_executor.py
from uuid import uuid4
from agent import TravelPlannerAgent
from typing_extensions import override
from a2a.server.agent_execution import BaseAgentExecutor
from a2a.server.events import EventQueue
from a2a.types import (
MessageSendParams,
Message,
Part,
Role,
SendMessageRequest,
SendStreamingMessageRequest,
Task,
TextPart,
)
class HelloWorldAgentExecutor(BaseAgentExecutor):
"""Test AgentProxy Implementation."""
def __init__(self):
self.agent = TravelPlannerAgent()
@override
async def on_message_send(
self,
request: SendMessageRequest,
event_queue: EventQueue,
task: Task | None,
) -> None:
params: MessageSendParams = request.params
query = self._get_user_query(params)
result = await self.agent.invoke(query)
message: Message = Message(
role=Role.agent,
parts=[Part(TextPart(text=result))],
messageId=str(uuid4()),
)
print(message)
event_queue.enqueue_event(message)
@override
async def on_message_stream(
self,
request: SendStreamingMessageRequest,
event_queue: EventQueue,
task: Task | None,
) -> None:
params: MessageSendParams = request.params
query = self._get_user_query(params)
async for chunk in self.agent.stream(query):
message: Message = Message(
role=Role.agent,
parts=[Part(TextPart(text=chunk['content']))],
messageId=str(uuid4()),
final=chunk['done'],
)
print(chunk['content'])
event_queue.enqueue_event(message)
def _get_user_query(self, task_send_params: MessageSendParams) -> str:
"""Helper to get user query from task send params."""
part = task_send_params.message.parts[0].root
if not isinstance(part, TextPart):
raise ValueError('Only text parts are supported')
return part.text
2.4 agent
- 使用 LangChain 的 ChatOpenAI 来实现本地大模型交互
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json
from collections.abc import AsyncGenerator
class TravelPlannerAgent:
""" travel planner Agent."""
def __init__(self):
"""初始化对话模型"""
try:
with open("config.json") as f:
config = json.load(f)
self.model = ChatOpenAI(
model=config["model_name"],
base_url=config["base_url"],
api_key=config["api_key"],
temperature=0.7 # 控制生成随机性(0-2,越大越随机)
)
except FileNotFoundError:
print("错误:找不到配置文件 config.json")
exit()
except KeyError as e:
print(f"配置文件缺少必要字段:{e}")
exit()
async def stream(self, query: str) -> AsyncGenerator[str, None]:
"""流式返回大模型的响应给客户端."""
try:
# 初始化对话历史(可添加系统消息)
messages = [
SystemMessage(
content="""
你是一个专业的旅行助手,专长于旅行规划、目的地信息和旅行推荐。
你的目标是根据用户的偏好和约束,帮助他们规划愉快、安全和现实的旅行。
提供信息时:
- 建议要具体且实用
- 考虑季节性、预算限制和旅行物流
- 强调文化体验和真实的当地活动
- 包括目的地相关的实用旅行小贴士
- 适当的时候用标题和项目符号格式清晰展示信息
对于行程:
- 根据景点之间的旅行时间创建现实的每日安排
- 平衡热门旅游景点和鲜为人知的体验
- 包含大约的时间和实际的后勤安排
- 建议突出当地美食的用餐选择
- 考虑天气、当地事件和开放时间进行规划
始终保持有帮助、热情但现实的语气,并在适当时候承认自己的知识有限。
"""
)
]
# 添加用户消息到历史
messages.append(HumanMessage(content=query))
# 流式调用模型生成回复
for chunk in self.model.stream(messages):
# 返回文本内容块
if hasattr(chunk, 'content') and chunk.content:
yield {'content': chunk.content, 'done': False}
yield {'content': '\n', 'done': True}
except Exception as e:
print(f"发生错误:{str(e)}")
yield "对不起,处理您的请求时发生了错误。"
2.5 修改配置信息并运行
- 支持配置自定义模型地址和模型名称
- 修改config.json
{
"model_name":"Qwen/QwQ-32B",
"api_key": "sk-xxxxx",
"base_url": "https://api.siliconflow.cn/v1"
}
- 查看readme安装依赖并运行server
uv run . # 依赖也可以通过pip安装依赖,server运行: python __main__.py
- 运行client
uv run common_client.py # client运行也可以用过: python common_client.py
三,基于python-a2a结合本地模型实现
个人实现demo源码地址:https://github.com/Linux2010/python-a2a/tree/feat-examples-job/examples/applications_local_model
3.1 效果展示
- server端
python-a2a有关于server路由下的agent的卡片介绍:http://127.0.0.1:5001/


- client端

3.2 client
#!/usr/bin/env python
import sys
import argparse
import socket
import time
import threading
import multiprocessing
def check_dependencies():
"""Check if required dependencies are installed"""
missing_deps = []
try:
import python_a2a
except ImportError:
missing_deps.append("python-a2a")
try:
import flask
except ImportError:
missing_deps.append("flask")
if missing_deps:
print("❌ Missing dependencies:")
for dep in missing_deps:
print(f" - {dep}")
print("\nPlease install the required dependencies:")
print(" pip install \"python-a2a[server]\"")
print("\nThen run this example again.")
return False
print("✅ All dependencies are installed correctly!")
return True
def find_available_port(start_port=5000, max_tries=10):
"""Find an available port starting from start_port"""
for port in range(start_port, start_port + max_tries):
try:
# Try to create a socket on the port
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', port))
sock.close()
return port
except OSError:
# Port is already in use, try the next one
continue
# If we get here, no ports were available
print(f"⚠️ Could not find an available port in range {start_port}-{start_port + max_tries - 1}")
return start_port # Return the start port as default
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description="Simple A2A Client Example")
parser.add_argument(
"--port", type=int, default=None,
help="Port to run the server on (default: auto-select an available port)"
)
parser.add_argument(
"--external", type=str, default=None,
help="External A2A endpoint URL (if provided, won't start local server)"
)
return parser.parse_args()
def start_local_server(port):
"""Start a local A2A server on the specified port"""
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState
# Create an agent card with skills information
agent_card = AgentCard(
name="Demo Agent",
description="A simple A2A server for demonstration purposes",
url=f"http://localhost:{port}",
version="1.0.0",
skills=[
AgentSkill(
name="Echo",
description="Echoes back your message",
examples=["Hello!", "How are you?"]
),
AgentSkill(
name="Greeting",
description="Greets you with a friendly message",
examples=["Greet me", "Say hello"]
)
]
)
# Create a simple A2A server
class DemoServer(A2AServer):
def __init__(self):
# Initialize with our agent card
super().__init__(agent_card=agent_card)
def handle_task(self, task):
# Extract message text
message_data = task.message or {}
content = message_data.get("content", {})
text = content.get("text", "") if isinstance(content, dict) else ""
# Prepare response based on message content
text_lower = text.lower()
if "greet" in text_lower or "hello" in text_lower or "hi" in text_lower:
response = f"👋 Hello there! I'm the Demo Agent. How can I help you today?"
else:
response = f"You said: {text}\n\nI'm a simple demo agent for the A2A protocol. Try saying 'Hello' or 'Greet me'!"
# Create response artifact
task.artifacts = [{
"parts": [{"type": "text", "text": response}]
}]
task.status = TaskStatus(state=TaskState.COMPLETED)
return task
# Create and run the server
server = DemoServer()
run_server(server, host="localhost", port=port)
def main():
# First, check dependencies
if not check_dependencies():
return 1
# Parse command line arguments
args = parse_arguments()
# Import after checking dependencies
from python_a2a import A2AClient
# Handle external or local server
if args.external:
endpoint_url = args.external
print(f"\n🚀 Connecting to external A2A agent at: {endpoint_url}")
server_process = None
else:
# Find an available port if none was specified
if args.port is None:
port = find_available_port()
print(f"🔍 Auto-selected port: {port}")
else:
port = args.port
print(f"🔍 Using specified port: {port}")
endpoint_url = f"http://localhost:{port}"
# Start a local server in a separate process
print(f"\n🚀 Starting a local demo A2A server on port {port}...")
server_process = multiprocessing.Process(target=start_local_server, args=(port,))
server_process.start()
# Give the server some time to start
print("⏳ Waiting for server to start...")
time.sleep(2)
try:
# Create an A2A client and connect to the server
print(f"🔌 Connecting to A2A agent at: {endpoint_url}")
client = A2AClient(endpoint_url)
# Try to get agent information
try:
print("\n=== Agent Information ===")
print(f"Name: {client.agent_card.name}")
print(f"Description: {client.agent_card.description}")
print(f"Version: {client.agent_card.version}")
if client.agent_card.skills:
print("\nAvailable Skills:")
for skill in client.agent_card.skills:
print(f"- {skill.name}: {skill.description}")
if skill.examples:
print(f" Examples: {', '.join(skill.examples)}")
except Exception as e:
print(f"\n⚠️ Could not retrieve agent card: {e}")
print("The agent may not support the A2A discovery protocol.")
print("You can still send messages to the agent.")
# Interactive message sending loop
print("\n=== Send Messages to the Agent ===")
print("Type your messages (or 'exit' to quit):")
while True:
try:
user_input = input("\n> ")
if user_input.lower() in ["exit", "quit", "q"]:
break
# Send the message and get the response
print("\nSending message to agent...")
response = client.ask(user_input)
# Print the response
print("\nAgent response:")
print(f"{response}")
except KeyboardInterrupt:
print("\nExiting...")
break
except Exception as e:
print(f"\n❌ Error: {e}")
print("Try sending a different message or check your connection.")
except Exception as e:
print(f"\n❌ Error connecting to agent: {e}")
print("\nPossible reasons:")
print("- The endpoint URL is incorrect")
print("- The agent server is not running")
print("- Network connectivity issues")
print("\nPlease check the URL and try again.")
return 1
finally:
# Clean up the server process if we started one
if server_process:
print("\n🛑 Stopping local server...")
server_process.terminate()
server_process.join(timeout=2)
print("✅ Local server stopped")
print("\n=== What's Next? ===")
print("1. Try 'simple_server.py' to create your own A2A server")
print("2. Try 'function_calling.py' to use function calling with A2A")
print("3. Try the other examples to explore more A2A features")
print("\n🎉 You've successfully used the A2A client! 🎉")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
print("\n✅ Program interrupted by user")
sys.exit(0)
3.3 server&agent
#!/usr/bin/env python
"""
OpenAI 旅行规划助手(中文版)
一个使用 OpenAI 和 A2A 构建的完整旅行规划系统。
此示例演示了如何构建实用的旅行规划器,结合 OpenAI 的能力与专用代理。
运行方式:
export OPENAI_API_KEY=your_api_key
python openai_travel_planner_zh.py
依赖安装:
pip install "python-a2a[openai,server]"
"""
import sys
import time
import argparse
import socket
import multiprocessing
import re
import json
# 检查依赖项
try:
import python_a2a
import openai
import flask
except ImportError as e:
module_name = getattr(e, 'name', str(e))
print(f"❌ 缺少依赖: {module_name}")
print("请安装所需的包:")
print(" pip install \"python-a2a[openai,server]\"")
sys.exit(1)
# 导入所有必要的 A2A 组件
# 装饰器可能在主包中
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState
from python_a2a import OpenAIA2AServer, A2AClient
# 尝试从主包获取装饰器
try:
from python_a2a import skill, agent
except ImportError:
# 如果不可用,则定义简化版本
print("⚠️ 无法导入 skill 和 agent 装饰器,使用简化版本")
def skill(name=None, description=None, tags=None, examples=None):
"""简化版技能装饰器"""
def decorator(func):
func._skill_info = {
"name": name or func.__name__,
"description": description or func.__doc__ or "",
"tags": tags or [],
"examples": examples or []
}
return func
return decorator
def agent(name=None, description=None, version=None, url=None):
"""简化版代理装饰器"""
def decorator(cls):
return cls
return decorator
def find_available_port(start_port=5000, max_tries=10):
"""从指定端口开始查找可用端口"""
for port in range(start_port, start_port + max_tries):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', port))
sock.close()
return port
except OSError:
continue
return start_port + 100 # 返回一个较高的备用端口
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description="OpenAI Travel Planner Example")
parser.add_argument(
"--port", type=int, default=None,
help="Port for the Travel Planner (default: auto-select)"
)
parser.add_argument(
"--no-test", action="store_true",
help="Don't run test queries automatically"
)
return parser.parse_args()
class TravelKnowledgeBase:
"""补充 OpenAI 知识的旅行信息知识库"""
def __init__(self):
# 主要目的地的基本签证规则
self.visa_rules = {
"usa": "大多数国家需要旅游签证(B-2)。签证免签计划国家需要ESTA。",
"uk": "许多国家需要标准访客签证。脱欧前欧盟公民享有免签待遇。",
"eu": "许多非欧盟公民需要申根签证。180天内最多停留90天。",
"japan": "许多国家享受最长90天的免签待遇。",
"australia": "短途访问需要eVisitor或ETA。更长时间需要正式签证。",
"canada": "免签外国公民飞往加拿大需要eTA。其他人需要签证。",
"china": "几乎所有外国人入境前都需要签证。",
"thailand": "许多国家享受30天免签。更长时间需要签证。",
"uae": "许多国籍可落地签。其他人需要预先申请签证。",
"singapore": "许多国家享受30-90天免签入境。"
}
# 旅行健康建议
self.health_advisories = {
"general": "出行前确保常规疫苗接种是最新的。",
"tropical": "前往热带地区时考虑接种甲肝、伤寒和黄热病疫苗。",
"malaria": "前往疟疾流行地区需服用抗疟药。",
"covid": "检查每个目的地当前的新冠要求,包括检测和疫苗接种。",
"water": "在许多发展中国家,请只饮用瓶装水或开水,并避免食用冰块。",
"altitude": "前往高海拔地区时,留出时间适应环境以防止高山症。",
"insurance": "出国旅行时应始终购买全面的旅行健康保险。"
}
# 一般旅行贴士
self.travel_tips = [
"制作重要文件如护照和身份证的电子版和纸质副本。",
"在目的地国家注册你所在国家的大使馆或领事馆。",
"出发前研究当地习俗和法律。",
"学习目的地语言的一些基本短语。",
"告知你的银行和信用卡公司你的旅行计划。",
"打包一个包含基本急救用品的小急救箱以备紧急情况。",
"使用国际通用的交通应用,如 Uber 或本地等效应用。",
"考虑在当地购买SIM卡或国际数据套餐。",
"检查目的地是否有任何安全问题或限制区域。",
"研究针对游客的典型骗局。"
]
def get_visa_info(self, country):
"""获取国家签证信息"""
country = country.lower()
# 尝试直接匹配
if country in self.visa_rules:
return self.visa_rules[country]
# 尝试部分匹配
for key, value in self.visa_rules.items():
if key in country or country in key:
return value
return "请咨询目的地国家的大使馆或领事馆了解具体签证要求。"
def get_health_advisory(self, region=None):
"""获取特定地区的健康建议"""
if region and region.lower() in self.health_advisories:
return self.health_advisories[region.lower()]
# 返回一般建议
return self.health_advisories["general"]
def get_travel_tips(self, count=3):
"""获取随机旅行贴士"""
import random
return random.sample(self.travel_tips, min(count, len(self.travel_tips)))
def test_client(port):
"""对旅行规划器运行测试查询"""
# 等待服务器启动
time.sleep(3)
print("\n🧪 正在测试旅行规划器...")
client = A2AClient(f"http://localhost:{port}")
# 测试查询
test_queries = [
"规划一个三天的东京之旅",
"访问法国的签证要求是什么?",
"为家庭伦敦度假提供建议"
]
for query in test_queries:
try:
print(f"\n💬 查询: {query}")
response = client.ask(query)
print(f"✈️ 响应: {response}")
time.sleep(1)
except Exception as e:
print(f"❌ 错误: {e}")
print("\n✅ 测试完成!你的旅行规划器已准备就绪。")
print(f"💻 服务运行地址: http://localhost:{port}")
print("📝 可尝试提问如: '规划巴黎周末游','泰国的最佳旅行时间',或'海滩度假需要带什么'")
print("🛑 在服务器终端按 Ctrl+C 停止服务。")
def main():
# 解析命令行参数
args = parse_arguments()
with open("config.json") as f:
config = json.load(f)
model_name = config["model_name"]
base_url = config["base_url"]
api_key = config["api_key"]
# 如果未指定则自动选择端口
if args.port is None:
port = find_available_port()
print(f"🔍 自动选择端口: {port}")
else:
port = args.port
print(f"🔍 使用指定端口: {port}")
print("\n✈️ OpenAI 旅行规划器 ✈️")
print(f"一个完整的旅行规划系统,由 OpenAI {model_name} 提供支持")
# 初始化知识库
kb = TravelKnowledgeBase()
# 创建基于 OpenAI 的旅行规划器
class TravelPlanner(A2AServer):
"""
一个由 OpenAI 驱动的旅行规划器,用于创建行程、
提供旅行信息以及提供推荐。
"""
def __init__(self, knowledge_base, openai_model,base_url,api_key):
# 使用我们的代理卡片初始化
super().__init__(AgentCard(
name="旅行规划师",
description="帮助规划旅行、查找信息和提供推荐的 AI 旅行助手",
url=f"http://localhost:{port}",
version="1.0.0",
skills=[
AgentSkill(
name="行程规划",
description="根据偏好和持续时间创建详细旅行行程",
examples=["规划一个三天的东京之旅", "规划一个带孩子的巴黎周末游"]
),
AgentSkill(
name="旅行信息",
description="提供特定旅行信息如签证要求、健康建议等",
examples=["日本的签证要求", "泰国的健康建议"]
),
AgentSkill(
name="推荐",
description="提供活动、住宿和餐厅的定制化推荐",
examples=["伦敦的事情要做", "罗马适合家庭的餐厅"]
)
]
))
# 存储知识库
self.kb = knowledge_base
# 初始化带有旅行专用系统提示的 OpenAI 后端
self.openai = OpenAIA2AServer(
api_key=api_key,
model=openai_model,
base_url=base_url,
temperature=0.7,
system_prompt="""
你是一个专业的旅行助手,专长于旅行规划、目的地信息和旅行推荐。
你的目标是根据用户的偏好和约束,帮助他们规划愉快、安全和现实的旅行。
提供信息时:
- 建议要具体且实用
- 考虑季节性、预算限制和旅行物流
- 强调文化体验和真实的当地活动
- 包括目的地相关的实用旅行小贴士
- 适当的时候用标题和项目符号格式清晰展示信息
对于行程:
- 根据景点之间的旅行时间创建现实的每日安排
- 平衡热门旅游景点和鲜为人知的体验
- 包含大约的时间和实际的后勤安排
- 建议突出当地美食的用餐选择
- 考虑天气、当地事件和开放时间进行规划
始终保持有帮助、热情但现实的语气,并在适当时候承认自己的知识有限。
"""
)
def plan_trip(self, destination, duration=3, interests=None, budget=None):
"""
创建自定义旅行行程。
参数:
destination: 旅行目的地城市/国家
duration: 旅行天数(默认:3天)
interests: 旅行者兴趣列表(可选)
budget: 预算水平(低、中、高)(可选)
返回:
详细的日程安排
"""
# 格式化 OpenAI 查询
if interests and budget:
query = f"为{destination}创建一个详细的{duration}-天行程。兴趣: {interests}。预算: {budget}."
elif interests:
query = f"为{destination}创建一个详细的{duration}-天行程。兴趣: {interests}."
elif budget:
query = f"为{destination}创建一个详细的{duration}-天行程。预算: {budget}."
else:
query = f"为{destination}创建一个详细的{duration}-天行程。"
# 获取 OpenAI 响应
from python_a2a import Message, TextContent, MessageRole
message = Message(content=TextContent(text=query), role=MessageRole.USER)
response = self.openai.handle_message(message)
# 添加一些来自我们知识库的旅行贴士
tips = self.kb.get_travel_tips(3)
tips_text = "\n\n实用旅行贴士:\n" + "\n".join(f"- {tip}" for tip in tips)
return response.content.text + tips_text
def get_travel_info(self, country, topic):
"""
获取目的地的特定旅行信息。
参数:
country: 目的地国家
topic: 信息主题(签证、健康、安全等)
返回:
相关旅行信息
"""
# 检查签证信息
if "visa" in topic.lower():
kb_info = self.kb.get_visa_info(country)
# 格式化 OpenAI 查询以扩展我们的知识
query = f"访问{country}的签证要求是什么?添加更多关于此信息的内容: {kb_info}"
elif "health" in topic.lower():
kb_info = self.kb.get_health_advisory()
query = f"访问{country}时应该知道哪些健康注意事项?参考这个建议: {kb_info}"
else:
# 其他主题的一般查询
query = f"关于{topic},去{country}旅行时我应该知道什么?"
# 获取 OpenAI 响应
from python_a2a import Message, TextContent, MessageRole
message = Message(content=TextContent(text=query), role=MessageRole.USER)
response = self.openai.handle_message(message)
return response.content.text
def get_recommendations(self, destination, category, preferences=None):
"""
获取个性化的旅行推荐。
参数:
destination: 旅行目的地
category: 推荐类型(活动、餐厅、酒店等)
preferences: 更详细的偏好的可选参数
返回:
推荐列表及其描述
"""
# 格式化 OpenAI 查询
if preferences:
query = f"推荐适合喜欢{preferences}的人在{destination}的{category}。"
else:
query = f"推荐{destination}最好的{category}。"
# 获取 OpenAI 响应
from python_a2a import Message, TextContent, MessageRole
message = Message(content=TextContent(text=query), role=MessageRole.USER)
response = self.openai.handle_message(message)
return response.content.text
def handle_task(self, task):
"""通过识别意图并路由到适当的技能来处理传入的任务"""
try:
# 从任务中提取消息文本
message_data = task.message or {}
content = message_data.get("content", {})
text = content.get("text", "") if isinstance(content, dict) else ""
# 确定消息的意图
intent, params = self._analyze_intent(text)
# 根据意图路由到合适的技能
if intent == "trip_planning":
response_text = self.plan_trip(**params)
elif intent == "travel_info":
response_text = self.get_travel_info(**params)
elif intent == "recommendations":
response_text = self.get_recommendations(**params)
else:
# 对于一般查询,直接传递给 OpenAI
from python_a2a import Message, TextContent, MessageRole
message = Message(content=TextContent(text=text), role=MessageRole.USER)
response = self.openai.handle_message(message)
response_text = response.content.text
# 创建工件与响应
task.artifacts = [{
"parts": [{"type": "text", "text": response_text}]
}]
# 标记为已完成
task.status = TaskStatus(state=TaskState.COMPLETED)
return task
except Exception as e:
# 处理错误
error_message = f"抱歉,我遇到了一个错误: {str(e)}"
task.artifacts = [{
"parts": [{"type": "text", "text": error_message}]
}]
task.status = TaskStatus(state=TaskState.FAILED)
return task
def _analyze_intent(self, text):
"""
分析用户的消息以确定意图并提取参数。
这里使用简单的模式匹配,在实际系统中可以增强 NLP 功能。
"""
text_lower = text.lower()
# 提取潜在的目的地
destination_match = re.search(r"(?:in|to|for|visit(?:ing)?)(?:\\s+|\\b)([A-Z][a-zA-Z\\s]+)(?:\\.|\\?|$|\\s+)", text_lower)
destination = destination_match.group(1).strip() if destination_match else None
# 提取潜在的持续时间
duration_match = re.search(r"(\d+)(?:-day|天)", text_lower)
duration = int(duration_match.group(1)) if duration_match else 3
# 提取潜在的兴趣
interests_match = re.search(r"兴趣: ([^\\n]+)", text_lower)
interests = interests_match.group(1).strip() if interests_match else None
# 提取潜在的预算
budget_match = re.search(r"预算: ([^\\n]+)", text_lower)
budget = budget_match.group(1).strip() if budget_match else None
# 提取潜在的主题
topic_match = re.search(r"(签证|健康|安全|天气|保险|其他)", text_lower)
topic = topic_match.group(1).strip() if topic_match else None
# 提取潜在的类别
category_match = re.search(r"(活动|餐厅|酒店|景点|娱乐)", text_lower)
category = category_match.group(1).strip() if category_match else None
# 提取潜在的偏好
preferences_match = re.search(r"喜欢([^\\n]+)", text_lower)
preferences = preferences_match.group(1).strip() if preferences_match else None
# 确定意图
if destination and duration:
return "trip_planning", {
"destination": destination,
"duration": duration,
"interests": interests,
"budget": budget
}
elif destination and topic:
return "travel_info", {
"country": destination,
"topic": topic
}
elif destination and category:
return "recommendations", {
"destination": destination,
"category": category,
"preferences": preferences
}
# 默认为一般查询
return "general", {}
# 创建旅行规划器
travel_planner = TravelPlanner(kb, model_name,base_url,api_key)
# 打印代理信息
print("\n=== 旅行规划器信息 ===")
print(f"名称: {travel_planner.agent_card.name}")
print(f"描述: {travel_planner.agent_card.description}")
print(f"网址: {travel_planner.agent_card.url}")
print(f"OpenAI 模型: {model_name}")
print("\n=== 可用技能 ===")
for skill in travel_planner.agent_card.skills:
print(f"- {skill.name}: {skill.description}")
# 如果启用测试,则在单独的进程中启动测试客户端
client_process = None
if not args.no_test:
client_process = multiprocessing.Process(target=test_client, args=(port,))
client_process.start()
# 启动服务器
print(f"\n🚀 在 http://localhost:{port} 上启动旅行规划器")
print("按 Ctrl+C 停止服务器")
try:
run_server(travel_planner, host="0.0.0.0", port=port)
except KeyboardInterrupt:
print("\n✅ 服务器已停止")
except Exception as e:
print(f"\n❌ 启动服务器时出错: {e}")
if "Address already in use" in str(e):
print(f"\n端口 {port} 已被占用。尝试使用其他端口:")
print(f" python openai_travel_planner_zh.py --port {port + 1}")
return 1
finally:
# 清理客户端进程
if client_process:
client_process.terminate()
client_process.join()
print("\n=== 下一步 ===")
print("1. 尝试 'openai_mcp_agent.py' 示例添加工具功能")
print("2. 尝试 'knowledge_base.py' 示例创建自己的数据系统")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
print("\n✅ 程序被用户中断")
sys.exit(0)
3.4 修改配置并运行
可以查看readme: https://github.com/Linux2010/python-a2a/blob/feat-examples-job/examples/applications_local_model/readme.md
- 修改config.json
{
"model_name":"Qwen/QwQ-32B",
"api_key": "sk-xxxxx",
"base_url": "https://api.siliconflow.cn/v1"
}
- 依赖安装
pip install python-a2a # Includes LangChain, MCP, and other integrations
- server和client运行
# server运行
python openai_travel_planner_zh.py --port 5001 --no-test
# client运行
python app_client.py --external http://127.0.0.1:5001
四、总结
4.1 google-a2a-sdk
google官方的a2a的python实现sdk,当前在迅速迭代,使用体感比较差
- 当前是基于a2a的标准实现python-demo,但没有使用到function-call和mcp的tools调用,后续持续优化
4.2 python-a2a-sdk
开源社区基于a2a协议的python-sdk,功能相对完善,思路完整,使用体验较好
- python-a2a具备的其他能力,以下是相关介绍:https://github.com/Linux2010/python-a2a/blob/feat-examples-job/README_zh.md
```bash
# Send a message to an agent
a2a send http://localhost:5000 "What is artificial intelligence?"
# Stream a response in real-time
a2a stream http://localhost:5000 "Generate a step-by-step tutorial for making pasta"
# Start the Agent Flow UI
a2a ui
# Start the Agent Flow UI with custom options
a2a ui --port 9000 --host 0.0.0.0 --storage-dir ~/.my_workflows --debug --no-browser
# Start an OpenAI-powered A2A server
a2a openai --model gpt-4 --system-prompt "You are a helpful coding assistant"
# Start an Anthropic-powered A2A server
a2a anthropic --model claude-3-opus-20240229 --system-prompt "You are a friendly AI teacher"
# Start an MCP server with tools
a2a mcp-serve --name "Data Analysis MCP" --port 5001 --script analysis_tools.py
# Start an MCP-enabled A2A agent
a2a mcp-agent --servers data=http://localhost:5001 calc=http://localhost:5002
# Call an MCP tool directly
a2a mcp-call http://localhost:5001 analyze_csv --params file=data.csv columns=price,date
# Manage agent networks
a2a network --add weather=http://localhost:5001 travel=http://localhost:5002 --save network.json
# Run a workflow from a script
a2a workflow --script research_workflow.py --context initial_data.json
+ 重点介绍: Agent Network(agent flow能力)
```bash
a2a ui

更多推荐




所有评论(0)