From 8ce3b5d015dd6ee1781c89fd8ddd87e877806f41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9mie=20Drouet?= Date: Fri, 2 May 2025 11:11:21 +0200 Subject: [PATCH 1/4] feat: create helpers to set the max_temp_directory_size MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Drouet --- datafusion/execution/src/disk_manager.rs | 28 +++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 2b21a6dbf175f..5eaa92f6cf953 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -120,10 +120,10 @@ impl DiskManager { } } - pub fn with_max_temp_directory_size( - mut self, + pub fn set_max_temp_directory_size( + &mut self, max_temp_directory_size: u64, - ) -> Result { + ) -> Result<()> { // If the disk manager is disabled and `max_temp_directory_size` is not 0, // this operation is not meaningful, fail early. if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 { @@ -133,6 +133,28 @@ impl DiskManager { } self.max_temp_directory_size = max_temp_directory_size; + Ok(()) + } + + pub fn set_arc_max_temp_directory_size( + this: &mut Arc, + max_temp_directory_size: u64, + ) -> Result<()> { + if let Some(inner) = Arc::get_mut(this) { + inner.set_max_temp_directory_size(max_temp_directory_size)?; + Ok(()) + } else { + return config_err!( + "Cannot set max temp directory size for a disk manager already in used" + ); + } + } + + pub fn with_max_temp_directory_size( + mut self, + max_temp_directory_size: u64, + ) -> Result { + self.set_max_temp_directory_size(max_temp_directory_size)?; Ok(self) } From 74416044a33ff613b94d4c1f57af1193a9379eb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9mie=20Drouet?= Date: Fri, 2 May 2025 11:20:51 +0200 Subject: [PATCH 2/4] refactor: use helper in cli MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Drouet --- datafusion-cli/src/main.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index dad2d15f01a11..b37c1fa87ff86 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -177,13 +177,14 @@ async fn main_inner() -> Result<()> { // set disk limit if let Some(disk_limit) = args.disk_limit { - let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; + let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - let disk_manager = Arc::try_unwrap(disk_manager) - .expect("DiskManager should be a single instance") - .with_max_temp_directory_size(disk_limit.try_into().unwrap())?; + DiskManager::set_arc_max_temp_directory_size( + &mut disk_manager, + disk_limit.try_into().unwrap(), + )?; - let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager)); + let disk_config = DiskManagerConfig::new_existing(disk_manager); rt_builder = rt_builder.with_disk_manager(disk_config); } From 66b2ee2b820d56ad27a59655d5d3895cac839695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9mie=20Drouet?= Date: Fri, 2 May 2025 11:21:36 +0200 Subject: [PATCH 3/4] refactor: update error message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Drouet --- datafusion/execution/src/disk_manager.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 5eaa92f6cf953..555b44ad5ceb5 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -144,9 +144,7 @@ impl DiskManager { inner.set_max_temp_directory_size(max_temp_directory_size)?; Ok(()) } else { - return config_err!( - "Cannot set max temp directory size for a disk manager already in used" - ); + config_err!("DiskManager should be a single instance") } } From 8a6148b6c9b528ea9ca0176bbaf8765d1b0737bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9mie=20Drouet?= Date: Fri, 2 May 2025 11:23:38 +0200 Subject: [PATCH 4/4] refactor: use setter in tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Drouet --- datafusion/core/tests/memory_limit/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 01342d1604fca..c8beedb8c2c99 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -534,11 +534,9 @@ async fn setup_context( disk_limit: u64, memory_pool_limit: usize, ) -> Result { - let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; + let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - let disk_manager = Arc::try_unwrap(disk_manager) - .expect("DiskManager should be a single instance") - .with_max_temp_directory_size(disk_limit)?; + DiskManager::set_arc_max_temp_directory_size(&mut disk_manager, disk_limit)?; let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit))) @@ -547,7 +545,7 @@ async fn setup_context( let runtime = Arc::new(RuntimeEnv { memory_pool: runtime.memory_pool.clone(), - disk_manager: Arc::new(disk_manager), + disk_manager, cache_manager: runtime.cache_manager.clone(), object_store_registry: runtime.object_store_registry.clone(), });