任何多 Agent 系统,都会遇到同样的两个问题。
① Agent 之间如何找到彼此? 按 capability 发现,而不是硬编码地址。
② Agent 之间如何可靠地通信? Agent A 发消息时,Agent B 可能离线、正忙、或者还不存在。消息不能丢。
mq9 专门解决这两个问题,所以开发者可以专注于 Agent 逻辑,而不是基础设施。
AgentCard · 语义向量检索 · 全文搜索
# Register with capability description
nats request '$mq9.AI.AGENT.REGISTER' '{
"name": "agent.translator",
"mailbox": "agent.translator",
"payload": "Multilingual translation; EN/ZH/JA/KO"
}'
# Discover by semantic intent
nats request '$mq9.AI.AGENT.DISCOVER' '{
"semantic": "translate Chinese to English",
"limit": 5
}'
# Keyword search
nats request '$mq9.AI.AGENT.DISCOVER' '{
"text": "translator", "limit": 10
}'持久化邮箱 · Pull+ACK · 离线投递
# Create a mailbox (Agent's persistent address)
nats request '$mq9.AI.MAILBOX.CREATE' '{"name":"agent.inbox","ttl":3600}'
# Send — message persists until fetched
nats request '$mq9.AI.MSG.SEND.agent.inbox' --header 'mq9-priority:critical' '{"task":"analyze","id":"t-001"}'
# FETCH when ready (broker tracks offset)
nats request '$mq9.AI.MSG.FETCH.agent.inbox' '{"group_name":"workers","deliver":"earliest"}'
# ACK to advance offset
nats request '$mq9.AI.MSG.ACK.agent.inbox' '{"group_name":"workers","msg_id":1}'critical → urgent → normal · 去重 · 延迟投递
# Priority: critical messages fetched first
--header 'mq9-priority:critical'
--header 'mq9-priority:urgent'
# (no header = normal)
# Key dedup — only latest per key kept
--header 'mq9-key:task.status'
# Delay visibility by N seconds
--header 'mq9-delay:30'
# Per-message TTL (independent of mailbox TTL)
--header 'mq9-ttl:300'
# Tags — filterable via QUERY
--header 'mq9-tags:billing,critical'子 Agent 完成任务将结果写入主 Agent 邮箱,主 Agent 随时 FETCH 取结果,不需要阻塞等待。
多个 Worker 共享同一 group_name,broker 保证每条任务只被一个 Worker 拿到,Worker 随时加入或退出。
Agent 通过 REGISTER 注册能力描述,其他 Agent 用自然语言语义或关键词 DISCOVER 合适的 Agent,找到后直接 SEND 任务。
云端向边缘 Agent 邮箱发指令,边缘断网期间消息持久化等待,重连后 FETCH 按优先级顺序拿到所有待处理指令。
Agent 向审批邮箱发决策请求,人类通过同样的 FETCH 取到请求,处理后 SEND 结果回 Agent 私有邮箱。人和 Agent 使用相同协议。
Agent A 创建私有回复邮箱,发请求时带上 reply_to。Agent B 处理后将结果 SEND 到回复邮箱,A 随时 FETCH 取结果,不阻塞。
Worker 启动时 REGISTER,定期 REPORT 上报状态,主 Agent 通过 DISCOVER 列出所有在线 Worker,Worker 关闭时 UNREGISTER。
检测方向共享邮箱发 critical 优先级告警,处理器 FETCH 拉取,即使临时离线也不会丢失告警。
mq9 基于 NATS 协议。任何语言的 NATS 客户端直接就是 mq9 的客户端,零依赖,零学习成本。
官方 SDK,五种语言统一 API,类型安全,异步优先,内置 Priority 枚举、consume 循环和 Agent 注册方法。
pip install mq9npm install mq9go get github.com/robustmq/mq9/gocargo add mq9官方 LangChain 工具包,8 个工具覆盖全部 mq9 操作,直接接入 LangChain Agent 和 LangGraph 工作流。原生 A2A 协议支持通过 mq9.a2a 接入。
pip install langchain-mq9pip install mq9[a2a]所有操作通过 NATS request/reply 完成,主题前缀 $mq9.AI.*。响应包含 error 字段,空字符串表示成功。
$mq9.AI.AGENT.REGISTER注册 Agent 及 AgentCard 能力描述$mq9.AI.AGENT.DISCOVER全文检索 + 语义向量检索 Agent$mq9.AI.AGENT.REPORTAgent 状态上报 / 心跳$mq9.AI.AGENT.UNREGISTER关闭时注销 Agent$mq9.AI.MAILBOX.CREATE创建持久化邮箱,声明 TTL$mq9.AI.MSG.SEND.{addr}发送消息,优先级通过 mq9-priority header 指定$mq9.AI.MSG.FETCH.{addr}Pull 拉取消息,支持有状态/无状态消费$mq9.AI.MSG.ACK.{addr}ACK 推进消费位点,支持断点续拉$mq9.AI.MSG.QUERY.{addr}查询消息,不影响消费位点$mq9.AI.MSG.DELETE.{addr}.{id}删除指定消息mq9-priority: critical|urgent消息优先级,normal 为默认不填mq9-key: {key}同 key 只保留最新一条(去重压实)mq9-delay: {seconds}延迟投递,指定秒数后消息才可见mq9-ttl: {seconds}消息级 TTL,独立于邮箱 TTLmq9-tags: tag1,tag2标签,可通过 QUERY 过滤连接公共演示服务器——无需本地部署。
export NATS_URL=nats://demo.robustmq.com:4222
# Register your Agent
nats request '$mq9.AI.AGENT.REGISTER' \
'{"name":"my.agent","mailbox":"my.inbox","payload":"My Agent capabilities"}'
# Create a mailbox
nats request '$mq9.AI.MAILBOX.CREATE' '{"name":"my.inbox","ttl":3600}'
# Send a message
nats request '$mq9.AI.MSG.SEND.my.inbox' \
--header 'mq9-priority:urgent' \
'{"task":"summarize dataset A"}'
# FETCH (resumes from last ACK)
nats request '$mq9.AI.MSG.FETCH.my.inbox' \
'{"group_name":"worker","deliver":"earliest"}'