Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataLoaders 6: first-class support for Incompatible #4565

Merged
merged 10 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions crates/re_data_source/src/data_loader/loader_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl DataLoader for ArchetypeLoader {
use anyhow::Context as _;

if filepath.is_dir() {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

re_tracing::profile_function!(filepath.display().to_string());
Expand All @@ -45,6 +45,11 @@ impl DataLoader for ArchetypeLoader {
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
) -> Result<(), crate::DataLoaderError> {
let extension = crate::extension(&filepath);
if !crate::is_supported_file_extension(&extension) {
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

re_tracing::profile_function!(filepath.display().to_string());

let entity_path = EntityPath::from_file_path(&filepath);
Expand Down Expand Up @@ -76,8 +81,6 @@ impl DataLoader for ArchetypeLoader {
}
}

let extension = crate::extension(&filepath);

let mut rows = Vec::new();

if crate::SUPPORTED_IMAGE_EXTENSIONS.contains(&extension.as_str()) {
Expand Down
40 changes: 23 additions & 17 deletions crates/re_data_source/src/data_loader/loader_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl crate::DataLoader for DirectoryLoader {
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
if dirpath.is_file() {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(dirpath.clone()));
}

re_tracing::profile_function!(dirpath.display().to_string());
Expand All @@ -43,22 +43,28 @@ impl crate::DataLoader for DirectoryLoader {
let filepath = filepath.to_owned();
let tx = tx.clone();

// NOTE: spawn is fine, this whole function is native-only.
rayon::spawn(move || {
let data = match crate::load_file::load(&store_id, &filepath, false, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
return;
}
};
// NOTE(1): `spawn` is fine, this whole function is native-only.
// NOTE(2): this must spawned on a dedicated thread to avoid a deadlock!
// `load` will spawn a bunch of loaders on the common rayon thread pool and wait for
// their response via channels: we cannot be waiting for these responses on the
// common rayon thread pool.
_ = std::thread::Builder::new()
.name(format!("load_dir_entry({filepath:?})"))
.spawn(move || {
let data = match crate::load_file::load(&store_id, &filepath, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
return;
}
};

for datum in data {
if tx.send(datum).is_err() {
break;
for datum in data {
if tx.send(datum).is_err() {
break;
}
}
}
});
});
}
}

Expand All @@ -69,11 +75,11 @@ impl crate::DataLoader for DirectoryLoader {
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): This could make sense to implement for e.g. archive formats (zip, tar, …)
Ok(()) // simply not interested
Err(crate::DataLoaderError::Incompatible(path))
}
}
59 changes: 52 additions & 7 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use std::io::Read;

use once_cell::sync::Lazy;

// ---

/// To register a new external data loader, simply add an executable in your $PATH whose name
/// starts with this prefix.
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";

/// When an external [`crate::DataLoader`] is asked to load some data that it doesn't know
/// how to load, it should exit with this exit code.
// NOTE: Always keep in sync with other languages.
pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;

/// Keeps track of the paths all external executable [`crate::DataLoader`]s.
///
/// Lazy initialized the first time a file is opened by running a full scan of the `$PATH`.
Expand Down Expand Up @@ -78,14 +85,18 @@ impl crate::DataLoader for ExternalLoader {

re_tracing::profile_function!(filepath.display().to_string());

#[derive(PartialEq, Eq)]
struct CompatibleLoaderFound;
let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();

for exe in EXTERNAL_LOADER_PATHS.iter() {
let store_id = store_id.clone();
let filepath = filepath.clone();
let tx = tx.clone();
let tx_feedback = tx_feedback.clone();

// NOTE: spawn is fine, the entire loader is native-only.
rayon::spawn(move || {
re_tracing::profile_function!();
re_tracing::profile_function!(exe.to_string_lossy());

let child = Command::new(exe)
.arg(filepath.clone())
Expand Down Expand Up @@ -119,14 +130,28 @@ impl crate::DataLoader for ExternalLoader {
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Ok(decoder) => {
decode_and_stream(&filepath, &tx, decoder);
let filepath = filepath.clone();
let tx = tx.clone();
// NOTE: This is completely IO bound, it must run on a dedicated thread, not the shared
// rayon thread pool.
if let Err(err) = std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
})
{
re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
return;
}
}
Err(re_log_encoding::decoder::DecodeError::Read(_)) => {
// The child was not interested in that file and left without logging
// anything.
// That's fine, we just need to make sure to check its exit status further
// down, still.
return;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
Expand All @@ -142,29 +167,49 @@ impl crate::DataLoader for ExternalLoader {
}
};

if !status.success() {
// NOTE: We assume that plugins are compatible until proven otherwise.
let is_compatible =
status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);

if is_compatible && !status.success() {
let mut stderr = std::io::BufReader::new(stderr);
let mut reason = String::new();
stderr.read_to_string(&mut reason).ok();
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
}

if is_compatible {
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
}
});
}

