diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 731e24b53c60..ed35492041be 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -33,7 +33,9 @@ use crate::metrics::{ }; use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; -use crate::spill::{get_record_batch_memory_size, InProgressSpillFile, SpillManager}; +use crate::spill::get_record_batch_memory_size; +use crate::spill::in_progress_spill_file::InProgressSpillFile; +use crate::spill::spill_manager::SpillManager; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; use crate::{ diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs new file mode 100644 index 000000000000..8c1ed7755907 --- /dev/null +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Define the `InProgressSpillFile` struct, which represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`. + +use datafusion_common::Result; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use datafusion_common::exec_datafusion_err; +use datafusion_execution::disk_manager::RefCountedTempFile; + +use super::{spill_manager::SpillManager, IPCStreamWriter}; + +/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`. +/// Caller is able to use this struct to incrementally append in-memory batches to +/// the file, and then finalize the file by calling the `finish` method. +pub struct InProgressSpillFile { + pub(crate) spill_writer: Arc, + /// Lazily initialized writer + writer: Option, + /// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked + in_progress_file: Option, +} + +impl InProgressSpillFile { + pub fn new( + spill_writer: Arc, + in_progress_file: RefCountedTempFile, + ) -> Self { + Self { + spill_writer, + in_progress_file: Some(in_progress_file), + writer: None, + } + } + + /// Appends a `RecordBatch` to the file, initializing the writer if necessary. + pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if self.in_progress_file.is_none() { + return Err(exec_datafusion_err!( + "Append operation failed: No active in-progress file. The file may have already been finalized." + )); + } + if self.writer.is_none() { + let schema = batch.schema(); + if let Some(ref in_progress_file) = self.in_progress_file { + self.writer = Some(IPCStreamWriter::new( + in_progress_file.path(), + schema.as_ref(), + )?); + + // Update metrics + self.spill_writer.metrics.spill_file_count.add(1); + } + } + if let Some(writer) = &mut self.writer { + let (spilled_rows, spilled_bytes) = writer.write(batch)?; + + // Update metrics + self.spill_writer.metrics.spilled_bytes.add(spilled_bytes); + self.spill_writer.metrics.spilled_rows.add(spilled_rows); + } + Ok(()) + } + + /// Finalizes the file, returning the completed file reference. + /// If there are no batches spilled before, it returns `None`. + pub fn finish(&mut self) -> Result> { + if let Some(writer) = &mut self.writer { + writer.finish()?; + } else { + return Ok(None); + } + + Ok(self.in_progress_file.take()) + } +} diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill/mod.rs similarity index 77% rename from datafusion/physical-plan/src/spill.rs rename to datafusion/physical-plan/src/spill/mod.rs index 381761203786..412704f3fa61 100644 --- a/datafusion/physical-plan/src/spill.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -17,17 +17,18 @@ //! Defines the spilling functions +pub(crate) mod in_progress_spill_file; +pub(crate) mod spill_manager; + use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; use std::ptr::NonNull; -use std::sync::Arc; use arrow::array::ArrayData; use arrow::datatypes::{Schema, SchemaRef}; use arrow::ipc::{reader::StreamReader, writer::StreamWriter}; use arrow::record_batch::RecordBatch; -use datafusion_execution::runtime_env::RuntimeEnv; use log::debug; use tokio::sync::mpsc::Sender; @@ -36,7 +37,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::human_readable_size; use datafusion_execution::SendableRecordBatchStream; -use crate::metrics::SpillMetrics; use crate::stream::RecordBatchReceiverStream; /// Read spilled batches from the disk @@ -229,182 +229,21 @@ impl IPCStreamWriter { } } -/// The `SpillManager` is responsible for the following tasks: -/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations. -/// - Updating the associated metrics. -/// -/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files. -/// For example, all records within the same spill file are ordered according to a specific order. -#[derive(Debug, Clone)] -pub(crate) struct SpillManager { - env: Arc, - metrics: SpillMetrics, - schema: SchemaRef, - /// Number of batches to buffer in memory during disk reads - batch_read_buffer_capacity: usize, - // TODO: Add general-purpose compression options -} - -impl SpillManager { - pub fn new(env: Arc, metrics: SpillMetrics, schema: SchemaRef) -> Self { - Self { - env, - metrics, - schema, - batch_read_buffer_capacity: 2, - } - } - - /// Creates a temporary file for in-progress operations, returning an error - /// message if file creation fails. The file can be used to append batches - /// incrementally and then finish the file when done. - pub fn create_in_progress_file( - &self, - request_msg: &str, - ) -> Result { - let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?; - Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file)) - } - - /// Spill input `batches` into a single file in a atomic operation. If it is - /// intended to incrementally write in-memory batches into the same spill file, - /// use [`Self::create_in_progress_file`] instead. - /// None is returned if no batches are spilled. - #[allow(dead_code)] // TODO: remove after change SMJ to use SpillManager - pub fn spill_record_batch_and_finish( - &self, - batches: &[RecordBatch], - request_msg: &str, - ) -> Result> { - let mut in_progress_file = self.create_in_progress_file(request_msg)?; - - for batch in batches { - in_progress_file.append_batch(batch)?; - } - - in_progress_file.finish() - } - - /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method - /// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`. - #[allow(dead_code)] // TODO: remove after change aggregate to use SpillManager - pub fn spill_record_batch_by_size( - &self, - batch: &RecordBatch, - request_description: &str, - row_limit: usize, - ) -> Result> { - let total_rows = batch.num_rows(); - let mut batches = Vec::new(); - let mut offset = 0; - - // It's ok to calculate all slices first, because slicing is zero-copy. - while offset < total_rows { - let length = std::cmp::min(total_rows - offset, row_limit); - let sliced_batch = batch.slice(offset, length); - batches.push(sliced_batch); - offset += length; - } - - // Spill the sliced batches to disk - self.spill_record_batch_and_finish(&batches, request_description) - } - - /// Reads a spill file as a stream. The file must be created by the current `SpillManager`. - /// This method will generate output in FIFO order: the batch appended first - /// will be read first. - pub fn read_spill_as_stream( - &self, - spill_file_path: RefCountedTempFile, - ) -> Result { - let mut builder = RecordBatchReceiverStream::builder( - Arc::clone(&self.schema), - self.batch_read_buffer_capacity, - ); - let sender = builder.tx(); - - builder.spawn_blocking(move || read_spill(sender, spill_file_path.path())); - - Ok(builder.build()) - } -} - -/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`. -/// Caller is able to use this struct to incrementally append in-memory batches to -/// the file, and then finalize the file by calling the `finish` method. -pub(crate) struct InProgressSpillFile { - spill_writer: Arc, - /// Lazily initialized writer - writer: Option, - /// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked - in_progress_file: Option, -} - -impl InProgressSpillFile { - pub fn new( - spill_writer: Arc, - in_progress_file: RefCountedTempFile, - ) -> Self { - Self { - spill_writer, - in_progress_file: Some(in_progress_file), - writer: None, - } - } - - /// Appends a `RecordBatch` to the file, initializing the writer if necessary. - pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> { - if self.in_progress_file.is_none() { - return Err(exec_datafusion_err!( - "Append operation failed: No active in-progress file. The file may have already been finalized." - )); - } - if self.writer.is_none() { - let schema = batch.schema(); - if let Some(ref in_progress_file) = self.in_progress_file { - self.writer = Some(IPCStreamWriter::new( - in_progress_file.path(), - schema.as_ref(), - )?); - - // Update metrics - self.spill_writer.metrics.spill_file_count.add(1); - } - } - if let Some(writer) = &mut self.writer { - let (spilled_rows, spilled_bytes) = writer.write(batch)?; - - // Update metrics - self.spill_writer.metrics.spilled_bytes.add(spilled_bytes); - self.spill_writer.metrics.spilled_rows.add(spilled_rows); - } - Ok(()) - } - - /// Finalizes the file, returning the completed file reference. - /// If there are no batches spilled before, it returns `None`. - pub fn finish(&mut self) -> Result> { - if let Some(writer) = &mut self.writer { - writer.finish()?; - } else { - return Ok(None); - } - - Ok(self.in_progress_file.take()) - } -} - #[cfg(test)] mod tests { + use super::in_progress_spill_file::InProgressSpillFile; use super::*; use crate::common::collect; use crate::metrics::ExecutionPlanMetricsSet; + use crate::metrics::SpillMetrics; + use crate::spill::spill_manager::SpillManager; use crate::test::build_table_i32; use arrow::array::{Float64Array, Int32Array, ListArray, StringArray}; use arrow::compute::cast; use arrow::datatypes::{DataType, Field, Int32Type, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; + use datafusion_execution::runtime_env::RuntimeEnv; use std::sync::Arc; diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs new file mode 100644 index 000000000000..4a8e293323f0 --- /dev/null +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations. + +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_execution::runtime_env::RuntimeEnv; + +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::SendableRecordBatchStream; + +use crate::metrics::SpillMetrics; +use crate::stream::RecordBatchReceiverStream; + +use super::{in_progress_spill_file::InProgressSpillFile, read_spill}; + +/// The `SpillManager` is responsible for the following tasks: +/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations. +/// - Updating the associated metrics. +/// +/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files. +/// For example, all records within the same spill file are ordered according to a specific order. +#[derive(Debug, Clone)] +pub struct SpillManager { + env: Arc, + pub(crate) metrics: SpillMetrics, + schema: SchemaRef, + /// Number of batches to buffer in memory during disk reads + batch_read_buffer_capacity: usize, + // TODO: Add general-purpose compression options +} + +impl SpillManager { + pub fn new(env: Arc, metrics: SpillMetrics, schema: SchemaRef) -> Self { + Self { + env, + metrics, + schema, + batch_read_buffer_capacity: 2, + } + } + + /// Creates a temporary file for in-progress operations, returning an error + /// message if file creation fails. The file can be used to append batches + /// incrementally and then finish the file when done. + pub fn create_in_progress_file( + &self, + request_msg: &str, + ) -> Result { + let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?; + Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file)) + } + + /// Spill input `batches` into a single file in a atomic operation. If it is + /// intended to incrementally write in-memory batches into the same spill file, + /// use [`Self::create_in_progress_file`] instead. + /// None is returned if no batches are spilled. + #[allow(dead_code)] // TODO: remove after change SMJ to use SpillManager + pub fn spill_record_batch_and_finish( + &self, + batches: &[RecordBatch], + request_msg: &str, + ) -> Result> { + let mut in_progress_file = self.create_in_progress_file(request_msg)?; + + for batch in batches { + in_progress_file.append_batch(batch)?; + } + + in_progress_file.finish() + } + + /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method + /// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`. + #[allow(dead_code)] // TODO: remove after change aggregate to use SpillManager + pub fn spill_record_batch_by_size( + &self, + batch: &RecordBatch, + request_description: &str, + row_limit: usize, + ) -> Result> { + let total_rows = batch.num_rows(); + let mut batches = Vec::new(); + let mut offset = 0; + + // It's ok to calculate all slices first, because slicing is zero-copy. + while offset < total_rows { + let length = std::cmp::min(total_rows - offset, row_limit); + let sliced_batch = batch.slice(offset, length); + batches.push(sliced_batch); + offset += length; + } + + // Spill the sliced batches to disk + self.spill_record_batch_and_finish(&batches, request_description) + } + + /// Reads a spill file as a stream. The file must be created by the current `SpillManager`. + /// This method will generate output in FIFO order: the batch appended first + /// will be read first. + pub fn read_spill_as_stream( + &self, + spill_file_path: RefCountedTempFile, + ) -> Result { + let mut builder = RecordBatchReceiverStream::builder( + Arc::clone(&self.schema), + self.batch_read_buffer_capacity, + ); + let sender = builder.tx(); + + builder.spawn_blocking(move || read_spill(sender, spill_file_path.path())); + + Ok(builder.build()) + } +}