Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions crates/sail-cache/src/file_metadata_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use datafusion::execution::cache::cache_manager::{
FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
};
use datafusion::execution::cache::CacheAccessor;
use log::debug;
use moka::policy::EvictionPolicy;
use moka::sync::Cache;
use object_store::path::Path;
use object_store::ObjectMeta;

/// File metadata cache backed by a Moka in-memory cache and keyed by object location.
///
/// Every entry keeps the `ObjectMeta` it was inserted under next to the cached
/// metadata, so that the size and modification time can be compared on lookup.
pub struct MokaFileMetadataCache {
/// Size limit in bytes given at construction time; `None` when no limit was configured.
size_limit: Option<u64>,
/// Cached entries: the `ObjectMeta` supplied on insertion together with the file metadata.
metadata: Cache<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
}

impl MokaFileMetadataCache {
    const NAME: &'static str = "MokaFileMetadataCache";

    /// Builds a cache with LRU eviction.
    ///
    /// * `ttl` - optional time-to-live in seconds, applied to every entry from the
    ///   moment it is inserted. `None` leaves entries without a TTL.
    /// * `size_limit` - optional capacity in bytes. When set, each entry is weighed
    ///   by the `memory_size()` reported by its metadata and the sum of weights is
    ///   capped at `size_limit`. `None` leaves the cache unbounded.
    pub fn new(ttl: Option<u64>, size_limit: Option<u64>) -> Self {
        let mut builder = Cache::builder().eviction_policy(EvictionPolicy::lru());

        if let Some(ttl) = ttl {
            debug!("Setting TTL for {} to {ttl} second(s)", Self::NAME);
            builder = builder.time_to_live(Duration::from_secs(ttl));
        }
        if let Some(size_limit) = size_limit {
            debug!(
                "Setting size limit for {} to {size_limit} byte(s)",
                Self::NAME
            );
            builder = builder
                .weigher(
                    |_key: &Path, (_, meta): &(ObjectMeta, Arc<dyn FileMetadata>)| -> u32 {
                        // The weigher must return a `u32`. Saturate rather than use
                        // an `as` cast, which would wrap around and make an entry of
                        // 4 GiB or more look small to the capacity accounting.
                        u32::try_from(meta.memory_size()).unwrap_or(u32::MAX)
                    },
                )
                .max_capacity(size_limit);
        } else {
            debug!("No size limit set for {}", Self::NAME);
        }

        Self {
            size_limit,
            metadata: builder.build(),
        }
    }
}

impl FileMetadataCache for MokaFileMetadataCache {
    /// Returns the size limit in bytes given at construction, or `usize::MAX`
    /// when no limit was configured.
    ///
    /// A limit that does not fit in `usize` (possible on 32-bit targets) saturates
    /// to `usize::MAX` instead of being silently truncated.
    fn cache_limit(&self) -> usize {
        self.size_limit.map_or(usize::MAX, |limit| {
            usize::try_from(limit).unwrap_or(usize::MAX)
        })
    }

    /// Currently a no-op: the requested limit is ignored.
    fn update_cache_limit(&self, _limit: usize) {
        // TODO: support dynamic update of cache limit
    }

    /// Returns a snapshot of the cached entries keyed by object location.
    ///
    /// `hits` is always reported as `0` because hit counts are not collected yet.
    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
        self.metadata
            .iter()
            .map(|(path, (object_meta, meta))| {
                let entry = FileMetadataCacheEntry {
                    object_meta,
                    size_bytes: meta.memory_size(),
                    // TODO: get hits from the cache
                    hits: 0,
                    extra: meta.extra_info(),
                };
                (path.as_ref().clone(), entry)
            })
            .collect()
    }
}

impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for MokaFileMetadataCache {
    type Extra = ObjectMeta;

    /// Looks up the entry stored under `k.location`.
    ///
    /// The entry is returned only if the size and modification time recorded at
    /// insertion equal those of `k`; otherwise the lookup is reported as a miss.
    fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
        let (cached_meta, file_metadata) = self.metadata.get(&k.location)?;
        let unchanged = cached_meta.size == k.size && cached_meta.last_modified == k.last_modified;
        unchanged.then_some(file_metadata)
    }

    /// Same as [`Self::get`]; the extra argument is not used.
    fn get_with_extra(&self, k: &ObjectMeta, _e: &Self::Extra) -> Option<Arc<dyn FileMetadata>> {
        self.get(k)
    }

    /// Stores `value` under `key.location`, remembering `key` for later validation.
    ///
    /// Always returns `None`; a previously stored value is not reported.
    fn put(&self, key: &ObjectMeta, value: Arc<dyn FileMetadata>) -> Option<Arc<dyn FileMetadata>> {
        let location = key.location.clone();
        let entry = (key.clone(), value);
        self.metadata.insert(location, entry);
        None
    }

    /// Same as [`Self::put`]; the extra argument is not used.
    fn put_with_extra(
        &self,
        key: &ObjectMeta,
        value: Arc<dyn FileMetadata>,
        _e: &Self::Extra,
    ) -> Option<Arc<dyn FileMetadata>> {
        self.put(key, value)
    }

    /// Removes whatever is stored under `k.location` and returns its metadata.
    ///
    /// Unlike [`Self::get`], the size and modification time of `k` are not compared.
    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
        let (_, file_metadata) = self.metadata.remove(&k.location)?;
        Some(file_metadata)
    }

    /// Returns `true` when an entry exists for `k.location` whose recorded size and
    /// modification time equal those of `k`.
    fn contains_key(&self, k: &ObjectMeta) -> bool {
        match self.metadata.get(&k.location) {
            Some((cached_meta, _)) => {
                cached_meta.size == k.size && cached_meta.last_modified == k.last_modified
            }
            None => false,
        }
    }

    /// Returns the entry count reported by the underlying cache.
    // NOTE(review): Moka describes `entry_count` as an approximate value — confirm
    // callers do not rely on an exact number.
    fn len(&self) -> usize {
        self.metadata.entry_count() as usize
    }

    /// Invalidates every entry in the cache.
    fn clear(&self) {
        self.metadata.invalidate_all();
    }

    /// Returns the name of this cache implementation.
    fn name(&self) -> String {
        Self::NAME.to_string()
    }
}

