Skip to content

Commit

Permalink
immediate compatiblity feedback for external loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Jan 12, 2024
1 parent ef2c8ff commit 4158835
Showing 1 changed file with 48 additions and 1 deletion.
49 changes: 48 additions & 1 deletion crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ impl crate::DataLoader for ExternalLoader {

re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);

// A single value will be sent on this channel as soon as the child process starts
// streaming data to stdout.
let (tx_is_sending_data, rx_is_sending_data) =
std::sync::mpsc::sync_channel::<()>(1);

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Expand All @@ -165,7 +170,7 @@ impl crate::DataLoader for ExternalLoader {
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
decode_and_stream(&filepath, &tx, tx_is_sending_data, decoder);
}
})
{
Expand All @@ -185,6 +190,39 @@ impl crate::DataLoader for ExternalLoader {
}
};

// We have to wait in order to know whether the child process is a compatible loader.
//
// This can manifest itself in two distinct ways:
// 1. If it exits immediately with an INCOMPATIBLE exit code, then we have our
// answer straight away.
// - If it starts streaming data, then we immediately assume it's compatible.
loop {
match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) => {
// The child process has started streaming data, it is therefore compatible.
// Let's get out ASAP.
if rx_is_sending_data.try_recv().is_ok() {
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
break; // we still want to check for errors once it finally exits!
}

match rayon::yield_now() {
Some(rayon::Yield::Executed) => {}
_ => std::thread::yield_now(),
}

continue;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
}

// NOTE: `try_wait` and `wait` are idempotent.
let status = match child.wait() {
Ok(output) => output,
Err(err) => {
Expand Down Expand Up @@ -239,14 +277,23 @@ impl crate::DataLoader for ExternalLoader {
}
}

#[allow(clippy::needless_pass_by_value)]
fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
tx_is_sending_data: std::sync::mpsc::SyncSender<()>,
decoder: re_log_encoding::decoder::Decoder<R>,
) {
re_tracing::profile_function!(filepath.display().to_string());

let mut is_waiting_for_data = true;

for msg in decoder {
if is_waiting_for_data {
is_waiting_for_data = false;
tx_is_sending_data.send(()).ok();
}

let msg = match msg {
Ok(msg) => msg,
Err(err) => {
Expand Down

0 comments on commit 4158835

Please sign in to comment.