Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataLoaders 4: add generic folder DataLoader #4520

Merged
merged 4 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions crates/re_data_source/src/data_loader/loader_directory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/// Recursively oads entire directories, using the appropriate [`crate::DataLoader`]:s for each
/// files within.
//
// TODO(cmc): There are a lot more things than can be done be done when it comes to the semantics
// of a folder, e.g.: HIVE-like partitioning, similarly named files with different indices and/or
// timestamps (e.g. a folder of video frames), etc.
// We could support some of those at some point, or at least add examples to show users how.
pub struct DirectoryLoader;
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

impl crate::DataLoader for DirectoryLoader {
#[inline]
fn name(&self) -> String {
"rerun.data_loaders.Directory".into()
}

#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
dirpath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
if dirpath.is_file() {
return Ok(()); // simply not interested
}

re_tracing::profile_function!(dirpath.display().to_string());

re_log::debug!(?dirpath, loader = self.name(), "Loading directory…",);

for entry in walkdir::WalkDir::new(&dirpath) {
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
re_log::error!(loader = self.name(), ?dirpath, %err, "Failed to open filesystem entry");
continue;
}
};

let filepath = entry.path();
if filepath.is_file() {
let store_id = store_id.clone();
let filepath = filepath.to_owned();
let tx = tx.clone();

// NOTE: spawn is fine, this whole function is native-only.
rayon::spawn(move || {
let data = match crate::load_file::load(&store_id, &filepath, false, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
return;
}
};

for datum in data {
if tx.send(datum).is_err() {
break;
}
}
});
}
}

Ok(())
}

#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): This could make sense to implement for e.g. archive formats (zip, tar, …)
Ok(()) // simply not interested
}
}
3 changes: 3 additions & 0 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ static BUILTIN_LOADERS: Lazy<Vec<Arc<dyn DataLoader>>> = Lazy::new(|| {
vec![
Arc::new(RrdLoader) as Arc<dyn DataLoader>,
Arc::new(ArchetypeLoader),
Arc::new(DirectoryLoader),
]
});

Expand All @@ -217,7 +218,9 @@ pub fn iter_loaders() -> impl ExactSizeIterator<Item = Arc<dyn DataLoader>> {
// ---

mod loader_archetype;
mod loader_directory;
mod loader_rrd;

pub use self::loader_archetype::ArchetypeLoader;
pub use self::loader_directory::DirectoryLoader;
pub use self::loader_rrd::RrdLoader;
8 changes: 4 additions & 4 deletions crates/re_data_source/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ pub fn extension(path: &std::path::Path) -> String {
/// This does _not_ access the filesystem.
#[inline]
pub fn is_associated_with_builtin_loader(path: &std::path::Path, is_dir: bool) -> bool {
!is_dir && crate::is_supported_file_extension(&extension(path))
is_dir || crate::is_supported_file_extension(&extension(path))
}

/// Prepares an adequate [`re_log_types::StoreInfo`] [`LogMsg`] given the input.
fn prepare_store_info(
pub(crate) fn prepare_store_info(
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
Expand Down Expand Up @@ -139,7 +139,7 @@ fn prepare_store_info(
/// - On native, this is filled asynchronously from other threads.
/// - On wasm, this is pre-filled synchronously.
#[cfg_attr(target_arch = "wasm32", allow(clippy::needless_pass_by_value))]
fn load(
pub(crate) fn load(
store_id: &re_log_types::StoreId,
path: &std::path::Path,
is_dir: bool,
Expand Down Expand Up @@ -218,7 +218,7 @@ fn load(
/// Forwards the data in `rx_loader` to `tx`, taking care of necessary conversions, if any.
///
/// Runs asynchronously from another thread on native, synchronously on wasm.
fn send(
pub(crate) fn send(
store_id: &re_log_types::StoreId,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
Expand Down
Loading