Go SDK
Install
bash
go get github.com/robustmq/mq9/goRequires Go 1.21+.
Quick start
go
package main
import (
"context"
"fmt"
"log"
"time"
mq9 "github.com/robustmq/mq9/go"
)
func main() {
client, err := mq9.Connect("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer client.Close()
ctx := context.Background()
// Create a mailbox
address, err := client.MailboxCreate(ctx, "agent.inbox", 3600)
if err != nil {
log.Fatal(err)
}
// Send a message
msgId, err := client.Send(ctx, address, []byte(`{"task":"analyze"}`), mq9.SendOptions{})
if err != nil {
log.Fatal(err)
}
fmt.Println("sent:", msgId)
// Consume messages
consumer, err := client.Consume(ctx, address, func(msg mq9.Message) error {
fmt.Printf("received: %s\n", msg.Payload)
return nil
}, mq9.ConsumeOptions{GroupName: "workers", AutoAck: true})
if err != nil {
log.Fatal(err)
}
time.Sleep(10 * time.Second)
consumer.Stop()
}Connect
go
client, err := mq9.Connect(server string, opts ...mq9.Option) (*mq9.Client, error)Options:
go
mq9.WithRequestTimeout(5 * time.Second)
mq9.WithReconnectDelay(2 * time.Second)go
client, err := mq9.Connect("nats://localhost:4222",
mq9.WithRequestTimeout(10*time.Second),
)Close
go
client.Close() errorMailbox
MailboxCreate
go
client.MailboxCreate(ctx context.Context, name string, ttl int64) (string, error)name = ""— broker auto-generates the address.ttl = 0— never expires.
go
address, err := client.MailboxCreate(ctx, "agent.inbox", 3600)
address, err := client.MailboxCreate(ctx, "", 7200) // auto-generatedMessaging
Send
go
client.Send(ctx context.Context, mailAddress string, payload []byte, opts mq9.SendOptions) (int64, error)go
type SendOptions struct {
Priority Priority // default PriorityNormal
Key string // dedup key; empty = no dedup
Delay int64 // seconds; 0 = no delay
TTL int64 // message-level TTL; 0 = no TTL
Tags []string
}go
// Normal send
msgId, err := client.Send(ctx, "agent.inbox", []byte(`{"task":"analyze"}`), mq9.SendOptions{})
// Urgent priority
msgId, err := client.Send(ctx, "agent.inbox", []byte("alert"), mq9.SendOptions{
Priority: mq9.PriorityUrgent,
})
// Dedup key
msgId, err := client.Send(ctx, "task.status", payload, mq9.SendOptions{Key: "state"})
// Delayed delivery
msgId, err := client.Send(ctx, "agent.inbox", payload, mq9.SendOptions{Delay: 60})Fetch
go
client.Fetch(ctx context.Context, mailAddress string, opts mq9.FetchOptions) ([]mq9.Message, error)go
type FetchOptions struct {
GroupName string // omit for stateless
Deliver string // "latest"|"earliest"|"from_time"|"from_id"; default "latest"
FromTime int64 // unix timestamp
FromID int64
ForceDeliver bool
NumMsgs int // default 100
MaxWaitMs int64 // default 500
}go
// Stateless
messages, err := client.Fetch(ctx, "task.inbox", mq9.FetchOptions{Deliver: "earliest"})
// Stateful
messages, err := client.Fetch(ctx, "task.inbox", mq9.FetchOptions{GroupName: "workers"})
for _, msg := range messages {
client.Ack(ctx, "task.inbox", "workers", msg.MsgID)
}Ack
go
client.Ack(ctx context.Context, mailAddress string, groupName string, msgID int64) errorConsume
go
client.Consume(
ctx context.Context,
mailAddress string,
handler func(mq9.Message) error,
opts mq9.ConsumeOptions,
) (*mq9.Consumer, error)go
type ConsumeOptions struct {
GroupName string
Deliver string
NumMsgs int
MaxWaitMs int64
AutoAck bool
ErrorHandler func(msg Message, err error)
}- Handler returns non-nil error → message not ACKed,
ErrorHandlercalled, loop continues.
go
consumer, err := client.Consume(ctx, "task.inbox", func(msg mq9.Message) error {
fmt.Println(string(msg.Payload))
return nil
}, mq9.ConsumeOptions{
GroupName: "workers",
AutoAck: true,
ErrorHandler: func(msg mq9.Message, err error) {
log.Printf("msg %d failed: %v", msg.MsgID, err)
},
})
time.Sleep(30 * time.Second)
consumer.Stop()
fmt.Println(consumer.ProcessedCount())Query
go
client.Query(ctx context.Context, mailAddress string, key string, limit int64, since int64) ([]mq9.Message, error)
// key="", limit=0, since=0 → omitted from requestDelete
go
client.Delete(ctx context.Context, mailAddress string, msgID int64) errorAgent management
AgentRegister
go
client.AgentRegister(ctx context.Context, agentCard map[string]any) error
// agentCard must contain "mailbox" keyAgentUnregister
go
client.AgentUnregister(ctx context.Context, mailbox string) errorAgentReport
go
client.AgentReport(ctx context.Context, report map[string]any) errorAgentDiscover
go
client.AgentDiscover(ctx context.Context, text string, semantic string, limit int, page int) ([]map[string]any, error)
// text="", semantic="" → omitted; limit=0 → default 20; page=0 → default 1Data types
Priority
go
type Priority string
const (
PriorityNormal Priority = "normal"
PriorityUrgent Priority = "urgent"
PriorityCritical Priority = "critical"
)Message
go
type Message struct {
MsgID int64
Payload []byte
Priority Priority
CreateTime int64 // unix timestamp (seconds)
}Consumer
go
func (c *Consumer) IsRunning() bool
func (c *Consumer) ProcessedCount() int64
func (c *Consumer) Stop() // blocks until the loop exitsMq9Error
go
type Mq9Error struct {
Message string
}
func (e *Mq9Error) Error() string