diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs
index caef0fba052cb..efe3c53487fcc 100644
--- a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs
+++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs
@@ -32,8 +32,8 @@ use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
 use std::sync::Arc;
 
 use crate::physical_optimizer::test_utils::{
-    OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
-    parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
+    OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, exact_test_scan,
+    parquet_exec, parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
     repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
     sort_expr, sort_expr_named, test_scan_with_ordering,
 };
@@ -1038,3 +1038,60 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
     "
     );
 }
+
+// ============================================================================
+// EXACT PUSHDOWN TESTS (source guarantees ordering, SortExec removed)
+// ============================================================================
+
+#[test]
+fn test_sort_pushdown_exact_no_fetch_no_limit() {
+    // Without a fetch on the SortExec, an Exact pushdown result lets the
+    // optimizer drop the SortExec and keep only the reordered source.
+    let schema = schema();
+    let a = sort_expr("a", &schema);
+    let b = sort_expr("b", &schema);
+    let source = exact_test_scan(schema.clone());
+
+    let ordering = LexOrdering::new(vec![a, b.reverse()]).unwrap();
+    let plan = sort_exec(ordering, source);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+          - ExactTestScan
+      output:
+        Ok:
+          - ExactTestScan: ordered=[a@0 ASC, b@1 DESC NULLS LAST]
+    "
+    );
+}
+
+#[test]
+fn test_sort_pushdown_exact_preserves_fetch() {
+    // When a source returns Exact and the SortExec has fetch (LIMIT),
+    // the optimizer tries to push the limit into the source via with_fetch().
+    // ExactTestScan supports with_fetch(), so the limit should appear
+    // directly on the source (no GlobalLimitExec wrapper needed).
+    let schema = schema();
+    let a = sort_expr("a", &schema);
+    let source = exact_test_scan(schema.clone());
+
+    let ordering = LexOrdering::new(vec![a]).unwrap();
+    let plan = sort_exec_with_fetch(ordering, Some(10), source);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
+          - ExactTestScan
+      output:
+        Ok:
+          - ExactTestScan: ordered=[a@0 ASC], fetch=10
+    "
+    );
+}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 5b50181d7fd3e..f05baa094069c 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -979,3 +979,158 @@ pub fn test_scan_with_ordering(
 ) -> Arc<dyn ExecutionPlan> {
     Arc::new(TestScan::with_ordering(schema, ordering))
 }
