Skip to content

Commit

Permalink
DataLoaders 6: first-class support for NotSupported (#4565)
Browse files Browse the repository at this point in the history
Introduce a first-class protocol for `DataLoader`s -- whether they are
builtin, custom, or external -- to announce that they don't support
loading a given piece of data.

This fixes one of the oldest issue when loading files in Rerun: loading
unsupported data would always bottom out in either the image or
websocket paths, leading to unrelated errors very confusing to users.

The loading process is now a two-pass process:
1. Dispatch the data to be loaded to all loaders, which will respond
ASAP whether they can do it or not.
1.1. If at least one compatible loader is found, go to 2
1.2. Otherwise, fail early with a nice error message for the end user
2. Dispatch the actual data loading.

This has important/subtle ramifications on the threading model, but
other than that is what you'd expect.


Checks:
- [x] Native: CLI examples/assets/*
- [x] Native: CLI examples/assets
- [x] Native: CLI examples/assets/* containing unsupported files
- [x] Native: CLI examples/assets containing unsupported files
- [x] Native: File>Open examples/assets/*
- [x] Native: File>Open examples/assets
- [x] Native: File>Open examples/assets/* containing unsupported files
- [x] Native: File>Open examples/assets containing unsupported files
- [x] Native: Drag-n-drop examples/assets/*
- [x] Native: Drag-n-drop examples/assets
- [x] Native: Drag-n-drop examples/assets/* containing unsupported files
- [x] Native: Drag-n-drop examples/assets containing unsupported files
- [x] Web: File>Open examples/assets/*
- [x] Web: Drag-n-drop examples/assets/*
- [x] Web: File>Open examples/assets/* containing unsupported files
- [x] Web: Drag-n-drop examples/assets/* containing unsupported files



---

Part of a series of PRs to make it possible to load _any_ file from the
local filesystem, by any means, on web and native:
- #4516
- #4517 
- #4518 
- #4519 
- #4520 
- #4521 
- #4565
- #4566
- #4567
  • Loading branch information
teh-cmc authored Dec 19, 2023
1 parent d7b5867 commit b7f7a55
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 101 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions crates/re_data_source/src/data_loader/loader_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl DataLoader for ArchetypeLoader {
use anyhow::Context as _;

if filepath.is_dir() {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

re_tracing::profile_function!(filepath.display().to_string());
Expand All @@ -45,6 +45,11 @@ impl DataLoader for ArchetypeLoader {
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
) -> Result<(), crate::DataLoaderError> {
let extension = crate::extension(&filepath);
if !crate::is_supported_file_extension(&extension) {
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

re_tracing::profile_function!(filepath.display().to_string());

let entity_path = EntityPath::from_file_path(&filepath);
Expand Down Expand Up @@ -76,8 +81,6 @@ impl DataLoader for ArchetypeLoader {
}
}

let extension = crate::extension(&filepath);

let mut rows = Vec::new();

if crate::SUPPORTED_IMAGE_EXTENSIONS.contains(&extension.as_str()) {
Expand Down
40 changes: 23 additions & 17 deletions crates/re_data_source/src/data_loader/loader_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl crate::DataLoader for DirectoryLoader {
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
if dirpath.is_file() {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(dirpath.clone()));
}

re_tracing::profile_function!(dirpath.display().to_string());
Expand All @@ -43,22 +43,28 @@ impl crate::DataLoader for DirectoryLoader {
let filepath = filepath.to_owned();
let tx = tx.clone();

// NOTE: spawn is fine, this whole function is native-only.
rayon::spawn(move || {
let data = match crate::load_file::load(&store_id, &filepath, false, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
return;
}
};
// NOTE(1): `spawn` is fine, this whole function is native-only.
// NOTE(2): this must spawned on a dedicated thread to avoid a deadlock!
// `load` will spawn a bunch of loaders on the common rayon thread pool and wait for
// their response via channels: we cannot be waiting for these responses on the
// common rayon thread pool.
_ = std::thread::Builder::new()
.name(format!("load_dir_entry({filepath:?})"))
.spawn(move || {
let data = match crate::load_file::load(&store_id, &filepath, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
return;
}
};

for datum in data {
if tx.send(datum).is_err() {
break;
for datum in data {
if tx.send(datum).is_err() {
break;
}
}
}
});
});
}
}

Expand All @@ -69,11 +75,11 @@ impl crate::DataLoader for DirectoryLoader {
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): This could make sense to implement for e.g. archive formats (zip, tar, …)
Ok(()) // simply not interested
Err(crate::DataLoaderError::Incompatible(path))
}
}
59 changes: 52 additions & 7 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use std::io::Read;

use once_cell::sync::Lazy;

// ---

/// To register a new external data loader, simply add an executable in your $PATH whose name
/// starts with this prefix.
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";

/// When an external [`crate::DataLoader`] is asked to load some data that it doesn't know
/// how to load, it should exit with this exit code.
// NOTE: Always keep in sync with other languages.
pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;

/// Keeps track of the paths all external executable [`crate::DataLoader`]s.
///
/// Lazy initialized the first time a file is opened by running a full scan of the `$PATH`.
Expand Down Expand Up @@ -78,14 +85,18 @@ impl crate::DataLoader for ExternalLoader {

re_tracing::profile_function!(filepath.display().to_string());

#[derive(PartialEq, Eq)]
struct CompatibleLoaderFound;
let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();

for exe in EXTERNAL_LOADER_PATHS.iter() {
let store_id = store_id.clone();
let filepath = filepath.clone();
let tx = tx.clone();
let tx_feedback = tx_feedback.clone();

// NOTE: spawn is fine, the entire loader is native-only.
rayon::spawn(move || {
re_tracing::profile_function!();
re_tracing::profile_function!(exe.to_string_lossy());

let child = Command::new(exe)
.arg(filepath.clone())
Expand Down Expand Up @@ -119,14 +130,28 @@ impl crate::DataLoader for ExternalLoader {
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Ok(decoder) => {
decode_and_stream(&filepath, &tx, decoder);
let filepath = filepath.clone();
let tx = tx.clone();
// NOTE: This is completely IO bound, it must run on a dedicated thread, not the shared
// rayon thread pool.
if let Err(err) = std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
})
{
re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
return;
}
}
Err(re_log_encoding::decoder::DecodeError::Read(_)) => {
// The child was not interested in that file and left without logging
// anything.
// That's fine, we just need to make sure to check its exit status further
// down, still.
return;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
Expand All @@ -142,29 +167,49 @@ impl crate::DataLoader for ExternalLoader {
}
};

if !status.success() {
// NOTE: We assume that plugins are compatible until proven otherwise.
let is_compatible =
status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);

if is_compatible && !status.success() {
let mut stderr = std::io::BufReader::new(stderr);
let mut reason = String::new();
stderr.read_to_string(&mut reason).ok();
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
}

if is_compatible {
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
}
});
}

re_tracing::profile_wait!("compatible_loader");

drop(tx_feedback);

let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
if !any_compatible_loader {
// NOTE: The only way to get here is if all loaders closed then sending end of the
// channel without sending anything, i.e. none of them are compatible.
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

Ok(())
}

#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): You could imagine a world where plugins can be streamed rrd data via their
// standard input… but today is not world.
Ok(()) // simply not interested
Err(crate::DataLoaderError::Incompatible(path))
}
}

Expand Down
18 changes: 15 additions & 3 deletions crates/re_data_source/src/data_loader/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl crate::DataLoader for RrdLoader {

let extension = crate::extension(&filepath);
if extension != "rrd" {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}

re_log::debug!(
Expand All @@ -40,7 +40,17 @@ impl crate::DataLoader for RrdLoader {
let file = std::io::BufReader::new(file);

let decoder = re_log_encoding::decoder::Decoder::new(version_policy, file)?;
decode_and_stream(&filepath, &tx, decoder);

// NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;

Ok(())
}
Expand All @@ -57,7 +67,7 @@ impl crate::DataLoader for RrdLoader {

let extension = crate::extension(&filepath);
if extension != "rrd" {
return Ok(()); // simply not interested
return Err(crate::DataLoaderError::Incompatible(filepath));
}

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
Expand All @@ -71,7 +81,9 @@ impl crate::DataLoader for RrdLoader {
_ => return Err(err.into()),
},
};

decode_and_stream(&filepath, &tx, decoder);

Ok(())
}
}
Expand Down
17 changes: 14 additions & 3 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ use re_log_types::{ArrowMsg, DataRow, LogMsg};
/// - [`DirectoryLoader`] for recursively loading folders.
/// - [`ExternalLoader`], which looks for user-defined data loaders in $PATH.
///
/// ## Registering custom loaders
///
/// TODO(cmc): web guide in upcoming PR
///
/// ## Execution
///
/// **All** registered [`DataLoader`]s get called when a user tries to open a file, unconditionally.
Expand Down Expand Up @@ -131,6 +135,9 @@ pub enum DataLoaderError {
#[error(transparent)]
Decode(#[from] re_log_encoding::decoder::DecodeError),

#[error("No data-loader support for {0:?}")]
Incompatible(std::path::PathBuf),

#[error(transparent)]
Other(#[from] anyhow::Error),
}
Expand All @@ -144,6 +151,11 @@ impl DataLoaderError {
_ => false,
}
}

#[inline]
pub fn is_incompatible(&self) -> bool {
matches!(self, Self::Incompatible { .. })
}
}

/// What [`DataLoader`]s load.
Expand Down Expand Up @@ -234,9 +246,8 @@ pub use self::loader_archetype::ArchetypeLoader;
pub use self::loader_directory::DirectoryLoader;
pub use self::loader_rrd::RrdLoader;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) use self::loader_external::EXTERNAL_LOADER_PATHS;
#[cfg(not(target_arch = "wasm32"))]
pub use self::loader_external::{
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_PREFIX,
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE,
EXTERNAL_DATA_LOADER_PREFIX,
};
6 changes: 5 additions & 1 deletion crates/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ pub use self::load_file::{extension, load_from_file_contents};
pub use self::web_sockets::connect_to_ws_url;

#[cfg(not(target_arch = "wasm32"))]
pub use self::data_loader::{iter_external_loaders, ExternalLoader};
pub use self::data_loader::{
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE,
EXTERNAL_DATA_LOADER_PREFIX,
};

#[cfg(not(target_arch = "wasm32"))]
pub use self::load_file::load_from_path;
Expand Down Expand Up @@ -63,6 +66,7 @@ pub fn supported_extensions() -> impl Iterator<Item = &'static str> {
.iter()
.chain(SUPPORTED_IMAGE_EXTENSIONS)
.chain(SUPPORTED_MESH_EXTENSIONS)
.chain(SUPPORTED_POINT_CLOUD_EXTENSIONS)
.chain(SUPPORTED_TEXT_EXTENSIONS)
.copied()
}
Expand Down
Loading

0 comments on commit b7f7a55

Please sign in to comment.