Skip to content

Commit 1ef6e0c

Browse files
committed
refactor: cleanup code
1 parent c83f20a commit 1ef6e0c

File tree

2 files changed

+47
-35
lines changed

2 files changed

+47
-35
lines changed

eventstream/stream.go

+17-19
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,16 @@ func (b *EventsStream) RemoveSubscriber(sub Subscriber) {
9494

9595
// Broadcast notifies all subscribers of a given topic of a new message
9696
func (b *EventsStream) Broadcast(msg any, topics []string) {
97-
// broadcast message to all topics.
97+
b.mu.Lock()
98+
defer b.mu.Unlock()
99+
98100
for _, topic := range topics {
99101
for _, consumer := range b.topics[topic] {
100-
m := NewMessage(topic, msg)
101102
if !consumer.Active() {
102-
return
103+
continue
103104
}
104-
go (func(s Subscriber) {
105-
s.signal(m)
106-
})(consumer)
105+
m := NewMessage(topic, msg)
106+
go consumer.signal(m)
107107
}
108108
}
109109
}
@@ -136,33 +136,31 @@ func (b *EventsStream) Subscribe(sub Subscriber, topic string) {
136136

137137
// Unsubscribe removes a subscriber from a topic
138138
func (b *EventsStream) Unsubscribe(sub Subscriber, topic string) {
139-
// unsubscribe to given topic
140139
b.mu.Lock()
141140
defer b.mu.Unlock()
142141

143-
// only unsubscribe active subscriber
144-
if !sub.Active() {
145-
return
142+
if subscribers, ok := b.topics[topic]; ok {
143+
delete(subscribers, sub.ID())
144+
if len(subscribers) == 0 {
145+
delete(b.topics, topic)
146+
}
146147
}
147148

148-
delete(b.topics[topic], sub.ID())
149149
sub.unsubscribe(topic)
150150
}
151151

152152
// Publish publishes a message to a topic
153153
func (b *EventsStream) Publish(topic string, msg any) {
154-
// publish the message to given topic.
155154
b.mu.Lock()
156-
bTopics := b.topics[topic]
155+
subscribers := b.topics[topic]
157156
b.mu.Unlock()
158-
for _, consumer := range bTopics {
159-
m := NewMessage(topic, msg)
157+
158+
for _, consumer := range subscribers {
160159
if !consumer.Active() {
161-
return
160+
continue
162161
}
163-
go (func(s Subscriber) {
164-
s.signal(m)
165-
})(consumer)
162+
m := NewMessage(topic, msg)
163+
go consumer.signal(m)
166164
}
167165
}
168166

eventstream/stream_test.go

+30-16
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@ func TestStream(t *testing.T) {
5353
broker.Subscribe(cons, "t3")
5454
assert.Zero(t, broker.SubscribersCount("t3"))
5555

56-
t.Cleanup(func() {
57-
broker.Close()
58-
})
56+
broker.Close()
5957
})
6058
t.Run("With Unsubscription", func(t *testing.T) {
6159
broker := New()
@@ -66,12 +64,19 @@ func TestStream(t *testing.T) {
6664
broker.Subscribe(cons, "t1")
6765
broker.Subscribe(cons, "t2")
6866

69-
require.EqualValues(t, 1, broker.SubscribersCount("t1"))
67+
sub2 := broker.AddSubscriber()
68+
require.NotNil(t, sub2)
69+
broker.Subscribe(sub2, "t1")
70+
71+
require.EqualValues(t, 2, broker.SubscribersCount("t1"))
7072
require.EqualValues(t, 1, broker.SubscribersCount("t2"))
7173

74+
sub2.Shutdown()
75+
7276
// Unsubscribe the consumer
7377
broker.Unsubscribe(cons, "t1")
74-
assert.Zero(t, broker.SubscribersCount("t1"))
78+
broker.Unsubscribe(sub2, "t1")
79+
require.Zero(t, broker.SubscribersCount("t1"))
7580
require.EqualValues(t, 1, broker.SubscribersCount("t2"))
7681

7782
broker.Subscribe(cons, "t3")
@@ -82,9 +87,7 @@ func TestStream(t *testing.T) {
8287
broker.Subscribe(cons, "t4")
8388
assert.Zero(t, broker.SubscribersCount("t4"))
8489

85-
t.Cleanup(func() {
86-
broker.Close()
87-
})
90+
broker.Close()
8891
})
8992
t.Run("With Publication", func(t *testing.T) {
9093
broker := New()
@@ -95,25 +98,32 @@ func TestStream(t *testing.T) {
9598
broker.Subscribe(cons, "t1")
9699
broker.Subscribe(cons, "t2")
97100

98-
require.EqualValues(t, 1, broker.SubscribersCount("t1"))
101+
sub2 := broker.AddSubscriber()
102+
require.NotNil(t, sub2)
103+
broker.Subscribe(sub2, "t1")
104+
105+
require.EqualValues(t, 2, broker.SubscribersCount("t1"))
99106
require.EqualValues(t, 1, broker.SubscribersCount("t2"))
100107

108+
sub2.Shutdown()
109+
101110
broker.Publish("t1", "hi")
102111
broker.Publish("t2", "hello")
103112

104113
time.Sleep(time.Second)
105114

106115
var messages []*Message
107116
for message := range cons.Iterator() {
117+
require.NotNil(t, message)
118+
require.NotNil(t, message.Topic())
119+
require.NotNil(t, message.Payload())
108120
messages = append(messages, message)
109121
}
110122

111123
assert.Len(t, messages, 2)
112124
assert.Len(t, cons.Topics(), 2)
113125

114-
t.Cleanup(func() {
115-
broker.Close()
116-
})
126+
broker.Close()
117127
})
118128
t.Run("With Broadcast", func(t *testing.T) {
119129
broker := New()
@@ -124,9 +134,15 @@ func TestStream(t *testing.T) {
124134
broker.Subscribe(cons, "t1")
125135
broker.Subscribe(cons, "t2")
126136

127-
require.EqualValues(t, 1, broker.SubscribersCount("t1"))
137+
sub2 := broker.AddSubscriber()
138+
require.NotNil(t, sub2)
139+
broker.Subscribe(sub2, "t1")
140+
141+
require.EqualValues(t, 2, broker.SubscribersCount("t1"))
128142
require.EqualValues(t, 1, broker.SubscribersCount("t2"))
129143

144+
sub2.Shutdown()
145+
130146
broker.Broadcast("hi", []string{"t1", "t2"})
131147

132148
time.Sleep(time.Second)
@@ -139,8 +155,6 @@ func TestStream(t *testing.T) {
139155
assert.Len(t, messages, 2)
140156
assert.Len(t, cons.Topics(), 2)
141157

142-
t.Cleanup(func() {
143-
broker.Close()
144-
})
158+
broker.Close()
145159
})
146160
}

0 commit comments

Comments
 (0)