Skip to content

协议设计

概述

$mq9.AI.* 是 mq9 为 Agent 异步通信设计的协议。核心解决的问题:Agent 之间的异步通信,发件方和收件方不需要同时在线。

mq9 只解决通信问题——消息怎么可靠送达。消息内容是 byte 数组,不解析、不校验、不限制。上层是 A2A、MCP 还是其他协议,mq9 不干涉。


mail_address 格式规范

字符集:小写字母(a-z)、数字(0-9)、点(.

长度:1 到 128 字符

大小写:严格小写,含大写字符的 mail_address 会被 broker 拒绝

位置规则. 只能出现在中间,开头和结尾必须是小写字母或数字;不允许连续的 .

语义mail_address 是不透明字符串,. 不参与协议路由或匹配,仅作视觉分组

编码:不允许 URL percent-encoding

合法示例非法示例
task.001task-001(含连字符)
agent.inboxtask_001(含下划线)
analytics.resultTask.001(含大写)
acme.org.task.queue.task.001(点开头)
session.20260502task.001.(点结尾)
task..001(连续点)

完整地址示例

text
task.001
agent.inbox
analytics.result
acme.task.queue
session.20260502
order.processing.urgent
agent.001.inbox

基础概念

mail_address:通过 MAILBOX.CREATE 创建邮箱时由用户自定义的通信地址。不绑定 Agent 身份,一个 Agent 可以为不同任务申请不同的 mail_address。用完不管,TTL 自动清理。

mail_address 不可猜测即安全边界。 知道 mail_address 就能发消息、能订阅。不知道 mail_address 就无从操作。没有 token,没有 ACL。

TTL:邮箱创建时声明,到期自动销毁,消息随之清理。重复 CREATE 同名邮箱会报错(mailbox xxx already exists),创建前可先 QUERY 检查是否存在。

priority:可选。不指定为 normal 默认优先级,指定 urgent 或 critical 可提升处理顺序。同优先级 FIFO,跨优先级高优先处理。存储层保证顺序,消费方无需自行排序。

msg_id:每条消息的唯一标识(消息在存储中的 offset),客户端用于去重和删除操作。

消费语义:FETCH 采用 pull 模式,支持有状态消费(传 group_name,broker 记录位点,重连后可续拉)和无状态消费(不传 group_name,每次按 deliver 策略独立消费,不记录位点)。

消息流程:消息到达 → 写存储 → 客户端主动 FETCH 拉取 → ACK 确认 → broker 推进该 group 消费位点。


协议总览

分类Subject说明
Mailbox 管理$mq9.AI.MAILBOX.CREATE创建 mailbox
消息通信$mq9.AI.MSG.SEND.{mail_address}发送消息(优先级通过 mq9-priority header 指定)
消息通信$mq9.AI.MSG.FETCH.{mail_address}订阅 mailbox 消息
消息通信$mq9.AI.MSG.ACK.{mail_address}消息 ACK
消息通信$mq9.AI.MSG.QUERY.{mail_address}查询 mailbox 内消息
消息通信$mq9.AI.MSG.DELETE.{mail_address}.{msg_id}删除指定消息
Agent 管理$mq9.AI.AGENT.REGISTERAgent 注册
Agent 管理$mq9.AI.AGENT.UNREGISTERAgent 注销
Agent 管理$mq9.AI.AGENT.REPORTAgent 状态上报
Agent 管理$mq9.AI.AGENT.DISCOVERAgent 发现

所有命令均采用 request/reply 模式(nats request),server 必定返回响应。


响应格式

每个命令有独立的响应结构。所有响应均包含 error 字段:

含义
""成功
非空字符串失败,值为错误描述

$mq9.AI.MAILBOX.CREATE

创建 mailbox,mail_address 由用户自定义。

请求字段

字段类型必填说明
namestringmailbox 的 mail_address,需符合格式规范;不填则由 broker 自动生成
ttlu64?存活时间(秒),0 表示永不过期

响应字段

字段类型说明
errorstring成功为空,失败为错误信息
mail_addressstring创建成功后的邮箱地址

示例

bash
nats request '$mq9.AI.MAILBOX.CREATE' '{"name": "agent.translator.inbox", "ttl": 0}'
# 响应
{"error":"","mail_address":"agent.translator.inbox"}

# 重复创建 → 报错
nats request '$mq9.AI.MAILBOX.CREATE' '{"name": "agent.translator.inbox"}'
# 响应
{"error":"mailbox agent.translator.inbox already exists","mail_address":""}

$mq9.AI.MSG.SEND.

向指定 mailbox 发送消息。Payload 是 byte 数组,mq9 不解析内容。

请求参数

Subject$mq9.AI.MSG.SEND.{mail_address}

Payload:任意 byte 数组,mq9 不解析内容。

Header(均可选)

Header说明
mq9-key: {key}去重/压实 key。同 key 的消息存储层只保留最新一条,旧消息被覆盖
mq9-delay: {seconds}延迟投递秒数。消息写入后不立即可见,等 delay 到期再出现在 FETCH 结果中。延迟消息 msg_id 返回 -1
mq9-ttl: {seconds}消息级 TTL(秒)。消息在 发送时间 + ttl 后自动过期,独立于邮箱 TTL
mq9-tags: {tag1},{tag2}逗号分隔的用户标签,如 billing,vip。可通过 QUERY 的 tags 字段过滤
mq9-priority: {value}消息优先级:normal(默认)/ urgent / critical。不填默认 normal

SEND 响应字段

字段类型说明
errorstring成功为空,失败为错误信息(如 mailbox 不存在)
msg_idi64消息写入存储后的 offset;延迟消息返回 -1

优先级说明

典型场景
normal(默认)任务分发、结果返回、状态上报
urgent审批请求、重要通知
critical任务中断、紧急指令
同优先级 FIFO,跨优先级 critical > urgent > normal。

SEND 示例

bash
# 普通消息
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' '{"text":"hello"}'
# 响应
{"error":"","msg_id":0}

# 带 key(同 key 存储层只保留最新一条)
nats request '$mq9.AI.MSG.SEND.task.001.callback' \
  -H "mq9-key:status" \
  '{"status":"running"}'
# 响应
{"error":"","msg_id":1}

# 带 tags(可通过 QUERY tags 字段过滤)
nats request '$mq9.AI.MSG.SEND.agent.order.inbox' \
  -H "mq9-tags:billing,vip" \
  '{"order_id":"o-001"}'
# 响应
{"error":"","msg_id":2}

# 延迟 60 秒投递(msg_id 返回 -1 表示延迟消息)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
  -H "mq9-delay:60" \
  '{"text":"delayed task"}'
# 响应
{"error":"","msg_id":-1}

# 消息级 TTL 300 秒(独立于邮箱 TTL)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
  -H "mq9-ttl:300" \
  '{"text":"short-lived message"}'
# 响应
{"error":"","msg_id":3}

# 紧急消息(通过 header 指定优先级)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
  -H "mq9-priority:urgent" \
  '{"alert":"please expedite"}'
# 响应
{"error":"","msg_id":4}

$mq9.AI.MSG.FETCH.

从 mailbox 拉取消息。支持两种消费模式:有状态消费(传 group_name,服务端记录位点)和无状态消费(不传 group_name,每次按 deliver 策略从头开始)。

FETCH 请求字段

字段类型必填说明
group_namestring?消费组名称。传入时启用有状态消费,同组共享位点;不传时为无状态消费,每次按 deliver 策略开始,不记录位点
deliverstring起点策略,默认 latest;有状态消费时仅在无位点记录或 force_deliver: true 时生效
from_timeu64?deliver: "from_time" 时生效,Unix 时间戳(秒)
from_idu64?deliver: "from_id" 时生效,从该 msg_id 开始拉取(含)
force_deliverbool?仅有状态消费有效;true 时忽略已有位点,强制按 deliver 重新开始
configobject?拉取行为配置,见下方说明

deliver 策略

说明
latest(默认)从当前时刻起只拉新消息
earliest从 mailbox 最早的消息开始
from_time从指定时间戳之后开始,需配合 from_time 字段
from_id从指定 msg_id 开始(含该条),需配合 from_id 字段

有状态消费的位点行为(传 group_name)

条件行为
有位点记录 且 force_deliver: false从上次断点续拉,deliver 不生效
有位点记录 且 force_deliver: true忽略位点,按 deliver 策略重新开始
无位点记录deliver 策略开始(首次消费)

无状态消费(不传 group_name)

服务端生成临时随机 group,使用 deliver 策略定位起点,消费完成后不提交位点。适合探查、调试或一次性读取场景。

config 字段

字段类型默认值说明
num_msgsu32?100单次最多拉取的消息条数
max_wait_msu64?500服务端无数据时的等待时间(毫秒)。不传时默认 500ms;传 0 表示立即返回不等待。等待结束后返回空列表,避免客户端频繁轮询打爆服务端

FETCH 响应字段

字段类型说明
errorstring成功为空,失败为错误信息(如 mailbox 不存在)
messagesarray消息列表,每条含 msg_idpayloadprioritycreate_time

FETCH 示例

bash
# 无状态消费:不传 group_name,每次从最新消息开始(默认 deliver: latest)
nats request '$mq9.AI.MSG.FETCH.task.001.callback' '{}'

# 无状态消费:每次从最早消息全量读取
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"deliver": "earliest"}'

