协议设计
概述
$mq9.AI.* 是 mq9 为 Agent 异步通信设计的协议。核心解决的问题:Agent 之间的异步通信,发件方和收件方不需要同时在线。
mq9 只解决通信问题——消息怎么可靠送达。消息内容是 byte 数组,不解析、不校验、不限制。上层是 A2A、MCP 还是其他协议,mq9 不干涉。
mail_address 格式规范
字符集:小写字母(a-z)、数字(0-9)、点(.)
长度:1 到 128 字符
大小写:严格小写,含大写字符的 mail_address 会被 broker 拒绝
位置规则:. 只能出现在中间,开头和结尾必须是小写字母或数字;不允许连续的 .
语义:mail_address 是不透明字符串,. 不参与协议路由或匹配,仅作视觉分组
编码:不允许 URL percent-encoding
| 合法示例 | 非法示例 |
|---|---|
task.001 | task-001(含连字符) |
agent.inbox | task_001(含下划线) |
analytics.result | Task.001(含大写) |
acme.org.task.queue | .task.001(点开头) |
session.20260502 | task.001.(点结尾) |
task..001(连续点) |
完整地址示例:
task.001
agent.inbox
analytics.result
acme.task.queue
session.20260502
order.processing.urgent
agent.001.inbox基础概念
mail_address:通过 MAILBOX.CREATE 创建邮箱时由用户自定义的通信地址。不绑定 Agent 身份,一个 Agent 可以为不同任务申请不同的 mail_address。用完不管,TTL 自动清理。
mail_address 不可猜测即安全边界。 知道 mail_address 就能发消息、能订阅。不知道 mail_address 就无从操作。没有 token,没有 ACL。
TTL:邮箱创建时声明,到期自动销毁,消息随之清理。重复 CREATE 同名邮箱会报错(mailbox xxx already exists),创建前可先 QUERY 检查是否存在。
priority:可选。不指定为 normal 默认优先级,指定 urgent 或 critical 可提升处理顺序。同优先级 FIFO,跨优先级高优先处理。存储层保证顺序,消费方无需自行排序。
msg_id:每条消息的唯一标识(消息在存储中的 offset),客户端用于去重和删除操作。
消费语义:FETCH 采用 pull 模式,支持有状态消费(传 group_name,broker 记录位点,重连后可续拉)和无状态消费(不传 group_name,每次按 deliver 策略独立消费,不记录位点)。
消息流程:消息到达 → 写存储 → 客户端主动 FETCH 拉取 → ACK 确认 → broker 推进该 group 消费位点。
协议总览
| 分类 | Subject | 说明 |
|---|---|---|
| Mailbox 管理 | $mq9.AI.MAILBOX.CREATE | 创建 mailbox |
| 消息通信 | $mq9.AI.MSG.SEND.{mail_address} | 发送消息(优先级通过 mq9-priority header 指定) |
| 消息通信 | $mq9.AI.MSG.FETCH.{mail_address} | 订阅 mailbox 消息 |
| 消息通信 | $mq9.AI.MSG.ACK.{mail_address} | 消息 ACK |
| 消息通信 | $mq9.AI.MSG.QUERY.{mail_address} | 查询 mailbox 内消息 |
| 消息通信 | $mq9.AI.MSG.DELETE.{mail_address}.{msg_id} | 删除指定消息 |
| Agent 管理 | $mq9.AI.AGENT.REGISTER | Agent 注册 |
| Agent 管理 | $mq9.AI.AGENT.UNREGISTER | Agent 注销 |
| Agent 管理 | $mq9.AI.AGENT.REPORT | Agent 状态上报 |
| Agent 管理 | $mq9.AI.AGENT.DISCOVER | Agent 发现 |
所有命令均采用 request/reply 模式(
nats request),server 必定返回响应。
响应格式
每个命令有独立的响应结构。所有响应均包含 error 字段:
| 值 | 含义 |
|---|---|
"" | 成功 |
| 非空字符串 | 失败,值为错误描述 |
$mq9.AI.MAILBOX.CREATE
创建 mailbox,mail_address 由用户自定义。
请求字段
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
name | string | 否 | mailbox 的 mail_address,需符合格式规范;不填则由 broker 自动生成 |
ttl | u64? | 否 | 存活时间(秒),0 表示永不过期 |
响应字段
| 字段 | 类型 | 说明 |
|---|---|---|
error | string | 成功为空,失败为错误信息 |
mail_address | string | 创建成功后的邮箱地址 |
示例
nats request '$mq9.AI.MAILBOX.CREATE' '{"name": "agent.translator.inbox", "ttl": 0}'
# 响应
{"error":"","mail_address":"agent.translator.inbox"}
# 重复创建 → 报错
nats request '$mq9.AI.MAILBOX.CREATE' '{"name": "agent.translator.inbox"}'
# 响应
{"error":"mailbox agent.translator.inbox already exists","mail_address":""}$mq9.AI.MSG.SEND.
向指定 mailbox 发送消息。Payload 是 byte 数组,mq9 不解析内容。
请求参数
Subject:$mq9.AI.MSG.SEND.{mail_address}
Payload:任意 byte 数组,mq9 不解析内容。
Header(均可选)
| Header | 说明 |
|---|---|
mq9-key: {key} | 去重/压实 key。同 key 的消息存储层只保留最新一条,旧消息被覆盖 |
mq9-delay: {seconds} | 延迟投递秒数。消息写入后不立即可见,等 delay 到期再出现在 FETCH 结果中。延迟消息 msg_id 返回 -1 |
mq9-ttl: {seconds} | 消息级 TTL(秒)。消息在 发送时间 + ttl 后自动过期,独立于邮箱 TTL |
mq9-tags: {tag1},{tag2} | 逗号分隔的用户标签,如 billing,vip。可通过 QUERY 的 tags 字段过滤 |
mq9-priority: {value} | 消息优先级:normal(默认)/ urgent / critical。不填默认 normal |
SEND 响应字段
| 字段 | 类型 | 说明 |
|---|---|---|
error | string | 成功为空,失败为错误信息(如 mailbox 不存在) |
msg_id | i64 | 消息写入存储后的 offset;延迟消息返回 -1 |
优先级说明
| 值 | 典型场景 |
|---|---|
normal(默认) | 任务分发、结果返回、状态上报 |
urgent | 审批请求、重要通知 |
critical | 任务中断、紧急指令 |
| 同优先级 FIFO,跨优先级 critical > urgent > normal。 |
SEND 示例
# 普通消息
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' '{"text":"hello"}'
# 响应
{"error":"","msg_id":0}
# 带 key(同 key 存储层只保留最新一条)
nats request '$mq9.AI.MSG.SEND.task.001.callback' \
-H "mq9-key:status" \
'{"status":"running"}'
# 响应
{"error":"","msg_id":1}
# 带 tags(可通过 QUERY tags 字段过滤)
nats request '$mq9.AI.MSG.SEND.agent.order.inbox' \
-H "mq9-tags:billing,vip" \
'{"order_id":"o-001"}'
# 响应
{"error":"","msg_id":2}
# 延迟 60 秒投递(msg_id 返回 -1 表示延迟消息)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
-H "mq9-delay:60" \
'{"text":"delayed task"}'
# 响应
{"error":"","msg_id":-1}
# 消息级 TTL 300 秒(独立于邮箱 TTL)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
-H "mq9-ttl:300" \
'{"text":"short-lived message"}'
# 响应
{"error":"","msg_id":3}
# 紧急消息(通过 header 指定优先级)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
-H "mq9-priority:urgent" \
'{"alert":"please expedite"}'
# 响应
{"error":"","msg_id":4}$mq9.AI.MSG.FETCH.
从 mailbox 拉取消息。支持两种消费模式:有状态消费(传 group_name,服务端记录位点)和无状态消费(不传 group_name,每次按 deliver 策略从头开始)。
FETCH 请求字段
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
group_name | string? | 否 | 消费组名称。传入时启用有状态消费,同组共享位点;不传时为无状态消费,每次按 deliver 策略开始,不记录位点 |
deliver | string | 否 | 起点策略,默认 latest;有状态消费时仅在无位点记录或 force_deliver: true 时生效 |
from_time | u64? | 否 | deliver: "from_time" 时生效,Unix 时间戳(秒) |
from_id | u64? | 否 | deliver: "from_id" 时生效,从该 msg_id 开始拉取(含) |
force_deliver | bool? | 否 | 仅有状态消费有效;true 时忽略已有位点,强制按 deliver 重新开始 |
config | object? | 否 | 拉取行为配置,见下方说明 |
deliver 策略
| 值 | 说明 |
|---|---|
latest(默认) | 从当前时刻起只拉新消息 |
earliest | 从 mailbox 最早的消息开始 |
from_time | 从指定时间戳之后开始,需配合 from_time 字段 |
from_id | 从指定 msg_id 开始(含该条),需配合 from_id 字段 |
有状态消费的位点行为(传 group_name)
| 条件 | 行为 |
|---|---|
有位点记录 且 force_deliver: false | 从上次断点续拉,deliver 不生效 |
有位点记录 且 force_deliver: true | 忽略位点,按 deliver 策略重新开始 |
| 无位点记录 | 按 deliver 策略开始(首次消费) |
无状态消费(不传 group_name)
服务端生成临时随机 group,使用 deliver 策略定位起点,消费完成后不提交位点。适合探查、调试或一次性读取场景。
config 字段
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
num_msgs | u32? | 100 | 单次最多拉取的消息条数 |
max_wait_ms | u64? | 500 | 服务端无数据时的等待时间(毫秒)。不传时默认 500ms;传 0 表示立即返回不等待。等待结束后返回空列表,避免客户端频繁轮询打爆服务端 |
FETCH 响应字段
| 字段 | 类型 | 说明 |
|---|---|---|
error | string | 成功为空,失败为错误信息(如 mailbox 不存在) |
messages | array | 消息列表,每条含 msg_id、payload、priority、create_time |
FETCH 示例
# 无状态消费:不传 group_name,每次从最新消息开始(默认 deliver: latest)
nats request '$mq9.AI.MSG.FETCH.task.001.callback' '{}'
# 无状态消费:每次从最早消息全量读取
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
'{"deliver": "earliest"}'
# 有状态消费:有位点则续拉,首次消费只拉新消息(默认 deliver: latest)
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
'{"group_name": "worker-group-1"}'
# 有状态消费:有位点则续拉,首次消费从最早消息开始
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
'{"group_name": "worker-group-1", "deliver": "earliest"}'
# 有状态消费:强制重置位点,从最早消息重新开始
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
'{"group_name": "worker-group-1", "deliver": "earliest", "force_deliver": true}'
# 指定单次拉取条数,不填默认 100 条
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
'{"group_name": "worker-group-1", "config": {"num_msgs": 50}}'$mq9.AI.MSG.ACK.
确认消息已处理,broker 推进该消费组的消费位点。
ACK 请求字段
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
group_name | string | 是 | 消费组名称,与 FETCH 时一致 |
mail_address | string | 是 | mailbox 地址 |
msg_id | u64 | 是 | 需要确认的消息 ID(来自 FETCH 响应) |
ACK 响应字段
| 字段 | 类型 | 说明 |
|---|---|---|
error | string | 成功为空,失败为错误信息 |
ACK 示例
nats request '$mq9.AI.MSG.ACK.task.001.callback' \
'{"group_name": "worker-group-1", "mail_address": "task.001.callback", "msg_id": 5}'
# 响应
{"error":""}$mq9.AI.MSG.QUERY.
查询 mailbox 当前存储的消息,不影响订阅推送。
QUERY 请求字段
| 字段 | 类型 | 说明 |
|---|---|---|
key | string? | 按 key 查询,返回该 key 的最新消息 |
limit | u64? | 返回条数上限 |
since | u64? | 返回该时间戳之后的消息 |
QUERY 响应字段
| 字段 | 类型 | 说明 |
|---|---|---|
error | string | 成功为空 |
messages | array | 消息列表 |
QUERY 示例
# 查询所有消息
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{}'
# 查询 key=status 的最新消息
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"key": "status"}'
# 最近 10 条
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"limit": 10}'
# 某时间戳之后
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"since": 1234567890}'$mq9.AI.MSG.DELETE.{mail_address}.
删除 mailbox 中的指定消息。
DELETE 示例
nats request '$mq9.AI.MSG.DELETE.task.001.callback.2' ''
# 响应
{"error":"","deleted":true}$mq9.AI.AGENT.REGISTER
注册 Agent。Body 是上层协议的内容(当前示例为 A2A AgentCard),mq9 不干涉内容体,只要求携带 mailbox 字段作为路由标识。以后换其他协议同理。
REGISTER 示例
nats request '$mq9.AI.AGENT.REGISTER' \
'{ ...AgentCard,mailbox = "mq9://broker/agent.translator.inbox"... }'
# 响应
{"error":""}$mq9.AI.AGENT.UNREGISTER
注销 Agent。
UNREGISTER 示例
nats request '$mq9.AI.AGENT.UNREGISTER' \
'{ ...mailbox = "mq9://broker/agent.translator.inbox"... }'
# 响应
{"error":""}$mq9.AI.AGENT.REPORT
Agent 状态上报。Body 是上层协议内容,mq9 不干涉。
REPORT 示例
nats request '$mq9.AI.AGENT.REPORT' \
'{ ...mailbox = "mq9://broker/agent.translator.inbox", 状态字段由上层协议定义... }'
# 响应
{"error":""}$mq9.AI.AGENT.DISCOVER
按条件检索已注册的 Agent,返回原始注册内容列表,mq9 不转换不包装。
DISCOVER 请求字段
| 字段 | 类型 | 说明 |
|---|---|---|
text | string? | 全文检索关键词 |
semantic | string? | 语义检索自然语言描述(向量检索)。同时传入时优先于 text。 |
limit | number? | 每页返回结果数量上限(默认 20) |
page | number? | 页码,从 1 开始(默认 1) |
不传 text 或 semantic 时,返回该租户下所有已注册的 Agent。
DISCOVER 示例
# 全文检索
nats request '$mq9.AI.AGENT.DISCOVER' '{"text": "payment invoice"}'
# 语义检索(向量检索,优先于 text)
nats request '$mq9.AI.AGENT.DISCOVER' '{"semantic": "处理付款并生成发票"}'
# 分页:第 2 页,每页 10 条
nats request '$mq9.AI.AGENT.DISCOVER' '{"text": "payment", "limit": 10, "page": 2}'
# 列出全部
nats request '$mq9.AI.AGENT.DISCOVER' '{}'
# 返回:[{ ...原始注册内容... }, ...]错误一览
| 场景 | 响应示例 |
|---|---|
| mailbox 不存在(SEND/SUB/QUERY/DELETE) | {"error":"mailbox xxx does not exist"} |
| mailbox 已存在(CREATE 不幂等) | {"error":"mailbox xxx already exists","mail_address":""} |
| msg_id 不存在(DELETE) | {"error":"message not found"} |
