Java SDK
安装
Maven:
xml
<dependency>
<groupId>io.mq9</groupId>
<artifactId>mq9</artifactId>
<version>0.1.0</version>
</dependency>Gradle:
groovy
implementation 'io.mq9:mq9:0.1.0'需要 Java 17+。
快速开始
java
import io.mq9.*;
import java.util.concurrent.CompletableFuture;
public class Example {
public static void main(String[] args) throws Exception {
Mq9Client client = Mq9Client.connect("nats://localhost:4222").get();
// 创建邮箱
String address = client.mailboxCreate("agent.inbox", 3600).get();
// 发送消息
long msgId = client.send(address, "hello world".getBytes(),
SendOptions.builder().build()).get();
System.out.println("sent: " + msgId);
// 消费消息
Consumer consumer = client.consume(address, msg -> {
System.out.println("received: " + new String(msg.payload));
return CompletableFuture.completedFuture(null);
}, ConsumeOptions.builder().groupName("workers").build()).get();
Thread.sleep(10000);
consumer.stop().get();
client.close();
}
}Mq9Client
java
// 静态工厂方法 — 返回 CompletableFuture<Mq9Client>
Mq9Client.connect(String server)
Mq9Client.connect(String server, ClientOptions options)java
ClientOptions options = ClientOptions.builder()
.requestTimeout(Duration.ofSeconds(10))
.build();
Mq9Client client = Mq9Client.connect("nats://localhost:4222", options).get();实现了 AutoCloseable:
java
try (Mq9Client client = Mq9Client.connect("nats://localhost:4222").get()) {
// ...
}邮箱
mailboxCreate
java
CompletableFuture<String> mailboxCreate(String name, long ttl)name = null— broker 自动生成地址。ttl = 0— 永不过期。
java
String address = client.mailboxCreate("agent.inbox", 3600).get();
String address = client.mailboxCreate(null, 7200).get(); // 自动生成消息收发
send
java
CompletableFuture<Long> send(String mailAddress, byte[] payload, SendOptions options)java
SendOptions options = SendOptions.builder()
.priority(Priority.URGENT)
.key("state") // 去重键
.delay(60L) // 延迟秒数
.ttl(300L) // 消息 TTL 秒数
.tags(List.of("billing", "vip"))
.build();java
// 普通发送
long msgId = client.send("agent.inbox", "hello".getBytes(),
SendOptions.builder().build()).get();
// 紧急优先级
long msgId = client.send("agent.inbox", "alert".getBytes(),
SendOptions.builder().priority(Priority.URGENT).build()).get();
// 去重键
long msgId = client.send("task.status", payload,
SendOptions.builder().key("state").build()).get();fetch
java
CompletableFuture<List<Message>> fetch(String mailAddress, FetchOptions options)java
FetchOptions options = FetchOptions.builder()
.groupName("workers") // 省略则为无状态
.deliver("earliest") // "latest"|"earliest"|"from_time"|"from_id"
.numMsgs(50)
.maxWaitMs(1000L)
.forceDeliver(false)
.build();java
// 无状态
List<Message> messages = client.fetch("task.inbox",
FetchOptions.builder().deliver("earliest").build()).get();
// 有状态
List<Message> messages = client.fetch("task.inbox",
FetchOptions.builder().groupName("workers").build()).get();
for (Message msg : messages) {
client.ack("task.inbox", "workers", msg.msgId).get();
}ack
java
CompletableFuture<Void> ack(String mailAddress, String groupName, long msgId)consume
java
CompletableFuture<Consumer> consume(
String mailAddress,
Function<Message, CompletableFuture<Void>> handler,
ConsumeOptions options
)java
ConsumeOptions options = ConsumeOptions.builder()
.groupName("workers")
.autoAck(true)
.numMsgs(10)
.errorHandler((msg, throwable) -> {
System.err.println("msg " + msg.msgId + " failed: " + throwable.getMessage());
})
.build();- handler 抛出异常或以异常完成 → 消息不会被 ACK,调用
errorHandler,循环继续。
java
Consumer consumer = client.consume("task.inbox", msg -> {
System.out.println(new String(msg.payload));
return CompletableFuture.completedFuture(null);
}, ConsumeOptions.builder().groupName("workers").autoAck(true).build()).get();
Thread.sleep(30000);
consumer.stop().get();
System.out.println("processed: " + consumer.getProcessedCount());query
java
CompletableFuture<List<Message>> query(String mailAddress, String key, Long limit, Long since)
// key=null、limit=null、since=null → 不传入请求delete
java
CompletableFuture<Void> delete(String mailAddress, long msgId)Agent 管理
agentRegister
java
CompletableFuture<Void> agentRegister(Map<String, Object> agentCard)
// agentCard 必须包含 "mailbox" 键agentUnregister
java
CompletableFuture<Void> agentUnregister(String mailbox)agentReport
java
CompletableFuture<Void> agentReport(Map<String, Object> report)agentDiscover
java
CompletableFuture<List<Map<String, Object>>> agentDiscover(
String text, String semantic, Integer limit, Integer page
)
// text=null、semantic=null → 省略;limit=null → 默认 20;page=null → 默认 1数据类型
Priority
java
public enum Priority {
NORMAL("normal"),
URGENT("urgent"),
CRITICAL("critical");
}Message
java
public class Message {
public final long msgId;
public final byte[] payload;
public final Priority priority;
public final long createTime; // Unix 时间戳(秒)
}Consumer
java
public class Consumer {
public boolean isRunning();
public long getProcessedCount();
public CompletableFuture<Void> stop();
}Mq9Error
java
// 非受检异常
public class Mq9Error extends RuntimeException {
public Mq9Error(String message) { super(message); }
}java
try {
client.mailboxCreate("agent.inbox", 3600).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof Mq9Error err) {
System.err.println(err.getMessage());
}
}