Skip to content

Commit

Permalink
Fix WASI pipe to properly store read bytes in temp_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
fschutt committed Oct 17, 2022
1 parent f6fa5b7 commit 05d74ea
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 39 deletions.
93 changes: 64 additions & 29 deletions lib/c-api/src/wasm_c_api/wasi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct wasi_pipe_t {
data: Option<Box<Arc<Mutex<WasiPipeDataWithDestructor>>>>,
}

#[derive(Debug)]
struct WasiPipeDataWithDestructor {
data: Vec<c_char>,
// Buffer of already-read data that is being read into,
Expand All @@ -71,12 +72,17 @@ impl WasiPipeDataWithDestructor {
&mut self,
read_cb: WasiConsoleIoReadCallback,
max_read: Option<usize>,
) -> io::Result<Vec<u8>> {
) -> io::Result<usize> {
const BLOCK_SIZE: usize = 1024;

let mut final_buf = Vec::new();

let max_read = max_read.unwrap_or(usize::MAX);
let max_to_read = max_read.unwrap_or(usize::MAX);
let max_read = max_to_read.saturating_sub(self.temp_buffer.len());
if max_read == 0 {
// temp_buffer already holds at least max_to_read bytes, so no further read is needed
return Ok(max_to_read);
}
let mut cur_read = 0;

// Read bytes until either EOF is reached or max_read bytes are reached
Expand Down Expand Up @@ -107,15 +113,28 @@ impl WasiPipeDataWithDestructor {
));
}

if result == 0 {
let result = result as usize;
if result == 0 || result > temp_buffer.len() {
break; // EOF
}

cur_read += temp_buffer.len();
final_buf.append(&mut temp_buffer);
cur_read += result;
final_buf.extend_from_slice(&temp_buffer[..result]);
}

Ok(final_buf)
let final_buf_len = final_buf.len();

// store the bytes in temp_buffer
self.temp_buffer.extend_from_slice(&final_buf);

// temp_buffer.len() can be smaller than max_read in case we
// encounter EOF earlier than expected
assert!(self.temp_buffer.len() <= max_read);

// return how many bytes were just read
//
// caller has to clear temp_buffer to advance actual reading
Ok(final_buf_len)
}
}

Expand Down Expand Up @@ -161,26 +180,11 @@ impl io::Read for wasi_pipe_t {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let self_read = self.read;
let mut data = self.get_data_mut("read")?;

// fill up buf by draining temp_buffer first, then read more bytes
let bytes_to_read = data.temp_buffer.len().min(buf.len());
let mut temp_buffer_drained: Vec<_> = data.temp_buffer.drain(..bytes_to_read).collect();
assert!(temp_buffer_drained.len() <= buf.len());

// If temp_buffer is exhausted, try reading the remaining bytes from the pipe
let mut bytes_read = bytes_to_read;
if buf.len() >= temp_buffer_drained.len() {
let secondary_bytes_to_read = data.temp_buffer.len().min(buf.len());
data.read_buffer(self_read, Some(secondary_bytes_to_read))?;
temp_buffer_drained
.append(&mut data.temp_buffer.drain(..secondary_bytes_to_read).collect());
bytes_read += secondary_bytes_to_read;
}

assert_eq!(buf.len(), temp_buffer_drained.len());
buf.clone_from_slice(&temp_buffer_drained);

Ok(bytes_read)
let _ = data.read_buffer(self_read, Some(buf.len()))?;
let bytes_to_read = buf.len().min(data.temp_buffer.len());
let bytes_read = data.temp_buffer.drain(..bytes_to_read).collect::<Vec<_>>();
buf[..bytes_read.len()].clone_from_slice(&bytes_read);
Ok(bytes_to_read)
}
}

