Skip to content

Commit 3056f21

Browse files
committed
disco: add ControllerManager
1 parent 1795187 commit 3056f21

File tree

4 files changed

+64
-22
lines changed

4 files changed

+64
-22
lines changed

disco/control.go

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package disco
2+
3+
type Controller interface {
4+
Handle(b []byte)
5+
Name() string
6+
Type() uint8
7+
}
8+
9+
type ControllerManager interface {
10+
Register(Controller)
11+
Unregister(Controller)
12+
}

disco/disco.go

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func (d *Disco) magic() []byte {
7575

7676
type PeerStore interface {
7777
FindPeer(peer.ID) (*PeerContext, bool)
78+
Peers() []PeerState
7879
}
7980

8081
type PeerContext struct {

disco/ws.go

+43-14
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,27 @@ import (
2222
)
2323

2424
var (
25-
_ io.ReadWriter = (*WSConn)(nil)
25+
_ io.ReadWriter = (*WSConn)(nil)
26+
_ ControllerManager = (*WSConn)(nil)
2627
)
2728

2829
type WSConn struct {
2930
*websocket.Conn
30-
server *peer.Peermap
31-
connectedServer string
32-
peerID peer.ID
33-
metadata url.Values
34-
closedSig chan int
35-
datagrams chan *Datagram
36-
peers chan *PeerFindEvent
37-
peersUDPAddrs chan *PeerUDPAddrEvent
38-
nonce byte
39-
stuns []string
40-
activeTime atomic.Int64
41-
writeMutex sync.Mutex
42-
rateLimiter *rate.Limiter
31+
server *peer.Peermap
32+
connectedServer string
33+
peerID peer.ID
34+
metadata url.Values
35+
closedSig chan int
36+
datagrams chan *Datagram
37+
peers chan *PeerFindEvent
38+
peersUDPAddrs chan *PeerUDPAddrEvent
39+
nonce byte
40+
stuns []string
41+
activeTime atomic.Int64
42+
writeMutex sync.Mutex
43+
rateLimiter *rate.Limiter
44+
controllersMutex sync.RWMutex
45+
controllers map[uint8][]Controller
4346

4447
connData chan []byte
4548
connBuf []byte
@@ -135,6 +138,24 @@ func (c *WSConn) ServerURL() string {
135138
return c.connectedServer
136139
}
137140

141+
func (c *WSConn) Register(ctr Controller) {
142+
c.controllersMutex.Lock()
143+
defer c.controllersMutex.Unlock()
144+
c.controllers[ctr.Type()] = append(c.controllers[ctr.Type()], ctr)
145+
}
146+
147+
func (c *WSConn) Unregister(ctr Controller) {
148+
c.controllersMutex.Lock()
149+
defer c.controllersMutex.Unlock()
150+
var filterd []Controller
151+
for _, ct := range c.controllers[ctr.Type()] {
152+
if ct.Name() != ctr.Name() {
153+
filterd = append(filterd, ct)
154+
}
155+
}
156+
c.controllers[ctr.Type()] = filterd
157+
}
158+
138159
func (c *WSConn) dial(ctx context.Context, server string) error {
139160
networkSecret, err := c.server.SecretStore().NetworkSecret()
140161
if err != nil {
@@ -343,6 +364,13 @@ func (c *WSConn) handleEvents(b []byte) {
343364
go c.updateNetworkSecret(secret)
344365
case peer.CONTROL_CONN:
345366
c.connData <- b[1:]
367+
default:
368+
c.controllersMutex.RLock()
369+
ctrs := c.controllers[b[0]]
370+
c.controllersMutex.RUnlock()
371+
for _, ctr := range ctrs {
372+
ctr.Handle(b)
373+
}
346374
}
347375
}
348376

@@ -384,6 +412,7 @@ func DialPeermap(ctx context.Context, server *peer.Peermap, peerID peer.ID, meta
384412
peers: make(chan *PeerFindEvent, 20),
385413
peersUDPAddrs: make(chan *PeerUDPAddrEvent, 20),
386414
connData: make(chan []byte, 128),
415+
controllers: make(map[uint8][]Controller),
387416
}
388417
if err := wsConn.dial(ctx, ""); err != nil {
389418
return nil, err

p2p/conn.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,6 @@ func (c *PeerPacketConn) TryLeadDisco(peerID peer.ID) {
185185
}
186186
}
187187

188-
// UDPConn return the os udp socket
189-
func (c *PeerPacketConn) UDPConn() net.PacketConn {
190-
return c.udpConn
191-
}
192-
193188
// ServerStream is the connection stream to the peermap server
194189
func (c *PeerPacketConn) ServerStream() io.ReadWriter {
195190
return c.wsConn
@@ -200,9 +195,14 @@ func (c *PeerPacketConn) ServerURL() string {
200195
return c.wsConn.ServerURL()
201196
}
202197

203-
// Peers return the found peers
204-
func (c *PeerPacketConn) Peers() []disco.PeerState {
205-
return c.udpConn.Peers()
198+
// ControllerManager makes changes attempting to move the current state towards the desired state
199+
func (c *PeerPacketConn) ControllerManager() disco.ControllerManager {
200+
return c.wsConn
201+
}
202+
203+
// PeerStore stores the found peers
204+
func (c *PeerPacketConn) PeerStore() disco.PeerStore {
205+
return c.udpConn
206206
}
207207

208208
// runControlEventLoop events control loop

0 commit comments

Comments
 (0)