Skip to content

Commit 5eb0968

Browse files
authored
Fix: fetch is missing in EnforceSorting optimizer (two places) (#15822)
* Fix: fetch is missing in EnforceSort * add ut test_parallelize_sort_preserves_fetch * add ut: test_plan_with_order_preserving_variants_preserves_fetch * update * address comments
1 parent 84513a4 commit 5eb0968

File tree

4 files changed

+109
-6
lines changed

4 files changed

+109
-6
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3440,3 +3440,38 @@ fn test_handles_multiple_orthogonal_sorts() -> Result<()> {
34403440

34413441
Ok(())
34423442
}
3443+
3444+
#[test]
3445+
fn test_parallelize_sort_preserves_fetch() -> Result<()> {
3446+
// Create a schema
3447+
let schema = create_test_schema3()?;
3448+
let parquet_exec = parquet_exec(&schema);
3449+
let coalesced = Arc::new(CoalescePartitionsExec::new(parquet_exec.clone()));
3450+
let top_coalesced = CoalescePartitionsExec::new(coalesced.clone())
3451+
.with_fetch(Some(10))
3452+
.unwrap();
3453+
3454+
let requirements = PlanWithCorrespondingCoalescePartitions::new(
3455+
top_coalesced.clone(),
3456+
true,
3457+
vec![PlanWithCorrespondingCoalescePartitions::new(
3458+
coalesced,
3459+
true,
3460+
vec![PlanWithCorrespondingCoalescePartitions::new(
3461+
parquet_exec,
3462+
false,
3463+
vec![],
3464+
)],
3465+
)],
3466+
);
3467+
3468+
let res = parallelize_sorts(requirements)?;
3469+
3470+
// Verify fetch was preserved
3471+
assert_eq!(
3472+
res.data.plan.fetch(),
3473+
Some(10),
3474+
"Fetch value was not preserved after transformation"
3475+
);
3476+
Ok(())
3477+
}

datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use crate::physical_optimizer::test_utils::{
21-
check_integrity, sort_preserving_merge_exec, stream_exec_ordered_with_projection,
21+
check_integrity, create_test_schema3, sort_preserving_merge_exec,
22+
stream_exec_ordered_with_projection,
2223
};
2324

2425
use datafusion::prelude::SessionContext;
@@ -40,13 +41,14 @@ use datafusion_physical_plan::{
4041
};
4142
use datafusion::datasource::source::DataSourceExec;
4243
use datafusion_common::tree_node::{TransformedResult, TreeNode};
43-
use datafusion_common::Result;
44+
use datafusion_common::{assert_contains, Result};
4445
use datafusion_expr::{JoinType, Operator};
4546
use datafusion_physical_expr::expressions::{self, col, Column};
4647
use datafusion_physical_expr::PhysicalSortExpr;
47-
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants, OrderPreservationContext};
48+
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{plan_with_order_preserving_variants, replace_with_order_preserving_variants, OrderPreservationContext};
4849
use datafusion_common::config::ConfigOptions;
4950