#[allow(clippy::unwrap_used)]
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::sync::Arc;

    use chrono::DateTime;
    use datafusion::execution::cache::cache_manager::FileMetadata;
    use object_store::path::Path;
    use object_store::ObjectMeta;

    use super::*;

    /// Minimal `FileMetadata` implementation that carries a string payload.
    pub struct TestFileMetadata {
        metadata: String,
    }

    impl FileMetadata for TestFileMetadata {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn memory_size(&self) -> usize {
            self.metadata.len()
        }

        fn extra_info(&self) -> HashMap<String, String> {
            HashMap::new()
        }
    }

    /// Asserts that both `get` and `contains_key` report a miss for `meta`.
    fn assert_miss(cache: &MokaFileMetadataCache, meta: &ObjectMeta) {
        assert!(cache.get(meta).is_none());
        assert!(!cache.contains_key(meta));
    }

    #[test]
    fn test_file_metadata_cache() {
        let object_meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        };
        let payload: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
            metadata: "retrieved_metadata".to_owned(),
        });

        // An empty cache has nothing to return.
        let mut cache = MokaFileMetadataCache::new(None, None);
        assert!(cache.get(&object_meta).is_none());

        // Insert the entry.
        cache.put(&object_meta, payload);

        // The entry is visible through both `contains_key` and `get`.
        assert!(cache.contains_key(&object_meta));
        let cached = cache.get(&object_meta);
        assert!(cached.is_some());
        let cached = cached.unwrap();
        let concrete = cached.as_any().downcast_ref::<TestFileMetadata>();
        assert!(concrete.is_some());
        assert_eq!(concrete.unwrap().metadata, "retrieved_metadata");

        // A different file size is treated as a miss.
        let resized = ObjectMeta {
            size: 2048,
            ..object_meta.clone()
        };
        assert_miss(&cache, &resized);

        // A different modification time is treated as a miss.
        let touched = ObjectMeta {
            last_modified: DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00")
                .unwrap()
                .into(),
            ..object_meta.clone()
        };
        assert_miss(&cache, &touched);

        // A different location is treated as a miss.
        let elsewhere = ObjectMeta {
            location: Path::from("test2"),
            ..object_meta.clone()
        };
        assert_miss(&cache, &elsewhere);

        // After removal the original key misses as well.
        cache.remove(&object_meta);
        assert_miss(&cache, &object_meta);
    }
}
1 change: 1 addition & 0 deletions crates/sail-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use log::error;

pub mod error;
pub mod file_listing_cache;
pub mod file_metadata_cache;
pub mod file_statistics_cache;

#[allow(dead_code)]
Expand Down
11 changes: 11 additions & 0 deletions crates/sail-common/src/config/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ pub struct ParquetConfig {
pub maximum_parallel_row_group_writers: usize,
pub maximum_buffered_record_batches_per_stream: usize,
pub file_statistics_cache: FileStatisticsCacheConfig,
pub file_metadata_cache: FileMetadataCacheConfig,
}

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -239,6 +240,16 @@ pub struct FileStatisticsCacheConfig {
pub max_entries: Option<u64>,
}

