diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index aa6b3e91cf..83f86dbee8 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -1230,21 +1230,20 @@ impl PhysicalPlanner {
                     )),
                 ))
             } else {
-                // we insert a projection around the hash join in this case
-                let projection =
+                let swapped_hash_join =
                     swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?;
-                let swapped_hash_join = Arc::clone(projection.children()[0]);
-                let mut additional_native_plans = swapped_hash_join
-                    .children()
-                    .iter()
-                    .map(|p| Arc::clone(p))
-                    .collect::<Vec<_>>();
-                additional_native_plans.push(Arc::clone(&swapped_hash_join));
+
+                let mut additional_native_plans = vec![];
+                if swapped_hash_join.as_any().is::<ProjectionExec>() {
+                    // a projection was added to the hash join
+                    additional_native_plans.push(Arc::clone(swapped_hash_join.children()[0]));
+                }
+
                 Ok((
                     scans,
                     Arc::new(SparkPlan::new_with_additional(
                         spark_plan.plan_id,
-                        projection,
+                        swapped_hash_join,
                         vec![join_params.left, join_params.right],
                         additional_native_plans,
                     )),
@@ -2550,12 +2549,7 @@ mod tests {
 
         assert_eq!("FilterExec", filter_exec.native_plan.name());
         assert_eq!(1, filter_exec.children.len());
-        assert_eq!(1, filter_exec.additional_native_plans.len());
-        assert_eq!("ScanExec", filter_exec.additional_native_plans[0].name());
-
-        let scan_exec = &filter_exec.children()[0];
-        assert_eq!("ScanExec", scan_exec.native_plan.name());
-        assert_eq!(0, scan_exec.additional_native_plans.len());
+        assert_eq!(0, filter_exec.additional_native_plans.len());
     }
 
     #[test]
@@ -2581,10 +2575,6 @@ mod tests {
         assert_eq!(2, hash_join_exec.children.len());
         assert_eq!("ScanExec", hash_join_exec.children[0].native_plan.name());
         assert_eq!("ScanExec", hash_join_exec.children[1].native_plan.name());
-
-        assert_eq!(2, hash_join_exec.additional_native_plans.len());
-        assert_eq!("ScanExec", hash_join_exec.additional_native_plans[0].name());
-        assert_eq!("ScanExec", hash_join_exec.additional_native_plans[1].name());
     }
 
     fn create_bound_reference(index: i32) -> Expr {
diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs
index 6660c5bc4f..4e26ff888f 100644
--- a/native/core/src/execution/datafusion/spark_plan.rs
+++ b/native/core/src/execution/datafusion/spark_plan.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::operators::{CopyExec, ScanExec};
+use crate::execution::operators::CopyExec;
 use arrow_schema::SchemaRef;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
@@ -32,7 +32,7 @@ pub(crate) struct SparkPlan {
     /// Child Spark plans
     pub(crate) children: Vec<Arc<SparkPlan>>,
     /// Additional native plans that were generated for this Spark plan that we need
-    /// to collect metrics for (such as CopyExec and ScanExec)
+    /// to collect metrics for
    pub(crate) additional_native_plans: Vec<Arc<dyn ExecutionPlan>>,
 }
 
@@ -94,9 +94,5 @@ fn collect_additional_plans(
 ) {
     if child.as_any().is::<CopyExec>() {
         additional_native_plans.push(Arc::clone(&child));
-        // CopyExec may be wrapping a ScanExec
-        collect_additional_plans(Arc::clone(child.children()[0]), additional_native_plans);
-    } else if child.as_any().is::<ScanExec>() {
-        additional_native_plans.push(Arc::clone(&child));
     }
 }