-
Notifications
You must be signed in to change notification settings - Fork 3
/
forwarder.go
86 lines (76 loc) · 1.73 KB
/
forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package ayu
import (
"context"
"sync"
)
type forwarder struct {
pubsubs *redisPubSubManager
bufs map[RoomID]*roomMessageBuffer
mu sync.RWMutex
logger Logger
}
func newForwarder(pubsubs *redisPubSubManager, logger Logger) *forwarder {
return &forwarder{
pubsubs: pubsubs,
bufs: map[RoomID]*roomMessageBuffer{},
mu: sync.RWMutex{},
logger: logger,
}
}
func (f *forwarder) Forward(ctx context.Context, roomID RoomID, rm *roomMessage, otherClientExists bool) error {
if !otherClientExists {
if rm.Type == roomMessageTypeForward {
f.getBuf(roomID).Add(rm)
f.logger.Debugf("buffered message (room: %s): %+v", roomID, rm)
}
return nil
}
for _, rm := range append(f.getBuf(roomID).Flush(), rm) {
if err := f.pubsubs.Publish(ctx, roomID, rm); err != nil {
return err
}
}
return nil
}
func (f *forwarder) ForwardBuffered(ctx context.Context, roomID RoomID, otherClientExists bool) error {
if !otherClientExists {
return nil
}
for _, rm := range f.getBuf(roomID).Flush() {
if err := f.pubsubs.Publish(ctx, roomID, rm); err != nil {
return err
}
}
return nil
}
func (f *forwarder) getBuf(roomID RoomID) *roomMessageBuffer {
f.mu.Lock()
defer f.mu.Unlock()
_, ok := f.bufs[roomID]
if !ok {
f.bufs[roomID] = newRoomMessageBuffer()
}
return f.bufs[roomID]
}
type roomMessageBuffer struct {
buf []*roomMessage
mu sync.Mutex
}
func newRoomMessageBuffer() *roomMessageBuffer {
return &roomMessageBuffer{
buf: nil,
mu: sync.Mutex{},
}
}
func (b *roomMessageBuffer) Add(rms ...*roomMessage) {
b.mu.Lock()
defer b.mu.Unlock()
b.buf = append(b.buf, rms...)
}
func (b *roomMessageBuffer) Flush() []*roomMessage {
b.mu.Lock()
defer b.mu.Unlock()
rms := b.buf
b.buf = nil
return rms
}