re_tracing::profile_wait!("compatible_loader");

drop(tx_feedback);

let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
if !any_compatible_loader {
// NOTE: The only way to get here is if all loaders closed then sending end of the
// channel without sending anything, i.e. none of them are compatible.
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

Ok(())
}

#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): You could imagine a world where plugins can be streamed rrd data via their
// standard input… but today is not world.
Ok(()) // simply not interested
Err(crate::DataLoaderError::Incompatible(path))
}
}

Expand Down
18 changes: 15 additions & 3 deletions crates/re_data_source/src/data_loader/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl crate::DataLoader for RrdLoader {

let extension = crate::extension(&filepath);
if extension != "rrd" {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

re_log::debug!(
Expand All @@ -40,7 +40,17 @@ impl crate::DataLoader for RrdLoader {
let file = std::io::BufReader::new(file);

let decoder = re_log_encoding::decoder::Decoder::new(version_policy, file)?;
decode_and_stream(&filepath, &tx, decoder);

// NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;

Ok(())
}
Expand All @@ -57,7 +67,7 @@ impl crate::DataLoader for RrdLoader {

let extension = crate::extension(&filepath);
if extension != "rrd" {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(filepath));
}

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
Expand All @@ -71,7 +81,9 @@ impl crate::DataLoader for RrdLoader {
_ => return Err(err.into()),
},
};

decode_and_stream(&filepath, &tx, decoder);

Ok(())
}
}
Expand Down
17 changes: 14 additions & 3 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ use re_log_types::{ArrowMsg, DataRow, LogMsg};
/// - [`DirectoryLoader`] for recursively loading folders.
/// - [`ExternalLoader`], which looks for user-defined data loaders in $PATH.
///
/// ## Registering custom loaders
///
/// TODO(cmc): web guide in upcoming PR
///
/// ## Execution
///
/// **All** registered [`DataLoader`]s get called when a user tries to open a file, unconditionally.
Expand Down Expand Up @@ -131,6 +135,9 @@ pub enum DataLoaderError {
#[error(transparent)]
Decode(#[from] re_log_encoding::decoder::DecodeError),

#[error("No data-loader support for {0:?}")]
Incompatible(std::path::PathBuf),

#[error(transparent)]
Other(#[from] anyhow::Error),
}
Expand All @@ -144,6 +151,11 @@ impl DataLoaderError {
_ => false,
}
}

#[inline]
pub fn is_incompatible(&self) -> bool {
matches!(self, Self::Incompatible { .. })
}
}

/// What [`DataLoader`]s load.
Expand Down Expand Up @@ -234,9 +246,8 @@ pub use self::loader_archetype::ArchetypeLoader;
pub use self::loader_directory::DirectoryLoader;
pub use self::loader_rrd::RrdLoader;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) use self::loader_external::EXTERNAL_LOADER_PATHS;
#[cfg(not(target_arch = "wasm32"))]
pub use self::loader_external::{
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_PREFIX,
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE,
EXTERNAL_DATA_LOADER_PREFIX,
};
6 changes: 5 additions & 1 deletion crates/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ pub use self::load_file::{extension, load_from_file_contents};
pub use self::web_sockets::connect_to_ws_url;

#[cfg(not(target_arch = "wasm32"))]
pub use self::data_loader::{iter_external_loaders, ExternalLoader};
pub use self::data_loader::{
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE,
EXTERNAL_DATA_LOADER_PREFIX,
};

#[cfg(not(target_arch = "wasm32"))]
pub use self::load_file::load_from_path;
Expand Down Expand Up @@ -63,6 +66,7 @@ pub fn supported_extensions() -> impl Iterator<Item = &'static str> {
.iter()
.chain(SUPPORTED_IMAGE_EXTENSIONS)
.chain(SUPPORTED_MESH_EXTENSIONS)
.chain(SUPPORTED_POINT_CLOUD_EXTENSIONS)
.chain(SUPPORTED_TEXT_EXTENSIONS)
.copied()
}
Expand Down
Loading
Loading