Skip to content

Commit bbc28fd

Browse files
committed
fix: fetch is missed during EnforceDistribution
1 parent f667a01 commit bbc28fd

File tree

1 file changed

+71
-36
lines changed

1 file changed

+71
-36
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 71 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,10 @@ fn add_hash_on_top(
926926
///
927927
/// Updated node with an execution plan, where desired single
928928
/// distribution is satisfied by adding [`SortPreservingMergeExec`].
929-
fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
929+
fn add_spm_on_top(
930+
input: DistributionContext,
931+
fetch: &mut Option<usize>,
932+
) -> DistributionContext {
930933
// Add SortPreservingMerge only when partition count is larger than 1.
931934
if input.plan.output_partitioning().partition_count() > 1 {
932935
// When there is an existing ordering, we preserve ordering
@@ -938,14 +941,17 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
938941
let should_preserve_ordering = input.plan.output_ordering().is_some();
939942

940943
let new_plan = if should_preserve_ordering {
941-
Arc::new(SortPreservingMergeExec::new(
942-
input
943-
.plan
944-
.output_ordering()
945-
.unwrap_or(&LexOrdering::default())
946-
.clone(),
947-
Arc::clone(&input.plan),
948-
)) as _
944+
Arc::new(
945+
SortPreservingMergeExec::new(
946+
input
947+
.plan
948+
.output_ordering()
949+
.unwrap_or(&LexOrdering::default())
950+
.clone(),
951+
Arc::clone(&input.plan),
952+
)
953+
.with_fetch(fetch.take()),
954+
) as _
949955
} else {
950956
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
951957
};
@@ -975,18 +981,24 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
975981
/// ```
976982
fn remove_dist_changing_operators(
977983
mut distribution_context: DistributionContext,
978-
) -> Result<DistributionContext> {
984+
) -> Result<(DistributionContext, Option<usize>)> {
985+
let mut fetch = None;
979986
while is_repartition(&distribution_context.plan)
980987
|| is_coalesce_partitions(&distribution_context.plan)
981988
|| is_sort_preserving_merge(&distribution_context.plan)
982989
{
990+
if is_sort_preserving_merge(&distribution_context.plan) {
991+
if let Some(child_fetch) = distribution_context.plan.fetch() {
992+
fetch = Some(fetch.map_or(child_fetch, |f: usize| f.min(child_fetch)));
993+
}
994+
}
983995
// All of above operators have a single child. First child is only child.
984996
// Remove any distribution changing operators at the beginning:
985997
distribution_context = distribution_context.children.swap_remove(0);
986998
// Note that they will be re-inserted later on if necessary or helpful.
987999
}
9881000

989-
Ok(distribution_context)
1001+
Ok((distribution_context, fetch))
9901002
}
9911003

9921004
/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
@@ -1009,23 +1021,26 @@ fn remove_dist_changing_operators(
10091021
/// ```
10101022
fn replace_order_preserving_variants(
10111023
mut context: DistributionContext,
1012-
) -> Result<DistributionContext> {
1013-
context.children = context
1014-
.children
1015-
.into_iter()
1016-
.map(|child| {
1017-
if child.data {
1018-
replace_order_preserving_variants(child)
1019-
} else {
1020-
Ok(child)
1021-
}
1022-
})
1023-
.collect::<Result<Vec<_>>>()?;
1024+
) -> Result<(DistributionContext, Option<usize>)> {
1025+
let mut children = vec![];
1026+
let mut fetch = None;
1027+
for child in context.children.into_iter() {
1028+
if child.data {
1029+
let (child, inner_fetch) = replace_order_preserving_variants(child)?;
1030+
children.push(child);
1031+
fetch = inner_fetch;
1032+
} else {
1033+
children.push(child);
1034+
}
1035+
}
1036+
context.children = children;
10241037

10251038
if is_sort_preserving_merge(&context.plan) {
1039+
        // Keep the fetch value of the SortPreservingMerge operator; it may be used later.
1040+
let fetch = context.plan.fetch();
10261041
let child_plan = Arc::clone(&context.children[0].plan);
10271042
context.plan = Arc::new(CoalescePartitionsExec::new(child_plan));
1028-
return Ok(context);
1043+
return Ok((context, fetch));
10291044
} else if let Some(repartition) =
10301045
context.plan.as_any().downcast_ref::<RepartitionExec>()
10311046
{
@@ -1034,11 +1049,11 @@ fn replace_order_preserving_variants(
10341049
Arc::clone(&context.children[0].plan),
10351050
repartition.partitioning().clone(),
10361051
)?);
1037-
return Ok(context);
1052+
return Ok((context, None));
10381053
}
10391054
}
10401055

1041-
context.update_plan_from_children()
1056+
Ok((context.update_plan_from_children()?, fetch))
10421057
}
10431058

10441059
/// A struct to keep track of repartition requirements for each child node.
@@ -1175,11 +1190,14 @@ fn ensure_distribution(
11751190
unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
11761191

11771192
// Remove unnecessary repartition from the physical plan if any
1178-
let DistributionContext {
1179-
mut plan,
1180-
data,
1181-
children,
1182-
} = remove_dist_changing_operators(dist_context)?;
1193+
let (
1194+
DistributionContext {
1195+
mut plan,
1196+
data,
1197+
children,
1198+
},
1199+
mut fetch,
1200+
) = remove_dist_changing_operators(dist_context)?;
11831201

11841202
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
11851203
if let Some(updated_window) = get_best_fitting_window(
@@ -1244,7 +1262,7 @@ fn ensure_distribution(
12441262
// Satisfy the distribution requirement if it is unmet.
12451263
match &requirement {
12461264
Distribution::SinglePartition => {
1247-
child = add_spm_on_top(child);
1265+
child = add_spm_on_top(child, &mut fetch);
12481266
}
12491267
Distribution::HashPartitioned(exprs) => {
12501268
if add_roundrobin {
@@ -1279,15 +1297,17 @@ fn ensure_distribution(
12791297
if (!ordering_satisfied || !order_preserving_variants_desirable)
12801298
&& child.data
12811299
{
1282-
child = replace_order_preserving_variants(child)?;
1300+
let (replaced_child, fetch) =
1301+
replace_order_preserving_variants(child)?;
1302+
child = replaced_child;
12831303
// If ordering requirements were satisfied before repartitioning,
12841304
// make sure ordering requirements are still satisfied after.
12851305
if ordering_satisfied {
12861306
// Make sure to satisfy ordering requirement:
12871307
child = add_sort_above_with_check(
12881308
child,
12891309
required_input_ordering.clone(),
1290-
None,
1310+
fetch,
12911311
);
12921312
}
12931313
}
@@ -1299,12 +1319,12 @@ fn ensure_distribution(
12991319
// Operator requires specific distribution.
13001320
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
13011321
// Since there is no ordering requirement, preserving ordering is pointless
1302-
child = replace_order_preserving_variants(child)?;
1322+
child = replace_order_preserving_variants(child)?.0;
13031323
}
13041324
Distribution::UnspecifiedDistribution => {
13051325
// Since ordering is lost, trying to preserve ordering is pointless
13061326
if !maintains || plan.as_any().is::<OutputRequirementExec>() {
1307-
child = replace_order_preserving_variants(child)?;
1327+
child = replace_order_preserving_variants(child)?.0;
13081328
}
13091329
}
13101330
}
@@ -1351,6 +1371,21 @@ fn ensure_distribution(
13511371
plan.with_new_children(children_plans)?
13521372
};
13531373

1374+
    // If `fetch` was not consumed, it means that there was a `SortPreservingMergeExec` with a fetch before.
1375+
// It was removed by `remove_dist_changing_operators`
1376+
// and we need to add it back.
1377+
if fetch.is_some() {
1378+
plan = Arc::new(
1379+
SortPreservingMergeExec::new(
1380+
plan.output_ordering()
1381+
.unwrap_or(&LexOrdering::default())
1382+
.clone(),
1383+
plan,
1384+
)
1385+
.with_fetch(fetch.take()),
1386+
)
1387+
}
1388+
13541389
Ok(Transformed::yes(DistributionContext::new(
13551390
plan, data, children,
13561391
)))

0 commit comments

Comments
 (0)