346 changes: 141 additions & 205 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions Cargo.toml
@@ -139,16 +139,17 @@ prost-types = "0.14"
 # The `axum` version must match the one used in `tonic` (replace `RELEASE` with the release we are using):
 # https://github.com/hyperium/tonic/blob/vRELEASE/tonic/Cargo.toml
 axum = "0.8.8"
-datafusion = { version = "51.0.0", features = ["serde", "avro", "sql"] }
-datafusion-common = { version = "51.0.0", features = ["object_store", "avro"] }
-datafusion-datasource = { version = "51.0.0" }
-datafusion-expr = { version = "51.0.0" }
-datafusion-expr-common = { version = "51.0.0" }
-datafusion-proto = { version = "51.0.0" }
-datafusion-functions = { version = "51.0.0" }
-datafusion-functions-nested = { version = "51.0.0" }
-datafusion-physical-expr = { version = "51.0.0" }
-datafusion-spark = { version = "51.0.0" }
+datafusion = { version = "52.1.0", features = ["serde", "avro", "sql"] }
+datafusion-common = { version = "52.1.0", features = ["object_store", "avro"] }
+datafusion-datasource = { version = "52.1.0" }
+datafusion-expr = { version = "52.1.0" }
+datafusion-expr-common = { version = "52.1.0" }
+datafusion-proto = { version = "52.1.0" }
+datafusion-functions = { version = "52.1.0" }
+datafusion-functions-nested = { version = "52.1.0" }
+datafusion-physical-expr = { version = "52.1.0" }
+datafusion-session = { version = "52.1.0" }
+datafusion-spark = { version = "52.1.0" }
 # The `pyo3` version must match the one used in `arrow-pyarrow` (replace `RELEASE` with the release we are using):
 # https://github.com/apache/arrow-rs/blob/RELEASE/arrow-pyarrow/Cargo.toml
 pyo3 = { version = "0.26.0", features = ["serde"] }

[Comment on the new `datafusion-session` line — Contributor Author] I plan to later change all imports of session from datafusion-catalog to imports from datafusion-session. (A sketch of that follow-up appears below.)
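To illustrate the follow-up the author describes, here is a minimal sketch of the planned import swap. Only the two crate names come from the comment above; the function is hypothetical, and it assumes both crates expose the same `Session` trait (with `datafusion-catalog` re-exporting it today):

```rust
// Today: `Session` is imported via the catalog crate's re-export.
use datafusion_catalog::Session;
// Planned follow-up: import from the dedicated crate instead.
// use datafusion_session::Session;

// Hypothetical consumer. Code that only names the trait should be
// unaffected by the switch, since the trait itself stays the same.
fn inspect(session: &dyn Session) {
    let _config = session.config();
}
```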
147 changes: 129 additions & 18 deletions crates/sail-cache/src/file_listing_cache.rs
@@ -1,14 +1,20 @@
+use std::collections::HashMap;
+use std::mem::size_of;
 use std::sync::Arc;
 use std::time::Duration;

-use datafusion::execution::cache::CacheAccessor;
+use datafusion::common::{Result as DataFusionResult, TableReference};
+use datafusion::execution::cache::cache_manager::ListFilesCache;
+use datafusion::execution::cache::{CacheAccessor, ListFilesEntry, TableScopedPath};
 use log::debug;
 use moka::sync::Cache;
 use object_store::path::Path;
 use object_store::ObjectMeta;

 pub struct MokaFileListingCache {
-    objects: Cache<Path, Arc<Vec<ObjectMeta>>>,
+    objects: Cache<TableScopedPath, Arc<Vec<ObjectMeta>>>,
+    ttl: Option<Duration>,
+    max_entries: Option<u64>,
 }

 impl MokaFileListingCache {
@@ -17,9 +23,10 @@ impl MokaFileListingCache {
     pub fn new(ttl: Option<u64>, max_entries: Option<u64>) -> Self {
         let mut builder = Cache::builder();

+        let ttl = ttl.map(Duration::from_secs);
         if let Some(ttl) = ttl {
-            debug!("Setting TTL for {} to {ttl} second(s)", Self::NAME);
-            builder = builder.time_to_live(Duration::from_secs(ttl));
+            debug!("Setting TTL for {} to {:?} second(s)", Self::NAME, ttl);
+            builder = builder.time_to_live(ttl);
         }
         if let Some(max_entries) = max_entries {
             debug!(

[A review thread on the `debug!` line above was marked as resolved by linhr.]
@@ -31,40 +38,87 @@ impl MokaFileListingCache {

         Self {
             objects: builder.build(),
+            ttl,
+            max_entries,
         }
     }
 }

-impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for MokaFileListingCache {
-    type Extra = ObjectMeta;
-
-    fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
-        self.objects.get(k)
-    }
-
-    fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option<Arc<Vec<ObjectMeta>>> {
-        self.get(k)
+/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap.
+fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
+    let mut size = object_meta.location.as_ref().len();
+
+    if let Some(e) = &object_meta.e_tag {
+        size += e.len();
+    }
+    if let Some(v) = &object_meta.version {
+        size += v.len();
+    }
+
+    size
+}
+
+impl CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>> for MokaFileListingCache {
+    type Extra = Option<Path>;
+
+    fn get(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
+        self.get_with_extra(k, &None)
+    }
+
+    fn get_with_extra(
+        &self,
+        k: &TableScopedPath,
+        prefix: &Self::Extra,
+    ) -> Option<Arc<Vec<ObjectMeta>>> {
+        let objects = self.objects.get(k)?;
+
+        let Some(prefix) = prefix else {
+            return Some(objects);
+        };
+
+        // Build full prefix: table_base/prefix
+        let table_base = &k.path;
+        let mut parts: Vec<_> = table_base.parts().collect();
+        parts.extend(prefix.parts());
+        let full_prefix = Path::from_iter(parts);
+        let full_prefix_str = full_prefix.as_ref();
+
+        let filtered = objects
+            .iter()
+            .filter(|meta| meta.location.as_ref().starts_with(full_prefix_str))
+            .cloned()
+            .collect::<Vec<_>>();
+
+        if filtered.is_empty() {
+            None
+        } else {
+            Some(Arc::new(filtered))
+        }
     }

-    fn put(&self, key: &Path, value: Arc<Vec<ObjectMeta>>) -> Option<Arc<Vec<ObjectMeta>>> {
+    fn put(
+        &self,
+        key: &TableScopedPath,
+        value: Arc<Vec<ObjectMeta>>,
+    ) -> Option<Arc<Vec<ObjectMeta>>> {
         self.objects.insert(key.clone(), value);
         None
     }

     fn put_with_extra(
         &self,
-        key: &Path,
+        key: &TableScopedPath,
         value: Arc<Vec<ObjectMeta>>,
         _e: &Self::Extra,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
         self.put(key, value)
     }

-    fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+    fn remove(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
         self.objects.remove(k)
     }

-    fn contains_key(&self, k: &Path) -> bool {
+    fn contains_key(&self, k: &TableScopedPath) -> bool {
         self.objects.contains_key(k)
     }

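The `get_with_extra` method added above keys the cache by the table's base path and uses the optional `Extra` prefix to narrow a cached listing to one subdirectory. A hypothetical usage sketch, using only types visible in this diff (the paths are made up):

```rust
use std::sync::Arc;

use datafusion::execution::cache::{CacheAccessor, TableScopedPath};
use object_store::path::Path;
use object_store::ObjectMeta;

fn lookup_example(cache: &MokaFileListingCache, listing: Arc<Vec<ObjectMeta>>) {
    // The whole table listing is cached under the table's base path.
    let key = TableScopedPath {
        table: None,
        path: Path::from("data/events"),
    };
    cache.put(&key, listing);

    // No prefix: the full cached listing comes back untouched.
    let _all = cache.get(&key);

    // With a prefix, only files under `data/events/year=2024` survive the
    // `starts_with` filter; an all-filtered result is reported as a miss (None).
    let _partition = cache.get_with_extra(&key, &Some(Path::from("year=2024")));
}
```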
@@ -81,11 +135,64 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for MokaFileListingCache {
     }
 }

+impl ListFilesCache for MokaFileListingCache {
+    fn cache_limit(&self) -> usize {
+        self.max_entries
+            .map(|limit| limit as usize)
+            .unwrap_or(usize::MAX)
+    }
+
+    fn cache_ttl(&self) -> Option<Duration> {
+        self.ttl
+    }
+
+    fn update_cache_limit(&self, _limit: usize) {
+        // TODO: support dynamic update of cache limit
+    }
+
+    fn update_cache_ttl(&self, _ttl: Option<Duration>) {
+        // TODO: support dynamic update of cache ttl
+    }

[Comment on lines +149 to +155 — Contributor Author] Moka likely does not support this option, and refactoring here to use locks for protection doesn't seem like a good idea either. So I guess we can leave it as a TODO for now. (A sketch of the lock-based alternative appears after this file's diff.)

+
+    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
+        self.objects
+            .iter()
+            .map(|(table_scoped_path, metas)| {
+                let metas = Arc::clone(&metas);
+                let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
+                    + metas.iter().map(meta_heap_bytes).sum::<usize>();
+                (
+                    (*table_scoped_path).clone(),
+                    ListFilesEntry {
+                        metas,
+                        size_bytes,
+                        // moka handles expiration; we don't have per-entry expiration time
+                        expires: None,
+                    },
+                )
+            })
+            .collect()
+    }
+
+    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> DataFusionResult<()> {
+        let keys_to_drop: Vec<TableScopedPath> = self
+            .objects
+            .iter()
+            .filter_map(|(k, _v)| (k.table == *table_ref).then_some((*k).clone()))
+            .collect();
+
+        for key in keys_to_drop {
+            self.objects.invalidate(&key);
+        }
+
+        Ok(())
+    }
+}

 #[allow(clippy::unwrap_used)]
 #[cfg(test)]
 mod tests {
     use chrono::DateTime;
+    use object_store::path::Path;
     use object_store::ObjectMeta;

     use super::*;
@@ -103,11 +210,15 @@ mod tests {
         };

         let cache = MokaFileListingCache::new(None, None);
-        assert!(cache.get(&meta.location).is_none());
+        let key = TableScopedPath {
+            table: None,
+            path: meta.location.clone(),
+        };
+        assert!(cache.get(&key).is_none());

-        cache.put(&meta.location, vec![meta.clone()].into());
+        cache.put(&key, vec![meta.clone()].into());
         assert_eq!(
-            cache.get(&meta.location).unwrap().first().unwrap().clone(),
+            cache.get(&key).unwrap().first().unwrap().clone(),
             meta.clone()
         );
     }
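For context on the resolved TODO above: moka fixes TTL and capacity when the cache is built, so honoring `update_cache_ttl` would mean rebuilding the cache behind a lock — the refactoring the author decided against. A rough sketch of what that rejected approach would involve (hypothetical, not part of this PR; key and value types simplified):

```rust
use std::sync::RwLock;
use std::time::Duration;

use moka::sync::Cache;

// Hypothetical alternative: rebuilding behind a lock makes the update
// possible, but drops every cached entry and puts a lock on the hot read
// path — which is why the PR leaves the update methods as TODO no-ops.
struct RebuildableCache {
    inner: RwLock<Cache<String, String>>,
}

impl RebuildableCache {
    fn update_ttl(&self, ttl: Option<Duration>) {
        let mut builder = Cache::builder();
        if let Some(ttl) = ttl {
            builder = builder.time_to_live(ttl);
        }
        let mut guard = self.inner.write().expect("lock poisoned");
        *guard = builder.build(); // all previously cached entries are lost here
    }
}
```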
4 changes: 2 additions & 2 deletions crates/sail-cache/src/file_metadata_cache.rs
@@ -114,7 +114,7 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for MokaFileMetadataCache
         self.put(key, value)
     }

-    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+    fn remove(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
         self.metadata
             .remove(&k.location)
             .map(|(_, metadata)| metadata)
@@ -187,7 +187,7 @@ mod tests {
             metadata: "retrieved_metadata".to_owned(),
         });

-        let mut cache = MokaFileMetadataCache::new(None, None);
+        let cache = MokaFileMetadataCache::new(None, None);
         assert!(cache.get(&object_meta).is_none());

         // put
24 changes: 23 additions & 1 deletion crates/sail-cache/src/file_statistics_cache.rs
@@ -1,7 +1,9 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;

 use datafusion::common::Statistics;
+use datafusion::execution::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry};
 use datafusion::execution::cache::CacheAccessor;
 use log::{debug, error};
 use moka::sync::Cache;
@@ -70,7 +72,7 @@ impl CacheAccessor<Path, Arc<Statistics>> for MokaFileStatisticsCache {
         None
     }

-    fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
+    fn remove(&self, k: &Path) -> Option<Arc<Statistics>> {
         self.statistics.remove(k).map(|(_, statistics)| statistics)
     }

@@ -90,6 +92,26 @@ impl CacheAccessor<Path, Arc<Statistics>> for MokaFileStatisticsCache {
     }
 }

+impl FileStatisticsCache for MokaFileStatisticsCache {
+    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
+        self.statistics
+            .iter()
+            .map(|(path, (object_meta, stats))| {
+                (
+                    path.as_ref().clone(),
+                    FileStatisticsCacheEntry {
+                        object_meta,
+                        num_rows: stats.num_rows,
+                        num_columns: stats.column_statistics.len(),
+                        table_size_bytes: stats.total_byte_size,
+                        statistics_size_bytes: 0, // TODO: set to the real size in the future
+                    },
+                )
+            })
+            .collect()
+    }
+}

 #[allow(clippy::unwrap_used)]
 #[cfg(test)]
 mod tests {
1 change: 1 addition & 0 deletions crates/sail-common-datafusion/src/error.rs
@@ -146,6 +146,7 @@ impl CommonErrorCause {
                 Some(e) => Self::build::<Py>(e, seen),
             },
             DataFusionError::Shared(e) => Self::build::<Py>(e.as_ref(), seen),
+            DataFusionError::Ffi(x) => Self::Unknown(x.clone()),
         };
     }
1 change: 0 additions & 1 deletion crates/sail-common-datafusion/src/lib.rs
@@ -10,7 +10,6 @@ pub mod logical_expr;
 pub mod logical_rewriter;
 pub mod physical_expr;
 pub mod rename;
-pub mod schema_adapter;
 pub mod session;
 pub mod streaming;
 pub mod system;