Skip to content

Commit d288bbf

Browse files
committed
conn.go: Use an 'elastic' channel for incoming packets. Resolves #30.
1 parent 563a53f commit d288bbf

File tree

3 files changed

+94
-29
lines changed

3 files changed

+94
-29
lines changed

conn.go

+18-26
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding"
66
"errors"
77
"fmt"
8+
"github.com/sandertv/go-raknet/internal"
89
"github.com/sandertv/go-raknet/internal/message"
910
"io"
1011
"net"
@@ -79,7 +80,7 @@ type Conn struct {
7980
packetQueue *packetQueue
8081
// packets is a channel containing content of packets that were fully
8182
// processed. Calling Conn.Read() consumes a value from this channel.
82-
packets chan *[]byte
83+
packets *internal.ElasticChan[[]byte]
8384

8485
// retransmission is a queue filled with packets that were sent with a given
8586
// datagram sequence number.
@@ -100,7 +101,7 @@ func newConn(conn net.PacketConn, raddr net.Addr, mtu uint16, h connectionHandle
100101
pk: new(packet),
101102
closed: make(chan struct{}),
102103
connected: make(chan struct{}),
103-
packets: make(chan *[]byte, 512),
104+
packets: internal.Chan[[]byte](4),
104105
splits: make(map[uint16][][]byte),
105106
win: newDatagramWindow(),
106107
packetQueue: newPacketQueue(),
@@ -273,27 +274,24 @@ func (conn *Conn) write(b []byte) (n int, err error) {
273274
// Read blocks until a packet is received over the connection, or until the
274275
// session is closed or the read times out, in which case an error is returned.
275276
func (conn *Conn) Read(b []byte) (n int, err error) {
276-
select {
277-
case pk := <-conn.packets:
278-
if len(b) < len(*pk) {
279-
err = conn.error(ErrBufferTooSmall, "read")
280-
}
281-
return copy(b, *pk), err
282-
case <-conn.closed:
277+
pk, ok := conn.packets.Recv(conn.closed)
278+
if !ok {
283279
return 0, conn.error(net.ErrClosed, "read")
280+
} else if len(b) < len(pk) {
281+
return 0, conn.error(ErrBufferTooSmall, "read")
284282
}
283+
return copy(b, pk), err
285284
}
286285

287286
// ReadPacket attempts to read the next packet as a byte slice. ReadPacket
288287
// blocks until a packet is received over the connection, or until the session
289288
// is closed or the read times out, in which case an error is returned.
290289
func (conn *Conn) ReadPacket() (b []byte, err error) {
291-
select {
292-
case pk := <-conn.packets:
293-
return *pk, err
294-
case <-conn.closed:
290+
pk, ok := conn.packets.Recv(conn.closed)
291+
if !ok {
295292
return nil, conn.error(net.ErrClosed, "read")
296293
}
294+
return pk, err
297295
}
298296

299297
// Close closes the connection. All blocking Read or Write actions are
@@ -377,19 +375,19 @@ func (conn *Conn) receiveDatagram(b []byte) error {
377375
return fmt.Errorf("read datagram: %w", io.ErrUnexpectedEOF)
378376
}
379377
seq := loadUint24(b)
380-
conn.ackMu.Lock()
381-
// Add this sequence number to the received datagrams, so that it is
382-
// included in an ACK.
383-
conn.ackSlice = append(conn.ackSlice, seq)
384-
conn.ackMu.Unlock()
385-
386378
if !conn.win.add(seq) {
387379
// Datagram was already received, this might happen if a packet took a
388380
// long time to arrive, and we already sent a NACK for it. This is
389381
// expected to happen sometimes under normal circumstances, so no reason
390382
// to return an error.
391383
return nil
392384
}
385+
conn.ackMu.Lock()
386+
// Add this sequence number to the received datagrams, so that it is
387+
// included in an ACK.
388+
conn.ackSlice = append(conn.ackSlice, seq)
389+
conn.ackMu.Unlock()
390+
393391
if conn.win.shift() == 0 {
394392
// Datagram window couldn't be shifted up, so we're still missing
395393
// packets.
@@ -463,13 +461,7 @@ func (conn *Conn) handlePacket(b []byte) error {
463461
return fmt.Errorf("handle packet: %w", err)
464462
}
465463
if !handled {
466-
// Insert the packet contents the packet queue could release in the
467-
// channel so that Conn.Read() can get a hold of them, but always first
468-
// try to escape if the connection was closed.
469-
select {
470-
case <-conn.closed:
471-
case conn.packets <- &b:
472-
}
464+
conn.packets.Send(b)
473465
}
474466
return nil
475467
}

handler.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,10 @@ func (h listenerConnectionHandler) handleOpenConnectionRequest2(b []byte, addr n
118118
return fmt.Errorf("send OPEN_CONNECTION_REPLY_2: %w", err)
119119
}
120120

121-
conn := newConn(h.l.conn, addr, mtuSize, h)
122-
h.l.connections.Store(resolve(addr), conn)
123-
124121
go func() {
122+
conn := newConn(h.l.conn, addr, mtuSize, h)
123+
h.l.connections.Store(resolve(addr), conn)
124+
125125
t := time.NewTimer(time.Second * 10)
126126
defer t.Stop()
127127
select {

internal/chan.go

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package internal
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
)
7+
8+
// ElasticChan is a channel that grows if its capacity is reached.
9+
type ElasticChan[T any] struct {
10+
mu sync.RWMutex
11+
len atomic.Int64
12+
ch chan T
13+
}
14+
15+
// Chan creates an ElasticChan of a size.
16+
func Chan[T any](size int) *ElasticChan[T] {
17+
c := new(ElasticChan[T])
18+
c.grow(size)
19+
return c
20+
}
21+
22+
// Recv attempts to read a value from the channel. If cancel is closed, Recv
23+
// will return ok = false.
24+
func (c *ElasticChan[T]) Recv(cancel <-chan struct{}) (val T, ok bool) {
25+
c.mu.RLock()
26+
defer c.mu.RUnlock()
27+
28+
select {
29+
case <-cancel:
30+
return val, false
31+
case val = <-c.ch:
32+
if c.len.Add(-1) < 0 {
33+
panic("unreachable")
34+
}
35+
return val, true
36+
}
37+
}
38+
39+
// Send sends a value to the channel. Send never blocks, because if the maximum
40+
// capacity of the underlying channel is reached, a larger one is created.
41+
func (c *ElasticChan[T]) Send(val T) {
42+
if c.len.Load()+1 >= int64(cap(c.ch)) {
43+
// This check happens outside a lock, meaning in the meantime, a call to
44+
// Recv could cause the length to decrease, technically meaning growing
45+
// is then unnecessary. That isn't a major issue though, as in most
46+
// cases growing would still be necessary later.
47+
c.growSend(val)
48+
return
49+
}
50+
c.len.Add(1)
51+
c.ch <- val
52+
}
53+
54+
// growSend grows the channel to double the capacity, copying all values
55+
// currently in the channel, and sends the value to the new channel.
56+
func (c *ElasticChan[T]) growSend(val T) {
57+
c.mu.Lock()
58+
defer c.mu.Unlock()
59+
60+
c.grow(cap(c.ch) * 2)
61+
c.len.Add(1)
62+
c.ch <- val
63+
}
64+
65+
// grow grows the ElasticChan to the size passed, copying all values currently
66+
// in the channel into a new channel with a bigger buffer.
67+
func (c *ElasticChan[T]) grow(size int) {
68+
ch := make(chan T, size)
69+
for len(c.ch) > 0 {
70+
ch <- <-c.ch
71+
}
72+
c.ch = ch
73+
}

0 commit comments

Comments
 (0)