A2A — Java
Installation
Add the dependency to pom.xml:
<dependency>
<groupId>io.mq9</groupId>
<artifactId>mq9</artifactId>
<version>0.1.0</version>
</dependency>Requirements: Java 17+, Maven
Overview
Every agent is a peer — it can both send tasks to other agents and receive tasks from other agents. There is no "client" or "server" role.
- Create a
Mq9A2AAgentwith only the broker address. - Register a handler with
onMessage(), including consumer options. - Call
connect()to connect to the broker. - Call
createMailbox()to create a mailbox and start the background consumer. - Call
register()to publish agent identity to the registry so other agents can discover it.
Quick start
Agent A — register and handle incoming tasks
import io.a2a.spec.*;
import io.mq9.ConsumeOptions;
import io.mq9.a2a.*;
Mq9A2AAgent agentA = Mq9A2AAgent.builder().build();
// A2A protocol event sequence: WORKING → Artifact → COMPLETED
agentA.onMessage(
(A2AContext ctx, EventQueue queue) ->
// A2A protocol: WORKING first — tells the sender processing has started
queue.working(ctx)
.thenCompose(v -> {
// A2A protocol: message body is one or more Parts; get the first text
String text = ctx.firstTextPart().orElse("");
String result = myTranslate(text); // replace with your logic
// A2A protocol: push result as Artifact — call multiple times for streaming
return queue.artifact(ctx, "translation", result);
})
// A2A protocol: COMPLETED last — signals the task is done
.thenCompose(v -> queue.completed(ctx)),
ConsumeOptions.builder()
.groupName("demo.agent.translator.workers")
.deliver("earliest")
.numMsgs(10)
.maxWaitMs(500)
.build()
);
agentA.connect().join();
String mailbox = agentA.createMailbox("demo.agent.translator", 0).join();
System.out.println("mailbox: " + mailbox);
// agentA.register(card).join(); // optional: publish to registryAgent B — discover Agent A and send a task
import io.a2a.spec.*;
import io.mq9.ConsumeOptions;
import io.mq9.a2a.*;
import java.util.List;
import java.util.Map;
Mq9A2AAgent agentB = Mq9A2AAgent.builder().build();
// All messages arrive here — both reply events and new incoming tasks.
// Use context.taskId to tell them apart: if it matches a task you sent,
// it's a reply; otherwise it's a new task from another agent.
agentB.onMessage(
(A2AContext ctx, EventQueue queue) -> {
System.out.println("received event task_id=" + ctx.taskId);
ctx.firstTextPart().ifPresent(t -> System.out.println("text: " + t));
return CompletableFuture.completedFuture(null);
},
ConsumeOptions.builder()
.groupName("demo.agent.sender.workers")
.deliver("earliest")
.numMsgs(10)
.maxWaitMs(500)
.build()
);
agentB.connect().join();
String bMailbox = agentB.createMailbox("demo.agent.sender", 300).join();
// Discover Agent A
List<Map<String, Object>> agents = agentB.discover("translation", false, 5).join();
Map<String, Object> target = agents.get(0);
// Build A2A SendMessageRequest — message body is one or more Parts
Message msg = new Message.Builder()
.role(Message.Role.USER)
.parts(new TextPart("你好,世界"))
.build();
SendMessageRequest request = new SendMessageRequest(
null, new MessageSendParams(msg, null, null));
// sendMessage returns msg_id; task_id is generated by Agent A (the executor)
// and arrives with reply events, readable as context.taskId in onMessage
long msgId = agentB.sendMessage(target, request, bMailbox).join();
System.out.println("sent, msg_id=" + msgId);
Thread.sleep(10_000); // wait for reply to arrive via onMessage
agentB.close();Mq9A2AAgent
Mq9A2AAgent agent = Mq9A2AAgent.builder()
.server("nats://demo.robustmq.com:4222")
.requestTimeoutMs(60_000)
.build();| Parameter | Type | Description |
|---|---|---|
server | String | NATS address of the mq9 broker. Default: nats://demo.robustmq.com:4222 |
requestTimeoutMs | long | Request timeout in milliseconds. Default: 60000 |
connect()
Connects to the broker. Returns CompletableFuture<Void>. Must be called before any other operation.
close()
Stops the consumer and disconnects from the broker. Call after all queued messages have been processed.
onMessage(handler)
Registers a message handler with default consumer options:
agent.onMessage((ctx, queue) ->
queue.working(ctx)
.thenCompose(v -> queue.artifact(ctx, "result", myProcess(ctx)))
.thenCompose(v -> queue.completed(ctx))
);onMessage(handler, options)
Registers a message handler with explicit consumer options:
agent.onMessage(handler, ConsumeOptions.builder()
.groupName("my-agent.workers")
.deliver("earliest")
.numMsgs(10)
.maxWaitMs(500)
.build());ConsumeOptions parameters:
| Parameter | Description |
|---|---|
groupName | Consumer group name. null auto-generates {mailbox}.workers, ensuring consumption resumes from the last committed offset after a restart |
deliver | Delivery start: "earliest" (default) resumes from the oldest unconsumed message; "latest" only receives new messages |
numMsgs | Messages per fetch batch. Default: 10 |
maxWaitMs | Max wait time per fetch when the mailbox is empty, in milliseconds. Default: 500 |
createMailbox(name, ttl)
Creates a mailbox and starts the background consumer. Returns CompletableFuture<String> (mailbox address).
| Parameter | Type | Description |
|---|---|---|
name | String | Mailbox name, typically AgentCard.name() |
ttl | long | Mailbox TTL in seconds; 0 = permanent |
The agent can receive messages immediately after this call — no registration required.
register(card)
Publishes agent identity to the registry so others can discover it via discover(). Returns CompletableFuture<Void>. Must be called after createMailbox().
Parameter: card — io.a2a.spec.AgentCard.
unregister()
Removes this agent from the registry. Returns CompletableFuture<Void>. The connection and consumer stay active — queued messages can still be processed. Call close() when ready to fully stop.
discover(query, semantic, limit)
Discovers agents by natural-language description. Returns CompletableFuture<List<Map<String, Object>>>.
| Parameter | Type | Description |
|---|---|---|
query | String | Natural-language query; null to list all |
semantic | boolean | true for vector semantic search; false for keyword match |
limit | int | Max number of results |
Each result map contains name, mailbox, agent_card, and other fields.
sendMessage(mailAddress, request, replyTo)
Sends a message to another agent. Returns CompletableFuture<Long> (msg_id).
| Parameter | Type | Description |
|---|---|---|
mailAddress | Object | Agent info Map from discover() (must contain "mailbox"), or a mailbox address string |
request | SendMessageRequest | The A2A message request |
replyTo | String | Your own mailbox address; null for one-way send |
Returns the msg_id assigned by the broker. The task_id is generated by the executing agent (the receiver) and arrives with reply events, readable as context.taskId in onMessage.
Handler types
A2AContext
| Field / Method | Type | Description |
|---|---|---|
ctx.taskId | String | Task ID, generated by the executing agent |
ctx.contextId | String | Context / session ID |
ctx.message | io.a2a.spec.Message | The incoming A2A message |
ctx.currentTask | io.a2a.spec.Task | Existing task for multi-turn conversations; null for new tasks |
ctx.firstTextPart() | Optional<String> | Text of the first TextPart in the message — avoids null checks in every handler |
EventQueue
EventQueue provides helper methods so you don't need to construct Builder chains in every handler:
| Method | Description |
|---|---|
queue.working(ctx) | Sends TaskStatusUpdateEvent(WORKING) |
queue.artifact(ctx, name, text) | Sends a TaskArtifactUpdateEvent with a single text part |
queue.completed(ctx) | Sends TaskStatusUpdateEvent(COMPLETED) as the final event |
queue.failed(ctx) | Sends TaskStatusUpdateEvent(FAILED) as the final event |
queue.enqueue(event) | Sends any A2A event (construct manually when needed) |
Standard A2A protocol event sequence:
queue.working(ctx)
.thenCompose(v -> queue.artifact(ctx, "result", output))
.thenCompose(v -> queue.completed(ctx))