93 changes: 92 additions & 1 deletion datafusion/core/tests/memory_limit/mod.rs
@@ -44,11 +44,13 @@ use datafusion_common::{assert_contains, Result};
use datafusion_execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::{DiskManager, TaskContext};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::collect as collect_batches;
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
@@ -524,6 +526,95 @@ async fn test_external_sort_zero_merge_reservation() {
assert!(spill_count > 0);
}

// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
// ------------------------------------------------------------------

// Create a new `SessionContext` with the specified disk limit and memory pool limit
async fn setup_context(
disk_limit: u64,
memory_pool_limit: usize,
) -> Result<SessionContext> {
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

let disk_manager = Arc::try_unwrap(disk_manager)
.expect("DiskManager should be a single instance")
.with_max_temp_directory_size(disk_limit)?;

let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
.build_arc()
.unwrap();

let runtime = Arc::new(RuntimeEnv {
memory_pool: runtime.memory_pool.clone(),
disk_manager: Arc::new(disk_manager),
cache_manager: runtime.cache_manager.clone(),
object_store_registry: runtime.object_store_registry.clone(),
});

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(64 * 1024) // 64KB
.with_sort_in_place_threshold_bytes(0)
.with_batch_size(64) // To reduce test memory usage
.with_target_partitions(1);

Ok(SessionContext::new_with_config_rt(config, runtime))
}

/// If the spilled bytes exceed the disk limit (specified by
/// `max_temp_directory_size` in `DiskManager`), the query should fail
#[tokio::test]
async fn test_disk_spill_limit_reached() -> Result<()> {
let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk limit, 1MB memory limit

let df = ctx
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
.await
.unwrap();

let err = df.collect().await.unwrap_err();
assert_contains!(
err.to_string(),
"The used disk space during the spilling process has exceeded the allowable limit"
);

Ok(())
}

/// An external query should succeed if the spilled bytes are less than the disk limit.
/// Also verify that after the query is finished, all the disk usage accounted for by
/// temp files has been cleaned up.
#[tokio::test]
async fn test_disk_spill_limit_not_reached() -> Result<()> {
let disk_spill_limit = 1024 * 1024; // 1MB
let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk limit, 128KB memory limit

let df = ctx
.sql("select * from generate_series(1, 10000) as t1(v1) order by v1")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();
let _ = collect_batches(Arc::clone(&plan), task_ctx)
.await
.expect("Query execution failed");

let spill_count = plan.metrics().unwrap().spill_count().unwrap();
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();

println!("spill count {}, spill bytes {}", spill_count, spilled_bytes);
assert!(spill_count > 0);
assert!((spilled_bytes as u64) < disk_spill_limit);

// Verify that all temporary files have been properly cleaned up by checking
// that the total disk usage tracked by the disk manager is zero
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
assert_eq!(current_disk_usage, 0);

Ok(())
}

/// Run the query with the specified memory limit,
/// and verify that the expected errors are returned
#[derive(Clone, Debug)]
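The tests above wire the limit in by rebuilding `RuntimeEnv` by hand, because this PR adds `with_max_temp_directory_size` on `DiskManager` itself rather than exposing it through `RuntimeEnvBuilder`. A minimal sketch of the same wiring for application code follows; it only uses APIs that appear in this diff plus the constructors already used in `setup_context`, but the helper name, the limits, and the exact import paths are assumptions.

use std::sync::Arc;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::Result;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::memory_pool::FairSpillPool;
use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion_execution::DiskManager;

/// Illustrative helper: a context whose spill files may use at most `disk_limit` bytes.
fn context_with_spill_cap(disk_limit: u64, memory_limit: usize) -> Result<SessionContext> {
    // `try_new` returns an `Arc<DiskManager>`; unwrap it so the builder-style
    // `with_max_temp_directory_size` can take ownership.
    let disk_manager = Arc::try_unwrap(DiskManager::try_new(DiskManagerConfig::NewOs)?)
        .expect("freshly created DiskManager has a single owner")
        .with_max_temp_directory_size(disk_limit)?;

    let base = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)))
        .build_arc()?;

    // Rebuild `RuntimeEnv` to swap in the capped disk manager, as `setup_context` does.
    let runtime = Arc::new(RuntimeEnv {
        memory_pool: base.memory_pool.clone(),
        disk_manager: Arc::new(disk_manager),
        cache_manager: base.cache_manager.clone(),
        object_store_registry: base.object_store_registry.clone(),
    });

    Ok(SessionContext::new_with_config_rt(SessionConfig::new(), runtime))
}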
102 changes: 99 additions & 3 deletions datafusion/execution/src/disk_manager.rs
@@ -17,14 +17,21 @@

//! [`DiskManager`]: Manages files generated during query execution

use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use datafusion_common::{
config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
};
use log::debug;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tempfile::{Builder, NamedTempFile, TempDir};

use crate::memory_pool::human_readable_size;

const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB

/// Configuration for temporary disk access
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
@@ -75,6 +82,12 @@ pub struct DiskManager {
/// If `Some(vec![])` a new OS-specified temporary directory will be created
/// If `None` an error will be returned (configured not to spill)
local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
/// The maximum amount of data (in bytes) stored inside the temporary directories.
/// Defaults to 100GB
max_temp_directory_size: u64,
/// Used disk space in the temporary directories. Currently, only spilled data
/// from external executors is counted.
used_disk_space: Arc<AtomicU64>,
}

impl DiskManager {
@@ -84,6 +97,8 @@ impl DiskManager {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
@@ -93,14 +108,38 @@
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
})),
}
}

pub fn with_max_temp_directory_size(
mut self,
max_temp_directory_size: u64,
) -> Result<Self> {
// If the disk manager is disabled and `max_temp_directory_size` is not 0,
// this operation is not meaningful, so fail early.
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
return config_err!(
"Cannot set max temp directory size for a disk manager that spilling is disabled"
);
}

self.max_temp_directory_size = max_temp_directory_size;
Ok(self)
}

/// Return the used disk space (in bytes) currently tracked by this disk manager
pub fn used_disk_space(&self) -> u64 {
self.used_disk_space.load(Ordering::Relaxed)
}

/// Return true if this disk manager supports creating temporary
/// files. If this returns false, any call to `create_tmp_file`
/// will error.
@@ -113,7 +152,7 @@
/// If the file can not be created for some reason, returns an
/// error message referencing the request description
pub fn create_tmp_file(
&self,
self: &Arc<Self>,
request_description: &str,
) -> Result<RefCountedTempFile> {
let mut guard = self.local_dirs.lock();
@@ -142,18 +181,31 @@
tempfile: Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
current_file_disk_usage: 0,
disk_manager: Arc::clone(self),
})
}
}

/// A wrapper around a [`NamedTempFile`] that also contains
/// a reference to its parent temporary directory
/// a reference to its parent temporary directory.
///
/// # Note
/// After any modification to the underlying file (e.g., writing data to it), the caller
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
/// This ensures the disk manager can properly enforce usage limits configured by
/// [`DiskManager::with_max_temp_directory_size`].
#[derive(Debug)]
pub struct RefCountedTempFile {
/// The reference to the directory in which temporary files are created to ensure
/// it is not cleaned up prior to the NamedTempFile
_parent_temp_dir: Arc<TempDir>,
tempfile: NamedTempFile,
/// Tracks the current disk usage of this temporary file. See
/// [`Self::update_disk_usage`] for more details.
current_file_disk_usage: u64,
/// The disk manager that created and manages this temporary file
disk_manager: Arc<DiskManager>,
}

impl RefCountedTempFile {
@@ -164,6 +216,50 @@ impl RefCountedTempFile {
pub fn inner(&self) -> &NamedTempFile {
&self.tempfile
}

/// Updates the global disk usage counter after modifications to the underlying file.
///
/// # Errors
/// - Returns an error if the global disk usage exceeds the configured limit.
pub fn update_disk_usage(&mut self) -> Result<()> {
// Get new file size from OS
let metadata = self.tempfile.as_file().metadata()?;
let new_disk_usage = metadata.len();

// Update the global disk usage by:
// 1. Subtracting the old file size from the global counter
self.disk_manager
.used_disk_space
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
// 2. Adding the new file size to the global counter
self.disk_manager
.used_disk_space
.fetch_add(new_disk_usage, Ordering::Relaxed);

// 3. Check if the updated global disk usage exceeds the configured limit
let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
if global_disk_usage > self.disk_manager.max_temp_directory_size {
return resources_err!(
"The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
human_readable_size(self.disk_manager.max_temp_directory_size as usize)
);
}

// 4. Update the local file size tracking
self.current_file_disk_usage = new_disk_usage;

Ok(())
}
}

/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
impl Drop for RefCountedTempFile {
fn drop(&mut self) {
// Subtract the current file's disk usage from the global counter
self.disk_manager
.used_disk_space
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
}
}

/// Setup local dirs by creating one new dir in each of the given dirs
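Taken together, `update_disk_usage` and the `Drop` impl form a delta-accounting pattern: each temp file remembers the size it last reported, and the shared atomic holds the sum of those sizes across all live files. Below is a self-contained sketch of the pattern with illustrative names (plain Rust, no DataFusion types). Note that, as in the diff, the limit check is a separate `load`, so two files spilling concurrently can each pass the check before seeing the other's update; the cap is enforced approximately rather than transactionally.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Illustrative stand-in for the accounting fields on `DiskManager`.
struct DiskAccounting {
    used: AtomicU64,
    limit: u64,
}

/// Illustrative stand-in for `RefCountedTempFile`'s tracking state.
struct TrackedFile {
    last_seen: u64, // plays the role of `current_file_disk_usage`
    accounting: Arc<DiskAccounting>,
}

impl TrackedFile {
    /// Mirrors `update_disk_usage`: replace this file's contribution, then check the cap.
    fn update(&mut self, new_size: u64) -> Result<(), String> {
        self.accounting.used.fetch_sub(self.last_seen, Ordering::Relaxed);
        self.accounting.used.fetch_add(new_size, Ordering::Relaxed);
        // Record the new size before checking the cap, so the Drop refund
        // below stays consistent even when the error path is taken.
        self.last_seen = new_size;
        if self.accounting.used.load(Ordering::Relaxed) > self.accounting.limit {
            return Err("spill cap exceeded".to_string());
        }
        Ok(())
    }
}

impl Drop for TrackedFile {
    // Mirrors the `Drop` impl above: hand back this file's share of the counter.
    fn drop(&mut self) {
        self.accounting.used.fetch_sub(self.last_seen, Ordering::Relaxed);
    }
}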
12 changes: 11 additions & 1 deletion datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -49,7 +49,12 @@ impl InProgressSpillFile {
}
}

/// Appends a `RecordBatch` to the file, initializing the writer if necessary.
/// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
///
/// # Errors
/// - Returns an error if the file is not active (has been finalized)
/// - Returns an error if appending would exceed the disk usage limit configured
/// by `max_temp_directory_size` in `DiskManager`
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
if self.in_progress_file.is_none() {
return Err(exec_datafusion_err!(
@@ -70,6 +75,11 @@
}
if let Some(writer) = &mut self.writer {
let (spilled_rows, spilled_bytes) = writer.write(batch)?;
if let Some(in_progress_file) = &mut self.in_progress_file {
in_progress_file.update_disk_usage()?;
Contributor review comment: It is quite nice that this is encapsulated as part of InProgressSpillFile
} else {
unreachable!() // Already checked inside current function
}

// Update metrics
self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);
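For orientation, here is a sketch of the incremental spill loop that this hunk guards. The `create_in_progress_file` / `append_batch` / `finish` names follow the methods referenced in these diffs, but their exact signatures, visibility, and module paths are assumptions, so treat this as pseudocode-adjacent rather than a drop-in snippet.

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
// `SpillManager`, `InProgressSpillFile`, and `RefCountedTempFile` live in
// datafusion_physical_plan's spill module; reachable import paths are assumed.

fn spill_incrementally(
    spill_manager: &SpillManager,
    batches: &[RecordBatch],
) -> Result<Option<RefCountedTempFile>> {
    let mut file = spill_manager.create_in_progress_file("illustrative spill")?;
    for batch in batches {
        // Each append writes the batch and then calls `update_disk_usage()`,
        // so the batch that pushes global usage past `max_temp_directory_size`
        // fails here with the resources error exercised in the tests above.
        file.append_batch(batch)?;
    }
    // Finalizing hands back the completed spill file (None if nothing was written).
    file.finish()
}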
10 changes: 8 additions & 2 deletions datafusion/physical-plan/src/spill/spill_manager.rs
@@ -73,7 +73,10 @@ impl SpillManager {
/// intended to incrementally write in-memory batches into the same spill file,
/// use [`Self::create_in_progress_file`] instead.
/// None is returned if no batches are spilled.
#[allow(dead_code)] // TODO: remove after change SMJ to use SpillManager
///
/// # Errors
/// - Returns an error if spilling would exceed the disk usage limit configured
/// by `max_temp_directory_size` in `DiskManager`
pub fn spill_record_batch_and_finish(
&self,
batches: &[RecordBatch],
@@ -90,7 +93,10 @@

/// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
#[allow(dead_code)] // TODO: remove after change aggregate to use SpillManager
///
/// # Errors
/// - Returns an error if spilling would exceed the disk usage limit configured
/// by `max_temp_directory_size` in `DiskManager`
pub fn spill_record_batch_by_size(
&self,
batch: &RecordBatch,
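Finally, a sketch of what a caller observes when the cap is hit, mirroring the assertion in `test_disk_spill_limit_reached`. Matching on the message string is for illustration only; the value produced by `resources_err!` is a `DataFusionError::ResourcesExhausted`, which would be the more robust thing to match in real code. The function name is illustrative.

use datafusion::prelude::SessionContext;
use datafusion_common::Result;

/// Illustrative handler for queries that may exceed the spill cap.
async fn collect_with_cap_hint(ctx: &SessionContext, sql: &str) -> Result<()> {
    let df = ctx.sql(sql).await?;
    match df.collect().await {
        Ok(batches) => {
            println!("query succeeded with {} batches", batches.len());
            Ok(())
        }
        Err(e) if e.to_string().contains("exceeded the allowable limit") => {
            // Remediation: raise `max_temp_directory_size`, enlarge the memory
            // pool so less data spills, or shrink the input.
            Err(e)
        }
        Err(e) => Err(e),
    }
}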