+
+/// A test scan that returns `Exact` from `try_pushdown_sort`.
+///
+/// It has no children and cannot be executed; it exists only so optimizer
+/// tests can exercise the `Exact` sort-pushdown path. It also implements
+/// `with_fetch`, so a limit can be recorded directly on the scan.
+#[derive(Debug, Clone)]
+pub struct ExactTestScan {
+    schema: SchemaRef,
+    plan_properties: PlanProperties,
+    /// Ordering recorded by the last `try_pushdown_sort` call, if any.
+    requested_ordering: Option<LexOrdering>,
+    /// Limit recorded by `with_fetch`, if any.
+    fetch: Option<usize>,
+}
+
+impl ExactTestScan {
+    /// Creates a scan over `schema` with no known ordering and no fetch.
+    pub fn new(schema: SchemaRef) -> Self {
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+        let plan_properties = PlanProperties::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Self {
+            schema,
+            plan_properties,
+            requested_ordering: None,
+            fetch: None,
+        }
+    }
+}
+
+impl DisplayAs for ExactTestScan {
+    /// Renders `ExactTestScan`, followed (in default/verbose mode) by
+    /// `: ordered=[...]` when an ordering was pushed down and `, fetch=N`
+    /// when a fetch is set.
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ExactTestScan")?;
+                if let Some(ref req) = self.requested_ordering {
+                    write!(f, ": ordered=[")?;
+                    for (i, sort_expr) in req.iter().enumerate() {
+                        if i > 0 {
+                            write!(f, ", ")?;
+                        }
+                        write!(f, "{sort_expr}")?;
+                    }
+                    write!(f, "]")?;
+                }
+                if let Some(fetch) = self.fetch {
+                    write!(f, ", fetch={fetch}")?;
+                }
+                Ok(())
+            }
+            DisplayFormatType::TreeRender => {
+                write!(f, "ExactTestScan")
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for ExactTestScan {
+    fn name(&self) -> &str {
+        "ExactTestScan"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("ExactTestScan should have no children")
+        }
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        internal_err!("ExactTestScan is for testing optimizer only, not for execution")
+    }
+
+    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&self.schema))
+    }
+
+    /// Returns a copy of this scan with `fetch` recorded; always `Some`.
+    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        let mut new_scan = self.clone();
+        new_scan.fetch = fetch;
+        Some(Arc::new(new_scan))
+    }
+
+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
+    /// Always reports `Exact` for the requested `order`: the returned scan
+    /// declares `order` in its equivalence properties, records it for
+    /// display, and keeps the current fetch.
+    fn try_pushdown_sort(
+        &self,
+        order: &[PhysicalSortExpr],
+    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+        let requested_ordering = LexOrdering::new(order.to_vec());
+
+        let orderings: Vec<Vec<PhysicalSortExpr>> = vec![order.to_vec()];
+        let eq_properties = EquivalenceProperties::new_with_orderings(
+            Arc::clone(&self.schema),
+            orderings,
+        );
+        let plan_properties = PlanProperties::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+
+        let new_scan = ExactTestScan {
+            schema: Arc::clone(&self.schema),
+            plan_properties,
+            requested_ordering,
+            fetch: self.fetch,
+        };
+
+        // Return Exact: this source guarantees the requested ordering
+        Ok(SortOrderPushdownResult::Exact {
+            inner: Arc::new(new_scan),
+        })
+    }
+}
+
+/// Helper function to create an ExactTestScan
+pub fn exact_test_scan(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
+    Arc::new(ExactTestScan::new(schema))
+}
diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs
index 1fa15492d2a92..9f5f590385393 100644
--- a/datafusion/physical-optimizer/src/pushdown_sort.rs
+++ b/datafusion/physical-optimizer/src/pushdown_sort.rs
@@ -55,6 +55,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_physical_plan::SortOrderPushdownResult;
+use datafusion_physical_plan::limit::GlobalLimitExec;
 use datafusion_physical_plan::sorts::sort::SortExec;
 use std::sync::Arc;
 
@@ -95,8 +96,30 @@ impl PhysicalOptimizerRule for PushdownSort {
             // Each node type defines its own pushdown behavior via try_pushdown_sort()
             match sort_input.try_pushdown_sort(required_ordering)? {
                 SortOrderPushdownResult::Exact { inner } => {
-                    // Data source guarantees perfect ordering - remove the Sort operator
-                    Ok(Transformed::yes(inner))
+                    // Data source guarantees perfect ordering - remove the Sort operator.
+                    //
+                    // If the SortExec carried a fetch (LIMIT), we must preserve it.
+                    // First try pushing the limit into the source via `with_fetch()`.
+                    // If the source doesn't support `with_fetch`, fall back to
+                    // wrapping with GlobalLimitExec.
+                    //
+                    // Note: LimitPushdown runs *before* PushdownSort in the optimizer
+                    // pipeline, so we need to handle the limit manually here.
+                    //
+                    // NOTE(review): GlobalLimitExec limits its input as a whole;
+                    // confirm this fallback is still correct when the removed
+                    // SortExec had preserve_partitioning=[true].
+                    if let Some(fetch) = sort_exec.fetch() {
+                        let limited = inner
+                            .with_fetch(Some(fetch))
+                            .unwrap_or_else(|| {
+                                Arc::new(GlobalLimitExec::new(
+                                    inner, 0, Some(fetch),
+                                ))
+                            });
+                        Ok(Transformed::yes(limited))
+                    } else {
+                        Ok(Transformed::yes(inner))
+                    }
                 }
                 SortOrderPushdownResult::Inexact { inner } => {
                     // Data source is optimized for the ordering but not perfectly sorted