Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: optimizing the chances of large write in copy_bidirectional and copy #6532

Merged
merged 10 commits into from
Jun 4, 2024
55 changes: 30 additions & 25 deletions tokio/src/io/util/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ impl CopyBuffer {
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
loop {
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
self.pos = 0;
self.cap = 0;

// If there is some space left in our buffer, then we try to read some
// data to continue, thus maximizing the chances of a large write.
if self.cap < self.buf.len() && !self.read_done {
match self.poll_fill_buf(cx, reader.as_mut()) {
Poll::Ready(Ok(())) => {
#[cfg(any(
Expand Down Expand Up @@ -131,25 +128,29 @@ impl CopyBuffer {
return Poll::Ready(Err(err));
}
Poll::Pending => {
// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
ready!(writer.as_mut().poll_flush(cx))?;
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt",
feature = "signal",
feature = "sync",
feature = "time",
))]
coop.made_progress();
self.need_flush = false;
}
// Ignore pending reads when our buffer is not empty, because
// we can try to write data immediately.
if self.pos == self.cap {
// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
ready!(writer.as_mut().poll_flush(cx))?;
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt",
feature = "signal",
feature = "sync",
feature = "time",
))]
coop.made_progress();
self.need_flush = false;
}

return Poll::Pending;
return Poll::Pending;
}
}
}
}
Expand Down Expand Up @@ -188,9 +189,13 @@ impl CopyBuffer {
"writer returned length larger than input slice"
);

// All data has been written, the buffer can be considered empty again
self.pos = 0;
self.cap = 0;

// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
if self.read_done {
ready!(writer.as_mut().poll_flush(cx))?;
#[cfg(any(
feature = "fs",
Expand Down
Loading