Skip to content

Commit

Permalink
chore: maintenance changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Feb 17, 2025
1 parent e966047 commit 130fe71
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
6 changes: 4 additions & 2 deletions discovery/nats/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,15 @@ func (d *Discovery) Initialize() error {
// create a new instance of retrier that will try a maximum of five times, with
// an initial delay of 100 ms and a maximum delay of opts.ReconnectWait
retrier := retry.NewRetrier(d.config.MaxJoinAttempts, 100*time.Millisecond, opts.ReconnectWait)
err = retrier.Run(func() error {
if err = retrier.Run(func() error {
connection, err = opts.Connect()
if err != nil {
return err
}
return nil
})
}); err != nil {
return err
}

// create the NATs connection
d.connection = connection
Expand Down
37 changes: 16 additions & 21 deletions internal/eventstream/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"

"github.com/google/uuid"
"go.uber.org/atomic"

"github.com/tochemey/goakt/v3/internal/collection/queue"
)
Expand Down Expand Up @@ -55,7 +56,7 @@ type subscriber struct {
// topics define the topic the subscriber subscribed to
topics map[string]bool
// states whether the given subscriber is active or not
active bool
active *atomic.Bool
}

var _ Subscriber = &subscriber{}
Expand All @@ -72,7 +73,7 @@ func newSubscriber() *subscriber {
sem: sync.Mutex{},
messages: queue.NewQueue(),
topics: make(map[string]bool),
active: true,
active: atomic.NewBool(true),
}
}

Expand All @@ -87,11 +88,7 @@ func (x *subscriber) ID() string {

// Active checks whether the consumer is active
func (x *subscriber) Active() bool {
// acquire the lock
x.sem.Lock()
// release the lock once done
defer x.sem.Unlock()
return x.active
return x.active.Load()
}

// Topics returns the list of topics the consumer has subscribed to
Expand All @@ -109,23 +106,21 @@ func (x *subscriber) Topics() []string {

// Shutdown shutdowns the consumer
func (x *subscriber) Shutdown() {
// acquire the lock
x.sem.Lock()
// release the lock once done
defer x.sem.Unlock()
x.active = false
x.active.Store(false)
}

func (x *subscriber) Iterator() chan *Message {
out := make(chan *Message, x.messages.Length())
defer close(out)
for {
msg := x.messages.Dequeue()
if msg == nil {
break
out := make(chan *Message)
go func() {
defer close(out)
for x.active.Load() && x.messages.Length() > 0 {
msg := x.messages.Dequeue()
if msg == nil {
break
}
out <- msg.(*Message)
}
out <- msg.(*Message)
}
}()
return out
}

Expand All @@ -136,7 +131,7 @@ func (x *subscriber) signal(message *Message) {
// release the lock once done
defer x.sem.Unlock()
// only receive message when active
if x.active {
if x.active.Load() {
x.messages.Enqueue(message)
}
}
Expand Down

0 comments on commit 130fe71

Please sign in to comment.