Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ async fn test_sort_10k_mem() {
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
for (batch_size, should_spill) in
[(5, false), (10000, false), (20000, true), (1000000, true)]
{
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(100 * KB)
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn oom_sort() {
.with_expected_errors(vec![
"Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
])
.with_memory_limit(200_000)
.with_memory_limit(400_000)
.run()
.await
}
Expand Down Expand Up @@ -271,7 +271,8 @@ async fn sort_spill_reservation() {

// Merge operation needs extra memory to do row conversion, so make the
// memory limit larger.
let mem_limit = partition_size * 2;
let mem_limit =
((partition_size * 2 + 1024) as f64 / MEMORY_FRACTION).ceil() as usize;
let test = TestCase::new()
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
Expand Down Expand Up @@ -308,7 +309,8 @@ async fn sort_spill_reservation() {

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
"bytes for ExternalSorterMerge",
])
.with_config(config)
.run()
Expand Down
22 changes: 21 additions & 1 deletion datafusion/physical-plan/src/sorts/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,22 @@ pub struct ArrayValues<T: CursorValues> {
// Otherwise, the first null index
null_threshold: usize,
options: SortOptions,

/// Tracks the memory used by the values array,
/// freed on drop.
_reservation: MemoryReservation,
}

impl<T: CursorValues> ArrayValues<T> {
/// Create a new [`ArrayValues`] from the provided `values` sorted according
/// to `options`.
///
/// Panics if the array is empty
pub fn new<A: CursorArray<Values = T>>(options: SortOptions, array: &A) -> Self {
pub fn new<A: CursorArray<Values = T>>(
options: SortOptions,
array: &A,
reservation: MemoryReservation,
) -> Self {
assert!(array.len() > 0, "Empty array passed to FieldCursor");
let null_threshold = match options.nulls_first {
true => array.null_count(),
Expand All @@ -309,6 +317,7 @@ impl<T: CursorValues> ArrayValues<T> {
values: array.values(),
null_threshold,
options,
_reservation: reservation,
}
}

Expand Down Expand Up @@ -360,6 +369,12 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};

use super::*;

fn new_primitive(
Expand All @@ -372,10 +387,15 @@ mod tests {
false => values.len() - null_count,
};

let memory_pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(10000));
let consumer = MemoryConsumer::new("test");
let reservation = consumer.register(&memory_pool);

let values = ArrayValues {
values: PrimitiveValues(values),
null_threshold,
options,
_reservation: reservation,
};

Cursor::new(values)
Expand Down
Loading
Loading