From 06a750db890ef14d0fa40f4318dabed80dc3e512 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Mon, 28 Aug 2023 14:10:35 +0330 Subject: [PATCH 1/6] first draft --- tokio/src/fs/file.rs | 71 ++++++++++++++++++++++++++++++++++++++++ tokio/src/io/blocking.rs | 14 ++++++++ 2 files changed, 85 insertions(+) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 1e88f30434e..12993a4b11c 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -725,6 +725,77 @@ impl AsyncWrite for File { } } + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + if let Some(e) = inner.last_write_err.take() { + return Ready(Err(e.into())); + } + + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) + } else { + None + }; + + let n = buf.copy_from_bufs(bufs)?; + let std = me.std.clone(); + + let blocking_task_join_handle = spawn_mandatory_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; + + (Operation::Write(res), buf) + }) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "background task failed") + })?; + + inner.state = Busy(blocking_task_join_handle); + + return Ready(Ok(n)); + } + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + inner.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => { + // We don't care about the result here. The fact + // that the cursor has advanced will be reflected in + // the next iteration of the loop + continue; + } + Operation::Write(res) => { + // If the previous write was successful, continue. + // Otherwise, error. + res?; + continue; + } + Operation::Seek(_) => { + // Ignore the seek + continue; + } + } + } + } + } + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(crate::trace::trace_leaf(cx)); let inner = self.inner.get_mut(); diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index 416573e9732..02977737436 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -276,5 +276,19 @@ cfg_fs! { self.buf.truncate(0); ret } + + pub(crate) fn copy_from_bufs(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { + let len = bufs.iter().map(|b| b.len()).sum(); + if len > MAX_BUF { + return Err(io::Error::new(io::ErrorKind::Other, "The sum of bufs is too large")); + } + + self.buf.reserve(len); + for buf in bufs { + self.buf.extend_from_slice(buf); + } + + Ok(len) + } } } From cb53930b6c83270e1225a1fe03ef9de704e076fe Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Mon, 28 Aug 2023 21:25:03 +0330 Subject: [PATCH 2/6] do not return result --- tokio/src/fs/file.rs | 2 +- tokio/src/io/blocking.rs | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 12993a4b11c..59a5c2a992d 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -749,7 +749,7 @@ impl AsyncWrite for File { None }; - let n = buf.copy_from_bufs(bufs)?; + let n = buf.copy_from_bufs(bufs); let std = me.std.clone(); let blocking_task_join_handle = spawn_mandatory_blocking(move || { diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index 02977737436..eefbfc697c2 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -277,18 +277,23 @@ cfg_fs! { ret } - pub(crate) fn copy_from_bufs(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - let len = bufs.iter().map(|b| b.len()).sum(); - if len > MAX_BUF { - return Err(io::Error::new(io::ErrorKind::Other, "The sum of bufs is too large")); - } + pub(crate) fn copy_from_bufs(&mut self, bufs: &[io::IoSlice<'_>]) -> usize { + assert!(self.is_empty()); + + let n = bufs.iter().map(|b| b.len()).sum::().min(MAX_BUF); - self.buf.reserve(len); + let mut rem = n; for buf in bufs { - self.buf.extend_from_slice(buf); + if rem == 0 { + break + } + + let len = buf.len().min(rem); + self.buf.extend_from_slice(&buf[..len]); + rem -= len; } - Ok(len) + n } } } From 619728f230253a180955c2d6f48fc765e4b880b3 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Tue, 29 Aug 2023 00:29:38 +0330 Subject: [PATCH 3/6] add test for write_vectored on files --- tokio/tests/fs_file.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tokio/tests/fs_file.rs b/tokio/tests/fs_file.rs index 40bd4fce564..1e35162be29 100644 --- a/tokio/tests/fs_file.rs +++ b/tokio/tests/fs_file.rs @@ -2,6 +2,7 @@ #![cfg(all(feature = "full", not(target_os = "wasi")))] // WASI does not support all fs operations use std::io::prelude::*; +use std::io::IoSlice; use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -49,6 +50,36 @@ async fn basic_write_and_shutdown() { assert_eq!(file, HELLO); } +#[tokio::test] +async fn write_vectored() { + let tempfile = tempfile(); + + let mut file = File::create(tempfile.path()).await.unwrap(); + + file.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) + .await + .unwrap(); + file.flush().await.unwrap(); + + let file = std::fs::read(tempfile.path()).unwrap(); + assert_eq!(file, [HELLO, HELLO].concat()); +} + +#[tokio::test] +async fn write_vectored_and_shutdown() { + let tempfile = tempfile(); + + let mut file = File::create(tempfile.path()).await.unwrap(); + + file.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) + .await + .unwrap(); + file.shutdown().await.unwrap(); + + let file = std::fs::read(tempfile.path()).unwrap(); + assert_eq!(file, [HELLO, HELLO].concat()); +} + #[tokio::test] async fn rewind_seek_position() { let tempfile = tempfile(); From d4c480aef1421f953e23b01ed36cfa0dd2ece65a Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Tue, 29 Aug 2023 10:58:39 +0330 Subject: [PATCH 4/6] fix clippy error --- tokio/tests/fs_file.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tokio/tests/fs_file.rs b/tokio/tests/fs_file.rs index 1e35162be29..2b47f799327 100644 --- a/tokio/tests/fs_file.rs +++ b/tokio/tests/fs_file.rs @@ -56,9 +56,11 @@ async fn write_vectored() { let mut file = File::create(tempfile.path()).await.unwrap(); - file.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) + let ret = file + .write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) .await .unwrap(); + assert_eq!(ret, HELLO.bytes().count() * 2); file.flush().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); @@ -71,9 +73,11 @@ async fn write_vectored_and_shutdown() { let mut file = File::create(tempfile.path()).await.unwrap(); - file.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) + let ret = file + .write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) .await .unwrap(); + assert_eq!(ret, HELLO.bytes().count() * 2); file.shutdown().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); From 66127641dba6e03fb30c3e7cec59d9798c21cbe2 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Tue, 29 Aug 2023 12:52:52 +0330 Subject: [PATCH 5/6] resolve comments --- tokio/src/io/blocking.rs | 6 ++---- tokio/tests/fs_file.rs | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index eefbfc697c2..27d08b1fcc7 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -280,9 +280,7 @@ cfg_fs! { pub(crate) fn copy_from_bufs(&mut self, bufs: &[io::IoSlice<'_>]) -> usize { assert!(self.is_empty()); - let n = bufs.iter().map(|b| b.len()).sum::().min(MAX_BUF); - - let mut rem = n; + let mut rem = MAX_BUF; for buf in bufs { if rem == 0 { break @@ -293,7 +291,7 @@ cfg_fs! { rem -= len; } - n + MAX_BUF - rem } } } diff --git a/tokio/tests/fs_file.rs b/tokio/tests/fs_file.rs index 2b47f799327..6a8b07a7ffe 100644 --- a/tokio/tests/fs_file.rs +++ b/tokio/tests/fs_file.rs @@ -60,7 +60,7 @@ async fn write_vectored() { .write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) .await .unwrap(); - assert_eq!(ret, HELLO.bytes().count() * 2); + assert_eq!(ret, HELLO.len() * 2); file.flush().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); @@ -77,7 +77,7 @@ async fn write_vectored_and_shutdown() { .write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) .await .unwrap(); - assert_eq!(ret, HELLO.bytes().count() * 2); + assert_eq!(ret, HELLO.len() * 2); file.shutdown().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); From 49b89618cf3a331f29b24d6aa273083ee1d5e77c Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Tue, 29 Aug 2023 14:32:26 +0330 Subject: [PATCH 6/6] indicate that File has an efficient poll_write_vectored impl --- tokio/src/fs/file.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 59a5c2a992d..cd140c6cc97 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -796,6 +796,10 @@ impl AsyncWrite for File { } } + fn is_write_vectored(&self) -> bool { + true + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(crate::trace::trace_leaf(cx)); let inner = self.inner.get_mut();