Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,14 @@ async fn main_inner() -> Result<()> {

// set disk limit
if let Some(disk_limit) = args.disk_limit {
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be nicer would be if try_new didn't return an Arc -- but that is a more disruptive change so I think this solution improves things even if it is non ideal


let disk_manager = Arc::try_unwrap(disk_manager)
.expect("DiskManager should be a single instance")
.with_max_temp_directory_size(disk_limit.try_into().unwrap())?;
DiskManager::set_arc_max_temp_directory_size(
&mut disk_manager,
disk_limit.try_into().unwrap(),
)?;

let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager));
let disk_config = DiskManagerConfig::new_existing(disk_manager);
rt_builder = rt_builder.with_disk_manager(disk_config);
}

Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,9 @@ async fn setup_context(
disk_limit: u64,
memory_pool_limit: usize,
) -> Result<SessionContext> {
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

let disk_manager = Arc::try_unwrap(disk_manager)
.expect("DiskManager should be a single instance")
.with_max_temp_directory_size(disk_limit)?;
DiskManager::set_arc_max_temp_directory_size(&mut disk_manager, disk_limit)?;

let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
Expand All @@ -547,7 +545,7 @@ async fn setup_context(

let runtime = Arc::new(RuntimeEnv {
memory_pool: runtime.memory_pool.clone(),
disk_manager: Arc::new(disk_manager),
disk_manager,
cache_manager: runtime.cache_manager.clone(),
object_store_registry: runtime.object_store_registry.clone(),
});
Expand Down
26 changes: 23 additions & 3 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ impl DiskManager {
}
}

pub fn with_max_temp_directory_size(
mut self,
pub fn set_max_temp_directory_size(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps can we add some comments / example to these functions?

&mut self,
max_temp_directory_size: u64,
) -> Result<Self> {
) -> Result<()> {
// If the disk manager is disabled and `max_temp_directory_size` is not 0,
// this operation is not meaningful, fail early.
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
Expand All @@ -133,6 +133,26 @@ impl DiskManager {
}

self.max_temp_directory_size = max_temp_directory_size;
Ok(())
}

pub fn set_arc_max_temp_directory_size(
this: &mut Arc<Self>,
max_temp_directory_size: u64,
) -> Result<()> {
if let Some(inner) = Arc::get_mut(this) {
inner.set_max_temp_directory_size(max_temp_directory_size)?;
Ok(())
} else {
config_err!("DiskManager should be a single instance")
}
}

pub fn with_max_temp_directory_size(
mut self,
max_temp_directory_size: u64,
) -> Result<Self> {
self.set_max_temp_directory_size(max_temp_directory_size)?;
Ok(self)
}

Expand Down