diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index b43d6acae22..8255262d87e 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -55,7 +55,7 @@ use self::write::FragmentCreateBuilder;
 use super::hash_joiner::HashJoiner;
 use super::rowids::load_row_id_sequence;
 use super::scanner::Scanner;
-use super::statistics::FieldStatistics;
+
 use super::updater::Updater;
 use super::{schema_evolution, NewColumnTransform, WriteParams};
 use crate::dataset::fragment::session::FragmentSession;
@@ -113,8 +113,8 @@ pub trait GenericFileReader: std::fmt::Debug + Send + Sync {
     /// Schema of the reader
     fn projection(&self) -> &Arc<Schema>;

-    /// Update storage statistics (ignored by v1 reader)
-    fn update_storage_stats(&self, field_stats: &mut HashMap<u32, FieldStatistics>);
+    /// Get storage statistics for this file (ignored by v1 reader)
+    fn storage_stats(&self) -> Vec<(u32, u64)>;

     // Helper functions to fallback to the legacy implementation while we
     // slowly migrate functionality over to the generic reader
@@ -271,8 +271,9 @@ impl GenericFileReader for V1Reader {
         self.reader.len() as u32
     }

-    fn update_storage_stats(&self, _field_stats: &mut HashMap<u32, FieldStatistics>) {
+    fn storage_stats(&self) -> Vec<(u32, u64)> {
         // No-op for v1 files
+        Vec::new()
     }

     fn clone_box(&self) -> Box<dyn GenericFileReader> {
@@ -442,7 +443,7 @@ mod v2_adapter {
                 .boxed())
         }

-        fn update_storage_stats(&self, field_stats: &mut HashMap<u32, FieldStatistics>) {
+        fn storage_stats(&self) -> Vec<(u32, u64)> {
             let file_statistics = self.reader.file_statistics();
             let column_idx_to_field_id = self
                 .field_id_to_column_idx
@@ -450,19 +451,17 @@
                 .map(|(field_id, column_idx)| (*column_idx, *field_id))
                 .collect::<HashMap<_, _>>();

+            let mut stats = Vec::new();
             // Some fields span more than one column. We assume a column that doesn't have an
             // entry in the field_id_to_column_idx map is a continuation of the previous field.
             let mut current_field_id = 0;
-            for (column_idx, stats) in file_statistics.columns.iter().enumerate() {
+            for (column_idx, col_stats) in file_statistics.columns.iter().enumerate() {
                 if let Some(field_id) = column_idx_to_field_id.get(&(column_idx as u32)) {
                     current_field_id = *field_id;
                 }
-                // If the field_id is not in the map then the field may no longer be part of the
-                // dataset
-                if let Some(field_stats) = field_stats.get_mut(&current_field_id) {
-                    field_stats.bytes_on_disk += stats.size_bytes;
-                }
+                stats.push((current_field_id, col_stats.size_bytes));
             }
+            stats
         }

         fn projection(&self) -> &Arc<Schema> {
@@ -571,8 +570,9 @@ impl GenericFileReader for NullReader {
         self.read_ranges_tasks(vec![0..num_rows].into(), batch_size, projection)
     }

-    fn update_storage_stats(&self, _field_stats: &mut HashMap<u32, FieldStatistics>) {
+    fn storage_stats(&self) -> Vec<(u32, u64)> {
         // No-op for null reader
+        Vec::new()
     }

     fn projection(&self) -> &Arc<Schema> {
@@ -790,12 +790,13 @@ impl FileFragment {
         }
     }

-    pub(crate) async fn update_storage_stats(
+    /// Returns storage stats as `(field_id, bytes_on_disk)` pairs for this fragment.
+    pub(crate) async fn storage_stats(
         &self,
-        field_stats: &mut HashMap<u32, FieldStatistics>,
         dataset_schema: &Schema,
         scan_scheduler: Arc<ScanScheduler>,
-    ) -> Result<()> {
+    ) -> Result<Vec<(u32, u64)>> {
+        let mut stats = Vec::new();
         for reader in self
             .open_readers(
                 dataset_schema,
             )
             .await?
         {
-            reader.update_storage_stats(field_stats);
+            stats.extend(reader.storage_stats());
         }
-        Ok(())
+        Ok(stats)
     }

     pub fn dataset(&self) -> &Dataset {
diff --git a/rust/lance/src/dataset/statistics.rs b/rust/lance/src/dataset/statistics.rs
index e2dfa34e353..68b92f0d70b 100644
--- a/rust/lance/src/dataset/statistics.rs
+++ b/rust/lance/src/dataset/statistics.rs
@@ -5,6 +5,7 @@
 use std::{collections::HashMap, future::Future, sync::Arc};

+use futures::{StreamExt, TryStreamExt};
 use lance_core::Result;
 use lance_io::scheduler::{ScanScheduler, SchedulerConfig};

@@ -51,12 +52,26 @@ impl DatasetStatisticsExt for Dataset {
                self.object_store.clone(),
                SchedulerConfig::max_bandwidth(self.object_store.as_ref()),
            );
-            for fragment in self.fragments().as_ref() {
-                let file_fragment = FileFragment::new(self.clone(), fragment.clone());
-                file_fragment
-                    .update_storage_stats(&mut field_stats, self.schema(), scan_scheduler.clone())
-                    .await?;
-            }
+            let schema = self.schema().clone();
+            let dataset = self.clone();
+            let fragments = self.fragments().as_ref().clone();
+            futures::stream::iter(fragments)
+                .map(|fragment| {
+                    let file_fragment = FileFragment::new(dataset.clone(), fragment);
+                    let schema = schema.clone();
+                    let scan_scheduler = scan_scheduler.clone();
+                    async move { file_fragment.storage_stats(&schema, scan_scheduler).await }
+                })
+                .buffer_unordered(self.object_store.io_parallelism())
+                .try_for_each(|fragment_stats| {
+                    for (field_id, bytes) in fragment_stats {
+                        if let Some(stats) = field_stats.get_mut(&field_id) {
+                            stats.bytes_on_disk += bytes;
+                        }
+                    }
+                    futures::future::ready(Ok(()))
+                })
+                .await?;
        }
        let field_stats = field_ids
            .into_iter()
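
Note for reviewers: a minimal standalone sketch (not part of the patch) of how the `(field_id, bytes_on_disk)` pairs returned by the new `storage_stats()` methods fold into per-field totals. `FieldStats` here is a hypothetical stand-in for the crate's `FieldStatistics` struct, and the byte counts are made up.

use std::collections::HashMap;

// Hypothetical stand-in for the per-field accumulator used in statistics.rs.
#[derive(Default, Debug)]
struct FieldStats {
    bytes_on_disk: u64,
}

fn main() {
    // Pretend these came back from storage_stats() on two fragments:
    // (field_id, bytes_on_disk) pairs, one per column, where a field that
    // spans multiple columns can appear more than once.
    let fragment_stats: Vec<Vec<(u32, u64)>> = vec![
        vec![(0, 1_024), (1, 4_096), (1, 512)],
        vec![(0, 2_048), (1, 8_192)],
    ];

    // Only tracked fields get an entry; pairs for unknown field ids
    // (e.g. fields dropped from the dataset) are silently skipped,
    // mirroring the try_for_each body in statistics.rs.
    let mut field_stats: HashMap<u32, FieldStats> =
        HashMap::from([(0, FieldStats::default()), (1, FieldStats::default())]);

    for stats in fragment_stats {
        for (field_id, bytes) in stats {
            if let Some(entry) = field_stats.get_mut(&field_id) {
                entry.bytes_on_disk += bytes;
            }
        }
    }

    assert_eq!(field_stats[&0].bytes_on_disk, 3_072);
    assert_eq!(field_stats[&1].bytes_on_disk, 12_800);
}

Returning plain pairs instead of mutating a shared HashMap inside each reader is what lets statistics.rs read fragments concurrently with buffer_unordered and merge the results serially afterwards.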