-
Notifications
You must be signed in to change notification settings - Fork 19
/
subscriber.go
83 lines (70 loc) · 1.78 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package hub
import (
"sync"
)
// alertFunc is the callback a nonBlockingSubscriber invokes to report how
// many messages it could not deliver.
type alertFunc func(missed int)

// nonBlockingSubscriber delivers messages through a channel and, instead of
// waiting when a send cannot proceed, reports the dropped message to alert.
type nonBlockingSubscriber struct {
	ch        chan Message // delivery channel exposed through Ch
	alert     alertFunc    // called by Set with the number of dropped messages
	onceClose sync.Once    // makes Close close ch at most once
}

// blockingSubscriber uses a channel to receive events; sending to it waits
// until the message can be handed over.
type blockingSubscriber struct {
	ch        chan Message // delivery channel exposed through Ch
	onceClose sync.Once    // makes Close close ch at most once
}
// newNonBlockingSubscriber builds a nonBlockingSubscriber whose channel is
// buffered to hold cap messages.
//
// A cap of zero or less is replaced by a default of 10, so the channel is
// always buffered. alerter is stored as the subscriber's alert callback, which
// Set invokes when the buffer is full and a message has to be dropped.
func newNonBlockingSubscriber(cap int, alerter alertFunc) *nonBlockingSubscriber {
	const defaultCapacity = 10

	size := cap
	if size < 1 {
		size = defaultCapacity
	}

	sub := &nonBlockingSubscriber{alert: alerter}
	sub.ch = make(chan Message, size)
	return sub
}
// Set tries to hand msg to the subscriber's channel without ever blocking.
//
// When the channel cannot accept the message right now (its buffer is full),
// the message is dropped and the alert callback is called with 1, the number
// of messages missed. A subscriber without an alert callback drops the
// message silently instead of panicking on a nil function call.
func (s *nonBlockingSubscriber) Set(msg Message) {
	select {
	case s.ch <- msg:
		// Delivered: there was room in the buffer.
	default:
		// No room: drop the message and report it, if anyone is listening.
		if s.alert != nil {
			s.alert(1)
		}
	}
}
// Ch returns the receive-only view of the subscriber's channel, from which
// subscriptions consume messages.
func (s *nonBlockingSubscriber) Ch() <-chan Message {
	var recv <-chan Message = s.ch
	return recv
}
// Close closes the subscriber's channel so consumers reading from Ch see the
// end of the stream. The close is guarded by onceClose, so calling Close more
// than once is harmless. Set must not be called after Close: sending on a
// closed channel panics.
func (s *nonBlockingSubscriber) Close() {
	shutdown := func() { close(s.ch) }
	s.onceClose.Do(shutdown)
}
// newBlockingSubscriber returns a blocking subscriber backed by a channel
// with room for cap buffered messages.
//
// A negative cap is treated as zero, which yields an unbuffered channel.
func newBlockingSubscriber(cap int) *blockingSubscriber {
	size := cap
	if size < 0 {
		size = 0
	}

	sub := new(blockingSubscriber)
	sub.ch = make(chan Message, size)
	return sub
}
// Set delivers msg on the subscriber's channel, waiting until the send can
// complete, that is, until a receiver is ready or the buffer has room.
func (s *blockingSubscriber) Set(msg Message) {
	var sink chan<- Message = s.ch
	sink <- msg
}
// Ch returns the receive-only view of the subscriber's channel, from which
// subscriptions consume messages.
func (s *blockingSubscriber) Ch() <-chan Message {
	var recv <-chan Message = s.ch
	return recv
}
// Close closes the subscriber's channel so consumers reading from Ch see the
// end of the stream. The close is guarded by onceClose, so calling Close more
// than once is harmless. Set must not be called after Close: sending on a
// closed channel panics.
func (s *blockingSubscriber) Close() {
	shutdown := func() { close(s.ch) }
	s.onceClose.Do(shutdown)
}