diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index cb87053d8d035..d878fdcf66a4c 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -25,7 +25,7 @@ use parking_lot::Mutex; use rand::{Rng, rng}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use tempfile::{Builder, NamedTempFile, TempDir}; use datafusion_common::human_readable_size; @@ -77,6 +77,7 @@ impl DiskManagerBuilder { local_dirs: Mutex::new(Some(vec![])), max_temp_directory_size: self.max_temp_directory_size, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), }), DiskManagerMode::Directories(conf_dirs) => { let local_dirs = create_local_dirs(&conf_dirs)?; @@ -87,12 +88,14 @@ impl DiskManagerBuilder { local_dirs: Mutex::new(Some(local_dirs)), max_temp_directory_size: self.max_temp_directory_size, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), }) } DiskManagerMode::Disabled => Ok(DiskManager { local_dirs: Mutex::new(None), max_temp_directory_size: self.max_temp_directory_size, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), }), } } @@ -169,6 +172,17 @@ pub struct DiskManager { /// Used disk space in the temporary directories. Now only spilled data for /// external executors are counted. 
used_disk_space: Arc<AtomicU64>, + /// Number of active temporary files created by this disk manager + active_files_count: Arc<AtomicUsize>, +} + +/// Information about the current disk usage for spilling +#[derive(Debug, Clone, Copy)] +pub struct SpillingProgress { + /// Total bytes currently used on disk for spilling + pub current_bytes: u64, + /// Total number of active spill files + pub active_files_count: usize, } impl DiskManager { @@ -187,6 +201,7 @@ impl DiskManager { local_dirs: Mutex::new(Some(vec![])), max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), })), DiskManagerConfig::NewSpecified(conf_dirs) => { let local_dirs = create_local_dirs(&conf_dirs)?; @@ -197,12 +212,14 @@ impl DiskManager { local_dirs: Mutex::new(Some(local_dirs)), max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), })) } DiskManagerConfig::Disabled => Ok(Arc::new(Self { local_dirs: Mutex::new(None), max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), })), } } @@ -252,6 +269,14 @@ impl DiskManager { self.max_temp_directory_size } + /// Returns the current spilling progress + pub fn spilling_progress(&self) -> SpillingProgress { + SpillingProgress { + current_bytes: self.used_disk_space.load(Ordering::Relaxed), + active_files_count: self.active_files_count.load(Ordering::Relaxed), + } + } + /// Returns the temporary directory paths pub fn temp_dir_paths(&self) -> Vec<PathBuf> { self.local_dirs @@ -301,6 +326,7 @@ impl DiskManager { } let dir_index = rng().random_range(0..local_dirs.len()); + self.active_files_count.fetch_add(1, Ordering::Relaxed); Ok(RefCountedTempFile { parent_temp_dir: Arc::clone(&local_dirs[dir_index]), tempfile: Arc::new( @@ -422,6 +448,9 @@ impl Drop for RefCountedTempFile { 
self.disk_manager .used_disk_space .fetch_sub(current_usage, Ordering::Relaxed); + self.disk_manager + .active_files_count + .fetch_sub(1, Ordering::Relaxed); } } } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 67398d59f1374..67604c424c766 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -19,7 +19,7 @@ //! store, memory manager, disk manager. #[expect(deprecated)] -use crate::disk_manager::DiskManagerConfig; +use crate::disk_manager::{DiskManagerConfig, SpillingProgress}; use crate::{ disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode}, memory_pool::{ @@ -199,6 +199,11 @@ impl RuntimeEnv { self.object_store_registry.get_store(url.as_ref()) } + /// Returns the current spilling progress + pub fn spilling_progress(&self) -> SpillingProgress { + self.disk_manager.spilling_progress() + } + /// Register an [`EncryptionFactory`] with an associated identifier that can be later /// used to configure encryption when reading or writing Parquet. /// If an encryption factory with the same identifier was already registered, it is replaced and returned. 
diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index d2acf4993b857..0ad7aabf64954 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -63,7 +63,7 @@ impl InProgressSpillFile { } if self.writer.is_none() { let schema = batch.schema(); - if let Some(ref in_progress_file) = self.in_progress_file { + if let Some(in_progress_file) = &mut self.in_progress_file { self.writer = Some(IPCStreamWriter::new( in_progress_file.path(), schema.as_ref(), @@ -72,18 +72,31 @@ impl InProgressSpillFile { // Update metrics self.spill_writer.metrics.spill_file_count.add(1); + + // Update initial size (schema/header) + in_progress_file.update_disk_usage()?; + let initial_size = in_progress_file.current_disk_usage(); + self.spill_writer + .metrics + .spilled_bytes + .add(initial_size as usize); } } if let Some(writer) = &mut self.writer { let (spilled_rows, _) = writer.write(batch)?; if let Some(in_progress_file) = &mut self.in_progress_file { + let pre_size = in_progress_file.current_disk_usage(); in_progress_file.update_disk_usage()?; + let post_size = in_progress_file.current_disk_usage(); + + self.spill_writer.metrics.spilled_rows.add(spilled_rows); + self.spill_writer + .metrics + .spilled_bytes + .add((post_size - pre_size) as usize); } else { unreachable!() // Already checked inside current function } - - // Update metrics - self.spill_writer.metrics.spilled_rows.add(spilled_rows); } Ok(()) } @@ -106,9 +119,13 @@ impl InProgressSpillFile { // Since spill files are append-only, add the file size to spilled_bytes if let Some(in_progress_file) = &mut self.in_progress_file { // Since writer.finish() writes continuation marker and message length at the end + let pre_size = in_progress_file.current_disk_usage(); in_progress_file.update_disk_usage()?; - let size = in_progress_file.current_disk_usage(); - 
self.spill_writer.metrics.spilled_bytes.add(size as usize); + let post_size = in_progress_file.current_disk_usage(); + self.spill_writer + .metrics + .spilled_bytes + .add((post_size - pre_size) as usize); } Ok(self.in_progress_file.take()) diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 78dea99ac820c..166805a337349 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -685,13 +685,13 @@ mod tests { Arc::new(StringArray::from(vec!["d", "e", "f"])), ], )?; - // After appending each batch, spilled_rows should increase, while spill_file_count and - // spilled_bytes remain the same (spilled_bytes is updated only after finish() is called) + // After appending each batch, spilled_rows and spilled_bytes should increase incrementally, + // while spill_file_count remains 1 (since we're writing to the same file) in_progress_file.append_batch(&batch1)?; - verify_metrics(&in_progress_file, 1, 0, 3)?; + verify_metrics(&in_progress_file, 1, 440, 3)?; in_progress_file.append_batch(&batch2)?; - verify_metrics(&in_progress_file, 1, 0, 6)?; + verify_metrics(&in_progress_file, 1, 704, 6)?; let completed_file = in_progress_file.finish()?; assert!(completed_file.is_some()); @@ -799,4 +799,70 @@ mod tests { assert_eq!(alignment, 8); Ok(()) } + #[tokio::test] + async fn test_real_time_spill_metrics() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + + let spill_manager = Arc::new(SpillManager::new( + Arc::clone(&env), + metrics.clone(), + Arc::clone(&schema), + )); + let mut in_progress_file = spill_manager.create_in_progress_file("Test")?; + + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + 
Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + // Before any batch, metrics should be 0 + assert_eq!(metrics.spilled_bytes.value(), 0); + assert_eq!(metrics.spill_file_count.value(), 0); + + // Append first batch + in_progress_file.append_batch(&batch1)?; + + // Metrics should be updated immediately (at least schema and first batch) + let bytes_after_batch1 = metrics.spilled_bytes.value(); + assert_eq!(bytes_after_batch1, 440); + assert_eq!(metrics.spill_file_count.value(), 1); + + // Check global progress + let progress = env.spilling_progress(); + assert_eq!(progress.current_bytes, bytes_after_batch1 as u64); + assert_eq!(progress.active_files_count, 1); + + // Append another batch + in_progress_file.append_batch(&batch1)?; + let bytes_after_batch2 = metrics.spilled_bytes.value(); + assert!(bytes_after_batch2 > bytes_after_batch1); + + // Check global progress again + let progress = env.spilling_progress(); + assert_eq!(progress.current_bytes, bytes_after_batch2 as u64); + + // Finish the file + let spilled_file = in_progress_file.finish()?; + let final_bytes = metrics.spilled_bytes.value(); + assert!(final_bytes > bytes_after_batch2); + + // Even after finish, file is still "active" until dropped + let progress = env.spilling_progress(); + assert!(progress.current_bytes > 0); + assert_eq!(progress.active_files_count, 1); + + drop(spilled_file); + assert_eq!(env.spilling_progress().active_files_count, 0); + assert_eq!(env.spilling_progress().current_bytes, 0); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index e3b547b5731f3..8f7f5212f6c91 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -879,8 +879,8 @@ mod tests { ); assert_eq!( metrics.spilled_bytes.value(), - 0, - "Spilled bytes should be 0 before file finalization" + 320, + "Spilled bytes should reflect data written (header + 1 batch)" ); 
assert_eq!( metrics.spilled_rows.value(), @@ -1300,11 +1300,11 @@ mod tests { writer.push_batch(&batch)?; } - // Check metrics before drop - spilled_bytes should be 0 since file isn't finalized yet + // Check metrics before drop - spilled_bytes already reflects written data let spilled_bytes_before = metrics.spilled_bytes.value(); assert_eq!( - spilled_bytes_before, 0, - "Spilled bytes should be 0 before writer is dropped" + spilled_bytes_before, 1088, + "Spilled bytes should reflect data written (header + 5 batches)" ); // Explicitly drop the writer - this should finalize the current file