Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More optimization #397

Merged
merged 24 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4f2503d
using sftp-server these tests hang without this close
puellanivis Oct 28, 2020
2753834
defers in a for loop don't execute until the end of the function
puellanivis Oct 28, 2020
d178e47
avoid panic if double call to Close
puellanivis Oct 28, 2020
d4ff5ae
remove unnecessary append()s
puellanivis Oct 28, 2020
0314349
more data to debug test failure
puellanivis Oct 28, 2020
c6aaae5
cleanup a t.Fatalf
puellanivis Oct 28, 2020
e3d9bf1
early return error, happy path straight
puellanivis Oct 28, 2020
fef2628
remove write/read race condition
puellanivis Oct 28, 2020
afc8d7b
no need to double type switch
puellanivis Oct 28, 2020
5e8f9f4
MarshalBinary now gives a 4-byte header for length, marshalPacket giv…
puellanivis Oct 28, 2020
a66e205
chan is already available via closure
puellanivis Oct 28, 2020
ac6027d
numerous subtle race conditions resolved in clientConn
puellanivis Oct 28, 2020
89afa80
more concurrency, more overhead, but sometimes more throughput?
puellanivis Oct 29, 2020
59de312
more robust tests, fix typos
puellanivis Nov 14, 2020
79ae5c2
a new server disconnect error point rather than EOF
puellanivis Nov 14, 2020
cdedb55
implement ReadFrom, normalize code patterns
puellanivis Nov 25, 2020
5d0d479
The SSH_FX_CONNECTION_LOST exists precisely for this
puellanivis Dec 8, 2020
29c556e
WriteTo benchmarks
puellanivis Jan 22, 2021
64bc1f8
WriteTo better, but not best, version
puellanivis Jan 22, 2021
0be6950
fix comments and variable names to reflect Write instead of Read
puellanivis Jan 25, 2021
fc15699
fixed concurrent writes, mid-polish
puellanivis Feb 21, 2021
c6f90f0
polishing done?
puellanivis Feb 22, 2021
1d73fd9
fix typo, error message is now two-way greppable
puellanivis Feb 22, 2021
861a8ea
pointer receivers and statusFromError(uint32, error)
puellanivis Feb 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,118 changes: 733 additions & 385 deletions client.go

Large diffs are not rendered by default.

259 changes: 196 additions & 63 deletions client_integration_test.go

Large diffs are not rendered by default.

