Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use crate::metrics::{
};
use crate::projection::{make_with_child, update_expr, ProjectionExec};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{get_record_batch_memory_size, InProgressSpillFile, SpillManager};
use crate::spill::get_record_batch_memory_size;
use crate::spill::in_progress_spill_file::InProgressSpillFile;
use crate::spill::spill_manager::SpillManager;
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
Expand Down
92 changes: 92 additions & 0 deletions datafusion/physical-plan/src/spill/in_progress_spill_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Define the `InProgressSpillFile` struct, which represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.

use datafusion_common::Result;
use std::sync::Arc;

use arrow::array::RecordBatch;
use datafusion_common::exec_datafusion_err;
use datafusion_execution::disk_manager::RefCountedTempFile;

use super::{spill_manager::SpillManager, IPCStreamWriter};

/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
/// Caller is able to use this struct to incrementally append in-memory batches to
/// the file, and then finalize the file by calling the `finish` method.
pub struct InProgressSpillFile {
pub(crate) spill_writer: Arc<SpillManager>,
/// Lazily initialized writer
writer: Option<IPCStreamWriter>,
/// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
in_progress_file: Option<RefCountedTempFile>,
}

impl InProgressSpillFile {
pub fn new(
spill_writer: Arc<SpillManager>,
in_progress_file: RefCountedTempFile,
) -> Self {
Self {
spill_writer,
in_progress_file: Some(in_progress_file),
writer: None,
}
}

/// Appends a `RecordBatch` to the file, initializing the writer if necessary.
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
if self.in_progress_file.is_none() {
return Err(exec_datafusion_err!(
"Append operation failed: No active in-progress file. The file may have already been finalized."
));
}
if self.writer.is_none() {
let schema = batch.schema();
if let Some(ref in_progress_file) = self.in_progress_file {
self.writer = Some(IPCStreamWriter::new(
in_progress_file.path(),
schema.as_ref(),
)?);

// Update metrics
self.spill_writer.metrics.spill_file_count.add(1);
}
}
if let Some(writer) = &mut self.writer {
let (spilled_rows, spilled_bytes) = writer.write(batch)?;

// Update metrics
self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);
self.spill_writer.metrics.spilled_rows.add(spilled_rows);
}
Ok(())
}

/// Finalizes the file, returning the completed file reference.
/// If there are no batches spilled before, it returns `None`.
pub fn finish(&mut self) -> Result<Option<RefCountedTempFile>> {
if let Some(writer) = &mut self.writer {
writer.finish()?;
} else {
return Ok(None);
}

Ok(self.in_progress_file.take())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

//! Defines the spilling functions

pub(crate) mod in_progress_spill_file;
pub(crate) mod spill_manager;

use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::ptr::NonNull;
use std::sync::Arc;

use arrow::array::ArrayData;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
use log::debug;
use tokio::sync::mpsc::Sender;

Expand All @@ -36,7 +37,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;

use crate::metrics::SpillMetrics;
use crate::stream::RecordBatchReceiverStream;

/// Read spilled batches from the disk
Expand Down Expand Up @@ -229,182 +229,21 @@ impl IPCStreamWriter {
}
}

/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
/// - Updating the associated metrics.
///
/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
/// For example, all records within the same spill file are ordered according to a specific order.
#[derive(Debug, Clone)]
pub(crate) struct SpillManager {
env: Arc<RuntimeEnv>,
metrics: SpillMetrics,
schema: SchemaRef,
/// Number of batches to buffer in memory during disk reads
batch_read_buffer_capacity: usize,
// TODO: Add general-purpose compression options
}

impl SpillManager {
pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
Self {
env,
metrics,
schema,
batch_read_buffer_capacity: 2,
}
}

/// Creates a temporary file for in-progress operations, returning an error
/// message if file creation fails. The file can be used to append batches
/// incrementally and then finish the file when done.
pub fn create_in_progress_file(
&self,
request_msg: &str,
) -> Result<InProgressSpillFile> {
let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
}