Expand Down Expand Up @@ -278,7 +282,7 @@ impl VirtualFile for wasi_pipe_t {
fn bytes_available_read(&self) -> Result<Option<usize>, FsError> {
let self_read = self.read;
let mut data = self.get_data_mut("bytes_available_read")?;
data.read_buffer(self_read, None)?;
let _ = data.read_buffer(self_read, None)?;
Ok(Some(data.temp_buffer.len()))
}
fn bytes_available_write(&self) -> Result<Option<usize>, FsError> {
Expand Down Expand Up @@ -507,6 +511,36 @@ pub unsafe extern "C" fn wasi_pipe_flush(ptr: *mut wasi_pipe_t) -> i64 {
}
}

/// Round-trips `"hello"` through the pipe pair returned by `wasi_pipe_new`.
///
/// Checks that bytes written to the first pipe are reported as readable on the
/// second pipe (and not on the first), and that `wasi_pipe_read_str` hands back
/// the same bytes followed by a terminating NUL.
#[test]
fn test_wasi_pipe_with_destructor() {
    let mut wasi_pipe_t_ptr = std::ptr::null_mut();
    let second_wasi_pipe_t_ptr = unsafe { wasi_pipe_new(&mut wasi_pipe_t_ptr) };
    let wasi_pipe_t_ptr = unsafe { &mut *wasi_pipe_t_ptr };
    let second_wasi_pipe_t_ptr = unsafe { &mut *second_wasi_pipe_t_ptr };

    // Cast to `c_char` rather than `i8`: the signedness of `c_char` is
    // target-dependent, and the pipe API is expressed in terms of `c_char`.
    let data = b"hello".iter().map(|v| *v as c_char).collect::<Vec<_>>();
    let result = unsafe { wasi_pipe_write_bytes(wasi_pipe_t_ptr, data.as_ptr(), data.len()) };
    assert_eq!(result, 5);

    // Nothing is readable on the pipe that was written to ...
    let bytes_avail = wasi_pipe_t_ptr.bytes_available_read();
    assert_eq!(bytes_avail, Ok(Some(0)));

    // ... the five bytes show up on the other pipe of the pair.
    let bytes_avail2 = second_wasi_pipe_t_ptr.bytes_available_read();
    assert_eq!(bytes_avail2, Ok(Some(5)));

    let mut read_str_ptr = std::ptr::null_mut();
    let result = unsafe { wasi_pipe_read_str(second_wasi_pipe_t_ptr, &mut read_str_ptr) };
    assert_eq!(result, 6); // hello\0

    // The returned buffer is only inspected, so a shared slice is sufficient.
    let buf_slice = unsafe { std::slice::from_raw_parts(read_str_ptr, result as usize) };
    assert_eq!(buf_slice[..5], data);

    // Release the string first, then both pipes.
    unsafe {
        wasi_pipe_delete_str(read_str_ptr);
    }
    unsafe { wasi_pipe_delete(wasi_pipe_t_ptr) };
    unsafe { wasi_pipe_delete(second_wasi_pipe_t_ptr) };
}

#[no_mangle]
pub unsafe extern "C" fn wasi_pipe_read_bytes(
ptr: *const wasi_pipe_t,
Expand Down Expand Up @@ -553,12 +587,13 @@ unsafe fn wasi_pipe_read_bytes_internal(ptr: *const wasi_pipe_t, buf: &mut Vec<u
}
}

let len = target.len() as i64;
*buf = target;
0
len
}

#[no_mangle]
pub unsafe extern "C" fn wasi_pipe_read_str(ptr: *const wasi_pipe_t, buf: *mut *mut c_char) -> i64 {
pub unsafe extern "C" fn wasi_pipe_read_str(ptr: *const wasi_pipe_t, buf: &mut *mut c_char) -> i64 {
use std::ffi::CString;

let mut target = Vec::new();
Expand Down
26 changes: 16 additions & 10 deletions lib/wasi/src/state/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ impl Read for WasiPipe {
inner_buf.advance(read);
return Ok(read);
}
} else if !self.block {
return Ok(0);
}
}
let rx = self.rx.lock().unwrap();
Expand All @@ -339,18 +337,26 @@ impl Read for WasiPipe {
s
}
Err(_) => {
// could not immediately receive bytes, so we need to block
match rx.recv() {
Ok(o) => o,
// Errors can happen if the sender has been dropped already
// In this case, just return 0 to indicate that we can't read any
// bytes anymore
Err(_) => {
return Ok(0);
if !self.block {
// If self.block is explicitly set to false, never block
Vec::new()
} else {
// could not immediately receive bytes, so we need to block
match rx.recv() {
Ok(o) => o,
// Errors can happen if the sender has been dropped already
// In this case, just return 0 to indicate that we can't read any
// bytes anymore
Err(_) => {
return Ok(0);
}
}
}
}
};
if data.is_empty() && self.read_buffer.as_ref().map(|s| s.len()).unwrap_or(0) == 0 {
return Ok(0);
}
self.read_buffer.replace(Bytes::from(data));
}
}
Expand Down

0 comments on commit 05d74ea

Please sign in to comment.