Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
use std::sync::Arc;

use crate::physical_optimizer::test_utils::{
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, exact_test_scan,
parquet_exec, parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
sort_expr, sort_expr_named, test_scan_with_ordering,
};
Expand Down Expand Up @@ -1038,3 +1038,58 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
"
);
}

// ============================================================================
// EXACT PUSHDOWN TESTS (source guarantees ordering, SortExec removed)
// ============================================================================

/// Exact pushdown without a fetch: a `SortExec` on `[a ASC, b DESC]` sits
/// directly on top of an `ExactTestScan`. The expected plan in the snapshot
/// contains only the scan, now annotated with the requested ordering, i.e. the
/// `SortExec` has been removed and no limit node has been introduced.
#[test]
fn test_sort_pushdown_exact_no_fetch_no_limit() {
let schema = schema();
let a = sort_expr("a", &schema);
let b = sort_expr("b", &schema);
let source = exact_test_scan(schema.clone());

// Sort on `a` ascending, then `b` reversed (rendered as DESC NULLS LAST below).
let ordering = LexOrdering::new(vec![a, b.reverse()]).unwrap();
let plan = sort_exec(ordering, source);

// Inline snapshot: `input` is the plan before the rule, `output` the plan after it.
insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
- ExactTestScan
output:
Ok:
- ExactTestScan: ordered=[a@0 ASC, b@1 DESC NULLS LAST]
"
);
}

/// Exact pushdown with a fetch: the `SortExec` carries `fetch=10` (TopK) and
/// sits on top of an `ExactTestScan`. The expected plan in the snapshot is the
/// bare scan annotated with both the ordering and `fetch=10`.
#[test]
fn test_sort_pushdown_exact_preserves_fetch() {
// When a source returns Exact and the SortExec has a fetch (LIMIT),
// the optimizer tries to push the limit into the source via with_fetch().
// ExactTestScan supports with_fetch(), so the limit should appear
// directly on the source (no GlobalLimitExec wrapper needed).
let schema = schema();
let a = sort_expr("a", &schema);
let source = exact_test_scan(schema.clone());

// Single-column ordering on `a`, with the limit attached to the sort itself.
let ordering = LexOrdering::new(vec![a]).unwrap();
let plan = sort_exec_with_fetch(ordering, Some(10), source);

// Inline snapshot: `input` is the plan before the rule, `output` the plan after it.
insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
- ExactTestScan
output:
Ok:
- ExactTestScan: ordered=[a@0 ASC], fetch=10
"
);
}
141 changes: 141 additions & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,3 +979,144 @@ pub fn test_scan_with_ordering(
) -> Arc<dyn ExecutionPlan> {
Arc::new(TestScan::with_ordering(schema, ordering))
}

/// A test scan that returns `Exact` from `try_pushdown_sort`.
///
/// It is a leaf plan node used only to exercise optimizer rules: `execute`
/// always returns an internal error. Any ordering passed to
/// `try_pushdown_sort` is accepted and recorded, and `with_fetch` is
/// supported so a limit can be attached directly to the scan.
#[derive(Debug, Clone)]
pub struct ExactTestScan {
// Schema reported by this scan; also used to build statistics and
// equivalence properties.
schema: SchemaRef,
// Cached plan properties handed out by `properties()`.
plan_properties: PlanProperties,
// Ordering recorded by `try_pushdown_sort`; `None` until a sort is pushed down.
requested_ordering: Option<LexOrdering>,
// Row limit set through `with_fetch`; `None` means no limit.
fetch: Option<usize>,
}

impl ExactTestScan {
    /// Builds a scan over `schema` that has no requested ordering and no fetch.
    ///
    /// The plan properties describe a single partition with unknown
    /// partitioning, incremental emission and bounded input; the equivalence
    /// properties are derived from `schema` alone.
    pub fn new(schema: SchemaRef) -> Self {
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&schema)),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );

        Self {
            schema,
            plan_properties,
            requested_ordering: None,
            fetch: None,
        }
    }
}

impl DisplayAs for ExactTestScan {
    /// Renders the scan for plan display.
    ///
    /// * `TreeRender` prints only the node name.
    /// * `Default` / `Verbose` print the node name, followed by
    ///   `: ordered=[...]` when an ordering has been recorded and
    ///   `, fetch=N` when a fetch has been set.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::TreeRender => write!(f, "ExactTestScan"),
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ExactTestScan")?;

