diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index ea8850d3b66cc..32231e583fb81 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1055,24 +1055,30 @@ impl SessionContext { fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> { let key = variable.strip_prefix("datafusion.runtime.").unwrap(); - match key { + let mut state = self.state.write(); + + let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); + builder = match key { "memory_limit" => { let memory_limit = Self::parse_memory_limit(value)?; - - let mut state = self.state.write(); - let mut builder = - RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); - builder = builder.with_memory_limit(memory_limit, 1.0); - *state = SessionStateBuilder::from(state.clone()) - .with_runtime_env(Arc::new(builder.build()?)) - .build(); + builder.with_memory_limit(memory_limit, 1.0) } + "max_temp_directory_size" => { + let directory_size = Self::parse_memory_limit(value)?; + builder.with_max_temp_directory_size(directory_size as u64) + } + "temp_directory" => builder.with_temp_file_path(value), _ => { return Err(DataFusionError::Plan(format!( "Unknown runtime configuration: {variable}" ))) } - } + }; + + *state = SessionStateBuilder::from(state.clone()) + .with_runtime_env(Arc::new(builder.build()?)) + .build(); + Ok(()) } diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 18e07bb61ed94..b05c36e335f3d 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -152,6 +152,54 @@ async fn test_invalid_memory_limit() { assert!(error_message.contains("Unsupported unit 'X'")); } +#[tokio::test] +async fn test_max_temp_directory_size_enforcement() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '1M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0") + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("SET datafusion.runtime.max_temp_directory_size = '0K'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!( + result.is_err(), + "Should fail due to max temp directory size limit" + ); + + ctx.sql("SET datafusion.runtime.max_temp_directory_size = '1M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!( + result.is_ok(), + "Should not fail due to max temp directory size limit" + ); +} + #[tokio::test] async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index b086430a4ef71..70b0f0a831704 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -255,13 +255,19 @@ impl RuntimeEnvBuilder { } /// Use the specified path to create any needed temporary files - pub fn with_temp_file_path(self, path: impl Into) -> Self { + pub fn with_temp_file_path(mut self, path: impl Into) -> Self { + let builder = self.disk_manager_builder.take().unwrap_or_default(); self.with_disk_manager_builder( - DiskManagerBuilder::default() - .with_mode(DiskManagerMode::Directories(vec![path.into()])), + builder.with_mode(DiskManagerMode::Directories(vec![path.into()])), ) } + /// Specify a limit on the size of the temporary file directory in bytes + pub fn with_max_temp_directory_size(mut self, size: u64) -> Self { + let builder = self.disk_manager_builder.take().unwrap_or_default(); + self.with_disk_manager_builder(builder.with_max_temp_directory_size(size)) + } + /// Build a RuntimeEnv pub fn build(self) -> Result { let Self { @@ -315,12 +321,23 @@ impl RuntimeEnvBuilder { /// Returns a list of all available runtime configurations with their current values and descriptions pub fn entries(&self) -> Vec { - // Memory pool configuration - vec![ConfigEntry { - key: "datafusion.runtime.memory_limit".to_string(), - value: None, // Default is system-dependent - description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", - }] + vec![ + ConfigEntry { + key: "datafusion.runtime.memory_limit".to_string(), + value: None, // Default is system-dependent + description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + }, + ConfigEntry { + key: "datafusion.runtime.max_temp_directory_size".to_string(), + value: Some("100G".to_string()), + description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + }, + ConfigEntry { + key: "datafusion.runtime.temp_directory".to_string(), + value: None, // Default is system-dependent + description: "The path to the temporary file directory.", + } + ] } /// Generate documentation that can be included in the user guide diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 96b7ee672bdb6..890184ed291f6 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -158,6 +158,8 @@ SET datafusion.runtime.memory_limit = '2G'; The following runtime configuration settings are available: -| key | default | description | -| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| key | default | description | +| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. |