diff --git a/crates/sail-cache/src/file_metadata_cache.rs b/crates/sail-cache/src/file_metadata_cache.rs new file mode 100644 index 0000000000..b2b5c7195e --- /dev/null +++ b/crates/sail-cache/src/file_metadata_cache.rs @@ -0,0 +1,229 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use datafusion::execution::cache::cache_manager::{ + FileMetadata, FileMetadataCache, FileMetadataCacheEntry, +}; +use datafusion::execution::cache::CacheAccessor; +use log::debug; +use moka::policy::EvictionPolicy; +use moka::sync::Cache; +use object_store::path::Path; +use object_store::ObjectMeta; + +pub struct MokaFileMetadataCache { + size_limit: Option, + metadata: Cache)>, +} + +impl MokaFileMetadataCache { + const NAME: &'static str = "MokaFileMetadataCache"; + + pub fn new(ttl: Option, size_limit: Option) -> Self { + let mut builder = Cache::builder().eviction_policy(EvictionPolicy::lru()); + + if let Some(ttl) = ttl { + debug!("Setting TTL for {} to {ttl} second(s)", Self::NAME); + builder = builder.time_to_live(Duration::from_secs(ttl)); + } + if let Some(size_limit) = size_limit { + debug!( + "Setting size limit for {} to {size_limit} byte(s)", + Self::NAME + ); + builder = builder + .weigher( + |_key: &Path, (_, meta): &(ObjectMeta, Arc)| -> u32 { + meta.memory_size() as u32 + }, + ) + .max_capacity(size_limit); + } else { + debug!("No size limit set for {}", Self::NAME); + } + + Self { + size_limit, + metadata: builder.build(), + } + } +} + +impl FileMetadataCache for MokaFileMetadataCache { + fn cache_limit(&self) -> usize { + self.size_limit + .map(|size| size as usize) + .unwrap_or(usize::MAX) + } + + fn update_cache_limit(&self, _limit: usize) { + // TODO: support dynamic update of cache limit + } + + fn list_entries(&self) -> HashMap { + self.metadata + .iter() + .map(|(path, (object_meta, meta))| { + ( + path.as_ref().clone(), + FileMetadataCacheEntry { + object_meta, + size_bytes: meta.memory_size(), + // TODO: get hits from the 
cache + hits: 0, + extra: meta.extra_info(), + }, + ) + }) + .collect() + } +} + +impl CacheAccessor> for MokaFileMetadataCache { + type Extra = ObjectMeta; + + fn get(&self, k: &ObjectMeta) -> Option> { + self.metadata + .get(&k.location) + .and_then(|(extra, metadata)| { + if extra.size == k.size && extra.last_modified == k.last_modified { + Some(Arc::clone(&metadata)) + } else { + None + } + }) + } + + fn get_with_extra(&self, k: &ObjectMeta, _e: &Self::Extra) -> Option> { + self.get(k) + } + + fn put(&self, key: &ObjectMeta, value: Arc) -> Option> { + self.metadata + .insert(key.location.clone(), (key.clone(), value)); + None + } + + fn put_with_extra( + &self, + key: &ObjectMeta, + value: Arc, + _e: &Self::Extra, + ) -> Option> { + self.put(key, value) + } + + fn remove(&mut self, k: &ObjectMeta) -> Option> { + self.metadata + .remove(&k.location) + .map(|(_, metadata)| metadata) + } + + fn contains_key(&self, k: &ObjectMeta) -> bool { + self.metadata + .get(&k.location) + .map(|(extra, _)| extra.size == k.size && extra.last_modified == k.last_modified) + .unwrap_or(false) + } + + fn len(&self) -> usize { + self.metadata.entry_count() as usize + } + + fn clear(&self) { + self.metadata.invalidate_all(); + } + + fn name(&self) -> String { + Self::NAME.to_string() + } +} + +#[allow(clippy::unwrap_used)] +#[cfg(test)] +mod tests { + use std::any::Any; + use std::sync::Arc; + + use chrono::DateTime; + use datafusion::execution::cache::cache_manager::FileMetadata; + use object_store::path::Path; + use object_store::ObjectMeta; + + use super::*; + + pub struct TestFileMetadata { + metadata: String, + } + + impl FileMetadata for TestFileMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.metadata.len() + } + + fn extra_info(&self) -> HashMap { + HashMap::new() + } + } + + #[test] + fn test_file_metadata_cache() { + let object_meta = ObjectMeta { + location: Path::from("test"), + last_modified: 
DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00") + .unwrap() + .into(), + size: 1024, + e_tag: None, + version: None, + }; + + let metadata: Arc = Arc::new(TestFileMetadata { + metadata: "retrieved_metadata".to_owned(), + }); + + let mut cache = MokaFileMetadataCache::new(None, None); + assert!(cache.get(&object_meta).is_none()); + + // put + cache.put(&object_meta, metadata); + + // get and contains of a valid entry + assert!(cache.contains_key(&object_meta)); + let value = cache.get(&object_meta); + assert!(value.is_some()); + let test_file_metadata = Arc::downcast::(value.unwrap()); + assert!(test_file_metadata.is_ok()); + assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata"); + + // file size changed + let mut object_meta2 = object_meta.clone(); + object_meta2.size = 2048; + assert!(cache.get(&object_meta2).is_none()); + assert!(!cache.contains_key(&object_meta2)); + + // file last_modified changed + let mut object_meta2 = object_meta.clone(); + object_meta2.last_modified = DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00") + .unwrap() + .into(); + assert!(cache.get(&object_meta2).is_none()); + assert!(!cache.contains_key(&object_meta2)); + + // different file + let mut object_meta2 = object_meta.clone(); + object_meta2.location = Path::from("test2"); + assert!(cache.get(&object_meta2).is_none()); + assert!(!cache.contains_key(&object_meta2)); + + // remove + cache.remove(&object_meta); + assert!(cache.get(&object_meta).is_none()); + assert!(!cache.contains_key(&object_meta)); + } +} diff --git a/crates/sail-cache/src/lib.rs b/crates/sail-cache/src/lib.rs index 4152e3ad1d..0a5cce6a10 100644 --- a/crates/sail-cache/src/lib.rs +++ b/crates/sail-cache/src/lib.rs @@ -2,6 +2,7 @@ use log::error; pub mod error; pub mod file_listing_cache; +pub mod file_metadata_cache; pub mod file_statistics_cache; #[allow(dead_code)] diff --git a/crates/sail-common/src/config/application.rs b/crates/sail-common/src/config/application.rs index 
f7604a66cb..ffc0b95711 100644 --- a/crates/sail-common/src/config/application.rs +++ b/crates/sail-common/src/config/application.rs @@ -227,6 +227,7 @@ pub struct ParquetConfig { pub maximum_parallel_row_group_writers: usize, pub maximum_buffered_record_batches_per_stream: usize, pub file_statistics_cache: FileStatisticsCacheConfig, + pub file_metadata_cache: FileMetadataCacheConfig, } #[derive(Debug, Clone, Deserialize)] @@ -239,6 +240,16 @@ pub struct FileStatisticsCacheConfig { pub max_entries: Option, } +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct FileMetadataCacheConfig { + pub r#type: CacheType, + #[serde(deserialize_with = "deserialize_non_zero")] + pub ttl: Option, + #[serde(deserialize_with = "deserialize_non_zero")] + pub size_limit: Option, +} + #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "snake_case")] pub enum CacheType { diff --git a/crates/sail-common/src/config/application.yaml b/crates/sail-common/src/config/application.yaml index e5a9df106e..854e227edd 100644 --- a/crates/sail-common/src/config/application.yaml +++ b/crates/sail-common/src/config/application.yaml @@ -226,7 +226,7 @@ description: | The time-to-live (TTL) in seconds for cached directory listings. Entries expire after this duration from when they were cached, - ensuring eventual consistency with the file system. + ensuring eventual consistency with the storage system. This setting is only effective when the cache is enabled. Setting the value to `0` disables the TTL. This setting can only be configured at startup and cannot be changed at runtime. @@ -496,9 +496,9 @@ type: number default: "1800" description: | - The time-to-live (TTL) in seconds for cached Parquet files statistics. + (Reading) The time-to-live (TTL) in seconds for cached Parquet files statistics. Entries expire after this duration from when they were cached, - ensuring eventual consistency with the file system. + ensuring eventual consistency with the storage system. 
This setting is only effective when the cache is enabled. Setting the value to `0` disables the TTL. This setting can only be configured at startup and cannot be changed at runtime. @@ -508,13 +508,45 @@ type: number default: "10000" description: | - Maximum number of Parquet files statistics to cache. + (Reading) Maximum number of Parquet files statistics to cache. This setting is only effective when the cache is enabled. When the limit is reached, least recently used entries are evicted. Setting the value to `0` disables the limit. This setting can only be configured at startup and cannot be changed at runtime. experimental: true +- key: parquet.file_metadata_cache.type + type: string + default: "global" + description: | + (Reading) The type of cache for embedded metadata of Parquet files (footer and page metadata). + This setting avoids repeatedly reading metadata, + which can offer substantial performance improvements for repeated queries over a large number of files. + The cache is automatically invalidated when the underlying file is modified. + Valid values are `none`, `global` (for a global cache), and `session` (for a per-session cache). + experimental: true + +- key: parquet.file_metadata_cache.ttl + type: number + default: "1800" + description: | + (Reading) The time-to-live (TTL) in seconds for cached Parquet files metadata. + Entries expire after this duration from when they were cached, + ensuring eventual consistency with the storage system. + This setting is only effective when the cache is enabled. + Setting the value to `0` disables the TTL. + This setting can only be configured at startup and cannot be changed at runtime. + experimental: true + +- key: parquet.file_metadata_cache.size_limit + type: number + default: "0" + description: | + (Reading) Maximum size in bytes for the Parquet metadata cache. + Setting the value to `0` disables the limit. + This setting can only be configured at startup and cannot be changed at runtime. 
+ experimental: true + - key: kubernetes.image type: string default: "sail:latest" diff --git a/crates/sail-spark-connect/src/session_manager.rs b/crates/sail-spark-connect/src/session_manager.rs index c78194cbc8..02f6c2754c 100644 --- a/crates/sail-spark-connect/src/session_manager.rs +++ b/crates/sail-spark-connect/src/session_manager.rs @@ -5,13 +5,14 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use datafusion::execution::cache::cache_manager::{ - CacheManagerConfig, FileStatisticsCache, ListFilesCache, + CacheManagerConfig, FileMetadataCache, FileStatisticsCache, ListFilesCache, }; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::{SessionConfig, SessionContext}; use log::{debug, info}; use sail_cache::file_listing_cache::MokaFileListingCache; +use sail_cache::file_metadata_cache::MokaFileMetadataCache; use sail_cache::file_statistics_cache::MokaFileStatisticsCache; use sail_common::config::{AppConfig, CacheType, ExecutionMode}; use sail_common::runtime::RuntimeHandle; @@ -217,10 +218,33 @@ impl SessionManagerActor { } } }; + let file_metadata_cache: Arc = { + let ttl = options.config.parquet.file_metadata_cache.ttl; + let size_limit = options.config.parquet.file_metadata_cache.size_limit; + match options.config.parquet.file_metadata_cache.r#type { + CacheType::None => { + debug!("Not using file metadata cache"); + Arc::new(MokaFileMetadataCache::new(ttl, Some(0))) + } + CacheType::Global => { + debug!("Using global file metadata cache"); + self.global_file_metadata_cache + .get_or_insert_with(|| { + Arc::new(MokaFileMetadataCache::new(ttl, size_limit)) + }) + .clone() + } + CacheType::Session => { + debug!("Using session file metadata cache"); + Arc::new(MokaFileMetadataCache::new(ttl, size_limit)) + } + } + }; let cache_config = CacheManagerConfig::default() .with_files_statistics_cache(file_statistics_cache) - .with_list_files_cache(file_listing_cache); + 
.with_list_files_cache(file_listing_cache) + .with_file_metadata_cache(Some(file_metadata_cache)); let builder = RuntimeEnvBuilder::default() .with_object_store_registry(Arc::new(registry)) .with_cache_manager(cache_config); @@ -281,6 +305,7 @@ struct SessionManagerActor { sessions: HashMap, global_file_listing_cache: Option>, global_file_statistics_cache: Option>, + global_file_metadata_cache: Option>, } #[tonic::async_trait] @@ -294,6 +319,7 @@ impl Actor for SessionManagerActor { sessions: HashMap::new(), global_file_listing_cache: None, global_file_statistics_cache: None, + global_file_metadata_cache: None, } }