Reduce clone of Statistics in ListingTable and PartitionedFile #11802

Merged · 12 commits · Aug 6, 2024
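
The core idea of the change, in one sketch: wrap Statistics in an Arc so that handing statistics to the cache, to each PartitionedFile, and to callers bumps a reference count instead of deep-copying per-column min/max values. The Statistics struct below is a simplified stand-in for DataFusion's type, not the real definition:

```rust
use std::sync::Arc;

// Simplified stand-in for DataFusion's Statistics (illustrative only).
#[derive(Debug, Clone)]
struct Statistics {
    num_rows: Option<usize>,
    total_byte_size: Option<usize>,
}

fn main() {
    let stats = Arc::new(Statistics {
        num_rows: Some(1_000),
        total_byte_size: Some(4_096),
    });

    // Before: consumers cloned the Statistics value itself (a deep copy).
    // After: consumers clone the Arc, a cheap reference-count bump.
    let for_cache = Arc::clone(&stats);
    let for_file = Arc::clone(&stats);

    assert_eq!(for_cache.num_rows, for_file.num_rows);
    assert_eq!(Arc::strong_count(&stats), 3);
}
```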
21 changes: 17 additions & 4 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -18,6 +18,7 @@
//! Helper functions for the table implementation

use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

use super::PartitionedFile;
@@ -138,10 +139,22 @@ pub fn split_files(

// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
partitioned_files
.chunks(chunk_size)
.map(|c| c.to_vec())
.collect()
let mut chunks = Vec::with_capacity(n);
let mut current_chunk = Vec::with_capacity(chunk_size);
for file in partitioned_files.drain(..) {
current_chunk.push(file);
if current_chunk.len() == chunk_size {
let full_chunk =
mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
chunks.push(full_chunk);
}
}

if !current_chunk.is_empty() {
chunks.push(current_chunk)
}

chunks
}

struct Partition {
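The helpers.rs change above replaces chunks() + to_vec(), which cloned every PartitionedFile, with a loop that moves files out of the input vector. A self-contained sketch of the same pattern over a generic element type (split_into_chunks is an illustrative name, not the DataFusion API):

```rust
use std::mem;

/// Split `items` into `n` chunks of ceil(len / n) elements by moving them,
/// never cloning. Assumes `n > 0`. Illustrative sketch, not the actual
/// `split_files` signature.
fn split_into_chunks<T>(mut items: Vec<T>, n: usize) -> Vec<Vec<T>> {
    if items.is_empty() {
        return vec![];
    }
    // div with rounding up instead of truncating
    let chunk_size = (items.len() + n - 1) / n;
    let mut chunks = Vec::with_capacity(n);
    let mut current = Vec::with_capacity(chunk_size);
    for item in items.drain(..) {
        current.push(item);
        if current.len() == chunk_size {
            // Swap in a fresh Vec and push the full one without copying elements.
            chunks.push(mem::replace(&mut current, Vec::with_capacity(chunk_size)));
        }
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    let chunks = split_into_chunks((0..10).collect::<Vec<_>>(), 3);
    assert_eq!(chunks, vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9]]);
}
```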
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/mod.rs
@@ -82,6 +82,7 @@ pub struct PartitionedFile {
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: impl Into<String>, size: u64) -> Self {
26 changes: 14 additions & 12 deletions datafusion/core/src/datasource/listing/table.rs
@@ -973,15 +973,16 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
let mut part_file = part_file?;
let part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
Ok((part_file, statistics))
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
as Result<(PartitionedFile, Statistics)>
Ok((
part_file,
Arc::new(Statistics::new_unknown(&self.file_schema)),
))
}
})
.boxed()
@@ -1011,12 +1012,12 @@ impl ListingTable {
ctx: &SessionState,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> Result<Statistics> {
let statistics_cache = self.collected_statistics.clone();
return match statistics_cache
) -> Result<Arc<Statistics>> {
match self
.collected_statistics
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
Some(statistics) => Ok(statistics.as_ref().clone()),
Some(statistics) => Ok(statistics.clone()),
None => {
let statistics = self
.options
@@ -1028,14 +1029,15 @@
&part_file.object_meta,
)
.await?;
statistics_cache.put_with_extra(
let statistics = Arc::new(statistics);
self.collected_statistics.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
statistics.clone(),
&part_file.object_meta,
);
Ok(statistics)
}
};
}
}
}

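The table.rs changes make do_collect_statistics return Arc<Statistics> and store the Arc in the cache, so both cache hits and the returned value avoid deep copies. A sketch of the same get-or-compute pattern, assuming a plain HashMap in place of the cache's get_with_extra/put_with_extra API:

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
struct Statistics {
    num_rows: usize,
}

// Hypothetical helper mirroring the caching flow in do_collect_statistics.
fn get_or_compute(
    cache: &mut HashMap<String, Arc<Statistics>>,
    path: &str,
    compute: impl FnOnce() -> Statistics,
) -> Arc<Statistics> {
    if let Some(stats) = cache.get(path) {
        return Arc::clone(stats); // cache hit: refcount bump, no deep copy
    }
    let stats = Arc::new(compute()); // cache miss: compute once, wrap in Arc
    cache.insert(path.to_string(), Arc::clone(&stats));
    stats
}

fn main() {
    let mut cache = HashMap::new();
    let a = get_or_compute(&mut cache, "part-0.parquet", || Statistics { num_rows: 42 });
    let b = get_or_compute(&mut cache, "part-0.parquet", || unreachable!("already cached"));
    assert!(Arc::ptr_eq(&a, &b)); // both point at the same allocation
}
```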
156 changes: 75 additions & 81 deletions datafusion/core/src/datasource/statistics.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::mem;
use std::sync::Arc;

use super::listing::PartitionedFile;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
@@ -26,16 +29,14 @@ use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;

use futures::{Stream, StreamExt};
use itertools::izip;
use itertools::multiunzip;

/// Get all files as well as the file level summary statistics (no statistics for partition columns).
/// If the optional `limit` is provided, includes only enough files to read up to `limit` rows.
/// `collect_stats` is passed down from the configuration parameter on `ListingTable`. If it is
/// false we only construct bare statistics and skip the potentially expensive merging of file
/// level summary statistics.
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
file_schema: SchemaRef,
limit: Option<usize>,
collect_stats: bool,
@@ -48,26 +49,27 @@ pub async fn get_statistics_with_limit(
// - zero for summations, and
// - neutral element for extreme points.
let size = file_schema.fields().len();
let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut col_stats_set = vec![ColumnStatistics::default(); size];
let mut num_rows = Precision::<usize>::Absent;
let mut total_byte_size = Precision::<usize>::Absent;

// Fusing the stream allows us to call next safely even once it is finished.
let mut all_files = Box::pin(all_files.fuse());

if let Some(first_file) = all_files.next().await {
let (file, file_stats) = first_file?;
let (mut file, file_stats) = first_file?;
file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);

// First file, we set them directly from the file statistics.
num_rows = file_stats.num_rows;
total_byte_size = file_stats.total_byte_size;
for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
null_counts[index] = file_column.null_count;
max_values[index] = file_column.max_value;
min_values[index] = file_column.min_value;
num_rows = file_stats.num_rows.clone();
total_byte_size = file_stats.total_byte_size.clone();
for (index, file_column) in
file_stats.column_statistics.clone().into_iter().enumerate()
{
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
}

// If the number of rows exceeds the limit, we can stop processing
@@ -80,7 +82,8 @@
};
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
let (file, file_stats) = current?;
let (mut file, file_stats) = current?;
file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);
if !collect_stats {
continue;
@@ -90,38 +93,28 @@
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
num_rows = add_row_stats(file_stats.num_rows, num_rows);
num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows);

total_byte_size =
add_row_stats(file_stats.total_byte_size, total_byte_size);
add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);
Contributor comment: I double checked that stats here are Precision<usize> (and thus this clone is not a performance problem)

Contributor comment: I also made a small experiment with an alternate formulation that might make it clearer that the copy is not occurring (last commit in #11828)
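
For context on the thread above, a simplified sketch of why that clone is cheap. This Precision is a cut-down stand-in for the datafusion_common enum; for a Copy payload such as usize, clone() just copies a discriminant plus the integer on the stack, with no heap allocation:

```rust
// Cut-down stand-in for datafusion_common::stats::Precision (illustrative).
#[derive(Clone, Debug, PartialEq)]
enum Precision<T> {
    Exact(T),
    Inexact(T),
    Absent,
}

fn main() {
    let num_rows: Precision<usize> = Precision::Exact(1_000);
    let copied = num_rows.clone(); // a few bytes copied on the stack
    assert_eq!(num_rows, copied);

    // Contrast: cloning a Precision<String> would also copy the heap buffer.
    let name: Precision<String> = Precision::Inexact("a long value".to_string());
    let _expensive = name.clone();
}
```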


(null_counts, max_values, min_values) = multiunzip(
izip!(
file_stats.column_statistics.into_iter(),
null_counts.into_iter(),
max_values.into_iter(),
min_values.into_iter()
)
.map(
|(
ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
distinct_count: _,
},
null_count,
max_value,
min_value,
)| {
(
add_row_stats(file_nc, null_count),
set_max_if_greater(file_max, max_value),
set_min_if_lesser(file_min, min_value),
)
},
),
);
for (file_col_stats, col_stats) in file_stats
.column_statistics
.iter()
.zip(col_stats_set.iter_mut())
{
let ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
distinct_count: _,
} = file_col_stats;

col_stats.null_count =
add_row_stats(file_nc.clone(), col_stats.null_count.clone());
set_max_if_greater(file_max, &mut col_stats.max_value);
set_min_if_lesser(file_min, &mut col_stats.min_value)
}

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
Expand All @@ -139,7 +132,7 @@ pub async fn get_statistics_with_limit(
let mut statistics = Statistics {
num_rows,
total_byte_size,
column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
column_statistics: col_stats_set,
};
if all_files.next().await.is_some() {
// If we still have files in the stream, it means that the limit kicked
@@ -182,21 +175,6 @@ fn add_row_stats(
}
}

pub(crate) fn get_col_stats_vec(
null_counts: Vec<Precision<usize>>,
max_values: Vec<Precision<ScalarValue>>,
min_values: Vec<Precision<ScalarValue>>,
) -> Vec<ColumnStatistics> {
izip!(null_counts, max_values, min_values)
.map(|(null_count, max_value, min_value)| ColumnStatistics {
null_count,
max_value,
min_value,
distinct_count: Precision::Absent,
})
.collect()
}

pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<Precision<usize>>,
@@ -238,45 +216,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
/// If the given value is numerically greater than the original maximum value,
/// update the maximum value in place with appropriate exactness information.
fn set_max_if_greater(
max_nominee: Precision<ScalarValue>,
max_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
match (&max_values, &max_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee,
max_nominee: &Precision<ScalarValue>,
max_value: &mut Precision<ScalarValue>,
) {
match (&max_value, max_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
*max_value = max_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 < val2 =>
{
max_nominee.to_inexact()
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_max = mem::take(max_value);
*max_value = exact_max.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*max_value = max_nominee.clone();
}
(Precision::Exact(_), Precision::Absent) => max_values.to_inexact(),
(Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(),
(Precision::Absent, Precision::Inexact(_)) => max_nominee,
(Precision::Absent, Precision::Absent) => Precision::Absent,
_ => max_values,
_ => {}
}
}

/// If the given value is numerically lesser than the original minimum value,
/// update the minimum value in place with appropriate exactness information.
fn set_min_if_lesser(
min_nominee: Precision<ScalarValue>,
min_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
match (&min_values, &min_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee,
min_nominee: &Precision<ScalarValue>,
Contributor comment: 💯 to reduce this copy

Author reply (quoting "💯 to reduce this copy"): I think the alternative may be that we refactor the clone-expensive scalars to a clone-cheap impl (like String to Arc<str>)?

Contributor reply: I think that would also be something interesting to pursue 💯
min_value: &mut Precision<ScalarValue>,
) {
match (&min_value, min_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
*min_value = min_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 > val2 =>
{
min_nominee.to_inexact()
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_min = mem::take(min_value);
*min_value = exact_min.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*min_value = min_nominee.clone();
}
(Precision::Exact(_), Precision::Absent) => min_values.to_inexact(),
(Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(),
(Precision::Absent, Precision::Inexact(_)) => min_nominee,
(Precision::Absent, Precision::Absent) => Precision::Absent,
_ => min_values,
_ => {}
}
}
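
On the follow-up idea floated in the last review thread (moving clone-expensive scalars to a clone-cheap representation), a minimal illustration of the String versus Arc<str> trade-off; this is not code from the PR:

```rust
use std::sync::Arc;

fn main() {
    let owned: String = "a fairly long utf8 scalar value".to_string();
    let shared: Arc<str> = Arc::from(owned.as_str());

    let cheap = Arc::clone(&shared); // refcount bump, no allocation
    let expensive = owned.clone();   // allocates and copies every byte

    assert_eq!(&*cheap, expensive.as_str());
    assert_eq!(Arc::strong_count(&shared), 2);
}
```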