-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.go
197 lines (169 loc) · 5.17 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package pubsub
import (
"github.com/rs/zerolog/log"
"github.com/syntax-framework/chain/pkg"
"sync"
"time"
)
var logger = log.With().Str("package", "chain.pubsub").Logger()
// Adapter Specification to implement a custom PubSub adapter.
type Adapter interface {
Name() string // Returns the Adapter name
Broadcast(topic string, message any) error // Broadcasts the given topic and message to all nodes in the cluster (except the current node itself).
Subscribe(topic string) // The Adapter that has an external broker must subscribe to the given topic
Unsubscribe(topic string) // The Adapter that has an external broker must unsubscribe to the given topic
}
type AdapterConfig struct {
Adapter Adapter
Topics []string
}
type Dispatcher interface {
Dispatch(topic string, message any)
}
// subscription representa as subscrições que este server possui. See pubsub.Subscribe
type subscription struct {
dispatchers map[Dispatcher]int // incremental dispatcher subscriptions
}
// pubsub Realtime Publisher/Subscriber service.
type pubsub struct {
adapters *pkg.WildcardStore[Adapter]
subscriptions map[string]*subscription
unsubscribeTimers map[string]*time.Timer
unsubscribeMutex sync.Mutex
subscriptionsMutex sync.RWMutex
}
var p = &pubsub{
subscriptions: map[string]*subscription{},
unsubscribeTimers: map[string]*time.Timer{},
}
func init() {
SetAdapters([]AdapterConfig{{
Adapter: &LocalAdapter{},
Topics: []string{"*"},
}})
}
func Subscribe(topic string, dispatcher Dispatcher) {
p.subscriptionsMutex.Lock()
defer p.subscriptionsMutex.Unlock()
var sub *subscription
var exist bool
if sub, exist = p.subscriptions[topic]; !exist {
sub = &subscription{dispatchers: map[Dispatcher]int{}}
p.subscriptions[topic] = sub
go trySubscribe(topic)
}
if _, exist = sub.dispatchers[dispatcher]; !exist {
sub.dispatchers[dispatcher] = 0
}
sub.dispatchers[dispatcher] = sub.dispatchers[dispatcher] + 1
}
// Unsubscribe the dispatchFunc from the pubsub adapter's topic.
func Unsubscribe(topic string, dispatcher Dispatcher) {
p.subscriptionsMutex.Lock()
defer p.subscriptionsMutex.Unlock()
var sub *subscription
var exist bool
if sub, exist = p.subscriptions[topic]; !exist {
return
}
if _, exist = sub.dispatchers[dispatcher]; !exist {
return
}
sub.dispatchers[dispatcher] = sub.dispatchers[dispatcher] - 1
if sub.dispatchers[dispatcher] < 1 {
delete(sub.dispatchers, dispatcher)
go scheduleUnsubscribe(topic)
}
}
// Broadcast broadcasts ServiceMsg on given topic across the whole cluster.
//
// Para um dispatcher, ver service.Send
func Broadcast(topic string, message any) (err error) {
if adapter := GetAdapter(topic); adapter != nil {
if err = adapter.Broadcast(topic, message); err == nil {
// local dispatch
go dispatch(topic, message)
}
return
}
// log adapter not found
return
}
// LocalBroadcast broadcasts ServiceMsg on given topic only for the current node.
//
// `topic` - The topic to broadcast to, ie: `"users:123"`
// `message` - The payload of the broadcast
func LocalBroadcast(topic string, message any) {
dispatch(topic, message)
}
// DirectBroadcast Broadcasts ServiceMsg on given topic to a given node.
func DirectBroadcast(nodeName string, topic string, message any, dispatcher string) {
}
// SetAdapters configure the adapters topics
func SetAdapters(adapters []AdapterConfig) {
p.adapters = &pkg.WildcardStore[Adapter]{}
for _, config := range adapters {
for _, topic := range config.Topics {
if err := p.adapters.Insert(topic, config.Adapter); err != nil {
logger.Panic().Err(err).
Str("topic", topic).
Msg("invalid adapter config")
}
}
}
}
func GetAdapter(topic string) Adapter {
return p.adapters.Match(topic)
}
// trySubscribe subscribe the adapter on the given topic
func trySubscribe(topic string) {
p.unsubscribeMutex.Lock()
defer p.unsubscribeMutex.Unlock()
if timer, exist := p.unsubscribeTimers[topic]; exist {
delete(p.unsubscribeTimers, topic)
defer timer.Stop()
}
if adapter := GetAdapter(topic); adapter != nil {
adapter.Subscribe(topic)
}
}
// scheduleUnsubscribe unsubscribe the adapter after 15 seconds
func scheduleUnsubscribe(topic string) {
p.unsubscribeMutex.Lock()
if _, exist := p.unsubscribeTimers[topic]; exist {
p.unsubscribeMutex.Unlock()
return
}
timer := time.NewTimer(time.Second * 15)
p.unsubscribeTimers[topic] = timer
p.unsubscribeMutex.Unlock()
// wait
<-timer.C
p.unsubscribeMutex.Lock()
defer p.unsubscribeMutex.Unlock()
if _, exist := p.unsubscribeTimers[topic]; !exist {
// was removed by pubsub.trySubscribe
return
}
if adapter := GetAdapter(topic); adapter != nil {
adapter.Unsubscribe(topic)
}
}
func dispatch(topic string, message any) {
p.subscriptionsMutex.RLock()
var sub *subscription
var exist bool
if sub, exist = p.subscriptions[topic]; !exist {
p.subscriptionsMutex.RUnlock()
go scheduleUnsubscribe(topic)
return
}
var dispatchers []Dispatcher
for dispatchFunc, _ := range sub.dispatchers {
dispatchers = append(dispatchers, dispatchFunc)
}
p.subscriptionsMutex.RUnlock()
for _, dispatcher := range dispatchers {
dispatcher.Dispatch(topic, message)
}
}