Skip to content
Merged
70 changes: 69 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Weak};
use std::time::Duration;

use super::options::ReadOptions;
use crate::datasource::dynamic_file::DynamicListTableFactory;
Expand Down Expand Up @@ -72,7 +73,10 @@ use datafusion_common::{
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference,
};
use datafusion_execution::cache::cache_manager::DEFAULT_METADATA_CACHE_LIMIT;
use datafusion_execution::cache::cache_manager::{
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
DEFAULT_METADATA_CACHE_LIMIT,
};
pub use datafusion_execution::config::SessionConfig;
use datafusion_execution::disk_manager::{
DiskManagerBuilder, DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
Expand Down Expand Up @@ -1168,6 +1172,14 @@ impl SessionContext {
let limit = Self::parse_memory_limit(value)?;
builder.with_metadata_cache_limit(limit)
}
"list_files_cache_limit" => {
let limit = Self::parse_memory_limit(value)?;
builder.with_object_list_cache_limit(limit)
}
"list_files_cache_ttl" => {
let duration = Self::parse_duration(value)?;
builder.with_object_list_cache_ttl(Some(duration))
}
_ => return plan_err!("Unknown runtime configuration: {variable}"),
// Remember to update `reset_runtime_variable()` when adding new options
};
Expand Down Expand Up @@ -1199,6 +1211,14 @@ impl SessionContext {
"metadata_cache_limit" => {
builder = builder.with_metadata_cache_limit(DEFAULT_METADATA_CACHE_LIMIT);
}
"list_files_cache_limit" => {
builder = builder
.with_object_list_cache_limit(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT);
}
"list_files_cache_ttl" => {
builder =
builder.with_object_list_cache_ttl(DEFAULT_LIST_FILES_CACHE_TTL);
}
_ => return plan_err!("Unknown runtime configuration: {variable}"),
};

Expand Down Expand Up @@ -1239,6 +1259,34 @@ impl SessionContext {
}
}

/// Parses a duration string made of an optional minutes component followed
/// by an optional seconds component, e.g. `"30s"`, `"2m"` or `"1m30s"`.
///
/// Each component is an unsigned integer immediately followed by its unit
/// (`m` for minutes, `s` for seconds). A unit may appear at most once and
/// minutes must precede seconds.
///
/// # Errors
///
/// Returns a planning error when:
/// - the numeric part of a component cannot be parsed as a `u64`,
/// - a unit is unknown, repeated, or out of order,
/// - the total number of seconds does not fit in a `u64`,
/// - the resulting duration is zero (this includes the empty string).
fn parse_duration(duration: &str) -> Result<Duration> {
    let mut minutes: Option<u64> = None;
    let mut seconds: Option<u64> = None;

    for segment in duration.split_inclusive(&['m', 's']) {
        // The unit is the last *character* of the segment. Locate it by char
        // boundary rather than `len() - 1`: the final segment may end in a
        // multi-byte character, and a byte-offset split would panic there
        // instead of reporting an error to the user.
        let unit_start = segment
            .char_indices()
            .next_back()
            .map_or(0, |(idx, _)| idx);
        let (number, unit) = segment.split_at(unit_start);
        let number: u64 = number.parse().map_err(|_| {
            plan_datafusion_err!("Failed to parse number from duration '{segment}'")
        })?;

        match unit {
            // Minutes are only accepted first, and only once.
            "m" if minutes.is_none() && seconds.is_none() => minutes = Some(number),
            "s" if seconds.is_none() => seconds = Some(number),
            _ => return plan_err!("Invalid duration, unit must be either 'm' (minutes), or 's' (seconds), and be in the correct order"),
        }
    }

    // Checked arithmetic: very large inputs must produce an error rather
    // than panicking (debug builds) or silently wrapping (release builds).
    let total_secs = minutes
        .unwrap_or_default()
        .checked_mul(60)
        .and_then(|secs| secs.checked_add(seconds.unwrap_or_default()))
        .ok_or_else(|| plan_datafusion_err!("Duration '{duration}' is too large"))?;

    let parsed = Duration::from_secs(total_secs);

    if parsed.is_zero() {
        return plan_err!("Duration must be greater than 0 seconds");
    }

    Ok(parsed)
}

