Skip to content

Commit

Permalink
Fixed Vectored IO when a partial operation occurs (#3736)
Browse files Browse the repository at this point in the history
Fixed Vectored IO when a partial operation occurs (help rx part of #2904)
  • Loading branch information
ptitSeb authored Apr 3, 2023
1 parent 6cdfd09 commit 5a8e1e3
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 13 deletions.
19 changes: 15 additions & 4 deletions lib/wasi/src/syscalls/wasi/fd_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ fn fd_read_internal<M: MemorySize>(
.map_err(mem_error_to_wasi)?
.access()
.map_err(mem_error_to_wasi)?;

total_read +=
let local_read =
match handle.read(buf.as_mut()).await.map_err(|err| {
let err = From::<std::io::Error>::from(err);
match err {
Expand All @@ -196,6 +195,10 @@ fn fd_read_internal<M: MemorySize>(
Err(_) if total_read > 0 => break,
Err(err) => return Err(err),
};
total_read += local_read;
if local_read != buf.len() {
break;
}
}
Ok(total_read)
}
Expand Down Expand Up @@ -235,9 +238,13 @@ fn fd_read_internal<M: MemorySize>(
.access()
.map_err(mem_error_to_wasi)?;

total_read += socket
let local_read = socket
.recv(tasks.deref(), buf.as_mut_uninit(), fd_flags)
.await?;
total_read += local_read;
if total_read != buf.len() {
break;
}
}
Ok(total_read)
},
Expand Down Expand Up @@ -279,8 +286,12 @@ fn fd_read_internal<M: MemorySize>(
.access()
.map_err(mem_error_to_wasi)?;

total_read +=
let local_read =
virtual_fs::AsyncReadExt::read(&mut pipe, buf.as_mut()).await?;
total_read += local_read;
if local_read != buf.len() {
break;
}
}
Ok(total_read)
}
Expand Down
30 changes: 24 additions & 6 deletions lib/wasi/src/syscalls/wasi/fd_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,15 @@ fn fd_write_internal<M: MemorySize>(
.map_err(mem_error_to_wasi)?
.access()
.map_err(mem_error_to_wasi)?;
written += match handle.write(buf.as_ref()).await {
let local_written = match handle.write(buf.as_ref()).await {
Ok(s) => s,
Err(_) if written > 0 => break,
Err(err) => return Err(map_io_err(err)),
};
written += local_written;
if local_written != buf.len() {
break;
}
}
Ok(written)
}
Expand Down Expand Up @@ -167,7 +171,12 @@ fn fd_write_internal<M: MemorySize>(
.map_err(mem_error_to_wasi)?
.access()
.map_err(mem_error_to_wasi)?;
sent += socket.send(tasks.deref(), buf.as_ref(), fd_flags).await?;
let local_sent =
socket.send(tasks.deref(), buf.as_ref(), fd_flags).await?;
sent += local_sent;
if local_sent != buf.len() {
break;
}
}
Ok(sent)
})?);
Expand All @@ -180,9 +189,13 @@ fn fd_write_internal<M: MemorySize>(
.slice(&memory, iovs.buf_len)
.map_err(mem_error_to_wasi));
let buf = wasi_try_ok!(buf.access().map_err(mem_error_to_wasi));
written += wasi_try_ok!(
let local_written = wasi_try_ok!(
std::io::Write::write(pipe, buf.as_ref()).map_err(map_io_err)
);
written += local_written;
if local_written != buf.len() {
break;
}
}
(written, false)
}
Expand Down Expand Up @@ -221,9 +234,14 @@ fn fd_write_internal<M: MemorySize>(
.slice(&memory, iovs.buf_len)
.map_err(mem_error_to_wasi));
let buf = wasi_try_ok!(buf.access().map_err(mem_error_to_wasi));
written += wasi_try_ok!(
std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
);
let local_written =
wasi_try_ok!(
std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
);
written += local_written;
if local_written != buf.len() {
break;
}
}
(written, false)
}
Expand Down
6 changes: 5 additions & 1 deletion lib/wasi/src/syscalls/wasix/sock_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,18 @@ fn sock_recv_internal<M: MemorySize>(
.access()
.map_err(mem_error_to_wasi)?;

total_read += match socket
let local_read = match socket
.recv(env.tasks().deref(), buf.as_mut_uninit(), fd.flags)
.await
{
Ok(s) => s,
Err(_) if total_read > 0 => break,
Err(err) => return Err(err),
};
total_read += local_read;
if local_read != buf.len() {
break;
}
}
Ok(total_read)
},
Expand Down
6 changes: 5 additions & 1 deletion lib/wasi/src/syscalls/wasix/sock_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@ pub fn sock_send<M: MemorySize>(
.map_err(mem_error_to_wasi)?
.access()
.map_err(mem_error_to_wasi)?;
sent += match socket
let local_sent = match socket
.send(env.tasks().deref(), buf.as_ref(), fd.flags)
.await
{
Ok(s) => s,
Err(_) if sent > 0 => break,
Err(err) => return Err(err),
};
sent += local_sent;
if local_sent != buf.len() {
break;
}
}
Ok(sent)
})
Expand Down
6 changes: 5 additions & 1 deletion lib/wasi/src/syscalls/wasix/sock_send_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,18 @@ pub fn sock_send_to<M: MemorySize>(
.map_err(mem_error_to_wasi)?
.access()
.map_err(mem_error_to_wasi)?;
sent += match socket
let local_sent = match socket
.send_to::<M>(env.tasks().deref(), buf.as_ref(), addr, fd.flags)
.await
{
Ok(s) => s,
Err(_) if sent > 0 => break,
Err(err) => return Err(err),
};
sent += local_sent;
if local_sent != buf.len() {
break;
}
}
Ok(sent)
},
Expand Down

0 comments on commit 5a8e1e3

Please sign in to comment.