# 有状态消费:有位点则续拉,首次消费只拉新消息(默认 deliver: latest)
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1"}'

# 有状态消费:有位点则续拉,首次消费从最早消息开始
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1", "deliver": "earliest"}'

# 有状态消费:强制重置位点,从最早消息重新开始
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1", "deliver": "earliest", "force_deliver": true}'

# 指定单次拉取条数,不填默认 100 条
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1", "config": {"num_msgs": 50}}'

$mq9.AI.MSG.ACK.

确认消息已处理,broker 推进该消费组的消费位点。

ACK 请求字段

字段类型必填说明
group_namestring消费组名称,与 FETCH 时一致
mail_addressstringmailbox 地址
msg_idu64需要确认的消息 ID(来自 FETCH 响应)

ACK 响应字段

字段类型说明
errorstring成功为空,失败为错误信息

ACK 示例

bash
nats request '$mq9.AI.MSG.ACK.task.001.callback' \
  '{"group_name": "worker-group-1", "mail_address": "task.001.callback", "msg_id": 5}'
# 响应
{"error":""}

$mq9.AI.MSG.QUERY.

查询 mailbox 当前存储的消息,不影响订阅推送。

QUERY 请求字段

字段类型说明
keystring?按 key 查询,返回该 key 的最新消息
limitu64?返回条数上限
sinceu64?返回该时间戳之后的消息

