Skip to content

Commit f20e2e2

Browse files
authored
Remove unused fetch inside ExternalSorter (#15525)
1 parent 1e05bea commit f20e2e2

File tree

1 file changed

+20
-59
lines changed
  • datafusion/physical-plan/src/sorts

1 file changed

+20
-59
lines changed

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 20 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,6 @@ struct ExternalSorter {
207207
expr: Arc<[PhysicalSortExpr]>,
208208
/// RowConverter corresponding to the sort expressions
209209
sort_keys_row_converter: Arc<RowConverter>,
210-
/// If Some, the maximum number of output rows that will be produced
211-
fetch: Option<usize>,
212210
/// The target number of rows for output batches
213211
batch_size: usize,
214212
/// If the in size of buffered memory batches is below this size,
@@ -262,7 +260,6 @@ impl ExternalSorter {
262260
schema: SchemaRef,
263261
expr: LexOrdering,
264262
batch_size: usize,
265-
fetch: Option<usize>,
266263
sort_spill_reservation_bytes: usize,
267264
sort_in_place_threshold_bytes: usize,
268265
metrics: &ExecutionPlanMetricsSet,
@@ -307,7 +304,6 @@ impl ExternalSorter {
307304
expr: expr.into(),
308305
sort_keys_row_converter: Arc::new(converter),
309306
metrics,
310-
fetch,
311307
reservation,
312308
spill_manager,
313309
merge_reservation,
@@ -330,10 +326,8 @@ impl ExternalSorter {
330326

331327
let size = get_reserved_byte_for_record_batch(&input);
332328
if self.reservation.try_grow(size).is_err() {
333-
self.sort_or_spill_in_mem_batches(false).await?;
334-
// We've already freed more than half of reserved memory,
335-
// so we can grow the reservation again. There's nothing we can do
336-
// if this try_grow fails.
329+
self.sort_and_spill_in_mem_batches().await?;
330+
// After spilling all in-memory batches, the retry should succeed
337331
self.reservation.try_grow(size)?;
338332
}
339333

@@ -367,7 +361,7 @@ impl ExternalSorter {
367361
// `in_mem_batches` and the memory limit is almost reached, merging
368362
// them with the spilled files at the same time might cause OOM.
369363
if !self.in_mem_batches.is_empty() {
370-
self.sort_or_spill_in_mem_batches(true).await?;
364+
self.sort_and_spill_in_mem_batches().await?;
371365
}
372366

373367
for spill in self.finished_spill_files.drain(..) {
@@ -386,7 +380,7 @@ impl ExternalSorter {
386380
.with_expressions(expressions.as_ref())
387381
.with_metrics(self.metrics.baseline.clone())
388382
.with_batch_size(self.batch_size)
389-
.with_fetch(self.fetch)
383+
.with_fetch(None)
390384
.with_reservation(self.merge_reservation.new_empty())
391385
.build()
392386
} else {
@@ -532,28 +526,15 @@ impl ExternalSorter {
532526
Ok(())
533527
}
534528

535-
/// Sorts the in_mem_batches and potentially spill the sorted batches.
536-
///
537-
/// If the memory usage has dropped by a factor of 2, it might be a sort with
538-
/// fetch (e.g. sorting 1M rows but only keep the top 100), so we keep the
539-
/// sorted entries inside `in_mem_batches` to be sorted in the next iteration.
540-
/// Otherwise, we spill the sorted run to free up memory for inserting more batches.
541-
///
542-
/// # Arguments
543-
///
544-
/// * `force_spill` - If true, the method will spill the in-memory batches
545-
/// even if the memory usage has not dropped by a factor of 2. Otherwise it will
546-
/// only spill when the memory usage has dropped by the pre-defined factor.
547-
///
548-
async fn sort_or_spill_in_mem_batches(&mut self, force_spill: bool) -> Result<()> {
529+
/// Sorts the in-memory batches and merges them into a single sorted run, then writes
530+
/// the result to spill files.
531+
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
549532
// Release the memory reserved for merge back to the pool so
550533
// there is some left when `in_mem_sort_stream` requests an
551534
// allocation. At the end of this function, memory will be
552535
// reserved again for the next spill.
553536
self.merge_reservation.free();
554537

555-
let before = self.reservation.size();
556-
557538
let mut sorted_stream =
558539
self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
559540
// After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
@@ -568,7 +549,6 @@ impl ExternalSorter {
568549
// sort-preserving merge and incrementally append to spill files.
569550
let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
570551

571-
let mut spilled = false;
572552
while let Some(batch) = sorted_stream.next().await {
573553
let batch = batch?;
574554
let sorted_size = get_reserved_byte_for_record_batch(&batch);
@@ -579,7 +559,6 @@ impl ExternalSorter {
579559
globally_sorted_batches.push(batch);
580560
self.consume_and_spill_append(&mut globally_sorted_batches)
581561
.await?; // reservation is freed in spill()
582-
spilled = true;
583562
} else {
584563
globally_sorted_batches.push(batch);
585564
}
@@ -589,33 +568,17 @@ impl ExternalSorter {
589568
// upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
590569
drop(sorted_stream);
591570

592-
// Sorting may free up some memory especially when fetch is `Some`. If we have
593-
// not freed more than 50% of the memory, then we have to spill to free up more
594-
// memory for inserting more batches.
595-
if (self.reservation.size() > before / 2) || force_spill {
596-
// We have not freed more than 50% of the memory, so we have to spill to
597-
// free up more memory
598-
self.consume_and_spill_append(&mut globally_sorted_batches)
599-
.await?;
600-
spilled = true;
601-
}
602-
603-
if spilled {
604-
// There might be some buffered batches that haven't trigger a spill yet.
605-
self.consume_and_spill_append(&mut globally_sorted_batches)
606-
.await?;
607-
self.spill_finish().await?;
608-
} else {
609-
// If the memory limit has reached before calling this function, and it
610-
// didn't spill anything, it means this is a sorting with fetch top K
611-
// element: after sorting only the top K elements will be kept in memory.
612-
// For simplicity, those sorted top K entries are put back to unsorted
613-
// `in_mem_batches` to be consumed by the next sort/merge.
614-
if !self.in_mem_batches.is_empty() {
615-
return internal_err!("in_mem_batches should be cleared before");
616-
}
571+
self.consume_and_spill_append(&mut globally_sorted_batches)
572+
.await?;
573+
self.spill_finish().await?;
617574

618-
self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
575+
// Sanity check after spilling
576+
let buffers_cleared_property =
577+
self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
578+
if !buffers_cleared_property {
579+
return internal_err!(
580+
"in_mem_batches and globally_sorted_batches should be cleared before"
581+
);
619582
}
620583

621584
// Reserve headroom for next sort/merge
@@ -740,7 +703,7 @@ impl ExternalSorter {
740703
.with_expressions(expressions.as_ref())
741704
.with_metrics(metrics)
742705
.with_batch_size(self.batch_size)
743-
.with_fetch(self.fetch)
706+
.with_fetch(None)
744707
.with_reservation(self.merge_reservation.new_empty())
745708
.build()
746709
}
@@ -761,7 +724,6 @@ impl ExternalSorter {
761724
);
762725
let schema = batch.schema();
763726

764-
let fetch = self.fetch;
765727
let expressions: LexOrdering = self.expr.iter().cloned().collect();
766728
let row_converter = Arc::clone(&self.sort_keys_row_converter);
767729
let stream = futures::stream::once(async move {
@@ -775,9 +737,9 @@ impl ExternalSorter {
775737
let sorted = if is_multi_column_with_lists(&sort_columns) {
776738
// lex_sort_to_indices doesn't support List with more than one column
777739
// https://github.com/apache/arrow-rs/issues/5454
778-
sort_batch_row_based(&batch, &expressions, row_converter, fetch)?
740+
sort_batch_row_based(&batch, &expressions, row_converter, None)?
779741
} else {
780-
sort_batch(&batch, &expressions, fetch)?
742+
sort_batch(&batch, &expressions, None)?
781743
};
782744

783745
metrics.record_output(sorted.num_rows());
@@ -1244,7 +1206,6 @@ impl ExecutionPlan for SortExec {
12441206
input.schema(),
12451207
self.expr.clone(),
12461208
context.session_config().batch_size(),
1247-
self.fetch,
12481209
execution_options.sort_spill_reservation_bytes,
12491210
execution_options.sort_in_place_threshold_bytes,
12501211
&self.metrics_set,

0 commit comments

Comments
 (0)