More optimization #397

Merged: 24 commits into master on Feb 22, 2021

Conversation

puellanivis
Collaborator

  • MarshalBinary for packets must now include 4 empty bytes at the start, reserved for filling in the length of the packet.
  • Packets may now implement marshalPacket, which returns the packet header and the payload as separate byte slices, allowing the payload to be written out separately without first copying it into the marshaled packet slice (see the sketch after this list).
  • rigorously define the behavior of chan result when sending packets to the server, to guarantee at-most-once delivery on each channel; this means we can halve the space overhead from allocating a buffered channel (which happens on nearly every request).
  • fix a write-read race condition in request-server_test.go
  • fix a bunch of edge cases in client_integration_test.go that were causing integration tests to lock up, be flaky, or just not work.
  • implement WriterAt
  • new concurrency model of Map→Worker→Reduce for better high-latency concurrency in ReadAt, WriteAt, and ReadFrom.
  • if a ReadAt or WriteAt can be done in one request, short-circuit the concurrency and just issue the request directly
  • try to guess whether a ReadFrom can be done in one request, and short-circuit to a synchronous loop (this is to be absolutely sure the io.Reader is read to io.EOF, even though it will very likely only ever run one loop).
  • TODO: currently pondering how best to do WriteTo efficiently with the Map→Worker→Reduce paradigm. For sure though, it won’t end up as similar to the other three as they are to each other.
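
As an illustration of the marshalPacket idea, here is a minimal, self-contained sketch using encoding/binary. It is not the library's actual code (pkg/sftp uses its own marshal helpers, and the real sshFxpWritePacket appears in a diff later in this thread); the type and constant names below are stand-ins.

package main

import (
	"encoding/binary"
	"fmt"
)

const fxpWrite = 6 // SSH_FXP_WRITE type byte

// writePacket is a stand-in mirroring the shape of sshFxpWritePacket.
type writePacket struct {
	ID     uint32
	Handle string
	Offset uint64
	Data   []byte
}

// marshalPacket returns the header and the payload as separate slices, so the
// payload can be written to the wire directly instead of first being copied
// into the marshaled header. The first 4 bytes are left empty so the sender
// can fill in the total packet length.
func (p *writePacket) marshalPacket() (header, payload []byte, err error) {
	l := 4 + 1 + 4 + // uint32(length) + byte(type) + uint32(id)
		4 + len(p.Handle) + // string(handle)
		8 + 4 // uint64(offset) + uint32(len(data))

	b := make([]byte, 4, l) // 4 empty bytes reserved for the length
	b = append(b, fxpWrite)
	b = binary.BigEndian.AppendUint32(b, p.ID)
	b = binary.BigEndian.AppendUint32(b, uint32(len(p.Handle)))
	b = append(b, p.Handle...)
	b = binary.BigEndian.AppendUint64(b, p.Offset)
	b = binary.BigEndian.AppendUint32(b, uint32(len(p.Data)))

	return b, p.Data, nil
}

func main() {
	p := &writePacket{ID: 1, Handle: "handle", Offset: 0, Data: []byte("hello")}
	header, payload, _ := p.marshalPacket()
	// The length field counts everything after the 4-byte length prefix.
	binary.BigEndian.PutUint32(header, uint32(len(header)+len(payload)-4))
	fmt.Println(len(header), len(payload)) // header and payload stay separate slices
}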

@puellanivis
Collaborator Author

Hm… the benchmarks vary wildly.

As noted, the delayed benchmarks show vastly bigger improvements in speed and throughput.

Allocated memory for Writes and ReadFrom is down ~99% and ~91% respectively; for Writes+Delay and ReadFrom+Delay, ~75% and ~70% respectively.

Allocs per op are down about 14% for ops <= 32k, but up about 20% for bigger ops.

These are nice:

Write1k-12                   90.6MB/s ± 1%  103.5MB/s ± 1%   +14.21%  (p=0.000 n=10+10)
Write16k-12                   597MB/s ± 2%    807MB/s ± 3%   +35.09%  (p=0.000 n=10+10)
Write32k-12                   742MB/s ± 2%   1067MB/s ± 3%   +43.77%  (p=0.000 n=10+10)
Write128k-12                  932MB/s ± 3%   1335MB/s ± 3%   +43.28%  (p=0.000 n=10+9)
Write512k-12                 1.29GB/s ± 3%   1.52GB/s ± 1%   +17.63%  (p=0.000 n=9+9)
Write1MiB-12                 1.35GB/s ± 3%   1.53GB/s ± 4%   +12.59%  (p=0.000 n=10+10)
Write4MiB-12                 1.45GB/s ± 2%   1.55GB/s ± 4%    +7.25%  (p=0.000 n=10+10)
Write4MiBDelay10Msec-12      38.3MB/s ± 0%  113.2MB/s ± 2%  +195.55%  (p=0.000 n=10+10)
Write4MiBDelay50Msec-12      7.98MB/s ± 0%  25.35MB/s ± 0%  +217.87%  (p=0.000 n=9+10)
Write4MiBDelay150Msec-12     2.68MB/s ± 0%   8.64MB/s ± 0%  +222.28%  (p=0.000 n=10+10)
ReadFrom1k-12                1.64GB/s ± 3%   1.74GB/s ± 4%    +6.27%  (p=0.000 n=10+10)
ReadFrom16k-12               1.52GB/s ± 5%   1.62GB/s ± 5%    +6.46%  (p=0.000 n=10+10)
ReadFrom32k-12               1.48GB/s ± 3%   1.56GB/s ± 7%    +5.35%  (p=0.003 n=10+10)
ReadFrom128k-12              1.46GB/s ± 2%   1.55GB/s ± 3%    +6.36%  (p=0.000 n=10+10)
ReadFrom512k-12              1.43GB/s ± 4%   1.53GB/s ± 4%    +7.29%  (p=0.000 n=10+10)
ReadFrom1MiB-12              1.43GB/s ± 1%   1.54GB/s ± 3%    +7.94%  (p=0.000 n=9+10)
ReadFrom4MiB-12              1.42GB/s ± 4%   1.53GB/s ± 5%    +7.92%  (p=0.000 n=10+10)
ReadFrom4MiBDelay10Msec-12   71.3MB/s ± 0%  111.7MB/s ± 0%   +56.58%  (p=0.000 n=10+9)
ReadFrom4MiBDelay50Msec-12   14.8MB/s ± 0%   23.1MB/s ± 0%   +55.72%  (p=0.000 n=10+6)
ReadFrom4MiBDelay150Msec-12  4.97MB/s ± 0%   7.74MB/s ± 0%   +55.73%  (p=0.000 n=10+10)

Vs. this PR without concurrency updates:

Except for delayed operations, throughput stays pretty similar across the board.

Delayed Read/Write/ReadFrom all show more speed, much more throughput, and about 20–25% more allocs/op.

ReadFrom without delays uses about 23 times more memory at each buffer size. (Could be bad sync.Pool behavior?)

Writes <= 32k show 90% less memory usage, but bigger writes range from 50% less to 27% more memory (probably due to greater concurrency being enabled as the buffer size grows). Throughputs are all relatively unchanged.


I think with throughput being mostly a wash and memory usage down despite a larger number of allocs/op, trading that for better performance on high-latency links is probably worth it. Well, once I figure out the 23× more memory being used by ReadFrom 😆

@drakkan
Collaborator

drakkan commented Nov 26, 2020

Hi,

I did a quick test after applying this patch

0001-replace-sync.Pool-with-our-allocator.zip

the results are quite similar:

name                         old time/op    new time/op    delta
ReadFrom1k-12                  8.12ms ± 6%    8.36ms ± 6%    ~     (p=0.075 n=10+10)
ReadFrom16k-12                 8.45ms ± 3%    8.82ms ± 3%  +4.32%  (p=0.000 n=10+9)
ReadFrom32k-12                 9.13ms ± 6%    8.77ms ± 2%  -4.00%  (p=0.001 n=10+10)
ReadFrom128k-12                9.46ms ± 5%    8.87ms ± 4%  -6.22%  (p=0.000 n=10+10)
ReadFrom512k-12                9.59ms ± 3%    8.90ms ± 2%  -7.15%  (p=0.000 n=9+9)
ReadFrom4MiB-12                9.36ms ± 4%    8.94ms ± 1%  -4.49%  (p=0.000 n=10+9)
ReadFrom4MiBDelay10Msec-12     93.9ms ± 0%    94.0ms ± 0%    ~     (p=0.105 n=10+10)
ReadFrom4MiBDelay50Msec-12      456ms ± 0%     456ms ± 0%    ~     (p=0.497 n=9+10)
ReadFrom4MiBDelay150Msec-12     1.36s ± 0%     1.36s ± 0%    ~     (p=0.529 n=10+10)

name                         old speed      new speed      delta
ReadFrom1k-12                1.29GB/s ± 6%  1.26GB/s ± 7%    ~     (p=0.075 n=10+10)
ReadFrom16k-12               1.24GB/s ± 3%  1.19GB/s ± 3%  -4.15%  (p=0.000 n=10+9)
ReadFrom32k-12               1.15GB/s ± 6%  1.20GB/s ± 2%  +4.09%  (p=0.001 n=10+10)
ReadFrom128k-12              1.11GB/s ± 5%  1.18GB/s ± 5%  +6.60%  (p=0.000 n=10+10)
ReadFrom512k-12              1.09GB/s ± 3%  1.18GB/s ± 2%  +7.68%  (p=0.000 n=9+9)
ReadFrom4MiB-12              1.12GB/s ± 4%  1.17GB/s ± 1%  +4.66%  (p=0.000 n=10+9)
ReadFrom4MiBDelay10Msec-12    112MB/s ± 0%   112MB/s ± 0%    ~     (p=0.127 n=10+10)
ReadFrom4MiBDelay50Msec-12   23.0MB/s ± 0%  23.0MB/s ± 0%    ~     (p=0.283 n=9+10)
ReadFrom4MiBDelay150Msec-12  7.73MB/s ± 0%  7.73MB/s ± 0%    ~     (all equal)

name                         old alloc/op   new alloc/op   delta
ReadFrom1k-12                  2.46MB ± 0%    2.23MB ± 0%  -9.39%  (p=0.000 n=10+10)
ReadFrom16k-12                 2.46MB ± 0%    2.23MB ± 0%  -9.41%  (p=0.000 n=10+10)
ReadFrom32k-12                 2.45MB ± 0%    2.23MB ± 0%  -9.33%  (p=0.000 n=10+10)
ReadFrom128k-12                2.45MB ± 0%    2.23MB ± 0%  -9.34%  (p=0.000 n=9+10)
ReadFrom512k-12                2.45MB ± 0%    2.23MB ± 0%  -9.32%  (p=0.000 n=10+10)
ReadFrom4MiB-12                2.45MB ± 0%    2.23MB ± 0%  -9.24%  (p=0.000 n=10+10)
ReadFrom4MiBDelay10Msec-12     12.9MB ± 0%    12.7MB ± 0%  -1.23%  (p=0.000 n=10+10)
ReadFrom4MiBDelay50Msec-12     12.9MB ± 0%    12.7MB ± 0%  -1.46%  (p=0.000 n=10+10)
ReadFrom4MiBDelay150Msec-12    13.0MB ± 0%    12.7MB ± 0%  -2.10%  (p=0.000 n=9+10)

name                         old allocs/op  new allocs/op  delta
ReadFrom1k-12                   2.59k ± 0%     2.51k ± 0%  -3.33%  (p=0.000 n=10+10)
ReadFrom16k-12                  2.59k ± 0%     2.51k ± 0%  -3.32%  (p=0.000 n=7+6)
ReadFrom32k-12                  2.59k ± 0%     2.51k ± 0%  -3.33%  (p=0.000 n=10+10)
ReadFrom128k-12                 2.59k ± 0%     2.51k ± 0%  -3.34%  (p=0.000 n=10+10)
ReadFrom512k-12                 2.59k ± 0%     2.51k ± 0%  -3.33%  (p=0.000 n=10+10)
ReadFrom4MiB-12                 2.59k ± 0%     2.51k ± 0%  -3.32%  (p=0.000 n=10+10)
ReadFrom4MiBDelay10Msec-12      3.24k ± 0%     3.15k ± 0%  -2.67%  (p=0.000 n=10+10)
ReadFrom4MiBDelay50Msec-12      3.25k ± 0%     3.16k ± 1%  -2.71%  (p=0.000 n=10+10)
ReadFrom4MiBDelay150Msec-12     3.28k ± 1%     3.18k ± 1%  -3.10%  (p=0.000 n=10+10)
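
The patch itself is only attached as a zip above; purely to illustrate the general approach being compared here (an explicit buffer freelist instead of sync.Pool), a minimal sketch might look like the following. The type and method names are assumptions, not the code in the attachment.

// bufPool hands out fixed-size buffers and recycles them explicitly, rather
// than relying on sync.Pool's GC-driven reuse. A buffered channel serves as
// the freelist.
type bufPool struct {
	free chan []byte
	size int
}

func newBufPool(size, depth int) *bufPool {
	return &bufPool{
		free: make(chan []byte, depth),
		size: size,
	}
}

func (p *bufPool) Get() []byte {
	select {
	case b := <-p.free:
		return b
	default:
		return make([]byte, p.size) // freelist empty: allocate
	}
}

func (p *bufPool) Put(b []byte) {
	if cap(b) < p.size {
		return // wrong-sized buffer, drop it
	}
	select {
	case p.free <- b[:p.size]:
	default: // freelist full: let the GC reclaim it
	}
}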

@drakkan
Collaborator

drakkan commented Dec 9, 2020

Hi,

with this patch applied, I have a failure in SFTPGo test cases here. Basically, the server calls channel.Close() while a client is connected and uploading a file. It seems that the client no longer disconnects in this case. I'll try to write a proper standalone reproducer and/or a test case next weekend, thank you

@puellanivis
Collaborator Author

puellanivis commented Jan 22, 2021

I have committed at least somewhat of an improvement to the WriteTo here, but it still doesn’t really do the reads in a particularly well-parallelized way. The reads and writes are at least done in parallel, though. Getting the reads to work in parallel is going to require a separate goroutine that acts as a work feed, pushing work to the reader, which then does the work.

I’m not sure if I want to do that improvement now or in a follow-up. I think the general improvements here have been waiting long enough that we can commit this, cut a release, and then follow up with a more parallelized WriteTo in another PR.

P.S.: I mean, after we’ve resolved the apparent regression issue pointed out by @drakkan

@drakkan
Collaborator

drakkan commented Jan 22, 2021

I have committed at least somewhat of an improvement to the WriteTo here, but it still doesn’t really do the reads in a particularly well-parallelized way. The reads and writes are at least done in parallel, though. Getting the reads to work in parallel is going to require a separate goroutine that acts as a work feed, pushing work to the reader, which then does the work.

I’m not sure if I want to do that improvement now or in a follow-up. I think the general improvements here have been waiting long enough that we can commit this, cut a release, and then follow up with a more parallelized WriteTo in another PR.

P.S.: I mean, after we’ve resolved the apparent regression issue pointed out by @drakkan

oops sorry, I'm using my branch and I completely forgot about this issue.

Are you unable to reproduce the issue and do you want a reproducer or a test case? If so I'll try to find some time, thanks

@drakkan
Collaborator

drakkan commented Jan 22, 2021

Hi,

I finally did some tests.

As soon as I close the channel from SFTPGo I see this log client side:

./sftpclienttest 
start copy
copy finished, bytes 3014656 err: Connection Lost

but since I'm limiting bandwidth server side, SFTPGo hasn't received 3014656 bytes yet, so the connection stays open (server side) until SFTPGo receives all the pending data.

So in my test case I just need to increase the wait timeout; now it takes about 5 seconds, while before the pending data was sent in less than 1 second.

There are now more parallel writes and therefore more pending data.

In short, this is not a bug. Sorry for the noise and for being so lazy

@puellanivis
Collaborator Author

Hm, 🤔 I think we would prefer that ReadFrom never returns until all the pending data has been transferred. Though, I guess this test is done by sending something like /dev/zero over a restricted pipe, then suddenly cutting the connection, and then seeing how long it takes both client and server to finish up? So the client ends quite abruptly, because it is the source and immediately notices the disconnect, while on the server there’s a bunch of pending data trickling in?

If so, there is a certain amount of “maxConcurrency” that we should see. It might be good to tune it and find an appropriate timeout value based on maxConcurrency * maxPacketSize; then that should hold regardless. Yeah, it’s kind of interesting that even though we’re doing things in parallel, the data on the pipe is still eventually pushed through sequentially, so increasing concurrency == longer stream. 😆
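
To put a rough number on that bound (all the figures here are assumptions for illustration, not the library's actual limits):

package main

import "fmt"

func main() {
	const (
		maxConcurrency = 64         // assumed number of outstanding write requests
		maxPacket      = 32 * 1024  // default max data size per request
		throttle       = 512 * 1024 // assumed server-side bandwidth limit, bytes/s
	)
	inFlight := maxConcurrency * maxPacket // bytes that may already be on the wire
	fmt.Printf("up to %d bytes in flight; draining at the throttle takes ~%ds\n",
		inFlight, inFlight/throttle)
}

So a server-side drain timeout would need to cover roughly maxConcurrency × maxPacketSize divided by the throttled bandwidth.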

@drakkan
Collaborator

drakkan commented Jan 23, 2021

Hi,

to understand the issue I used this test program

package main

import (
	"crypto/rand"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"

	"github.com/pkg/sftp"
	"golang.org/x/crypto/ssh"
)

func main() {
	username := "b"
	password := "password"
	addr := "127.0.0.1:2022"
	// create a big enough test file
	content := make([]byte, 10485760)
	_, err := rand.Read(content)
	if err != nil {
		panic(err)
	}
	testFilePath := filepath.Join(os.TempDir(), "testfile")
	err = ioutil.WriteFile(testFilePath, content, os.ModePerm)
	if err != nil {
		panic(err)
	}

	config := &ssh.ClientConfig{
		User: username,
		HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
			return nil
		},
		Auth: []ssh.AuthMethod{ssh.Password(password)},
	}
	conn, err := ssh.Dial("tcp", addr, config)
	if err != nil {
		panic(err)
	}
	sftpClient, err := sftp.NewClient(conn)
	if err != nil {
		panic(err)
	}
	defer sftpClient.Close()

	src, err := os.Open(testFilePath)
	if err != nil {
		panic(err)
	}
	defer src.Close()

	dst, err := sftpClient.Create("testupload")
	if err != nil {
		panic(err)
	}
	defer dst.Close()
	fmt.Printf("start copy\n")
	n, err := io.Copy(dst, src)
	fmt.Printf("copy finished, bytes %v err %v\n", n, err)
}

After the upload starts, I select the connection from the SFTPGo web interface and click "Disconnect", which calls channel.Close() server side.

The test program exits printing something like this:

./sftpclienttest 
start copy
copy finished, bytes 2752512 err Connection Lost

from the SFTPGo web interface I see something like this:

(screenshot from the SFTPGo web interface, 2021-01-23, showing the connection still listed as active)

so the connection is still alive even if the sftpclient program is now dead.

The connection ends once the received size matches the one printed by the client. I think this data is accumulated inside a crypto/ssh internal buffer.

I could be wrong but I don't see any issue here:

  1. the client disconnects as soon as the channel is closed
  2. the server reads all the accumulated data and then closes the connection

@drakkan
Collaborator

drakkan commented Jan 23, 2021

If I repeat the test using pkg/sftp v1.12 I have this output client side:

./sftpclienttest 
start copy
copy finished, bytes 2457600 err <nil>

so no error is detected, you fixed this, great!

Server side I have:

"Unexpected error for transfer, path: \"/tmp/test_home/testupload\", error: \"unexpected EOF\" bytes sent: 0, bytes received: 2457600 transfer running since 18750 ms"

the received bytes (2457600 in this case) always match the ones printed from the client.

If I use this branch the received bytes are always a bit smaller, for example:

./sftpclienttest 
start copy
copy finished, bytes 3080192 err Connection Lost

and the server logs:

"Unexpected error for transfer, path: \"/tmp/test_home/testupload\", error: \"unexpected EOF\" bytes sent: 0, bytes received: 3014656 transfer running since 23000 ms"

Is this what you meant in the comment above?

@puellanivis
Collaborator Author

I was just trying to understand the scope of what you were reporting. The fewer bytes on the server side could be from an error happening before the end of the full transfer, which would then get reported ahead of the transfer completing.

As long as you’re good with things, I’m probably good with things. Although I’ve kind of wanted to build some tests of the parallel execution to make sure it is actually committing things in order, I think that would require reworking the ReadFrom to accommodate an arbitrary max data size, so that it splits up a file very finely and we can make sure the transfer still all happens in order. That way we wouldn’t need a super large file to get the parallelism to activate. I’ll take a look, but it might be doable as a patch-level change?

@drakkan
Collaborator

drakkan commented Jan 24, 2021

Hi,

I'm only reporting the difference between v1.12/git master and this branch, just in case you want to investigate.

I think losing some bytes in error cases is not a problem

@puellanivis
Collaborator Author

Agreed, dropping bytes is not automatically bad in an error state. At least, as long as any written value reflects actual sequential bytes written.

Hm. 🤔 But now that I think about it, it’s probably technically possible that we could write something like AAABB0CCC with this code and return written == 5. This might cause issues in cases where it is assumed that written == 5 means no bytes were ever written after offset 5, rather than just that only the first 5 bytes were written sequentially.

Although, without reverting to a sequential writing method, I don’t think there is any way to guarantee that we never perform an overwrite like this… But then, callers of io.Reader.Read() already need to regularly truncate their read buffers to the length actually read. So maybe, as long as we document this behavior well (and it should still only ever happen in an error situation), we can avoid confusion.

I’ll work on drafting some language. Should be something like: “note that in the case of an error, written will reflect the full length of sequential bytes written without error; however, due to parallel work, additional bytes may have been written after that length. You may need to Truncate() the file to this length after an error to prevent bugs.”
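
To make that documented recovery concrete, a caller-side fragment might look like this (the variable names are illustrative; File.ReadFrom and File.Truncate are existing pkg/sftp methods, and as discussed below, the truncate is only safe when there was no pre-existing data past the write region):

// dst is an *sftp.File being uploaded to, src is the local io.Reader.
written, err := dst.ReadFrom(src)
if err != nil {
	// With concurrent writes, bytes may exist past `written`; trim them off
	// so only the sequentially-written prefix remains on the server.
	if terr := dst.Truncate(written); terr != nil {
		// handle or log the truncate failure as well
	}
	// A resume can now safely restart from offset `written`.
}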

@drakkan
Collaborator

drakkan commented Jan 25, 2021

Hi,

please consider the upload/resume use case:

  • a client starts an upload and an error happens
  • the server writes all the received bytes to the destination file based on the given offset
  • the client reads the actual size on the server and restarts from that point.

If the written bytes are not sequential the resume will produce a corrupted file. Am I missing something? (I haven't read the code)

@puellanivis
Collaborator Author

Yes, that’s exactly the case that I’m referring to. The problem is that we cannot just automatically Truncate the file ourselves, because of this case:

  • File contains: AAABBBCCCDDD.
  • we try to write XXXYYYZZZ to File, so that we want to end up with XXXYYYZZZDDD.
  • we actually write only XXX at offset 0, YY at offset 3, and ZZZ at offset 6, so we end up with XXXYYBZZZDDD returning a written == 5.
  • If we use File.Truncate(5) here, we’ll destroy the file, and in particular truncate out the DDD data that might not be recoverable.
  • The code should recover by knowing 5 bytes were written correctly, so it only needs to write YZZZ at offset 5.
  • we then write YZZ at offset 5, and Z at offset 8, giving the correct XXXYYYZZZDDD.

However, if we’re transferring a file, and there is no pre-existing data then we have:

  • File contains: `` (nothing).
  • we try to write XXXYYYZZZ to File, so that we end up with XXXYYYZZZ.
  • we actually write only XXX at offset 0, YY at offset 3, and ZZZ at offset 6, so we end up with XXXYY0ZZZ returning written == 5.
  • we got an error, and we know there is no valid data after the written == 5, so we can just File.Truncate(5) and we get: XXXYY.
  • We can now either quit the program, and/or restart the write from offset 5 ourselves.

@drakkan
Collaborator

drakkan commented Jan 25, 2021

Yes, that’s exactly the case that I’m referring to. The problem is that we cannot just automatically Truncate the file ourselves, because of this case:

  • File contains: AAABBBCCCDDD.
  • we try to write XXXYYYZZZ to File, so that we want to end up with XXXYYYZZZDDD.
  • we actually write only XXX at offset 0, YY at offset 3, and ZZZ at offset 6, so we end up with XXXYYBZZZDDD returning a written == 5.
  • If we use File.Truncate(5) here, we’ll destroy the file, and in particular truncate out the DDD data that might not be recoverable.
  • The code should recover by knowing 5 bytes were written correctly, so it only needs to write YZZZ at offset 5.
  • we then write YZZ at offset 5, and Z at offset 8, giving the correct XXXYYYZZZDDD.

However, if we’re transferring a file, and there is no pre-existing data then we have:

  • File contains: `` (nothing).
  • we try to write XXXYYYZZZ to File, so that we end up with XXXYYYZZZ.
  • we actually write only XXX at offset 0, YY at offset 3, and ZZZ at offset 6, so we end up with XXXYY0ZZZ returning written == 5.
  • we got an error, and we know there is no valid data after the written == 5, so we can just File.Truncate(5) and we get: XXXYY.

this will probably not work if the client is killed/forcibly terminated or similar. It is a separate/additional step.

Please read this part of the specs:

 SSH_FXF_APPEND_DATA
      Data is always written at the end of the file.  The offset field
      of SSH_FXP_WRITE requests is ignored.

What do you think about making non-sequential writes configurable?


@puellanivis
Collaborator Author

I can definitely change the code around to do sequential confirmed writes, but that is also going to kill a lot of the performance gains for high latency connections.

On the other hand, it makes the code for the most common case much simpler.

@drakkan
Collaborator

drakkan commented Jan 25, 2021

I can definitely change the code around to do sequential confirmed writes, but that is also going to kill a lot of the performance gains for high latency connections.

On the other hand, it makes the code for the most common case much simpler.

If we don't want to make sequential writes configurable I think this is the best thing to do.

For what I understand sequential writes can leads to unexpected bugs if the client process is forcibly terminated.

We still need to allow sequential writes for SFTP servers that ignore the offset in SSH_FXF_APPEND_DATA (our current server implementation never ignores offsets and OpenSSH seems to do the same).

Thank you for all these efforts!

@puellanivis
Collaborator Author

If we don't want to make sequential writes configurable I think this is the best thing to do.

For what I understand sequential writes can leads to unexpected bugs if the client process is forcibly terminated.

Do you mean non-sequential writes here? Because sequential writes are kind of the most reliable way to avoid bugs, even if the client process terminates. At least in that case there isn’t a possibility of an out-of-order write to a file.

I’m thinking I make two parallel types of functions, one sequential (like readFromSequential now), and the other… I don’t know what best to call it. 😆 Then we can consider if we want to have a ReadFrom that can switch between the two, so we can use an option flag to switch between them and default to sequential, or if we just straight make ReadFrom sequential, and to get the out-of-order parallel one you have to call a separate function type.

My only remaining concern then is the WriteAt (and thus Write as well) function itself; it’s also doing a lot of potentially out-of-order writing when given large buffers.

@drakkan
Collaborator

drakkan commented Jan 25, 2021

If we don't want to make sequential writes configurable I think this is the best thing to do.
For what I understand sequential writes can leads to unexpected bugs if the client process is forcibly terminated.

Do you mean non-sequential writes here? Because sequential writes are kind of the most reliable way to avoid bugs, even if the client process terminates. At least in that case there isn’t a possibility of an out-of-order write to a file.

yes, sorry for the typo

I’m thinking I make two parallel types of functions, one sequential (like readFromSequential now), and the other… I don’t know what best to call it. Then we can consider if we want to have a ReadFrom that can switch between the two, so we can use an option flag to switch between them and default to sequential, or if we just straight make ReadFrom sequential, and to get the out-of-order parallel one you have to call a separate function type.

as long as it is configurable it is good for me

My only remaining concern then is the WriteAt (and thus Write as well) function itself; it’s also doing a lot of potentially out-of-order writing when given large buffers.

The user has to explicitly set a max packet > 32768 to trigger this usage. If so, I think a documentation note is enough.

@puellanivis
Collaborator Author

The user has to explicitly set a max packet > 32768 to trigger this usage. If so, I think a documentation note is enough.

No, this happens any time the write buffer is > max packet, regardless of whatever the user has set explicitly. (My idea for the out-of-order read/write testing was even to set maxPacket = 3 or something, to force massively parallel reading/writing.)

So by default, if I try to write, say, a 64k buffer to an SFTP file, it will get split up into two packets and sent down the line in parallel.
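
A toy sketch of that splitting, just to illustrate the shape of it (this is not the actual WriteAt code; the real implementation also manages concurrency and the in-flight bookkeeping):

// splitChunks cuts a write buffer into at-most-maxPacket-sized pieces, each of
// which would become its own SSH_FXP_WRITE request at its own offset.
func splitChunks(b []byte, off int64, maxPacket int) (chunks [][]byte, offsets []int64) {
	for len(b) > 0 {
		n := maxPacket
		if n > len(b) {
			n = len(b)
		}
		chunks = append(chunks, b[:n])
		offsets = append(offsets, off)
		b, off = b[n:], off+int64(n)
	}
	return chunks, offsets
}

With the default maxPacket of 32768, a 64 KiB buffer yields exactly two chunks, which the concurrent path puts on the wire in parallel.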

@drakkan
Collaborator

drakkan commented Jan 25, 2021

The user has to explicitly set a max packet > 32768 to trigger this usage. If so, I think a documentation note is enough.

No, this happens any time the write buffer is > max packet, regardless of whatever the user has set explicitly. (My idea for the out-of-order read/write testing was even to set maxPacket = 3 or something, to force massively parallel reading/writing.)

So by default, if I try to write, say, a 64k buffer to an SFTP file, it will get split up into two packets and sent down the line in parallel.

Oh yes

if len(b) <= f.c.maxPacket

but since io.Copy uses a 32768-byte buffer, I still think a doc note could be enough

@puellanivis
Collaborator Author

Right, but io.Copy is going to call into ReadFrom or WriteTo if those are implemented, rather than calling into Write itself.

@puellanivis
Collaborator Author

puellanivis commented Feb 22, 2021

OK, default behavior is now to avoid concurrent/out-of-order writes. You must explicitly enable it with UseConcurrentWrites().

I have also added concurrent reads to WriteTo(), which is still safe, since the writes themselves remain a sequential process.

I found a bug in my ReadAt that started from f.offset instead of from the given off argument. 😬

I also realized that there was a race condition accessing and setting f.offset in File.Seek(). Now, every subfunction that accesses f.offset should be properly locked.

I’ve polished all the other code as well. I’ve tried to document all the concurrent code well enough.

It would be great if we can get some good solid testing on this, and/or eyeballs on the code. 😰
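
For anyone wanting to test this, opting in looks roughly like the sketch below. The UseConcurrentWrites client option is the one introduced here (assuming it takes a bool); the rest is ordinary client setup, reusing the ssh.Dial connection from the test program earlier in the thread.

sftpClient, err := sftp.NewClient(conn, sftp.UseConcurrentWrites(true))
if err != nil {
	panic(err)
}
defer sftpClient.Close()

// io.Copy into an *sftp.File now uses the concurrent, possibly out-of-order
// write path; leave the option out to keep the safe sequential default.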

@drakkan
Collaborator

drakkan commented Feb 22, 2021

@puellanivis thank you for working on this.

I did some tests and the issue reported above is fixed. Also, the SFTPGo test cases have no issues now. I also did some manual testing, looking at used file descriptors etc., and all seems fine.

I didn't read the code in depth; at first look it seems OK.

I wonder why some packet methods have a pointer receiver and others don't. What do you think about something like this?

diff --git a/client.go b/client.go
index 8ea9261..5ce2726 100644
--- a/client.go
+++ b/client.go
@@ -916,7 +916,7 @@ func (f *File) Read(b []byte) (int, error) {
 func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
        for err == nil && n < len(b) {
                id := f.c.nextID()
-               typ, data, err := f.c.sendPacket(ch, sshFxpReadPacket{
+               typ, data, err := f.c.sendPacket(ch, &sshFxpReadPacket{
                        ID:     id,
                        Handle: f.handle,
                        Offset: uint64(off) + uint64(n),
@@ -1282,7 +1282,7 @@ func (f *File) Write(b []byte) (int, error) {
 }
 
 func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) {
-       typ, data, err := f.c.sendPacket(ch, sshFxpWritePacket{
+       typ, data, err := f.c.sendPacket(ch, &sshFxpWritePacket{
                ID:     f.c.nextID(),
                Handle: f.handle,
                Offset: uint64(off),
diff --git a/packet.go b/packet.go
index 55a6021..18d0b20 100644
--- a/packet.go
+++ b/packet.go
@@ -603,9 +603,9 @@ type sshFxpReadPacket struct {
        Handle string
 }
 
-func (p sshFxpReadPacket) id() uint32 { return p.ID }
+func (p *sshFxpReadPacket) id() uint32 { return p.ID }
 
-func (p sshFxpReadPacket) MarshalBinary() ([]byte, error) {
+func (p *sshFxpReadPacket) MarshalBinary() ([]byte, error) {
        l := 4 + 1 + 4 + // uint32(length) + byte(type) + uint32(id)
                4 + len(p.Handle) +
                8 + 4 // uint64 + uint32
@@ -716,9 +716,9 @@ type sshFxpWritePacket struct {
        Data   []byte
 }
 
-func (p sshFxpWritePacket) id() uint32 { return p.ID }
+func (p *sshFxpWritePacket) id() uint32 { return p.ID }
 
-func (p sshFxpWritePacket) marshalPacket() ([]byte, []byte, error) {
+func (p *sshFxpWritePacket) marshalPacket() ([]byte, []byte, error) {
        l := 4 + 1 + 4 + // uint32(length) + byte(type) + uint32(id)
                4 + len(p.Handle) +
                8 + // uint64
@@ -734,7 +734,7 @@ func (p sshFxpWritePacket) marshalPacket() ([]byte, []byte, error) {
        return b, p.Data, nil
 }
 
-func (p sshFxpWritePacket) MarshalBinary() ([]byte, error) {
+func (p *sshFxpWritePacket) MarshalBinary() ([]byte, error) {
        header, payload, err := p.marshalPacket()
        return append(header, payload...), err
 }
diff --git a/packet_test.go b/packet_test.go
index 8b16be6..0ba78c7 100644
--- a/packet_test.go
+++ b/packet_test.go
@@ -155,7 +155,7 @@ var sendPacketTests = []struct {
                Pflags: flags(os.O_RDONLY),
        }, []byte{0x0, 0x0, 0x0, 0x15, 0x3, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x4, 0x2f, 0x66, 0x6f, 0x6f, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0}},
 
-       {sshFxpWritePacket{
+       {&sshFxpWritePacket{
                ID:     124,
                Handle: "foo",
                Offset: 13,
@@ -321,7 +321,7 @@ func BenchmarkMarshalOpen(b *testing.B) {
 func BenchmarkMarshalWriteWorstCase(b *testing.B) {
        data := make([]byte, 32*1024)
        for i := 0; i < b.N; i++ {
-               sp(sshFxpWritePacket{
+               sp(&sshFxpWritePacket{
                        ID:     1,
                        Handle: "someopaquehandle",
                        Offset: 0,
@@ -334,7 +334,7 @@ func BenchmarkMarshalWriteWorstCase(b *testing.B) {
 func BenchmarkMarshalWrite1k(b *testing.B) {
        data := make([]byte, 1024)
        for i := 0; i < b.N; i++ {
-               sp(sshFxpWritePacket{
+               sp(&sshFxpWritePacket{
                        ID:     1,
                        Handle: "someopaquehandle",
                        Offset: 0,

If I understand the code correctly the issue reported here is not fixed, am I wrong?

@puellanivis
Collaborator Author

After some (annoying to set up, but) quick benchmarks, yeah, it does look like using pointers here would work slightly better.

@drakkan
Collaborator

drakkan commented Feb 22, 2021

After some (annoying to set up, but) quick benchmarks, yeah, it does look like using pointers here would work slightly better.

Thank you for confirming, do you want to integrate this change yourself or do you prefer a separate PR?

@drakkan
Collaborator

drakkan commented Feb 22, 2021

Thank you! I think this PR should be merged now so that others can test it too. Do you have other pending changes?

@puellanivis
Collaborator Author

I don’t think there’s anything on my todo list, so it should be good to merge. Then we can get some people to maybe bleeding-edge update to master and give it a good workout ahead of cutting a release.

@drakkan merged commit b8102da into master on Feb 22, 2021
@drakkan
Collaborator

drakkan commented Feb 22, 2021

This branch is used in sftpgo main now, thank you

@drakkan
Collaborator

drakkan commented Apr 3, 2021

@puellanivis thanks for this feature. I never thought I'd use concurrent writes, but they are very useful on high-latency networks: I got a performance improvement from 5MB/s to 15MB/s by enabling them, great!
