feat: Add config max_temp_directory_size to limit max disk usage for spilling queries
#14975
```diff
@@ -17,14 +17,26 @@
 //! [`DiskManager`]: Manages files generated during query execution

-use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
+use arrow::array::RecordBatch;
+use arrow::datatypes::Schema;
+use arrow::ipc::writer::StreamWriter;
+use datafusion_common::{
+    config_err, exec_datafusion_err, internal_err, resources_datafusion_err,
+    resources_err, DataFusionError, Result,
+};
 use log::debug;
 use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
+use std::fs::File;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tempfile::{Builder, NamedTempFile, TempDir};

+use crate::memory_pool::human_readable_size;
+use crate::metrics::{Count, SpillMetrics};
+
+const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
+
 /// Configuration for temporary disk access
 #[derive(Debug, Clone)]
 pub enum DiskManagerConfig {
```
```diff
@@ -75,6 +87,14 @@ pub struct DiskManager {
     /// If `Some(vec![])` a new OS specified temporary directory will be created
     /// If `None` an error will be returned (configured not to spill)
     local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
+
+    /// The maximum amount of data (in bytes) stored inside the temporary directories.
+    /// Defaults to 100GB
+    max_temp_directory_size: u64,
+
+    /// Used disk space in the temporary directories. Currently only spilled data
+    /// for external executors is counted.
+    used_disk_space: Count,
 }

 impl DiskManager {
```
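The accounting these two fields enable is deliberately simple: before each batch is written, the shared counter is compared against the cap and then bumped. Below is a stand-alone sketch of that check; `AtomicUsize` stands in for the crate's `Count` metric, and all names are illustrative:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Stand-in for the spill accounting: `used_disk_space` grows monotonically
/// and is checked against the cap before every batch write.
struct SpillAccounting {
    used_disk_space: AtomicUsize,
    max_temp_directory_size: u64,
}

impl SpillAccounting {
    /// Reserve `bytes` of spill space, erroring once the cap would be exceeded.
    fn reserve(&self, bytes: usize) -> Result<(), String> {
        let used = self.used_disk_space.load(Ordering::Relaxed);
        if (used + bytes) as u64 > self.max_temp_directory_size {
            return Err(format!(
                "spill exceeds the {} byte limit",
                self.max_temp_directory_size
            ));
        }
        // Note: as in this PR, load + add is not one atomic step, so
        // concurrent spillers may slightly overshoot the cap.
        self.used_disk_space.fetch_add(bytes, Ordering::Relaxed);
        Ok(())
    }
}
```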
```diff
@@ -84,6 +104,8 @@ impl DiskManager {
             DiskManagerConfig::Existing(manager) => Ok(manager),
             DiskManagerConfig::NewOs => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(Some(vec![])),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
             })),
             DiskManagerConfig::NewSpecified(conf_dirs) => {
                 let local_dirs = create_local_dirs(conf_dirs)?;
```
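For reference, a minimal sketch of how these variants are typically constructed. It assumes this crate's public `DiskManager::try_new` and `DiskManagerConfig::new_specified` API; the directory path is illustrative:

```rust
use std::path::PathBuf;
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::disk_manager::{DiskManager, DiskManagerConfig};

fn setup() -> Result<()> {
    // Lazily created OS temp directories; spill files land under the OS tmp dir
    let _os_managed: Arc<DiskManager> = DiskManager::try_new(DiskManagerConfig::NewOs)?;

    // Explicit spill directories, e.g. a large scratch volume
    let _specified = DiskManager::try_new(DiskManagerConfig::new_specified(vec![
        PathBuf::from("/mnt/scratch/df-spill"), // illustrative path
    ]))?;

    // Spilling disabled: any attempt to create a temp file returns an error
    let _disabled = DiskManager::try_new(DiskManagerConfig::Disabled)?;

    Ok(())
}
```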
```diff
@@ -93,14 +115,67 @@
                 );
                 Ok(Arc::new(Self {
                     local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
             })),
         }
     }

+    pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
```
Contributor: I don't understand the need for this function and find the name confusing. It seems like the only difference is that it will error if it doesn't have exclusive access to the disk manager. I have an alternate suggestion above.
```diff
+        match config {
+            DiskManagerConfig::Existing(manager) => {
+                Arc::try_unwrap(manager).map_err(|_| {
+                    DataFusionError::Internal("Failed to unwrap Arc".to_string())
+                })
+            }
+            DiskManagerConfig::NewOs => Ok(Self {
+                local_dirs: Mutex::new(Some(vec![])),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
+            }),
+            DiskManagerConfig::NewSpecified(conf_dirs) => {
+                let local_dirs = create_local_dirs(conf_dirs)?;
+                debug!(
+                    "Created local dirs {:?} as DataFusion working directory",
+                    local_dirs
+                );
+                Ok(Self {
+                    local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
+                })
+            }
+            DiskManagerConfig::Disabled => Ok(Self {
+                local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
+            }),
+        }
+    }
+
+    /// Set the maximum amount of data (in bytes) stored inside the temporary directories.
+    pub fn with_max_temp_directory_size(
```
Contributor: I would expect that this function should be on the `DiskManagerConfig`; then you won't have to add the new `try_new_without_arc` function.
```diff
+        mut self,
+        max_temp_directory_size: u64,
+    ) -> Result<Self> {
+        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
+        // this operation is not meaningful, fail early.
+        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
+            return config_err!(
+                "Cannot set max temp directory size for disabled disk manager"
+            );
+        }
+
+        self.max_temp_directory_size = max_temp_directory_size;
+        Ok(self)
+    }
+
     /// Return true if this disk manager supports creating temporary
     /// files. If this returns false, any call to `create_tmp_file`
     /// will error.
```
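Combined with `try_new_without_arc` above, the new setter chains at setup time. A minimal sketch, with the 1 GiB cap chosen arbitrarily:

```rust
use datafusion_common::Result;
use datafusion_execution::disk_manager::{DiskManager, DiskManagerConfig};

/// Build an unshared `DiskManager` with a 1 GiB spill cap.
/// (On a `Disabled` manager the setter only accepts 0 and errors otherwise.)
fn capped_disk_manager() -> Result<DiskManager> {
    DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
        .with_max_temp_directory_size(1024 * 1024 * 1024)
}
```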
```diff
@@ -144,6 +219,94 @@
                 .map_err(DataFusionError::IoError)?,
         })
     }
+
+    /// Write record batches to a temporary file, and return the spill file handle.
```
Contributor: I am not sure about putting this method on the `DiskManager`. I have been meaning to file tickets about this and I will do so shortly. What would you think about introducing a new struct that would be responsible for managing the spill files for a particular operation? Something like this perhaps:

```rust
struct SpillFiles {
    env: Arc<RuntimeEnv>,
    files: Vec<RefCountedTempFile>,
    // ...
}

impl SpillFiles {
    pub fn try_spill_record_batches(
        &self,
        batches: &[RecordBatch],
        request_description: &str,
        caller_spill_metrics: &mut SpillMetrics,
    ) -> Result<()> {
        // ...
    }
}
```
```diff
+    ///
+    /// This method is used within executors with spilling capabilities to write
+    /// temporary `RecordBatch`es to disk. Resource errors are returned if the written
+    /// file size exceeds the disk limit specified in `max_temp_directory_size` from
+    /// `DiskManager`.
+    ///
+    /// # Arguments
+    ///
+    /// * `batches` - A slice of `RecordBatch` to be written to disk. Note that this
+    ///   slice can't be empty.
+    /// * `request_description` - A description of the request for logging and error messages.
+    /// * `caller_spill_metrics` - Metrics to be updated with the spill operation details, from the calling executor.
+    pub fn try_spill_record_batches(
+        &self,
+        batches: &[RecordBatch],
+        request_description: &str,
+        caller_spill_metrics: &mut SpillMetrics,
+    ) -> Result<RefCountedTempFile> {
+        if batches.is_empty() {
+            return internal_err!(
+                "`try_spill_record_batches` requires at least one batch"
+            );
+        }
+
+        let spill_file = self.create_tmp_file(request_description)?;
+        let schema = batches[0].schema();
+
+        let mut stream_writer = IPCStreamWriter::new(spill_file.path(), schema.as_ref())?;
+
+        for batch in batches {
+            // The IPC Stream writer does not have a mechanism to avoid writing duplicate
+            // `Buffer`s repeatedly, so we do not use `get_record_batch_memory_size()`
+            // to estimate the memory size with duplicated `Buffer`s.
+            let estimate_extra_size = batch.get_array_memory_size();
+
+            if (self.used_disk_space.value() + estimate_extra_size) as u64
+                > self.max_temp_directory_size
+            {
+                return resources_err!(
+                    "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
+                    human_readable_size(self.max_temp_directory_size as usize)
+                );
+            }
+
+            self.used_disk_space.add(estimate_extra_size);
+            stream_writer.write(batch)?;
+        }
+
+        stream_writer.finish()?;
+
+        // Update the calling executor's spill metrics
+        caller_spill_metrics
+            .spilled_bytes
+            .add(stream_writer.num_bytes);
+        caller_spill_metrics
+            .spilled_rows
+            .add(stream_writer.num_rows);
+        caller_spill_metrics.spill_file_count.add(1);
+
+        Ok(spill_file)
+    }
+
+    /// Refer to the documentation for [`Self::try_spill_record_batches`]. This method
+    /// additionally splits the `RecordBatch` into smaller batches, divided by `row_limit`.
+    pub fn try_spill_record_batch_by_size(
+        &self,
+        batch: &RecordBatch,
+        request_description: &str,
+        spill_metrics: &mut SpillMetrics,
+        row_limit: usize,
+    ) -> Result<RefCountedTempFile> {
+        let total_rows = batch.num_rows();
+        let mut batches = Vec::new();
+        let mut offset = 0;
+
+        // It's ok to calculate all slices first, because slicing is zero-copy.
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, row_limit);
+            let sliced_batch = batch.slice(offset, length);
+            batches.push(sliced_batch);
+            offset += length;
+        }
+
+        // Spill the sliced batches to disk
+        self.try_spill_record_batches(&batches, request_description, spill_metrics)
+    }
 }

 /// A wrapper around a [`NamedTempFile`] that also contains
```
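To make the zero-copy comment concrete, here is a self-contained sketch of the same slicing loop using plain arrow types; the sample data is invented for illustration:

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>()))],
    )
    .unwrap();

    // Same loop shape as `try_spill_record_batch_by_size`: each slice shares
    // the underlying buffers, so no row data is copied.
    let row_limit = 4;
    let (mut offset, mut slices) = (0, Vec::new());
    while offset < batch.num_rows() {
        let length = (batch.num_rows() - offset).min(row_limit);
        slices.push(batch.slice(offset, length));
        offset += length;
    }

    let rows: Vec<_> = slices.iter().map(|b| b.num_rows()).collect();
    assert_eq!(rows, vec![4, 4, 2]); // 10 rows split by a limit of 4
}
```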
```diff
@@ -183,6 +346,51 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
         .collect()
 }

+/// Write in Arrow IPC Stream format to a file.
+///
+/// Stream format is used for spill because it supports dictionary replacement, and the random
+/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
+pub struct IPCStreamWriter {
+    /// Inner writer
+    pub writer: StreamWriter<File>,
+    /// Batches written
+    pub num_batches: usize,
+    /// Rows written
+    pub num_rows: usize,
+    /// Bytes written
+    pub num_bytes: usize,
+}
+
+impl IPCStreamWriter {
+    /// Create new writer
+    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
+        let file = File::create(path).map_err(|e| {
+            exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
+        })?;
+        Ok(Self {
+            num_batches: 0,
+            num_rows: 0,
+            num_bytes: 0,
+            writer: StreamWriter::try_new(file, schema)?,
+        })
+    }
+
+    /// Write one single batch
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.writer.write(batch)?;
+        self.num_batches += 1;
+        self.num_rows += batch.num_rows();
+        let num_bytes: usize = batch.get_array_memory_size();
+        self.num_bytes += num_bytes;
+        Ok(())
+    }
+
+    /// Finish the writer
+    pub fn finish(&mut self) -> Result<()> {
+        self.writer.finish().map_err(Into::into)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
```
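For context on the Stream format choice, a self-contained round trip through Arrow IPC Stream, writing to an in-memory buffer instead of a spill file; the sequential read at the end is the only access pattern a spill file needs:

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Write two batches in Stream format to an in-memory buffer;
    // `IPCStreamWriter` writes to a temporary `File` instead.
    let mut buf = Vec::new();
    let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
    writer.write(&batch)?;
    writer.write(&batch)?;
    writer.finish()?;

    // Replay the stream front to back and count the rows.
    let reader = StreamReader::try_new(Cursor::new(buf), None)?;
    let total_rows: usize = reader
        .map(|maybe_batch| maybe_batch.map(|b| b.num_rows()))
        .sum::<arrow::error::Result<_>>()?;
    assert_eq!(total_rows, 6);
    Ok(())
}
```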
Author: We can't use the builder to specify the disk limit for now, because `DiskManagerConfig` is an `enum` instead of a `struct`, so the setup routine is a bit hacky. Changing it, I think, will inevitably cause an API change, so I prefer to leave it to a separate PR.

Contributor: I think you could do this instead of adding a new function: […]

Contributor: Filed "DiskManagerBuilder to construct DiskManagers" #15319 to track.
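For illustration only, one shape such a builder could take. `DiskManagerBuilder` and `DiskManagerMode` are assumed names here, not the actual API; the real design is tracked in #15319:

```rust
use std::path::PathBuf;

// Hypothetical sketch: a struct-based builder makes every knob a plain field,
// so new options like the spill cap need no variant-by-variant plumbing.
enum DiskManagerMode {
    OsTmpDirectory,
    Directories(Vec<PathBuf>),
    Disabled,
}

struct DiskManagerBuilder {
    mode: DiskManagerMode,
    max_temp_directory_size: u64,
}

impl DiskManagerBuilder {
    fn new() -> Self {
        Self {
            mode: DiskManagerMode::OsTmpDirectory,
            max_temp_directory_size: 100 * 1024 * 1024 * 1024, // 100GB default
        }
    }

    fn with_mode(mut self, mode: DiskManagerMode) -> Self {
        self.mode = mode;
        self
    }

    fn with_max_temp_directory_size(mut self, bytes: u64) -> Self {
        self.max_temp_directory_size = bytes;
        self
    }

    // A `build()` method constructing the DiskManager from these fields is
    // elided; its exact signature is part of the open design question.
}
```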