Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache collected file statistics #3649

Merged
merged 2 commits into from
Oct 3, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 88 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

//! The table implementation.

use ahash::HashMap;
use std::{any::Any, sync::Arc};

use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
use parking_lot::RwLock;

use crate::datasource::{
file_format::{
Expand Down Expand Up @@ -237,6 +241,36 @@ impl ListingOptions {
}
}

/// Collected statistics for files
/// Cache is invalided when file size or last modification has changed
#[derive(Default)]
struct StatisticsCache {
statistics: RwLock<HashMap<Path, (ObjectMeta, Statistics)>>,
}

impl StatisticsCache {
/// Get `Statistics` for file location. Returns None if file has changed or not found.
fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
let map = self.statistics.read();
let (saved_meta, statistics) = map.get(&meta.location)?;

if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified
{
// file has changed
return None;
}

Some(statistics.clone())
}

/// Save collected file statistics
fn save(&self, meta: ObjectMeta, statistics: Statistics) {
self.statistics
.write()
.insert(meta.location.clone(), (meta, statistics));
}
}

/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
pub struct ListingTable {
Expand All @@ -247,6 +281,7 @@ pub struct ListingTable {
table_schema: SchemaRef,
options: ListingOptions,
definition: Option<String>,
collected_statistics: StatisticsCache,
}

impl ListingTable {
Expand Down Expand Up @@ -282,6 +317,7 @@ impl ListingTable {
table_schema: Arc::new(Schema::new(table_fields)),
options,
definition: None,
collected_statistics: Default::default(),
};

Ok(table)
Expand Down Expand Up @@ -400,14 +436,26 @@ impl ListingTable {
let file_list = stream::iter(file_list).flatten();

// collect the statistics if required by the config
// TODO: Collect statistics and schema in single-pass
let files = file_list.then(|part_file| async {
let part_file = part_file?;
let statistics = if self.options.collect_stat {
self.options
.format
.infer_stats(&store, self.file_schema.clone(), &part_file.object_meta)
.await?
match self.collected_statistics.get(&part_file.object_meta) {
Some(statistics) => statistics,
None => {
let statistics = self
.options
.format
.infer_stats(
&store,
self.file_schema.clone(),
&part_file.object_meta,
)
.await?;
self.collected_statistics
.save(part_file.object_meta.clone(), statistics.clone());
statistics
}
}
} else {
Statistics::default()
};
Expand All @@ -434,6 +482,7 @@ mod tests {
test::{columns, object_store::register_test_store},
};
use arrow::datatypes::DataType;
use chrono::DateTime;

use super::*;

Expand Down Expand Up @@ -752,4 +801,38 @@ mod tests {

Ok(())
}

#[test]
fn test_statistics_cache() {
let meta = ObjectMeta {
location: Path::from("test"),
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
.unwrap()
.into(),
size: 1024,
};

let cache = StatisticsCache::default();
assert!(cache.get(&meta).is_none());

cache.save(meta.clone(), Statistics::default());
assert!(cache.get(&meta).is_some());

// file size changed
let mut meta2 = meta.clone();
meta2.size = 2048;
assert!(cache.get(&meta2).is_none());

// file last_modified changed
let mut meta2 = meta.clone();
meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
.unwrap()
.into();
assert!(cache.get(&meta2).is_none());

// different file
let mut meta2 = meta;
meta2.location = Path::from("test2");
assert!(cache.get(&meta2).is_none());
}
}