Account for memory usage in SortPreservingMerge
alamb committed Aug 2, 2023
1 parent 2af3ae7 commit f5019c9
Showing 16 changed files with 714 additions and 100 deletions.
16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
@@ -235,6 +235,22 @@ config_namespace! {
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = num_cpus::get()

+ /// How much memory is set aside, when using spillable sorts,
+ /// to ensure an in-memory merge can occur. This setting has no
+ /// effect if the sort cannot spill (there is no
+ /// `DiskManager` configured).
+ ///
+ /// Data is sorted before being spilled from memory to disk
+ /// when available memory is exhausted. This pre-sort itself
+ /// requires additional memory; to avoid attempting an allocation
+ /// after memory is already exhausted, DataFusion sets aside this
+ /// many bytes up front.
+ pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
+
+ /// The data size below which input is concatenated and sorted as
+ /// a single RecordBatch, rather than sorted in individual batches
+ /// and then merged.
+ pub sort_in_place_threshold_bytes: usize, default = 1 * 1024 * 1024

}
}
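As a usage illustration (not part of this commit; the setter calls and values here are assumptions, and the exact API may vary by DataFusion version), the new options can be adjusted through their session configuration keys:

    use datafusion::prelude::*;

    // Hypothetical sketch: raise the merge reservation and the in-place
    // sort threshold via their config keys (values are arbitrary).
    let config = SessionConfig::new()
        .set_usize("datafusion.execution.sort_spill_reservation_bytes", 32 * 1024 * 1024)
        .set_usize("datafusion.execution.sort_in_place_threshold_bytes", 2 * 1024 * 1024);
    let ctx = SessionContext::with_config(config);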

11 changes: 9 additions & 2 deletions datafusion/core/src/physical_plan/repartition/mod.rs
@@ -576,14 +576,21 @@ impl ExecutionPlan for RepartitionExec {

// Get existing ordering:
let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
- // Merge streams (while preserving ordering) coming from input partitions to this partition:
+
+ // Merge streams (while preserving ordering) coming from
+ // input partitions to this partition:
+ let fetch = None;
+ let merge_reservation =
+     MemoryConsumer::new(format!("{}[Merge {partition}]", self.name()))
+         .register(context.memory_pool());
streaming_merge(
input_streams,
self.schema(),
sort_exprs,
BaselineMetrics::new(&self.metrics, partition),
context.session_config().batch_size(),
- None,
+ fetch,
+ merge_reservation,
)
} else {
Ok(Box::pin(RepartitionStream {
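For context, here is a standalone sketch of the consumer registration pattern above; the pool size and consumer name are illustrative, not taken from the commit:

    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
    use std::sync::Arc;

    // Each merge registers a descriptively named consumer, so the pool can
    // attribute memory usage to a specific operator and output partition.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(64 * 1024 * 1024));
    let partition = 0;
    let merge_reservation =
        MemoryConsumer::new(format!("RepartitionExec[Merge {partition}]")).register(&pool);
    assert_eq!(merge_reservation.size(), 0); // nothing reserved until batches arrive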
29 changes: 22 additions & 7 deletions datafusion/core/src/physical_plan/sorts/builder.rs
@@ -19,6 +19,7 @@ use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
+ use datafusion_execution::memory_pool::MemoryReservation;

#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
@@ -37,6 +38,9 @@ pub struct BatchBuilder {
/// Maintain a list of [`RecordBatch`] and their corresponding stream
batches: Vec<(usize, RecordBatch)>,

+ /// Accounts for memory used by buffered batches
+ reservation: MemoryReservation,

/// The current [`BatchCursor`] for each stream
cursors: Vec<BatchCursor>,

@@ -47,23 +51,31 @@ pub struct BatchBuilder {

impl BatchBuilder {
/// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
- pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
+ pub fn new(
+     schema: SchemaRef,
+     stream_count: usize,
+     batch_size: usize,
+     reservation: MemoryReservation,
+ ) -> Self {
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
+ reservation,
}
}

/// Append a new batch in `stream_idx`
- pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+ pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
+     self.reservation.try_grow(batch.get_array_memory_size())?;
let batch_idx = self.batches.len();
self.batches.push((stream_idx, batch));
self.cursors[stream_idx] = BatchCursor {
batch_idx,
row_idx: 0,
- }
+ };
+ Ok(())
}

/// Append the next row from `stream_idx`
@@ -119,14 +131,17 @@ impl BatchBuilder {
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
- self.batches.retain(|(stream_idx, _)| {
+ self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;

- if retain {
-     stream_cursor.batch_idx = retained;
-     retained += 1;
+ match retain {
+     true => {
+         stream_cursor.batch_idx = retained;
+         retained += 1;
+     }
+     false => self.reservation.shrink(batch.get_array_memory_size()),
}
retain
});
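The try_grow/shrink pairing above is the whole accounting contract: grow the reservation when a batch is buffered, shrink it when the batch is discarded. A minimal self-contained sketch of that contract against a pool (names and sizes hypothetical):

    use datafusion_common::Result;
    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
    use std::sync::Arc;

    fn main() -> Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
        let mut reservation = MemoryConsumer::new("BatchBuilder").register(&pool);

        // push_batch path: account for a buffered batch before keeping it.
        reservation.try_grow(4096)?; // errors if the pool budget is exceeded
        assert_eq!(pool.reserved(), 4096);

        // retain path: release the bytes when a batch is dropped.
        reservation.shrink(4096);
        assert_eq!(pool.reserved(), 0);
        Ok(())
    }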
9 changes: 7 additions & 2 deletions datafusion/core/src/physical_plan/sorts/cursor.rs
@@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
use arrow::row::{Row, Rows};
use arrow_array::types::ByteArrayType;
use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+ use datafusion_execution::memory_pool::MemoryReservation;
use std::cmp::Ordering;

/// A [`Cursor`] for [`Rows`]
@@ -29,6 +30,9 @@ pub struct RowCursor {
num_rows: usize,

rows: Rows,

+ #[allow(dead_code)]
+ reservation: MemoryReservation,
}

impl std::fmt::Debug for RowCursor {
@@ -41,12 +45,13 @@ impl std::fmt::Debug for RowCursor {
}

impl RowCursor {
- /// Create a new SortKeyCursor
- pub fn new(rows: Rows) -> Self {
+ /// Create a new SortKeyCursor from `rows` and the associated `reservation`
+ pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
Self {
cur_row: 0,
num_rows: rows.num_rows(),
rows,
+ reservation,
}
}

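The otherwise-unused `reservation` field ties the accounted bytes to the cursor's lifetime: when the cursor (and its `Rows`) is dropped, the reservation is dropped and the bytes return to the pool. A standalone sketch of that RAII pattern, with a hypothetical `HeldRows` stand-in for `RowCursor`:

    use datafusion_common::Result;
    use datafusion_execution::memory_pool::{
        GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation,
    };
    use std::sync::Arc;

    // Stand-in for RowCursor: some data plus the reservation accounting for it.
    struct HeldRows {
        #[allow(dead_code)]
        reservation: MemoryReservation,
    }

    fn main() -> Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
        let mut reservation = MemoryConsumer::new("RowCursor").register(&pool);
        reservation.try_grow(512)?; // account for the row data
        let held = HeldRows { reservation };
        assert_eq!(pool.reserved(), 512);
        drop(held); // dropping the holder drops the reservation...
        assert_eq!(pool.reserved(), 0); // ...returning the bytes to the pool
        Ok(())
    }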
30 changes: 20 additions & 10 deletions datafusion/core/src/physical_plan/sorts/merge.rs
@@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::*;
use datafusion_common::Result;
+ use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
@@ -42,14 +43,15 @@ macro_rules! primitive_merge_helper {
}

macro_rules! merge_helper {
- ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
+ ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
let streams = FieldCursorStream::<$t>::new($sort, $streams);
return Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
$schema,
$tracking_metrics,
$batch_size,
$fetch,
+ $reservation,
)));
}};
}
@@ -63,28 +65,36 @@ pub fn streaming_merge(
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
+ reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
let sort = expressions[0].clone();
let data_type = sort.expr.data_type(schema.as_ref())?;
downcast_primitive! {
- data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch),
- DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch)
- DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch)
- DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch)
- DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch)
+ data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
+ DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+ DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+ DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+ DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
_ => {}
}
}

- let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
+ let streams = RowCursorStream::try_new(
+     schema.as_ref(),
+     expressions,
+     streams,
+     reservation.new_empty(),
+ )?;

Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
schema,
metrics,
batch_size,
fetch,
+ reservation,
)))
}

@@ -162,11 +172,12 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
+ reservation: MemoryReservation,
) -> Self {
let stream_count = streams.partitions();

Self {
- in_progress: BatchBuilder::new(schema, stream_count, batch_size),
+ in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation),
streams,
metrics,
aborted: false,
@@ -197,8 +208,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
Some(Err(e)) => Poll::Ready(Err(e)),
Some(Ok((cursor, batch))) => {
self.cursors[idx] = Some(cursor);
- self.in_progress.push_batch(idx, batch);
- Poll::Ready(Ok(()))
+ Poll::Ready(self.in_progress.push_batch(idx, batch))
}
}
}
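Note the `reservation.new_empty()` call in `streaming_merge` above: it creates a second, initially empty reservation on the same consumer, so cursor memory and buffered-batch memory grow and shrink independently while being charged to the same pool. A brief sketch (names and sizes hypothetical):

    use datafusion_common::Result;
    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
    use std::sync::Arc;

    fn main() -> Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
        let mut merge_reservation = MemoryConsumer::new("Merge").register(&pool);

        // Split off an empty sibling reservation for the cursor stream.
        let mut cursor_reservation = merge_reservation.new_empty();
        merge_reservation.try_grow(1000)?;
        cursor_reservation.try_grow(500)?;
        assert_eq!(pool.reserved(), 1500); // both are charged to the same pool
        Ok(())
    }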
