For Engineer

You're building a multi-agent system. This is your integration guide.

This page assumes you've read What is mq9. It focuses on integration code and production considerations.

Quick start — public demo server

No local setup needed. Connect to the RobustMQ demo server:

bash
export NATS_URL=nats://demo.robustmq.com:4222

This is a shared environment — do not send sensitive data.

Register an agent

bash
nats request '$mq9.AI.AGENT.REGISTER' '{
  "name": "agent.summarizer",
  "mailbox": "agent.summarizer",
  "payload": "Summarizes documents and extracts key points; supports PDF and plain text"
}'

Discover agents

bash
# Semantic search
nats request '$mq9.AI.AGENT.DISCOVER' '{"semantic":"find an agent that summarizes documents","limit":5}'

# Keyword search
nats request '$mq9.AI.AGENT.DISCOVER' '{"text":"summarizer","limit":5}'

Create a mailbox

bash
nats request '$mq9.AI.MAILBOX.CREATE' '{"name":"quickstart.demo","ttl":300}'
# {"error":"","mail_address":"quickstart.demo"}

Send messages with priority

bash
# Critical — processed first
nats request '$mq9.AI.MSG.SEND.quickstart.demo' \
  --header 'mq9-priority:critical' \
  '{"type":"abort","task_id":"t-001"}'

# Normal (default, no header)
nats request '$mq9.AI.MSG.SEND.quickstart.demo' \
  '{"type":"task","payload":"process dataset A"}'

Fetch and ACK

bash
# Fetch — returns messages in priority order (critical → urgent → normal)
nats request '$mq9.AI.MSG.FETCH.quickstart.demo' '{
  "group_name": "my-worker",
  "deliver": "earliest",
  "config": {"num_msgs": 10}
}'

# ACK — advance offset to last processed msg_id
nats request '$mq9.AI.MSG.ACK.quickstart.demo' '{
  "group_name": "my-worker",
  "mail_address": "quickstart.demo",
  "msg_id": 3
}'
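The raw requests above share one shape: a subject of the form `$mq9.AI.MSG.<OP>.<mail_address>` plus a JSON body. The sketch below shows that mapping in plain Python, using only the subjects and fields from this quick start. The helper names `fetch_request` and `ack_request` are hypothetical and not part of any SDK.

```python
import json

SUBJECT_PREFIX = "$mq9.AI.MSG"  # prefix taken from the quick-start commands


def fetch_request(mail_address, group_name=None, deliver="earliest", num_msgs=10):
    """Build (subject, body) for a FETCH call. Hypothetical helper."""
    body = {"deliver": deliver, "config": {"num_msgs": num_msgs}}
    if group_name is not None:  # leave group_name out for a stateless fetch
        body["group_name"] = group_name
    return f"{SUBJECT_PREFIX}.FETCH.{mail_address}", json.dumps(body).encode()


def ack_request(mail_address, group_name, msg_id):
    """Build (subject, body) for an ACK call. Hypothetical helper."""
    body = {"group_name": group_name, "mail_address": mail_address, "msg_id": msg_id}
    return f"{SUBJECT_PREFIX}.ACK.{mail_address}", json.dumps(body).encode()


subject, body = ack_request("quickstart.demo", "my-worker", 3)
print(subject)  # $mq9.AI.MSG.ACK.quickstart.demo
```

Any NATS client can send the resulting pair as a request; the SDKs described next wrap this step.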

Install an SDK

mq9 provides official SDKs that wrap the NATS protocol calls with typed APIs:

bash
pip install mq9           # Python
npm install mq9           # JavaScript / TypeScript
go get github.com/robustmq/mq9/go   # Go
cargo add mq9             # Rust

xml
<!-- Java (Maven) -->
<dependency>
  <groupId>io.mq9</groupId>
  <artifactId>mq9</artifactId>
  <version>0.1.0</version>
</dependency>

Core operations (SDK examples)

Operations are ordered by the typical agent startup sequence: register first, discover others, then set up messaging.

agent_register — announce capabilities

python
# Python
from mq9 import Mq9Client
client = await Mq9Client.connect("nats://localhost:4222")

await client.agent_register({
    "name": "agent.translator",
    "mailbox": "agent.translator",
    "payload": "Multilingual translation; EN/ZH/JA/KO",
})

go
// Go
client, _ := mq9.Connect("nats://localhost:4222")
client.AgentRegister(ctx, mq9.AgentCard{
    Name:    "agent.translator",
    Mailbox: "agent.translator",
    Payload: "Multilingual translation; EN/ZH/JA/KO",
})

agent_discover — find agents by capability

python
# Python — semantic search
agents = await client.agent_discover(semantic="translate Chinese to English", limit=5)

# Python — keyword search
agents = await client.agent_discover(text="translator", limit=10)

go
// Go
agents, _ := client.AgentDiscover(ctx, mq9.DiscoverOptions{
    Semantic: "translate Chinese to English",
    Limit:    5,
})

Each returned agent has a name and mailbox field. Send directly to the mailbox.

mailbox_create — create a persistent address

python
# Python
address = await client.mailbox_create(name="agent.inbox", ttl=3600)

go
// Go
address, _ := client.MailboxCreate(ctx, "agent.inbox", 3600)

typescript
// TypeScript
const client = new Mq9Client("nats://localhost:4222");
await client.connect();
const address = await client.mailboxCreate({ name: "agent.inbox", ttl: 3600 });

  • name = "" (Python: None, Go: "") — broker auto-generates the address.
  • ttl = 0 — mailbox never expires.

send — send a message

python
# Python — with priority and options
msg_id = await client.send(
    "agent.inbox",
    b'{"task":"analyze","data":"..."}',
    priority=Priority.URGENT,
    key="state",       # dedup — only latest with this key is kept
    delay=60,          # deliver after 60 seconds
    ttl=300,           # message expires in 300 s
    tags=["billing"],
)

go
// Go
msgId, _ := client.Send(ctx, "agent.inbox", []byte(`{"task":"analyze"}`), mq9.SendOptions{
    Priority: mq9.PriorityUrgent,
    Key:      "state",
    Delay:    60,
})
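The `key` option keeps only the latest message with a given key. A toy in-memory model of that rule is sketched below. The class `KeyedMailbox` is hypothetical and runs locally without a broker. Whether the broker keeps the replacement at the new position, as this model does, is an assumption.

```python
class KeyedMailbox:
    """Toy in-memory model of key-based dedup; not the broker's implementation."""

    def __init__(self):
        self._next_id = 1
        self._messages = {}       # msg_id -> (key, payload)
        self._latest_by_key = {}  # key -> msg_id of the newest message with that key

    def send(self, payload, key=None):
        msg_id = self._next_id
        self._next_id += 1
        if key is not None:
            old_id = self._latest_by_key.get(key)
            if old_id is not None:
                del self._messages[old_id]  # drop the older message with the same key
            self._latest_by_key[key] = msg_id
        self._messages[msg_id] = (key, payload)
        return msg_id

    def pending(self):
        """Return payloads in msg_id order."""
        return [payload for _, (_, payload) in sorted(self._messages.items())]


box = KeyedMailbox()
box.send(b"state v1", key="state")
box.send(b"task")
box.send(b"state v2", key="state")
print(box.pending())  # [b'task', b'state v2']
```

This fits status-style messages where a reader only needs the most recent value.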

fetch — pull messages (FETCH + ACK)

python
# Python — stateful consumption
from mq9 import FetchOptions
messages = await client.fetch("agent.inbox", group_name="workers", deliver="earliest")
for msg in messages:
    process(msg)
    await client.ack("agent.inbox", "workers", msg.msg_id)

go
// Go
messages, _ := client.Fetch(ctx, "agent.inbox", mq9.FetchOptions{
    GroupName: "workers",
    Deliver:   "earliest",
})
for _, msg := range messages {
    process(msg)
    client.Ack(ctx, "agent.inbox", "workers", msg.MsgID)
}

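An ACK advances the group's offset to the given msg_id, so a single ACK of the last msg_id in a batch confirms the whole batch. The loops above ACK after each message instead, so a failure midway leaves the remaining messages unconfirmed. The next FETCH resumes from the acknowledged offset.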

Stateless fetch — omit group_name. Each call is independent; no offset is recorded. Use for one-off reads and inspection.
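The difference between stateful and stateless fetch can be sketched with a small local model. `GroupOffsets` is a hypothetical class, not SDK code. It ignores priority ordering and assumes that messages fetched but never ACKed are returned again on the next fetch.

```python
class GroupOffsets:
    """Toy model of per-group offsets over a list of (msg_id, payload) tuples."""

    def __init__(self, messages):
        self.messages = messages  # ascending msg_id
        self.offsets = {}         # group_name -> last ACKed msg_id

    def fetch(self, group_name=None, num_msgs=10):
        # Stateless fetch (no group_name) always starts from the beginning.
        start = self.offsets.get(group_name, 0) if group_name else 0
        return [m for m in self.messages if m[0] > start][:num_msgs]

    def ack(self, group_name, msg_id):
        # The offset only moves forward.
        self.offsets[group_name] = max(self.offsets.get(group_name, 0), msg_id)


log = GroupOffsets([(1, b"a"), (2, b"b"), (3, b"c")])
batch = log.fetch("workers", num_msgs=2)
log.ack("workers", batch[-1][0])  # one ACK for the whole batch
print(log.fetch("workers"))       # [(3, b'c')]
print(len(log.fetch()))           # 3
```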

consume — continuous consumption loop

Use consume() for an automatic poll-and-process loop:

python
# Python
consumer = await client.consume(
    "agent.inbox",
    handler=async_handler,
    group_name="workers",
    auto_ack=True,
    error_handler=lambda msg, err: print(f"msg {msg.msg_id} failed: {err}"),
)
# ... do other work ...
await consumer.stop()
print(f"processed: {consumer.processed_count}")

typescript
// TypeScript
const consumer = await client.consume("task.inbox", async (msg) => {
  const data = JSON.parse(new TextDecoder().decode(msg.payload));
  console.log(data);
}, {
  groupName: "workers",
  autoAck: true,
  errorHandler: async (msg, err) => console.error(`msg ${msg.msgId} failed:`, err),
});
await consumer.stop();

  • Handler throws / returns error → message not ACKed, errorHandler called, loop continues.
  • consumer.stop() drains the current batch and exits cleanly.

agent_report and agent_unregister

python
# Report heartbeat while running
await client.agent_report({"name": "agent.translator", "report_info": "running"})

# Unregister at shutdown
await client.agent_unregister("agent.translator")

Common patterns

Dynamic agent discovery and dispatch

Discover an agent by capability, then send a task to its mailbox. No hard-coded addresses.

python
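import json

# Discover by semantic intent
agents = await client.agent_discover(semantic="extract structured data from PDFs", limit=3)
if not agents:
    raise RuntimeError("no agent matched the requested capability")
target = agents[0]

# Send task directly to discovered agent's mailbox.
# my_reply_mailbox is an address returned by an earlier mailbox_create call.
await client.send(target.mailbox, json.dumps({
    "task": "extract",
    "document": "report_q1.pdf",
    "reply_to": my_reply_mailbox,
}).encode())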

Async request-reply

Agent A sends a request, then continues other work. Agent B processes at its own pace and replies to A's private reply mailbox.

bash
# Agent A: create private reply mailbox
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":600}'
# → {"mail_address":"reply.a1b2c3"}

# Agent A: send request to Agent B with reply_to
nats request '$mq9.AI.MSG.SEND.agent.b' '{
  "request":"translate","text":"Hello world","lang":"fr","reply_to":"reply.a1b2c3"
}'

# Agent B: fetch its queue and reply
nats request '$mq9.AI.MSG.FETCH.agent.b' '{"group_name":"b-worker","deliver":"earliest"}'
nats request '$mq9.AI.MSG.SEND.reply.a1b2c3' '{"result":"Bonjour le monde"}'
nats request '$mq9.AI.MSG.ACK.agent.b' '{"group_name":"b-worker","mail_address":"agent.b","msg_id":1}'

# Agent A: FETCH reply whenever ready
nats request '$mq9.AI.MSG.FETCH.reply.a1b2c3' '{"deliver":"earliest"}'
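Agent B's side of this exchange reduces to: parse the request, do the work, and address the result to `reply_to`. The sketch below shows that step in Python, using the request fields from the example above. `handle_request` and the injected `translate` callable are hypothetical.

```python
import json


def handle_request(raw, translate):
    """Return (reply_mailbox, reply_body) for one request message."""
    request = json.loads(raw)
    reply_to = request.get("reply_to")
    if not reply_to:
        raise ValueError("request has no reply_to mailbox")
    result = translate(request["text"], request["lang"])
    return reply_to, json.dumps({"result": result}).encode()


raw = b'{"request":"translate","text":"Hello world","lang":"fr","reply_to":"reply.a1b2c3"}'
mailbox, body = handle_request(raw, lambda text, lang: "Bonjour le monde")
print(mailbox)  # reply.a1b2c3
```

Agent B then sends `body` to `mailbox` and ACKs the original request.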

Sub-agent result delivery

Parent creates a private reply mailbox and shares it with the sub-agent at spawn time. Sub-agent deposits result. Parent FETCHes asynchronously — no polling, no shared state.

python
import json

# Parent: create private reply mailbox
reply_address = await client.mailbox_create(ttl=3600)

# Parent: send task to sub-agent with reply_to
await client.send("task.dispatch", json.dumps({
    "task": "summarize /data/corpus",
    "reply_to": reply_address,
}).encode())

# Parent: FETCH result whenever ready (non-blocking)
messages = await client.fetch(reply_address, group_name="orchestrator", deliver="earliest")

Multi-worker task queue

Multiple workers share the same group_name. Each task is processed by exactly one worker — no coordination, no duplicate processing. Workers join or leave at any time without reconfiguration.

python
# Producer: publish tasks with priority
await client.send("task.queue",
    b'{"task":"reindex","id":"t-101"}',
    priority=Priority.CRITICAL,
)

# Worker A and Worker B — same group_name
messages = await client.fetch("task.queue", group_name="workers", num_msgs=1)
for msg in messages:
    await process(msg)
    await client.ack("task.queue", "workers", msg.msg_id)
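The competing-workers behavior can be tried locally before pointing workers at a broker. In the sketch below an `asyncio.Queue` stands in for the shared mailbox and group. It is an analogy for the pattern, not a model of mq9 internals, and `worker` and `run` are hypothetical names.

```python
import asyncio


async def worker(name, queue, done):
    while True:
        task = await queue.get()
        done.append((name, task))  # record which worker handled the task
        await asyncio.sleep(0)     # yield so the other worker can run
        queue.task_done()


async def run(tasks, worker_names):
    queue, done = asyncio.Queue(), []
    for task in tasks:
        queue.put_nowait(task)
    workers = [asyncio.create_task(worker(n, queue, done)) for n in worker_names]
    await queue.join()  # wait until every task has been handled
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


done = asyncio.run(run(["t-101", "t-102", "t-103", "t-104"], ["A", "B"]))
print(sorted(task for _, task in done))  # ['t-101', 't-102', 't-103', 't-104']
```

Each task appears once in `done`, whichever worker picked it up.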

Cloud-to-edge command delivery

Cloud publishes to the edge agent's private mailbox. Edge agent may be offline for hours. On reconnect, it FETCHes all pending commands in priority order — critical reconfiguration before routine tasks.

go
// Cloud: publish commands (edge may be offline)
client.Send(ctx, "edge.agent", []byte(`{"cmd":"reconfigure","params":{"rate":100}}`),
    mq9.SendOptions{Priority: mq9.PriorityCritical})

client.Send(ctx, "edge.agent", []byte(`{"cmd":"run_diagnostic"}`), mq9.SendOptions{})

// Edge: on reconnect, fetch all pending in priority order
messages, _ := client.Fetch(ctx, "edge.agent", mq9.FetchOptions{
    GroupName: "edge-agent",
    Deliver:   "earliest",
    NumMsgs:   10,
})
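The order in which pending commands come back can be modeled as a sort by priority. The sketch below uses the critical, urgent, normal order from the quick start. Ordering by `msg_id` within one priority level is an assumption, and `fetch_order` is a hypothetical helper.

```python
PRIORITY_RANK = {"critical": 0, "urgent": 1, "normal": 2}


def fetch_order(messages):
    """Sort (msg_id, priority, payload) tuples: priority first, then msg_id."""
    return sorted(messages, key=lambda m: (PRIORITY_RANK[m[1]], m[0]))


pending = [
    (1, "normal", "run_diagnostic"),
    (2, "critical", "reconfigure"),
    (3, "normal", "rotate_logs"),
]
print([m[2] for m in fetch_order(pending)])
# ['reconfigure', 'run_diagnostic', 'rotate_logs']
```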

Human-in-the-loop approval

The human's client uses the exact same protocol as any agent — no webhooks, no routing middleware.

typescript
// Agent: send approval request to human's mailbox
await client.send(humanMailAddress, JSON.stringify({
  type: "approval_request",
  action: "delete_dataset",
  target: "ds-prod-2024",
  reply_to: agentMailAddress,
}), { priority: Priority.URGENT });

// Human's client — same SDK
const consumer = await client.consume(humanMailAddress, async (req) => {
  const data = JSON.parse(new TextDecoder().decode(req.payload));
  const approved = await showApprovalUI(data);
  await client.send(data.reply_to, JSON.stringify({ approved, reviewer: "alice" }));
});

Capability-based routing

Use DISCOVER at runtime to route tasks to the right agent without hard-coding targets.

python
async def route_task(task_description: str, payload: bytes):
    agents = await client.agent_discover(semantic=task_description, limit=1)
    if not agents:
        raise RuntimeError(f"No agent found for: {task_description}")
    await client.send(agents[0].mailbox, payload)

LangChain / LangGraph integration

langchain-mq9 is an official toolkit that wraps all mq9 operations as LangChain tools. Works with both LangChain and LangGraph out of the box.

bash
pip install langchain-mq9

8 tools included:

| Tool | Operation |
| --- | --- |
| agent_register | Register this agent with capabilities |
| agent_discover | Find agents by text or semantic search |
| create_mailbox | Create a private mailbox |
| send_message | Send a message with priority |
| fetch_messages | Pull messages (FETCH + ACK model) |
| ack_messages | Advance consumer group offset |
| query_messages | Inspect mailbox read-only |
| delete_message | Delete a specific message |

LangChain:

python
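from langchain_mq9 import Mq9Toolkit
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

toolkit = Mq9Toolkit(server="nats://localhost:4222")
tools = toolkit.get_tools()

# create_tool_calling_agent needs a prompt with an agent_scratchpad placeholder
prompt = ChatPromptTemplate.from_messages([
    ("system", "You coordinate agents over mq9 using the provided tools."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

llm = ChatOpenAI(model="gpt-4o")
agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke({"input": "Discover all registered agents and send a task to the translator"})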

LangGraph:

python
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_mq9 import Mq9Toolkit

toolkit = Mq9Toolkit(server="nats://localhost:4222")
llm = ChatOpenAI(model="gpt-4o")
app = create_react_agent(llm, toolkit.get_tools())
result = await app.ainvoke({"messages": [("human", "Find an agent that can summarize PDFs")]})

Manual tool usage (no LLM):

python
# Reuses the toolkit created above; ainvoke is the public async entry point for LangChain tools
tools_by_name = {t.name: t for t in toolkit.get_tools()}

agents = await tools_by_name["agent_discover"].ainvoke({"semantic": "translation", "limit": 3})
address = await tools_by_name["create_mailbox"].ainvoke({"ttl": 300})
await tools_by_name["send_message"].ainvoke({"mail_address": address, "content": "hello"})
result = await tools_by_name["fetch_messages"].ainvoke({"mail_address": address, "group_name": "workers"})

MCP server

mq9 exposes a Model Context Protocol (MCP) server on the RobustMQ Admin Server. Connect any MCP-compatible client (Claude Desktop, Cursor, etc.):

text
http://<admin-server>:<port>/mcp

Error handling

All protocol responses include an error field. An empty string means success.

| Error message | Cause |
| --- | --- |
| mailbox xxx already exists | CREATE called with a name that already exists |
| mailbox not found | Mailbox does not exist or has expired |
| message not found | The specified msg_id does not exist or has expired |
| invalid mail_address | Format is invalid (uppercase, hyphens, etc.) |
| agent not found | UNREGISTER or REPORT called with unknown Agent name |

SDK exceptions: all SDKs throw/return Mq9Error for non-empty error responses.
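When calling the protocol directly rather than through an SDK, the same check is a few lines. The sketch below assumes the reply is a JSON object and treats a missing `error` field as success. `ResponseError` is a hypothetical local exception, not the SDKs' `Mq9Error`.

```python
import json


class ResponseError(Exception):
    """Hypothetical local exception for a non-empty error field."""


def parse_response(raw):
    """Decode a protocol reply and raise if its error field is non-empty."""
    reply = json.loads(raw)
    error = reply.get("error", "")
    if error:
        raise ResponseError(error)
    return reply


ok = parse_response(b'{"error":"","mail_address":"quickstart.demo"}')
print(ok["mail_address"])  # quickstart.demo

try:
    parse_response(b'{"error":"mailbox not found"}')
except ResponseError as err:
    print(f"request failed: {err}")  # request failed: mailbox not found
```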


Deployment

Development (Docker)

bash
docker run -d --name mq9 -p 4222:4222 -v mq9-data:/data robustmq/robustmq:latest

Mount -v mq9-data:/data to persist mailboxes and messages across restarts.

Production — single node

bash
docker run -d \
  --name mq9 \
  -p 4222:4222 \
  -p 9090:9090 \
  -v /data/mq9:/data \
  --restart unless-stopped \
  robustmq/robustmq:latest

  • Port 4222 — mq9/NATS protocol (Agent connections)
  • Port 9090 — Prometheus metrics endpoint

A single node handles millions of concurrent Agent connections and is sufficient for most production workloads.

Cluster mode

Scale horizontally when a single node is not enough. Agents use the same SDK — no client code changes required.


Pattern reference

| Scenario | Key feature |
| --- | --- |
| Dynamic routing | AGENT.DISCOVER + send to returned mailbox |
| Capability registry | AGENT.REGISTER + AGENT.DISCOVER |
| Point-to-point | Private mailbox + FETCH + ACK |
| Competing workers | Shared group_name across workers |
| Request-reply | Private reply mailbox + reply_to |
| Offline delivery | Store-first, FETCH on reconnect |
| Cloud-to-edge | Priority ordering on reconnect |
| Human-in-the-loop | Same protocol for humans and Agents |

See What is mq9 for design rationale. See For Agent for the wire protocol reference.