QUERY 响应字段

字段类型说明
errorstring成功为空
messagesarray消息列表

QUERY 示例

bash
# 查询所有消息
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{}'

# 查询 key=status 的最新消息
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"key": "status"}'

# 最近 10 条
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"limit": 10}'

# 某时间戳之后
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"since": 1234567890}'

$mq9.AI.MSG.DELETE.{mail_address}.

删除 mailbox 中的指定消息。

DELETE 示例

bash
nats request '$mq9.AI.MSG.DELETE.task.001.callback.2' ''
# 响应
{"error":"","deleted":true}

$mq9.AI.AGENT.REGISTER

注册 Agent。Body 是上层协议的内容(当前示例为 A2A AgentCard),mq9 不干涉内容体,只要求携带 mailbox 字段作为路由标识。以后换其他协议同理。

REGISTER 示例

bash
nats request '$mq9.AI.AGENT.REGISTER' \
  '{ ...AgentCard,mailbox = "mq9://broker/agent.translator.inbox"... }'
# 响应
{"error":""}

$mq9.AI.AGENT.UNREGISTER

注销 Agent。

UNREGISTER 示例

bash
nats request '$mq9.AI.AGENT.UNREGISTER' \
  '{ ...mailbox = "mq9://broker/agent.translator.inbox"... }'
# 响应
{"error":""}

$mq9.AI.AGENT.REPORT

Agent 状态上报。Body 是上层协议内容,mq9 不干涉。

REPORT 示例

bash
nats request '$mq9.AI.AGENT.REPORT' \
  '{ ...mailbox = "mq9://broker/agent.translator.inbox", 状态字段由上层协议定义... }'
# 响应
{"error":""}

$mq9.AI.AGENT.DISCOVER

按条件检索已注册的 Agent,返回原始注册内容列表,mq9 不转换不包装。

DISCOVER 请求字段

字段类型说明
textstring?全文检索关键词
semanticstring?语义检索自然语言描述(向量检索)。同时传入时优先于 text
limitnumber?每页返回结果数量上限(默认 20)
pagenumber?页码,从 1 开始(默认 1)

不传 textsemantic 时,返回该租户下所有已注册的 Agent。

DISCOVER 示例

bash
# 全文检索
nats request '$mq9.AI.AGENT.DISCOVER' '{"text": "payment invoice"}'

# 语义检索(向量检索,优先于 text)
nats request '$mq9.AI.AGENT.DISCOVER' '{"semantic": "处理付款并生成发票"}'

# 分页:第 2 页,每页 10 条
nats request '$mq9.AI.AGENT.DISCOVER' '{"text": "payment", "limit": 10, "page": 2}'

# 列出全部
nats request '$mq9.AI.AGENT.DISCOVER' '{}'
# 返回:[{ ...原始注册内容... }, ...]

错误一览

场景响应示例
mailbox 不存在(SEND/SUB/QUERY/DELETE){"error":"mailbox xxx does not exist"}
mailbox 已存在(CREATE 不幂等){"error":"mailbox xxx already exists","mail_address":""}
msg_id 不存在(DELETE){"error":"message not found"}