diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index a29313254283f..5bba78e44d271 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -20,6 +20,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, Weak}; +use std::time::Duration; use super::options::ReadOptions; use crate::datasource::dynamic_file::DynamicListTableFactory; @@ -72,7 +73,10 @@ use datafusion_common::{ tree_node::{TreeNodeRecursion, TreeNodeVisitor}, DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference, }; -use datafusion_execution::cache::cache_manager::DEFAULT_METADATA_CACHE_LIMIT; +use datafusion_execution::cache::cache_manager::{ + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, + DEFAULT_METADATA_CACHE_LIMIT, +}; pub use datafusion_execution::config::SessionConfig; use datafusion_execution::disk_manager::{ DiskManagerBuilder, DEFAULT_MAX_TEMP_DIRECTORY_SIZE, @@ -1168,6 +1172,14 @@ impl SessionContext { let limit = Self::parse_memory_limit(value)?; builder.with_metadata_cache_limit(limit) } + "list_files_cache_limit" => { + let limit = Self::parse_memory_limit(value)?; + builder.with_object_list_cache_limit(limit) + } + "list_files_cache_ttl" => { + let duration = Self::parse_duration(value)?; + builder.with_object_list_cache_ttl(Some(duration)) + } _ => return plan_err!("Unknown runtime configuration: {variable}"), // Remember to update `reset_runtime_variable()` when adding new options }; @@ -1199,6 +1211,14 @@ impl SessionContext { "metadata_cache_limit" => { builder = builder.with_metadata_cache_limit(DEFAULT_METADATA_CACHE_LIMIT); } + "list_files_cache_limit" => { + builder = builder + .with_object_list_cache_limit(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT); + } + "list_files_cache_ttl" => { + builder = + builder.with_object_list_cache_ttl(DEFAULT_LIST_FILES_CACHE_TTL); + } _ => return plan_err!("Unknown runtime 
configuration: {variable}"), }; @@ -1239,6 +1259,34 @@ impl SessionContext { } } + fn parse_duration(duration: &str) -> Result<Duration> { + let mut minutes = None; + let mut seconds = None; + + for duration in duration.split_inclusive(&['m', 's']) { + let (number, unit) = duration.split_at(duration.len() - 1); + let number: u64 = number.parse().map_err(|_| { + plan_datafusion_err!("Failed to parse number from duration '{duration}'") + })?; + + match unit { + "m" if minutes.is_none() && seconds.is_none() => minutes = Some(number), + "s" if seconds.is_none() => seconds = Some(number), + _ => plan_err!("Invalid duration, unit must be either 'm' (minutes), or 's' (seconds), and be in the correct order")?, + } + } + + let duration = Duration::from_secs( + minutes.unwrap_or_default() * 60 + seconds.unwrap_or_default(), + ); + + if duration.is_zero() { + return plan_err!("Duration must be greater than 0 seconds"); + } + + Ok(duration) + } + async fn create_custom_table( &self, cmd: &CreateExternalTable, @@ -2673,4 +2721,24 @@ mod tests { } Ok(()) } + + #[test] + fn test_parse_duration() { + // Valid durations + for (duration, want) in [ + ("1s", Duration::from_secs(1)), + ("1m", Duration::from_secs(60)), + ("1m0s", Duration::from_secs(60)), + ("1m1s", Duration::from_secs(61)), + ] { + let have = SessionContext::parse_duration(duration).unwrap(); + assert_eq!(want, have); + } + + // Invalid durations + for duration in ["0s", "0m", "1s0m", "1s1m"] { + let have = SessionContext::parse_duration(duration); + assert!(have.is_err()); + } + } } diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 9627d7bccdb04..d6dc6983998d8 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -18,9 +18,14 @@ //! 
Tests for runtime configuration SQL interface use std::sync::Arc; +use std::time::Duration; use datafusion::execution::context::SessionContext; use datafusion::execution::context::TaskContext; +use datafusion::prelude::SessionConfig; +use datafusion_execution::cache::cache_manager::CacheManagerConfig; +use datafusion_execution::cache::DefaultListFilesCache; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_physical_plan::common::collect; #[tokio::test] @@ -233,6 +238,93 @@ async fn test_test_metadata_cache_limit() { assert_eq!(get_limit(&ctx), 123 * 1024); } +#[tokio::test] +async fn test_list_files_cache_limit() { + let list_files_cache = Arc::new(DefaultListFilesCache::default()); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default().with_list_files_cache(Some(list_files_cache)), + ) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); + + let update_limit = async |ctx: &SessionContext, limit: &str| { + ctx.sql( + format!("SET datafusion.runtime.list_files_cache_limit = '{limit}'").as_str(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + }; + + let get_limit = |ctx: &SessionContext| -> usize { + ctx.task_ctx() + .runtime_env() + .cache_manager + .get_list_files_cache() + .unwrap() + .cache_limit() + }; + + update_limit(&ctx, "100M").await; + assert_eq!(get_limit(&ctx), 100 * 1024 * 1024); + + update_limit(&ctx, "2G").await; + assert_eq!(get_limit(&ctx), 2 * 1024 * 1024 * 1024); + + update_limit(&ctx, "123K").await; + assert_eq!(get_limit(&ctx), 123 * 1024); +} + +#[tokio::test] +async fn test_list_files_cache_ttl() { + let list_files_cache = Arc::new(DefaultListFilesCache::default()); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default().with_list_files_cache(Some(list_files_cache)), + ) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); 
+ + let update_limit = async |ctx: &SessionContext, limit: &str| { + ctx.sql( + format!("SET datafusion.runtime.list_files_cache_ttl = '{limit}'").as_str(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + }; + + let get_limit = |ctx: &SessionContext| -> Duration { + ctx.task_ctx() + .runtime_env() + .cache_manager + .get_list_files_cache() + .unwrap() + .cache_ttl() + .unwrap() + }; + + update_limit(&ctx, "1m").await; + assert_eq!(get_limit(&ctx), Duration::from_secs(60)); + + update_limit(&ctx, "30s").await; + assert_eq!(get_limit(&ctx), Duration::from_secs(30)); + + update_limit(&ctx, "1m30s").await; + assert_eq!(get_limit(&ctx), Duration::from_secs(90)); +} + #[tokio::test] async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index ad92e06e7c3da..2df5ef1b4458c 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -27,7 +27,9 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::Duration; -use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT; +pub use super::list_files_cache::{ + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, +}; /// A cache for [`Statistics`]. /// @@ -76,6 +78,9 @@ pub trait ListFilesCache: /// Updates the cache with a new memory limit in bytes. fn update_cache_limit(&self, limit: usize); + + /// Updates the cache with a new TTL (time-to-live). + fn update_cache_ttl(&self, ttl: Option<Duration>); } /// Generic file-embedded metadata used with [`FileMetadataCache`]. 
@@ -173,7 +178,15 @@ impl CacheManager { let file_statistic_cache = config.table_files_statistics_cache.as_ref().map(Arc::clone); - let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone); + let list_files_cache = config + .list_files_cache + .as_ref() + .inspect(|c| { + // the cache memory limit or ttl might have changed, ensure they are updated + c.update_cache_limit(config.list_files_cache_limit); + c.update_cache_ttl(config.list_files_cache_ttl); + }) + .map(Arc::clone); let file_metadata_cache = config .file_metadata_cache @@ -262,7 +275,7 @@ impl Default for CacheManagerConfig { table_files_statistics_cache: Default::default(), list_files_cache: Default::default(), list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, - list_files_cache_ttl: None, + list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL, file_metadata_cache: Default::default(), metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT, } @@ -303,8 +316,8 @@ impl CacheManagerConfig { /// Sets the TTL (time-to-live) for entries in the list files cache. /// /// Default: None (infinite). 
- pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self { - self.list_files_cache_ttl = Some(ttl); + pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self { + self.list_files_cache_ttl = ttl; self } diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index c209a012741bc..285abea2e66ae 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -141,7 +141,10 @@ fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { } /// The default memory limit for the [`DefaultListFilesCache`] -pub(super) const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB +pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +/// The default cache TTL for the [`DefaultListFilesCache`] +pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite /// Handles the inner state of the [`DefaultListFilesCache`] struct. pub struct DefaultListFilesCacheState { @@ -157,7 +160,7 @@ impl Default for DefaultListFilesCacheState { lru_queue: LruQueue::new(), memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, memory_used: 0, - ttl: None, + ttl: DEFAULT_LIST_FILES_CACHE_TTL, } } } @@ -294,6 +297,12 @@ impl ListFilesCache for DefaultListFilesCache { state.memory_limit = limit; state.evict_entries(); } + + fn update_cache_ttl(&self, ttl: Option<Duration>) { + let mut state = self.state.lock().unwrap(); + state.ttl = ttl; + state.evict_entries(); + } } impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache { diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index e38a9f0854de5..67398d59f1374 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -101,6 +101,8 @@ fn create_runtime_config_entries( max_temp_directory_size: Option<String>, temp_directory: Option<String>, metadata_cache_limit: Option<String>, + list_files_cache_limit: Option<String>, + 
list_files_cache_ttl: Option<String>, ) -> Vec<ConfigEntry> { vec![ ConfigEntry { @@ -123,6 +125,16 @@ fn create_runtime_config_entries( value: metadata_cache_limit, description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", }, + ConfigEntry { + key: "datafusion.runtime.list_files_cache_limit".to_string(), + value: list_files_cache_limit, + description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", }, + ConfigEntry { + key: "datafusion.runtime.list_files_cache_ttl".to_string(), + value: list_files_cache_ttl, + description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.", }, ] } @@ -227,6 +239,14 @@ impl RuntimeEnv { } } + fn format_duration(duration: Duration) -> String { + let total = duration.as_secs(); + let mins = total / 60; + let secs = total % 60; + + format!("{mins}m{secs}s") + } + let memory_limit_value = match self.memory_pool.memory_limit() { MemoryLimit::Finite(size) => Some(format_byte_size( size.try_into() @@ -259,11 +279,25 @@ impl RuntimeEnv { .expect("Metadata cache size conversion failed"), ); + let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit(); + let list_files_cache_value = format_byte_size( + list_files_cache_limit + .try_into() + .expect("List files cache size conversion failed"), + ); + + let list_files_cache_ttl = self + .cache_manager + .get_list_files_cache_ttl() + .map(format_duration); + create_runtime_config_entries( memory_limit_value, Some(max_temp_dir_value), temp_dir_value, Some(metadata_cache_value), + Some(list_files_cache_value), + list_files_cache_ttl, ) } } @@ -394,7 +428,7 @@ impl RuntimeEnvBuilder { } /// Specifies the duration entries in the object list cache will be considered valid. 
- pub fn with_object_list_cache_ttl(mut self, ttl: Duration) -> Self { + pub fn with_object_list_cache_ttl(mut self, ttl: Option<Duration>) -> Self { self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl); self } @@ -473,6 +507,8 @@ impl RuntimeEnvBuilder { Some("100G".to_string()), None, Some("50M".to_owned()), + Some("1M".to_owned()), + None, ) } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index cc45e6dfc577f..b5429d68c99f0 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -319,6 +319,8 @@ datafusion.optimizer.repartition_sorts true datafusion.optimizer.repartition_windows true datafusion.optimizer.skip_failed_rules false datafusion.optimizer.top_down_join_key_reordering true +datafusion.runtime.list_files_cache_limit 1M +datafusion.runtime.list_files_cache_ttl NULL datafusion.runtime.max_temp_directory_size 100G datafusion.runtime.memory_limit unlimited datafusion.runtime.metadata_cache_limit 50M @@ -450,6 +452,8 @@ datafusion.optimizer.repartition_sorts true Should DataFusion execute sorts in a datafusion.optimizer.repartition_windows true Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys +datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). 
Example: '2G' for 2 gigabytes. +datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.memory_limit unlimited Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.metadata_cache_limit 50M Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index 82bd71d72b9e5..8957404799b73 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -351,6 +351,18 @@ RESET datafusion.runtime.memory_limit statement ok EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 +statement ok +SET datafusion.runtime.list_files_cache_limit = '1K' + +statement ok +RESET datafusion.runtime.list_files_cache_limit + +statement ok +SET datafusion.runtime.list_files_cache_ttl = '1m' + +statement ok +RESET datafusion.runtime.list_files_cache_ttl + # reset invalid variable - typo in namespace statement error DataFusion error: Invalid or Unsupported Configuration: Could not find config namespace "dataexplosion" RESET dataexplosion.execution.batch_size @@ -411,6 +423,8 @@ datafusion.runtime.metadata_cache_limit 200M query T SELECT name FROM information_schema.df_settings WHERE name LIKE 'datafusion.runtime.%' ORDER BY name ---- +datafusion.runtime.list_files_cache_limit +datafusion.runtime.list_files_cache_ttl datafusion.runtime.max_temp_directory_size 
datafusion.runtime.memory_limit datafusion.runtime.metadata_cache_limit diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 0c91eb57975e1..156df1d9d70aa 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -202,6 +202,8 @@ The following runtime configuration settings are available: | key | default | description | | ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.list_files_cache_limit | 1M | Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.list_files_cache_ttl | NULL | TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. | | datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | | datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | | datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |