A high performance NATS client router that scales well
The trie management is derived from Julien Schmidt's httprouter, adapted to the notation of NATS subjects.
go get github.com/mondora/natsrouter/v2
Basic complete example
package main
import (
"log"
"time"
"github.com/mondora/natsrouter/v2"
)
// Config holds the state shared by the example; the only field shown here is
// the NATS subject router.
type Config struct {
// nMux is the router on which handlers are registered and through which
// incoming messages are dispatched.
nMux *natsrouter.Router
// ...
}
// NatsMsgFake is a stand-in similar to nats.Msg, so that the example can be
// run without a real NATS connection.
type NatsMsgFake struct {
// Data is the raw message payload.
Data []byte
// Subject is the subject of the message; it is the value used for routing.
Subject string
// Sub mimics the subscription reference of nats.Msg; only its Subject is
// modelled. It is a pointer and may be nil (the message built in main does
// not set it).
Sub *struct {
Subject string
}
}
// SubMsg pairs a message with the subject used to route it. NewSubjectMsg
// returns it as a natsrouter.SubjectMsg.
type SubMsg struct {
// msg is the wrapped message.
msg *NatsMsgFake
// sub is the subject reported by GetSubject.
sub string
}
// GetMsg returns the wrapped message as an untyped value; callers are
// expected to assert it back to *NatsMsgFake.
func (sm *SubMsg) GetMsg() interface{} {
	var wrapped interface{} = sm.msg
	return wrapped
}
// GetSubject returns the subject stored alongside the wrapped message.
func (sm *SubMsg) GetSubject() string {
	subject := sm.sub
	return subject
}
// NewSubjectMsg wraps natsMsg in a SubMsg, using the message's own Subject as
// the routing subject, and returns it as a natsrouter.SubjectMsg.
// natsMsg must not be nil.
func NewSubjectMsg(natsMsg *NatsMsgFake /*nats.Msg*/) natsrouter.SubjectMsg {
	return &SubMsg{msg: natsMsg, sub: natsMsg.Subject}
}
// Pipeline bundles one incoming message with the shared configuration. It is
// the payload that listenerHandler passes to the router and that the route
// handlers assert back to *Pipeline.
type Pipeline struct {
// cfg is the shared example configuration.
cfg *Config
// msg is the message being processed.
msg *NatsMsgFake
}
// NewListenerPipeline returns a new Pipeline that refers to the given
// configuration and message. Neither argument is copied or validated.
func NewListenerPipeline(cfg *Config, msg *NatsMsgFake) *Pipeline {
	p := new(Pipeline)
	p.cfg, p.msg = cfg, msg
	return p
}
// processMessage logs the given action label together with the subject and
// payload of the pipeline's message.
func (p *Pipeline) processMessage(action string) {
	m := p.msg
	subject, payload := m.Subject, string(m.Data)
	log.Printf("action: %s - path: %s - data: %s\n", action, subject, payload)
}
// processDefault logs a message that no specific handler manages.
//
// The subscription subject (msg.Sub.Subject) is logged when the message
// carries a subscription reference. Sub is a pointer and may be nil (the
// message built in main never sets it), so in that case the message's own
// Subject is logged instead of dereferencing a nil pointer.
func (p *Pipeline) processDefault() {
	subject := p.msg.Subject
	if p.msg.Sub != nil {
		subject = p.msg.Sub.Subject
	}
	log.Printf("unmanaged path: %s\n", subject)
}
// ...
// SubscribeListener registers the example's route handlers on cfg.nMux.
//
// Every handler asserts its payload to *Pipeline (the value listenerHandler
// passes to the router) and panics if it receives any other type.
func (cfg *Config) SubscribeListener() {
	// Handler for ping subjects.
	onPing := func(_ natsrouter.SubjectMsg, _ natsrouter.Params, message interface{}) {
		message.(*Pipeline).processMessage("PING")
	}
	// Handler for msg subjects.
	onMsg := func(_ natsrouter.SubjectMsg, _ natsrouter.Params, message interface{}) {
		message.(*Pipeline).processMessage("MSG")
	}
	// Fallback handler for everything else under "input.*.v1.".
	onDefault := func(_ natsrouter.SubjectMsg, _ natsrouter.Params, message interface{}) {
		message.(*Pipeline).processDefault()
	}

	// "input.:guid.v1.ping.>" OR "input.*.v1.ping.>"
	cfg.nMux.Handle("input.*.v1.ping.>", 1, onPing)
	// "input.:guid.v1.msg.>" (OR "input.*.v1.msg.>")
	cfg.nMux.Handle("input.*.v1.msg.>", 1, onMsg)
	// Default. Rank 2 avoids a collision with the other valid subjects
	// ("input.*.v1.ping.>" or "input.*.v1.msg.>").
	cfg.nMux.Handle("input.*.v1.>", 2, onDefault)
	// ...

	// The queue-subscribe subject must be wider than the subjects of the
	// individual handlers, e.g. "input.*.v1.>" is wider than "input.*.v1.msg.>".
	// TODO enable QueueSubscribe with real NATS connection
	// _, err := natsCli.QueueSubscribe("input.*.v1.>", queueName, cfg.listenerHandler)
	// ...
}
// listenerHandler is the entry point for one incoming message: it wraps msg
// in a Pipeline (the payload) and in a SubjectMsg (the routing key), then asks
// the router to dispatch it. Any error returned by the router is reported as
// "404 Not Found".
func (cfg *Config) listenerHandler(msg *NatsMsgFake) {
	payload := NewListenerPipeline(cfg, msg)
	routed := NewSubjectMsg(msg)

	// Let the router pick the handler registered for the message subject.
	if err := cfg.nMux.ServeNATSWithPayload(routed, payload); err != nil {
		// 404 Not Found
		log.Println("404 Not Found")
	}
}
// main wires the example together: it creates the router, registers the
// handlers, and feeds one hand-built message through listenerHandler to
// simulate an incoming NATS message.
func main() {
	cfg := &Config{nMux: natsrouter.New()}
	cfg.SubscribeListener()

	// Inject a message (simulates an incoming NATS message).
	incoming := &NatsMsgFake{
		Subject: "input.TEST.v1.msg.test_action",
		Data:    []byte("TEST DATA"),
	}
	cfg.listenerHandler(incoming)

	// NOTE(review): presumably leaves time for handler output before exit;
	// confirm whether the router dispatches asynchronously.
	time.Sleep(time.Second)
	log.Println("DONE.")
}