Skip to content

Commit 9ec679b

Browse files
authored
Clean up ExternalSorter and use upstream converter (#16109)
1 parent 963b649 commit 9ec679b

File tree

1 file changed

+6
-132
lines changed
  • datafusion/physical-plan/src/sorts

1 file changed

+6
-132
lines changed

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 6 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,10 @@ use crate::{
4444
Statistics,
4545
};
4646

47-
use arrow::array::{
48-
Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
49-
};
50-
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
51-
use arrow::datatypes::{DataType, SchemaRef};
52-
use arrow::row::{RowConverter, Rows, SortField};
53-
use datafusion_common::{
54-
exec_datafusion_err, internal_datafusion_err, internal_err, DataFusionError, Result,
55-
};
47+
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
48+
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
49+
use arrow::datatypes::SchemaRef;
50+
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
5651
use datafusion_execution::disk_manager::RefCountedTempFile;
5752
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
5853
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -206,8 +201,6 @@ struct ExternalSorter {
206201
schema: SchemaRef,
207202
/// Sort expressions
208203
expr: Arc<[PhysicalSortExpr]>,
209-
/// RowConverter corresponding to the sort expressions
210-
sort_keys_row_converter: Arc<RowConverter>,
211204
/// The target number of rows for output batches
212205
batch_size: usize,
213206
/// If the size of buffered in-memory batches is below this size,
@@ -275,22 +268,6 @@ impl ExternalSorter {
275268
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
276269
.register(&runtime.memory_pool);
277270

278-
// Construct RowConverter for sort keys
279-
let sort_fields = expr
280-
.iter()
281-
.map(|e| {
282-
let data_type = e
283-
.expr
284-
.data_type(&schema)
285-
.map_err(|e| e.context("Resolving sort expression data type"))?;
286-
Ok(SortField::new_with_options(data_type, e.options))
287-
})
288-
.collect::<Result<Vec<_>>>()?;
289-
290-
let converter = RowConverter::new(sort_fields).map_err(|e| {
291-
exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
292-
})?;
293-
294271
let spill_manager = SpillManager::new(
295272
Arc::clone(&runtime),
296273
metrics.spill_metrics.clone(),
@@ -303,7 +280,6 @@ impl ExternalSorter {
303280
in_progress_spill_file: None,
304281
finished_spill_files: vec![],
305282
expr: expr.into(),
306-
sort_keys_row_converter: Arc::new(converter),
307283
metrics,
308284
reservation,
309285
spill_manager,
@@ -728,22 +704,10 @@ impl ExternalSorter {
728704
let schema = batch.schema();
729705

730706
let expressions: LexOrdering = self.expr.iter().cloned().collect();
731-
let row_converter = Arc::clone(&self.sort_keys_row_converter);
732707
let stream = futures::stream::once(async move {
733708
let _timer = metrics.elapsed_compute().timer();
734709

735-
let sort_columns = expressions
736-
.iter()
737-
.map(|expr| expr.evaluate_to_sort_column(&batch))
738-
.collect::<Result<Vec<_>>>()?;
739-
740-
let sorted = if is_multi_column_with_lists(&sort_columns) {
741-
// `lexsort_to_indices` doesn't support List columns when sorting by more than one column
742-
// https://github.com/apache/arrow-rs/issues/5454
743-
sort_batch_row_based(&batch, &expressions, row_converter, None)?
744-
} else {
745-
sort_batch(&batch, &expressions, None)?
746-
};
710+
let sorted = sort_batch(&batch, &expressions, None)?;
747711

748712
metrics.record_output(sorted.num_rows());
749713
drop(batch);
@@ -834,45 +798,6 @@ impl Debug for ExternalSorter {
834798
}
835799
}
836800

837-
/// Converts rows into a sorted array of indices based on their order.
838-
/// This function returns the indices that represent the sorted order of the rows.
839-
fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
840-
let mut sort: Vec<_> = rows.iter().enumerate().collect();
841-
sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
842-
843-
let mut len = rows.num_rows();
844-
if let Some(limit) = limit {
845-
len = limit.min(len);
846-
}
847-
let indices =
848-
UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));
849-
Ok(indices)
850-
}
851-
852-
/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format for faster comparison.
853-
fn sort_batch_row_based(
854-
batch: &RecordBatch,
855-
expressions: &LexOrdering,
856-
row_converter: Arc<RowConverter>,
857-
fetch: Option<usize>,
858-
) -> Result<RecordBatch> {
859-
let sort_columns = expressions
860-
.iter()
861-
.map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
862-
.collect::<Result<Vec<_>>>()?;
863-
let rows = row_converter.convert_columns(&sort_columns)?;
864-
let indices = rows_to_indices(rows, fetch)?;
865-
let columns = take_arrays(batch.columns(), &indices, None)?;
866-
867-
let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
868-
869-
Ok(RecordBatch::try_new_with_options(
870-
batch.schema(),
871-
columns,
872-
&options,
873-
)?)
874-
}
875-
876801
pub fn sort_batch(
877802
batch: &RecordBatch,
878803
expressions: &LexOrdering,
@@ -883,14 +808,7 @@ pub fn sort_batch(
883808
.map(|expr| expr.evaluate_to_sort_column(batch))
884809
.collect::<Result<Vec<_>>>()?;
885810

886-
let indices = if is_multi_column_with_lists(&sort_columns) {
887-
// `lexsort_to_indices` doesn't support List columns when sorting by more than one column
888-
// https://github.com/apache/arrow-rs/issues/5454
889-
lexsort_to_indices_multi_columns(sort_columns, fetch)?
890-
} else {
891-
lexsort_to_indices(&sort_columns, fetch)?
892-
};
893-
811+
let indices = lexsort_to_indices(&sort_columns, fetch)?;
894812
let mut columns = take_arrays(batch.columns(), &indices, None)?;
895813

896814
// The columns may be larger than the unsorted columns in `batch` especially for variable length
@@ -909,50 +827,6 @@ pub fn sort_batch(
909827
)?)
910828
}
911829

912-
#[inline]
913-
fn is_multi_column_with_lists(sort_columns: &[SortColumn]) -> bool {
914-
sort_columns.iter().any(|c| {
915-
matches!(
916-
c.values.data_type(),
917-
DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
918-
)
919-
})
920-
}
921-
922-
pub(crate) fn lexsort_to_indices_multi_columns(
923-
sort_columns: Vec<SortColumn>,
924-
limit: Option<usize>,
925-
) -> Result<UInt32Array> {
926-
let (fields, columns) = sort_columns.into_iter().fold(
927-
(vec![], vec![]),
928-
|(mut fields, mut columns), sort_column| {
929-
fields.push(SortField::new_with_options(
930-
sort_column.values.data_type().clone(),
931-
sort_column.options.unwrap_or_default(),
932-
));
933-
columns.push(sort_column.values);
934-
(fields, columns)
935-
},
936-
);
937-
938-
// Note: the row converter is reused through `sort_batch_row_based()`; this function
939-
// is not used during normal sort execution, but it's kept temporarily because
940-
// it's inside a public interface `sort_batch()`.
941-
let converter = RowConverter::new(fields)?;
942-
let rows = converter.convert_columns(&columns)?;
943-
let mut sort: Vec<_> = rows.iter().enumerate().collect();
944-
sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
945-
946-
let mut len = rows.num_rows();
947-
if let Some(limit) = limit {
948-
len = limit.min(len);
949-
}
950-
let indices =
951-
UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));
952-
953-
Ok(indices)
954-
}
955-
956830
/// Sort execution plan.
957831
///
958832
/// Support sorting datasets that are larger than the memory allotted

0 commit comments

Comments
 (0)