90 changes: 63 additions & 27 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (c *conn) Close() error {

type clientConn struct {
conn
wg sync.WaitGroup
wg sync.WaitGroup

sync.Mutex // protects inflight
inflight map[uint32]chan<- result // outstanding requests

Expand Down Expand Up @@ -76,29 +77,53 @@ func (c *clientConn) loop() {
// recv continuously reads from the server and forwards responses to the
// appropriate channel.
func (c *clientConn) recv() error {
defer func() {
c.conn.Close()
}()
defer c.conn.Close()

for {
typ, data, err := c.recvPacket(0)
if err != nil {
return err
}
sid, _ := unmarshalUint32(data)
c.Lock()
ch, ok := c.inflight[sid]
delete(c.inflight, sid)
c.Unlock()

ch, ok := c.getChannel(sid)
if !ok {
// This is an unexpected occurrence. Send the error
// back to all listeners so that they terminate
// gracefully.
return errors.Errorf("sid: %v not fond", sid)
return errors.Errorf("sid not found: %v", sid)
}

ch <- result{typ: typ, data: data}
}
}

func (c *clientConn) putChannel(ch chan<- result, sid uint32) bool {
c.Lock()
defer c.Unlock()

select {
case <-c.closed:
// already closed with broadcastErr, return error on chan.
ch <- result{err: ErrSSHFxConnectionLost}
return false
default:
}

c.inflight[sid] = ch
return true
}

func (c *clientConn) getChannel(sid uint32) (chan<- result, bool) {
c.Lock()
defer c.Unlock()

ch, ok := c.inflight[sid]
delete(c.inflight, sid)

return ch, ok
}

// result captures the result of receiving the a packet from the server
type result struct {
typ byte
Expand All @@ -111,37 +136,48 @@ type idmarshaler interface {
encoding.BinaryMarshaler
}

func (c *clientConn) sendPacket(p idmarshaler) (byte, []byte, error) {
ch := make(chan result, 2)
func (c *clientConn) sendPacket(ch chan result, p idmarshaler) (byte, []byte, error) {
if cap(ch) < 1 {
ch = make(chan result, 1)
}

c.dispatchRequest(ch, p)
s := <-ch
return s.typ, s.data, s.err
}

// dispatchRequest should ideally only be called by race-detection tests outside of this file,
// where you have to ensure two packets are in flight sequentially after each other.
func (c *clientConn) dispatchRequest(ch chan<- result, p idmarshaler) {
c.Lock()
c.inflight[p.id()] = ch
c.Unlock()
sid := p.id()

if !c.putChannel(ch, sid) {
// already closed.
return
}

if err := c.conn.sendPacket(p); err != nil {
c.Lock()
delete(c.inflight, p.id())
c.Unlock()
ch <- result{err: err}
if ch, ok := c.getChannel(sid); ok {
ch <- result{err: err}
}
}
}

// broadcastErr sends an error to all goroutines waiting for a response.
func (c *clientConn) broadcastErr(err error) {
c.Lock()
listeners := make([]chan<- result, 0, len(c.inflight))
for _, ch := range c.inflight {
listeners = append(listeners, ch)
}
c.Unlock()
bcastRes := result{err: errors.New("unexpected server disconnect")}
for _, ch := range listeners {
defer c.Unlock()

bcastRes := result{err: ErrSSHFxConnectionLost}
for sid, ch := range c.inflight {
ch <- bcastRes

// Replace the chan in inflight,
// we have hijacked this chan,
// and this guarantees always-only-once sending.
c.inflight[sid] = make(chan<- result, 1)
}

c.err = err
close(c.closed)
}
Expand All @@ -150,6 +186,6 @@ type serverConn struct {
conn
}

func (s *serverConn) sendError(p ider, err error) error {
return s.sendPacket(statusFromError(p, err))
func (s *serverConn) sendError(id uint32, err error) error {
return s.sendPacket(statusFromError(id, err))
}
2 changes: 1 addition & 1 deletion packet-manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func fake(rid, order uint32) fakepacket {
}

func (fakepacket) MarshalBinary() ([]byte, error) {
return []byte{}, nil
return make([]byte, 4), nil
}

func (fakepacket) UnmarshalBinary([]byte) error {
Expand Down
76 changes: 38 additions & 38 deletions packet-typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,51 +34,51 @@ type notReadOnly interface {

//// define types by adding methods
// hasPath
func (p sshFxpLstatPacket) getPath() string { return p.Path }
func (p sshFxpStatPacket) getPath() string { return p.Path }
func (p sshFxpRmdirPacket) getPath() string { return p.Path }
func (p sshFxpReadlinkPacket) getPath() string { return p.Path }
func (p sshFxpRealpathPacket) getPath() string { return p.Path }
func (p sshFxpMkdirPacket) getPath() string { return p.Path }
func (p sshFxpSetstatPacket) getPath() string { return p.Path }
func (p sshFxpStatvfsPacket) getPath() string { return p.Path }
func (p sshFxpRemovePacket) getPath() string { return p.Filename }
func (p sshFxpRenamePacket) getPath() string { return p.Oldpath }
func (p sshFxpSymlinkPacket) getPath() string { return p.Targetpath }
func (p sshFxpOpendirPacket) getPath() string { return p.Path }
func (p sshFxpOpenPacket) getPath() string { return p.Path }
func (p *sshFxpLstatPacket) getPath() string { return p.Path }
func (p *sshFxpStatPacket) getPath() string { return p.Path }
func (p *sshFxpRmdirPacket) getPath() string { return p.Path }
func (p *sshFxpReadlinkPacket) getPath() string { return p.Path }
func (p *sshFxpRealpathPacket) getPath() string { return p.Path }
func (p *sshFxpMkdirPacket) getPath() string { return p.Path }
func (p *sshFxpSetstatPacket) getPath() string { return p.Path }
func (p *sshFxpStatvfsPacket) getPath() string { return p.Path }
func (p *sshFxpRemovePacket) getPath() string { return p.Filename }
func (p *sshFxpRenamePacket) getPath() string { return p.Oldpath }
func (p *sshFxpSymlinkPacket) getPath() string { return p.Targetpath }
func (p *sshFxpOpendirPacket) getPath() string { return p.Path }
func (p *sshFxpOpenPacket) getPath() string { return p.Path }

func (p sshFxpExtendedPacketPosixRename) getPath() string { return p.Oldpath }
func (p sshFxpExtendedPacketHardlink) getPath() string { return p.Oldpath }
func (p *sshFxpExtendedPacketPosixRename) getPath() string { return p.Oldpath }
func (p *sshFxpExtendedPacketHardlink) getPath() string { return p.Oldpath }

// getHandle
func (p sshFxpFstatPacket) getHandle() string { return p.Handle }
func (p sshFxpFsetstatPacket) getHandle() string { return p.Handle }
func (p sshFxpReadPacket) getHandle() string { return p.Handle }
func (p sshFxpWritePacket) getHandle() string { return p.Handle }
func (p sshFxpReaddirPacket) getHandle() string { return p.Handle }
func (p sshFxpClosePacket) getHandle() string { return p.Handle }
func (p *sshFxpFstatPacket) getHandle() string { return p.Handle }
func (p *sshFxpFsetstatPacket) getHandle() string { return p.Handle }
func (p *sshFxpReadPacket) getHandle() string { return p.Handle }
func (p *sshFxpWritePacket) getHandle() string { return p.Handle }
func (p *sshFxpReaddirPacket) getHandle() string { return p.Handle }
func (p *sshFxpClosePacket) getHandle() string { return p.Handle }

// notReadOnly
func (p sshFxpWritePacket) notReadOnly() {}
func (p sshFxpSetstatPacket) notReadOnly() {}
func (p sshFxpFsetstatPacket) notReadOnly() {}
func (p sshFxpRemovePacket) notReadOnly() {}
func (p sshFxpMkdirPacket) notReadOnly() {}
func (p sshFxpRmdirPacket) notReadOnly() {}
func (p sshFxpRenamePacket) notReadOnly() {}
func (p sshFxpSymlinkPacket) notReadOnly() {}
func (p sshFxpExtendedPacketPosixRename) notReadOnly() {}
func (p sshFxpExtendedPacketHardlink) notReadOnly() {}
func (p *sshFxpWritePacket) notReadOnly() {}
func (p *sshFxpSetstatPacket) notReadOnly() {}
func (p *sshFxpFsetstatPacket) notReadOnly() {}
func (p *sshFxpRemovePacket) notReadOnly() {}
func (p *sshFxpMkdirPacket) notReadOnly() {}
func (p *sshFxpRmdirPacket) notReadOnly() {}
func (p *sshFxpRenamePacket) notReadOnly() {}
func (p *sshFxpSymlinkPacket) notReadOnly() {}
func (p *sshFxpExtendedPacketPosixRename) notReadOnly() {}
func (p *sshFxpExtendedPacketHardlink) notReadOnly() {}

// some packets with ID are missing id()
func (p sshFxpDataPacket) id() uint32 { return p.ID }
func (p sshFxpStatusPacket) id() uint32 { return p.ID }
func (p sshFxpStatResponse) id() uint32 { return p.ID }
func (p sshFxpNamePacket) id() uint32 { return p.ID }
func (p sshFxpHandlePacket) id() uint32 { return p.ID }
func (p StatVFS) id() uint32 { return p.ID }
func (p sshFxVersionPacket) id() uint32 { return 0 }
func (p *sshFxpDataPacket) id() uint32 { return p.ID }
func (p *sshFxpStatusPacket) id() uint32 { return p.ID }
func (p *sshFxpStatResponse) id() uint32 { return p.ID }
func (p *sshFxpNamePacket) id() uint32 { return p.ID }
func (p *sshFxpHandlePacket) id() uint32 { return p.ID }
func (p *StatVFS) id() uint32 { return p.ID }
func (p *sshFxVersionPacket) id() uint32 { return 0 }

// take raw incoming packet data and build packet objects
func makePacket(p rxPacket) (requestPacket, error) {
Expand Down
Loading