@@ -2,14 +2,14 @@ package raknet
2
2
3
3
import (
4
4
"bytes"
5
- "context"
6
5
"encoding"
7
6
"errors"
8
7
"fmt"
9
8
"github.com/sandertv/go-raknet/internal/message"
10
9
"io"
11
10
"net"
12
11
"net/netip"
12
+ "slices"
13
13
"sync"
14
14
"sync/atomic"
15
15
"time"
@@ -85,11 +85,6 @@ type Conn struct {
85
85
// datagram sequence number.
86
86
retransmission * resendMap
87
87
88
- // readDeadline is a channel that receives a time.Time after a specific
89
- // time. It is used to listen for timeouts in Read after calling
90
- // SetReadDeadline.
91
- readDeadline <- chan time.Time
92
-
93
88
lastActivity atomic.Pointer [time.Time ]
94
89
}
95
90
@@ -111,8 +106,8 @@ func newConn(conn net.PacketConn, raddr net.Addr, mtu uint16, h connectionHandle
111
106
packetQueue : newPacketQueue (),
112
107
retransmission : newRecoveryQueue (),
113
108
buf : bytes .NewBuffer (make ([]byte , 0 , mtu )),
114
- ackBuf : bytes .NewBuffer (make ([]byte , 0 , 256 )),
115
- nackBuf : bytes .NewBuffer (make ([]byte , 0 , 256 )),
109
+ ackBuf : bytes .NewBuffer (make ([]byte , 0 , 128 )),
110
+ nackBuf : bytes .NewBuffer (make ([]byte , 0 , 64 )),
116
111
}
117
112
t := time .Now ()
118
113
c .lastActivity .Store (& t )
@@ -142,34 +137,34 @@ func (conn *Conn) startTicking() {
142
137
case t := <- ticker .C :
143
138
i ++
144
139
conn .flushACKs ()
145
- if i % 2 == 0 {
146
- // Ping the other end periodically to prevent timeouts.
147
- _ = conn .send (& message.ConnectedPing {ClientTimestamp : timestamp ()})
148
- }
149
140
if i % 3 == 0 {
150
141
conn .checkResend (t )
151
142
}
152
- if i % 5 == 0 {
153
- conn .mu .Lock ()
154
- if t .Sub (* conn .lastActivity .Load ()) > time .Second * 5 + conn .retransmission .rtt ()* 2 {
155
- // No activity for too long: Start timeout.
156
- _ = conn .Close ()
157
- }
158
- conn .mu .Unlock ()
159
- }
160
143
if unix := conn .closing .Load (); unix != 0 {
161
144
before := acksLeft
162
145
conn .mu .Lock ()
163
146
acksLeft = len (conn .retransmission .unacknowledged )
164
147
conn .mu .Unlock ()
148
+
165
149
if before != 0 && acksLeft == 0 {
166
150
conn .closeImmediately ()
167
151
}
168
-
169
152
since := time .Since (time .Unix (unix , 0 ))
170
- if (acksLeft == 0 && since > time .Second ) || since > time .Second * 8 {
153
+ if (acksLeft == 0 && since > time .Second ) || since > time .Second * 5 {
171
154
conn .closeImmediately ()
172
155
}
156
+ continue
157
+ }
158
+ if i % 5 == 0 {
159
+ // Ping the other end periodically to prevent timeouts.
160
+ _ = conn .send (& message.ConnectedPing {ClientTimestamp : timestamp ()})
161
+
162
+ conn .mu .Lock ()
163
+ if t .Sub (* conn .lastActivity .Load ()) > time .Second * 5 + conn .retransmission .rtt ()* 2 {
164
+ // No activity for too long: Start timeout.
165
+ _ = conn .Close ()
166
+ }
167
+ conn .mu .Unlock ()
173
168
}
174
169
case <- conn .closed :
175
170
return
@@ -227,7 +222,7 @@ func (conn *Conn) Write(b []byte) (n int, err error) {
227
222
default :
228
223
conn .mu .Lock ()
229
224
defer conn .mu .Unlock ()
230
- n , err : = conn .write (b )
225
+ n , err = conn .write (b )
231
226
return n , conn .error (err , "write" )
232
227
}
233
228
}
@@ -239,8 +234,7 @@ func (conn *Conn) Write(b []byte) (n int, err error) {
239
234
// Write, write will not lock.
240
235
func (conn * Conn ) write (b []byte ) (n int , err error ) {
241
236
fragments := split (b , conn .effectiveMTU ())
242
- orderIndex := conn .orderIndex
243
- conn .orderIndex ++
237
+ orderIndex := conn .orderIndex .Inc ()
244
238
245
239
splitID := uint16 (conn .splitID )
246
240
if len (fragments ) > 1 {
@@ -258,9 +252,7 @@ func (conn *Conn) write(b []byte) (n int, err error) {
258
252
copy (pk .content , content )
259
253
260
254
pk .orderIndex = orderIndex
261
- pk .messageIndex = conn .messageIndex
262
- conn .messageIndex ++
263
-
255
+ pk .messageIndex = conn .messageIndex .Inc ()
264
256
if pk .split = len (fragments ) > 1 ; pk .split {
265
257
// If there were more than one fragment, the pk was split, so we
266
258
// need to make sure we set the appropriate fields.
@@ -284,13 +276,11 @@ func (conn *Conn) Read(b []byte) (n int, err error) {
284
276
select {
285
277
case pk := <- conn .packets :
286
278
if len (b ) < len (* pk ) {
287
- err = conn .error (errBufferTooSmall , "read" )
279
+ err = conn .error (ErrBufferTooSmall , "read" )
288
280
}
289
281
return copy (b , * pk ), err
290
282
case <- conn .closed :
291
283
return 0 , conn .error (net .ErrClosed , "read" )
292
- case <- conn .readDeadline :
293
- return 0 , conn .error (context .DeadlineExceeded , "read" )
294
284
}
295
285
}
296
286
@@ -303,8 +293,6 @@ func (conn *Conn) ReadPacket() (b []byte, err error) {
303
293
return * pk , err
304
294
case <- conn .closed :
305
295
return nil , conn .error (net .ErrClosed , "read" )
306
- case <- conn .readDeadline :
307
- return nil , conn .error (context .DeadlineExceeded , "read" )
308
296
}
309
297
}
310
298
@@ -338,35 +326,14 @@ func (conn *Conn) LocalAddr() net.Addr {
338
326
return conn .conn .LocalAddr ()
339
327
}
340
328
341
- // SetReadDeadline sets the read deadline of the connection. An error is
342
- // returned only if the time passed is before time.Now(). Calling
343
- // SetReadDeadline means the next Read call that exceeds the deadline will fail
344
- // and return an error. Setting the read deadline to the default value of
345
- // time.Time removes the deadline.
346
- func (conn * Conn ) SetReadDeadline (t time.Time ) error {
347
- if t .IsZero () {
348
- conn .readDeadline = make (chan time.Time )
349
- return nil
350
- }
351
- if t .Before (time .Now ()) {
352
- panic (fmt .Errorf ("read deadline cannot be before now" ))
353
- }
354
- conn .readDeadline = time .After (time .Until (t ))
355
- return nil
356
- }
329
+ // SetReadDeadline is unimplemented. It always returns ErrNotSupported.
330
+ func (conn * Conn ) SetReadDeadline (time.Time ) error { return ErrNotSupported }
357
331
358
- // SetWriteDeadline has no behaviour. It is merely there to satisfy the
359
- // net.Conn interface.
360
- func (conn * Conn ) SetWriteDeadline (time.Time ) error {
361
- return nil
362
- }
332
+ // SetWriteDeadline is unimplemented. It always returns ErrNotSupported.
333
+ func (conn * Conn ) SetWriteDeadline (time.Time ) error { return ErrNotSupported }
363
334
364
- // SetDeadline sets the deadline of the connection for both Read and Write.
365
- // SetDeadline is equivalent to calling both SetReadDeadline and
366
- // SetWriteDeadline.
367
- func (conn * Conn ) SetDeadline (t time.Time ) error {
368
- return conn .SetReadDeadline (t )
369
- }
335
+ // SetDeadline is unimplemented. It always returns ErrNotSupported.
336
+ func (conn * Conn ) SetDeadline (time.Time ) error { return ErrNotSupported }
370
337
371
338
// Latency returns a rolling average of rtt between the sending and the
372
339
// receiving end of the connection. The rtt returned is updated continuously
@@ -541,26 +508,14 @@ func (conn *Conn) receiveSplitPacket(p *packet) error {
541
508
}
542
509
m [p .splitIndex ] = p .content
543
510
544
- size := 0
545
- for _ , fragment := range m {
546
- if len (fragment ) == 0 {
547
- // We haven't yet received all split fragments, so we cannot add the
548
- // packets together yet.
549
- return nil
550
- }
551
- // First we calculate the total size required to hold the content of the
552
- // combined content.
553
- size += len (fragment )
554
- }
555
-
556
- content := make ([]byte , 0 , size )
557
- for _ , fragment := range m {
558
- content = append (content , fragment ... )
511
+ if slices .ContainsFunc (m , func (i []byte ) bool { return len (i ) == 0 }) {
512
+ // We haven't yet received all split fragments, so we cannot add the
513
+ // packets together yet.
514
+ return nil
559
515
}
516
+ p .content = slices .Concat (m ... )
560
517
561
518
delete (conn .splits , p .splitID )
562
-
563
- p .content = content
564
519
return conn .receivePacket (p )
565
520
}
566
521
@@ -611,11 +566,10 @@ func (conn *Conn) handleACK(b []byte) error {
611
566
}
612
567
for _ , sequenceNumber := range ack .packets {
613
568
// Take out all stored packets from the recovery queue.
614
- p , ok := conn .retransmission .acknowledge (sequenceNumber )
615
- if ok {
569
+ if p , ok := conn .retransmission .acknowledge (sequenceNumber ); ok {
616
570
// Clear the packet and return it to the pool so that it may be
617
571
// re-used.
618
- p .content = nil
572
+ p .content = p . content [: 0 ]
619
573
packetPool .Put (p )
620
574
}
621
575
}
@@ -654,9 +608,8 @@ func (conn *Conn) resend(sequenceNumbers []uint24) (err error) {
654
608
// passed. It is assigned a new sequence number and added to the retransmission.
655
609
func (conn * Conn ) sendDatagram (pk * packet ) error {
656
610
conn .buf .WriteByte (bitFlagDatagram | bitFlagNeedsBAndAS )
657
- newSeqNum := conn .seq
658
- conn .seq ++
659
- writeUint24 (conn .buf , newSeqNum )
611
+ seq := conn .seq .Inc ()
612
+ writeUint24 (conn .buf , seq )
660
613
pk .write (conn .buf )
661
614
662
615
// We then send the pk to the connection.
@@ -665,7 +618,7 @@ func (conn *Conn) sendDatagram(pk *packet) error {
665
618
}
666
619
// We then re-add the pk to the recovery queue in case the new one gets
667
620
// lost too, in which case we need to resend it again.
668
- conn .retransmission .add (newSeqNum , pk )
621
+ conn .retransmission .add (seq , pk )
669
622
conn .buf .Reset ()
670
623
return nil
671
624
}
0 commit comments