Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 43 additions & 22 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,36 +1028,57 @@ impl GroupedHashAggregateStream {
Ok(None)
}
OutOfMemoryMode::EmitEarly if self.group_values.len() > 1 => {
let n = if self.group_values.len() >= self.batch_size {
// Try to emit an integer multiple of batch size if possible
self.group_values.len() / self.batch_size * self.batch_size
} else {
// Otherwise emit whatever we can
self.group_values.len()
};

// Clamp to the sort boundary when using partial group ordering,
// otherwise remove_groups panics (#20445).
let n = match &self.group_ordering {
GroupOrdering::None => n,
_ => match self.group_ordering.emit_to() {
Some(EmitTo::First(max)) => n.min(max),
_ => 0,
},
};

if n > 0
&& let Some(batch) = self.emit(EmitTo::First(n), false)?
if let Some(emit_to) = self.emit_target_for_oom()
&& let Some(batch) = self.emit(emit_to, false)?
{
Ok(Some(ExecutionState::ProducingOutput(batch)))
} else {
Err(oom)
return Ok(Some(ExecutionState::ProducingOutput(batch)));
}
Err(oom)
}
_ => Err(oom),
OutOfMemoryMode::EmitEarly
| OutOfMemoryMode::Spill
| OutOfMemoryMode::ReportError => Err(oom),
}
}

/// Returns the number of groups groups that can be emitted to avoid an
/// out-of-memory condition.
///
/// Returns `None` if emitting is not possible.
///
/// Returns `Some(EmitTo)` with the number of groups to emit if it is possible
/// to emit some groups to free memory.
fn emit_target_for_oom(&self) -> Option<EmitTo> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emit_target_for_oom is a nice extraction, but it mixes two different concerns:

  • the generic OOM‑batching policy (“how many rows would I like to emit?”) and
  • the GroupOrdering rules (“what emit strategies are legal given the current ordering?” – None has its own special case).

I think renaming/doc‑tweaking the helper (e.g. oom_emit_to / “returns the emit strategy to use under OOM”) would clarify this.

Alternatively, move the ordering‑aware clamping logic onto GroupOrdering (or into aggregates/order/mod.rs).
That way row_hash.rs just decides how much it wants to emit under memory pressure, and the ordering module enforces its invariants – which keeps the rules in one place and makes the helper reusable for other emit paths.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good insight -- I moved the code (well had codex move the code) into GroupValues and I think it is a much nicer encapsulation of concerns. It also is easier to unit test

let n = if self.group_values.len() >= self.batch_size {
// Try to emit an integer multiple of batch size if possible
self.group_values.len() / self.batch_size * self.batch_size
} else {
// Otherwise emit whatever we can
self.group_values.len()
};

// Special case for GroupOrdering::None since emit_to() returns None for
// that case, but we can still emit some groups to try to resolve the OOM
if matches!(&self.group_ordering, GroupOrdering::None) {
return Some(EmitTo::First(n));
};

// For the case of GroupOrdering::Partial or GroupOrdering::Full, use
// the ordering's emit_to() method to determine how many groups can be
// emitted while respecting the ordering guarantees, clamped to the
// batch size.
self.group_ordering.emit_to().map(|emit_to| match emit_to {
// If the ordering allows emitting some groups,
// emit as many as we can to try to resolve the OOM,
EmitTo::First(max) => EmitTo::First(n.min(max)),
// if the ordering allows emitting all groups, we can emit n
// groups to try to resolve the OOM
EmitTo::All => EmitTo::First(n),
})
}

fn update_memory_reservation(&mut self) -> Result<()> {
let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
let new_size = acc
Expand Down
Loading