diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 2702954e77830..0b7d498fa7a36 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -44,11 +44,13 @@ use datafusion_common::{assert_contains, Result}; use datafusion_execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; -use datafusion_execution::TaskContext; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::{DiskManager, TaskContext}; use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::join_selection::JoinSelection; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::collect as collect_batches; use datafusion_physical_plan::common::collect; use datafusion_physical_plan::spill::get_record_batch_memory_size; use rand::Rng; @@ -524,6 +526,95 @@ async fn test_external_sort_zero_merge_reservation() { assert!(spill_count > 0); } +// Tests for disk limit (`max_temp_directory_size` in `DiskManager`) +// ------------------------------------------------------------------ + +// Create a new `SessionContext` with speicified disk limit and memory pool limit +async fn setup_context( + disk_limit: u64, + memory_pool_limit: usize, +) -> Result { + let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; + + let disk_manager = Arc::try_unwrap(disk_manager) + .expect("DiskManager should be a single instance") + .with_max_temp_directory_size(disk_limit)?; + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit))) + .build_arc() + .unwrap(); + + let runtime = Arc::new(RuntimeEnv { + memory_pool: runtime.memory_pool.clone(), + disk_manager: Arc::new(disk_manager), + cache_manager: runtime.cache_manager.clone(), + object_store_registry: runtime.object_store_registry.clone(), + }); + + let config = SessionConfig::new() + .with_sort_spill_reservation_bytes(64 * 1024) // 256KB + .with_sort_in_place_threshold_bytes(0) + .with_batch_size(64) // To reduce test memory usage + .with_target_partitions(1); + + Ok(SessionContext::new_with_config_rt(config, runtime)) +} + +/// If the spilled bytes exceed the disk limit, the query should fail +/// (specified by `max_temp_directory_size` in `DiskManager`) +#[tokio::test] +async fn test_disk_spill_limit_reached() -> Result<()> { + let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk limit, 1MB memory limit + + let df = ctx + .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1") + .await + .unwrap(); + + let err = df.collect().await.unwrap_err(); + assert_contains!( + err.to_string(), + "The used disk space during the spilling process has exceeded the allowable limit" + ); + + Ok(()) +} + +/// External query should succeed, if the spilled bytes is less than the disk limit +/// Also verify that after the query is finished, all the disk usage accounted by +/// tempfiles are cleaned up. +#[tokio::test] +async fn test_disk_spill_limit_not_reached() -> Result<()> { + let disk_spill_limit = 1024 * 1024; // 1MB + let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk limit, 128KB memory limit + + let df = ctx + .sql("select * from generate_series(1, 10000) as t1(v1) order by v1") + .await + .unwrap(); + let plan = df.create_physical_plan().await.unwrap(); + + let task_ctx = ctx.task_ctx(); + let _ = collect_batches(Arc::clone(&plan), task_ctx) + .await + .expect("Query execution failed"); + + let spill_count = plan.metrics().unwrap().spill_count().unwrap(); + let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap(); + + println!("spill count {}, spill bytes {}", spill_count, spilled_bytes); + assert!(spill_count > 0); + assert!((spilled_bytes as u64) < disk_spill_limit); + + // Verify that all temporary files have been properly cleaned up by checking + // that the total disk usage tracked by the disk manager is zero + let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space(); + assert_eq!(current_disk_usage, 0); + + Ok(()) +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index caa62eefe14c7..2b21a6dbf175f 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -17,14 +17,21 @@ //! [`DiskManager`]: Manages files generated during query execution -use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; +use datafusion_common::{ + config_err, resources_datafusion_err, resources_err, DataFusionError, Result, +}; use log::debug; use parking_lot::Mutex; use rand::{thread_rng, Rng}; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tempfile::{Builder, NamedTempFile, TempDir}; +use crate::memory_pool::human_readable_size; + +const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB + /// Configuration for temporary disk access #[derive(Debug, Clone)] pub enum DiskManagerConfig { @@ -75,6 +82,12 @@ pub struct DiskManager { /// If `Some(vec![])` a new OS specified temporary directory will be created /// If `None` an error will be returned (configured not to spill) local_dirs: Mutex>>>, + /// The maximum amount of data (in bytes) stored inside the temporary directories. + /// Default to 100GB + max_temp_directory_size: u64, + /// Used disk space in the temporary directories. Now only spilled data for + /// external executors are counted. + used_disk_space: Arc, } impl DiskManager { @@ -84,6 +97,8 @@ impl DiskManager { DiskManagerConfig::Existing(manager) => Ok(manager), DiskManagerConfig::NewOs => Ok(Arc::new(Self { local_dirs: Mutex::new(Some(vec![])), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Arc::new(AtomicU64::new(0)), })), DiskManagerConfig::NewSpecified(conf_dirs) => { let local_dirs = create_local_dirs(conf_dirs)?; @@ -93,14 +108,38 @@ impl DiskManager { ); Ok(Arc::new(Self { local_dirs: Mutex::new(Some(local_dirs)), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Arc::new(AtomicU64::new(0)), })) } DiskManagerConfig::Disabled => Ok(Arc::new(Self { local_dirs: Mutex::new(None), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Arc::new(AtomicU64::new(0)), })), } } + pub fn with_max_temp_directory_size( + mut self, + max_temp_directory_size: u64, + ) -> Result { + // If the disk manager is disabled and `max_temp_directory_size` is not 0, + // this operation is not meaningful, fail early. + if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 { + return config_err!( + "Cannot set max temp directory size for a disk manager that spilling is disabled" + ); + } + + self.max_temp_directory_size = max_temp_directory_size; + Ok(self) + } + + pub fn used_disk_space(&self) -> u64 { + self.used_disk_space.load(Ordering::Relaxed) + } + /// Return true if this disk manager supports creating temporary /// files. If this returns false, any call to `create_tmp_file` /// will error. @@ -113,7 +152,7 @@ impl DiskManager { /// If the file can not be created for some reason, returns an /// error message referencing the request description pub fn create_tmp_file( - &self, + self: &Arc, request_description: &str, ) -> Result { let mut guard = self.local_dirs.lock(); @@ -142,18 +181,31 @@ impl DiskManager { tempfile: Builder::new() .tempfile_in(local_dirs[dir_index].as_ref()) .map_err(DataFusionError::IoError)?, + current_file_disk_usage: 0, + disk_manager: Arc::clone(self), }) } } /// A wrapper around a [`NamedTempFile`] that also contains -/// a reference to its parent temporary directory +/// a reference to its parent temporary directory. +/// +/// # Note +/// After any modification to the underlying file (e.g., writing data to it), the caller +/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter. +/// This ensures the disk manager can properly enforce usage limits configured by +/// [`DiskManager::with_max_temp_directory_size`]. #[derive(Debug)] pub struct RefCountedTempFile { /// The reference to the directory in which temporary files are created to ensure /// it is not cleaned up prior to the NamedTempFile _parent_temp_dir: Arc, tempfile: NamedTempFile, + /// Tracks the current disk usage of this temporary file. See + /// [`Self::update_disk_usage`] for more details. + current_file_disk_usage: u64, + /// The disk manager that created and manages this temporary file + disk_manager: Arc, } impl RefCountedTempFile { @@ -164,6 +216,50 @@ impl RefCountedTempFile { pub fn inner(&self) -> &NamedTempFile { &self.tempfile } + + /// Updates the global disk usage counter after modifications to the underlying file. + /// + /// # Errors + /// - Returns an error if the global disk usage exceeds the configured limit. + pub fn update_disk_usage(&mut self) -> Result<()> { + // Get new file size from OS + let metadata = self.tempfile.as_file().metadata()?; + let new_disk_usage = metadata.len(); + + // Update the global disk usage by: + // 1. Subtracting the old file size from the global counter + self.disk_manager + .used_disk_space + .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed); + // 2. Adding the new file size to the global counter + self.disk_manager + .used_disk_space + .fetch_add(new_disk_usage, Ordering::Relaxed); + + // 3. Check if the updated global disk usage exceeds the configured limit + let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed); + if global_disk_usage > self.disk_manager.max_temp_directory_size { + return resources_err!( + "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.", + human_readable_size(self.disk_manager.max_temp_directory_size as usize) + ); + } + + // 4. Update the local file size tracking + self.current_file_disk_usage = new_disk_usage; + + Ok(()) + } +} + +/// When the temporary file is dropped, subtract its disk usage from the disk manager's total +impl Drop for RefCountedTempFile { + fn drop(&mut self) { + // Subtract the current file's disk usage from the global counter + self.disk_manager + .used_disk_space + .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed); + } } /// Setup local dirs by creating one new dir in each of the given dirs diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index 8c1ed77559078..7617e0a22a504 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -49,7 +49,12 @@ impl InProgressSpillFile { } } - /// Appends a `RecordBatch` to the file, initializing the writer if necessary. + /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary. + /// + /// # Errors + /// - Returns an error if the file is not active (has been finalized) + /// - Returns an error if appending would exceed the disk usage limit configured + /// by `max_temp_directory_size` in `DiskManager` pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> { if self.in_progress_file.is_none() { return Err(exec_datafusion_err!( @@ -70,6 +75,11 @@ impl InProgressSpillFile { } if let Some(writer) = &mut self.writer { let (spilled_rows, spilled_bytes) = writer.write(batch)?; + if let Some(in_progress_file) = &mut self.in_progress_file { + in_progress_file.update_disk_usage()?; + } else { + unreachable!() // Already checked inside current function + } // Update metrics self.spill_writer.metrics.spilled_bytes.add(spilled_bytes); diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 4a8e293323f02..f2c6090f4bb0a 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -73,7 +73,10 @@ impl SpillManager { /// intended to incrementally write in-memory batches into the same spill file, /// use [`Self::create_in_progress_file`] instead. /// None is returned if no batches are spilled. - #[allow(dead_code)] // TODO: remove after change SMJ to use SpillManager + /// + /// # Errors + /// - Returns an error if spilling would exceed the disk usage limit configured + /// by `max_temp_directory_size` in `DiskManager` pub fn spill_record_batch_and_finish( &self, batches: &[RecordBatch], @@ -90,7 +93,10 @@ impl SpillManager { /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method /// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`. - #[allow(dead_code)] // TODO: remove after change aggregate to use SpillManager + /// + /// # Errors + /// - Returns an error if spilling would exceed the disk usage limit configured + /// by `max_temp_directory_size` in `DiskManager` pub fn spill_record_batch_by_size( &self, batch: &RecordBatch,