Skip to content

Commit

Permalink
implement folder loading support
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Dec 14, 2023
1 parent e9ac988 commit f3cff17
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
75 changes: 75 additions & 0 deletions crates/re_data_source/src/data_loader/loader_directory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// TODO: open an issue -> Hive partitioning, timestamp regexes, zip files, that kind of thing

/// Loads entire directories, using the appropriate [`crate::DataLoader`]s for each file within.
pub struct DirectoryLoader;

// `DataLoader` implementation for directories: every regular file found while walking the
// directory is handed to `crate::load_file::load`, and whatever that yields is forwarded to
// the caller-provided channel.
impl crate::DataLoader for DirectoryLoader {
/// Returns the name identifying this loader.
#[inline]
fn name(&self) -> String {
"rerun.data_loaders.Directory".into()
}

/// Loads every file found under `dirpath` and sends the resulting data through `tx`.
///
/// Returns `Ok(())` immediately, without loading anything, if `dirpath` is a file.
///
/// Filesystem entries that cannot be read while walking the directory are logged and skipped.
/// Each file is loaded in its own `rayon::spawn` task; this function does not wait for those
/// tasks, so it returns `Ok(())` as soon as the walk itself is done. Failures to load an
/// individual file are logged from within the task and never reported through the return value.
///
/// Only compiled on non-wasm targets.
// NOTE(review): the CI annotation captured alongside this code reports that `load_from_file`
// is not a member of `crate::DataLoader` at this commit; confirm against the trait definition.
#[cfg(not(target_arch = "wasm32"))]
fn load_from_file(

Check failure on line 13 in crates/re_data_source/src/data_loader/loader_directory.rs

View workflow job for this annotation

GitHub Actions / Checks / Rust lints (fmt, check, cranky, tests, doc)

method `load_from_file` is not a member of trait `crate::DataLoader`
&self,
store_id: re_log_types::StoreId,
dirpath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
if dirpath.is_file() {
return Ok(()); // simply not interested
}

re_tracing::profile_function!(dirpath.display().to_string());

re_log::debug!(?dirpath, loader = self.name(), "Loading directory…",);

// Walk everything below `dirpath` (presumably recursively, per `walkdir`'s defaults).
for entry in walkdir::WalkDir::new(&dirpath) {
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
// Best effort: one unreadable entry must not abort the whole directory.
re_log::error!(loader = self.name(), ?dirpath, %err, "Failed to open filesystem entry");
continue;
}
};

let filepath = entry.path();
// Anything that is not a file (e.g. the directories themselves) is ignored.
if filepath.is_file() {
// The spawned closure is `move`, so it needs its own owned copies.
let store_id = store_id.clone();
let filepath = filepath.to_owned();
let tx = tx.clone();

// NOTE: spawn is fine, this whole function is native-only.
rayon::spawn(move || {
// `false`: this entry is a file, not a directory.
let data = match crate::load_file::load(&store_id, &filepath, false, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
return;
}
};

// Forward everything that was loaded; stop as soon as sending fails
// (i.e. the receiving end is gone), since nobody is listening anymore.
for datum in data {
if tx.send(datum).is_err() {
break;
}
}
});
}
}

Ok(())
}

/// Does nothing and returns `Ok(())`: every argument is ignored.
// NOTE(review): the CI annotation captured alongside this code reports that
// `load_from_file_contents` is not a member of `crate::DataLoader` at this commit; confirm
// against the trait definition.
#[inline]
fn load_from_file_contents(

Check failure on line 65 in crates/re_data_source/src/data_loader/loader_directory.rs

View workflow job for this annotation

GitHub Actions / Checks / Rust lints (fmt, check, cranky, tests, doc)

method `load_from_file_contents` is not a member of trait `crate::DataLoader`
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO: zip file support
Ok(()) // simply not interested
}
}
3 changes: 3 additions & 0 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ static BUILTIN_LOADERS: Lazy<Vec<Arc<dyn DataLoader>>> = Lazy::new(|| {
vec![
Arc::new(RrdLoader) as Arc<dyn DataLoader>,
Arc::new(ArchetypeLoader),
Arc::new(DirectoryLoader),
]
});

Expand All @@ -173,7 +174,9 @@ pub fn iter_loaders() -> impl ExactSizeIterator<Item = Arc<dyn DataLoader>> {
// ---

mod loader_archetype;
mod loader_directory;
mod loader_rrd;

pub use self::loader_archetype::ArchetypeLoader;
pub use self::loader_directory::DirectoryLoader;
pub use self::loader_rrd::RrdLoader;
8 changes: 4 additions & 4 deletions crates/re_data_source/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ fn extension(path: &std::path::Path) -> String {
/// This does _not_ access the filesystem.
#[inline]
fn is_builtin(path: &std::path::Path, is_dir: bool) -> bool {
!is_dir && crate::is_known_file_extension(&extension(path))
is_dir || crate::is_known_file_extension(&extension(path))
}

/// Prepares an adequate [`re_log_types::StoreInfo`] [`LogMsg`] given the input.
fn prepare_store_info(
pub(crate) fn prepare_store_info(
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
Expand Down Expand Up @@ -131,7 +131,7 @@ fn prepare_store_info(
/// - On native, this is filled asynchronously from other threads.
/// - On wasm, this is pre-filled synchronously.
#[cfg_attr(target_arch = "wasm32", allow(clippy::needless_pass_by_value))]
fn load(
pub(crate) fn load(
store_id: &re_log_types::StoreId,
path: &std::path::Path,
is_dir: bool,
Expand Down Expand Up @@ -210,7 +210,7 @@ fn load(
/// Forwards the data in `rx_loader` to `tx`, taking care of necessary conversions, if any.
///
/// Runs asynchronously from another thread on native, synchronously on wasm.
fn send(
pub(crate) fn send(
store_id: &re_log_types::StoreId,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
Expand Down

0 comments on commit f3cff17

Please sign in to comment.