Skip to content

Commit

Permalink
close the internal state when Unsubscribe (#12)
Browse files Browse the repository at this point in the history
* close the internal state when Unsubscribe

* Added String representation for Fields
  • Loading branch information
leandro-lugaresi authored Jul 24, 2020
1 parent 3a69f30 commit 2e5af52
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 1 deletion.
1 change: 1 addition & 0 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (h *Hub) NonBlockingSubscribe(cap int, topics ...string) Subscription {
// Unsubscribe remove and close the Subscription.
func (h *Hub) Unsubscribe(sub Subscription) {
h.matcher.Unsubscribe(sub)
sub.subscriber.Close()
}

// Close will unsubscribe all the subscriptions and close them all.
Expand Down
9 changes: 8 additions & 1 deletion hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ func TestHub(t *testing.T) {

func TestNonBlockingSubscriberShouldAlertIfLoseMessages(t *testing.T) {
h := New()
h.NonBlockingSubscribe(10, "a.*.c")
sub := h.NonBlockingSubscribe(10, "a.*.c")

defer h.Unsubscribe(sub)

subsAlert := h.Subscribe(1, AlertTopic)
// send messages without a working subscriber
for i := 0; i < 11; i++ {
Expand Down Expand Up @@ -141,6 +144,10 @@ func TestWith(t *testing.T) {

msg = <-subs.Receiver
require.Equal(t, Fields{"msg": 4, "hub": "subH2", "something": 789}, msg.Fields)

h.Unsubscribe(subs)
v, ok := <-subs.Receiver
require.Falsef(t, ok, "Unsubscribe should close the internal channel, received fields %v", v)
}

func newMessageCounter(s Subscription) *messageCounter {
Expand Down
21 changes: 21 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package hub

import (
"fmt"
"sort"
"strings"
)

type (
// Fields is a [key]value storage for Messages values.
Fields map[string]interface{}
Expand All @@ -17,3 +23,18 @@ type (
func (m *Message) Topic() string {
return m.Name
}

func (f Fields) String() string {
if len(f) == 0 {
return "Fields(<empty>)"
}

fields := make([]string, 0, len(f))
for k, v := range f {
fields = append(fields, fmt.Sprintf("[%s]%v", k, v))
}

sort.Strings(fields)

return "Fields( " + strings.Join(fields, ", ") + " )"
}
23 changes: 23 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package hub

import "testing"

func TestFields_String(t *testing.T) {
tests := []struct {
name string
f Fields
want string
}{
{"empty fields", Fields{}, "Fields(<empty>)"},
{"one field", Fields{"foo": "bar"}, "Fields( [foo]bar )"},
{"two fields", Fields{"foo": "bar", "baz": true}, "Fields( [baz]true, [foo]bar )"},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
if got := tt.f.String(); got != tt.want {
t.Errorf("Fields.String() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit 2e5af52

Please sign in to comment.