External sort failing with non-spillable operators as input (RepartitionExec) #17334

@16pierre

Describe the bug

I'm trying to sort some input (a union of filtered Parquet files) with fixed parallelism. To do so, I manually insert a round-robin RepartitionExec operator before the sort. I reckon this is slightly hacky and there may be alternatives, but the possibility of unions over large numbers of inputs makes partitioning trickier.

When doing so, I get memory allocation failures on the RepartitionExec operator (though I suspect this could happen with any other non-spillable operator used as input to the sort nodes):

ResourcesExhausted(\"Additional allocation failed with top memory consumers (across reservations) as: 
ExternalSorter[0] consumed 145849856 bytes, 
ExternalSorterMerge[0] consumed 50000000 bytes,
RepartitionExec[0] consumed 2951744 bytes. 

Error: Failed to allocate additional 2951744 bytes for RepartitionExec[0] with 2951744 bytes already allocated for this reservation - 1198400 bytes remain available for the total pool

If my understanding is correct, this is expected given the current sort implementation: the sort only spills after the memory pool is saturated and try_grow returns an allocation failure:

match self.reservation.try_grow(size) {

This means that once the global memory pool is filled up with sort allocations, any operator feeding the sort can fail if its memory allocations are non-spillable.
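The interaction can be illustrated with a toy model. The sketch below is std-only Rust and does not use DataFusion; `Pool`, `simulate`, and the byte sizes are hypothetical, and the real sorter's reservation logic is more involved. It only aims to show that as long as the sorter's own `try_grow` calls keep succeeding, the sorter never spills, so the upstream operator can be the first one to hit the limit.

```rust
// Simplified, std-only model of the failure mode; NOT DataFusion's actual code.
// All names here (Pool, simulate) are hypothetical.

/// A shared pool with a hard byte limit.
struct Pool {
    limit: usize,
    used: usize,
}

impl Pool {
    /// Reserve `bytes` if they fit, otherwise fail without changing state.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err(format!(
                "failed to allocate {bytes} bytes, {} bytes remain",
                self.limit - self.used
            ));
        }
        self.used += bytes;
        Ok(())
    }

    /// Release `bytes` back to the pool.
    fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

/// Push `batches` batches through an upstream operator that cannot spill and
/// into a sorter that spills only when its own `try_grow` fails.
/// Returns the number of sorter spills, or the upstream allocation error.
fn simulate(
    limit: usize,
    upstream_bytes: usize,
    sort_bytes: usize,
    batches: usize,
) -> Result<usize, String> {
    let mut pool = Pool { limit, used: 0 };
    let mut sorter_held = 0;
    let mut spills = 0;
    for _ in 0..batches {
        // Upstream buffers the batch; it has no fallback if this fails.
        pool.try_grow(upstream_bytes)?;
        // The batch is handed to the sorter and the upstream buffer is released.
        pool.shrink(upstream_bytes);
        // The sorter reserves memory for the batch and spills only on failure.
        if pool.try_grow(sort_bytes).is_err() {
            pool.shrink(sorter_held); // spill: release everything the sorter holds
            sorter_held = 0;
            spills += 1;
            pool.try_grow(sort_bytes)?;
        }
        sorter_held += sort_bytes;
    }
    Ok(spills)
}
```

In this model, `simulate(100, 10, 10, 20)` fails in the upstream operator without the sorter ever spilling, because the sorter's requests always happen to fit. With larger sorter requests, `simulate(100, 10, 30, 7)` lets the sorter hit the limit first, and it spills twice instead.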

To Reproduce

My current repro setup cannot be shared as a snippet because it uses a bunch of production codepaths.

Roughly what I'm doing is:

  • write a Parquet file with a couple million rows (binary columns, 16 bytes each, UUIDs)
  • read the Parquet file and run repartition + sort
// Memory allocator = FairSpillPool

let physical_plan = open_my_parquet_file(...);

// Note: I get failures with input partitions = 1 and sort_parallelism = 1
let partitioning = Partitioning::RoundRobinBatch(sort_parallelism as usize);
// try_new returns a Result, so propagate the error before wrapping in an Arc
let repartitioned_exec = Arc::new(RepartitionExec::try_new(physical_plan, partitioning)?);
let sort_exec = SortExec::new(sort_expression.clone(), repartitioned_exec);

Note that we're currently on DataFusion 46.0.1 (we will upgrade soon to pick up recent sort improvements).

Expected behavior

No response

Additional context

Main work-arounds/hacks that come to mind:

  • register a fake spillable allocation to force FairSpillPool to spill before the pool is full
  • implement a custom MemoryPool to special-case how different operators are tracked
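A rough sketch of why the first work-around could help, assuming FairSpillPool caps each spillable consumer at `(pool_size - unspillable) / num_spill`, which is how its documentation describes the policy. The code is a std-only model with hypothetical function names, not the DataFusion API. The 200,000,000-byte pool size is an inference: it is the sum of the four byte counts in the error message, not something the report states. Treating ExternalSorterMerge and RepartitionExec as unspillable is likewise an assumption.

```rust
// Std-only model of fair-share accounting; hypothetical names, not DataFusion code.

/// Cap for each spillable consumer: the memory left after unspillable
/// reservations, split evenly across all registered spillable consumers.
fn spillable_cap(pool_size: usize, unspillable: usize, num_spillable: usize) -> usize {
    pool_size.saturating_sub(unspillable) / num_spillable.max(1)
}

/// Bytes still available to unspillable operators when every real spillable
/// consumer has grown to its cap and the fake consumers hold nothing.
fn unspillable_headroom(
    pool_size: usize,
    unspillable: usize,
    real_spillable: usize,
    fake_spillable: usize,
) -> usize {
    let cap = spillable_cap(pool_size, unspillable, real_spillable + fake_spillable);
    pool_size
        .saturating_sub(unspillable)
        .saturating_sub(cap * real_spillable)
}
```

With the assumed numbers (pool of 200,000,000 bytes, 52,951,744 bytes unspillable), a single sorter may grow to 147,048,256 bytes and leave no headroom. Registering one idle fake spillable consumer halves the sorter's cap to 73,524,128 bytes and leaves the same amount as headroom. The share only moves in steps of 1/n.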

Ideally I'd prefer a solution where we can precisely control how much memory is allocated to the sort compared to the rest of the operators, especially given the subtle memory allocations we faced during the sort merge phase - the "fake spillable allocation" solution is a bit coarse-grained in that regard.
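One possible shape for such control is a pool with separate budgets for sort consumers and for everything else. The following is a minimal std-only sketch under stated assumptions: `PartitionedPool`, `Class`, and `classify` are hypothetical, the code does not implement DataFusion's `MemoryPool` trait, and classifying by consumer-name prefix is only one option, chosen here because the names appear in the error message.

```rust
// Hypothetical std-only sketch; it does not implement DataFusion's `MemoryPool` trait.

/// Which budget a consumer draws from.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Class {
    Sort,
    Other,
}

/// Classify by consumer name, using the names seen in the error message
/// ("ExternalSorter[0]", "ExternalSorterMerge[0]", "RepartitionExec[0]").
fn classify(consumer_name: &str) -> Class {
    if consumer_name.starts_with("ExternalSorter") {
        Class::Sort
    } else {
        Class::Other
    }
}

/// A pool whose total size is split into two independent budgets.
struct PartitionedPool {
    limits: [usize; 2], // indexed by `Class as usize`
    used: [usize; 2],
}

impl PartitionedPool {
    /// Give sort consumers `sort_percent` percent of `total`; the rest goes to other operators.
    fn new(total: usize, sort_percent: usize) -> Self {
        let sort_limit = total / 100 * sort_percent.min(100);
        Self {
            limits: [sort_limit, total - sort_limit],
            used: [0, 0],
        }
    }

    /// Reserve `bytes` from the budget of `class`, failing if that budget is exhausted.
    fn try_grow(&mut self, class: Class, bytes: usize) -> Result<(), String> {
        let i = class as usize;
        if self.used[i] + bytes > self.limits[i] {
            return Err(format!(
                "{class:?} budget exhausted: {} of {} bytes used",
                self.used[i], self.limits[i]
            ));
        }
        self.used[i] += bytes;
        Ok(())
    }
}
```

With `PartitionedPool::new(200_000_000, 75)`, sort consumers fail (and could spill) once they reach 150,000,000 bytes, while a 2,951,744-byte request classified as `Other` still succeeds.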

Labels: bug