/// Spill input `batches` into a single file in a atomic operation. If it is
/// intended to incrementally write in-memory batches into the same spill file,
/// use [`Self::create_in_progress_file`] instead.
/// None is returned if no batches are spilled.
#[allow(dead_code)] // TODO: remove after change SMJ to use SpillManager
pub fn spill_record_batch_and_finish(
&self,
batches: &[RecordBatch],
request_msg: &str,
) -> Result<Option<RefCountedTempFile>> {
let mut in_progress_file = self.create_in_progress_file(request_msg)?;

for batch in batches {
in_progress_file.append_batch(batch)?;
}

in_progress_file.finish()
}

/// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
#[allow(dead_code)] // TODO: remove after change aggregate to use SpillManager
pub fn spill_record_batch_by_size(
&self,
batch: &RecordBatch,
request_description: &str,
row_limit: usize,
) -> Result<Option<RefCountedTempFile>> {
let total_rows = batch.num_rows();
let mut batches = Vec::new();
let mut offset = 0;

// It's ok to calculate all slices first, because slicing is zero-copy.
while offset < total_rows {
let length = std::cmp::min(total_rows - offset, row_limit);
let sliced_batch = batch.slice(offset, length);
batches.push(sliced_batch);
offset += length;
}

// Spill the sliced batches to disk
self.spill_record_batch_and_finish(&batches, request_description)
}

/// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
/// This method will generate output in FIFO order: the batch appended first
/// will be read first.
pub fn read_spill_as_stream(
&self,
spill_file_path: RefCountedTempFile,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(
Arc::clone(&self.schema),
self.batch_read_buffer_capacity,
);
let sender = builder.tx();

builder.spawn_blocking(move || read_spill(sender, spill_file_path.path()));

Ok(builder.build())
}
}

/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
/// Caller is able to use this struct to incrementally append in-memory batches to
/// the file, and then finalize the file by calling the `finish` method.
pub(crate) struct InProgressSpillFile {
spill_writer: Arc<SpillManager>,
/// Lazily initialized writer
writer: Option<IPCStreamWriter>,
/// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
in_progress_file: Option<RefCountedTempFile>,
}

impl InProgressSpillFile {
pub fn new(
spill_writer: Arc<SpillManager>,
in_progress_file: RefCountedTempFile,
) -> Self {
Self {
spill_writer,
in_progress_file: Some(in_progress_file),
writer: None,
}
}

/// Appends a `RecordBatch` to the file, initializing the writer if necessary.
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
if self.in_progress_file.is_none() {
return Err(exec_datafusion_err!(
"Append operation failed: No active in-progress file. The file may have already been finalized."
));
}
if self.writer.is_none() {
let schema = batch.schema();
if let Some(ref in_progress_file) = self.in_progress_file {
self.writer = Some(IPCStreamWriter::new(
in_progress_file.path(),
schema.as_ref(),
)?);

// Update metrics
self.spill_writer.metrics.spill_file_count.add(1);
}
}
if let Some(writer) = &mut self.writer {
let (spilled_rows, spilled_bytes) = writer.write(batch)?;

// Update metrics
self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);
self.spill_writer.metrics.spilled_rows.add(spilled_rows);
}
Ok(())
}

/// Finalizes the file, returning the completed file reference.
/// If there are no batches spilled before, it returns `None`.
pub fn finish(&mut self) -> Result<Option<RefCountedTempFile>> {
if let Some(writer) = &mut self.writer {
writer.finish()?;
} else {
return Ok(None);
}

Ok(self.in_progress_file.take())
}
}

#[cfg(test)]
mod tests {
use super::in_progress_spill_file::InProgressSpillFile;
use super::*;
use crate::common::collect;
use crate::metrics::ExecutionPlanMetricsSet;
use crate::metrics::SpillMetrics;
use crate::spill::spill_manager::SpillManager;
use crate::test::build_table_i32;
use arrow::array::{Float64Array, Int32Array, ListArray, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::runtime_env::RuntimeEnv;

use std::sync::Arc;

Expand Down
Loading