Skip to content

Commit c473ad7

Browse files
committed
Add pubsub package
1 parent dd6d233 commit c473ad7

File tree

2 files changed

+138
-0
lines changed

2 files changed

+138
-0
lines changed

pubsub/pubsub.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
type PubSub struct {
9+
Capacity int
10+
11+
mu sync.RWMutex
12+
ch []chan interface{}
13+
}
14+
15+
func NewPubSub() *PubSub {
16+
return &PubSub{
17+
Capacity: 10,
18+
mu: sync.RWMutex{},
19+
ch: []chan interface{}{},
20+
}
21+
}
22+
23+
func (p *PubSub) Publish(payload interface{}) {
24+
p.mu.RLock()
25+
defer p.mu.RUnlock()
26+
27+
for _, ch := range p.ch {
28+
ch <- payload
29+
}
30+
}
31+
32+
func (p *PubSub) Subscribe(ctx context.Context, f func(interface{})) <-chan bool {
33+
p.mu.Lock()
34+
defer p.mu.Unlock()
35+
36+
ch := make(chan interface{}, p.Capacity)
37+
p.ch = append(p.ch, ch)
38+
39+
sub := &Subscription{
40+
pubsub: p,
41+
f: f,
42+
ch: ch,
43+
}
44+
45+
waiter := make(chan bool)
46+
47+
go func() {
48+
L:
49+
for ctx.Err() == nil {
50+
select {
51+
case payload, ok := <-sub.ch:
52+
if ok {
53+
sub.f(payload)
54+
}
55+
case <-ctx.Done():
56+
sub.close()
57+
break L
58+
}
59+
}
60+
close(waiter)
61+
}()
62+
63+
return waiter
64+
}
65+
66+
type Subscription struct {
67+
pubsub *PubSub
68+
f func(interface{})
69+
ch chan interface{}
70+
}
71+
72+
func (s *Subscription) close() {
73+
s.pubsub.mu.Lock()
74+
defer s.pubsub.mu.Unlock()
75+
76+
for idx, ch := range s.pubsub.ch {
77+
if ch == s.ch {
78+
deleted := append(s.pubsub.ch[:idx], s.pubsub.ch[idx+1:]...)
79+
channels := make([]chan interface{}, len(deleted))
80+
copy(channels, deleted)
81+
s.pubsub.ch = channels
82+
}
83+
}
84+
85+
close(s.ch)
86+
87+
s.ch = nil
88+
}

pubsub/pubsub_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
)
10+
11+
func TestPubSub(t *testing.T) {
12+
ctx, cancel := context.WithCancel(context.Background())
13+
defer cancel()
14+
15+
pubsub := NewPubSub()
16+
17+
wg := sync.WaitGroup{}
18+
19+
result1 := int32(0)
20+
pubsub.Subscribe(ctx, func(payload interface{}) {
21+
atomic.AddInt32(&result1, int32(payload.(int)))
22+
wg.Done()
23+
})
24+
result2 := int32(0)
25+
pubsub.Subscribe(ctx, func(payload interface{}) {
26+
atomic.AddInt32(&result2, int32(payload.(int)))
27+
wg.Done()
28+
})
29+
30+
wg.Add(4)
31+
pubsub.Publish(1)
32+
pubsub.Publish(2)
33+
34+
wg.Wait()
35+
36+
if result1 != 3 {
37+
t.Fatalf("invalid 1: %v", result1)
38+
}
39+
if result2 != 3 {
40+
t.Fatalf("invalid 2: %v", result2)
41+
}
42+
}
43+
44+
func TestPubSubUnsubscribe(t *testing.T) {
45+
pubsub := NewPubSub()
46+
47+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
48+
defer cancel()
49+
<-pubsub.Subscribe(ctx, func(payload interface{}) {})
50+
}

0 commit comments

Comments
 (0)