From 2534d484918989b4accdfb240bfee762c9efd539 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 19 Mar 2024 17:15:52 -0400 Subject: [PATCH 01/10] Add prototype non copying API for optimize --- datafusion/optimizer/src/optimizer.rs | 17 ++++++++++++++++- .../src/simplify_expressions/simplify_exprs.rs | 16 +++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index fe63766fc265..8fdf105c4582 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -48,10 +48,11 @@ use crate::utils::log_plan; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use chrono::{DateTime, Utc}; +use datafusion_common::tree_node::Transformed; use log::{debug, warn}; /// `OptimizerRule` transforms one [`LogicalPlan`] into another which @@ -85,6 +86,20 @@ pub trait OptimizerRule { fn apply_order(&self) -> Option { None } + + /// does this rule support rewriting owned plans (to reduce copying)? + fn supports_owned(&self) -> bool { + false + } + + /// if supports_owned returns true, calls try_optimize_owned + fn try_optimize_owned( + &self, + _plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result, DataFusionError> { + not_impl_err!("try_optimized_owned is not implemented for this rule") + } } /// Options to control the DataFusion Optimizer. diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 70b163acc208..01f3596b4c14 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,7 +19,8 @@ use std::sync::Arc; -use datafusion_common::{DFSchema, DFSchemaRef, Result}; +use datafusion_common::tree_node::Transformed; +use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::simplify::SimplifyContext; @@ -59,6 +60,19 @@ impl OptimizerRule for SimplifyExpressions { execution_props.query_execution_start_time = config.query_execution_start_time(); Ok(Some(Self::optimize_internal(plan, &execution_props)?)) } + + fn supports_owned(&self) -> bool { + true + } + + /// if supports_owned returns true, calls try_optimize_owned + fn try_optimize_owned( + &self, + _plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result, DataFusionError> { + todo!(); + } } impl SimplifyExpressions { From 0806656d8ca08baf09995cfeb225fce60f510a61 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 19 Mar 2024 17:21:54 -0400 Subject: [PATCH 02/10] passed in owned plan --- datafusion/core/src/execution/context/mod.rs | 4 ++-- datafusion/optimizer/src/optimizer.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 116e45c8c130..2107b311c983 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1877,7 +1877,7 @@ impl SessionState { // optimize the child plan, capturing the output of each optimizer let optimized_plan = self.optimizer.optimize( - &analyzed_plan, + analyzed_plan, self, |optimized_plan, optimizer| { let optimizer_name = optimizer.name().to_string(); @@ -1907,7 +1907,7 @@ impl SessionState { let analyzed_plan = self.analyzer .execute_and_check(plan, self.options(), |_, _| {})?; - self.optimizer.optimize(&analyzed_plan, self, |_, _| {}) + self.optimizer.optimize(analyzed_plan, self, |_, _| {}) } } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 8fdf105c4582..c0d4bfb6c397 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -294,7 +294,7 @@ impl Optimizer { /// invoking observer function after each call pub fn optimize( &self, - plan: &LogicalPlan, + plan: LogicalPlan, config: &dyn OptimizerConfig, mut observer: F, ) -> Result From ff3c32e67871bc75cea94ca32c16068587b0414d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Mar 2024 09:10:28 -0400 Subject: [PATCH 03/10] WIP: rework optimizer to not use rewrite --- datafusion/expr/src/logical_plan/plan.rs | 15 +++ datafusion/optimizer/src/optimizer.rs | 151 ++++++++++++++--------- 2 files changed, 108 insertions(+), 58 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 08fe3380061f..6eae3749fbce 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -361,6 +361,12 @@ impl LogicalPlan { } } + /// takes all inputs of this plan, unwrapping them if they are + /// not shared + pub fn take_inputs(&self) -> Vec { + todo!() + } + /// returns all inputs of this `LogicalPlan` node. Does not /// include inputs to inputs, or subqueries. pub fn inputs(&self) -> Vec<&LogicalPlan> { @@ -517,6 +523,15 @@ impl LogicalPlan { self.with_new_exprs(self.expressions(), inputs.to_vec()) } + /// returns a new LogicalPlan with the new inputs (potentially rewritten) + /// + pub fn with_new_inputs2( + mut self, + new_inputs: Vec, + ) -> Result { + todo!() + } + /// Returns a new `LogicalPlan` based on `self` with inputs and /// expressions replaced. /// diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index c0d4bfb6c397..3902db003f1e 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -48,7 +48,7 @@ use crate::utils::log_plan; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_common::{not_impl_err, DFSchema, DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use chrono::{DateTime, Utc}; @@ -297,7 +297,7 @@ impl Optimizer { plan: LogicalPlan, config: &dyn OptimizerConfig, mut observer: F, - ) -> Result + ) -> Result> where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { @@ -306,6 +306,7 @@ impl Optimizer { let start_time = Instant::now(); + let mut transformed = false; let mut previous_plans = HashSet::with_capacity(16); previous_plans.insert(LogicalPlanSignature::new(&new_plan)); @@ -314,21 +315,31 @@ impl Optimizer { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); for rule in &self.rules { + // if we are skipping failed rules, we need to keep a copy of the plan in case the optimizer fails + let prev_plan = if options.optimizer.skip_failed_rules { + Some(new_plan.clone()) + } else { + None + }; + + let orig_schema = plan.schema().clone(); + let result = - self.optimize_recursively(rule, &new_plan, config) + self.optimize_recursively(rule, new_plan, config) .and_then(|plan| { - if let Some(plan) = &plan { - assert_schema_is_the_same(rule.name(), plan, &new_plan)?; - } + assert_has_schema(rule.name(), &orig_schema, &new_plan)?; Ok(plan) }); - match result { - Ok(Some(plan)) => { - new_plan = plan; + + match (result, prev_plan) { + (Ok(t), _) if t.transformed => { + transformed = true; + new_plan = t.data; observer(&new_plan, rule.as_ref()); log_plan(rule.name(), &new_plan); } - Ok(None) => { + (Ok(t), _) if !t.transformed => { + new_plan = t.data; observer(&new_plan, rule.as_ref()); debug!( "Plan unchanged by optimizer rule '{}' (pass {})", @@ -336,22 +347,22 @@ impl Optimizer { i ); } - Err(e) => { - if options.optimizer.skip_failed_rules { - // Note to future readers: if you see this warning it signals a - // bug in the DataFusion optimizer. Please consider filing a ticket - // https://github.com/apache/arrow-datafusion - warn!( + + (Err(e), Some(prev_plan)) => { + // Note to future readers: if you see this warning it signals a + // bug in the DataFusion optimizer. Please consider filing a ticket + // https://github.com/apache/arrow-datafusion + warn!( "Skipping optimizer rule '{}' due to unexpected error: {}", rule.name(), e ); - } else { - return Err(DataFusionError::Context( - format!("Optimizer rule '{}' failed", rule.name(),), - Box::new(e), - )); - } + } + (Err(e), None) => { + return Err(DataFusionError::Context( + format!("Optimizer rule '{}' failed", rule.name(),), + Box::new(e), + )); } } } @@ -369,45 +380,58 @@ impl Optimizer { } log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); - Ok(new_plan) + Ok(if transformed { + Transformed::yes(new_plan) + } else { + Transformed::no(new_plan) + }) } fn optimize_node( &self, rule: &Arc, - plan: &LogicalPlan, + plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Result> { - // TODO: future feature: We can do Batch optimize - rule.try_optimize(plan, config) + ) -> Result> { + if rule.supports_owned() { + rule.try_optimize_owned(plan, config) + } else { + // TODO: future feature: We can do Batch optimize + rule.try_optimize(&plan, config) + .map(|opt| { + if let Some(opt_plan) = opt { + Transformed::yes(opt_plan) + } else { + // return original plan + Transformed::no(plan) + } + }) + } } fn optimize_inputs( &self, rule: &Arc, - plan: &LogicalPlan, + mut plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Result> { - let inputs = plan.inputs(); - let result = inputs - .iter() + ) -> Result> { + let inputs = plan.take_inputs(); + + let new_inputs = inputs + .into_iter() .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config)) .collect::>>()?; - if result.is_empty() || result.iter().all(|o| o.is_none()) { - return Ok(None); - } - let new_inputs = result - .into_iter() - .zip(inputs) - .map(|(new_plan, old_plan)| match new_plan { - Some(plan) => plan, - None => old_plan.clone(), - }) - .collect(); + let transformed = new_inputs.iter().any(|t| t.transformed); + let new_inputs = new_inputs.into_iter().map(|t| t.data).collect(); + + let plan = plan.with_new_inputs2(new_inputs)?; - let exprs = plan.expressions(); - plan.with_new_exprs(exprs, new_inputs).map(Some) + Ok(if transformed { + Transformed::yes(plan) + } else { + Transformed::no(plan) + }) } /// Use a rule to optimize the whole plan. @@ -415,14 +439,19 @@ impl Optimizer { pub fn optimize_recursively( &self, rule: &Arc, - plan: &LogicalPlan, + plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Result> { + ) -> Result> { match rule.apply_order() { Some(order) => match order { ApplyOrder::TopDown => { - let optimize_self_opt = self.optimize_node(rule, plan, config)?; - let optimize_inputs_opt = match &optimize_self_opt { + let optimized_plan = self.optimize_node(rule, plan, config)?; + let transformed = optimized_plan.transformed; + + // TODO make a nicer 'and_then' type API on Transformed + let optimized_plan = self.optimize_inputs(rule, optimized_plan.data, config)?; + + let let optimize_inputs_opt = match &optimize_self_opt { Some(optimized_plan) => { self.optimize_inputs(rule, optimized_plan, config)? } @@ -446,22 +475,17 @@ impl Optimizer { } } -/// Returns an error if plans have different schemas. -/// -/// It ignores metadata and nullability. -pub(crate) fn assert_schema_is_the_same( +pub(crate) fn assert_has_schema( rule_name: &str, - prev_plan: &LogicalPlan, + schema: &DFSchema, new_plan: &LogicalPlan, ) -> Result<()> { - let equivalent = new_plan - .schema() - .equivalent_names_and_types(prev_plan.schema()); + let equivalent = new_plan.schema().equivalent_names_and_types(schema); if !equivalent { let e = DataFusionError::Internal(format!( "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", - prev_plan.schema(), + schema, new_plan.schema() )); Err(DataFusionError::Context( @@ -473,6 +497,17 @@ pub(crate) fn assert_schema_is_the_same( } } +/// Returns an error if plans have different schemas. +/// +/// It ignores metadata and nullability. +pub(crate) fn assert_schema_is_the_same( + rule_name: &str, + prev_plan: &LogicalPlan, + new_plan: &LogicalPlan, +) -> Result<()> { + assert_has_schema(rule_name, prev_plan.schema(), new_plan) +} + #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; From da990023a9eea841cb84ac39292ee389855ea21b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Mar 2024 20:19:55 -0400 Subject: [PATCH 04/10] Get it compiling --- datafusion/core/src/dataframe/mod.rs | 6 +- datafusion/core/src/datasource/view.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 44 ++++++++++----- datafusion/optimizer/src/optimizer.rs | 59 ++++++++++---------- 4 files changed, 62 insertions(+), 49 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index eea5fc1127ce..44c55e1f880b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -175,7 +175,7 @@ impl DataFrame { /// Consume the DataFrame and produce a physical plan pub async fn create_physical_plan(self) -> Result> { - self.session_state.create_physical_plan(&self.plan).await + self.session_state.create_physical_plan(self.plan).await } /// Filter the DataFrame by column. Returns a new DataFrame only containing the @@ -989,7 +989,7 @@ impl DataFrame { /// [`Self::into_optimized_plan`] for more details. pub fn into_optimized_plan(self) -> Result { // Optimize the plan first for better UX - self.session_state.optimize(&self.plan) + self.session_state.optimize(self.plan) } /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered @@ -1466,7 +1466,7 @@ impl TableProvider for DataFrameTableProvider { expr = expr.limit(0, Some(l))? } let plan = expr.build()?; - state.create_physical_plan(&plan).await + state.create_physical_plan(plan).await } } diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 85fb8939886c..1a985ad664af 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -141,7 +141,7 @@ impl TableProvider for ViewTable { plan = plan.limit(0, Some(limit))?; } - state.create_physical_plan(&plan.build()?).await + state.create_physical_plan(plan.build()?).await } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 2107b311c983..67595ad7192c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -82,6 +82,7 @@ use datafusion_sql::{ use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_common::tree_node::Transformed; use parking_lot::RwLock; use sqlparser::dialect::dialect_from_str; use url::Url; @@ -530,7 +531,7 @@ impl SessionContext { } = cmd; let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone()); - let input = self.state().optimize(&input)?; + let input = self.state().optimize(input)?; let table = self.table(&name).await; match (if_not_exists, or_replace, table) { (true, false, Ok(_)) => self.return_empty_dataframe(), @@ -1839,13 +1840,22 @@ impl SessionState { } /// Optimizes the logical plan by applying optimizer rules. - pub fn optimize(&self, plan: &LogicalPlan) -> Result { - if let LogicalPlan::Explain(e) = plan { - let mut stringified_plans = e.stringified_plans.clone(); + pub fn optimize(&self, plan: LogicalPlan) -> Result { + if let LogicalPlan::Explain(Explain { + verbose, + plan, + mut stringified_plans, + schema, + logical_optimization_succeeded, + }) = plan + { + // TODO this could be a dummy plan + let original_plan = plan.clone(); // keep original plan in case there is an error // analyze & capture output of each rule + // TODO avoid this copy let analyzer_result = self.analyzer.execute_and_check( - e.plan.as_ref(), + &plan, self.options(), |analyzed_plan, analyzer| { let analyzer_name = analyzer.name().to_string(); @@ -1861,10 +1871,10 @@ impl SessionState { .push(StringifiedPlan::new(plan_type, err.to_string())); return Ok(LogicalPlan::Explain(Explain { - verbose: e.verbose, - plan: e.plan.clone(), + verbose, + plan, stringified_plans, - schema: e.schema.clone(), + schema, logical_optimization_succeeded: false, })); } @@ -1885,29 +1895,33 @@ impl SessionState { stringified_plans.push(optimized_plan.to_stringified(plan_type)); }, ); + let (plan, logical_optimization_succeeded) = match optimized_plan { - Ok(plan) => (Arc::new(plan), true), + Ok(plan) => (Arc::new(plan.data), true), Err(DataFusionError::Context(optimizer_name, err)) => { + // TODO show explain error let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; stringified_plans .push(StringifiedPlan::new(plan_type, err.to_string())); - (e.plan.clone(), false) + (original_plan, false) } Err(e) => return Err(e), }; Ok(LogicalPlan::Explain(Explain { - verbose: e.verbose, + verbose, plan, stringified_plans, - schema: e.schema.clone(), + schema, logical_optimization_succeeded, })) } else { let analyzed_plan = self.analyzer - .execute_and_check(plan, self.options(), |_, _| {})?; - self.optimizer.optimize(analyzed_plan, self, |_, _| {}) + .execute_and_check(&plan, self.options(), |_, _| {})?; + self.optimizer + .optimize(analyzed_plan, self, |_, _| {}) + .map(|t| t.data) } } @@ -1920,7 +1934,7 @@ impl SessionState { /// DDL `CREATE TABLE` must be handled by another layer. pub async fn create_physical_plan( &self, - logical_plan: &LogicalPlan, + logical_plan: LogicalPlan, ) -> Result> { let logical_plan = self.optimize(logical_plan)?; self.query_planner diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 3902db003f1e..87ad83339210 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -327,7 +327,7 @@ impl Optimizer { let result = self.optimize_recursively(rule, new_plan, config) .and_then(|plan| { - assert_has_schema(rule.name(), &orig_schema, &new_plan)?; + assert_has_schema(rule.name(), &orig_schema, &plan.data)?; Ok(plan) }); @@ -338,7 +338,7 @@ impl Optimizer { observer(&new_plan, rule.as_ref()); log_plan(rule.name(), &new_plan); } - (Ok(t), _) if !t.transformed => { + (Ok(t), _) => { new_plan = t.data; observer(&new_plan, rule.as_ref()); debug!( @@ -347,7 +347,6 @@ impl Optimizer { i ); } - (Err(e), Some(prev_plan)) => { // Note to future readers: if you see this warning it signals a // bug in the DataFusion optimizer. Please consider filing a ticket @@ -357,6 +356,7 @@ impl Optimizer { rule.name(), e ); + new_plan = prev_plan; } (Err(e), None) => { return Err(DataFusionError::Context( @@ -397,15 +397,14 @@ impl Optimizer { rule.try_optimize_owned(plan, config) } else { // TODO: future feature: We can do Batch optimize - rule.try_optimize(&plan, config) - .map(|opt| { - if let Some(opt_plan) = opt { - Transformed::yes(opt_plan) - } else { - // return original plan - Transformed::no(plan) - } - }) + rule.try_optimize(&plan, config).map(|opt| { + if let Some(opt_plan) = opt { + Transformed::yes(opt_plan) + } else { + // return original plan + Transformed::no(plan) + } + }) } } @@ -449,28 +448,28 @@ impl Optimizer { let transformed = optimized_plan.transformed; // TODO make a nicer 'and_then' type API on Transformed - let optimized_plan = self.optimize_inputs(rule, optimized_plan.data, config)?; - - let let optimize_inputs_opt = match &optimize_self_opt { - Some(optimized_plan) => { - self.optimize_inputs(rule, optimized_plan, config)? - } - _ => self.optimize_inputs(rule, plan, config)?, - }; - Ok(optimize_inputs_opt.or(optimize_self_opt)) + let optimized_plan = + self.optimize_inputs(rule, optimized_plan.data, config)?; + Ok(if transformed || optimized_plan.transformed { + Transformed::yes(optimized_plan.data) + } else { + Transformed::no(optimized_plan.data) + }) } ApplyOrder::BottomUp => { - let optimize_inputs_opt = self.optimize_inputs(rule, plan, config)?; - let optimize_self_opt = match &optimize_inputs_opt { - Some(optimized_plan) => { - self.optimize_node(rule, optimized_plan, config)? - } - _ => self.optimize_node(rule, plan, config)?, - }; - Ok(optimize_self_opt.or(optimize_inputs_opt)) + let optimized_plan = self.optimize_inputs(rule, plan, config)?; + let transformed = optimized_plan.transformed; + let optimized_plan = + self.optimize_node(rule, optimized_plan.data, config)?; + + Ok(if transformed || optimized_plan.transformed { + Transformed::yes(optimized_plan.data) + } else { + Transformed::no(optimized_plan.data) + }) } }, - _ => rule.try_optimize(plan, config), + _ => self.optimize_node(rule, plan, config), } } } From c3946981f88b7e54b221c57c0fc35b9a20161ba9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Mar 2024 20:28:20 -0400 Subject: [PATCH 05/10] better API --- datafusion/expr/src/logical_plan/plan.rs | 19 ++++++------------- datafusion/optimizer/src/optimizer.rs | 18 ++++++++---------- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6eae3749fbce..335184357993 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -361,10 +361,12 @@ impl LogicalPlan { } } - /// takes all inputs of this plan, unwrapping them if they are - /// not shared - pub fn take_inputs(&self) -> Vec { - todo!() + /// applies the closure `f` to each input of this node, replacing the existing inputs + /// with the result of the closure. + pub fn rewrite_inputs(mut self, f: F) -> Result + where F: FnMut (LogicalPlan) -> Result + { + todo!(); } /// returns all inputs of this `LogicalPlan` node. Does not @@ -523,15 +525,6 @@ impl LogicalPlan { self.with_new_exprs(self.expressions(), inputs.to_vec()) } - /// returns a new LogicalPlan with the new inputs (potentially rewritten) - /// - pub fn with_new_inputs2( - mut self, - new_inputs: Vec, - ) -> Result { - todo!() - } - /// Returns a new `LogicalPlan` based on `self` with inputs and /// expressions replaced. /// diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 87ad83339210..6b515aa1b27b 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -414,17 +414,15 @@ impl Optimizer { mut plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - let inputs = plan.take_inputs(); - let new_inputs = inputs - .into_iter() - .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config)) - .collect::>>()?; - - let transformed = new_inputs.iter().any(|t| t.transformed); - let new_inputs = new_inputs.into_iter().map(|t| t.data).collect(); - - let plan = plan.with_new_inputs2(new_inputs)?; + let mut transformed = false; + let plan = plan.rewrite_inputs(|child| { + let t = self.optimize_recursively(rule, child, config)?; + if t.transformed { + transformed = true; + } + Ok(t.data) + })?; Ok(if transformed { Transformed::yes(plan) From 2d92daa118005d2333c35d1839c1351454aa337b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Mar 2024 05:51:31 -0400 Subject: [PATCH 06/10] Add in hack to rewrite --- datafusion/expr/src/logical_plan/plan.rs | 114 ++++++++++++++++++++- datafusion/optimizer/src/optimizer.rs | 1 - datafusion/sqllogictest/test_files/aal.slt | 6 ++ 3 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/aal.slt diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 335184357993..4ad4efd700b9 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -17,6 +17,7 @@ //! Logical plan types +use std::cell::OnceCell; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -360,13 +361,120 @@ impl LogicalPlan { | LogicalPlan::Prepare(_) => Ok(()), } } +} + +const PLACEHOLDER: OnceCell> = OnceCell::new(); + +// applies f to rewrite the logical plan, replacing `node` +// +// ideally we would remove the Arc nonsense entirely from LogicalPlan and have it own its inputs +// however, for now do a horrible hack +// +// On rewrite the existing plan is destroyed +fn rewrite_arc(node: &mut Arc, f: &mut F) -> Result<()> +where + F: FnMut(LogicalPlan) -> Result, +{ + let mut new_node = PLACEHOLDER + .get_or_init(|| { + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: DFSchemaRef::new(DFSchema::empty()), + })) + }) + .clone(); + + // take the old value out of the Arc + std::mem::swap(node, &mut new_node); + + let new_node = match Arc::try_unwrap(new_node) { + Ok(node) => { + println!("Unwrapped arc yay"); + node + } + Err(node) => { + println!("Failed to unwrap arc boo"); + node.as_ref().clone() + } + }; + // do the actual transform + let mut new_node = f(new_node).map(Arc::new)?; + // put the new value back into the Arc + std::mem::swap(node, &mut new_node); + + Ok(()) +} +impl LogicalPlan { /// applies the closure `f` to each input of this node, replacing the existing inputs /// with the result of the closure. - pub fn rewrite_inputs(mut self, f: F) -> Result - where F: FnMut (LogicalPlan) -> Result + pub fn rewrite_inputs(mut self, mut f: F) -> Result + where + F: FnMut(LogicalPlan) -> Result, { - todo!(); + match &mut self { + LogicalPlan::Projection(Projection { input, .. }) => { + rewrite_arc(input, &mut f)? + } + LogicalPlan::Filter(Filter { input, .. }) => rewrite_arc(input, &mut f)?, + LogicalPlan::Repartition(Repartition { input, .. }) => { + rewrite_arc(input, &mut f)? + } + LogicalPlan::Window(Window { input, .. }) => rewrite_arc(input, &mut f)?, + LogicalPlan::Aggregate(Aggregate { input, .. }) => { + rewrite_arc(input, &mut f)? + } + LogicalPlan::Sort(Sort { input, .. }) => rewrite_arc(input, &mut f)?, + LogicalPlan::Join(Join { left, right, .. }) => { + rewrite_arc(left, &mut f)?; + rewrite_arc(right, &mut f)?; + } + LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { + rewrite_arc(left, &mut f)?; + rewrite_arc(right, &mut f)?; + } + LogicalPlan::Limit(Limit { input, .. }) => rewrite_arc(input, &mut f)?, + LogicalPlan::Subquery(Subquery { subquery, .. }) => { + rewrite_arc(subquery, &mut f)? + } + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { + rewrite_arc(input, &mut f)? + } + LogicalPlan::Extension(extension) => todo!(), + LogicalPlan::Union(Union { inputs, .. }) => { + inputs + .iter_mut() + .try_for_each(|input| rewrite_arc(input, &mut f))?; + } + LogicalPlan::Distinct( + Distinct::All(input) | Distinct::On(DistinctOn { input, .. }), + ) => rewrite_arc(input, &mut f)?, + LogicalPlan::Explain(explain) => rewrite_arc(&mut explain.plan, &mut f)?, + LogicalPlan::Analyze(analyze) => rewrite_arc(&mut analyze.input, &mut f)?, + LogicalPlan::Dml(write) => rewrite_arc(&mut write.input, &mut f)?, + LogicalPlan::Copy(copy) => rewrite_arc(&mut copy.input, &mut f)?, + LogicalPlan::Ddl(ddl) => { + todo!(); + } + LogicalPlan::Unnest(Unnest { input, .. }) => rewrite_arc(input, &mut f)?, + LogicalPlan::Prepare(Prepare { input, .. }) => rewrite_arc(input, &mut f)?, + LogicalPlan::RecursiveQuery(RecursiveQuery { + static_term, + recursive_term, + .. + }) => { + rewrite_arc(static_term, &mut f)?; + rewrite_arc(recursive_term, &mut f)?; + } + // plans without inputs + LogicalPlan::TableScan { .. } + | LogicalPlan::Statement { .. } + | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. } + | LogicalPlan::DescribeTable(_) => {} + } + + Ok(self) } /// returns all inputs of this `LogicalPlan` node. Does not diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 6b515aa1b27b..65cde9bda06d 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -414,7 +414,6 @@ impl Optimizer { mut plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - let mut transformed = false; let plan = plan.rewrite_inputs(|child| { let t = self.optimize_recursively(rule, child, config)?; diff --git a/datafusion/sqllogictest/test_files/aal.slt b/datafusion/sqllogictest/test_files/aal.slt new file mode 100644 index 000000000000..f561d873d100 --- /dev/null +++ b/datafusion/sqllogictest/test_files/aal.slt @@ -0,0 +1,6 @@ + +statement ok +create table t as values (1), (2); + +query +select column1 + column1 from t; From 11ebead4933bdb1ffd3b3322a1e0ced167f0dbf9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Mar 2024 06:01:50 -0400 Subject: [PATCH 07/10] fix a bit --- .../simplify_expressions/simplify_exprs.rs | 43 +++++++++++-------- datafusion/sqllogictest/test_files/aal.slt | 15 ++++++- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 01f3596b4c14..cbccc296e477 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,8 +19,8 @@ use std::sync::Arc; -use datafusion_common::tree_node::Transformed; -use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result}; +use datafusion_common::tree_node::{Transformed, TransformedResult}; +use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result, not_impl_err}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::simplify::SimplifyContext; @@ -56,9 +56,7 @@ impl OptimizerRule for SimplifyExpressions { plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - let mut execution_props = ExecutionProps::new(); - execution_props.query_execution_start_time = config.query_execution_start_time(); - Ok(Some(Self::optimize_internal(plan, &execution_props)?)) + return not_impl_err!("Should use optimized owned") } fn supports_owned(&self) -> bool { @@ -68,21 +66,23 @@ impl OptimizerRule for SimplifyExpressions { /// if supports_owned returns true, calls try_optimize_owned fn try_optimize_owned( &self, - _plan: LogicalPlan, - _config: &dyn OptimizerConfig, + plan: LogicalPlan, + config: &dyn OptimizerConfig, ) -> Result, DataFusionError> { - todo!(); + let mut execution_props = ExecutionProps::new(); + execution_props.query_execution_start_time = config.query_execution_start_time(); + Self::optimize_internal(plan, &execution_props) } } impl SimplifyExpressions { fn optimize_internal( - plan: &LogicalPlan, + plan: LogicalPlan, execution_props: &ExecutionProps, - ) -> Result { + ) -> Result> { let schema = if !plan.inputs().is_empty() { DFSchemaRef::new(merge_schema(plan.inputs())) - } else if let LogicalPlan::TableScan(scan) = plan { + } else if let LogicalPlan::TableScan(scan) = &plan { // When predicates are pushed into a table scan, there is no input // schema to resolve predicates against, so it must be handled specially // @@ -102,11 +102,15 @@ impl SimplifyExpressions { }; let info = SimplifyContext::new(execution_props).with_schema(schema); - let new_inputs = plan - .inputs() - .iter() - .map(|input| Self::optimize_internal(input, execution_props)) - .collect::>>()?; + // rewrite all inputs + let mut transformed = false; + let plan = plan.rewrite_inputs(&mut |plan| { + let t = Self::optimize_internal(plan, execution_props)?; + if t.transformed { + transformed = true; + } + Ok(t.data) + })?; let simplifier = ExprSimplifier::new(info); @@ -134,8 +138,13 @@ impl SimplifyExpressions { }) .collect::>>()?; - plan.with_new_exprs(exprs, new_inputs) + Ok(if transformed { + Transformed::yes(plan) + } else { + Transformed::no(plan) + }) } + } impl SimplifyExpressions { diff --git a/datafusion/sqllogictest/test_files/aal.slt b/datafusion/sqllogictest/test_files/aal.slt index f561d873d100..ba07c125d60c 100644 --- a/datafusion/sqllogictest/test_files/aal.slt +++ b/datafusion/sqllogictest/test_files/aal.slt @@ -2,5 +2,18 @@ statement ok create table t as values (1), (2); -query +query I select column1 + column1 from t; +---- +2 +4 + +query TT +explain select column1 + column1 from t; +---- +logical_plan +Projection: t.column1 + t.column1 +--TableScan: t projection=[column1] +physical_plan +ProjectionExec: expr=[column1@0 + column1@0 as t.column1 + t.column1] +--MemoryExec: partitions=1, partition_sizes=[1] From e749ae98e795a680eb209c6922d8bac8b27ed3f8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Mar 2024 07:28:42 -0400 Subject: [PATCH 08/10] bash through working --- datafusion/expr/src/logical_plan/plan.rs | 134 +++++++++++++++++- .../simplify_expressions/simplify_exprs.rs | 29 ++-- datafusion/sqllogictest/test_files/aal.slt | 2 +- 3 files changed, 151 insertions(+), 14 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 4ad4efd700b9..37209d592912 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -39,7 +39,7 @@ use crate::utils::{ split_conjunction, }; use crate::{ - build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction, + build_join_schema, expr_vec_fmt, lit, BinaryExpr, BuiltInWindowFunction, CreateMemoryTable, CreateView, Expr, ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown, TableSource, WindowFunctionDefinition, }; @@ -363,6 +363,138 @@ impl LogicalPlan { } } +/// writes each elemenet in the iterator using `f` +pub fn rewrite_iter_mut<'a, F>( + i: impl IntoIterator, + mut f: F, +) -> Result<()> +where + F: FnMut(Expr) -> Result, +{ + i.into_iter().try_for_each(|e| rewrite_expr(e, &mut f)) +} + +pub fn rewrite_expr<'a, F>(e: &'a mut Expr, mut f: F) -> Result<()> +where + F: FnMut(Expr) -> Result, +{ + let mut t = lit(0); + std::mem::swap(e, &mut t); + // transform + let mut t = f(t)?; + // put it back + std::mem::swap(e, &mut t); + Ok(()) +} + +impl LogicalPlan { + /// applies the closure `f` to each expression of this node, potentially + /// rewriting it in place + /// + /// If the closure returns an error, the error is returned and the expressions + /// are left in a partially modified state + pub fn rewrite_exprs(mut self, mut f: F) -> Result + where + F: FnMut(Expr) -> Result, + { + match &mut self { + LogicalPlan::Projection(Projection { expr, .. }) => { + rewrite_iter_mut(expr.iter_mut(), &mut f)?; + } + LogicalPlan::Values(Values { values, .. }) => { + rewrite_iter_mut(values.iter_mut().flatten(), &mut f)?; + } + LogicalPlan::Filter(Filter { predicate, .. }) => { + rewrite_expr(predicate, &mut f)? + } + LogicalPlan::Repartition(Repartition { + partitioning_scheme, + .. + }) => match partitioning_scheme { + Partitioning::Hash(expr, _) => rewrite_iter_mut(expr.iter_mut(), &mut f)?, + Partitioning::DistributeBy(expr) => { + rewrite_iter_mut(expr.iter_mut(), &mut f)? + } + Partitioning::RoundRobinBatch(_) => {} + }, + LogicalPlan::Window(Window { window_expr, .. }) => { + rewrite_iter_mut(window_expr.iter_mut(), &mut f)?; + } + LogicalPlan::Aggregate(Aggregate { + group_expr, + aggr_expr, + .. + }) => rewrite_iter_mut( + group_expr.iter_mut().chain(aggr_expr.iter_mut()), + &mut f, + )?, + // There are two part of expression for join, equijoin(on) and non-equijoin(filter). + // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`. + // 2. the second part is non-equijoin(filter). + LogicalPlan::Join(Join { on, filter, .. }) => { + // don't look at the equijoin expressions as a whole + rewrite_iter_mut( + on.iter_mut().flat_map(|(e1, e2)| { + std::iter::once(e1).chain(std::iter::once(e2)) + }), + &mut f, + )?; + + if let Some(filter) = filter.as_mut() { + rewrite_expr(filter, &mut f)?; + } + } + LogicalPlan::Sort(Sort { expr, .. }) => { + rewrite_iter_mut(expr.iter_mut(), &mut f)? + } + LogicalPlan::Extension(extension) => { + // would be nice to avoid this copy -- maybe can + // update extension to just observer Exprs + //extension.node.expressions().iter().try_for_each(f) + todo!(); + } + LogicalPlan::TableScan(TableScan { filters, .. }) => { + rewrite_iter_mut(filters.iter_mut(), &mut f)?; + } + LogicalPlan::Unnest(Unnest { column, .. }) => { + //f(&Expr::Column(column.clone())) + todo!(); + } + LogicalPlan::Distinct(Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + .. + })) => rewrite_iter_mut( + on_expr + .iter_mut() + .chain(select_expr.iter_mut()) + .chain(sort_expr.iter_mut().flat_map(|x| x.iter_mut())), + &mut f, + )?, + // plans without expressions + LogicalPlan::EmptyRelation(_) + | LogicalPlan::RecursiveQuery(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Statement(_) + | LogicalPlan::CrossJoin(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Explain(_) + | LogicalPlan::Union(_) + | LogicalPlan::Distinct(Distinct::All(_)) + | LogicalPlan::Dml(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Prepare(_) => {} + } + + Ok(self) + } +} + const PLACEHOLDER: OnceCell> = OnceCell::new(); // applies f to rewrite the logical plan, replacing `node` diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index cbccc296e477..fecb0777af2f 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use datafusion_common::tree_node::{Transformed, TransformedResult}; -use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result, not_impl_err}; +use datafusion_common::{not_impl_err, DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::simplify::SimplifyContext; @@ -56,7 +56,7 @@ impl OptimizerRule for SimplifyExpressions { plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - return not_impl_err!("Should use optimized owned") + return not_impl_err!("Should use optimized owned"); } fn supports_owned(&self) -> bool { @@ -127,16 +127,22 @@ impl SimplifyExpressions { simplifier }; - let exprs = plan - .expressions() - .into_iter() - .map(|e| { - // TODO: unify with `rewrite_preserving_name` - let original_name = e.name_for_alias()?; - let new_e = simplifier.simplify(e)?; - new_e.alias_if_changed(original_name) + let plan = plan.rewrite_exprs(|e| { + // TODO: unify with `rewrite_preserving_name` + // todo track if e was rewritten + let original_name = e.name_for_alias()?; + let new_e = simplifier.simplify(e)?; + + // inline new_e.alias_if_changed(original_name) + // to figure out if the expression was transformed + let new_name = new_e.name_for_alias()?; + Ok(if new_name == original_name { + new_e + } else { + transformed = true; + new_e.alias(original_name) }) - .collect::>>()?; + })?; Ok(if transformed { Transformed::yes(plan) @@ -144,7 +150,6 @@ impl SimplifyExpressions { Transformed::no(plan) }) } - } impl SimplifyExpressions { diff --git a/datafusion/sqllogictest/test_files/aal.slt b/datafusion/sqllogictest/test_files/aal.slt index ba07c125d60c..f91b4cea7413 100644 --- a/datafusion/sqllogictest/test_files/aal.slt +++ b/datafusion/sqllogictest/test_files/aal.slt @@ -9,7 +9,7 @@ select column1 + column1 from t; 4 query TT -explain select column1 + column1 from t; +explain select column1 + column1, 2+3 from t; ---- logical_plan Projection: t.column1 + t.column1 From dadc30d30bcf070f74a4ad449850b9456b619281 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Mar 2024 07:29:25 -0400 Subject: [PATCH 09/10] Show expression rewriting working --- datafusion/sqllogictest/test_files/aal.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/aal.slt b/datafusion/sqllogictest/test_files/aal.slt index f91b4cea7413..14ca43471eaf 100644 --- a/datafusion/sqllogictest/test_files/aal.slt +++ b/datafusion/sqllogictest/test_files/aal.slt @@ -12,8 +12,8 @@ query TT explain select column1 + column1, 2+3 from t; ---- logical_plan -Projection: t.column1 + t.column1 +Projection: t.column1 + t.column1, Int64(5) AS Int64(2) + Int64(3) --TableScan: t projection=[column1] physical_plan -ProjectionExec: expr=[column1@0 + column1@0 as t.column1 + t.column1] +ProjectionExec: expr=[column1@0 + column1@0 as t.column1 + t.column1, 5 as Int64(2) + Int64(3)] --MemoryExec: partitions=1, partition_sizes=[1] From 4a4d133d10e85ec6c89f47f059fa92d14454f037 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Mar 2024 08:33:24 -0400 Subject: [PATCH 10/10] hack enough to time --- datafusion/expr/src/logical_plan/plan.rs | 4 ++-- .../optimizer/src/simplify_expressions/simplify_exprs.rs | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 37209d592912..a83ad20ed778 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -521,11 +521,11 @@ where let new_node = match Arc::try_unwrap(new_node) { Ok(node) => { - println!("Unwrapped arc yay"); + //println!("Unwrapped arc yay"); node } Err(node) => { - println!("Failed to unwrap arc boo"); + //println!("Failed to unwrap arc boo"); node.as_ref().clone() } }; diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index fecb0777af2f..31c2d4af6b3c 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -127,7 +127,14 @@ impl SimplifyExpressions { simplifier }; + let is_filter = matches!(plan, LogicalPlan::Filter(_)); + let plan = plan.rewrite_exprs(|e| { + // no aliasing for filters + if is_filter { + return simplifier.simplify(e); + } + // TODO: unify with `rewrite_preserving_name` // todo track if e was rewritten let original_name = e.name_for_alias()?;