A2A Protocol
目录
DeepSeek
A2A(Agent-to-Agent)协议是一种专为智能体(Agent)之间直接通信与协作而设计的标准化开放协议。它通过定义统一的交互模型、能力发现与任务管理机制,使不同框架或供应商开发的智能体能够相互发现、协商并协同完成复杂任务。该协议打破了智能体间的“信息孤岛”,为构建跨平台、可互操作的分布式智能体网络提供了基础,在企业自动化、多智能体系统及 AI 服务集成等领域具有关键作用。
为什么使用 A2A 协议?

A2A 如何与 MCP 协同工作?

场景示例
用户向计算器智能体提交任务,计算器智能体通过注册中心发现已注册的加法智能体、减法智能体,获取其能力信息,经推理匹配后调用对应智能体执行计算,并将结果返回给用户。
- 1 个 Main:主调用程序
- 1 个 Registry:提供 Agent 地址
- 1 个 Calculator Agent:拆解任务 + 决策调用哪个 Agent
- 2 个 实际计算 Agent:一个执行加法任务,一个执行减法任务

环境准备
pip install flask创建 3 个 Agent
agent_calculator.py 文件,这是 A2A 的关键:动态发现 + 模拟 LLM 决策 + 链式调用
# calculator.py
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
REGISTRY_URI = "http://localhost:5000"
def find_agent(op):
agents = requests.get(f"{REGISTRY_URI}/discover").json()
for agent in agents:
if agent['capability'] == op:
print(f"[Calculator] Found agent {agent['name']} for operation '{op}'")
return agent['uri']
return None
@app.route("/calculate", methods=["POST"])
def calculate():
task = request.json
op = task['operation']
agent_uri = find_agent(op)
if not agent_uri:
return jsonify({"error": "No agent found"}), 404
artifact = requests.post(f"{agent_uri}/task", json=task).json()
print(f"[Calculator] Received artifact: {artifact}")
return jsonify(artifact)
if __name__ == "__main__":
print("[Calculator] Running on http://localhost:5003")
app.run(port=5003)agent_add.py 文件,执行加法:
# agent_add.py
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# Agent Card
agent_card = {
"name": "agent_add",
"capability": "add",
"uri": "http://localhost:5001"
}
REGISTRY_URI = "http://localhost:5000"
# 注册到 Registry
requests.post(f"{REGISTRY_URI}/register", json=agent_card)
@app.route("/task", methods=["POST"])
def handle_task():
task = request.json
print(f"[Add Agent] Received task: {task}")
a, b = task['numbers']
artifact = {"result": a + b} # Artifact
print(f"[Add Agent] Returning artifact: {artifact}")
return jsonify(artifact)
if __name__ == "__main__":
print("[Add Agent] Running on http://localhost:5001")
app.run(port=5001)agent_sub.py 文件,执行减法:
# agent_sub.py
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# Agent Card
agent_card = {
"name": "agent_sub",
"capability": "sub",
"uri": "http://localhost:5002"
}
REGISTRY_URI = "http://localhost:5000"
# 注册到 Registry
requests.post(f"{REGISTRY_URI}/register", json=agent_card)
@app.route("/task", methods=["POST"])
def handle_task():
task = request.json
print(f"[Sub Agent] Received task: {task}")
a, b = task['numbers']
artifact = {"result": a - b} # Artifact
print(f"[Sub Agent] Returning artifact: {artifact}")
return jsonify(artifact)
if __name__ == "__main__":
print("[Sub Agent] Running on http://localhost:5002")
app.run(port=5002)注册中心
# registry.py
from flask import Flask, request, jsonify
app = Flask(__name__)
agents = {} # 存储 Agent Card
@app.route("/register", methods=["POST"])
def register_agent():
agent_card = request.json
agents[agent_card['name']] = agent_card
print(f"[Registry] Registered agent: {agent_card['name']}")
return jsonify({"status": "ok"})
@app.route("/discover", methods=["GET"])
def discover_agents():
print(f"[Registry] Agents discovered: {list(agents.keys())}")
return jsonify(list(agents.values()))
if __name__ == "__main__":
print("[Registry] Starting Registry on http://localhost:5000")
app.run(port=5000)主调用程序
# main.py
import requests
CALCULATOR_URI = "http://localhost:5003"
# 示例任务
tasks = [
{"operation": "add", "numbers": [5, 3]},
{"operation": "sub", "numbers": [5, 3]}
]
for task in tasks:
print(f"[Main] Sending task: {task}")
artifact = requests.post(f"{CALCULATOR_URI}/calculate", json=task).json()
print(f"[Main] Received artifact: {artifact}\n")运行和测试
示例目录结构:
├── main.py
├── registry.py
├── agent_calculator.py
├── agent_add.py
├── agent_sub.py开 5 个终端,分别运行:
python registry.py
python agent_add.py
python agent_sub.py
python agent_calculator.py
python main.py