diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 5bd6ab10331a..04e625e8522a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -805,40 +805,49 @@ impl LogicalPlan { pub fn with_new_exprs( &self, mut expr: Vec, - mut inputs: Vec, + inputs: Vec, ) -> Result { match self { // Since expr may be different than the previous expr, schema of the projection // may change. We need to use try_new method instead of try_new_with_schema method. LogicalPlan::Projection(Projection { .. }) => { - Projection::try_new(expr, Arc::new(inputs.swap_remove(0))) - .map(LogicalPlan::Projection) + let input = self.only_input(inputs)?; + Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection) } LogicalPlan::Dml(DmlStatement { table_name, table_schema, op, .. - }) => Ok(LogicalPlan::Dml(DmlStatement::new( - table_name.clone(), - Arc::clone(table_schema), - op.clone(), - Arc::new(inputs.swap_remove(0)), - ))), + }) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + Ok(LogicalPlan::Dml(DmlStatement::new( + table_name.clone(), + Arc::clone(table_schema), + op.clone(), + Arc::new(input), + ))) + } LogicalPlan::Copy(CopyTo { input: _, output_url, file_type, options, partition_by, - }) => Ok(LogicalPlan::Copy(CopyTo { - input: Arc::new(inputs.swap_remove(0)), - output_url: output_url.clone(), - file_type: Arc::clone(file_type), - options: options.clone(), - partition_by: partition_by.clone(), - })), + }) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + Ok(LogicalPlan::Copy(CopyTo { + input: Arc::new(input), + output_url: output_url.clone(), + file_type: Arc::clone(file_type), + options: options.clone(), + partition_by: partition_by.clone(), + })) + } LogicalPlan::Values(Values { schema, .. }) => { + self.assert_no_inputs(inputs)?; Ok(LogicalPlan::Values(Values { schema: Arc::clone(schema), values: expr @@ -848,54 +857,63 @@ impl LogicalPlan { })) } LogicalPlan::Filter { .. } => { - assert_eq!(1, expr.len()); - let predicate = expr.pop().unwrap(); + let predicate = self.only_expr(expr)?; + let input = self.only_input(inputs)?; - Filter::try_new(predicate, Arc::new(inputs.swap_remove(0))) - .map(LogicalPlan::Filter) + Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter) } LogicalPlan::Repartition(Repartition { partitioning_scheme, .. }) => match partitioning_scheme { Partitioning::RoundRobinBatch(n) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; Ok(LogicalPlan::Repartition(Repartition { partitioning_scheme: Partitioning::RoundRobinBatch(*n), - input: Arc::new(inputs.swap_remove(0)), + input: Arc::new(input), + })) + } + Partitioning::Hash(_, n) => { + let input = self.only_input(inputs)?; + Ok(LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::Hash(expr, *n), + input: Arc::new(input), })) } - Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition { - partitioning_scheme: Partitioning::Hash(expr, *n), - input: Arc::new(inputs.swap_remove(0)), - })), Partitioning::DistributeBy(_) => { + let input = self.only_input(inputs)?; Ok(LogicalPlan::Repartition(Repartition { partitioning_scheme: Partitioning::DistributeBy(expr), - input: Arc::new(inputs.swap_remove(0)), + input: Arc::new(input), })) } }, LogicalPlan::Window(Window { window_expr, .. }) => { assert_eq!(window_expr.len(), expr.len()); - Window::try_new(expr, Arc::new(inputs.swap_remove(0))) - .map(LogicalPlan::Window) + let input = self.only_input(inputs)?; + Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window) } LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => { + let input = self.only_input(inputs)?; // group exprs are the first expressions let agg_expr = expr.split_off(group_expr.len()); - Aggregate::try_new(Arc::new(inputs.swap_remove(0)), expr, agg_expr) + Aggregate::try_new(Arc::new(input), expr, agg_expr) .map(LogicalPlan::Aggregate) } LogicalPlan::Sort(Sort { expr: sort_expr, fetch, .. - }) => Ok(LogicalPlan::Sort(Sort { - expr: replace_sort_expressions(sort_expr.clone(), expr), - input: Arc::new(inputs.swap_remove(0)), - fetch: *fetch, - })), + }) => { + let input = self.only_input(inputs)?; + Ok(LogicalPlan::Sort(Sort { + expr: replace_sort_expressions(sort_expr.clone(), expr), + input: Arc::new(input), + fetch: *fetch, + })) + } LogicalPlan::Join(Join { join_type, join_constraint, @@ -903,8 +921,8 @@ impl LogicalPlan { null_equals_null, .. }) => { - let schema = - build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; + let (left, right) = self.only_two_inputs(inputs)?; + let schema = build_join_schema(left.schema(), right.schema(), join_type)?; let equi_expr_count = on.len(); assert!(expr.len() >= equi_expr_count); @@ -933,8 +951,8 @@ impl LogicalPlan { }).collect::>>()?; Ok(LogicalPlan::Join(Join { - left: Arc::new(inputs.swap_remove(0)), - right: Arc::new(inputs.swap_remove(0)), + left: Arc::new(left), + right: Arc::new(right), join_type: *join_type, join_constraint: *join_constraint, on: new_on, @@ -944,28 +962,34 @@ impl LogicalPlan { })) } LogicalPlan::CrossJoin(_) => { - let left = inputs.swap_remove(0); - let right = inputs.swap_remove(0); + self.assert_no_expressions(expr)?; + let (left, right) = self.only_two_inputs(inputs)?; LogicalPlanBuilder::from(left).cross_join(right)?.build() } LogicalPlan::Subquery(Subquery { outer_ref_columns, .. }) => { - let subquery = LogicalPlanBuilder::from(inputs.swap_remove(0)).build()?; + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + let subquery = LogicalPlanBuilder::from(input).build()?; Ok(LogicalPlan::Subquery(Subquery { subquery: Arc::new(subquery), outer_ref_columns: outer_ref_columns.clone(), })) } LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { - SubqueryAlias::try_new(Arc::new(inputs.swap_remove(0)), alias.clone()) + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + SubqueryAlias::try_new(Arc::new(input), alias.clone()) .map(LogicalPlan::SubqueryAlias) } LogicalPlan::Limit(Limit { skip, fetch, .. }) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; Ok(LogicalPlan::Limit(Limit { skip: *skip, fetch: *fetch, - input: Arc::new(inputs.swap_remove(0)), + input: Arc::new(input), })) } LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { @@ -974,31 +998,40 @@ impl LogicalPlan { or_replace, column_defaults, .. - })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - input: Arc::new(inputs.swap_remove(0)), - constraints: Constraints::empty(), - name: name.clone(), - if_not_exists: *if_not_exists, - or_replace: *or_replace, - column_defaults: column_defaults.clone(), - }, - ))), + })) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( + CreateMemoryTable { + input: Arc::new(input), + constraints: Constraints::empty(), + name: name.clone(), + if_not_exists: *if_not_exists, + or_replace: *or_replace, + column_defaults: column_defaults.clone(), + }, + ))) + } LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { name, or_replace, definition, .. - })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - input: Arc::new(inputs.swap_remove(0)), - name: name.clone(), - or_replace: *or_replace, - definition: definition.clone(), - }))), + })) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + input: Arc::new(input), + name: name.clone(), + or_replace: *or_replace, + definition: definition.clone(), + }))) + } LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension { node: e.node.with_exprs_and_inputs(expr, inputs)?, })), LogicalPlan::Union(Union { schema, .. }) => { + self.assert_no_expressions(expr)?; let input_schema = inputs[0].schema(); // If inputs are not pruned do not change schema. let schema = if schema.fields().len() == input_schema.fields().len() { @@ -1013,12 +1046,17 @@ impl LogicalPlan { } LogicalPlan::Distinct(distinct) => { let distinct = match distinct { - Distinct::All(_) => Distinct::All(Arc::new(inputs.swap_remove(0))), + Distinct::All(_) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + Distinct::All(Arc::new(input)) + } Distinct::On(DistinctOn { on_expr, select_expr, .. }) => { + let input = self.only_input(inputs)?; let sort_expr = expr.split_off(on_expr.len() + select_expr.len()); let select_expr = expr.split_off(on_expr.len()); assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions"); @@ -1026,7 +1064,7 @@ impl LogicalPlan { expr, select_expr, None, // no sort expressions accepted - Arc::new(inputs.swap_remove(0)), + Arc::new(input), )?) } }; @@ -1034,30 +1072,31 @@ impl LogicalPlan { } LogicalPlan::RecursiveQuery(RecursiveQuery { name, is_distinct, .. - }) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { - name: name.clone(), - static_term: Arc::new(inputs.swap_remove(0)), - recursive_term: Arc::new(inputs.swap_remove(0)), - is_distinct: *is_distinct, - })), + }) => { + self.assert_no_expressions(expr)?; + let (static_term, recursive_term) = self.only_two_inputs(inputs)?; + Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { + name: name.clone(), + static_term: Arc::new(static_term), + recursive_term: Arc::new(recursive_term), + is_distinct: *is_distinct, + })) + } LogicalPlan::Analyze(a) => { - assert!(expr.is_empty()); - assert_eq!(inputs.len(), 1); + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; Ok(LogicalPlan::Analyze(Analyze { verbose: a.verbose, schema: Arc::clone(&a.schema), - input: Arc::new(inputs.swap_remove(0)), + input: Arc::new(input), })) } LogicalPlan::Explain(e) => { - assert!( - expr.is_empty(), - "Invalid EXPLAIN command. Expression should empty" - ); - assert_eq!(inputs.len(), 1, "Invalid EXPLAIN command. Inputs are empty"); + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; Ok(LogicalPlan::Explain(Explain { verbose: e.verbose, - plan: Arc::new(inputs.swap_remove(0)), + plan: Arc::new(input), stringified_plans: e.stringified_plans.clone(), schema: Arc::clone(&e.schema), logical_optimization_succeeded: e.logical_optimization_succeeded, @@ -1065,13 +1104,17 @@ impl LogicalPlan { } LogicalPlan::Prepare(Prepare { name, data_types, .. - }) => Ok(LogicalPlan::Prepare(Prepare { - name: name.clone(), - data_types: data_types.clone(), - input: Arc::new(inputs.swap_remove(0)), - })), + }) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; + Ok(LogicalPlan::Prepare(Prepare { + name: name.clone(), + data_types: data_types.clone(), + input: Arc::new(input), + })) + } LogicalPlan::TableScan(ts) => { - assert!(inputs.is_empty(), "{self:?} should have no inputs"); + self.assert_no_inputs(inputs)?; Ok(LogicalPlan::TableScan(TableScan { filters: expr, ..ts.clone() @@ -1079,26 +1122,89 @@ impl LogicalPlan { } LogicalPlan::EmptyRelation(_) | LogicalPlan::Ddl(_) - | LogicalPlan::Statement(_) => { + | LogicalPlan::Statement(_) + | LogicalPlan::DescribeTable(_) => { // All of these plan types have no inputs / exprs so should not be called - assert!(expr.is_empty(), "{self:?} should have no exprs"); - assert!(inputs.is_empty(), "{self:?} should have no inputs"); + self.assert_no_expressions(expr)?; + self.assert_no_inputs(inputs)?; Ok(self.clone()) } - LogicalPlan::DescribeTable(_) => Ok(self.clone()), LogicalPlan::Unnest(Unnest { exec_columns: columns, options, .. }) => { + self.assert_no_expressions(expr)?; + let input = self.only_input(inputs)?; // Update schema with unnested column type. - let input = inputs.swap_remove(0); let new_plan = unnest_with_options(input, columns.clone(), options.clone())?; Ok(new_plan) } } } + + /// Helper for [Self::with_new_exprs] to use when no expressions are expected. + #[inline] + #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again + fn assert_no_expressions(&self, expr: Vec) -> Result<()> { + if !expr.is_empty() { + return internal_err!("{self:?} should have no exprs, got {:?}", expr); + } + Ok(()) + } + + /// Helper for [Self::with_new_exprs] to use when no inputs are expected. + #[inline] + #[allow(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again + fn assert_no_inputs(&self, inputs: Vec) -> Result<()> { + if !inputs.is_empty() { + return internal_err!("{self:?} should have no inputs, got: {:?}", inputs); + } + Ok(()) + } + + /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected. + #[inline] + fn only_expr(&self, mut expr: Vec) -> Result { + if expr.len() != 1 { + return internal_err!( + "{self:?} should have exactly one expr, got {:?}", + expr + ); + } + Ok(expr.remove(0)) + } + + /// Helper for [Self::with_new_exprs] to use when exactly one input is expected. + #[inline] + fn only_input(&self, mut inputs: Vec) -> Result { + if inputs.len() != 1 { + return internal_err!( + "{self:?} should have exactly one input, got {:?}", + inputs + ); + } + Ok(inputs.remove(0)) + } + + /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected. + #[inline] + fn only_two_inputs( + &self, + mut inputs: Vec, + ) -> Result<(LogicalPlan, LogicalPlan)> { + if inputs.len() != 2 { + return internal_err!( + "{self:?} should have exactly two inputs, got {:?}", + inputs + ); + } + let right = inputs.remove(1); + let left = inputs.remove(0); + Ok((left, right)) + } + /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`] /// with the specified `param_values`. ///