Skip to content

A2A — Java

Installation

Add the dependency to pom.xml:

xml
<dependency>
    <groupId>io.mq9</groupId>
    <artifactId>mq9</artifactId>
    <version>0.1.0</version>
</dependency>

Requirements: Java 17+, Maven


Overview

Every agent is a peer — it can both send tasks to other agents and receive tasks from other agents. There is no "client" or "server" role.

  • Create a Mq9A2AAgent with only the broker address.
  • Register a handler with onMessage(), including consumer options.
  • Call connect() to connect to the broker.
  • Call createMailbox() to create a mailbox and start the background consumer.
  • Call register() to publish agent identity to the registry so other agents can discover it.

Quick start

Agent A — register and handle incoming tasks

java
import io.a2a.spec.*;
import io.mq9.ConsumeOptions;
import io.mq9.a2a.*;

Mq9A2AAgent agentA = Mq9A2AAgent.builder().build();

// A2A protocol event sequence: WORKING → Artifact → COMPLETED
agentA.onMessage(
    (A2AContext ctx, EventQueue queue) ->
        // A2A protocol: WORKING first — tells the sender processing has started
        queue.working(ctx)
            .thenCompose(v -> {
                // A2A protocol: message body is one or more Parts; get the first text
                String text = ctx.firstTextPart().orElse("");
                String result = myTranslate(text); // replace with your logic
                // A2A protocol: push result as Artifact — call multiple times for streaming
                return queue.artifact(ctx, "translation", result);
            })
            // A2A protocol: COMPLETED last — signals the task is done
            .thenCompose(v -> queue.completed(ctx)),
    ConsumeOptions.builder()
        .groupName("demo.agent.translator.workers")
        .deliver("earliest")
        .numMsgs(10)
        .maxWaitMs(500)
        .build()
);

agentA.connect().join();
String mailbox = agentA.createMailbox("demo.agent.translator", 0).join();
System.out.println("mailbox: " + mailbox);
// agentA.register(card).join(); // optional: publish to registry

Agent B — discover Agent A and send a task

java
import io.a2a.spec.*;
import io.mq9.ConsumeOptions;
import io.mq9.a2a.*;
import java.util.List;
import java.util.Map;

Mq9A2AAgent agentB = Mq9A2AAgent.builder().build();

// All messages arrive here — both reply events and new incoming tasks.
// Use context.taskId to tell them apart: if it matches a task you sent,
// it's a reply; otherwise it's a new task from another agent.
agentB.onMessage(
    (A2AContext ctx, EventQueue queue) -> {
        System.out.println("received event task_id=" + ctx.taskId);
        ctx.firstTextPart().ifPresent(t -> System.out.println("text: " + t));
        return CompletableFuture.completedFuture(null);
    },
    ConsumeOptions.builder()
        .groupName("demo.agent.sender.workers")
        .deliver("earliest")
        .numMsgs(10)
        .maxWaitMs(500)
        .build()
);

agentB.connect().join();
String bMailbox = agentB.createMailbox("demo.agent.sender", 300).join();

// Discover Agent A
List<Map<String, Object>> agents = agentB.discover("translation", false, 5).join();
Map<String, Object> target = agents.get(0);

// Build A2A SendMessageRequest — message body is one or more Parts
Message msg = new Message.Builder()
        .role(Message.Role.USER)
        .parts(new TextPart("你好,世界"))
        .build();
SendMessageRequest request = new SendMessageRequest(
        null, new MessageSendParams(msg, null, null));

// sendMessage returns msg_id; task_id is generated by Agent A (the executor)
// and arrives with reply events, readable as context.taskId in onMessage
long msgId = agentB.sendMessage(target, request, bMailbox).join();
System.out.println("sent, msg_id=" + msgId);

Thread.sleep(10_000); // wait for reply to arrive via onMessage
agentB.close();

Mq9A2AAgent

java
Mq9A2AAgent agent = Mq9A2AAgent.builder()
        .server("nats://demo.robustmq.com:4222")
        .requestTimeoutMs(60_000)
        .build();
