diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 6bfa02adf6dc3..dd705cb5311c9 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -90,7 +90,7 @@ impl EnforceSorting { /// via its children. pub type PlanWithCorrespondingSort = PlanContext; -fn update_sort_ctx_children( +fn update_sort_ctx_children_data( mut node: PlanWithCorrespondingSort, data: bool, ) -> Result { @@ -119,7 +119,7 @@ fn update_sort_ctx_children( } node.data = data; - node.update_plan_from_children() + Ok(node) } /// This object is used within the [`EnforceSorting`] rule to track the closest @@ -322,7 +322,10 @@ pub fn parallelize_sorts( pub fn ensure_sorting( mut requirements: PlanWithCorrespondingSort, ) -> Result> { - requirements = update_sort_ctx_children(requirements, false)?; + // Before starting, making requirements' children's ExecutionPlan be same as the requirements' plan's children's ExecutionPlan. + // It should be guaranteed by previous code, but we need to make sure to avoid any potential missing. + requirements = requirements.update_plan_from_children()?; + requirements = update_sort_ctx_children_data(requirements, false)?; // Perform naive analysis at the beginning -- remove already-satisfied sorts: if requirements.children.is_empty() { @@ -353,7 +356,8 @@ pub fn ensure_sorting( child = update_child_to_remove_unnecessary_sort(idx, child, plan)?; } child = add_sort_above(child, required, None); - child = update_sort_ctx_children(child, true)?; + child = child.update_plan_from_children()?; + child = update_sort_ctx_children_data(child, true)?; } } else if physical_ordering.is_none() || !plan.maintains_input_order()[idx] @@ -383,9 +387,10 @@ pub fn ensure_sorting( Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch)); } return Ok(Transformed::yes(child_node)); + } else { + requirements = requirements.update_plan_from_children()?; } - - update_sort_ctx_children(requirements, false).map(Transformed::yes) + update_sort_ctx_children_data(requirements, false).map(Transformed::yes) } /// Analyzes a given [`SortExec`] (`plan`) to determine whether its input @@ -609,8 +614,9 @@ fn remove_corresponding_sort_from_sub_plan( } }) .collect::>()?; + node = node.update_plan_from_children()?; if any_connection || node.children.is_empty() { - node = update_sort_ctx_children(node, false)?; + node = update_sort_ctx_children_data(node, false)?; } // Replace with variants that do not preserve order. @@ -643,7 +649,8 @@ fn remove_corresponding_sort_from_sub_plan( Arc::new(CoalescePartitionsExec::new(plan)) as _ }; node = PlanWithCorrespondingSort::new(plan, false, vec![node]); - node = update_sort_ctx_children(node, false)?; + node = node.update_plan_from_children()?; + node = update_sort_ctx_children_data(node, false)?; } Ok(node) } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index c542f9261a247..2c5c0d4d510ec 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -45,7 +45,7 @@ use itertools::izip; pub type OrderPreservationContext = PlanContext; /// Updates order-preservation data for all children of the given node. -pub fn update_children(opc: &mut OrderPreservationContext) { +pub fn update_order_preservation_ctx_children_data(opc: &mut OrderPreservationContext) { for PlanContext { plan, children, @@ -244,7 +244,7 @@ pub fn replace_with_order_preserving_variants( is_spm_better: bool, config: &ConfigOptions, ) -> Result> { - update_children(&mut requirements); + update_order_preservation_ctx_children_data(&mut requirements); if !(is_sort(&requirements.plan) && requirements.children[0].data) { return Ok(Transformed::no(requirements)); } diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 96bd0de3d37c2..7a5edc7aadddc 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -42,6 +42,8 @@ impl DynTreeNode for dyn ExecutionPlan { /// A node object beneficial for writing optimizer rules, encapsulating an [`ExecutionPlan`] node with a payload. /// Since there are two ways to access child plans—directly from the plan and through child nodes—it's recommended /// to perform mutable operations via [`Self::update_plan_from_children`]. +/// After update `children`, please do the sync updating for `plan`'s children. +/// Or after creating the `PlanContext`, if you can't guarantee they are consistent, call `update_plan_from_children` to sync. #[derive(Debug)] pub struct PlanContext { /// The execution plan associated with this context.