                if let Some(ordering) = &self.requested_ordering {
                    write!(f, ": ordered=[")?;
                    // Empty before the first expression, ", " before every later one.
                    let mut separator = "";
                    for expr in ordering.iter() {
                        write!(f, "{separator}")?;
                        write!(f, "{expr}")?;
                        separator = ", ";
                    }
                    write!(f, "]")?;
                }

                match self.fetch {
                    Some(limit) => write!(f, ", fetch={limit}"),
                    None => Ok(()),
                }
            }
        }
    }
}

impl ExecutionPlan for ExactTestScan {
    /// Node name used in plan display.
    fn name(&self) -> &str {
        "ExactTestScan"
    }

    /// Exposes the node for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the cached plan properties.
    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    /// This is a leaf node: it never has children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Accepts only an empty child list and returns `self` unchanged;
    /// any non-empty list is an internal error.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return internal_err!("ExactTestScan should have no children");
        }
        Ok(self)
    }

    /// Always fails: this node exists only to test optimizer rules.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        internal_err!("ExactTestScan is for testing optimizer only, not for execution")
    }

    /// Reports unknown statistics for the scan's schema, for any partition.
    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    /// Returns a copy of this scan with its fetch replaced by `fetch`.
    /// Always succeeds (never returns `None`).
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(Self {
            fetch,
            ..self.clone()
        }))
    }

    /// Returns the currently configured fetch, if any.
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Accepts any requested ordering and answers `Exact`.
    ///
    /// The returned scan records `order` as its requested ordering, advertises
    /// it through freshly built equivalence properties, and keeps the current
    /// fetch.
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        // Advertise the requested ordering as the only known ordering of the output.
        let advertised: Vec<Vec<PhysicalSortExpr>> = vec![order.to_vec()];
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new_with_orderings(
                Arc::clone(&self.schema),
                advertised,
            ),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );

        let pushed_down = ExactTestScan {
            schema: Arc::clone(&self.schema),
            plan_properties,
            requested_ordering: LexOrdering::new(order.to_vec()),
            fetch: self.fetch,
        };

        // Exact: this source claims to guarantee the requested ordering.
        Ok(SortOrderPushdownResult::Exact {
            inner: Arc::new(pushed_down),
        })
    }
}

/// Convenience constructor: wraps a fresh [`ExactTestScan`] over `schema`
/// in an `Arc<dyn ExecutionPlan>`.
pub fn exact_test_scan(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
    let scan = ExactTestScan::new(schema);
    Arc::new(scan)
}
24 changes: 22 additions & 2 deletions datafusion/physical-optimizer/src/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::limit::GlobalLimitExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use std::sync::Arc;

Expand Down Expand Up @@ -95,8 +96,27 @@ impl PhysicalOptimizerRule for PushdownSort {
// Each node type defines its own pushdown behavior via try_pushdown_sort()
match sort_input.try_pushdown_sort(required_ordering)? {
SortOrderPushdownResult::Exact { inner } => {
// Data source guarantees perfect ordering - remove the Sort operator
Ok(Transformed::yes(inner))
// Data source guarantees perfect ordering - remove the Sort operator.
//
// If the SortExec carried a fetch (LIMIT), we must preserve it.
// First try pushing the limit into the source via `with_fetch()`.
// If the source doesn't support `with_fetch`, fall back to
// wrapping with GlobalLimitExec.
//
// Note: LimitPushdown runs *before* PushdownSort in the optimizer
// pipeline, so we need to handle the limit manually here.
if let Some(fetch) = sort_exec.fetch() {
let limited = inner
.with_fetch(Some(fetch))
.unwrap_or_else(|| {
Arc::new(GlobalLimitExec::new(
inner, 0, Some(fetch),
Comment thread
sgrebnov marked this conversation as resolved.
))
});
Comment thread
sgrebnov marked this conversation as resolved.
Comment thread
sgrebnov marked this conversation as resolved.
Ok(Transformed::yes(limited))
} else {
Ok(Transformed::yes(inner))
}
}
SortOrderPushdownResult::Inexact { inner } => {
// Data source is optimized for the ordering but not perfectly sorted
Expand Down