Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move wasmtime-wasi's unit test for stdin to a separate integration test #7564

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,8 @@ preview1-on-preview2 = [
"preview2",
"wiggle",
]

[[test]]
name = "process_stdin"
harness = false

191 changes: 0 additions & 191 deletions crates/wasi/src/preview2/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,194 +257,3 @@ impl<T: WasiView> terminal_stderr::Host for T {
}
}
}

#[cfg(all(unix, test))]
mod test {
use crate::preview2::HostInputStream;
use libc;
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::os::fd::FromRawFd;

fn test_child_stdin<T, P>(child: T, parent: P)
where
T: FnOnce(File),
P: FnOnce(File, BufReader<File>),
{
unsafe {
// Make pipe for emulating stdin.
let mut stdin_fds: [libc::c_int; 2] = [0; 2];
assert_eq!(
libc::pipe(stdin_fds.as_mut_ptr()),
0,
"Failed to create stdin pipe"
);
let [stdin_read, stdin_write] = stdin_fds;

// Make pipe for getting results.
let mut result_fds: [libc::c_int; 2] = [0; 2];
assert_eq!(
libc::pipe(result_fds.as_mut_ptr()),
0,
"Failed to create result pipe"
);
let [result_read, result_write] = result_fds;

let child_pid = libc::fork();
if child_pid == 0 {
libc::close(stdin_write);
libc::close(result_read);

libc::close(libc::STDIN_FILENO);
libc::dup2(stdin_read, libc::STDIN_FILENO);

let result_write = File::from_raw_fd(result_write);
child(result_write);
} else {
libc::close(stdin_read);
libc::close(result_write);

let stdin_write = File::from_raw_fd(stdin_write);
let result_read = BufReader::new(File::from_raw_fd(result_read));
parent(stdin_write, result_read);
}
}
}

// This could even be parameterized somehow to use the worker thread stdin vs the asyncfd
// stdin.
fn test_stdin_by_forking<S, T>(mk_stdin: T)
where
S: HostInputStream,
T: Fn() -> S,
{
test_child_stdin(
|mut result_write| {
let mut child_running = true;
while child_running {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
'task: loop {
println!("child: creating stdin");
let mut stdin = mk_stdin();

println!("child: checking that stdin is not ready");
assert!(
tokio::time::timeout(
std::time::Duration::from_millis(100),
stdin.ready()
)
.await
.is_err(),
"stdin available too soon"
);

writeln!(&mut result_write, "start").unwrap();

println!("child: started");

let mut buffer = String::new();
loop {
println!("child: waiting for stdin to be ready");
stdin.ready().await;

println!("child: reading input");
// We can't effectively test for the case where stdin was closed, so panic if it is...
let bytes = stdin.read(1024).unwrap();

println!("child got: {:?}", bytes);

buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap());
if let Some((line, rest)) = buffer.split_once('\n') {
if line == "all done" {
writeln!(&mut result_write, "done").unwrap();
println!("child: exiting...");
child_running = false;
break 'task;
} else if line == "restart_runtime" {
writeln!(&mut result_write, "restarting").unwrap();
println!("child: restarting runtime...");
break 'task;
} else if line == "restart_task" {
writeln!(&mut result_write, "restarting").unwrap();
println!("child: restarting task...");
continue 'task;
} else {
writeln!(&mut result_write, "{}", line).unwrap();
}

buffer = rest.to_owned();
}
}
}
});
println!("runtime exited");
}
println!("child exited");
},
|mut stdin_write, mut result_read| {
let mut line = String::new();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");

for i in 0..5 {
let message = format!("some bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}

writeln!(&mut stdin_write, "restart_task").unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "restarting\n");
line.clear();

result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");

for i in 0..10 {
let message = format!("more bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}

writeln!(&mut stdin_write, "restart_runtime").unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "restarting\n");
line.clear();

result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");

for i in 0..17 {
let message = format!("even more bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}

writeln!(&mut stdin_write, "all done").unwrap();

line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "done\n");
},
)
}

// This test doesn't work under qemu because of the use of fork in the test helper.
#[test]
#[cfg_attr(not(target_arch = "x86_64"), ignore)]
fn test_worker_thread_stdin() {
test_stdin_by_forking(super::worker_thread_stdin::stdin);
}
}
165 changes: 165 additions & 0 deletions crates/wasi/tests/process_stdin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::io::{BufRead, Write};
use std::process::Command;
use wasmtime_wasi::preview2::{HostInputStream, Subscribe};

const VAR_NAME: &str = "__CHILD_PROCESS";

fn main() {
if cfg!(miri) {
return;
}
// Skip this tests if it looks like we're in a cross-compiled situation and
// we're emulating this test for a different platform. In that scenario
// emulators (like QEMU) tend to not report signals the same way and such.
if std::env::vars()
.filter(|(k, _v)| k.starts_with("CARGO_TARGET") && k.ends_with("RUNNER"))
.count()
> 0
{
return;
}

match std::env::var(VAR_NAME) {
Ok(_) => child_process(),
Err(_) => parent_process(),
}

fn child_process() {
let mut result_write = std::io::stderr();
let mut child_running = true;
while child_running {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
'task: loop {
println!("child: creating stdin");
let mut stdin = wasmtime_wasi::preview2::stdin();

println!("child: checking that stdin is not ready");
assert!(
tokio::time::timeout(
std::time::Duration::from_millis(100),
stdin.ready()
)
.await
.is_err(),
"stdin available too soon"
);

writeln!(&mut result_write, "start").unwrap();

println!("child: started");

let mut buffer = String::new();
loop {
println!("child: waiting for stdin to be ready");
stdin.ready().await;

println!("child: reading input");
// We can't effectively test for the case where stdin was closed, so panic if it is...
let bytes = stdin.read(1024).unwrap();

println!("child got: {:?}", bytes);

buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap());
if let Some((line, rest)) = buffer.split_once('\n') {
if line == "all done" {
writeln!(&mut result_write, "done").unwrap();
println!("child: exiting...");
child_running = false;
break 'task;
} else if line == "restart_runtime" {
writeln!(&mut result_write, "restarting").unwrap();
println!("child: restarting runtime...");
break 'task;
} else if line == "restart_task" {
writeln!(&mut result_write, "restarting").unwrap();
println!("child: restarting task...");
continue 'task;
} else {
writeln!(&mut result_write, "{}", line).unwrap();
}

buffer = rest.to_owned();
}
}
}
});
println!("child: runtime exited");
}
println!("child: exiting");
}
}

fn parent_process() {
let me = std::env::current_exe().unwrap();
let mut cmd = Command::new(me);
cmd.env(VAR_NAME, "1");
cmd.stdin(std::process::Stdio::piped());

if std::env::args().any(|arg| arg == "--nocapture") {
cmd.stdout(std::process::Stdio::inherit());
} else {
cmd.stdout(std::process::Stdio::null());
}

cmd.stderr(std::process::Stdio::piped());
let mut child = cmd.spawn().unwrap();

let mut stdin_write = child.stdin.take().unwrap();
let mut result_read = std::io::BufReader::new(child.stderr.take().unwrap());

let mut line = String::new();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");

for i in 0..5 {
let message = format!("some bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}

writeln!(&mut stdin_write, "restart_task").unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "restarting\n");
line.clear();

result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");

for i in 0..10 {
let message = format!("more bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}

writeln!(&mut stdin_write, "restart_runtime").unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "restarting\n");
line.clear();

result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");

for i in 0..17 {
let message = format!("even more bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}

writeln!(&mut stdin_write, "all done").unwrap();

line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "done\n");
}