-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathapp.go
83 lines (70 loc) · 1.48 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package amqp
import "sync"
// MQApp couples an MQ client with a per-queue registry of message handlers.
type MQApp struct {
// Client is the underlying MQ client used for subscribing and publishing.
Client *MQ
// listenerMu is the lock taken around accesses to listener and
// listenerRecord.
listenerMu sync.RWMutex
// listener maps a queue (keyed by *Queue pointer identity) to the handlers
// registered for it.
listener map[*Queue][]MQHandler
// listenerRecord marks the queues that already have a listener, so that
// startListen does not subscribe to the same queue again.
listenerRecord map[*Queue]bool
}
// NewApp creates an MQApp whose client is built from config via New.
//
// If New fails, its error is returned together with a nil *MQApp, so callers
// never receive an app that wraps a client which failed to initialise.
func NewApp(config *Config) (*MQApp, error) {
	client, err := New(config)
	if err != nil {
		return nil, err
	}
	return &MQApp{
		Client:         client,
		listener:       make(map[*Queue][]MQHandler),
		listenerRecord: make(map[*Queue]bool),
	}, nil
}
// On registers one or more handlers to be triggered by messages arriving on
// queue, then makes sure a listener has been started for that queue.
//
// The handlers are appended to any that are already registered for the same
// *Queue. The lookup and the store happen under a single write lock so that
// concurrent On calls for the same queue cannot overwrite each other's
// handlers.
func (mq *MQApp) On(queue *Queue, handler ...MQHandler) {
	mq.listenerMu.Lock()
	// Appending to the (possibly missing, hence nil) existing slice covers
	// both the first registration and later additions, and avoids storing
	// the caller's variadic slice directly.
	mq.listener[queue] = append(mq.listener[queue], handler...)
	mq.listenerMu.Unlock()

	mq.startListen(queue)
}
// Route registers every queue/handler pair found in routes by calling On once
// per entry. Go map iteration order is unspecified, so the order in which the
// queues are registered is unspecified as well.
func (mq *MQApp) Route(routes map[*Queue]MQHandler) {
	for queue, h := range routes {
		mq.On(queue, h)
	}
}
// startListen starts a goroutine that subscribes to queue and dispatches each
// received message to the handlers registered for that queue. It does nothing
// if the queue is already marked in listenerRecord.
//
// Each message is handled in its own goroutine, which calls the handlers one
// after another in slice order with a shared MQContext.
func (mq *MQApp) startListen(queue *Queue) {
	// Check and mark under one write lock so that concurrent callers cannot
	// both decide to subscribe to the same queue.
	mq.listenerMu.Lock()
	if mq.listenerRecord[queue] {
		// Listening was already started earlier.
		mq.listenerMu.Unlock()
		return
	}
	mq.listenerRecord[queue] = true
	mq.listenerMu.Unlock()

	// Start listening.
	go func(queue *Queue) {
		ch, err := mq.Client.Sub(queue)
		if err != nil {
			// The subscription failed: clear the mark so that a later On
			// call for this queue can try again.
			// TODO: surface err to the caller; this method currently has no
			// way to report it.
			mq.listenerMu.Lock()
			delete(mq.listenerRecord, queue)
			mq.listenerMu.Unlock()
			return
		}

		for msg := range ch {
			ctx := &MQContext{
				Request: msg,
				Client:  mq.Client,
				App:     mq,
			}

			// Snapshot the handler slice under the read lock; On may be
			// registering more handlers concurrently.
			mq.listenerMu.RLock()
			handlers := mq.listener[queue]
			mq.listenerMu.RUnlock()

			go func() {
				// TODO defer error
				for _, h := range handlers {
					h(ctx)
				}
			}()
		}
	}(queue)
}
// Pub publishes msg to queue q by delegating to the underlying client's Pub
// method and returns whatever error that call reports.
func (mq *MQApp) Pub(q *Queue, msg *Message) error {
	client := mq.Client
	return client.Pub(q, msg)
}