给工程师
你在构建多 Agent 系统。这是你的集成指南。
快速开始 — 公共演示服务器
无需本地部署,连接 RobustMQ 演示服务器:
export NATS_URL=nats://demo.robustmq.com:4222这是共享环境——请勿发送敏感数据。
注册一个 Agent
nats request '$mq9.AI.AGENT.REGISTER' '{
"name": "agent.summarizer",
"mailbox": "agent.summarizer",
"payload": "Text summarization; supports EN/ZH; max 10k tokens"
}'
# → {"error":""}发现已注册的 Agent
nats request '$mq9.AI.AGENT.DISCOVER' '{
"semantic": "summarize long documents",
"limit": 5
}'
# → [{"name":"agent.summarizer","mailbox":"agent.summarizer","payload":"..."}]向发现的 Agent 发消息
# 向刚发现的 agent.summarizer 发任务
nats request '$mq9.AI.MSG.SEND.agent.summarizer' \
--header 'mq9-priority:urgent' \
'{"task":"summarize","doc_id":"d-001","reply_to":"orchestrator.inbox"}'
# → {"error":"","msg_id":1}FETCH 和 ACK
# 按优先级顺序返回(critical → urgent → normal)
nats request '$mq9.AI.MSG.FETCH.agent.summarizer' '{
"group_name": "worker",
"deliver": "earliest",
"config": {"num_msgs": 10}
}'
# 推进位点到最后处理的 msg_id
nats request '$mq9.AI.MSG.ACK.agent.summarizer' '{
"group_name": "worker",
"mail_address": "agent.summarizer",
"msg_id": 3
}'安装 SDK
mq9 提供官方 SDK,用类型化 API 封装 NATS 协议调用:
pip install mq9 # Python
npm install mq9 # JavaScript / TypeScript
go get github.com/robustmq/mq9/go # Go
cargo add mq9 # Rust<!-- Java (Maven) -->
<dependency>
<groupId>io.mq9</groupId>
<artifactId>mq9</artifactId>
<version>0.1.0</version>
</dependency>核心操作顺序
推荐的集成顺序:agent_register → agent_discover → mailbox_create → send → fetch → ack → consume 循环
agent_register — 注册 Agent
# Python
from mq9 import Mq9Client
client = await Mq9Client.connect("nats://localhost:4222")
await client.agent_register({
"name": "agent.translator",
"mailbox": "agent.translator",
"payload": "Multilingual translation; EN/ZH/JA/KO",
})// Go
client, _ := mq9.Connect("nats://localhost:4222")
client.AgentRegister(ctx, mq9.AgentCard{
Name: "agent.translator",
Mailbox: "agent.translator",
Payload: "Multilingual translation; EN/ZH/JA/KO",
})agent_discover — 发现其他 Agent
# Python — 语义搜索
agents = await client.agent_discover(semantic="translate Chinese to English", limit=5)
for agent in agents:
print(agent.name, agent.mailbox)
# Python — 关键词搜索
agents = await client.agent_discover(text="translator", limit=10)// Go
agents, _ := client.AgentDiscover(ctx, mq9.DiscoverOptions{
Semantic: "translate Chinese to English",
Limit: 5,
})mailbox_create — 创建邮箱
# Python
address = await client.mailbox_create(name="agent.inbox", ttl=3600)// Go
address, _ := client.MailboxCreate(ctx, "agent.inbox", 3600)// TypeScript
const client = new Mq9Client("nats://localhost:4222");
await client.connect();
const address = await client.mailboxCreate({ name: "agent.inbox", ttl: 3600 });name = ""(Python:None, Go:"") — broker 自动生成地址ttl = 0— 邮箱永不过期
send — 发送消息
# Python — 带优先级和选项
msg_id = await client.send(
"agent.inbox",
b'{"task":"analyze","data":"..."}',
priority=Priority.URGENT,
key="state", # 去重——同 key 只保留最新
delay=60, # 60 秒后投递
ttl=300, # 消息 300 秒后过期
tags=["billing"],
)// Go
msgId, _ := client.Send(ctx, "agent.inbox", []byte(`{"task":"analyze"}`), mq9.SendOptions{
Priority: mq9.PriorityUrgent,
Key: "state",
Delay: 60,
})fetch + ack — 拉取消息
# Python — 有状态消费
messages = await client.fetch("agent.inbox", group_name="workers", deliver="earliest")
for msg in messages:
process(msg)
await client.ack("agent.inbox", "workers", msg.msg_id)// Go
messages, _ := client.Fetch(ctx, "agent.inbox", mq9.FetchOptions{
GroupName: "workers",
Deliver: "earliest",
})
for _, msg := range messages {
process(msg)
client.Ack(ctx, "agent.inbox", "workers", msg.MsgID)
}ACK 批次中最后一条 msg_id——一次调用确认整批,下次 FETCH 从该位点续拉。
无状态拉取 — 不传 group_name,每次独立,不记录位点,适合一次性读取和检查。
consume — 持续消费循环
使用 consume() 自动轮询处理:
# Python
consumer = await client.consume(
"agent.inbox",
handler=async_handler,
group_name="workers",
auto_ack=True,
error_handler=lambda msg, err: print(f"msg {msg.msg_id} failed: {err}"),
)
# ... 做其他工作 ...
await consumer.stop()
print(f"processed: {consumer.processed_count}")// TypeScript
const consumer = await client.consume("task.inbox", async (msg) => {
const data = JSON.parse(new TextDecoder().decode(msg.payload));
console.log(data);
}, {
groupName: "workers",
autoAck: true,
errorHandler: async (msg, err) => console.error(`msg ${msg.msgId} failed:`, err),
});
await consumer.stop();- handler 抛出异常 → 消息不 ACK,调用
errorHandler,循环继续 consumer.stop()等待当前批次处理完毕后干净退出
常见模式
发现并路由到合适的 Agent
Orchestrator 不硬编码下游 Agent 地址——通过语义搜索动态发现,拿到 mailbox 后直发。
# Orchestrator:发现能处理翻译任务的 Agent
agents = await client.agent_discover(semantic="translate English to French", limit=1)
if agents:
target = agents[0].mailbox
await client.send(target, json.dumps({
"task": "translate",
"text": "Hello world",
"reply_to": my_inbox,
}).encode(), priority=Priority.NORMAL)子 Agent 结果返回
父 Agent 创建私有回复邮箱并在 spawn 时传给子 Agent。子 Agent 写入结果,父 Agent 随时 FETCH——无需阻塞等待,无共享状态。
# 父 Agent:创建私有回复邮箱
reply_address = await client.mailbox_create(ttl=3600)
# 父 Agent:发送任务并带上 reply_to
await client.send("task.dispatch", json.dumps({
"task": "summarize /data/corpus",
"reply_to": reply_address,
}).encode())
# 父 Agent:随时 FETCH 结果(非阻塞)
messages = await client.fetch(reply_address, group_name="orchestrator", deliver="earliest")多 Worker 竞争消费任务队列
多个 Worker 共享同一个 group_name。每条任务只被一个 Worker 拿到——无需协调,无重复消费。Worker 随时加入或退出,无需重新配置。
# 生产者:发送带优先级的任务
await client.send("task.queue",
b'{"task":"reindex","id":"t-101"}',
priority=Priority.CRITICAL,
)
# Worker A 和 Worker B — 相同的 group_name
messages = await client.fetch("task.queue", group_name="workers", num_msgs=1)
for msg in messages:
await process(msg)
await client.ack("task.queue", "workers", msg.msg_id)异步 Request-Reply
Agent A 向 Agent B 发问题,继续做其他工作。Agent B 按自己的节奏处理,将结果 SEND 到 A 的私有回复邮箱。
# Agent A:创建私有回复邮箱
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":600}'
# → {"mail_address":"reply.a1b2c3"}
# Agent A:向 Agent B 发请求并带上 reply_to
nats request '$mq9.AI.MSG.SEND.agent.b' '{
"request":"translate","text":"Hello world","lang":"fr","reply_to":"reply.a1b2c3"
}'
# Agent B:拉取任务并回复
nats request '$mq9.AI.MSG.FETCH.agent.b' '{"group_name":"b-worker","deliver":"earliest"}'
nats request '$mq9.AI.MSG.SEND.reply.a1b2c3' '{"result":"Bonjour le monde"}'
nats request '$mq9.AI.MSG.ACK.agent.b' '{"group_name":"b-worker","mail_address":"agent.b","msg_id":1}'
# Agent A:随时 FETCH 结果
nats request '$mq9.AI.MSG.FETCH.reply.a1b2c3' '{"deliver":"earliest"}'云端到边缘指令下发
云端向边缘 Agent 邮箱发指令,边缘断网期间消息持久化等待。重连后按优先级顺序 FETCH——紧急重配置先于常规任务。
// 云端:发布指令(边缘可能离线)
client.Send(ctx, "edge.agent", []byte(`{"cmd":"reconfigure","params":{"rate":100}}`),
mq9.SendOptions{Priority: mq9.PriorityCritical})
client.Send(ctx, "edge.agent", []byte(`{"cmd":"run_diagnostic"}`), mq9.SendOptions{})
// 边缘:重连后按优先级拉取所有待处理指令
messages, _ := client.Fetch(ctx, "edge.agent", mq9.FetchOptions{
GroupName: "edge-agent",
Deliver: "earliest",
NumMsgs: 10,
})人机混合审批工作流
人类客户端使用与 Agent 完全相同的协议——无需 webhook,无需路由中间件。
// Agent:向人类邮箱发送审批请求
await client.send(humanMailAddress, JSON.stringify({
type: "approval_request",
action: "delete_dataset",
target: "ds-prod-2024",
reply_to: agentMailAddress,
}), { priority: Priority.URGENT });
// 人类客户端——同一个 SDK
const consumer = await client.consume(humanMailAddress, async (req) => {
const data = JSON.parse(new TextDecoder().decode(req.payload));
const approved = await showApprovalUI(data);
await client.send(data.reply_to, JSON.stringify({ approved, reviewer: "alice" }));
});广播通知
命名公共邮箱,多个消费者各自持有不同 group_name,每个消费者独立消费所有消息。
# 发送方:向公共邮箱广播
await client.send("system.events", b'{"event":"deploy","version":"1.2.0"}')
# 消费者 A(监控)
msgs_a = await client.fetch("system.events", group_name="monitor", deliver="earliest")
# 消费者 B(日志)
msgs_b = await client.fetch("system.events", group_name="logger", deliver="earliest")定期心跳与状态上报
Agent 在 consume 循环中同时维持注册心跳。
import asyncio
async def heartbeat_loop(client, name):
while True:
await client.agent_report({"name": name, "report_info": "running"})
await asyncio.sleep(30)
# 并发运行消费循环和心跳
consumer = await client.consume("agent.inbox", handler, group_name="worker")
asyncio.create_task(heartbeat_loop(client, "agent.translator"))LangChain / LangGraph 集成
langchain-mq9 是官方工具包,将所有 mq9 操作封装为 LangChain 工具,开箱即用支持 LangChain 和 LangGraph。
pip install langchain-mq98 个工具:
| 工具 | 操作 |
|---|---|
agent_register | 注册 Agent 及能力描述 |
agent_discover | 按文本或语义搜索 Agent |
create_mailbox | 创建私有邮箱 |
send_message | 发送带优先级的消息 |
fetch_messages | 拉取消息(FETCH + ACK 模型) |
ack_messages | 推进消费组位点 |
query_messages | 只读检查邮箱 |
delete_message | 删除指定消息 |
LangChain:
from langchain_mq9 import Mq9Toolkit
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_openai import ChatOpenAI
toolkit = Mq9Toolkit(server="nats://localhost:4222")
tools = toolkit.get_tools()
llm = ChatOpenAI(model="gpt-4o")
agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke({"input": "发现所有已注册的翻译 Agent 并向第一个发送任务"})LangGraph:
from langgraph.prebuilt import create_react_agent
from langchain_mq9 import Mq9Toolkit
toolkit = Mq9Toolkit(server="nats://localhost:4222")
app = create_react_agent(llm, toolkit.get_tools())
result = await app.ainvoke({"messages": [("human", "发现所有已注册的 Agent")]})MCP Server
mq9 在 RobustMQ Admin Server 上暴露 Model Context Protocol (MCP) server。连接任意 MCP 兼容客户端(Claude Desktop、Cursor 等):
http://<admin-server>:<port>/mcp错误处理
所有协议响应包含 error 字段,空字符串表示成功。
| 错误信息 | 原因 |
|---|---|
mailbox xxx already exists | CREATE 时名称已存在 |
mailbox not found | 邮箱不存在或已过期 |
message not found | 指定 msg_id 不存在或已过期 |
invalid mail_address | 格式无效(含大写、连字符等) |
agent not found | UNREGISTER 或 REPORT 时 Agent 名称未知 |
SDK 异常:所有 SDK 对非空 error 响应抛出 / 返回 Mq9Error。
部署
开发环境(Docker)
docker run -d --name mq9 -p 4222:4222 -v mq9-data:/data robustmq/robustmq:latest挂载 -v mq9-data:/data 以在重启时保留邮箱、消息和 Agent 注册记录。
生产环境 — 单节点
docker run -d \
--name mq9 \
-p 4222:4222 \
-p 9090:9090 \
-v /data/mq9:/data \
--restart unless-stopped \
robustmq/robustmq:latest- 端口
4222— mq9/NATS 协议(Agent 连接) - 端口
9090— Prometheus 指标端点
单节点可处理数百万并发 Agent 连接,足以应对大多数生产负载。
集群模式
当单节点不够用时横向扩展。Agent 使用相同的 SDK——客户端代码无需修改。
模式参考
| 场景 | 关键特性 |
|---|---|
| 动态发现路由 | AGENT.REGISTER + AGENT.DISCOVER + SEND |
| 点对点通信 | 私有邮箱 + FETCH + ACK |
| 竞争消费 | 多 Worker 共享 group_name |
| 广播 | 命名公共邮箱,多消费者各自 group_name |
| Request-Reply | 私有回复邮箱 + reply_to |
| 离线投递 | 存储优先,重连后 FETCH |
| 云端到边缘 | 重连后按优先级顺序消费 |
| 人机混合 | 人和 Agent 使用相同协议 |