async fn create_custom_table(
&self,
cmd: &CreateExternalTable,
Expand Down Expand Up @@ -2673,4 +2721,24 @@ mod tests {

Ok(())
}

/// Checks that `SessionContext::parse_duration` accepts well-formed
/// minute/second strings and rejects malformed or zero-length ones.
#[test]
fn test_parse_duration() {
    // Valid durations
    for (duration, want) in [
        ("1s", Duration::from_secs(1)),
        ("1m", Duration::from_secs(60)),
        ("1m0s", Duration::from_secs(60)),
        ("1m1s", Duration::from_secs(61)),
        ("0m30s", Duration::from_secs(30)),
        ("90s", Duration::from_secs(90)),
    ] {
        let have = SessionContext::parse_duration(duration)
            .unwrap_or_else(|e| panic!("failed to parse '{duration}': {e}"));
        assert_eq!(want, have, "unexpected result for '{duration}'");
    }

    // Invalid durations
    for duration in [
        // zero-length durations
        "0s", "0m", "0m0s", "",
        // units out of order or repeated
        "1s0m", "1s1m", "1m1m", "1s1s",
        // missing number or missing/unknown unit
        "1", "m", "1h",
        // numbers that are not unsigned integers
        "-1s", "1.5m",
    ] {
        let have = SessionContext::parse_duration(duration);
        assert!(
            have.is_err(),
            "expected '{duration}' to be rejected, got {have:?}"
        );
    }
}
}
92 changes: 92 additions & 0 deletions datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
//! Tests for runtime configuration SQL interface

use std::sync::Arc;
use std::time::Duration;

use datafusion::execution::context::SessionContext;
use datafusion::execution::context::TaskContext;
use datafusion::prelude::SessionConfig;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::DefaultListFilesCache;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_plan::common::collect;

#[tokio::test]
Expand Down Expand Up @@ -233,6 +238,93 @@ async fn test_test_metadata_cache_limit() {
assert_eq!(get_limit(&ctx), 123 * 1024);
}

/// Verifies that `SET datafusion.runtime.list_files_cache_limit` changes the
/// limit reported by the list files cache registered on the runtime.
#[tokio::test]
async fn test_list_files_cache_limit() {
    /// Runs the `SET` statement for the given limit and drains its result.
    async fn set_limit(ctx: &SessionContext, limit: &str) {
        let statement =
            format!("SET datafusion.runtime.list_files_cache_limit = '{limit}'");
        let df = ctx.sql(&statement).await.unwrap();
        df.collect().await.unwrap();
    }

    /// Reads the limit currently reported by the runtime's list files cache.
    fn current_limit(ctx: &SessionContext) -> usize {
        let task_ctx = ctx.task_ctx();
        let cache = task_ctx
            .runtime_env()
            .cache_manager
            .get_list_files_cache()
            .unwrap();
        cache.cache_limit()
    }

    let cache = Arc::new(DefaultListFilesCache::default());
    let cache_config = CacheManagerConfig::default().with_list_files_cache(Some(cache));
    let runtime = RuntimeEnvBuilder::new()
        .with_cache_manager(cache_config)
        .build_arc()
        .unwrap();
    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);

    // (value passed to SET, expected limit in bytes)
    for (limit, expected_bytes) in [
        ("100M", 100 * 1024 * 1024),
        ("2G", 2 * 1024 * 1024 * 1024),
        ("123K", 123 * 1024),
    ] {
        set_limit(&ctx, limit).await;
        assert_eq!(current_limit(&ctx), expected_bytes);
    }
}

/// Verifies that `SET datafusion.runtime.list_files_cache_ttl` changes the
/// TTL reported by the list files cache registered on the runtime.
#[tokio::test]
async fn test_list_files_cache_ttl() {
    /// Runs the `SET` statement for the given TTL and drains its result.
    async fn set_ttl(ctx: &SessionContext, ttl: &str) {
        let statement = format!("SET datafusion.runtime.list_files_cache_ttl = '{ttl}'");
        let df = ctx.sql(&statement).await.unwrap();
        df.collect().await.unwrap();
    }

    /// Reads the TTL currently reported by the runtime's list files cache.
    fn current_ttl(ctx: &SessionContext) -> Duration {
        let task_ctx = ctx.task_ctx();
        let cache = task_ctx
            .runtime_env()
            .cache_manager
            .get_list_files_cache()
            .unwrap();
        cache.cache_ttl().unwrap()
    }

    let cache = Arc::new(DefaultListFilesCache::default());
    let cache_config = CacheManagerConfig::default().with_list_files_cache(Some(cache));
    let runtime = RuntimeEnvBuilder::new()
        .with_cache_manager(cache_config)
        .build_arc()
        .unwrap();
    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);

    // (value passed to SET, expected TTL in seconds)
    for (ttl, expected_secs) in [("1m", 60), ("30s", 30), ("1m30s", 90)] {
        set_ttl(&ctx, ttl).await;
        assert_eq!(current_ttl(&ctx), Duration::from_secs(expected_secs));
    }
}

#[tokio::test]
async fn test_unknown_runtime_config() {
let ctx = SessionContext::new();
Expand Down
23 changes: 18 additions & 5 deletions datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;

use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;
pub use super::list_files_cache::{
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
};

/// A cache for [`Statistics`].
///
Expand Down Expand Up @@ -76,6 +78,9 @@ pub trait ListFilesCache:

/// Updates the cache with a new memory limit in bytes.
fn update_cache_limit(&self, limit: usize);

/// Updates the cache with a new TTL (time-to-live).
fn update_cache_ttl(&self, ttl: Option<Duration>);
}