51+
use crate::physical_optimizer::enforce_sorting::parquet_exec_sorted;
5052
use object_store::memory::InMemory;
5153
use object_store::ObjectStore;
5254
use rstest::rstest;
@@ -1259,3 +1261,52 @@ fn memory_exec_sorted(
12591261
))
12601262
})
12611263
}
1264+
1265+
#[test]
1266+
fn test_plan_with_order_preserving_variants_preserves_fetch() -> Result<()> {
1267+
// Create a schema
1268+
let schema = create_test_schema3()?;
1269+
let parquet_sort_exprs = vec![crate::physical_optimizer::test_utils::sort_expr(
1270+
"a", &schema,
1271+
)];
1272+
let parquet_exec = parquet_exec_sorted(&schema, parquet_sort_exprs);
1273+
let coalesced = CoalescePartitionsExec::new(parquet_exec.clone())
1274+
.with_fetch(Some(10))
1275+
.unwrap();
1276+
1277+
// Test sort's fetch is greater than coalesce fetch, return error because it's not reasonable
1278+
let requirements = OrderPreservationContext::new(
1279+
coalesced.clone(),
1280+
false,
1281+
vec![OrderPreservationContext::new(
1282+
parquet_exec.clone(),
1283+
false,
1284+
vec![],
1285+
)],
1286+
);
1287+
let res = plan_with_order_preserving_variants(requirements, false, true, Some(15));
1288+
assert_contains!(res.unwrap_err().to_string(), "CoalescePartitionsExec fetch [10] should be greater than or equal to SortExec fetch [15]");
1289+
1290+
// Test sort is without fetch, expected to get the fetch value from the coalesced
1291+
let requirements = OrderPreservationContext::new(
1292+
coalesced.clone(),
1293+
false,
1294+
vec![OrderPreservationContext::new(
1295+
parquet_exec.clone(),
1296+
false,
1297+
vec![],
1298+
)],
1299+
);
1300+
let res = plan_with_order_preserving_variants(requirements, false, true, None)?;
1301+
assert_eq!(res.plan.fetch(), Some(10),);
1302+
1303+
// Test sort's fetch is less than coalesces fetch, expected to get the fetch value from the sort
1304+
let requirements = OrderPreservationContext::new(
1305+
coalesced,
1306+
false,
1307+
vec![OrderPreservationContext::new(parquet_exec, false, vec![])],
1308+
);
1309+
let res = plan_with_order_preserving_variants(requirements, false, true, Some(5))?;
1310+
assert_eq!(res.plan.fetch(), Some(5),);
1311+
Ok(())
1312+
}

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ pub fn parallelize_sorts(
400400
),
401401
))
402402
} else if is_coalesce_partitions(&requirements.plan) {
403+
let fetch = requirements.plan.fetch();
403404
// There is an unnecessary `CoalescePartitionsExec` in the plan.
404405
// This will handle the recursive `CoalescePartitionsExec` plans.
405406
requirements = remove_bottleneck_in_subplan(requirements)?;
@@ -408,7 +409,10 @@ pub fn parallelize_sorts(
408409

409410
Ok(Transformed::yes(
410411
PlanWithCorrespondingCoalescePartitions::new(
411-
Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
412+
// Safe to unwrap, because `CoalescePartitionsExec` has a fetch
413+
CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
414+
.with_fetch(fetch)
415+
.unwrap(),
412416
false,
413417
vec![requirements],
414418
),

datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::utils::{
2727

2828
use datafusion_common::config::ConfigOptions;
2929
use datafusion_common::tree_node::Transformed;
30-
use datafusion_common::Result;
30+
use datafusion_common::{internal_err, Result};
3131
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3232
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
3333
use datafusion_physical_plan::execution_plan::EmissionType;
@@ -93,7 +93,7 @@ pub fn update_order_preservation_ctx_children_data(opc: &mut OrderPreservationCo
9393
/// inside `sort_input` with their order-preserving variants. This will
9494
/// generate an alternative plan, which will be accepted or rejected later on
9595
/// depending on whether it helps us remove a `SortExec`.
96-
fn plan_with_order_preserving_variants(
96+
pub fn plan_with_order_preserving_variants(
9797
mut sort_input: OrderPreservationContext,
9898
// Flag indicating that it is desirable to replace `RepartitionExec`s with
9999
// `SortPreservingRepartitionExec`s:
@@ -138,6 +138,19 @@ fn plan_with_order_preserving_variants(
138138
} else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
139139
let child = &sort_input.children[0].plan;
140140
if let Some(ordering) = child.output_ordering() {
141+
let mut fetch = fetch;
142+
if let Some(coalesce_fetch) = sort_input.plan.fetch() {
143+
if let Some(sort_fetch) = fetch {
144+
if coalesce_fetch < sort_fetch {
145+
return internal_err!(
146+
"CoalescePartitionsExec fetch [{:?}] should be greater than or equal to SortExec fetch [{:?}]", coalesce_fetch, sort_fetch
147+
);
148+
}
149+
} else {
150+
// If the sort node does not have a fetch, we need to keep the coalesce node's fetch.
151+
fetch = Some(coalesce_fetch);
152+
}
153+
};
141154
// When the input of a `CoalescePartitionsExec` has an ordering,
142155
// replace it with a `SortPreservingMergeExec` if appropriate:
143156
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child))

0 commit comments

Comments
 (0)