Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use self::write::FragmentCreateBuilder;
use super::hash_joiner::HashJoiner;
use super::rowids::load_row_id_sequence;
use super::scanner::Scanner;
use super::statistics::FieldStatistics;

use super::updater::Updater;
use super::{schema_evolution, NewColumnTransform, WriteParams};
use crate::dataset::fragment::session::FragmentSession;
Expand Down Expand Up @@ -113,8 +113,8 @@ pub trait GenericFileReader: std::fmt::Debug + Send + Sync {
/// Schema of the reader
fn projection(&self) -> &Arc<Schema>;

/// Update storage statistics (ignored by v1 reader)
fn update_storage_stats(&self, field_stats: &mut HashMap<u32, FieldStatistics>);
/// Returns this file's storage statistics as `(field_id, bytes_on_disk)` pairs (always empty for the v1 reader)
fn storage_stats(&self) -> Vec<(u32, u64)>;

// Helper functions to fall back to the legacy implementation while we
// slowly migrate functionality over to the generic reader
Expand Down Expand Up @@ -271,8 +271,9 @@ impl GenericFileReader for V1Reader {
self.reader.len() as u32
}

fn update_storage_stats(&self, _field_stats: &mut HashMap<u32, FieldStatistics>) {
fn storage_stats(&self) -> Vec<(u32, u64)> {
// v1 files report no storage statistics, so the result is always empty
Vec::new()
}

fn clone_box(&self) -> Box<dyn GenericFileReader> {
Expand Down Expand Up @@ -442,27 +443,25 @@ mod v2_adapter {
.boxed())
}

fn update_storage_stats(&self, field_stats: &mut HashMap<u32, FieldStatistics>) {
fn storage_stats(&self) -> Vec<(u32, u64)> {
let file_statistics = self.reader.file_statistics();
let column_idx_to_field_id = self
.field_id_to_column_idx
.iter()
.map(|(field_id, column_idx)| (*column_idx, *field_id))
.collect::<HashMap<_, _>>();

let mut stats = Vec::new();
// Some fields span more than one column. We assume a column that doesn't have an
// entry in the field_id_to_column_idx map is a continuation of the previous field.
let mut current_field_id = 0;
for (column_idx, stats) in file_statistics.columns.iter().enumerate() {
for (column_idx, col_stats) in file_statistics.columns.iter().enumerate() {
if let Some(field_id) = column_idx_to_field_id.get(&(column_idx as u32)) {
current_field_id = *field_id;
}
// If the field_id is not in the map then the field may no longer be part of the
// dataset
if let Some(field_stats) = field_stats.get_mut(&current_field_id) {
field_stats.bytes_on_disk += stats.size_bytes;
}
stats.push((current_field_id, col_stats.size_bytes));
}
stats
}

fn projection(&self) -> &Arc<Schema> {
Expand Down Expand Up @@ -571,8 +570,9 @@ impl GenericFileReader for NullReader {
self.read_ranges_tasks(vec![0..num_rows].into(), batch_size, projection)
}

fn update_storage_stats(&self, _field_stats: &mut HashMap<u32, FieldStatistics>) {
fn storage_stats(&self) -> Vec<(u32, u64)> {
// The null reader reports no storage statistics, so the result is always empty
Vec::new()
}

fn projection(&self) -> &Arc<Schema> {
Expand Down Expand Up @@ -790,22 +790,23 @@ impl FileFragment {
}
}

pub(crate) async fn update_storage_stats(
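/// Returns storage stats as `(field_id, bytes_on_disk)` pairs for this fragment.
///
/// The same field id may appear more than once (one entry per column, across every
/// reader opened for the fragment); callers are expected to sum the entries per field.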
pub(crate) async fn storage_stats(
&self,
field_stats: &mut HashMap<u32, FieldStatistics>,
dataset_schema: &Schema,
scan_scheduler: Arc<ScanScheduler>,
) -> Result<()> {
) -> Result<Vec<(u32, u64)>> {
let mut stats = Vec::new();
for reader in self
.open_readers(
dataset_schema,
&FragReadConfig::default().with_scan_scheduler(scan_scheduler),
)
.await?
{
reader.update_storage_stats(field_stats);
stats.extend(reader.storage_stats());
}
Ok(())
Ok(stats)
}

pub fn dataset(&self) -> &Dataset {
Expand Down
27 changes: 21 additions & 6 deletions rust/lance/src/dataset/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use std::{collections::HashMap, future::Future, sync::Arc};

use futures::{StreamExt, TryStreamExt};
use lance_core::Result;
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};

Expand Down Expand Up @@ -51,12 +52,26 @@ impl DatasetStatisticsExt for Dataset {
self.object_store.clone(),
SchedulerConfig::max_bandwidth(self.object_store.as_ref()),
);
for fragment in self.fragments().as_ref() {
let file_fragment = FileFragment::new(self.clone(), fragment.clone());
file_fragment
.update_storage_stats(&mut field_stats, self.schema(), scan_scheduler.clone())
.await?;
}
let schema = self.schema().clone();
let dataset = self.clone();
let fragments = self.fragments().as_ref().clone();
futures::stream::iter(fragments)
.map(|fragment| {
let file_fragment = FileFragment::new(dataset.clone(), fragment);
let schema = schema.clone();
let scan_scheduler = scan_scheduler.clone();
async move { file_fragment.storage_stats(&schema, scan_scheduler).await }
})
.buffer_unordered(self.object_store.io_parallelism())
.try_for_each(|fragment_stats| {
for (field_id, bytes) in fragment_stats {
if let Some(stats) = field_stats.get_mut(&field_id) {
stats.bytes_on_disk += bytes;
}
}
futures::future::ready(Ok(()))
})
.await?;
}
let field_stats = field_ids
.into_iter()
Expand Down
Loading