Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce latency when loading data from external loaders #4797

Merged
merged 2 commits into from
Jan 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::io::Read;
use std::{
io::Read,
sync::{atomic::AtomicBool, Arc},
};

use ahash::HashMap;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -152,6 +155,10 @@ impl crate::DataLoader for ExternalLoader {

re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);

// A single value will be sent on this channel as soon as the child process starts
// streaming data to stdout.
let is_sending_data = Arc::new(AtomicBool::new(false));

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Expand All @@ -164,9 +171,8 @@ impl crate::DataLoader for ExternalLoader {
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
let is_sending_data = Arc::clone(&is_sending_data);
move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
})
{
re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
Expand All @@ -185,6 +191,43 @@ impl crate::DataLoader for ExternalLoader {
}
};

// We have to wait in order to know whether the child process is a compatible loader.
//
// This can manifest itself in two distinct ways:
// 1. If it exits immediately with an INCOMPATIBLE exit code, then we have our
// answer straight away.
// - If it starts streaming data, then we immediately assume it's compatible.
loop {
re_tracing::profile_scope!("waiting for compatibility");

match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) => {
if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
// The child process has started streaming data, it is therefore compatible.
// Let's get out ASAP.
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
break; // we still want to check for errors once it finally exits!
}

// NOTE: This will busy loop if there's no work available in neither
// the rayon threadpool nor the native OS threadpool.
match rayon::yield_now() {
Some(rayon::Yield::Executed) => {}
_ => std::thread::yield_now(),
}

teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
}

// NOTE: `try_wait` and `wait` are idempotent.
let status = match child.wait() {
Ok(output) => output,
Err(err) => {
Expand Down Expand Up @@ -239,14 +282,18 @@ impl crate::DataLoader for ExternalLoader {
}
}

#[allow(clippy::needless_pass_by_value)]
fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
is_sending_data: Arc<AtomicBool>,
decoder: re_log_encoding::decoder::Decoder<R>,
) {
re_tracing::profile_function!(filepath.display().to_string());

for msg in decoder {
is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);

let msg = match msg {
Ok(msg) => msg,
Err(err) => {
Expand Down
Loading