Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Jan 15, 2024
1 parent 1748969 commit aab0e23
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::io::Read;
use std::{
io::Read,
sync::{atomic::AtomicBool, Arc},
};

use ahash::HashMap;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -154,8 +157,7 @@ impl crate::DataLoader for ExternalLoader {

// A single value will be sent on this channel as soon as the child process starts
// streaming data to stdout.
let (tx_is_sending_data, rx_is_sending_data) =
std::sync::mpsc::sync_channel::<()>(1);
let is_sending_data = Arc::new(AtomicBool::new(false));

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
Expand All @@ -169,9 +171,8 @@ impl crate::DataLoader for ExternalLoader {
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, tx_is_sending_data, decoder);
}
let is_sending_data = Arc::clone(&is_sending_data);
move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
})
{
re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
Expand All @@ -197,17 +198,21 @@ impl crate::DataLoader for ExternalLoader {
// answer straight away.
// - If it starts streaming data, then we immediately assume it's compatible.
loop {
re_tracing::profile_scope!("waiting for compatibility");

match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) => {
// The child process has started streaming data, it is therefore compatible.
// Let's get out ASAP.
if rx_is_sending_data.try_recv().is_ok() {
if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
// The child process has started streaming data, it is therefore compatible.
// Let's get out ASAP.
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
break; // we still want to check for errors once it finally exits!
}

// NOTE: This will busy loop if there's no work available in neither
// the rayon threadpool nor the native OS threadpool.
match rayon::yield_now() {
Some(rayon::Yield::Executed) => {}
_ => std::thread::yield_now(),
Expand Down Expand Up @@ -281,18 +286,13 @@ impl crate::DataLoader for ExternalLoader {
fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
tx_is_sending_data: std::sync::mpsc::SyncSender<()>,
is_sending_data: Arc<AtomicBool>,
decoder: re_log_encoding::decoder::Decoder<R>,
) {
re_tracing::profile_function!(filepath.display().to_string());

let mut is_waiting_for_data = true;

for msg in decoder {
if is_waiting_for_data {
is_waiting_for_data = false;
tx_is_sending_data.send(()).ok();
}
is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);

let msg = match msg {
Ok(msg) => msg,
Expand Down

0 comments on commit aab0e23

Please sign in to comment.