Skip to content

Commit

Permalink
Big refactor:
Browse files Browse the repository at this point in the history
- use threads
- add timeouts
- split copy in two
- remove stream control API
  • Loading branch information
matklad committed Dec 23, 2024
1 parent 3835a18 commit 93fcf0e
Show file tree
Hide file tree
Showing 7 changed files with 654 additions and 430 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ categories = ["development-tools::build-utils", "filesystem"]
version = "0.2.7" # also update xshell-macros/Cargo.toml and CHANGELOG.md
license = "MIT OR Apache-2.0"
repository = "https://github.com/matklad/xshell"
authors = ["Aleksey Kladov <[email protected]>"]
authors = ["Alex Kladov <[email protected]>"]
edition = "2021"
rust-version = "1.63"

Expand All @@ -16,5 +16,11 @@ exclude = [".github/", "bors.toml", "rustfmt.toml", "cbench", "mock_bin/"]
[dependencies]
xshell-macros = { version = "=0.2.7", path = "./xshell-macros" }

[target.'cfg(unix)'.dependencies]
libc = "0.2.155"

[target.'cfg(windows)'.dependencies]
miow = "0.6.0"

[dev-dependencies]
anyhow = "1.0.56"
12 changes: 6 additions & 6 deletions examples/ci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ fn test(sh: &Shell) -> Result<()> {

{
let _s = Section::new("BUILD");
cmd!(sh, "cargo test --workspace --no-run").run()?;
cmd!(sh, "cargo test --workspace --no-run").run_echo()?;
}

{
let _s = Section::new("TEST");
cmd!(sh, "cargo test --workspace").run()?;
cmd!(sh, "cargo test --workspace").run_echo()?;
}
Ok(())
}
Expand All @@ -58,10 +58,10 @@ fn publish(sh: &Shell) -> Result<()> {
if current_branch == "master" && !tag_exists {
// Could also just use `CARGO_REGISTRY_TOKEN` environmental variable.
let token = sh.env_var("CRATES_IO_TOKEN").unwrap_or("DUMMY_TOKEN".to_string());
cmd!(sh, "git tag v{version}").run()?;
cmd!(sh, "cargo publish --token {token} --package xshell-macros").run()?;
cmd!(sh, "cargo publish --token {token} --package xshell").run()?;
cmd!(sh, "git push --tags").run()?;
cmd!(sh, "git tag v{version}").run_echo()?;
cmd!(sh, "cargo publish --token {token} --package xshell-macros").run_echo()?;
cmd!(sh, "cargo publish --token {token} --package xshell").run_echo()?;
cmd!(sh, "git push --tags").run_echo()?;
}
Ok(())
}
Expand Down
130 changes: 88 additions & 42 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
sync::Arc,
};

use crate::Cmd;
use crate::{Cmd, STREAM_SUFFIX_SIZE};

/// `Result` from std, with the error type defaulting to xshell's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -29,10 +29,7 @@ enum ErrorKind {
HardLink { err: io::Error, src: PathBuf, dst: PathBuf },
CreateDir { err: io::Error, path: PathBuf },
RemovePath { err: io::Error, path: PathBuf },
CmdStatus { cmd: Cmd, status: ExitStatus },
CmdIo { err: io::Error, cmd: Cmd },
CmdUtf8 { err: FromUtf8Error, cmd: Cmd },
CmdStdin { err: io::Error, cmd: Cmd },
Cmd(CmdError),
}

impl From<ErrorKind> for Error {
Expand All @@ -42,6 +39,20 @@ impl From<ErrorKind> for Error {
}
}

struct CmdError {
cmd: Cmd,
kind: CmdErrorKind,
stdout: Vec<u8>,
stderr: Vec<u8>,
}

pub(crate) enum CmdErrorKind {
Io(io::Error),
Utf8(FromUtf8Error),
Status(ExitStatus),
Timeout,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &*self.kind {
Expand Down Expand Up @@ -84,45 +95,71 @@ impl fmt::Display for Error {
let path = path.display();
write!(f, "failed to remove path `{path}`: {err}")
}
ErrorKind::CmdStatus { cmd, status } => match status.code() {
Some(code) => write!(f, "command exited with non-zero code `{cmd}`: {code}"),
ErrorKind::Cmd(cmd) => fmt::Display::fmt(cmd, f),
}?;
Ok(())
}
}

impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}
impl std::error::Error for Error {}

