Skip to content
Closed
62 changes: 39 additions & 23 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ enum State {
#[derive(Debug)]
enum Operation {
Read(io::Result<usize>),
Write(io::Result<()>),
Write(io::Result<usize>),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changing Write enum variant from Write(io::Result<()>) to Write(io::Result) so that the pos usize value can be propagated out from spawn_blocking closure

Seek(io::Result<u64>),
}

Expand Down Expand Up @@ -590,14 +590,14 @@ impl AsyncRead for File {

let me = self.get_mut();
let inner = me.inner.get_mut();

loop {
match inner.state {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

if !buf.is_empty() || dst.remaining() == 0 {
buf.copy_to(dst);
let copied = buf.copy_to(dst);
// Update position based on bytes copied into the user's buffer
inner.pos += copied as u64;
*buf_cell = Some(buf);
return Poll::Ready(Ok(()));
}
Expand All @@ -617,9 +617,11 @@ impl AsyncRead for File {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

match op {
Operation::Read(Ok(_)) => {
Operation::Read(Ok(n)) => {
buf.copy_to(dst);
inner.state = State::Idle(Some(buf));
// Update position by `n` bytes returned from the read operation
inner.pos += n as u64;
return Poll::Ready(Ok(()));
}
Operation::Read(Err(e)) => {
Expand All @@ -628,7 +630,7 @@ impl AsyncRead for File {
inner.state = State::Idle(Some(buf));
return Poll::Ready(Err(e));
}
Operation::Write(Ok(())) => {
Operation::Write(Ok(_)) => {
assert!(buf.is_empty());
inner.state = State::Idle(Some(buf));
continue;
Expand Down Expand Up @@ -742,14 +744,18 @@ impl AsyncWrite for File {
None
};

let n = buf.copy_from(src, me.max_buf_size);
_ = buf.copy_from(src, me.max_buf_size);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let n = buf.len();
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
(&*std)
.seek(seek)
.and_then(|_| buf.write_to(&mut &*std))
.map(|_| n)
} else {
buf.write_to(&mut &*std)
buf.write_to(&mut &*std).map(|_| n)
};

(Operation::Write(res), buf)
Expand All @@ -759,8 +765,6 @@ impl AsyncWrite for File {
})?;

inner.state = State::Busy(blocking_task_join_handle);

return Poll::Ready(Ok(n));
Copy link
Contributor Author

@Suryakant-Soni Suryakant-Soni May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning Poll::Ready(Ok(n)) is now deferred to the next polling iterations, allowing inner.pos to be updated post spawn task completion

}
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
Expand All @@ -774,10 +778,15 @@ impl AsyncWrite for File {
continue;
}
Operation::Write(res) => {
// If the previous write was successful, continue.
// If the previous write was successful, Update position & return
// Otherwise, error.
res?;
continue;
match res {
Ok(n) => {
inner.pos += n as u64;
return Poll::Ready(Ok(n));
}
Err(e) => return Poll::Ready(Err(e)),
}
}
Operation::Seek(_) => {
// Ignore the seek
Expand Down Expand Up @@ -813,14 +822,18 @@ impl AsyncWrite for File {
None
};

let n = buf.copy_from_bufs(bufs, me.max_buf_size);
_ = buf.copy_from_bufs(bufs, me.max_buf_size);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let n = buf.len();
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
(&*std)
.seek(seek)
.and_then(|_| buf.write_to(&mut &*std))
.map(|_| n)
} else {
buf.write_to(&mut &*std)
buf.write_to(&mut &*std).map(|_| n)
};

(Operation::Write(res), buf)
Expand All @@ -830,8 +843,6 @@ impl AsyncWrite for File {
})?;

inner.state = State::Busy(blocking_task_join_handle);

return Poll::Ready(Ok(n));
}
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
Expand All @@ -845,10 +856,15 @@ impl AsyncWrite for File {
continue;
}
Operation::Write(res) => {
// If the previous write was successful, continue.
// If the previous write was successful, Update position and return
// Otherwise, error.
res?;
continue;
match res {
Ok(n) => {
inner.pos += n as u64;
return Poll::Ready(Ok(n));
}
Err(e) => return Poll::Ready(Err(e)),
}
}
Operation::Seek(_) => {
// Ignore the seek
Expand Down Expand Up @@ -973,7 +989,7 @@ impl Inner {

match op {
Operation::Read(_) => Poll::Ready(Ok(())),
Operation::Write(res) => Poll::Ready(res),
Operation::Write(_) => Poll::Ready(Ok(())),
Operation::Seek(_) => Poll::Ready(Ok(())),
}
}
Expand Down
Loading
Loading