Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions crates/pixi_build_frontend/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use pixi_build_types::{
};
use pixi_manifest::PackageManifest;
use rattler_conda_types::ChannelConfig;
use stderr::{stderr_null, stderr_stream};
use stderr::stderr_stream;
use thiserror::Error;
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
Expand All @@ -39,6 +39,7 @@ use tokio::{

use crate::{
jsonrpc::{stdio_transport, RpcParams},
protocols::stderr::stderr_buffer,
tool::Tool,
CondaBuildReporter, CondaMetadataReporter,
};
Expand All @@ -64,6 +65,8 @@ pub enum ProtocolError {
"Ensure that the build backend implements the JSON-RPC protocol correctly."
))]
JsonRpc(String, #[source] ClientError),
#[error("the build backend ({0}) exited prematurely.\nBuild backend output:\n\n{1}")]
PrematureExit(String, String),
#[error("received invalid response from the build backend ({0}) when calling '{1}'")]
ParseError(String, String, #[source] serde_json::Error),
#[error(transparent)]
Expand Down Expand Up @@ -95,6 +98,7 @@ impl ProtocolError {
err: ClientError,
method: &str,
root_dir: &Path,
backend_output: Option<String>,
) -> Self {
match err {
Error::Call(err) if err.code() > -32001 => {
Expand All @@ -103,6 +107,10 @@ impl ProtocolError {
Error::Call(err) if err.code() == ErrorCode::MethodNotFound.code() => {
Self::MethodNotImplemented(backend_identifier, method.to_string())
}
Error::RestartNeeded(_err) if backend_output.is_some() => Self::PrematureExit(
backend_identifier,
backend_output.expect("safe because checked above"),
),
Error::ParseError(err) => Self::ParseError(backend_identifier, method.to_string(), err),
e => Self::JsonRpc(backend_identifier, e),
}
Expand Down Expand Up @@ -252,6 +260,7 @@ impl JsonRPCBuildProtocol {
err,
procedures::negotiate_capabilities::METHOD_NAME,
manifest_path.parent().unwrap_or(&manifest_path),
None,
)
})?;

Expand Down Expand Up @@ -279,6 +288,7 @@ impl JsonRPCBuildProtocol {
err,
procedures::initialize::METHOD_NAME,
manifest_path.parent().unwrap_or(&manifest_path),
None,
)
})?;

Expand All @@ -304,7 +314,7 @@ impl JsonRPCBuildProtocol {
// Cancellation signal
let (cancel_tx, cancel_rx) = oneshot::channel();
// Spawn the stderr forwarding task
let handle = tokio::spawn(stderr_null(stderr.clone(), cancel_rx));
let handle = tokio::spawn(stderr_buffer(stderr.clone(), cancel_rx));
(cancel_tx, handle)
});

Expand All @@ -317,33 +327,37 @@ impl JsonRPCBuildProtocol {
procedures::conda_metadata::METHOD_NAME,
RpcParams::from(request),
)
.await
.map_err(|err| {
ProtocolError::from_client_error(
self.backend_identifier.clone(),
err,
procedures::conda_metadata::METHOD_NAME,
self.manifest_path.parent().unwrap_or(&self.manifest_path),
)
});
.await;

// Wait for the stderr sink to finish, by signaling it to stop
if let Some((cancel_tx, handle)) = stderr {
// Cancel the stderr forwarding
if cancel_tx.send(()).is_err() {
return Err(ProtocolError::StdErrPipeStopped);
}
handle.await.map_or_else(
let backend_output = if let Some((cancel_tx, handle)) = stderr {
// Cancel the stderr forwarding. Ignore any error because that means the
// tasks also finished.
let _err = cancel_tx.send(());
let lines = handle.await.map_or_else(
|e| match e.try_into_panic() {
Ok(panic) => std::panic::resume_unwind(panic),
Err(_) => Err(ProtocolError::StdErrPipeStopped),
},
|e| e.map_err(|_| ProtocolError::StdErrPipeStopped),
)?;
}

Some(lines)
} else {
None
};

reporter.on_metadata_end(operation);
result

result.map_err(|err| {
ProtocolError::from_client_error(
self.backend_identifier.clone(),
err,
procedures::conda_metadata::METHOD_NAME,
self.manifest_path.parent().unwrap_or(&self.manifest_path),
backend_output,
)
})
}

/// Build a specific conda package output
Expand Down Expand Up @@ -373,6 +387,7 @@ impl JsonRPCBuildProtocol {
err,
procedures::conda_build::METHOD_NAME,
self.manifest_path.parent().unwrap_or(&self.manifest_path),
None,
)
});

Expand Down Expand Up @@ -400,10 +415,9 @@ impl JsonRPCBuildProtocol {
}
};

// Cancel the stderr forwarding
if cancel_tx.send(()).is_err() {
return Err(ProtocolError::StdErrPipeStopped);
}
// Cancel the stderr forwarding. Ignore any error because it means the task
// already ended.
let _err = cancel_tx.send(());

// Wait for the stderr forwarding to finish, it should because we cancelled
handle.await.map_or_else(
Expand Down
63 changes: 38 additions & 25 deletions crates/pixi_build_frontend/src/protocols/stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,61 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
};

/// Stderr sink that captures the stderr output of the backend
/// but does not do anything with it.
pub(crate) async fn stderr_null(
/// Stderr stream that captures the stderr output of the backend
/// and sends it over the stream.
pub(crate) async fn stderr_stream(
buffer: Arc<Mutex<Lines<BufReader<ChildStderr>>>>,
sender: mpsc::Sender<String>,
cancel: oneshot::Receiver<()>,
) -> Result<(), std::io::Error> {
// Create a future that continuously read from the buffer and sends individual
// lines over a channel.
let read_and_forward = async {
let mut lines = buffer.lock().await;
while let Some(line) = lines.next_line().await? {
if let Err(err) = sender.send(line).await {
return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
}
}
Ok(())
};

// Either await until the cancel signal is received or the `read_and_forward`
// future is done, meaning there is not more data to read.
tokio::select! {
// Please stop
_ = cancel => {
Ok(())
}
// Please keep reading
result = async {
let mut lines = buffer.lock().await;
while let Some(_line) = lines.next_line().await? {}
Ok(())
} => {
result = read_and_forward => {
result
}
}
}

/// Stderr stream that captures the stderr output of the backend
/// and sends it over the stream.
pub(crate) async fn stderr_stream(
/// Stderr stream that captures the stderr output of the backend and stores it
/// in a buffer for later use.
pub(crate) async fn stderr_buffer(
buffer: Arc<Mutex<Lines<BufReader<ChildStderr>>>>,
sender: mpsc::Sender<String>,
cancel: oneshot::Receiver<()>,
) -> Result<(), std::io::Error> {
) -> Result<String, std::io::Error> {
// Create a future that continuously read from the buffer and stores the lines
// until all data is received.
let mut lines = Vec::new();
let read_and_buffer = async {
let mut buffer = buffer.lock().await;
while let Some(line) = buffer.next_line().await? {
lines.push(line);
}
Ok(lines.join("\n"))
};

// Either wait until the cancel signal is received or the `read_and_buffer`
// finishes which means there is no more data to read.
tokio::select! {
_ = cancel => {
Ok(())
Ok(lines.join("\n"))
}
result = async {
let mut lines = buffer.lock().await;
while let Some(line) = lines.next_line().await? {
if let Err(err) = sender.send(line).await {
return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
}
}
Ok(())
} => {
result = read_and_buffer => {
result
}
}
Expand Down
Loading