impl fmt::Display for CmdError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let nl = if (self.stdout.len() > 0 || self.stderr.len() > 0)
&& !matches!(self.kind, CmdErrorKind::Utf8(_))
{
"\n"
} else {
""
};
let cmd = &self.cmd;
match &self.kind {
CmdErrorKind::Status(status) => match status.code() {
Some(code) => write!(f, "command exited with non-zero code `{cmd}`: {code}{nl}")?,
#[cfg(unix)]
None => {
use std::os::unix::process::ExitStatusExt;
match status.signal() {
Some(sig) => write!(f, "command was terminated by a signal `{cmd}`: {sig}"),
None => write!(f, "command was terminated by a signal `{cmd}`"),
Some(sig) => {
write!(f, "command was terminated by a signal `{cmd}`: {sig}{nl}")?
}
None => write!(f, "command was terminated by a signal `{cmd}`{nl}")?,
}
}
#[cfg(not(unix))]
None => write!(f, "command was terminated by a signal `{cmd}`"),
None => write!(f, "command was terminated by a signal `{cmd}`{nl}"),
},
ErrorKind::CmdIo { err, cmd } => {
CmdErrorKind::Utf8(err) => {
write!(f, "command produced invalid utf-8 `{cmd}`: {err}")?;
return Ok(());
}
CmdErrorKind::Io(err) => {
if err.kind() == io::ErrorKind::NotFound {
let prog = cmd.prog.as_path().display();
write!(f, "command not found: `{prog}`")
let prog = self.cmd.prog.as_path().display();
write!(f, "command not found: `{prog}`{nl}")?;
} else {
write!(f, "io error when running command `{cmd}`: {err}")
write!(f, "io error when running command `{cmd}`: {err}{nl}")?;
}
}
ErrorKind::CmdUtf8 { err, cmd } => {
write!(f, "failed to decode output of command `{cmd}`: {err}")
}
ErrorKind::CmdStdin { err, cmd } => {
write!(f, "failed to write to stdin of command `{cmd}`: {err}")
CmdErrorKind::Timeout => {
write!(f, "command timed out `{cmd}`{nl}")?;
}
}?;
}
if self.stdout.len() > 0 {
write!(f, "stdout suffix\n:{}\n", String::from_utf8_lossy(&self.stdout))?;
}
if self.stderr.len() > 0 {
write!(f, "stderr suffix:\n:{}\n", String::from_utf8_lossy(&self.stderr))?;
}
Ok(())
}
}

impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}
impl std::error::Error for Error {}

/// `pub(crate)` constructors, visible only in this crate.
impl Error {
pub(crate) fn new_current_dir(err: io::Error, path: Option<Arc<Path>>) -> Error {
Expand Down Expand Up @@ -161,24 +198,33 @@ impl Error {
ErrorKind::RemovePath { err, path }.into()
}

pub(crate) fn new_cmd_status(cmd: &Cmd, status: ExitStatus) -> Error {
let cmd = cmd.clone();
ErrorKind::CmdStatus { cmd, status }.into()
}

pub(crate) fn new_cmd_io(cmd: &Cmd, err: io::Error) -> Error {
let cmd = cmd.clone();
ErrorKind::CmdIo { err, cmd }.into()
}
pub(crate) fn new_cmd(
cmd: &Cmd,
kind: CmdErrorKind,
mut stdout: Vec<u8>,
mut stderr: Vec<u8>,
) -> Error {
// Try to determine whether the command failed because the current
// directory does not exist. Return an appropriate error in such a
// case.
if let CmdErrorKind::Io(err) = &kind {
if err.kind() == io::ErrorKind::NotFound {
if let Err(err) = cmd.sh.cwd.metadata() {
return Error::new_current_dir(err, Some(cmd.sh.cwd.clone()));
}
}
}

pub(crate) fn new_cmd_utf8(cmd: &Cmd, err: FromUtf8Error) -> Error {
let cmd = cmd.clone();
ErrorKind::CmdUtf8 { err, cmd }.into()
}
fn trim(xs: &mut Vec<u8>, size: usize) {
if xs.len() > size {
xs.drain(..xs.len() - size);
}
}

pub(crate) fn new_cmd_stdin(cmd: &Cmd, err: io::Error) -> Error {
let cmd = cmd.clone();
ErrorKind::CmdStdin { err, cmd }.into()
trim(&mut stdout, STREAM_SUFFIX_SIZE);
trim(&mut stderr, STREAM_SUFFIX_SIZE);
ErrorKind::Cmd(CmdError { cmd, kind, stdout, stderr }).into()
}
}

Expand Down
160 changes: 160 additions & 0 deletions src/exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
//! Executes the process, feeding it stdin, reading stdout/stderr (up to the specified limit), and
//! imposing a deadline.
//!
//! This really is quite unhappy code, wasting whopping four threads for the task _and_ including a
//! sleepy loop! This is not system programming, just a pile of work-around. What is my excuse?
//!
//! The _right_ way to do this is of course by using evened syscalls --- concurrently await stream
//! io, timeout, and process termination. The _first_ two kinda-sorta solvable, see the `read2`
//! module in Cargo. For unix, we through fds into a epoll via libc, for windows we use completion
//! ports via miow. That's some ugly platform-specific code and two dependencies, but doable.
//!
//! Both poll and completion ports naturally have a timeout, so that's doable as well. However,
//! tying process termination into the same epoll is not really possible. One can use pidfd's on
//! Linux, but that's even _more_ platform specific code, and there are other UNIXes.
//!
//! Given that, if I were to use evented IO, I'd have to pull dependencies, write a bunch of
//! platform-specific glue code _and_ write some from scratch things for waiting, I decided to stick
//! to blocking APIs.
//!
//! This should be easy, right? Just burn a thread per asynchronous operation! Well, the `wait`
//! strikes again! Both `.kill` and `.wait` require `&mut Child`, so you can't wait on the main
//! thread, and `.kill` from the timeout thread. One can think that that's just deficiency of Rust
//! API, but, now, this is again just UNIX. Both kill and wait operate on pids, and a pid can be
//! re-used immediately after wait. As far as I understand, this is a race condition you can't lock
//! your way out of. Hence the sleepy loop in wait_deadline.
use std::{
collections::VecDeque,
io::{self, Read, Write},
process::{Child, ExitStatus, Stdio},
time::{Duration, Instant},
};

#[derive(Default)]
pub(crate) struct ExecResult {
pub(crate) stdout: Vec<u8>,
pub(crate) stderr: Vec<u8>,
pub(crate) status: Option<ExitStatus>,
pub(crate) error: Option<io::Error>,
}

pub(crate) fn wait_deadline(
child: &mut Child,
deadline: Option<Instant>,
) -> io::Result<ExitStatus> {
let Some(deadline) = deadline else {
return child.wait();
};

let mut sleep_ms = 1;
let sleep_ms_max = 64;
loop {
match child.try_wait()? {
Some(status) => return Ok(status),
None => {}
}
if Instant::now() > deadline {
let _ = child.kill();
let _ = child.wait();
return Err(io::ErrorKind::TimedOut.into());
}
std::thread::sleep(Duration::from_millis(sleep_ms));
sleep_ms = std::cmp::min(sleep_ms * 2, sleep_ms_max);
}
}

pub(crate) fn exec(
mut command: std::process::Command,
stdin_contents: Option<&[u8]>,
stdout_limit: Option<usize>,
stderr_limit: Option<usize>,
deadline: Option<Instant>,
) -> ExecResult {
let mut result = ExecResult::default();
command.stdin(if stdin_contents.is_some() { Stdio::inherit() } else { Stdio::null() });
command.stdout(Stdio::piped());
command.stdout(Stdio::piped());
let mut child = match command.spawn() {
Ok(it) => it,
Err(err) => {
result.error = Some(err);
return result;
}
};

let stdin = child.stdin.take();
let mut in_error = Ok(());

let mut stdout = child.stdout.take().unwrap();
let mut out_deque = VecDeque::new();
let mut out_error = Ok(());

let mut stderr = child.stderr.take().unwrap();
let mut err_deque = VecDeque::new();
let mut err_error = Ok(());

let status = std::thread::scope(|scope| {
if let Some(stdin_contents) = stdin_contents {
scope.spawn(|| in_error = stdin.unwrap().write_all(stdin_contents));
}
scope.spawn(|| {
out_error = (|| {
let mut buffer = [0u8; 4096];
loop {
let n = stdout.read(&mut buffer)?;
if n == 0 {
return Ok(());
}
out_deque.extend(buffer[0..n].iter().copied());
let excess = out_deque.len().saturating_sub(stdout_limit.unwrap_or(usize::MAX));
if excess > 0 {
out_deque.drain(..excess);
}
}
})()
});
scope.spawn(|| {
err_error = (|| {
let mut buffer = [0u8; 4096];
loop {
let n = stderr.read(&mut buffer)?;
if n == 0 {
return Ok(());
}
err_deque.extend(buffer[0..n].iter().copied());
let excess = err_deque.len().saturating_sub(stderr_limit.unwrap_or(usize::MAX));
if excess > 0 {
err_deque.drain(..excess);
}
}
})()
});

wait_deadline(&mut child, deadline)
});

if let Err(err) = err_error {
result.error = err;
}

if let Err(err) = out_error {
result.error = err;
}

if let Err(err) = in_error {
if err.kind() != io::ErrorKind::BrokenPipe {
result.error = Some(err);
}
}

match status {
Ok(status) => result.status = Some(status),
Err(err) => result.error = Some(err),
}

result.stdout = out_deque.into();
result.stderr = err_deque.into();

result
}
Loading

0 comments on commit 93fcf0e

Please sign in to comment.