diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 65e87f9821a8..09ac899aa80e 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -483,71 +483,15 @@ impl ObjectStore for LocalFileSystem { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { - let config = Arc::clone(&self.config); - - let root_path = match prefix { - Some(prefix) => match config.prefix_to_filesystem(prefix) { - Ok(path) => path, - Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(), - }, - None => self.config.root.to_file_path().unwrap(), - }; - - let walkdir = WalkDir::new(root_path) - // Don't include the root directory itself - .min_depth(1) - .follow_links(true); - - let s = walkdir.into_iter().flat_map(move |result_dir_entry| { - let entry = match convert_walkdir_result(result_dir_entry).transpose()? { - Ok(entry) => entry, - Err(e) => return Some(Err(e)), - }; - - if !entry.path().is_file() { - return None; - } - - match config.filesystem_to_path(entry.path()) { - Ok(path) => match is_valid_file_path(&path) { - true => convert_entry(entry, path).transpose(), - false => None, - }, - Err(e) => Some(Err(e)), - } - }); - - // If no tokio context, return iterator directly as no - // need to perform chunked spawn_blocking reads - if tokio::runtime::Handle::try_current().is_err() { - return futures::stream::iter(s).boxed(); - } - - // Otherwise list in batches of CHUNK_SIZE - const CHUNK_SIZE: usize = 1024; - - let buffer = VecDeque::with_capacity(CHUNK_SIZE); - futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move { - if buffer.is_empty() { - (s, buffer) = tokio::task::spawn_blocking(move || { - for _ in 0..CHUNK_SIZE { - match s.next() { - Some(r) => buffer.push_back(r), - None => break, - } - } - (s, buffer) - }) - .await?; - } + self.list_with_maybe_offset(prefix, None) + } - match buffer.pop_front() { - Some(Err(e)) => Err(e), - Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))), - None => Ok(None), - } - }) - .boxed() + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, Result> { + self.list_with_maybe_offset(prefix, Some(offset)) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { @@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem { } } +impl LocalFileSystem { + fn list_with_maybe_offset( + &self, + prefix: Option<&Path>, + maybe_offset: Option<&Path>, + ) -> BoxStream<'static, Result> { + let config = Arc::clone(&self.config); + + let root_path = match prefix { + Some(prefix) => match config.prefix_to_filesystem(prefix) { + Ok(path) => path, + Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(), + }, + None => config.root.to_file_path().unwrap(), + }; + + let walkdir = WalkDir::new(root_path) + // Don't include the root directory itself + .min_depth(1) + .follow_links(true); + + let maybe_offset = maybe_offset.cloned(); + + let s = walkdir.into_iter().flat_map(move |result_dir_entry| { + // Apply offset filter before proceeding, to reduce statx file system calls + // This matters for NFS mounts + if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) { + let location = config.filesystem_to_path(entry.path()); + match location { + Ok(path) if path <= *offset => return None, + Err(e) => return Some(Err(e)), + _ => {} + } + } + + let entry = match convert_walkdir_result(result_dir_entry).transpose()? { + Ok(entry) => entry, + Err(e) => return Some(Err(e)), + }; + + if !entry.path().is_file() { + return None; + } + + match config.filesystem_to_path(entry.path()) { + Ok(path) => match is_valid_file_path(&path) { + true => convert_entry(entry, path).transpose(), + false => None, + }, + Err(e) => Some(Err(e)), + } + }); + + // If no tokio context, return iterator directly as no + // need to perform chunked spawn_blocking reads + if tokio::runtime::Handle::try_current().is_err() { + return futures::stream::iter(s).boxed(); + } + + // Otherwise list in batches of CHUNK_SIZE + const CHUNK_SIZE: usize = 1024; + + let buffer = VecDeque::with_capacity(CHUNK_SIZE); + futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move { + if buffer.is_empty() { + (s, buffer) = tokio::task::spawn_blocking(move || { + for _ in 0..CHUNK_SIZE { + match s.next() { + Some(r) => buffer.push_back(r), + None => break, + } + } + (s, buffer) + }) + .await?; + } + + match buffer.pop_front() { + Some(Err(e)) => Err(e), + Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))), + None => Ok(None), + } + }) + .boxed() + } +} + /// Creates the parent directories of `path` or returns an error based on `source` if no parent fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> { let parent = path.parent().ok_or_else(|| { @@ -1401,6 +1432,66 @@ mod tests { ); } + #[tokio::test] + async fn test_path_with_offset() { + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let root_path = root.path(); + for i in 0..5 { + let filename = format!("test{}.parquet", i); + let file = root_path.join(filename); + std::fs::write(file, "test").unwrap(); + } + let filter_str = "test"; + let filter = String::from(filter_str); + let offset_str = filter + "1"; + let offset = Path::from(offset_str.clone()); + + // Use list_with_offset to retrieve files + let res = integration.list_with_offset(None, &offset); + let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap(); + let mut offset_files: Vec<_> = offset_paths + .iter() + .map(|x| String::from(x.filename().unwrap())) + .collect(); + + // Check result with direct filesystem read + let files = fs::read_dir(root_path).unwrap(); + let filtered_files = files + .filter_map(Result::ok) + .filter_map(|d| { + d.file_name().to_str().and_then(|f| { + if f.contains(filter_str) { + Some(String::from(f)) + } else { + None + } + }) + }) + .collect::>(); + + let mut expected_offset_files: Vec<_> = filtered_files + .iter() + .filter(|s| **s > offset_str) + .cloned() + .collect(); + + fn do_vecs_match(a: &[T], b: &[T]) -> bool { + let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a == b).count(); + matching == a.len() && matching == b.len() + } + + offset_files.sort(); + expected_offset_files.sort(); + + // println!("Expected Offset Files: {:?}", expected_offset_files); + // println!("Actual Offset Files: {:?}", offset_files); + + assert_eq!(offset_files.len(), expected_offset_files.len()); + assert!(do_vecs_match(&expected_offset_files, &offset_files)); + } + #[tokio::test] async fn filesystem_filename_with_percent() { let temp_dir = TempDir::new().unwrap();