diff --git a/Cargo.lock b/Cargo.lock index a4aa7427890..c2f5c1e2b26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3334,9 +3334,9 @@ dependencies = [ [[package]] name = "wasmer-inline-c" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2405c99de49dc05338e5ed2eb397fe70b7128340d960507d0ba716f7d29a91a" +checksum = "7c4e7a2a3363ceeb2ee60371af9460748f2bf53569b58627f1f640284ab07778" dependencies = [ "assert_cmd", "cc", diff --git a/examples/wasi_pipes.rs b/examples/wasi_pipes.rs index ed7ef03a075..e4319b71688 100644 --- a/examples/wasi_pipes.rs +++ b/examples/wasi_pipes.rs @@ -14,7 +14,7 @@ use std::io::{Read, Write}; use wasmer::{Instance, Module, Store}; use wasmer_compiler_cranelift::Cranelift; -use wasmer_wasi::{Pipe, WasiState}; +use wasmer_wasi::{WasiBidirectionalSharedPipePair, WasiState}; fn main() -> Result<(), Box> { let wasm_path = concat!( @@ -36,8 +36,8 @@ fn main() -> Result<(), Box> { println!("Creating `WasiEnv`..."); // First, we create the `WasiEnv` with the stdio pipes - let mut input = Pipe::new(); - let mut output = Pipe::new(); + let mut input = WasiBidirectionalSharedPipePair::new().with_blocking(false); + let mut output = WasiBidirectionalSharedPipePair::new().with_blocking(false); let wasi_env = WasiState::new("hello") .stdin(Box::new(input.clone())) .stdout(Box::new(output.clone())) diff --git a/lib/api/tests/externals.rs b/lib/api/tests/externals.rs index 5c17579a4ab..f593c763b79 100644 --- a/lib/api/tests/externals.rs +++ b/lib/api/tests/externals.rs @@ -210,28 +210,25 @@ fn memory_grow() -> Result<(), String> { fn function_new() -> Result<(), String> { let mut store = Store::default(); let function = Function::new_typed(&mut store, || {}); - assert_eq!( - function.ty(&mut store).clone(), - FunctionType::new(vec![], vec![]) - ); + assert_eq!(function.ty(&mut store), FunctionType::new(vec![], vec![])); let function = Function::new_typed(&mut store, |_a: i32| {}); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![Type::I32], vec![]) ); let function = Function::new_typed(&mut store, |_a: i32, _b: i64, _c: f32, _d: f64| {}); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![Type::I32, Type::I64, Type::F32, Type::F64], vec![]) ); let function = Function::new_typed(&mut store, || -> i32 { 1 }); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![], vec![Type::I32]) ); let function = Function::new_typed(&mut store, || -> (i32, i64, f32, f64) { (1, 2, 3.0, 4.0) }); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![], vec![Type::I32, Type::I64, Type::F32, Type::F64]) ); Ok(()) @@ -246,14 +243,11 @@ fn function_new_env() -> Result<(), String> { let my_env = MyEnv {}; let env = FunctionEnv::new(&mut store, my_env); let function = Function::new_typed_with_env(&mut store, &env, |_env: FunctionEnvMut| {}); - assert_eq!( - function.ty(&mut store).clone(), - FunctionType::new(vec![], vec![]) - ); + assert_eq!(function.ty(&mut store), FunctionType::new(vec![], vec![])); let function = Function::new_typed_with_env(&mut store, &env, |_env: FunctionEnvMut, _a: i32| {}); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![Type::I32], vec![]) ); let function = Function::new_typed_with_env( @@ -262,13 +256,13 @@ fn function_new_env() -> Result<(), String> { |_env: FunctionEnvMut, _a: i32, _b: i64, _c: f32, _d: f64| {}, ); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![Type::I32, Type::I64, Type::F32, Type::F64], vec![]) ); let function = Function::new_typed_with_env(&mut store, &env, |_env: FunctionEnvMut| -> i32 { 1 }); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![], vec![Type::I32]) ); let function = Function::new_typed_with_env( @@ -277,7 +271,7 @@ fn function_new_env() -> Result<(), String> { |_env: FunctionEnvMut| -> (i32, i64, f32, f64) { (1, 2, 3.0, 4.0) }, ); assert_eq!( - function.ty(&mut store).clone(), + function.ty(&mut store), FunctionType::new(vec![], vec![Type::I32, Type::I64, Type::F32, Type::F64]) ); Ok(()) @@ -294,35 +288,35 @@ fn function_new_dynamic() -> Result<(), String> { &function_type, |_values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![Type::I32], vec![]); let function = Function::new( &mut store, &function_type, |_values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![Type::I32, Type::I64, Type::F32, Type::F64], vec![]); let function = Function::new( &mut store, &function_type, |_values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![], vec![Type::I32]); let function = Function::new( &mut store, &function_type, |_values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![], vec![Type::I32, Type::I64, Type::F32, Type::F64]); let function = Function::new( &mut store, &function_type, |_values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); // Using array signature let function_type = ([Type::V128], [Type::I32, Type::F32, Type::F64]); @@ -356,7 +350,7 @@ fn function_new_dynamic_env() -> Result<(), String> { &function_type, |_env: FunctionEnvMut, _values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![Type::I32], vec![]); let function = Function::new_with_env( &mut store, @@ -364,7 +358,7 @@ fn function_new_dynamic_env() -> Result<(), String> { &function_type, |_env: FunctionEnvMut, _values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![Type::I32, Type::I64, Type::F32, Type::F64], vec![]); let function = Function::new_with_env( &mut store, @@ -372,7 +366,7 @@ fn function_new_dynamic_env() -> Result<(), String> { &function_type, |_env: FunctionEnvMut, _values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![], vec![Type::I32]); let function = Function::new_with_env( &mut store, @@ -380,7 +374,7 @@ fn function_new_dynamic_env() -> Result<(), String> { &function_type, |_env: FunctionEnvMut, _values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); let function_type = FunctionType::new(vec![], vec![Type::I32, Type::I64, Type::F32, Type::F64]); let function = Function::new_with_env( &mut store, @@ -388,7 +382,7 @@ fn function_new_dynamic_env() -> Result<(), String> { &function_type, |_env: FunctionEnvMut, _values: &[Value]| unimplemented!(), ); - assert_eq!(function.ty(&mut store).clone(), function_type); + assert_eq!(function.ty(&mut store), function_type); // Using array signature let function_type = ([Type::V128], [Type::I32, Type::F32, Type::F64]); diff --git a/lib/api/tests/reference_types.rs b/lib/api/tests/reference_types.rs index aecd46d48ec..8746936f06f 100644 --- a/lib/api/tests/reference_types.rs +++ b/lib/api/tests/reference_types.rs @@ -367,7 +367,7 @@ pub mod reference_types { let global: &Global = instance.exports.get_global("global")?; { let er = ExternRef::new(&mut store, 3usize); - global.set(&mut store, Value::ExternRef(Some(er.clone())))?; + global.set(&mut store, Value::ExternRef(Some(er)))?; } let get_from_global: TypedFunction<(), Option> = instance .exports @@ -398,7 +398,7 @@ pub mod reference_types { let er = ExternRef::new(&mut store, 3usize); - let result = pass_extern_ref.call(&mut store, Some(er.clone())); + let result = pass_extern_ref.call(&mut store, Some(er)); assert!(result.is_err()); Ok(()) @@ -442,7 +442,7 @@ pub mod reference_types { let result = grow_table_with_ref.call(&mut store, Some(er1.clone()), 10_000)?; assert_eq!(result, -1); - let result = grow_table_with_ref.call(&mut store, Some(er1.clone()), 8)?; + let result = grow_table_with_ref.call(&mut store, Some(er1), 8)?; assert_eq!(result, 2); for i in 2..10 { @@ -454,7 +454,7 @@ pub mod reference_types { } { - fill_table_with_ref.call(&mut store, Some(er2.clone()), 0, 2)?; + fill_table_with_ref.call(&mut store, Some(er2), 0, 2)?; } { @@ -462,7 +462,7 @@ pub mod reference_types { table2.set(&mut store, 1, Value::ExternRef(Some(er3.clone())))?; table2.set(&mut store, 2, Value::ExternRef(Some(er3.clone())))?; table2.set(&mut store, 3, Value::ExternRef(Some(er3.clone())))?; - table2.set(&mut store, 4, Value::ExternRef(Some(er3.clone())))?; + table2.set(&mut store, 4, Value::ExternRef(Some(er3)))?; } { diff --git a/lib/c-api/src/wasm_c_api/wasi/mod.rs b/lib/c-api/src/wasm_c_api/wasi/mod.rs index 6598ab2c608..b83dc3fd243 100644 --- a/lib/c-api/src/wasm_c_api/wasi/mod.rs +++ b/lib/c-api/src/wasm_c_api/wasi/mod.rs @@ -10,20 +10,645 @@ use super::{ store::{wasm_store_t, StoreRef}, }; use crate::error::update_last_error; -use std::convert::TryFrom; +use std::convert::TryInto; use std::ffi::CStr; use std::os::raw::c_char; use std::slice; +use std::sync::{Arc, Mutex}; +use std::{ + convert::TryFrom, + ffi::c_void, + fmt, + io::{self, SeekFrom}, + sync::MutexGuard, +}; use wasmer_wasi::{ - get_wasi_version, Pipe, WasiFile, WasiFunctionEnv, WasiState, WasiStateBuilder, WasiVersion, + get_wasi_version, FsError, VirtualFile, WasiBidirectionalPipePair, WasiFile, WasiFunctionEnv, + WasiPipe, WasiState, WasiStateBuilder, WasiVersion, }; +/// Function callback that takes: +/// +/// - a *mut to the environment data (passed in on creation), +/// - the length of the environment data +/// - a *const to the bytes to write +/// - the length of the bytes to write +pub type WasiConsoleIoReadCallback = unsafe extern "C" fn(*const c_void, *mut c_char, usize) -> i64; +pub type WasiConsoleIoWriteCallback = + unsafe extern "C" fn(*const c_void, *const c_char, usize, bool) -> i64; +pub type WasiConsoleIoSeekCallback = unsafe extern "C" fn(*const c_void, c_char, i64) -> i64; +pub type WasiConsoleIoEnvDestructor = unsafe extern "C" fn(*const c_void) -> i64; + +/// The console override is a custom context consisting of callback pointers +/// (which are activated whenever some console I/O occurs) and a "context", which +/// can be owned or referenced from C. This struct can be used in `wasi_config_overwrite_stdin`, +/// `wasi_config_overwrite_stdout` or `wasi_config_overwrite_stderr` to redirect the output or +/// insert input into the console I/O log. +/// +/// Internally the stdout / stdin is synchronized, so the console is usable across threads +/// (only one thread can read / write / seek from the console I/O) +#[allow(non_camel_case_types)] +#[allow(clippy::box_collection, clippy::redundant_allocation)] +#[repr(C)] +#[derive(Clone)] +pub struct wasi_pipe_t { + read: WasiConsoleIoReadCallback, + write: WasiConsoleIoWriteCallback, + seek: WasiConsoleIoSeekCallback, + data: Option>>>, +} + +#[derive(Debug)] +struct WasiPipeDataWithDestructor { + data: Vec, + // Buffer of already-read data that is being read into, + // then the result is returned + temp_buffer: Vec, + destructor: WasiConsoleIoEnvDestructor, +} + +impl WasiPipeDataWithDestructor { + fn read_buffer( + &mut self, + read_cb: WasiConsoleIoReadCallback, + max_read: Option, + ) -> io::Result { + const BLOCK_SIZE: usize = 1024; + + let mut final_buf = Vec::new(); + + let max_to_read = max_read.unwrap_or(usize::MAX); + let max_read = max_to_read.saturating_sub(self.temp_buffer.len()); + if max_read == 0 { + // there are n bytes being available to read in the temp_buffer + return Ok(max_to_read); + } + let mut cur_read = 0; + + // Read bytes until either EOF is reached or max_read bytes are reached + loop { + if cur_read >= max_read { + break; + } + + let mut temp_buffer = if cur_read + BLOCK_SIZE > max_read { + vec![0; max_read - cur_read] + } else { + vec![0; BLOCK_SIZE] + }; + + let result = unsafe { + let ptr = temp_buffer.as_mut_ptr() as *mut c_char; + (read_cb)( + self.data.as_mut_ptr() as *const c_void, + ptr, + temp_buffer.len(), + ) + }; + + if result < 0 { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("could not read from wasi_pipe_t: {result}"), + )); + } + + let result = result as usize; + if result == 0 || result > temp_buffer.len() { + break; // EOF + } + + cur_read += result; + final_buf.extend_from_slice(&temp_buffer[..result]); + } + + let final_buf_len = final_buf.len(); + + // store the bytes in temp_buffer + self.temp_buffer.extend_from_slice(&final_buf); + + // temp_buffer.len() can be smaller than max_read in case we + // encounter EOF earlier than expected + assert!(self.temp_buffer.len() <= max_read); + + // return how many bytes were just read + // + // caller has to clear temp_buffer to advance actual reading + Ok(final_buf_len) + } +} + +impl Drop for WasiPipeDataWithDestructor { + fn drop(&mut self) { + let error = unsafe { (self.destructor)(self.data.as_mut_ptr() as *const c_void) }; + if error < 0 { + panic!("error dropping wasi_pipe_t: {}", error); + } + } +} + +impl wasi_pipe_t { + fn get_data_mut( + &self, + op_id: &'static str, + ) -> io::Result> { + self.data + .as_ref() + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!("could not lock mutex ({op_id}) on wasi_pipe_t: no mutex"), + ) + })? + .lock() + .map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("could not lock mutex ({op_id}) on wasi_pipe_t: {e}"), + ) + }) + } +} + +impl fmt::Debug for wasi_pipe_t { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "wasi_pipe_t") + } +} + +impl io::Read for wasi_pipe_t { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let self_read = self.read; + let mut data = self.get_data_mut("read")?; + let _ = data.read_buffer(self_read, Some(buf.len()))?; + let bytes_to_read = buf.len().min(data.temp_buffer.len()); + let bytes_read = data.temp_buffer.drain(..bytes_to_read).collect::>(); + buf[..bytes_read.len()].clone_from_slice(&bytes_read); + Ok(bytes_to_read) + } +} + +impl io::Write for wasi_pipe_t { + fn write(&mut self, buf: &[u8]) -> io::Result { + let self_write = self.write; + let mut data = self.get_data_mut("write")?; + let result = unsafe { + (self_write)( + data.data.as_mut_ptr() as *const c_void, + buf.as_ptr() as *const c_char, + buf.len(), + false, + ) + }; + if result >= 0 { + Ok(result.try_into().unwrap_or(0)) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "could not write {} bytes to wasi_pipe_t: {result}", + buf.len() + ), + )) + } + } + fn flush(&mut self) -> io::Result<()> { + let self_write = self.write; + let mut data = self.get_data_mut("flush")?; + let bytes_to_write = &[]; + let result: i64 = unsafe { + (self_write)( + data.data.as_mut_ptr() as *const c_void, + bytes_to_write.as_ptr(), + 0, + true, + ) + }; + if result >= 0 { + Ok(()) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("could not flush wasi_pipe_t: {result}"), + )) + } + } +} + +impl io::Seek for wasi_pipe_t { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + let self_seek = self.seek; + let mut data = self.get_data_mut("seek")?; + let (id, pos) = match pos { + SeekFrom::Start(s) => (0, s as i64), + SeekFrom::End(s) => (1, s), + SeekFrom::Current(s) => (2, s), + }; + let result = unsafe { (self_seek)(data.data.as_mut_ptr() as *const c_void, id, pos) }; + if result >= 0 { + Ok(result.try_into().unwrap_or(0)) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("could not seek to {pos:?} wasi_pipe_t: {result}"), + )) + } + } +} + +impl VirtualFile for wasi_pipe_t { + fn last_accessed(&self) -> u64 { + 0 + } + fn last_modified(&self) -> u64 { + 0 + } + fn created_time(&self) -> u64 { + 0 + } + fn size(&self) -> u64 { + 0 + } + fn set_len(&mut self, _: u64) -> Result<(), FsError> { + Ok(()) + } + fn unlink(&mut self) -> Result<(), FsError> { + Ok(()) + } + fn bytes_available(&self) -> Result { + Ok(self.bytes_available_read()?.unwrap_or(0usize) + + self.bytes_available_write()?.unwrap_or(0usize)) + } + fn bytes_available_read(&self) -> Result, FsError> { + let self_read = self.read; + let mut data = self.get_data_mut("bytes_available_read")?; + let _ = data.read_buffer(self_read, None)?; + Ok(Some(data.temp_buffer.len())) + } + fn bytes_available_write(&self) -> Result, FsError> { + Ok(None) + } +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_new_internal( + read: WasiConsoleIoReadCallback, + write: WasiConsoleIoWriteCallback, + seek: WasiConsoleIoSeekCallback, + destructor: WasiConsoleIoEnvDestructor, + env_data: *const c_void, + env_data_len: usize, +) -> *mut wasi_pipe_t { + let data_vec: Vec = + std::slice::from_raw_parts(env_data as *const c_char, env_data_len).to_vec(); + + Box::leak(Box::new(wasi_pipe_t { + read, + write, + seek, + data: Some(Box::new(Arc::new(Mutex::new(WasiPipeDataWithDestructor { + data: data_vec, + temp_buffer: Vec::new(), + destructor, + })))), + })) +} + +/// Creates a `wasi_pipe_t` callback object that does nothing +/// and redirects stdout / stderr to /dev/null +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_new_null() -> *mut wasi_pipe_t { + let mut data = Vec::new(); + wasi_pipe_new_internal( + wasi_pipe_read_null, + wasi_pipe_write_null, + wasi_pipe_seek_null, + wasi_pipe_delete_null, + data.as_mut_ptr(), + data.len(), + ) +} + +extern "C" fn wasi_pipe_read_null(_: *const c_void, _: *mut c_char, _: usize) -> i64 { + 0 +} + +extern "C" fn wasi_pipe_write_null(_: *const c_void, _: *const c_char, _: usize, _: bool) -> i64 { + 0 +} + +extern "C" fn wasi_pipe_seek_null(_: *const c_void, _: c_char, _: i64) -> i64 { + 0 +} + +extern "C" fn wasi_pipe_delete_null(_: *const c_void) -> i64 { + 0 +} + +unsafe extern "C" fn wasi_pipe_read_memory_2( + ptr: *const c_void, /* = *WasiPipe */ + byte_ptr: *mut c_char, /* &[u8] bytes to read */ + max_bytes: usize, /* max bytes to read */ +) -> i64 { + use std::io::Read; + let ptr = ptr as *mut WasiPipe; + let ptr = &mut *ptr; + let slice = std::slice::from_raw_parts_mut(byte_ptr as *mut u8, max_bytes); + match ptr.read(slice) { + Ok(o) => o as i64, + Err(_) => -1, + } +} + +unsafe extern "C" fn wasi_pipe_write_memory_2( + ptr: *const c_void, /* = *WasiPipe */ + byte_ptr: *const c_char, + byte_len: usize, + flush: bool, +) -> i64 { + use std::io::Write; + + let ptr = ptr as *mut WasiPipe; + let ptr = &mut *ptr; + + if flush { + match ptr.flush() { + Ok(()) => 0, + Err(_) => -1, + } + } else { + let slice = std::slice::from_raw_parts(byte_ptr as *const u8, byte_len); + match ptr.write(slice) { + Ok(o) => o as i64, + Err(_) => -1, + } + } +} + +unsafe extern "C" fn wasi_pipe_seek_memory_2( + ptr: *const c_void, /* = *WasiPipe */ + direction: c_char, + seek_to: i64, +) -> i64 { + use std::io::Seek; + + let ptr = ptr as *mut WasiPipe; + let ptr = &mut *ptr; + + let seek_from = match direction { + 0 => std::io::SeekFrom::Start(seek_to.max(0) as u64), + 1 => std::io::SeekFrom::End(seek_to), + 2 => std::io::SeekFrom::Current(seek_to), + _ => { + return -1; + } + }; + + match ptr.seek(seek_from) { + Ok(o) => o as i64, + Err(_) => -1, + } +} + +#[no_mangle] +unsafe extern "C" fn wasi_pipe_delete_memory_2(ptr: *const c_void /* = *WasiPipe */) -> i64 { + let ptr = ptr as *const WasiPipe; + let mut pipe: WasiPipe = std::mem::transmute_copy(&*ptr); // dropped here, destructors run here + pipe.close(); + 0 +} + +/// Creates a new `wasi_pipe_t` which uses a memory buffer +/// for backing stdin / stdout / stderr +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_new(ptr_user: &mut *mut wasi_pipe_t) -> *mut wasi_pipe_t { + wasi_pipe_new_internal_memory(ptr_user, false) +} + +/// Same as `wasi_pipe_new`, but the pipe will block to wait for stdin input +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_new_blocking( + ptr_user: &mut *mut wasi_pipe_t, +) -> *mut wasi_pipe_t { + wasi_pipe_new_internal_memory(ptr_user, true) +} + +unsafe fn wasi_pipe_new_internal_memory( + ptr_user: &mut *mut wasi_pipe_t, + blocking: bool, +) -> *mut wasi_pipe_t { + use std::mem::ManuallyDrop; + + let mut pair = WasiBidirectionalPipePair::new(); + pair.send.set_blocking(blocking); + pair.recv.set_blocking(blocking); + + let mut data1 = ManuallyDrop::new(pair.send); + let ptr1: &mut WasiPipe = &mut data1; + + *ptr_user = wasi_pipe_new_internal( + wasi_pipe_read_memory_2, + wasi_pipe_write_memory_2, + wasi_pipe_seek_memory_2, + wasi_pipe_delete_memory_2, + ptr1 as *mut _ as *mut c_void, + std::mem::size_of::(), + ); + + let mut data2 = ManuallyDrop::new(pair.recv); + let ptr2: &mut WasiPipe = &mut data2; + wasi_pipe_new_internal( + wasi_pipe_read_memory_2, + wasi_pipe_write_memory_2, + wasi_pipe_seek_memory_2, + wasi_pipe_delete_memory_2, + ptr2 as *mut _ as *mut c_void, + std::mem::size_of::(), + ) +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_delete(ptr: *mut wasi_pipe_t) -> bool { + let _ = Box::from_raw(ptr); + true +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_write_bytes( + ptr: *mut wasi_pipe_t, + buf: *const c_char, + len: usize, +) -> i64 { + use std::io::Write; + let buf = buf as *const u8; + let ptr = &mut *ptr; + let read_slice = std::slice::from_raw_parts(buf, len); + match ptr.write(read_slice) { + Ok(o) => o as i64, + Err(_) => -1, + } +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_write_str(ptr: *const wasi_pipe_t, buf: *const c_char) -> i64 { + use std::io::Write; + let c_str = std::ffi::CStr::from_ptr(buf); + let as_bytes_with_nul = c_str.to_bytes(); + let ptr = &mut *(ptr as *mut wasi_pipe_t); + match ptr.write(as_bytes_with_nul) { + Ok(o) => o as i64, + Err(_) => -1, + } +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_flush(ptr: *mut wasi_pipe_t) -> i64 { + use std::io::Write; + let ptr = &mut *ptr; + match ptr.flush() { + Ok(_) => 0, + Err(_) => -1, + } +} + +#[test] +fn test_wasi_pipe_with_destructor() { + let mut wasi_pipe_t_ptr = std::ptr::null_mut(); + let second_wasi_pipe_t_ptr = unsafe { wasi_pipe_new(&mut wasi_pipe_t_ptr) }; + let wasi_pipe_t_ptr = unsafe { &mut *wasi_pipe_t_ptr }; + let second_wasi_pipe_t_ptr = unsafe { &mut *second_wasi_pipe_t_ptr }; + + let data = b"hello".into_iter().map(|v| *v as i8).collect::>(); + let result = unsafe { wasi_pipe_write_bytes(wasi_pipe_t_ptr, data.as_ptr(), data.len()) }; + assert_eq!(result, 5); + + let bytes_avail = wasi_pipe_t_ptr.bytes_available_read(); + assert_eq!(bytes_avail, Ok(Some(0))); + + let bytes_avail2 = second_wasi_pipe_t_ptr.bytes_available_read(); + assert_eq!(bytes_avail2, Ok(Some(5))); + + let mut read_str_ptr = std::ptr::null_mut(); + let result = unsafe { wasi_pipe_read_str(second_wasi_pipe_t_ptr, &mut read_str_ptr) }; + assert_eq!(result, 6); // hello\0 + let buf_slice = unsafe { std::slice::from_raw_parts_mut(read_str_ptr, result as usize) }; + assert_eq!(buf_slice[..5], data); + + unsafe { + wasi_pipe_delete_str(read_str_ptr); + } + unsafe { wasi_pipe_delete(wasi_pipe_t_ptr) }; + unsafe { wasi_pipe_delete(second_wasi_pipe_t_ptr) }; +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_read_bytes( + ptr: *const wasi_pipe_t, + buf: *const c_char, + read: usize, +) -> i64 { + use std::io::Read; + let ptr = &mut *(ptr as *mut wasi_pipe_t); + let buf = buf as *mut u8; + let slice = std::slice::from_raw_parts_mut(buf, read); + match ptr.read(slice) { + Ok(o) => o as i64, + Err(_) => -1, + } +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_delete_str(buf: *mut c_char) { + use std::ffi::CString; + let _ = CString::from_raw(buf); +} + +unsafe fn wasi_pipe_read_bytes_internal(ptr: *const wasi_pipe_t, buf: &mut Vec) -> i64 { + use std::io::Read; + + const BLOCK_SIZE: usize = 1024; + + let ptr = &mut *(ptr as *mut wasi_pipe_t); + let mut target = Vec::new(); + + loop { + let mut v = vec![0; BLOCK_SIZE]; + // read n bytes, maximum of 1024 + match ptr.read(&mut v) { + Ok(0) => { + break; + } + Ok(n) => { + target.extend_from_slice(&v[..n]); + } + Err(_) => { + return -1; + } + } + } + + let len = target.len() as i64; + *buf = target; + len +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_read_str(ptr: *const wasi_pipe_t, buf: &mut *mut c_char) -> i64 { + use std::ffi::CString; + + let mut target = Vec::new(); + let read_result = wasi_pipe_read_bytes_internal(ptr, &mut target); + if read_result < 0 { + return read_result; + } + + target.push(0); + let len = target.len(); + let c_string = match CString::from_vec_with_nul(target.clone()) { + Ok(o) => o, + Err(_) => { + return -1; + } + }; + + *buf = CString::into_raw(c_string); + len as i64 +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_pipe_seek( + ptr: *mut wasi_pipe_t, + // 0 = from start + // 1 = from end + // 2 = from current position + seek_dir: c_char, + seek: i64, +) -> i64 { + use std::io::Seek; + + let seek_pos = match seek_dir { + 0 => SeekFrom::Start(seek as u64), + 1 => SeekFrom::End(seek), + 2 => SeekFrom::Current(seek), + _ => { + return -1; + } + }; + + let ptr = &mut *ptr; + + ptr.seek(seek_pos) + .ok() + .and_then(|p| p.try_into().ok()) + .unwrap_or(-1) +} + #[derive(Debug)] #[allow(non_camel_case_types)] pub struct wasi_config_t { - inherit_stdout: bool, - inherit_stderr: bool, - inherit_stdin: bool, + stdout: Option>, + stderr: Option>, + stdin: Option>, state_builder: WasiStateBuilder, } @@ -37,16 +662,16 @@ pub unsafe extern "C" fn wasi_config_new( let prog_name = c_try!(name_c_str.to_str()); Some(Box::new(wasi_config_t { - inherit_stdout: true, - inherit_stderr: true, - inherit_stdin: true, + stdout: None, + stderr: None, + stdin: None, state_builder: WasiState::new(prog_name), })) } #[no_mangle] pub unsafe extern "C" fn wasi_config_env( - config: &mut wasi_config_t, + wasi_config: &mut wasi_config_t, key: *const c_char, value: *const c_char, ) { @@ -58,22 +683,22 @@ pub unsafe extern "C" fn wasi_config_env( let value_cstr = CStr::from_ptr(value); let value_bytes = value_cstr.to_bytes(); - config.state_builder.env(key_bytes, value_bytes); + wasi_config.state_builder.env(key_bytes, value_bytes); } #[no_mangle] -pub unsafe extern "C" fn wasi_config_arg(config: &mut wasi_config_t, arg: *const c_char) { +pub unsafe extern "C" fn wasi_config_arg(wasi_config: &mut wasi_config_t, arg: *const c_char) { debug_assert!(!arg.is_null()); let arg_cstr = CStr::from_ptr(arg); let arg_bytes = arg_cstr.to_bytes(); - config.state_builder.arg(arg_bytes); + wasi_config.state_builder.arg(arg_bytes); } #[no_mangle] pub unsafe extern "C" fn wasi_config_preopen_dir( - config: &mut wasi_config_t, + wasi_config: &mut wasi_config_t, dir: *const c_char, ) -> bool { let dir_cstr = CStr::from_ptr(dir); @@ -86,7 +711,7 @@ pub unsafe extern "C" fn wasi_config_preopen_dir( } }; - if let Err(e) = config.state_builder.preopen_dir(dir_str) { + if let Err(e) = wasi_config.state_builder.preopen_dir(dir_str) { update_last_error(e); return false; } @@ -96,7 +721,7 @@ pub unsafe extern "C" fn wasi_config_preopen_dir( #[no_mangle] pub unsafe extern "C" fn wasi_config_mapdir( - config: &mut wasi_config_t, + wasi_config: &mut wasi_config_t, alias: *const c_char, dir: *const c_char, ) -> bool { @@ -120,7 +745,7 @@ pub unsafe extern "C" fn wasi_config_mapdir( } }; - if let Err(e) = config.state_builder.map_dir(alias_str, dir_str) { + if let Err(e) = wasi_config.state_builder.map_dir(alias_str, dir_str) { update_last_error(e); return false; } @@ -129,33 +754,63 @@ pub unsafe extern "C" fn wasi_config_mapdir( } #[no_mangle] -pub extern "C" fn wasi_config_capture_stdout(config: &mut wasi_config_t) { - config.inherit_stdout = false; +pub extern "C" fn wasi_config_capture_stdout(wasi_config: &mut wasi_config_t) { + wasi_config.stdout = Some(unsafe { Box::from_raw(wasi_pipe_new_null()) }); +} + +#[no_mangle] +pub extern "C" fn wasi_config_inherit_stdout(wasi_config: &mut wasi_config_t) { + wasi_config.stdout = None; +} + +#[no_mangle] +pub extern "C" fn wasi_config_capture_stderr(wasi_config: &mut wasi_config_t) { + wasi_config.stderr = Some(unsafe { Box::from_raw(wasi_pipe_new_null()) }); } #[no_mangle] -pub extern "C" fn wasi_config_inherit_stdout(config: &mut wasi_config_t) { - config.inherit_stdout = true; +pub extern "C" fn wasi_config_inherit_stderr(wasi_config: &mut wasi_config_t) { + wasi_config.stderr = None; } #[no_mangle] -pub extern "C" fn wasi_config_capture_stderr(config: &mut wasi_config_t) { - config.inherit_stderr = false; +pub extern "C" fn wasi_config_capture_stdin(wasi_config: &mut wasi_config_t) { + wasi_config.stdin = Some(unsafe { Box::from_raw(wasi_pipe_new_null()) }); } #[no_mangle] -pub extern "C" fn wasi_config_inherit_stderr(config: &mut wasi_config_t) { - config.inherit_stderr = true; +pub extern "C" fn wasi_config_inherit_stdin(wasi_config: &mut wasi_config_t) { + wasi_config.stdin = None; } -//#[no_mangle] -//pub extern "C" fn wasi_config_capture_stdin(config: &mut wasi_config_t) { -// config.inherit_stdin = false; -//} +#[no_mangle] +pub unsafe extern "C" fn wasi_config_overwrite_stdin( + config_overwrite: &mut wasi_config_t, + stdin_overwrite: *mut wasi_pipe_t, +) { + config_overwrite + .state_builder + .stdin(Box::from_raw(stdin_overwrite)); +} #[no_mangle] -pub extern "C" fn wasi_config_inherit_stdin(config: &mut wasi_config_t) { - config.inherit_stdin = true; +pub unsafe extern "C" fn wasi_config_overwrite_stdout( + config_overwrite: &mut wasi_config_t, + stdout_overwrite: *mut wasi_pipe_t, +) { + config_overwrite + .state_builder + .stdout(Box::from_raw(stdout_overwrite)); +} + +#[no_mangle] +pub unsafe extern "C" fn wasi_config_overwrite_stderr( + config_overwrite: &mut wasi_config_t, + stderr_overwrite: *mut wasi_pipe_t, +) { + config_overwrite + .state_builder + .stderr(Box::from_raw(stderr_overwrite)); } #[allow(non_camel_case_types)] @@ -171,21 +826,24 @@ pub struct wasi_env_t { #[no_mangle] pub unsafe extern "C" fn wasi_env_new( store: Option<&mut wasm_store_t>, - mut config: Box, + mut wasi_config: Box, ) -> Option> { let store = &mut store?.inner; let mut store_mut = store.store_mut(); - if !config.inherit_stdout { - config.state_builder.stdout(Box::new(Pipe::new())); + + if let Some(stdout) = wasi_config.stdout { + wasi_config.state_builder.stdout(stdout); } - if !config.inherit_stderr { - config.state_builder.stderr(Box::new(Pipe::new())); + if let Some(stderr) = wasi_config.stderr { + wasi_config.state_builder.stderr(stderr); } - // TODO: impl capturer for stdin + if let Some(stdin) = wasi_config.stdin { + wasi_config.state_builder.stdin(stdin); + } - let wasi_state = c_try!(config.state_builder.finalize(&mut store_mut)); + let wasi_state = c_try!(wasi_config.state_builder.finalize(&mut store_mut)); Some(Box::new(wasi_env_t { inner: wasi_state, @@ -506,4 +1164,218 @@ mod tests { }) .success(); } + + #[test] + fn test_wasi_stdin_set() { + (assert_c! { + #include "tests/wasmer.h" + #include "string.h" + #include "stdio.h" + + int main() { + wasi_pipe_t* override_stdout_1 = NULL; + wasi_pipe_t* override_stdout_2 = wasi_pipe_new(&override_stdout_1); + + assert(override_stdout_1); + assert(override_stdout_2); + + // write to override_stdout_1, then close override_stdout_1 + wasi_pipe_write_str(override_stdout_1, "test"); + wasi_pipe_delete(override_stdout_1); + + // read from override_stdout_2, after override_stdout_1 has been closed so it doesn't block + char* out; + wasi_pipe_read_str(override_stdout_2, &out); + assert(strcmp(out, "test") == 0); + wasi_pipe_delete_str(out); + + // cleanup + wasi_pipe_delete(override_stdout_2); + return 0; + } + }) + .success(); + } + + #[test] + fn test_wasi_stdin_set_2() { + (assert_c! { + #include "tests/wasmer.h" + #include "string.h" + #include "stdio.h" + + int main() { + + wasm_engine_t* engine = wasm_engine_new(); + wasm_store_t* store = wasm_store_new(engine); + wasi_config_t* config = wasi_config_new("example_program"); + + wasi_pipe_t* override_stdout_1 = NULL; + wasi_pipe_t* override_stdout_2 = wasi_pipe_new(&override_stdout_1); + assert(override_stdout_1); + assert(override_stdout_2); + + wasi_pipe_t* override_stderr_1 = NULL; + wasi_pipe_t* override_stderr_2 = wasi_pipe_new(&override_stderr_1); + assert(override_stderr_1); + assert(override_stderr_2); + + wasi_pipe_t* override_stdin_1 = NULL; + wasi_pipe_t* override_stdin_2 = wasi_pipe_new(&override_stdin_1); + assert(override_stdin_1); + assert(override_stdin_2); + + // The override_stdin ownership is moved to the config + wasi_config_overwrite_stdin(config, override_stdin_1); + wasi_config_overwrite_stdout(config, override_stdout_1); + wasi_config_overwrite_stderr(config, override_stderr_1); + + // write to stdin, then close all senders in order + // not to block during execution + wasi_pipe_write_str(override_stdin_2, "hello"); + wasi_pipe_delete(override_stdin_2); + + /* + // testrust.wasm: + + use std::io::{self, Write}; + + fn main() -> io::Result<()> { + + let mut input = String::new(); + io::stdin().read_line(&mut input)?; + + io::stdout().write_all(format!("stdout: {input}").as_bytes())?; + io::stderr().write_all(format!("stderr: {input}").as_bytes())?; + + Ok(()) + } + */ + + // Load binary. + FILE* file = fopen("tests/wasm-c-api/example/testrust.wasm", "rb"); + if (!file) { + printf("> Error loading module!\n"); + return 1; + } + + fseek(file, 0L, SEEK_END); + size_t file_size = ftell(file); + fseek(file, 0L, SEEK_SET); + + wasm_byte_vec_t binary; + wasm_byte_vec_new_uninitialized(&binary, file_size); + + if (fread(binary.data, file_size, 1, file) != 1) { + printf("> Error loading module!\n"); + return 1; + } + + fclose(file); + + wasm_module_t* module = wasm_module_new(store, &binary); + if (!module) { + printf("> Error compiling module!\n"); + return 1; + } + + // The env now has ownership of the config (using the custom stdout / stdin channels) + wasi_env_t *wasi_env = wasi_env_new(store, config); + if (!wasi_env) { + printf("> Error building WASI env!\n"); + return 1; + } + + wasm_importtype_vec_t import_types; + wasm_module_imports(module, &import_types); + + wasm_extern_vec_t imports; + wasm_extern_vec_new_uninitialized(&imports, import_types.size); + wasm_importtype_vec_delete(&import_types); + + bool get_imports_result = wasi_get_imports(store, wasi_env, module, &imports); + + if (!get_imports_result) { + printf("Error getting WASI imports!\n"); + return 1; + } + + // The program should wait for a stdin, then print "stdout: $1" to stdout + // and "stderr: $1" to stderr and exit. + + // Instantiate the module + wasm_instance_t *instance = wasm_instance_new(store, module, &imports, NULL); + if (!instance) { + printf("> Error instantiating module!\n"); + return -1; + } + + // Read the exports. + wasm_extern_vec_t exports; + wasm_instance_exports(instance, &exports); + wasm_memory_t* mem = NULL; + for (size_t i = 0; i < exports.size; i++) { + mem = wasm_extern_as_memory(exports.data[i]); + if (mem) { + break; + } + } + + if (!mem) { + printf("Failed to create instance: Could not find memory in exports\n"); + return -1; + } + wasi_env_set_memory(wasi_env, mem); + + // Get the _start function + wasm_func_t* run_func = wasi_get_start_function(instance); + if (run_func == NULL) { + printf("> Error accessing export!\n"); + return 1; + } + + // Run the _start function + // Running the program should trigger the stdin to write "hello" to the stdin + wasm_val_vec_t args = WASM_EMPTY_VEC; + wasm_val_vec_t res = WASM_EMPTY_VEC; + if (wasm_func_call(run_func, &args, &res)) { + printf("> Error calling function!\n"); + return 1; + } + + // Verify that the stdout / stderr worked as expected + char* out; + wasi_pipe_read_str(override_stdout_2, &out); + assert(strcmp(out, "stdout: hello") == 0); + wasi_pipe_delete_str(out); + + char* out2; + wasi_pipe_read_str(override_stdout_2, &out2); + assert(strcmp(out2, "") == 0); + wasi_pipe_delete_str(out2); + + char* out3; + wasi_pipe_read_str(override_stderr_2, &out3); + assert(strcmp(out3, "stderr: hello") == 0); + wasi_pipe_delete_str(out3); + + char* out4; + wasi_pipe_read_str(override_stderr_2, &out4); + assert(strcmp(out4, "") == 0); + wasi_pipe_delete_str(out4); + + wasi_pipe_delete(override_stdout_2); + wasi_pipe_delete(override_stderr_2); + wasm_byte_vec_delete(&binary); + wasm_module_delete(module); + wasm_func_delete(run_func); + wasi_env_delete(wasi_env); + wasm_store_delete(store); + wasm_engine_delete(engine); + + return 0; + } + }) + .success(); + } } diff --git a/lib/c-api/tests/wasm-c-api/example/stdio.wasm b/lib/c-api/tests/wasm-c-api/example/stdio.wasm new file mode 100755 index 00000000000..9ed5b3d4709 Binary files /dev/null and b/lib/c-api/tests/wasm-c-api/example/stdio.wasm differ diff --git a/lib/c-api/tests/wasm-c-api/example/testrust.wasm b/lib/c-api/tests/wasm-c-api/example/testrust.wasm new file mode 100755 index 00000000000..92a691fc6bb Binary files /dev/null and b/lib/c-api/tests/wasm-c-api/example/testrust.wasm differ diff --git a/lib/vm/src/memory.rs b/lib/vm/src/memory.rs index f8b80d2cee4..056f73354d2 100644 --- a/lib/vm/src/memory.rs +++ b/lib/vm/src/memory.rs @@ -357,7 +357,7 @@ impl VMMemory { /// /// This creates a `Memory` with owned metadata: this can be used to create a memory /// that will be imported into Wasm modules. - pub fn new(memory: &MemoryType, style: &MemoryStyle) -> Result { + pub fn new(memory: &MemoryType, style: &MemoryStyle) -> Result { Ok(Self(Box::new(VMOwnedMemory::new(memory, style)?))) } @@ -372,7 +372,7 @@ impl VMMemory { memory: &MemoryType, style: &MemoryStyle, vm_memory_location: NonNull, - ) -> Result { + ) -> Result { Ok(Self(Box::new(VMOwnedMemory::from_definition( memory, style, @@ -384,9 +384,9 @@ impl VMMemory { /// are natively supported /// - VMOwnedMemory -> VMMemory /// - Box -> VMMemory - pub fn from_custom(memory: IntoVMMemory) -> VMMemory + pub fn from_custom(memory: IntoVMMemory) -> Self where - IntoVMMemory: Into, + IntoVMMemory: Into, { memory.into() } diff --git a/lib/wasi/src/lib.rs b/lib/wasi/src/lib.rs index 1d1b76cfa46..97a79b0d5ed 100644 --- a/lib/wasi/src/lib.rs +++ b/lib/wasi/src/lib.rs @@ -43,8 +43,9 @@ mod utils; use crate::syscalls::*; pub use crate::state::{ - Fd, Pipe, Stderr, Stdin, Stdout, WasiFs, WasiInodes, WasiState, WasiStateBuilder, - WasiStateCreationError, ALL_RIGHTS, VIRTUAL_ROOT_FD, + Fd, Pipe, Stderr, Stdin, Stdout, WasiBidirectionalPipePair, WasiBidirectionalSharedPipePair, + WasiFs, WasiInodes, WasiPipe, WasiState, WasiStateBuilder, WasiStateCreationError, ALL_RIGHTS, + VIRTUAL_ROOT_FD, }; pub use crate::syscalls::types; #[cfg(feature = "wasix")] diff --git a/lib/wasi/src/state/pipe.rs b/lib/wasi/src/state/pipe.rs index 4781ab0a02f..60754a0cb0f 100644 --- a/lib/wasi/src/state/pipe.rs +++ b/lib/wasi/src/state/pipe.rs @@ -2,12 +2,14 @@ use crate::syscalls::types::*; use crate::syscalls::{read_bytes, write_bytes}; use bytes::{Buf, Bytes}; use std::convert::TryInto; -use std::io::{self, Read}; +use std::io::{self, Read, Seek, SeekFrom, Write}; use std::ops::DerefMut; use std::sync::mpsc; +use std::sync::Arc; use std::sync::Mutex; use wasmer::WasmSlice; use wasmer::{MemorySize, MemoryView}; +use wasmer_vfs::{FsError, VirtualFile}; use wasmer_wasi_types::wasi::Errno; #[derive(Debug)] @@ -18,10 +20,70 @@ pub struct WasiPipe { rx: Mutex>>, /// Buffers the last read message from the pipe while its being consumed read_buffer: Option, + /// Whether the pipe should block or not block to wait for stdin reads + block: bool, } -impl WasiPipe { - pub fn new() -> (WasiPipe, WasiPipe) { +/// Pipe pair of (a, b) WasiPipes that are connected together +#[derive(Debug)] +pub struct WasiBidirectionalPipePair { + pub send: WasiPipe, + pub recv: WasiPipe, +} + +impl Write for WasiBidirectionalPipePair { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.send.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.send.flush() + } +} + +impl Seek for WasiBidirectionalPipePair { + fn seek(&mut self, _: SeekFrom) -> io::Result { + Ok(0) + } +} + +impl Read for WasiBidirectionalPipePair { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.recv.read(buf) + } +} + +impl VirtualFile for WasiBidirectionalPipePair { + fn last_accessed(&self) -> u64 { + self.recv.last_accessed() + } + fn last_modified(&self) -> u64 { + self.recv.last_modified() + } + fn created_time(&self) -> u64 { + self.recv.created_time() + } + fn size(&self) -> u64 { + self.recv.size() + } + fn set_len(&mut self, i: u64) -> Result<(), FsError> { + self.recv.set_len(i) + } + fn unlink(&mut self) -> Result<(), FsError> { + self.recv.unlink() + } + fn bytes_available_read(&self) -> Result, FsError> { + self.recv.bytes_available_read() + } +} + +impl Default for WasiBidirectionalPipePair { + fn default() -> Self { + Self::new() + } +} + +impl WasiBidirectionalPipePair { + pub fn new() -> WasiBidirectionalPipePair { let (tx1, rx1) = mpsc::channel(); let (tx2, rx2) = mpsc::channel(); @@ -29,15 +91,142 @@ impl WasiPipe { tx: Mutex::new(tx1), rx: Mutex::new(rx2), read_buffer: None, + block: true, }; let pipe2 = WasiPipe { tx: Mutex::new(tx2), rx: Mutex::new(rx1), read_buffer: None, + block: true, }; - (pipe1, pipe2) + WasiBidirectionalPipePair { + send: pipe1, + recv: pipe2, + } + } + + #[allow(dead_code)] + pub fn with_blocking(mut self, block: bool) -> Self { + self.set_blocking(block); + self + } + + /// Whether to block on reads (ususally for waiting for stdin keyboard input). Default: `true` + #[allow(dead_code)] + pub fn set_blocking(&mut self, block: bool) { + self.send.set_blocking(block); + self.recv.set_blocking(block); + } +} + +/// Shared version of WasiBidirectionalPipePair for situations where you need +/// to emulate the old behaviour of `Pipe` (both send and recv on one channel). +#[derive(Debug, Clone)] +pub struct WasiBidirectionalSharedPipePair { + inner: Arc>, +} + +impl Default for WasiBidirectionalSharedPipePair { + fn default() -> Self { + Self::new() + } +} + +impl WasiBidirectionalSharedPipePair { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(WasiBidirectionalPipePair::new())), + } + } + + #[allow(dead_code)] + pub fn with_blocking(mut self, block: bool) -> Self { + self.set_blocking(block); + self + } + + /// Whether to block on reads (ususally for waiting for stdin keyboard input). Default: `true` + #[allow(dead_code)] + pub fn set_blocking(&mut self, block: bool) { + self.inner.lock().unwrap().set_blocking(block); + } +} + +impl Write for WasiBidirectionalSharedPipePair { + fn write(&mut self, buf: &[u8]) -> io::Result { + match self.inner.lock().as_mut().map(|l| l.write(buf)) { + Ok(r) => r, + Err(_) => Ok(0), + } + } + fn flush(&mut self) -> io::Result<()> { + match self.inner.lock().as_mut().map(|l| l.flush()) { + Ok(r) => r, + Err(_) => Ok(()), + } + } +} + +impl Seek for WasiBidirectionalSharedPipePair { + fn seek(&mut self, _: SeekFrom) -> io::Result { + Ok(0) + } +} + +impl Read for WasiBidirectionalSharedPipePair { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self.inner.lock().as_mut().map(|l| l.read(buf)) { + Ok(r) => r, + Err(_) => Ok(0), + } + } +} + +impl VirtualFile for WasiBidirectionalSharedPipePair { + fn last_accessed(&self) -> u64 { + self.inner.lock().map(|l| l.last_accessed()).unwrap_or(0) + } + fn last_modified(&self) -> u64 { + self.inner.lock().map(|l| l.last_modified()).unwrap_or(0) + } + fn created_time(&self) -> u64 { + self.inner.lock().map(|l| l.created_time()).unwrap_or(0) + } + fn size(&self) -> u64 { + self.inner.lock().map(|l| l.size()).unwrap_or(0) + } + fn set_len(&mut self, i: u64) -> Result<(), FsError> { + match self.inner.lock().as_mut().map(|l| l.set_len(i)) { + Ok(r) => r, + Err(_) => Err(FsError::Lock), + } + } + fn unlink(&mut self) -> Result<(), FsError> { + match self.inner.lock().as_mut().map(|l| l.unlink()) { + Ok(r) => r, + Err(_) => Err(FsError::Lock), + } + } + fn bytes_available_read(&self) -> Result, FsError> { + self.inner + .lock() + .map(|l| l.bytes_available_read()) + .unwrap_or(Ok(None)) + } +} + +impl WasiPipe { + /// Same as `set_blocking`, but as a builder method + pub fn with_blocking(mut self, block: bool) -> Self { + self.set_blocking(block); + self + } + + /// Whether to block on reads (ususally for waiting for stdin keyboard input). Default: `true` + pub fn set_blocking(&mut self, block: bool) { + self.block = block; } pub fn recv( @@ -94,26 +283,113 @@ impl WasiPipe { } } +impl Write for WasiPipe { + fn write(&mut self, buf: &[u8]) -> io::Result { + let buf_len = buf.len(); + let tx = self.tx.lock().unwrap(); + tx.send(buf.to_vec()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{e}")))?; + Ok(buf_len) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Seek for WasiPipe { + fn seek(&mut self, _: SeekFrom) -> io::Result { + Ok(0) + } +} + impl Read for WasiPipe { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { loop { if let Some(inner_buf) = self.read_buffer.as_mut() { let buf_len = inner_buf.len(); if buf_len > 0 { - let mut reader = inner_buf.as_ref(); - let read = reader.read(buf).map(|_| buf_len as usize)?; - inner_buf.advance(read); - return Ok(read); + if inner_buf.len() > buf.len() { + let mut reader = inner_buf.as_ref(); + let read = reader.read_exact(buf).map(|_| buf.len())?; + inner_buf.advance(read); + return Ok(read); + } else { + let mut reader = inner_buf.as_ref(); + let read = reader.read(buf).map(|_| buf_len as usize)?; + inner_buf.advance(read); + return Ok(read); + } } } let rx = self.rx.lock().unwrap(); - let data = rx.recv().map_err(|_| { - io::Error::new( - io::ErrorKind::BrokenPipe, - "the wasi pipe is not connected".to_string(), - ) - })?; + + // We need to figure out whether we need to block here. + // The problem is that in cases of multiple buffered reads like: + // + // println!("abc"); + // println!("def"); + // + // get_stdout() // would only return "abc\n" instead of "abc\ndef\n" + + let data = match rx.try_recv() { + Ok(mut s) => { + s.append(&mut rx.try_iter().flat_map(|f| f.into_iter()).collect()); + s + } + Err(_) => { + if !self.block { + // If self.block is explicitly set to false, never block + Vec::new() + } else { + // could not immediately receive bytes, so we need to block + match rx.recv() { + Ok(o) => o, + // Errors can happen if the sender has been dropped already + // In this case, just return 0 to indicate that we can't read any + // bytes anymore + Err(_) => { + return Ok(0); + } + } + } + } + }; + if data.is_empty() && self.read_buffer.as_ref().map(|s| s.len()).unwrap_or(0) == 0 { + return Ok(0); + } self.read_buffer.replace(Bytes::from(data)); } } } + +impl VirtualFile for WasiPipe { + fn last_accessed(&self) -> u64 { + 0 + } + fn last_modified(&self) -> u64 { + 0 + } + fn created_time(&self) -> u64 { + 0 + } + fn size(&self) -> u64 { + self.read_buffer + .as_ref() + .map(|s| s.len() as u64) + .unwrap_or_default() + } + fn set_len(&mut self, _: u64) -> Result<(), FsError> { + Ok(()) + } + fn unlink(&mut self) -> Result<(), FsError> { + Ok(()) + } + fn bytes_available_read(&self) -> Result, FsError> { + Ok(Some( + self.read_buffer + .as_ref() + .map(|s| s.len()) + .unwrap_or_default(), + )) + } +} diff --git a/lib/wasi/src/state/types.rs b/lib/wasi/src/state/types.rs index f4aca85ab3f..08b448409bd 100644 --- a/lib/wasi/src/state/types.rs +++ b/lib/wasi/src/state/types.rs @@ -3,12 +3,7 @@ use serde::{Deserialize, Serialize}; #[cfg(all(unix, feature = "sys-poll"))] use std::convert::TryInto; -use std::{ - collections::VecDeque, - io::{self, Read, Seek, Write}, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::time::Duration; use wasmer_vbus::BusError; use wasmer_wasi_types::wasi::{BusErrno, Errno}; @@ -376,79 +371,11 @@ pub(crate) fn poll( pub trait WasiPath {} -/// For piping stdio. Stores all output / input in a byte-vector. -#[derive(Debug, Clone, Default)] -#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))] -pub struct Pipe { - buffer: Arc>>, -} - -impl Pipe { - pub fn new() -> Self { - Self::default() - } -} - -impl Read for Pipe { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let mut buffer = self.buffer.lock().unwrap(); - let amt = std::cmp::min(buf.len(), buffer.len()); - let buf_iter = buffer.drain(..amt).enumerate(); - for (i, byte) in buf_iter { - buf[i] = byte; - } - Ok(amt) - } -} - -impl Write for Pipe { - fn write(&mut self, buf: &[u8]) -> io::Result { - let mut buffer = self.buffer.lock().unwrap(); - buffer.extend(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Seek for Pipe { - fn seek(&mut self, _pos: io::SeekFrom) -> io::Result { - Err(io::Error::new( - io::ErrorKind::Other, - "can not seek in a pipe", - )) - } -} - -//#[cfg_attr(feature = "enable-serde", typetag::serde)] -impl VirtualFile for Pipe { - fn last_accessed(&self) -> u64 { - 0 - } - fn last_modified(&self) -> u64 { - 0 - } - fn created_time(&self) -> u64 { - 0 - } - fn size(&self) -> u64 { - let buffer = self.buffer.lock().unwrap(); - buffer.len() as u64 - } - fn set_len(&mut self, len: u64) -> Result<(), FsError> { - let mut buffer = self.buffer.lock().unwrap(); - buffer.resize(len as usize, 0); - Ok(()) - } - fn unlink(&mut self) -> Result<(), FsError> { - Ok(()) - } - fn bytes_available_read(&self) -> Result, FsError> { - let buffer = self.buffer.lock().unwrap(); - Ok(Some(buffer.len())) - } -} +#[deprecated( + since = "3.0.0-beta.2", + note = "Moved to `wasmer_wasi::pipe::WasiBidirectionalSharedPipePair`, `Pipe` is only a transitional reexport" +)] +pub use crate::state::WasiBidirectionalSharedPipePair as Pipe; /* TODO: Think about using this diff --git a/lib/wasi/src/syscalls/mod.rs b/lib/wasi/src/syscalls/mod.rs index 50e69877c17..42028fa26c9 100644 --- a/lib/wasi/src/syscalls/mod.rs +++ b/lib/wasi/src/syscalls/mod.rs @@ -43,7 +43,7 @@ use crate::{ state::{ self, fs_error_into_wasi_err, iterate_poll_events, net_error_into_wasi_err, poll, virtual_file_type_to_wasi_file_type, Inode, InodeSocket, InodeSocketKind, InodeVal, Kind, - PollEvent, PollEventBuilder, WasiPipe, WasiState, MAX_SYMLINKS, + PollEvent, PollEventBuilder, WasiBidirectionalPipePair, WasiState, MAX_SYMLINKS, }, Fd, WasiEnv, WasiError, WasiThread, WasiThreadId, }; @@ -1804,7 +1804,9 @@ pub fn fd_pipe( let env = ctx.data(); let (memory, state, mut inodes) = env.get_memory_and_wasi_state_and_inodes_mut(&ctx, 0); - let (pipe1, pipe2) = WasiPipe::new(); + let pipes = WasiBidirectionalPipePair::new(); + let pipe1 = pipes.send; + let pipe2 = pipes.recv; let inode1 = state.fs.create_inode_with_default_stat( inodes.deref_mut(), diff --git a/lib/wasi/tests/stdio.rs b/lib/wasi/tests/stdio.rs index 78b93520e1a..fc29889ebc1 100644 --- a/lib/wasi/tests/stdio.rs +++ b/lib/wasi/tests/stdio.rs @@ -1,7 +1,7 @@ use std::io::{Read, Write}; use wasmer::{Instance, Module, Store}; -use wasmer_wasi::{Pipe, WasiState}; +use wasmer_wasi::{WasiBidirectionalSharedPipePair, WasiState}; mod sys { #[test] @@ -73,10 +73,10 @@ fn test_stdout() { "#).unwrap(); // Create the `WasiEnv`. - let mut stdout = Pipe::default(); + let mut pipe = WasiBidirectionalSharedPipePair::new().with_blocking(false); let wasi_env = WasiState::new("command-name") .args(&["Gordon"]) - .stdout(Box::new(stdout.clone())) + .stdout(Box::new(pipe.clone())) .finalize(&mut store) .unwrap(); @@ -93,7 +93,7 @@ fn test_stdout() { start.call(&mut store, &[]).unwrap(); let mut stdout_str = String::new(); - stdout.read_to_string(&mut stdout_str).unwrap(); + pipe.read_to_string(&mut stdout_str).unwrap(); let stdout_as_str = stdout_str.as_str(); assert_eq!(stdout_as_str, "hello world\n"); } @@ -110,7 +110,7 @@ fn test_env() { }); // Create the `WasiEnv`. - let mut stdout = Pipe::new(); + let mut pipe = WasiBidirectionalSharedPipePair::new().with_blocking(false); let mut wasi_state_builder = WasiState::new("command-name"); wasi_state_builder .args(&["Gordon"]) @@ -119,7 +119,7 @@ fn test_env() { .env("TEST2", "VALUE2"); // panic!("envs: {:?}", wasi_state_builder.envs); let wasi_env = wasi_state_builder - .stdout(Box::new(stdout.clone())) + .stdout(Box::new(pipe.clone())) .finalize(&mut store) .unwrap(); @@ -136,7 +136,7 @@ fn test_env() { start.call(&mut store, &[]).unwrap(); let mut stdout_str = String::new(); - stdout.read_to_string(&mut stdout_str).unwrap(); + pipe.read_to_string(&mut stdout_str).unwrap(); let stdout_as_str = stdout_str.as_str(); assert_eq!(stdout_as_str, "Env vars:\nDOG=X\nTEST2=VALUE2\nTEST=VALUE\nDOG Ok(\"X\")\nDOG_TYPE Err(NotPresent)\nSET VAR Ok(\"HELLO\")\n"); } @@ -146,15 +146,16 @@ fn test_stdin() { let module = Module::new(&store, include_bytes!("stdin-hello.wasm")).unwrap(); // Create the `WasiEnv`. - let mut stdin = Pipe::new(); - let wasi_env = WasiState::new("command-name") - .stdin(Box::new(stdin.clone())) - .finalize(&mut store) - .unwrap(); + let mut pipe = WasiBidirectionalSharedPipePair::new().with_blocking(false); // Write to STDIN let buf = "Hello, stdin!\n".as_bytes().to_owned(); - stdin.write(&buf[..]).unwrap(); + pipe.write(&buf[..]).unwrap(); + + let wasi_env = WasiState::new("command-name") + .stdin(Box::new(pipe.clone())) + .finalize(&mut store) + .unwrap(); // Generate an `ImportObject`. let import_object = wasi_env.import_object(&mut store, &module).unwrap(); @@ -167,10 +168,10 @@ fn test_stdin() { // Let's call the `_start` function, which is our `main` function in Rust. let start = instance.exports.get_function("_start").unwrap(); let result = start.call(&mut store, &[]); - assert!(!result.is_err()); + assert!(result.is_ok()); // We assure stdin is now empty let mut buf = Vec::new(); - stdin.read_to_end(&mut buf).unwrap(); + pipe.read_to_end(&mut buf).unwrap(); assert_eq!(buf.len(), 0); } diff --git a/tests/integration/cli/tests/create_exe.rs b/tests/integration/cli/tests/create_exe.rs index 45969f7c69e..866bef6a5ae 100644 --- a/tests/integration/cli/tests/create_exe.rs +++ b/tests/integration/cli/tests/create_exe.rs @@ -277,7 +277,7 @@ fn create_obj(args: Vec<&'static str>, keyword_needle: &str, keyword: &str) -> a let object_path = operating_dir.join("wasm.obj"); let output: Vec = WasmerCreateObj { - current_dir: operating_dir.clone(), + current_dir: operating_dir, wasm_path, output_object_path: object_path.clone(), compiler: Compiler::Cranelift, @@ -292,7 +292,7 @@ fn create_obj(args: Vec<&'static str>, keyword_needle: &str, keyword: &str) -> a "create-obj successfully completed but object output file `{}` missing", object_path.display() ); - let mut object_header_path = object_path.clone(); + let mut object_header_path = object_path; object_header_path.set_extension("h"); assert!( object_header_path.exists(), diff --git a/tests/integration/ios/tests/dylib.rs b/tests/integration/ios/tests/dylib.rs index 7660a62e4aa..db452fffb60 100644 --- a/tests/integration/ios/tests/dylib.rs +++ b/tests/integration/ios/tests/dylib.rs @@ -40,9 +40,8 @@ mod tests { */ let command_success = command.status.success(); let test_success = !stderr.contains("** TEST FAILED **"); - let success = command_success && test_success; - success + command_success && test_success } fn remove_existing_artificats() -> Output { diff --git a/tests/lib/wast/src/wasi_wast.rs b/tests/lib/wast/src/wasi_wast.rs index e31993deb0f..08195b64bd6 100644 --- a/tests/lib/wast/src/wasi_wast.rs +++ b/tests/lib/wast/src/wasi_wast.rs @@ -7,8 +7,8 @@ use wasmer::{FunctionEnv, Imports, Instance, Module, Store}; use wasmer_vfs::{host_fs, mem_fs, FileSystem}; use wasmer_wasi::types::wasi::{Filesize, Timestamp}; use wasmer_wasi::{ - generate_import_object_from_env, get_wasi_version, FsError, Pipe, VirtualFile, WasiEnv, - WasiFunctionEnv, WasiState, WasiVersion, + generate_import_object_from_env, get_wasi_version, FsError, VirtualFile, + WasiBidirectionalPipePair, WasiEnv, WasiFunctionEnv, WasiState, WasiVersion, }; use wast::parser::{self, Parse, ParseBuffer, Parser}; @@ -142,7 +142,7 @@ impl<'a> WasiTest<'a> { )> { let mut builder = WasiState::new(self.wasm_path); - let stdin_pipe = Pipe::new(); + let stdin_pipe = WasiBidirectionalPipePair::new().with_blocking(false); builder.stdin(Box::new(stdin_pipe)); for (name, value) in &self.envs {