/// Generic file-embedded metadata used with [`FileMetadataCache`].
Expand Down Expand Up @@ -173,7 +178,15 @@ impl CacheManager {
let file_statistic_cache =
config.table_files_statistics_cache.as_ref().map(Arc::clone);

let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);
let list_files_cache = config
.list_files_cache
.as_ref()
.inspect(|c| {
// the cache memory limit or ttl might have changed, ensure they are updated
c.update_cache_limit(config.list_files_cache_limit);
c.update_cache_ttl(config.list_files_cache_ttl);
})
.map(Arc::clone);

let file_metadata_cache = config
.file_metadata_cache
Expand Down Expand Up @@ -262,7 +275,7 @@ impl Default for CacheManagerConfig {
table_files_statistics_cache: Default::default(),
list_files_cache: Default::default(),
list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
list_files_cache_ttl: None,
list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
file_metadata_cache: Default::default(),
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
}
Expand Down Expand Up @@ -303,8 +316,8 @@ impl CacheManagerConfig {
/// Sets the TTL (time-to-live) for entries in the list files cache.
///
/// Default: None (infinite).
pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self {
self.list_files_cache_ttl = Some(ttl);
pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
self.list_files_cache_ttl = ttl;
self
}

Expand Down
13 changes: 11 additions & 2 deletions datafusion/execution/src/cache/list_files_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
}

/// The default memory limit for the [`DefaultListFilesCache`]
pub(super) const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB

/// The default cache TTL for the [`DefaultListFilesCache`]
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite

/// Handles the inner state of the [`DefaultListFilesCache`] struct.
pub struct DefaultListFilesCacheState {
Expand All @@ -157,7 +160,7 @@ impl Default for DefaultListFilesCacheState {
lru_queue: LruQueue::new(),
memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
memory_used: 0,
ttl: None,
ttl: DEFAULT_LIST_FILES_CACHE_TTL,
}
}
}
Expand Down Expand Up @@ -294,6 +297,12 @@ impl ListFilesCache for DefaultListFilesCache {
state.memory_limit = limit;
state.evict_entries();
}

/// Stores `ttl` as the cache's new time-to-live, then runs an eviction
/// pass before the state lock is released.
fn update_cache_ttl(&self, ttl: Option<Duration>) {
    let mut guard = self.state.lock().unwrap();
    guard.ttl = ttl;
    guard.evict_entries();
}
}

impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
Expand Down
38 changes: 37 additions & 1 deletion datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ fn create_runtime_config_entries(
max_temp_directory_size: Option<String>,
temp_directory: Option<String>,
metadata_cache_limit: Option<String>,
list_files_cache_limit: Option<String>,
list_files_cache_ttl: Option<String>,
) -> Vec<ConfigEntry> {
vec![
ConfigEntry {
Expand All @@ -123,6 +125,16 @@ fn create_runtime_config_entries(
value: metadata_cache_limit,
description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
},
ConfigEntry {
key: "datafusion.runtime.list_files_cache_limit".to_string(),
value: list_files_cache_limit,
description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
},
ConfigEntry {
key: "datafusion.runtime.list_files_cache_ttl".to_string(),
value: list_files_cache_ttl,
description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.",
},
]
}

Expand Down Expand Up @@ -227,6 +239,14 @@ impl RuntimeEnv {
}
}

/// Renders a duration as `<minutes>m<seconds>s`, using whole seconds only
/// (any sub-second part is dropped by `as_secs`).
fn format_duration(duration: Duration) -> String {
    let whole_secs = duration.as_secs();
    let (minutes_part, seconds_part) = (whole_secs / 60, whole_secs % 60);

    format!("{}m{}s", minutes_part, seconds_part)
}

let memory_limit_value = match self.memory_pool.memory_limit() {
MemoryLimit::Finite(size) => Some(format_byte_size(
size.try_into()
Expand Down Expand Up @@ -259,11 +279,25 @@ impl RuntimeEnv {
.expect("Metadata cache size conversion failed"),
);

let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit();
let list_files_cache_value = format_byte_size(
list_files_cache_limit
.try_into()
.expect("List files cache size conversion failed"),
);

let list_files_cache_ttl = self
.cache_manager
.get_list_files_cache_ttl()
.map(format_duration);

create_runtime_config_entries(
memory_limit_value,
Some(max_temp_dir_value),
temp_dir_value,
Some(metadata_cache_value),
Some(list_files_cache_value),
list_files_cache_ttl,
)
}
}
Expand Down Expand Up @@ -394,7 +428,7 @@ impl RuntimeEnvBuilder {
}

/// Specifies the duration entries in the object list cache will be considered valid.
pub fn with_object_list_cache_ttl(mut self, ttl: Duration) -> Self {
pub fn with_object_list_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl);
self
}
Expand Down Expand Up @@ -473,6 +507,8 @@ impl RuntimeEnvBuilder {
Some("100G".to_string()),
None,
Some("50M".to_owned()),
Some("1M".to_owned()),
None,
)
}

Expand Down
Loading