-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbus.go
234 lines (204 loc) · 6.12 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package bus
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
)
// Bus exposes the Subscriber and Publisher and is the main interface used to interact with the message bus.
//
// Define a new message bus like so
//
// msgBus := bus.New()
//
// Subscribe to a message
//
// msgBus.Subscribe(func(ctx context.Context, message Message) error {
// message.Result = "1234"
// return nil
// })
//
// Then publish a message to the bus
//
// msgBus.Publish(context.Background(), Message{Content: "hello"})
type Bus interface {
Subscriber
Publisher
}
// Subscriber listens to events published to the bus. Use Subscribe to listen to events synchronously and
// SubscribeAsync to listen to events asynchronously. The Must methods simplify subscription but panic internally
// if there are no subscribers, therefore these methods should only be used for defining static relationships, not
// dynamic relationships defined at runtime.
// The Message type must match the handler subscriber type. Pointer and
// non-pointer messages are considered as separate types - internally subscribers are keyed using the message type which
// includes a pointer symbol in the lookup key.
type Subscriber interface {
// Subscribe is used to listen to events synchronously
Subscribe(fn interface{}) error
// MustSubscribe is used to listen to events synchronously. This method simplifies subscription but panics internally
// if there are no subscribers. It is recommended to only use this for defining static relationships rather than
// dynamic relationships defined at runtime
MustSubscribe(fn interface{})
// SubscribeAsync is used to listen to events asynchronously. Subscribers are run in a separate go routine and data
// is passed into the subscriber via a channel
SubscribeAsync(fn interface{}) error
// MustSubscribeAsync is used to listen to events asynchronously. This method simplifies subscription but panics internally
// if there are no subscribers. It is recommended to only use this for defining static relationships rather than
// dynamic relationships defined at runtime
MustSubscribeAsync(fn interface{})
}
// Publisher publishes an event to the bus. The Message type must match the handler subscriber type. Pointer and
//// non-pointer messages are considered as separate types - internally subscribers are keyed using the message type which
//// includes a pointer symbol in the lookup key.
type Publisher interface {
Publish(ctx context.Context, msg Message) error
}
// ErrHandlerNotFound is returned when publishing an event that does not have any subscribers
var ErrHandlerNotFound = errors.New("handler not found")
// Message the data that is published. The implementing type is used as the handler key
type Message interface{}
// New create a new message bus.
func New(queueSize ...int) Bus {
handlers := newHandlers()
size := defaultAsyncHandlerQueueSize
if len(queueSize) > 0 {
size = queueSize[0]
}
return &eventBus{
handlers: handlers,
queueSize: size,
}
}
type eventBus struct {
handlers *handlers
queueSize int
}
type handler struct {
Handler reflect.Value
isAsync bool
queue chan []reflect.Value
}
func (e *eventBus) Subscribe(fn interface{}) error {
return e.subscribe(fn, false)
}
func (e *eventBus) MustSubscribe(fn interface{}) {
if err := e.subscribe(fn, false); err != nil {
panic(err)
}
}
func (e *eventBus) SubscribeAsync(fn interface{}) error {
return e.subscribe(fn, true)
}
func (e *eventBus) MustSubscribeAsync(fn interface{}) {
if err := e.subscribe(fn, true); err != nil {
panic(err)
}
}
func (e *eventBus) subscribe(fn interface{}, isAsync bool) error {
if err := validateHandler(fn); err != nil {
return err
}
handlerArgTypeName := reflect.TypeOf(fn).In(1).String()
handler := handler{
Handler: reflect.ValueOf(fn),
isAsync: isAsync,
}
if isAsync {
handler.queue = make(chan []reflect.Value, defaultAsyncHandlerQueueSize)
go func() {
for params := range handler.queue {
handler.Handler.Call(params)
}
}()
}
e.handlers.Add(handlerArgTypeName, handler)
return nil
}
func (e *eventBus) Publish(ctx context.Context, msg Message) error {
msgTypeName := reflect.TypeOf(msg).String()
_, ok := e.handlers.Get(msgTypeName)
if !ok {
return ErrHandlerNotFound
}
var params = []reflect.Value{}
params = append(params, reflect.ValueOf(ctx))
params = append(params, reflect.ValueOf(msg))
// dispatch async handlers first
for messageHandlers := range e.handlers.Iter() {
if messageHandlers.Key == msgTypeName {
for _, handler := range messageHandlers.Value {
if handler.isAsync {
handler.queue <- params
}
}
}
}
// handle sync handlers. If a handler errors we end the chain
for messageHandlers := range e.handlers.Iter() {
if messageHandlers.Key == msgTypeName {
for _, handler := range messageHandlers.Value {
isSync := !handler.isAsync
if isSync {
result := handler.Handler.Call(params)
if err := result[0].Interface(); err != nil {
return err.(error)
}
}
}
}
}
return nil
}
func validateHandler(fn interface{}) error {
typeOf := reflect.TypeOf(fn)
if typeOf.Kind() != reflect.Func {
return fmt.Errorf("'%s' is not a function", typeOf)
}
if typeOf.NumIn() < 2 {
return errors.New("invalid number of handler arguments. Must be context.Context followed by a struct")
}
if typeOf.In(0).String() != "context.Context" {
return errors.New("first argument must be context.Context")
}
return nil
}
type handlers struct {
sync.RWMutex
items map[string][]handler
}
type handlerItem struct {
Key string
Value []handler
}
func newHandlers() *handlers {
cm := &handlers{
items: make(map[string][]handler),
}
return cm
}
func (cm *handlers) Add(key string, value handler) {
cm.Lock()
defer cm.Unlock()
cm.items[key] = append(cm.items[key], value)
}
func (cm *handlers) Get(key string) ([]handler, bool) {
cm.Lock()
defer cm.Unlock()
value, ok := cm.items[key]
return value, ok
}
func (cm *handlers) Iter() <-chan handlerItem {
c := make(chan handlerItem)
f := func() {
cm.Lock()
defer cm.Unlock()
for k, v := range cm.items {
c <- handlerItem{k, v}
}
close(c)
}
go f()
return c
}
const defaultAsyncHandlerQueueSize = 1000