/// Configuration of the Parquet file metadata cache.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct FileMetadataCacheConfig {
/// Which kind of cache to use.
pub r#type: CacheType,
/// Time-to-live of cached entries.
// NOTE(review): `deserialize_non_zero` is defined outside this view; presumably it
// maps `0` to `None` — confirm.
#[serde(deserialize_with = "deserialize_non_zero")]
pub ttl: Option<u64>,
/// Size limit of the cache.
// NOTE(review): presumably `0` is deserialized as `None` here as well — confirm.
#[serde(deserialize_with = "deserialize_non_zero")]
pub size_limit: Option<u64>,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CacheType {
Expand Down
40 changes: 36 additions & 4 deletions crates/sail-common/src/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@
description: |
The time-to-live (TTL) in seconds for cached directory listings.
Entries expire after this duration from when they were cached,
ensuring eventual consistency with the file system.
ensuring eventual consistency with the storage system.
This setting is only effective when the cache is enabled.
Setting the value to `0` disables the TTL.
This setting can only be configured at startup and cannot be changed at runtime.
Expand Down Expand Up @@ -496,9 +496,9 @@
type: number
default: "1800"
description: |
The time-to-live (TTL) in seconds for cached Parquet files statistics.
(Reading) The time-to-live (TTL) in seconds for cached Parquet files statistics.
Entries expire after this duration from when they were cached,
ensuring eventual consistency with the file system.
ensuring eventual consistency with the storage system.
This setting is only effective when the cache is enabled.
Setting the value to `0` disables the TTL.
This setting can only be configured at startup and cannot be changed at runtime.
Expand All @@ -508,13 +508,45 @@
type: number
default: "10000"
description: |
Maximum number of Parquet files statistics to cache.
(Reading) Maximum number of Parquet files statistics to cache.
This setting is only effective when the cache is enabled.
When the limit is reached, least recently used entries are evicted.
Setting the value to `0` disables the limit.
This setting can only be configured at startup and cannot be changed at runtime.
experimental: true

- key: parquet.file_metadata_cache.type
type: string
default: "global"
description: |
(Reading) The type of cache for embedded metadata of Parquet files (footer and page metadata).
This setting avoids repeatedly reading metadata,
which can offer substantial performance improvements for repeated queries over a large number of files.
The cache is automatically invalidated when the underlying file is modified.
Valid values are `none`, `global` (for a global cache), and `session` (for a per-session cache).
experimental: true

- key: parquet.file_metadata_cache.ttl
type: number
default: "1800"
description: |
(Reading) The time-to-live (TTL) in seconds for cached Parquet files metadata.
Entries expire after this duration from when they were cached,
ensuring eventual consistency with the storage system.
This setting is only effective when the cache is enabled.
Setting the value to `0` disables the TTL.
This setting can only be configured at startup and cannot be changed at runtime.
experimental: true

- key: parquet.file_metadata_cache.size_limit
type: number
default: "0"
description: |
(Reading) Maximum size in bytes for the Parquet metadata cache.
Setting the value to `0` disables the limit.
This setting can only be configured at startup and cannot be changed at runtime.
experimental: true

- key: kubernetes.image
type: string
default: "sail:latest"
Expand Down
30 changes: 28 additions & 2 deletions crates/sail-spark-connect/src/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use datafusion::execution::cache::cache_manager::{
CacheManagerConfig, FileStatisticsCache, ListFilesCache,
CacheManagerConfig, FileMetadataCache, FileStatisticsCache, ListFilesCache,
};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use log::{debug, info};
use sail_cache::file_listing_cache::MokaFileListingCache;
use sail_cache::file_metadata_cache::MokaFileMetadataCache;
use sail_cache::file_statistics_cache::MokaFileStatisticsCache;
use sail_common::config::{AppConfig, CacheType, ExecutionMode};
use sail_common::runtime::RuntimeHandle;
Expand Down Expand Up @@ -217,10 +218,33 @@ impl SessionManagerActor {
}
}
};
let file_metadata_cache: Arc<dyn FileMetadataCache> = {
let ttl = options.config.parquet.file_metadata_cache.ttl;
let size_limit = options.config.parquet.file_metadata_cache.size_limit;
match options.config.parquet.file_metadata_cache.r#type {
CacheType::None => {
debug!("Not using file metadata cache");
Arc::new(MokaFileMetadataCache::new(ttl, Some(0)))
}
CacheType::Global => {
debug!("Using global file metadata cache");
self.global_file_metadata_cache
.get_or_insert_with(|| {
Arc::new(MokaFileMetadataCache::new(ttl, size_limit))
})
.clone()
}
CacheType::Session => {
debug!("Using session file metadata cache");
Arc::new(MokaFileMetadataCache::new(ttl, size_limit))
}
}
};

let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(file_statistics_cache)
.with_list_files_cache(file_listing_cache);
.with_list_files_cache(file_listing_cache)
.with_file_metadata_cache(Some(file_metadata_cache));
let builder = RuntimeEnvBuilder::default()
.with_object_store_registry(Arc::new(registry))
.with_cache_manager(cache_config);
Expand Down Expand Up @@ -281,6 +305,7 @@ struct SessionManagerActor {
sessions: HashMap<SessionKey, SessionContext>,
global_file_listing_cache: Option<Arc<MokaFileListingCache>>,
global_file_statistics_cache: Option<Arc<MokaFileStatisticsCache>>,
global_file_metadata_cache: Option<Arc<MokaFileMetadataCache>>,
}

#[tonic::async_trait]
Expand All @@ -294,6 +319,7 @@ impl Actor for SessionManagerActor {
sessions: HashMap::new(),
global_file_listing_cache: None,
global_file_statistics_cache: None,
global_file_metadata_cache: None,
}
}

Expand Down