Skip to content

Commit b2e92ac

Browse files
committed
update
1 parent cf04fd7 commit b2e92ac

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3229,6 +3229,8 @@ async fn apply_enforce_distribution_multiple_times() -> Result<()> {
32293229
.create_physical_plan(&optimized_logical_plan, &session_state)
32303230
.await?;
32313231

3232+
// println!("{}", displayable(optimized_physical_plan.as_ref()).indent(true));
3233+
32323234
let mut results = optimized_physical_plan
32333235
.execute(0, ctx.task_ctx().clone())
32343236
.unwrap();

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,10 +1016,9 @@ fn remove_dist_changing_operators(
10161016
// All of above operators have a single child. First child is only child.
10171017
// Remove any distribution changing operators at the beginning:
10181018
distribution_context = distribution_context.children.swap_remove(0);
1019-
distribution_context.data.fetch = fetch;
10201019
// Note that they will be re-inserted later on if necessary or helpful.
10211020
}
1022-
1021+
distribution_context.data.fetch = fetch;
10231022
Ok(distribution_context)
10241023
}
10251024

@@ -1048,8 +1047,8 @@ fn replace_order_preserving_variants(
10481047
let mut fetch = None;
10491048
for child in context.children.into_iter() {
10501049
if child.data.has_dist_changing {
1051-
let child = replace_order_preserving_variants(child)?;
1052-
fetch = child.data.fetch;
1050+
let mut child = replace_order_preserving_variants(child)?;
1051+
fetch = child.data.fetch.take();
10531052
children.push(child);
10541053
} else {
10551054
children.push(child);
@@ -1219,6 +1218,7 @@ pub fn ensure_distribution(
12191218
mut data,
12201219
children,
12211220
} = remove_dist_changing_operators(dist_context)?;
1221+
let mut fetch = data.fetch.take();
12221222

12231223
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
12241224
if let Some(updated_window) = get_best_fitting_window(
@@ -1283,7 +1283,7 @@ pub fn ensure_distribution(
12831283
// Satisfy the distribution requirement if it is unmet.
12841284
match &requirement {
12851285
Distribution::SinglePartition => {
1286-
child = add_spm_on_top(child, &mut data.fetch);
1286+
child = add_spm_on_top(child, &mut fetch);
12871287
}
12881288
Distribution::HashPartitioned(exprs) => {
12891289
if add_roundrobin {
@@ -1319,10 +1319,10 @@ pub fn ensure_distribution(
13191319
&& child.data.has_dist_changing
13201320
{
13211321
child = replace_order_preserving_variants(child)?;
1322+
let fetch = child.data.fetch.take();
13221323
// If ordering requirements were satisfied before repartitioning,
13231324
// make sure ordering requirements are still satisfied after.
13241325
if ordering_satisfied {
1325-
let fetch = child.data.fetch;
13261326
// Make sure to satisfy ordering requirement:
13271327
child = add_sort_above_with_check(
13281328
child,
@@ -1394,15 +1394,15 @@ pub fn ensure_distribution(
13941394
// If `fetch` was not consumed, it means that there was `SortPreservingMergeExec` with fetch before
13951395
// It was removed by `remove_dist_changing_operators`
13961396
// and we need to add it back.
1397-
if data.fetch.is_some() {
1397+
if fetch.is_some() {
13981398
plan = Arc::new(
13991399
SortPreservingMergeExec::new(
14001400
plan.output_ordering()
14011401
.unwrap_or(&LexOrdering::default())
14021402
.clone(),
14031403
plan,
14041404
)
1405-
.with_fetch(data.fetch.take()),
1405+
.with_fetch(fetch.take()),
14061406
)
14071407
}
14081408

0 commit comments

Comments
 (0)