ParameterTypeDescription
serverStringNATS address of the mq9 broker. Default: nats://demo.robustmq.com:4222
requestTimeoutMslongRequest timeout in milliseconds. Default: 60000

connect()

Connects to the broker. Returns CompletableFuture<Void>. Must be called before any other operation.

close()

Stops the consumer and disconnects from the broker. Call after all queued messages have been processed.

onMessage(handler)

Registers a message handler with default consumer options:

java
agent.onMessage((ctx, queue) ->
    queue.working(ctx)
        .thenCompose(v -> queue.artifact(ctx, "result", myProcess(ctx)))
        .thenCompose(v -> queue.completed(ctx))
);

onMessage(handler, options)

Registers a message handler with explicit consumer options:

java
agent.onMessage(handler, ConsumeOptions.builder()
        .groupName("my-agent.workers")
        .deliver("earliest")
        .numMsgs(10)
        .maxWaitMs(500)
        .build());

ConsumeOptions parameters:

ParameterDescription
groupNameConsumer group name. null auto-generates {mailbox}.workers, ensuring consumption resumes from the last committed offset after a restart
deliverDelivery start: "earliest" (default) resumes from the oldest unconsumed message; "latest" only receives new messages
numMsgsMessages per fetch batch. Default: 10
maxWaitMsMax wait time per fetch when the mailbox is empty, in milliseconds. Default: 500

createMailbox(name, ttl)

Creates a mailbox and starts the background consumer. Returns CompletableFuture<String> (mailbox address).

ParameterTypeDescription
nameStringMailbox name, typically AgentCard.name()
ttllongMailbox TTL in seconds; 0 = permanent

The agent can receive messages immediately after this call — no registration required.

register(card)

Publishes agent identity to the registry so others can discover it via discover(). Returns CompletableFuture<Void>. Must be called after createMailbox().

Parameter: cardio.a2a.spec.AgentCard.

unregister()

Removes this agent from the registry. Returns CompletableFuture<Void>. The connection and consumer stay active — queued messages can still be processed. Call close() when ready to fully stop.

discover(query, semantic, limit)

Discovers agents by natural-language description. Returns CompletableFuture<List<Map<String, Object>>>.

ParameterTypeDescription
queryStringNatural-language query; null to list all
semanticbooleantrue for vector semantic search; false for keyword match
limitintMax number of results

Each result map contains name, mailbox, agent_card, and other fields.

sendMessage(mailAddress, request, replyTo)

Sends a message to another agent. Returns CompletableFuture<Long> (msg_id).

ParameterTypeDescription
mailAddressObjectAgent info Map from discover() (must contain "mailbox"), or a mailbox address string
requestSendMessageRequestThe A2A message request
replyToStringYour own mailbox address; null for one-way send

Returns the msg_id assigned by the broker. The task_id is generated by the executing agent (the receiver) and arrives with reply events, readable as context.taskId in onMessage.


Handler types

A2AContext

Field / MethodTypeDescription
ctx.taskIdStringTask ID, generated by the executing agent
ctx.contextIdStringContext / session ID
ctx.messageio.a2a.spec.MessageThe incoming A2A message
ctx.currentTaskio.a2a.spec.TaskExisting task for multi-turn conversations; null for new tasks
ctx.firstTextPart()Optional<String>Text of the first TextPart in the message — avoids null checks in every handler

EventQueue

EventQueue provides helper methods so you don't need to construct Builder chains in every handler:

MethodDescription
queue.working(ctx)Sends TaskStatusUpdateEvent(WORKING)
queue.artifact(ctx, name, text)Sends a TaskArtifactUpdateEvent with a single text part
queue.completed(ctx)Sends TaskStatusUpdateEvent(COMPLETED) as the final event
queue.failed(ctx)Sends TaskStatusUpdateEvent(FAILED) as the final event
queue.enqueue(event)Sends any A2A event (construct manually when needed)

Standard A2A protocol event sequence:

java
queue.working(ctx)
    .thenCompose(v -> queue.artifact(ctx, "result", output))
    .thenCompose(v -> queue.completed(ctx))