摘要:2025 年 4 月,Google 正式推出 Agent2Agent (A2A) 协议,并在 2026 年初将其捐赠给 Linux Foundation。这一开放标准旨在解决 AI Agent 生态系统的碎片化问题,使不同框架、不同厂商构建的 Agent 能够像人类同事一样协作。本文深入剖析 A2A 协议的核心架构、通信机制,并与 MCP 协议进行对比分析,提供生产级实现方案。
2026 年的 AI Agent 市场呈现出前所未有的繁荣,但也伴随着严重的碎片化:
- 框架林立:LangGraph、LlamaIndex、AutoGen、CrewAI、Microsoft Agent Framework 等各自为政
- 通信孤岛:每个框架的 Agent 只能在自己的生态内协作,跨框架通信需要定制开发
- 重复建设:每个团队都要重新实现 Agent 发现、任务委派、结果返回等基础能力
- 集成成本高昂:企业想要组合多个 Agent 完成复杂任务,需要编写大量胶水代码
根据 2026 年 1 月 Instaclustr 的调研,83% 的企业在部署多 Agent 系统时遇到了互操作性问题,平均集成成本占项目总预算的 40% 以上。
在 A2A 协议出现之前,市场上的解决方案主要分为三类:
| 方案类型 | 代表产品 | 优点 | 缺点 |
|---|---|---|---|
| 统一框架 | LangGraph, AutoGen | 内部通信高效 | 锁定单一生态 |
| API 网关 | 自定义 REST API | 灵活可控 | 开发维护成本高 |
| 消息队列 | Redis Pub/Sub, Kafka | 解耦性好 | 语义层缺失 |
这些方案都无法从根本上解决问题——缺少一个标准化的 Agent 通信语言。
2025 年 4 月 9 日,Google 在开发者博客上正式宣布 A2A 协议:
"A2A empowers developers to build agents capable of connecting with any other agent built using the protocol and offers users the flexibility to combine agents from various providers."
2026 年初,Google 将 A2A 协议捐赠给 Linux Foundation,标志着这一标准正式进入开源治理阶段。
A2A (Agent2Agent) Protocol 是一个开放标准,使 AI Agent 能够跨不同平台和框架进行通信和协作,无论其底层技术如何。
关键特性:
- 框架无关性:LangGraph Agent 可以与 CrewAI Agent 直接对话
- 厂商中立:不同公司构建的 Agent 可以互操作
- 语义完整:支持任务委派、状态同步、结果返回等完整语义
- 传输多样:支持 HTTP、SSE (Server-Sent Events)、gRPC 等多种传输层
A2A 协议定义了三种核心角色:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Client Agent │ ──────▶ │ A2A Server │ ◀────── │ Task Agent │
│ (任务发起方) │ │ (协议网关) │ │ (任务执行方) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
│ 1. 发现可用 Agent │ │
│ 2. 发送任务请求 │ │
│◀─────────────────────────▶│ │
│ 3. 接收流式状态更新 │ │
│ │ 4. 任务委派 │
│ │──────────────────────────▶│
│ │ 5. 流式返回结果 │
│ │◀──────────────────────────│
│ 6. 聚合结果返回 │ │
│◀──────────────────────────│ │
▼ ▼ ▼
Client Agent:任务的发起方,负责发现其他 Agent 并委派任务
Task Agent:任务的执行方,接收任务并返回结果
A2A Server:协议网关,处理 Agent 发现、路由、状态管理等
A2A 协议定义了五种核心消息类型:
// 1. TaskRequest - 任务请求
interface TaskRequest {
id: string;
type: "task/request";
payload: {
description: string; // 任务描述
context?: Record<string, any>; // 上下文数据
requirements?: { // 执行要求
capabilities: string[];
timeout?: number;
};
};
}
// 2. TaskStatus - 任务状态更新
interface TaskStatus {
id: string;
type: "task/status";
payload: {
status: "working" | "completed" | "failed" | "cancelled";
progress?: number; // 0-100 进度百分比
message?: string; // 状态消息
intermediateResults?: any; // 中间结果
};
}
// 3. TaskResult - 任务结果
interface TaskResult {
id: string;
type: "task/result";
payload: {
success: boolean;
data?: any;
error?: string;
metadata?: {
executionTime: number;
tokensUsed?: number;
};
};
}
// 4. AgentCard - Agent 能力描述
interface AgentCard {
name: string;
description: string;
capabilities: string[];
inputTypes: string[];
outputTypes: string[];
authentication?: AuthScheme;
endpoints: {
task: string;
status: string;
cancel: string;
};
}
// 5. DiscoveryRequest - Agent 发现请求
interface DiscoveryRequest {
type: "discovery/request";
payload: {
capabilities?: string[]; // 所需能力
location?: string; // 可选的位置约束
};
}A2A 协议的标准通信流程如下:
┌──────────────┐ ┌──────────────┐
│ Client Agent │ │ Task Agent │
└──────┬───────┘ └──────┬───────┘
│ │
│ ① GET /.well-known/agent-card │
│────────────────────────────────────────▶│
│ │
│ ② AgentCard (JSON) │
│◀────────────────────────────────────────│
│ │
│ ③ POST /task (TaskRequest) │
│────────────────────────────────────────▶│
│ │
│ ④ SSE Stream (TaskStatus × N) │
│◀────────────────────────────────────────│
│ - status: "working", progress: 20% │
│ - status: "working", progress: 50% │
│ - status: "working", progress: 80% │
│ │
│ ⑤ TaskResult (final) │
│◀────────────────────────────────────────│
│ │
A2A 协议支持多种传输层,最常用的是 HTTP + SSE:
# A2A Server 实现示例 (FastAPI + SSE)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import json
app = FastAPI()
class TaskRequest(BaseModel):
id: str
type: str
payload: dict
class TaskAgent:
def __init__(self, name: str, capabilities: list[str]):
self.name = name
self.capabilities = capabilities
async def execute(self, task_id: str, description: str, context: dict):
"""执行任务并流式返回状态"""
# 阶段 1: 初始化
yield {
"id": task_id,
"type": "task/status",
"payload": {
"status": "working",
"progress": 10,
"message": "正在分析任务..."
}
}
# 阶段 2: 执行核心逻辑
await asyncio.sleep(1) # 模拟处理
yield {
"id": task_id,
"type": "task/status",
"payload": {
"status": "working",
"progress": 50,
"message": "正在处理数据...",
"intermediateResults": {"partial": "some data"}
}
}
# 阶段 3: 完成
await asyncio.sleep(1)
yield {
"id": task_id,
"type": "task/result",
"payload": {
"success": True,
"data": {"result": "final output"},
"metadata": {
"executionTime": 2.3,
"tokensUsed": 1500
}
}
}
# Agent 能力描述端点
@app.get("/.well-known/agent-card")
async def get_agent_card():
return {
"name": "DataAnalysisAgent",
"description": "专业数据分析 Agent,支持 CSV/JSON 数据处理和可视化",
"capabilities": ["data_analysis", "chart_generation", "statistical_testing"],
"inputTypes": ["application/json", "text/csv"],
"outputTypes": ["application/json", "image/png"],
"endpoints": {
"task": "/task",
"status": "/task/{task_id}/status",
"cancel": "/task/{task_id}/cancel"
}
}
# 任务执行端点 (SSE 流式)
@app.post("/task")
async def create_task(request: TaskRequest):
agent = TaskAgent("DataAnalysisAgent", ["data_analysis"])
async def event_generator():
async for event in agent.execute(
request.id,
request.payload["description"],
request.payload.get("context", {})
):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)A2A 协议支持两种发现模式:
模式一:直接发现 (Direct Discovery)
# 已知 Agent 地址时,直接获取能力描述
curl https://agent.example.com/.well-known/agent-card模式二:注册表发现 (Registry Discovery)
# 通过中央注册表查找符合条件的 Agent
curl -X POST https://a2a-registry.org/discover \
-H "Content-Type: application/json" \
-d '{
"capabilities": ["data_analysis", "chart_generation"],
"location": "us-west"
}'A2A 协议支持多种认证方案:
interface AuthScheme {
type: "bearer" | "api_key" | "oauth2" | "mtls";
// Bearer Token
bearer?: {
tokenUrl: string;
scopes: string[];
};
// API Key
apiKey?: {
headerName: string;
queryParamName?: string;
};
// OAuth2
oauth2?: {
authorizationUrl: string;
tokenUrl: string;
flows: string[];
};
}生产环境推荐配置:
- 内部网络:mTLS (双向 TLS 认证)
- 公有 API:OAuth2 + Bearer Token
- 简单场景:API Key (通过 Header 传递)
| 维度 | A2A Protocol | MCP Protocol |
|---|---|---|
| 核心目标 | Agent 间通信 | 工具调用标准化 |
| 通信对象 | Agent ↔ Agent | Client ↔ Server (Tools) |
| 语义层次 | 任务委派、状态同步 | 工具列表、参数调用 |
| 典型场景 | 多 Agent 协作 | 单 Agent 扩展能力 |
A2A 架构:
┌─────────────┐ ┌─────────────┐
│ Agent A │ ◀─────▶ │ Agent B │
│ (任务发起) │ A2A │ (任务执行) │
└─────────────┘ Protocol└─────────────┘
MCP 架构:
┌─────────────┐ ┌─────────────┐
│ Client │ ◀─────▶ │ Server │
│ (Agent) │ MCP │ (Tools) │
└─────────────┘ Protocol└─────────────┘
在实际生产系统中,A2A 和 MCP 经常协同工作:
┌─────────────────────────────────────────────────────────────┐
│ Multi-Agent System │
│ │
│ ┌──────────────┐ A2A ┌──────────────┐ │
│ │ Coordinator │ ◀────────▶ │ Specialist │ │
│ │ Agent │ │ Agent │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ MCP │ MCP │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ MCP Server │ │ MCP Server │ │
│ │ (Tools) │ │ (Tools) │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
** Coordinator Agent**:通过 A2A 协议发现并委派任务给 Specialist Agent
Specialist Agent:通过 MCP 协议调用专业工具完成任务
这种架构在 OpenClaw 的生产环境中已得到验证,支持:
- 动态 Agent 发现与任务委派
- 工具能力的按需扩展
- 跨框架 Agent 协作
假设我们需要构建一个智能投资分析系统,包含三个 Agent:
- DataFetcher Agent (LangGraph):负责获取市场数据
- Analysis Agent (CrewAI):负责数据分析和洞察生成
- Report Agent (自定义框架):负责报告生成和格式化
# docker-compose.yml
version: '3.8'
services:
data-fetcher:
build: ./agents/data-fetcher
ports:
- "8001:8000"
environment:
- AGENT_NAME=DataFetcherAgent
- CAPABILITIES=["market_data", "financial_reports"]
analysis-agent:
build: ./agents/analysis
ports:
- "8002:8000"
environment:
- AGENT_NAME=AnalysisAgent
- CAPABILITIES=["trend_analysis", "risk_assessment"]
report-agent:
build: ./agents/report
ports:
- "8003:8000"
environment:
- AGENT_NAME=ReportAgent
- CAPABILITIES=["report_generation", "visualization"]
coordinator:
build: ./coordinator
depends_on:
- data-fetcher
- analysis-agent
- report-agent# coordinator/main.py
import httpx
import asyncio
from typing import List, Dict, Any
class A2ACoordinator:
def __init__(self):
self.agents: Dict[str, Dict] = {}
async def discover_agents(self, registry_url: str):
"""发现可用的 Agent"""
agent_urls = [
"http://data-fetcher:8000",
"http://analysis-agent:8000",
"http://report-agent:8000"
]
for url in agent_urls:
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{url}/.well-known/agent-card",
timeout=5.0
)
card = response.json()
card["base_url"] = url
self.agents[card["name"]] = card
except Exception as e:
print(f"Failed to discover {url}: {e}")
async def execute_workflow(self, ticker: str) -> Dict[str, Any]:
"""执行投资分析工作流"""
results = {}
# 步骤 1: 获取数据
data_result = await self.delegate_task(
agent_name="DataFetcherAgent",
description=f"获取 {ticker} 的最新市场数据和财务报告",
context={"ticker": ticker, "timeframe": "1Y"}
)
results["market_data"] = data_result
# 步骤 2: 分析数据
analysis_result = await self.delegate_task(
agent_name="AnalysisAgent",
description="分析市场数据,生成投资洞察和风险评估",
context={
"market_data": data_result,
"analysis_types": ["trend", "risk", "valuation"]
}
)
results["analysis"] = analysis_result
# 步骤 3: 生成报告
report_result = await self.delegate_task(
agent_name="ReportAgent",
description="生成投资分析报告,包含图表和关键指标",
context={
"ticker": ticker,
"analysis": analysis_result,
"format": "markdown"
}
)
results["report"] = report_result
return results
async def delegate_task(
self,
agent_name: str,
description: str,
context: Dict[str, Any]
) -> Any:
"""委派任务给指定 Agent"""
agent = self.agents.get(agent_name)
if not agent:
raise ValueError(f"Agent {agent_name} not found")
task_id = f"task_{asyncio.get_event_loop().time()}"
task_request = {
"id": task_id,
"type": "task/request",
"payload": {
"description": description,
"context": context,
"requirements": {
"capabilities": agent["capabilities"],
"timeout": 300
}
}
}
async with httpx.AsyncClient(timeout=300.0) as client:
# 发起任务
async with client.stream(
"POST",
f"{agent['base_url']}/task",
json=task_request
) as response:
final_result = None
# 处理 SSE 流
async for line in response.aiter_lines():
if line.startswith("data: "):
event = json.loads(line[6:])
if event["type"] == "task/status":
status = event["payload"]["status"]
progress = event["payload"].get("progress", 0)
message = event["payload"].get("message", "")
print(f"[{agent_name}] {status} ({progress}%): {message}")
elif event["type"] == "task/result":
final_result = event["payload"]
break
if not final_result or not final_result.get("success"):
raise RuntimeError(
f"Task failed: {final_result.get('error', 'Unknown error')}"
)
return final_result.get("data")
# 使用示例
async def main():
coordinator = A2ACoordinator()
await coordinator.discover_agents("http://registry:8000")
result = await coordinator.execute_workflow("AAPL")
print(f"Report generated: {result['report']['url']}")
if __name__ == "__main__":
asyncio.run(main())# agents/analysis/main.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
AGENT_CARD = {
"name": "AnalysisAgent",
"description": "专业投资分析 Agent,支持趋势分析、风险评估和估值建模",
"capabilities": ["trend_analysis", "risk_assessment", "valuation_modeling"],
"inputTypes": ["application/json"],
"outputTypes": ["application/json"],
"endpoints": {
"task": "/task",
"status": "/task/{task_id}/status",
"cancel": "/task/{task_id}/cancel"
}
}
@app.get("/.well-known/agent-card")
async def get_agent_card():
return AGENT_CARD
@app.post("/task")
async def create_task(request: dict):
"""接收任务并流式执行"""
task_id = request["id"]
description = request["payload"]["description"]
context = request["payload"].get("context", {})
async def execute_analysis():
# 阶段 1: 数据验证
yield json.dumps({
"id": task_id,
"type": "task/status",
"payload": {
"status": "working",
"progress": 10,
"message": "验证输入数据..."
}
}) + "\n\n"
await asyncio.sleep(0.5)
# 阶段 2: 趋势分析
yield json.dumps({
"id": task_id,
"type": "task/status",
"payload": {
"status": "working",
"progress": 35,
"message": "执行趋势分析...",
"intermediateResults": {
"trend": "bullish",
"confidence": 0.78
}
}
}) + "\n\n"
await asyncio.sleep(1.0)
# 阶段 3: 风险评估
yield json.dumps({
"id": task_id,
"type": "task/status",
"payload": {
"status": "working",
"progress": 65,
"message": "执行风险评估...",
"intermediateResults": {
"risk_level": "moderate",
"volatility": 0.23
}
}
}) + "\n\n"
await asyncio.sleep(1.0)
# 阶段 4: 生成最终结果
yield json.dumps({
"id": task_id,
"type": "task/result",
"payload": {
"success": True,
"data": {
"trend": {
"direction": "bullish",
"strength": 0.78,
"timeframe": "3M"
},
"risk": {
"level": "moderate",
"volatility": 0.23,
"max_drawdown": 0.15
},
"valuation": {
"pe_ratio": 28.5,
"peg_ratio": 1.8,
"dcf_value": 195.50
},
"recommendation": "HOLD",
"confidence": 0.75
},
"metadata": {
"executionTime": 2.5,
"tokensUsed": 2200,
"model": "claude-sonnet-4-20250514"
}
}
}) + "\n\n"
return StreamingResponse(
execute_analysis(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)在生产环境中,我们实施了以下优化:
1. 连接池管理
# 使用 httpx 连接池复用
client = httpx.AsyncClient(
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
timeout=300.0
)2. 超时与重试
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def delegate_task_with_retry(self, agent_name: str, task: dict):
return await self.delegate_task(agent_name, task)3. 结果缓存
from functools import lru_cache
import hashlib
def cache_key(description: str, context: dict) -> str:
content = f"{description}:{json.dumps(context, sort_keys=True)}"
return hashlib.md5(content.encode()).hexdigest()
@lru_cache(maxsize=1000)
def get_cached_result(key: str) -> Optional[dict]:
# 从 Redis 或其他存储获取缓存
pass-
A2A 协议是 Agent 互操作性的关键基础设施,类似于 HTTP 对于 Web 的意义
-
与 MCP 协议形成互补:A2A 处理 Agent 间通信,MCP 处理工具调用
-
生产级实现需要考虑:
- 流式状态更新 (SSE)
- 超时与重试机制
- 认证与授权
- 结果缓存与优化
-
生态系统正在快速成熟:
- Linux Foundation 接管治理
- 主流框架陆续支持
- 企业级应用案例增多
短期 (2026 H2):
- 更多框架原生支持 A2A 协议
- 出现专业的 A2A 网关和注册表服务
- 安全标准进一步完善
中期 (2027):
- Agent 应用商店出现,支持 A2A 的 Agent 可被直接发现和购买
- 跨组织 Agent 协作成为常态
- 出现基于 A2A 的 Agent 编排平台
长期 (2028+):
- A2A 成为 AI Agent 通信的事实标准
- 人类与 Agent、Agent 与 Agent 的边界进一步模糊
- 真正的"Agent 互联网"形成
对于正在构建多 Agent 系统的团队:
-
立即采用:新项目直接使用 A2A 协议作为 Agent 通信标准
-
渐进迁移:现有系统通过 A2A 网关逐步迁移
-
双协议栈:同时支持 A2A (Agent 通信) 和 MCP (工具调用)
-
关注生态:跟踪 Linux Foundation A2A 工作组的最新进展
- Google Developers Blog. "Announcing the Agent2Agent Protocol (A2A)". April 2025.
- A2A Protocol Documentation. https://a2a-protocol.org
- GitHub - a2aproject/A2A. https://github.com/a2aproject/A2A
- IBM. "What Is Agent2Agent (A2A) Protocol?". November 2025.
- Instaclustr. "Agentic AI Frameworks: Top 8 Options in 2026". January 2026.
- Microsoft Agent Framework Release Notes. February 2026.
作者: OpenClaw Research Team
发布日期: 2026-03-30
许可: CC BY 4.0