Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,24 +1055,30 @@ impl SessionContext {
fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> {
let key = variable.strip_prefix("datafusion.runtime.").unwrap();

match key {
let mut state = self.state.write();

let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
builder = match key {
"memory_limit" => {
let memory_limit = Self::parse_memory_limit(value)?;

let mut state = self.state.write();
let mut builder =
RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
builder = builder.with_memory_limit(memory_limit, 1.0);
*state = SessionStateBuilder::from(state.clone())
.with_runtime_env(Arc::new(builder.build()?))
.build();
builder.with_memory_limit(memory_limit, 1.0)
}
"max_temp_directory_size" => {
let directory_size = Self::parse_memory_limit(value)?;
builder.with_max_temp_directory_size(directory_size as u64)
}
"temp_directory" => builder.with_temp_file_path(value),
_ => {
return Err(DataFusionError::Plan(format!(
"Unknown runtime configuration: {variable}"
)))
}
}
};

*state = SessionStateBuilder::from(state.clone())
.with_runtime_env(Arc::new(builder.build()?))
.build();

Ok(())
}

Expand Down
48 changes: 48 additions & 0 deletions datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,54 @@ async fn test_invalid_memory_limit() {
assert!(error_message.contains("Unsupported unit 'X'"));
}

/// Checks that `datafusion.runtime.max_temp_directory_size` set through SQL
/// `SET` is honoured: a sort over `generate_series(1,100000)` must fail with a
/// `0K` limit and succeed once the limit is raised to `1M`.
#[tokio::test]
async fn test_max_temp_directory_size_enforcement() {
    let ctx = SessionContext::new();

    // Session setup, applied in order.
    // NOTE(review): the 1M memory limit and zero spill reservation are
    // presumably there to force the sort to spill to disk; confirm.
    let setup_statements = [
        "SET datafusion.runtime.memory_limit = '1M'",
        "SET datafusion.execution.sort_spill_reservation_bytes = 0",
        "SET datafusion.runtime.max_temp_directory_size = '0K'",
    ];
    for statement in setup_statements {
        ctx.sql(statement).await.unwrap().collect().await.unwrap();
    }

    let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";

    let outcome_with_zero_limit = ctx.sql(query).await.unwrap().collect().await;
    assert!(
        outcome_with_zero_limit.is_err(),
        "Should fail due to max temp directory size limit"
    );

    // Raise the limit and run the very same query again.
    ctx.sql("SET datafusion.runtime.max_temp_directory_size = '1M'")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let outcome_with_raised_limit = ctx.sql(query).await.unwrap().collect().await;
    assert!(
        outcome_with_raised_limit.is_ok(),
        "Should not fail due to max temp directory size limit"
    );
}

#[tokio::test]
async fn test_unknown_runtime_config() {
let ctx = SessionContext::new();
Expand Down
35 changes: 26 additions & 9 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,19 @@ impl RuntimeEnvBuilder {
}

/// Use the specified path to create any needed temporary files
pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
let builder = self.disk_manager_builder.take().unwrap_or_default();
self.with_disk_manager_builder(
DiskManagerBuilder::default()
.with_mode(DiskManagerMode::Directories(vec![path.into()])),
builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
)
}

/// Set the maximum size, in bytes, of the temporary file directory.
///
/// The limit is applied to the disk manager builder already present on this
/// builder, or to a default one when none has been set.
pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
    let disk_manager_builder = self
        .disk_manager_builder
        .take()
        .unwrap_or_default()
        .with_max_temp_directory_size(size);
    self.with_disk_manager_builder(disk_manager_builder)
}

/// Build a RuntimeEnv
pub fn build(self) -> Result<RuntimeEnv> {
let Self {
Expand Down Expand Up @@ -315,12 +321,23 @@ impl RuntimeEnvBuilder {

/// Returns a list of all available runtime configurations with their current values and descriptions
///
/// Each entry carries the fully qualified `datafusion.runtime.*` key, an
/// optional default value (`None` where the default is system-dependent)
/// and a human-readable description.
pub fn entries(&self) -> Vec<ConfigEntry> {
    vec![
        ConfigEntry {
            key: "datafusion.runtime.memory_limit".to_string(),
            value: None, // Default is system-dependent
            description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
        },
        ConfigEntry {
            key: "datafusion.runtime.max_temp_directory_size".to_string(),
            value: Some("100G".to_string()),
            description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
        },
        ConfigEntry {
            key: "datafusion.runtime.temp_directory".to_string(),
            value: None, // Default is system-dependent
            description: "The path to the temporary file directory.",
        },
    ]
}

/// Generate documentation that can be included in the user guide
Expand Down
8 changes: 5 additions & 3 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ SET datafusion.runtime.memory_limit = '2G';

The following runtime configuration settings are available:

| key                                        | default | description                                                                                                                                 |
| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
| datafusion.runtime.max_temp_directory_size | 100G    | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.    |
| datafusion.runtime.memory_limit            | NULL    | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |
| datafusion.runtime.temp_directory          | NULL    | The path to the temporary file directory.                                                                                                   |