Feat/nats multi table support (#133)
* add nats multi table sink support using NATS header

* add multiple nats split-to-table support

* fix gcs test

* add nats test config file

---------

Co-authored-by: JeffreyLean <[email protected]>
jeffreylean and JeffreyLean authored Aug 1, 2023
1 parent 63b422f commit 2a864b1
Showing 8 changed files with 187 additions and 191 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -12,7 +12,7 @@ RUN mkdir -p /go/src/talaria
COPY . src/talaria
RUN cd src/talaria && go build . && test -x talaria

FROM debian:latest AS base
FROM debian:bullseye AS base
ARG [email protected]
LABEL maintainer=${MAINTAINER}

14 changes: 9 additions & 5 deletions internal/config/config.go
@@ -107,11 +107,15 @@ type S3SQS struct {

// NATS represents NATS consumer configuration
type NATS struct {
Subject string `json:"subject" yaml:"subject" env:"SUBJECT"`
Host string `json:"host" yaml:"host" env:"HOST"`
Port int32 `json:"port" yaml:"port" env:"PORT"`
Stream string `json:"stream" yaml:"stream" env:"STREAM"`
Queue string `json:"queue" yaml:"queue" env:"QUEUE"`
Host string `json:"host" yaml:"host" env:"HOST"`
Port int32 `json:"port" yaml:"port" env:"PORT"`
Split []SplitWriter `json:"split" yaml:"split" env:"SPLIT"`
}

type SplitWriter struct {
Subject string `json:"subject" yaml:"subject" env:"SUBJECT"`
Table string `json:"table" yaml:"table" env:"TABLE"`
QueueGroup string `json:"queueGroup" yaml:"queueGroup" env:"QUEUE_GROUP"`
}

// Presto represents the Presto configuration
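The new split block is what enables the multi-table fan-out: each entry binds one NATS subject to one destination table. A minimal sketch of how the new fields compose, assuming hypothetical subject, table, and queue-group names (the surrounding server wiring is omitted):

package main

import (
	"fmt"

	"github.com/kelindar/talaria/internal/config"
)

func main() {
	// Hypothetical values: each split entry routes one NATS subject to one
	// destination table and carries its own queue group.
	conf := config.NATS{
		Host: "localhost",
		Port: 4222,
		Split: []config.SplitWriter{
			{Subject: "events.clicks", Table: "clicks", QueueGroup: "talaria"},
			{Subject: "events.views", Table: "views", QueueGroup: "talaria"},
		},
	}
	fmt.Printf("%+v\n", conf)
}

Because each entry carries its own QueueGroup, two Talaria instances that subscribe with the same group name share the messages for that subject instead of each receiving every message.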
67 changes: 67 additions & 0 deletions internal/ingress/nats/jetstream/client.go
@@ -0,0 +1,67 @@
package jetstream

import (
"fmt"
"log"

"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/monitor"
"github.com/nats-io/nats.go"
)

type JSClient interface {
Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Publish(subj string, data []byte, opts ...nats.PubOpt) (*nats.PubAck, error)
// PublishMsg publishes a Msg to JetStream.
PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error)
// AddStream creates a stream.
AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
// DeleteStream deletes a stream.
DeleteStream(name string, opts ...nats.JSOpt) error
}

type NatsClient interface {
Close()
}

type Client struct {
Context JSClient
Server NatsClient
}

// New creates a new JetStream client.
func New(conf *config.NATS, monitor monitor.Monitor) (*Client, error) {
nc, err := nats.Connect(fmt.Sprintf("%s:%d", conf.Host, conf.Port), nats.ReconnectHandler(func(_ *nats.Conn) {
log.Println("Successfully renonnect")
}), nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection close due to %q", nc.LastError())
monitor.Error(nc.LastError())
}), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Got disconnected. Reason: %q\n", nc.LastError())
monitor.Error(nc.LastError())
}), nats.ErrorHandler(natsErrHandler))
if err != nil {
return nil, err
}

js, err := nc.JetStream()
if err != nil {
return nil, err
}
client := &Client{js, nc}

return client, nil
}

func natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) {
if natsErr == nats.ErrSlowConsumer {
pendingMsgs, _, err := sub.Pending()
if err != nil {
log.Printf("couldn't get pending messages: %v", err)
return
}
log.Printf("Falling behind with %d pending messages on subject %q.\n",
pendingMsgs, sub.Subject)
}
}
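To make the new wrapper's intent concrete, here is a minimal usage sketch under stated assumptions: a reachable NATS server with JetStream enabled, hypothetical subject and queue-group names, and a monitor.Monitor supplied by the caller.

package example

import (
	"log"

	"github.com/kelindar/talaria/internal/config"
	"github.com/kelindar/talaria/internal/ingress/nats/jetstream"
	"github.com/kelindar/talaria/internal/monitor"
	"github.com/nats-io/nats.go"
)

// subscribeOnce dials through the wrapper and subscribes via the exposed
// JetStream context. Subject and queue-group names are hypothetical.
func subscribeOnce(mon monitor.Monitor) (*jetstream.Client, error) {
	client, err := jetstream.New(&config.NATS{Host: "localhost", Port: 4222}, mon)
	if err != nil {
		return nil, err
	}
	// QueueSubscribe creates an ephemeral push-based consumer; instances
	// sharing the same queue group divide the subject's messages.
	if _, err := client.Context.QueueSubscribe("events.clicks", "talaria", func(msg *nats.Msg) {
		log.Printf("received %d bytes on %s", len(msg.Data), msg.Subject)
	}); err != nil {
		client.Server.Close()
		return nil, err
	}
	return client, nil // the caller closes client.Server when done
}

Note how the Client struct keeps the raw connection (Server) and the JetStream context (Context) separate, so a caller can close the former without reaching through the latter.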
188 changes: 67 additions & 121 deletions internal/ingress/nats/nats.go
@@ -4,181 +4,127 @@ import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/ingress/nats/jetstream"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/errors"
"github.com/nats-io/nats.go"
)

const (
ctxTag = "NATS"
ctxTag = "NATS"
tableNameHeader = "table"
)

type Ingress struct {
// jetstream exposed interface.
jetstream JetstreamI
monitor monitor.Monitor
conn *nats.Conn
queue chan *nats.Msg
cancel context.CancelFunc
JSClient jetstream.Client
monitor monitor.Monitor
cancel context.CancelFunc
split []splitWriter
}

type jetstream struct {
// The name of the queue group.
queue string // The name of subject listening to.
subject string
// The jetstream context which provide jetstream api.
jsContext nats.JetStreamContext
}
type Event map[string]interface{}

type JetstreamI interface {
// Subscribe to defined subject from Nats server.
Subscribe(handler nats.MsgHandler) (*nats.Subscription, error)
Publish(msg []byte) (nats.PubAckFuture, error)
type splitWriter struct {
subject string
table string
queueGroup string
queue chan *nats.Msg
}

type Event map[string]interface{}

// New creates a new ingestion from NATS JetStream to the sinks.
func New(conf *config.NATS, monitor monitor.Monitor) (*Ingress, error) {
nc, err := nats.Connect(fmt.Sprintf("%s:%d", conf.Host, conf.Port), nats.ReconnectHandler(func(_ *nats.Conn) {
log.Println("Successfully renonnect")
}), nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection close due to %q", nc.LastError())
monitor.Error(nc.LastError())
}), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Got disconnected. Reason: %q\n", nc.LastError())
monitor.Error(nc.LastError())
}), nats.ErrorHandler(natsErrHandler))
jsClient, err := jetstream.New(conf, monitor)
if err != nil {
return nil, err
}

js, err := NewJetStream(conf.Subject, conf.Queue, nc)
if err != nil {
return nil, err
split := make([]splitWriter, len(conf.Split))
for i, s := range conf.Split {
split[i] = splitWriter{subject: s.Subject, table: s.Table, queue: make(chan *nats.Msg, 100), queueGroup: s.QueueGroup}
}

return &Ingress{
jetstream: js,
monitor: monitor,
conn: nc,
queue: make(chan *nats.Msg, 100),
}, nil
}

func natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) {
log.Printf("error: %v\n", natsErr)
if natsErr == nats.ErrSlowConsumer {
pendingMsgs, _, err := sub.Pending()
if err != nil {
log.Printf("couldn't get pending messages: %v", err)
return
}
log.Printf("Falling behind with %d pending messages on subject %q.\n",
pendingMsgs, sub.Subject)
}
}

// NewJetStream create Jetstream context
func NewJetStream(subject, queue string, nc *nats.Conn) (*jetstream, error) {
js, err := nc.JetStream()
if err != nil {
return nil, err
}
return &jetstream{
subject: subject,
queue: queue,
jsContext: js,
JSClient: *jsClient,
monitor: monitor,
split: split,
}, nil
}

// Subscribe to a subject in nats server
func (s *jetstream) Subscribe(handler nats.MsgHandler) (*nats.Subscription, error) {
// QueueSubscribe automatically creates an ephemeral push-based consumer with the queue group defined.
sb, err := s.jsContext.QueueSubscribe(s.subject, s.queue, handler)
if err != nil {
return nil, err
}
// set higher pending limits
sb.SetPendingLimits(65536, (1<<18)*1024)
_, b, _ := sb.PendingLimits()
log.Println("nats: maximum pending limits (bytes): ", b)
return sb, nil
}

// Publish message to the subject in nats server
func (s *jetstream) Publish(msg []byte) (nats.PubAckFuture, error) {
p, err := s.jsContext.PublishAsync(s.subject, msg)
if err != nil {
return nil, err
}
return p, nil
}

// SubsribeHandler subscribes to a specific subject and unmarshals each message into Talaria's event type.
// The decoded events are then passed as input to the given handler function.
func (i *Ingress) SubsribeHandler(handler func(b []map[string]interface{})) error {
_, err := i.jetstream.Subscribe(func(msg *nats.Msg) {
block := make([]map[string]interface{}, 0)
if err := json.Unmarshal(msg.Data, &block); err != nil {
i.monitor.Error(errors.Internal("nats: unable to unmarshal", err))
func (i *Ingress) SubsribeHandler(handler func(b []map[string]interface{}, table string)) error {
for _, s := range i.split {
// QueueSubscribe automatically creates an ephemeral push-based consumer with the queue group defined.
sb, err := i.JSClient.Context.QueueSubscribe(s.subject, s.queueGroup, func(msg *nats.Msg) {
block := make([]map[string]interface{}, 0)
if err := json.Unmarshal(msg.Data, &block); err != nil {
i.monitor.Error(errors.Internal("nats: unable to unmarshal", err))
}
i.monitor.Count1(ctxTag, "NATS.subscribe.count")
handler(block, s.table)
})
if err != nil {
i.monitor.Error(err)
}
i.monitor.Count1(ctxTag, "NATS.subscribe.count")
handler(block)
})
if err != nil {
return err
// set higher pending limits
sb.SetPendingLimits(65536, (1<<18)*1024)
i.monitor.Info(fmt.Sprintf("%s->%s split created", s.subject, s.table))
}
return nil
}

// SubsribeHandlerWithPool processes the messages concurrently using a goroutine pool.
// Messages are handled asynchronously to reduce processing time and avoid becoming a slow consumer.
func (i *Ingress) SubsribeHandlerWithPool(ctx context.Context, handler func(b []map[string]interface{})) error {
func (i *Ingress) SubsribeHandlerWithPool(ctx context.Context, handler func(b []map[string]interface{}, split string)) error {
// Initialize the pool
ctx, cancel := context.WithCancel(ctx)
i.cancel = cancel

i.initializeMemoryPool(ctx, handler)
_, err := i.jetstream.Subscribe(func(msg *nats.Msg) {
// Send the message to the queue
i.queue <- msg
})
if err != nil {
return err
for _, s := range i.split {
queue := s.queue
_, err := i.JSClient.Context.QueueSubscribe(s.subject, s.queueGroup, func(msg *nats.Msg) {
queue <- msg
})
if err != nil {
i.monitor.Error(fmt.Errorf("nats: queue subscribe error %v", err))
continue
}
i.monitor.Info(fmt.Sprintf("%s->%s split created", s.subject, s.table))
}
return nil
}

// Initialize a memory pool with a fixed number of goroutines to process the messages
func (i *Ingress) initializeMemoryPool(ctx context.Context, handler func(b []map[string]interface{})) {
for n := 0; n < 100; n++ {
go func(n int, ctx context.Context, queue chan *nats.Msg) {
for {
select {
case <-ctx.Done():
return
case msg := <-queue:
// Wait for the message
block := make([]map[string]interface{}, 0)
if err := json.Unmarshal(msg.Data, &block); err != nil {
i.monitor.Error(errors.Internal("nats: unable to unmarshal", err))
func (i *Ingress) initializeMemoryPool(ctx context.Context, handler func(b []map[string]interface{}, split string)) {
for _, s := range i.split {
for n := 0; n < 100; n++ {
go func(n int, ctx context.Context, queue chan *nats.Msg, table string, sub string) {
for {
select {
case <-ctx.Done():
return
case msg := <-queue:
// Wait for the message
block := make([]map[string]interface{}, 0)
if err := json.Unmarshal(msg.Data, &block); err != nil {
i.monitor.Error(errors.Internal("nats: unable to unmarshal", err))
}
i.monitor.Count1(ctxTag, "NATS.msg.count")
// asynchronously execute the handler to reduce processing time and avoid becoming a slow consumer.
handler(block, table)
}
i.monitor.Count1(ctxTag, "NATS.subscribe.count")
// asynchronously execute the handler to reduce processing time and avoid becoming a slow consumer.
handler(block)
}
}
}(n, ctx, i.queue)
}(n, ctx, s.queue, s.table, s.subject)
}
}
}

// Close ingress
func (i *Ingress) Close() {
i.conn.Close()
i.JSClient.Server.Close()
i.cancel()
return
}
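Putting it together, a minimal sketch of how a caller might consume the new API; the sink interface, the import alias, and all names below are hypothetical, and the real server wiring is omitted:

package example

import (
	"context"

	"github.com/kelindar/talaria/internal/config"
	ingress "github.com/kelindar/talaria/internal/ingress/nats"
	"github.com/kelindar/talaria/internal/monitor"
)

// sink is a hypothetical stand-in for whatever store the rows land in;
// it only illustrates the per-table fan-out.
type sink interface {
	Ingest(table string, rows []map[string]interface{})
}

func run(ctx context.Context, conf *config.NATS, mon monitor.Monitor, s sink) error {
	in, err := ingress.New(conf, mon)
	if err != nil {
		return err
	}
	defer in.Close()

	// Each configured split subscribes on its own subject; the handler
	// receives the destination table resolved from that split entry.
	if err := in.SubsribeHandlerWithPool(ctx, func(block []map[string]interface{}, table string) {
		s.Ingest(table, block)
	}); err != nil {
		return err
	}
	<-ctx.Done() // block until the caller cancels
	return nil
}

The pooled variant drains each split's buffered channel (capacity 100) with a fixed pool of 100 goroutines per split, which is how the commit avoids slow-consumer errors on busy subjects.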