Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
optimize: reduce syscalls using a buffered reader.
Browse files Browse the repository at this point in the history
Previously, each noise message read would make two syscalls:
1. one to read the length prefix.
2. one to read the encrypted payload.

This patch adds bufio.Reader mediation to cushion syscalls, and
significantly enhaces throughput in read-dominated connections, such
as file transfers.
  • Loading branch information
raulk committed Oct 7, 2020
1 parent 02dc2ad commit 4368597
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ require (
github.com/multiformats/go-multiaddr v0.2.1
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
)
)
14 changes: 7 additions & 7 deletions rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func (s *secureSession) Write(data []byte) (int, error) {
return written, nil
}

// readNextInsecureMsgLen reads the length of the next message on the insecure channel.
// readNextInsecureMsgLen reads the length of the next message on the insecureConn channel.
func (s *secureSession) readNextInsecureMsgLen() (int, error) {
_, err := io.ReadFull(s.insecure, s.rlen[:])
_, err := io.ReadFull(s.insecureReader, s.rlen[:])
if err != nil {
return 0, err
}
Expand All @@ -140,17 +140,17 @@ func (s *secureSession) readNextInsecureMsgLen() (int, error) {
}

// readNextMsgInsecure tries to read exactly len(buf) bytes into buf from
// the insecure channel and returns the error, if any.
// the insecureConn channel and returns the error, if any.
// Ideally, for reading a message, you'd first want to call `readNextInsecureMsgLen`
// to determine the size of the next message to be read from the insecure channel and then call
// to determine the size of the next message to be read from the insecureConn channel and then call
// this function with a buffer of exactly that size.
func (s *secureSession) readNextMsgInsecure(buf []byte) error {
_, err := io.ReadFull(s.insecure, buf)
_, err := io.ReadFull(s.insecureReader, buf)
return err
}

// writeMsgInsecure writes to the insecure conn.
// writeMsgInsecure writes to the insecureConn conn.
// data will be prefixed with its length in bytes, written as a 16-bit uint in network order.
func (s *secureSession) writeMsgInsecure(data []byte) (int, error) {
return s.insecure.Write(data)
return s.insecureConn.Write(data)
}
35 changes: 20 additions & 15 deletions session.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package noise

import (
"bufio"
"context"
"net"
"sync"
Expand All @@ -22,7 +23,10 @@ type secureSession struct {

readLock sync.Mutex
writeLock sync.Mutex
insecure net.Conn

insecureConn net.Conn
insecureReader *bufio.Reader // to cushion io read syscalls
// we don't buffer writes to avoid introducing latency; optimisation possible. // TODO revisit

qseek int // queued bytes seek value.
qbuf []byte // queued bytes buffer.
Expand All @@ -32,15 +36,16 @@ type secureSession struct {
dec *noise.CipherState
}

// newSecureSession creates a Noise session over the given insecure Conn, using
// newSecureSession creates a Noise session over the given insecureConn Conn, using
// the libp2p identity keypair from the given Transport.
func newSecureSession(tpt *Transport, ctx context.Context, insecure net.Conn, remote peer.ID, initiator bool) (*secureSession, error) {
s := &secureSession{
insecure: insecure,
initiator: initiator,
localID: tpt.localID,
localKey: tpt.privateKey,
remoteID: remote,
insecureConn: insecure,
insecureReader: bufio.NewReader(insecure),
initiator: initiator,
localID: tpt.localID,
localKey: tpt.privateKey,
remoteID: remote,
}

// the go-routine we create to run the handshake will
Expand All @@ -53,22 +58,22 @@ func newSecureSession(tpt *Transport, ctx context.Context, insecure net.Conn, re
select {
case err := <-respCh:
if err != nil {
_ = s.insecure.Close()
_ = s.insecureConn.Close()
}
return s, err

case <-ctx.Done():
// If the context has been cancelled, we close the underlying connection.
// We then wait for the handshake to return because of the first error it encounters
// so we don't return without cleaning up the go-routine.
_ = s.insecure.Close()
_ = s.insecureConn.Close()
<-respCh
return nil, ctx.Err()
}
}

func (s *secureSession) LocalAddr() net.Addr {
return s.insecure.LocalAddr()
return s.insecureConn.LocalAddr()
}

func (s *secureSession) LocalPeer() peer.ID {
Expand All @@ -84,7 +89,7 @@ func (s *secureSession) LocalPublicKey() crypto.PubKey {
}

func (s *secureSession) RemoteAddr() net.Addr {
return s.insecure.RemoteAddr()
return s.insecureConn.RemoteAddr()
}

func (s *secureSession) RemotePeer() peer.ID {
Expand All @@ -96,17 +101,17 @@ func (s *secureSession) RemotePublicKey() crypto.PubKey {
}

func (s *secureSession) SetDeadline(t time.Time) error {
return s.insecure.SetDeadline(t)
return s.insecureConn.SetDeadline(t)
}

func (s *secureSession) SetReadDeadline(t time.Time) error {
return s.insecure.SetReadDeadline(t)
return s.insecureConn.SetReadDeadline(t)
}

func (s *secureSession) SetWriteDeadline(t time.Time) error {
return s.insecure.SetWriteDeadline(t)
return s.insecureConn.SetWriteDeadline(t)
}

func (s *secureSession) Close() error {
return s.insecure.Close()
return s.insecureConn.Close()
}
4 changes: 2 additions & 2 deletions transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func TestReadUnencryptedFails(t *testing.T) {
msg := make([]byte, len(before)+LengthPrefixLength)
binary.BigEndian.PutUint16(msg, uint16(len(before)))
copy(msg[LengthPrefixLength:], before)
n, err := initConn.insecure.Write(msg)
n, err := initConn.insecureConn.Write(msg)
require.NoError(t, err)
require.Equal(t, len(msg), n)

Expand All @@ -319,7 +319,7 @@ func TestReadUnencryptedFails(t *testing.T) {
msg = make([]byte, len(before)+LengthPrefixLength)
binary.BigEndian.PutUint16(msg, uint16(len(before)))
copy(msg[LengthPrefixLength:], before)
n, err = initConn.insecure.Write(msg)
n, err = initConn.insecureConn.Write(msg)
require.NoError(t, err)
require.Equal(t, len(msg), n)

Expand Down

0 comments on commit 4368597

Please sign in to comment.