From ed2aff9e8a4a2c6c7bf3ec4f4375ebf76a089814 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Thu, 15 Aug 2024 11:43:21 -0700 Subject: [PATCH 01/18] [FEAT] Add DAFT_ENABLE_ACTOR_POOL_PROJECTS=1 feature flag and specifying concurrency --- daft/udf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/daft/udf.py b/daft/udf.py index 51082d9878..56950294d6 100644 --- a/daft/udf.py +++ b/daft/udf.py @@ -445,6 +445,7 @@ def udf( num_gpus: float | None = None, memory_bytes: int | None = None, batch_size: int | None = None, + _concurrency: int | None = None, ) -> Callable[[UserProvidedPythonFunction | type], StatelessUDF | StatefulUDF]: """Decorator to convert a Python function into a UDF From b5d6848a9c447e2ebba319b404810e779dccb7f2 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 6 Aug 2024 15:33:06 -0700 Subject: [PATCH 02/18] [FEAT] Implement splitting of Project into actor pool projects Implement core logic Add TreeNodeRewriter logic Add passing unit test Add test with multiple stateful projections Add another test that is currently failing --- src/daft-dsl/src/expr.rs | 2 +- .../src/logical_optimization/rules/mod.rs | 1 + .../rules/split_actor_pool_projects.rs | 468 ++++++++++++++++++ .../src/logical_optimization/test/mod.rs | 3 +- 4 files changed, 472 insertions(+), 2 deletions(-) create mode 100644 src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs diff --git a/src/daft-dsl/src/expr.rs b/src/daft-dsl/src/expr.rs index 3bf1631888..97f29b1d98 100644 --- a/src/daft-dsl/src/expr.rs +++ b/src/daft-dsl/src/expr.rs @@ -610,7 +610,7 @@ impl Expr { } } - pub(super) fn with_new_children(&self, children: Vec) -> Expr { + pub fn with_new_children(&self, children: Vec) -> Expr { use Expr::*; match self { // no children diff --git a/src/daft-plan/src/logical_optimization/rules/mod.rs b/src/daft-plan/src/logical_optimization/rules/mod.rs index 2c78fd9b7b..3854e2f503 100644 --- a/src/daft-plan/src/logical_optimization/rules/mod.rs +++ b/src/daft-plan/src/logical_optimization/rules/mod.rs @@ -3,6 +3,7 @@ mod push_down_filter; mod push_down_limit; mod push_down_projection; mod rule; +mod split_actor_pool_projects; pub use drop_repartition::DropRepartition; pub use push_down_filter::PushDownFilter; diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs new file mode 100644 index 0000000000..20b2cbbefe --- /dev/null +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -0,0 +1,468 @@ +use std::{collections::HashSet, default, iter, sync::Arc}; + +use common_error::DaftResult; +use common_treenode::{TreeNode, TreeNodeRewriter}; +use daft_dsl::{ + functions::{ + python::{PythonUDF, StatefulPythonUDF}, + FunctionExpr, + }, + optimization::{get_required_columns, requires_computation}, + Expr, ExprRef, +}; + +use crate::{ + logical_ops::{ActorPoolProject, Project}, + LogicalPlan, +}; + +use super::{ApplyOrder, OptimizerRule, Transformed}; + +#[derive(Default, Debug)] +pub struct SplitActorPoolProjects {} + +/// Implement SplitActorPoolProjects as an OptimizerRule which will: +/// +/// 1. Go top-down from the root of the LogicalPlan +/// 2. Whenever it sees a Project, it will iteratively split it into the necessary Project/ActorPoolProject sequences, +/// depending on the underlying Expressions' layouts of StatefulPythonUDFs. +/// +/// The general idea behind the splitting is that this is a greedy algorithm which will: +/// * Skim off the top of every expression in the projection to generate "stages" for every expression +/// * Generate Project/ActorPoolProject nodes based on those stages (coalesce non-stateful stages into a Project, and run the stateful stages as ActorPoolProjects sequentially) +/// * Loop until every expression in the projection has been exhausted +/// +/// For a given expression tree, skimming a Stage off the top entails: +/// 1. Iterate down the root of the tree, stopping whenever we encounter a StatefulUDF expression +/// 2. If the current stage is rooted at a StatefulUDF expression, then replace its children with Expr::Columns and return the StatefulUDF expression as its own stage +/// 3. Otherwise, the current stage is not a StatefulUDF expression: chop off any StatefulUDF children and replace them with Expr::Columns +impl OptimizerRule for SplitActorPoolProjects { + fn apply_order(&self) -> ApplyOrder { + ApplyOrder::TopDown + } + + fn try_optimize(&self, plan: Arc) -> DaftResult>> { + // TODO: Figure out num_actors! How do we propagate this correctly? + let num_actors = 1; + + match plan.as_ref() { + LogicalPlan::Project(projection) => try_optimize_project( + projection, + plan.clone(), + num_actors, + ), + // TODO: Figure out how to split other nodes as well such as Filter, Agg etc + _ => Ok(Transformed::No(plan)), + } + } +} + +struct SplitExprByStatefulUDF { + // Initialized to True, but once we encounter non-aliases this will be set to false + is_parsing_stateful_udf: bool, + next_exprs: Vec, +} + +impl SplitExprByStatefulUDF { + fn new() -> Self { + Self { + is_parsing_stateful_udf: true, + next_exprs: Vec::new(), + } + } +} + +impl TreeNodeRewriter for SplitExprByStatefulUDF { + type Node = ExprRef; + + fn f_down(&mut self, node: Self::Node) -> DaftResult> { + match node.as_ref() { + // Encountered alias: keep going if we are ignoring aliases + Expr::Alias { .. } if self.is_parsing_stateful_udf => { + Ok(common_treenode::Transformed::no(node)) + } + // Encountered stateful UDF: chop off all children and add to self.next_exprs + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { .. })), + inputs, + } => { + assert!(self.is_parsing_stateful_udf, "SplitExprByStatefulUDF.is_parsing_stateful_udf should be True if we encounter a stateful UDF expression"); + + let new_inputs = inputs.iter().map(|e| { + if requires_computation(e.as_ref()) { + // Truncate the child if it requires computation, and push it onto the stack to indicate that it needs computation in a different stage + self.next_exprs.push(e.clone()); + Expr::Column(e.name().into()).arced() + } else { + e.clone() + } + }); + let new_truncated_node = node.with_new_children(new_inputs.collect()).arced(); + + Ok(common_treenode::Transformed::new( + new_truncated_node, + true, + common_treenode::TreeNodeRecursion::Jump, + )) + } + expr => { + // Indicate that we are now parsing a stateless expression tree + self.is_parsing_stateful_udf = false; + + // None of the direct children are stateful UDFs, so we keep going + if node.children().iter().all(|e| { + !matches!( + e.as_ref(), + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful( + StatefulPythonUDF { .. } + )), + .. + } + ) + }) { + return Ok(common_treenode::Transformed::no(node)); + } + + // If any children are stateful UDFs, we truncate + let inputs = expr.children(); + let new_inputs = inputs.iter().map(|e| { + if matches!( + e.as_ref(), + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful( + StatefulPythonUDF { .. } + )), + .. + } + ) { + self.next_exprs.push(e.clone()); + Expr::Column(e.name().into()).arced() + } else { + e.clone() + } + }); + let new_truncated_node = node.with_new_children(new_inputs.collect()).arced(); + + Ok(common_treenode::Transformed::yes(new_truncated_node)) + } + } + } +} + +fn try_optimize_project( + projection: &Project, + plan: Arc, + num_actors: usize, +) -> DaftResult>> { + // Simple common case: no stateful UDFs at all and we have no transformations + let has_stateful_udfs = projection.projection.iter().any(has_stateful_udf); + if !has_stateful_udfs { + return Ok(Transformed::No(plan)); + } + + let (remaining, next_stages): (Vec, Vec) = { + let mut remaining = Vec::new(); + let mut next_stages = Vec::new(); + for expr in projection.projection.iter() { + let mut rewriter = SplitExprByStatefulUDF::new(); + let root = expr.clone().rewrite(&mut rewriter)?.data; + next_stages.push(root); + remaining.extend(rewriter.next_exprs); + } + (remaining, next_stages) + }; + + // Start building the tree back up starting from the children + let new_plan_child = if remaining.is_empty() { + // Nothing remaining, we're done splitting and should wire the new node up with the child of the Project + plan.children()[0].clone() + } else { + // Recursively run the rule on the new child Project + let new_project = Project::try_new( + plan.children()[0].clone(), + remaining, + )?; + let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); + let optimized_child_plan = try_optimize_project( + &new_project, + new_child_project.clone(), + num_actors, + )?; + optimized_child_plan.unwrap().clone() + }; + + // Start building a chain of `child -> Optional -> ActorPoolProject -> ActorPoolProject -> ...` + let (stateful_stages, stateless_stages): (Vec<_>, Vec<_>) = + next_stages.into_iter().partition(has_stateful_udf); + let stateless_stages_names: HashSet = stateless_stages + .iter() + .map(|e| e.name().to_string()) + .collect(); + + // Conditionally build on the tree next with a stateless Project + let new_plan = if stateless_stages.is_empty() { + new_plan_child.clone() + } else { + // Stateless projection consists of stateless expressions, but also pass-through of any columns required by subsequent stateful ActorPoolProjects + let stateful_stages_columns_required: HashSet = stateful_stages + .iter() + .flat_map(get_required_columns) + .collect(); + let stateless_projection = stateless_stages + .into_iter() + .chain(stateful_stages_columns_required.iter().filter_map(|name| { + if stateless_stages_names.contains(name.as_str()) { + None + } else { + Some(Expr::Column(name.as_str().into()).arced()) + } + })) + .collect(); + + LogicalPlan::Project(Project::try_new( + new_plan_child.clone(), + stateless_projection, + )?) + .arced() + }; + + // Build on the tree with the necessary stateful ActorPoolProject nodes + let new_plan = { + let mut child = new_plan; + + for idx in 0..stateful_stages.len() { + let stateful_expr = stateful_stages[idx].clone(); + let stateful_expr_name = stateful_expr.name().to_string(); + let remaining_stateful_stages_columns_required: HashSet = stateful_stages + .as_slice()[idx + 1..] + .iter() + .flat_map(get_required_columns) + .collect(); + let stateful_projection = remaining_stateful_stages_columns_required + .iter() + .chain(stateless_stages_names.iter()) + .filter_map(|name| { + if name == &stateful_expr_name { + None + } else { + Some(Expr::Column(name.as_str().into()).arced()) + } + }) + .chain(iter::once(stateful_expr)) + .collect(); + child = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + child, + stateful_projection, + num_actors, + )?) + .arced(); + } + child + }; + + Ok(Transformed::Yes(new_plan)) +} + +#[inline] +fn has_stateful_udf(e: &ExprRef) -> bool { + e.exists(|e| { + matches!( + e.as_ref(), + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(_)), + .. + } + ) + }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_error::DaftResult; + use common_resource_request::ResourceRequest; + use daft_core::datatypes::Field; + use daft_dsl::{ + col, + functions::{ + python::{PythonUDF, StatefulPythonUDF}, + FunctionExpr, + }, + Expr, ExprRef, + }; + + use crate::{ + logical_ops::{ActorPoolProject, Project}, + logical_optimization::test::assert_optimized_plan_with_rules_eq, + test::{dummy_scan_node, dummy_scan_operator}, + LogicalPlan, + }; + + use super::SplitActorPoolProjects; + + /// Helper that creates an optimizer with the SplitExprByStatefulUDF rule registered, optimizes + /// the provided plan with said optimizer, and compares the optimized plan with + /// the provided expected plan. + fn assert_optimized_plan_eq( + plan: Arc, + expected: Arc, + ) -> DaftResult<()> { + assert_optimized_plan_with_rules_eq( + plan, + expected, + vec![Box::new(SplitActorPoolProjects {})], + ) + } + + #[cfg(not(feature = "python"))] + fn create_stateful_udf(inputs: Vec) -> ExprRef { + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { + name: Arc::new("foo".to_string()), + num_expressions: 1, + return_dtype: daft_core::DataType::Binary, + resource_request: Some(create_resource_request()), + })), + inputs, + } + .arced() + } + + fn create_resource_request() -> ResourceRequest { + ResourceRequest { + num_cpus: Some(8.), + num_gpus: Some(1.), + memory_bytes: None, + } + } + + // TODO: need to figure out how users will pass this in + static NUM_ACTORS: usize = 1; + + // #[cfg(not(feature = "python"))] + #[test] + fn test_with_column_stateful_udf_happypath() -> DaftResult<()> { + let resource_request = create_resource_request(); + let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); + let scan_plan = dummy_scan_node(scan_op); + let stateful_project_expr = create_stateful_udf(vec![col("a")]); + + // Add a Projection with StatefulUDF and resource request + let project_plan = scan_plan + .with_columns( + vec![stateful_project_expr.clone().alias("b")], + None, + )? + .build(); + + // Project([col("a")]) --> ActorPoolProject([col("a"), foo(col("a")).alias("b")]) + let expected = scan_plan.select(vec![col("a")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![col("a"), stateful_project_expr.clone().alias("b")], + NUM_ACTORS, + )?) + .arced(); + + assert_optimized_plan_eq(project_plan, expected)?; + + Ok(()) + } + + #[test] + fn test_multiple_with_column_parallel() -> DaftResult<()> { + let resource_request = create_resource_request(); + let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); + let scan_plan = dummy_scan_node(scan_op); + let stateful_project_expr = create_stateful_udf(vec![col("a")]); + + // Add a Projection with StatefulUDF and resource request + let project_plan = scan_plan + .with_columns( + vec![ + stateful_project_expr.clone().alias("b"), + stateful_project_expr.clone().alias("c"), + ], + None, + )? + .build(); + + // Project([col("a")]) + // --> ActorPoolProject([col("a"), foo(col("a")).alias("SOME_FACTORED_NAME")]) + // --> Project([col("a"), col("SOME_FACTORED_NAME").alias("b"), foo(col("SOME_FACTORED_NAME")).alias("c")]) + let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Binary }))(a)"; + let expected = scan_plan.select(vec![col("a").alias("a")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + stateful_project_expr.clone().alias(factored_column_name), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![ + col("a"), + col(factored_column_name).alias("b"), + col(factored_column_name).alias("c"), + ], + )?) + .arced(); + + assert_optimized_plan_eq(project_plan, expected)?; + + Ok(()) + } + + #[test] + fn test_multiple_with_column_serial() -> DaftResult<()> { + let resource_request = create_resource_request(); + let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); + let scan_plan = dummy_scan_node(scan_op); + let stacked_stateful_project_expr = + create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]); + + // Add a Projection with StatefulUDF and resource request + let project_plan = scan_plan + .with_columns( + vec![stacked_stateful_project_expr.clone().alias("b")], + None, + )? + .build(); + + // ActorPoolProject([col("a"), foo(col("a")).alias("SOME_INTERMEDIATE_NAME")]) + // --> ActorPoolProject([col("a"), foo(col("SOME_INTERMEDIATE_NAME")).alias("b")]) + let intermediate_name = "TODO_fix_this_intermediate_name"; + let expected = scan_plan.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("b"), + ], + NUM_ACTORS, + )?) + .arced(); + + assert_optimized_plan_eq(project_plan, expected)?; + + Ok(()) + } +} diff --git a/src/daft-plan/src/logical_optimization/test/mod.rs b/src/daft-plan/src/logical_optimization/test/mod.rs index ed3fd3ccfb..7f26e317e6 100644 --- a/src/daft-plan/src/logical_optimization/test/mod.rs +++ b/src/daft-plan/src/logical_optimization/test/mod.rs @@ -35,7 +35,8 @@ pub fn assert_optimized_plan_with_rules_eq( assert_eq!( optimized_plan, expected, - "\n\nOptimized plan not equal to expected.\n\nOptimized:\n{}\n\nExpected:\n{}", + "\n\nOptimized plan not equal to expected.\n\nBefore Optimization:\n{}\n\nOptimized:\n{}\n\nExpected:\n{}", + plan.repr_ascii(false), optimized_plan.repr_ascii(false), expected.repr_ascii(false) ); From 97dd7d25cd22154c4f3694339f0b6c98d4b2a0b1 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 9 Aug 2024 13:16:54 -0700 Subject: [PATCH 03/18] Fix parallel unit test --- .../rules/split_actor_pool_projects.rs | 50 +++++++------------ 1 file changed, 19 insertions(+), 31 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 20b2cbbefe..4f5089d447 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, default, iter, sync::Arc}; +use std::{collections::HashSet, iter, sync::Arc}; use common_error::DaftResult; use common_treenode::{TreeNode, TreeNodeRewriter}; @@ -46,11 +46,9 @@ impl OptimizerRule for SplitActorPoolProjects { let num_actors = 1; match plan.as_ref() { - LogicalPlan::Project(projection) => try_optimize_project( - projection, - plan.clone(), - num_actors, - ), + LogicalPlan::Project(projection) => { + try_optimize_project(projection, plan.clone(), num_actors) + } // TODO: Figure out how to split other nodes as well such as Filter, Agg etc _ => Ok(Transformed::No(plan)), } @@ -179,16 +177,10 @@ fn try_optimize_project( plan.children()[0].clone() } else { // Recursively run the rule on the new child Project - let new_project = Project::try_new( - plan.children()[0].clone(), - remaining, - )?; + let new_project = Project::try_new(plan.children()[0].clone(), remaining)?; let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); - let optimized_child_plan = try_optimize_project( - &new_project, - new_child_project.clone(), - num_actors, - )?; + let optimized_child_plan = + try_optimize_project(&new_project, new_child_project.clone(), num_actors)?; optimized_child_plan.unwrap().clone() }; @@ -341,20 +333,16 @@ mod tests { // TODO: need to figure out how users will pass this in static NUM_ACTORS: usize = 1; - // #[cfg(not(feature = "python"))] + #[cfg(not(feature = "python"))] #[test] fn test_with_column_stateful_udf_happypath() -> DaftResult<()> { - let resource_request = create_resource_request(); let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); let scan_plan = dummy_scan_node(scan_op); let stateful_project_expr = create_stateful_udf(vec![col("a")]); // Add a Projection with StatefulUDF and resource request let project_plan = scan_plan - .with_columns( - vec![stateful_project_expr.clone().alias("b")], - None, - )? + .with_columns(vec![stateful_project_expr.clone().alias("b")], None)? .build(); // Project([col("a")]) --> ActorPoolProject([col("a"), foo(col("a")).alias("b")]) @@ -371,14 +359,18 @@ mod tests { Ok(()) } + #[cfg(not(feature = "python"))] #[test] fn test_multiple_with_column_parallel() -> DaftResult<()> { - let resource_request = create_resource_request(); let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); let scan_plan = dummy_scan_node(scan_op); let stateful_project_expr = create_stateful_udf(vec![col("a")]); // Add a Projection with StatefulUDF and resource request + // NOTE: Our common-subtree elimination will build this as 2 project nodes: + // Project([col("a").alias("a"), foo(col("a")).alias(factored_column_name)]) + // --> Project([col("a"), col(factored_column_name).alias("b"), col(factored_column_name).alias("c")]) + let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Binary, resource_request: Some(ResourceRequest { num_cpus: Some(8.0), num_gpus: Some(1.0), memory_bytes: None }) }))(a)"; let project_plan = scan_plan .with_columns( vec![ @@ -389,10 +381,9 @@ mod tests { )? .build(); - // Project([col("a")]) - // --> ActorPoolProject([col("a"), foo(col("a")).alias("SOME_FACTORED_NAME")]) - // --> Project([col("a"), col("SOME_FACTORED_NAME").alias("b"), foo(col("SOME_FACTORED_NAME")).alias("c")]) - let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Binary }))(a)"; + // Project([col("a").alias("a")]) + // --> ActorPoolProject([col("a"), foo(col("a")).alias(factored_columns_name)]) + // --> Project([col("a"), col(factored_columns_name).alias("b"), foo(col(factored_columns_name)).alias("c")]) let expected = scan_plan.select(vec![col("a").alias("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, @@ -420,18 +411,15 @@ mod tests { #[test] fn test_multiple_with_column_serial() -> DaftResult<()> { - let resource_request = create_resource_request(); let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); let scan_plan = dummy_scan_node(scan_op); let stacked_stateful_project_expr = create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]); // Add a Projection with StatefulUDF and resource request + // Project([col("a"), foo(foo(col("a"))).alias("b")]) let project_plan = scan_plan - .with_columns( - vec![stacked_stateful_project_expr.clone().alias("b")], - None, - )? + .with_columns(vec![stacked_stateful_project_expr.clone().alias("b")], None)? .build(); // ActorPoolProject([col("a"), foo(col("a")).alias("SOME_INTERMEDIATE_NAME")]) From 27cd504497e5a51c5651e2a33d5af1de4e8e1f61 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 9 Aug 2024 13:53:54 -0700 Subject: [PATCH 04/18] Passing serial test as well --- .../rules/split_actor_pool_projects.rs | 77 ++++++++++++++----- 1 file changed, 56 insertions(+), 21 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 4f5089d447..7e3606382f 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -24,8 +24,8 @@ pub struct SplitActorPoolProjects {} /// Implement SplitActorPoolProjects as an OptimizerRule which will: /// /// 1. Go top-down from the root of the LogicalPlan -/// 2. Whenever it sees a Project, it will iteratively split it into the necessary Project/ActorPoolProject sequences, -/// depending on the underlying Expressions' layouts of StatefulPythonUDFs. +/// 2. Whenever it sees a Project with StatefulUDF(s), it will iteratively split it into a chain of sequences like so: +/// `(Project -> ActorPoolProject -> ActorPoolProject -> ...) -> (Project -> ActorPoolProject -> ActorPoolProject -> ...) -> ...` /// /// The general idea behind the splitting is that this is a greedy algorithm which will: /// * Skim off the top of every expression in the projection to generate "stages" for every expression @@ -47,7 +47,7 @@ impl OptimizerRule for SplitActorPoolProjects { match plan.as_ref() { LogicalPlan::Project(projection) => { - try_optimize_project(projection, plan.clone(), num_actors) + try_optimize_project(projection, plan.clone(), num_actors, 0) } // TODO: Figure out how to split other nodes as well such as Filter, Agg etc _ => Ok(Transformed::No(plan)), @@ -59,13 +59,17 @@ struct SplitExprByStatefulUDF { // Initialized to True, but once we encounter non-aliases this will be set to false is_parsing_stateful_udf: bool, next_exprs: Vec, + stage_id: usize, + monotonically_increasing_expr_identifier: usize, } impl SplitExprByStatefulUDF { - fn new() -> Self { + fn new(stage_id: usize) -> Self { Self { is_parsing_stateful_udf: true, next_exprs: Vec::new(), + stage_id, + monotonically_increasing_expr_identifier: 0, } } } @@ -88,9 +92,17 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { let new_inputs = inputs.iter().map(|e| { if requires_computation(e.as_ref()) { + // Give the new child a deterministic name + let intermediate_expr_name = format!( + "__SplitExprByStatefulUDF_{}-{}_stateful_child__", + self.stage_id, self.monotonically_increasing_expr_identifier + ); + self.monotonically_increasing_expr_identifier += 1; + // Truncate the child if it requires computation, and push it onto the stack to indicate that it needs computation in a different stage - self.next_exprs.push(e.clone()); - Expr::Column(e.name().into()).arced() + self.next_exprs + .push(e.clone().alias(intermediate_expr_name.as_str())); + Expr::Column(intermediate_expr_name.as_str().into()).arced() } else { e.clone() } @@ -103,6 +115,10 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { common_treenode::TreeNodeRecursion::Jump, )) } + Expr::Column(_) => { + self.next_exprs.push(node.clone()); + Ok(common_treenode::Transformed::no(node)) + } expr => { // Indicate that we are now parsing a stateless expression tree self.is_parsing_stateful_udf = false; @@ -134,8 +150,14 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { .. } ) { - self.next_exprs.push(e.clone()); - Expr::Column(e.name().into()).arced() + let intermediate_expr_name = format!( + "__SplitExprByStatefulUDF_{}-{}_stateful__", + self.stage_id, self.monotonically_increasing_expr_identifier + ); + self.monotonically_increasing_expr_identifier += 1; + self.next_exprs + .push(e.clone().alias(intermediate_expr_name.as_str())); + Expr::Column(intermediate_expr_name.as_str().into()).arced() } else { e.clone() } @@ -152,6 +174,7 @@ fn try_optimize_project( projection: &Project, plan: Arc, num_actors: usize, + recursive_count: usize, ) -> DaftResult>> { // Simple common case: no stateful UDFs at all and we have no transformations let has_stateful_udfs = projection.projection.iter().any(has_stateful_udf); @@ -163,7 +186,7 @@ fn try_optimize_project( let mut remaining = Vec::new(); let mut next_stages = Vec::new(); for expr in projection.projection.iter() { - let mut rewriter = SplitExprByStatefulUDF::new(); + let mut rewriter = SplitExprByStatefulUDF::new(recursive_count); let root = expr.clone().rewrite(&mut rewriter)?.data; next_stages.push(root); remaining.extend(rewriter.next_exprs); @@ -172,19 +195,26 @@ fn try_optimize_project( }; // Start building the tree back up starting from the children - let new_plan_child = if remaining.is_empty() { + let new_plan_child = if remaining + .iter() + .all(|e| matches!(e.as_ref(), Expr::Column(_))) + { // Nothing remaining, we're done splitting and should wire the new node up with the child of the Project plan.children()[0].clone() } else { // Recursively run the rule on the new child Project let new_project = Project::try_new(plan.children()[0].clone(), remaining)?; let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); - let optimized_child_plan = - try_optimize_project(&new_project, new_child_project.clone(), num_actors)?; + let optimized_child_plan = try_optimize_project( + &new_project, + new_child_project.clone(), + num_actors, + recursive_count + 1, + )?; optimized_child_plan.unwrap().clone() }; - // Start building a chain of `child -> Optional -> ActorPoolProject -> ActorPoolProject -> ...` + // Start building a chain of `child -> Project -> ActorPoolProject -> ActorPoolProject -> ...` let (stateful_stages, stateless_stages): (Vec<_>, Vec<_>) = next_stages.into_iter().partition(has_stateful_udf); let stateless_stages_names: HashSet = stateless_stages @@ -192,10 +222,8 @@ fn try_optimize_project( .map(|e| e.name().to_string()) .collect(); - // Conditionally build on the tree next with a stateless Project - let new_plan = if stateless_stages.is_empty() { - new_plan_child.clone() - } else { + // Build the new stateless Project + let new_plan = { // Stateless projection consists of stateless expressions, but also pass-through of any columns required by subsequent stateful ActorPoolProjects let stateful_stages_columns_required: HashSet = stateful_stages .iter() @@ -422,10 +450,12 @@ mod tests { .with_columns(vec![stacked_stateful_project_expr.clone().alias("b")], None)? .build(); - // ActorPoolProject([col("a"), foo(col("a")).alias("SOME_INTERMEDIATE_NAME")]) - // --> ActorPoolProject([col("a"), foo(col("SOME_INTERMEDIATE_NAME")).alias("b")]) - let intermediate_name = "TODO_fix_this_intermediate_name"; - let expected = scan_plan.build(); + // Project([col("a")]) + // --> ActorPoolProject([col("a"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")]) + // --> Project([col("a"), col("__SplitExprByStatefulUDF_0-0_stateful_child__")]) + // --> ActorPoolProject([col("a"), foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__")).alias("b")]) + let intermediate_name = "__SplitExprByStatefulUDF_0-0_stateful_child__"; + let expected = scan_plan.select(vec![col("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ @@ -437,6 +467,11 @@ mod tests { NUM_ACTORS, )?) .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col("a"), col(intermediate_name)], + )?) + .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ From 49ae3ee670d860c9ff6637304aab47804e1c1fc8 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 9 Aug 2024 15:14:08 -0700 Subject: [PATCH 05/18] Change rule to use final projection --- .../rules/split_actor_pool_projects.rs | 159 ++++++++++++++---- 1 file changed, 127 insertions(+), 32 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 7e3606382f..5d4eeb387a 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -7,9 +7,10 @@ use daft_dsl::{ python::{PythonUDF, StatefulPythonUDF}, FunctionExpr, }, - optimization::{get_required_columns, requires_computation}, + optimization::requires_computation, Expr, ExprRef, }; +use itertools::Itertools; use crate::{ logical_ops::{ActorPoolProject, Project}, @@ -99,7 +100,7 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { ); self.monotonically_increasing_expr_identifier += 1; - // Truncate the child if it requires computation, and push it onto the stack to indicate that it needs computation in a different stage + // Truncate the child and push it onto the stack to indicate that it needs computation in a different stage self.next_exprs .push(e.clone().alias(intermediate_expr_name.as_str())); Expr::Column(intermediate_expr_name.as_str().into()).arced() @@ -194,6 +195,26 @@ fn try_optimize_project( (remaining, next_stages) }; + println!( + "Optimizing: {}", + projection + .projection + .iter() + .map(|e| e.as_ref().to_string()) + .join(", ") + ); + println!( + "Remaining: {}", + remaining.iter().map(|e| e.as_ref().to_string()).join(", ") + ); + println!( + "Next Stages: {}", + next_stages + .iter() + .map(|e| e.as_ref().to_string()) + .join(", ") + ); + // Start building the tree back up starting from the children let new_plan_child = if remaining .iter() @@ -213,6 +234,7 @@ fn try_optimize_project( )?; optimized_child_plan.unwrap().clone() }; + println!("New child:\n{}", new_plan_child.repr_ascii(false)); // Start building a chain of `child -> Project -> ActorPoolProject -> ActorPoolProject -> ...` let (stateful_stages, stateless_stages): (Vec<_>, Vec<_>) = @@ -222,47 +244,43 @@ fn try_optimize_project( .map(|e| e.name().to_string()) .collect(); - // Build the new stateless Project + // Build the new stateless Project: [...stateless_projections, ...all columns that came before it] + let stateless_projection = stateless_stages + .into_iter() + .chain( + new_plan_child + .schema() + .fields + .iter() + .filter_map(|(name, _)| { + if stateless_stages_names.contains(name) { + None + } else { + Some(Expr::Column(name.clone().into()).arced()) + } + }), + ) + .collect(); let new_plan = { - // Stateless projection consists of stateless expressions, but also pass-through of any columns required by subsequent stateful ActorPoolProjects - let stateful_stages_columns_required: HashSet = stateful_stages - .iter() - .flat_map(get_required_columns) - .collect(); - let stateless_projection = stateless_stages - .into_iter() - .chain(stateful_stages_columns_required.iter().filter_map(|name| { - if stateless_stages_names.contains(name.as_str()) { - None - } else { - Some(Expr::Column(name.as_str().into()).arced()) - } - })) - .collect(); - LogicalPlan::Project(Project::try_new( new_plan_child.clone(), stateless_projection, )?) .arced() }; + println!("With new project:\n{}", new_plan.repr_ascii(false)); - // Build on the tree with the necessary stateful ActorPoolProject nodes + // Iteratively build ActorPoolProject nodes: [StatefulUDF, ...all columns that came before it] let new_plan = { let mut child = new_plan; - for idx in 0..stateful_stages.len() { - let stateful_expr = stateful_stages[idx].clone(); + for stateful_expr in stateful_stages { let stateful_expr_name = stateful_expr.name().to_string(); - let remaining_stateful_stages_columns_required: HashSet = stateful_stages - .as_slice()[idx + 1..] - .iter() - .flat_map(get_required_columns) - .collect(); - let stateful_projection = remaining_stateful_stages_columns_required + let stateful_projection = child + .schema() + .fields .iter() - .chain(stateless_stages_names.iter()) - .filter_map(|name| { + .filter_map(|(name, _)| { if name == &stateful_expr_name { None } else { @@ -280,8 +298,24 @@ fn try_optimize_project( } child }; + println!( + "With new actor pool projects:\n{}", + new_plan.repr_ascii(false) + ); + + // One final project to select just the columns we need + // This will help us do the necessary column pruning via projection pushdowns + let final_selection_project = LogicalPlan::Project(Project::try_new( + new_plan, + projection + .projection + .iter() + .map(|e| Expr::Column(e.name().into()).arced()) + .collect(), + )?) + .arced(); - Ok(Transformed::Yes(new_plan)) + Ok(Transformed::Yes(final_selection_project)) } #[inline] @@ -341,7 +375,7 @@ mod tests { Expr::Function { func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { name: Arc::new("foo".to_string()), - num_expressions: 1, + num_expressions: inputs.len(), return_dtype: daft_core::DataType::Binary, resource_request: Some(create_resource_request()), })), @@ -485,7 +519,68 @@ mod tests { .arced(); assert_optimized_plan_eq(project_plan, expected)?; + Ok(()) + } + + #[test] + fn test_multiple_with_column_serial_multiarg() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![ + Field::new("a", daft_core::DataType::Utf8), + Field::new("b", daft_core::DataType::Utf8), + ]); + let scan_plan = dummy_scan_node(scan_op); + let stacked_stateful_project_expr = create_stateful_udf(vec![ + create_stateful_udf(vec![col("a")]), + create_stateful_udf(vec![col("b")]), + ]); + // Add a Projection with StatefulUDF and resource request + // Project([foo(foo(col("a")), foo(col("b"))).alias("c")]) + let project_plan = scan_plan + .select(vec![stacked_stateful_project_expr.clone().alias("c")])? + .build(); + + // Project([col("a"), col("b")]) + // --> ActorPoolProject([col("b"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")]) + // --> ActorPoolProject([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), foo(col("b")).alias("__SplitExprByStatefulUDF_0-1_stateful_child__")]) + // --> ActorPoolProject([foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")).alias("c")]) + let intermediate_name_0 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; + let intermediate_name_1 = "__SplitExprByStatefulUDF_0-1_stateful_child__"; + let expected = scan_plan.select(vec![col("a"), col("b")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("b"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name_0), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name_0), + create_stateful_udf(vec![col("b")]) + .clone() + .alias(intermediate_name_1), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + create_stateful_udf(vec![col(intermediate_name_0), col(intermediate_name_1)]) + .clone() + .alias("c"), + ], + NUM_ACTORS, + )?) + .arced(); + + assert_optimized_plan_eq(project_plan, expected)?; Ok(()) } } From efbb4d86ead47162f6f09dd42948426c8e1ac0f3 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 12 Aug 2024 17:46:24 -0700 Subject: [PATCH 06/18] passing tests add more docstrings --- Cargo.lock | 141 +++++++++++++- Cargo.toml | 1 + src/daft-plan/Cargo.toml | 1 + .../rules/split_actor_pool_projects.rs | 182 +++++++++++------- 4 files changed, 250 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00dcccea8c..e5845fe9f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,55 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "0.6.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" + +[[package]] +name = "anstyle-parse" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.86" @@ -992,7 +1041,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c" dependencies = [ "memchr", - "regex-automata", + "regex-automata 0.4.7", "serde", ] @@ -1211,6 +1260,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" +[[package]] +name = "colorchoice" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" + [[package]] name = "comfy-table" version = "6.2.0" @@ -1952,6 +2007,7 @@ dependencies = [ "rstest", "serde", "snafu", + "test-log", ] [[package]] @@ -2134,6 +2190,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", +] + [[package]] name = "env_logger" version = "0.8.4" @@ -2144,6 +2209,18 @@ dependencies = [ "regex", ] +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -2480,7 +2557,7 @@ dependencies = [ "aho-corasick", "bstr", "log", - "regex-automata", + "regex-automata 0.4.7", "regex-syntax 0.8.4", ] @@ -2889,6 +2966,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.10.5" @@ -3195,6 +3278,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matrixmultiply" version = "0.3.9" @@ -4040,7 +4132,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6" dependencies = [ - "env_logger", + "env_logger 0.8.4", "log", "rand 0.8.5", ] @@ -4207,10 +4299,19 @@ checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", - "regex-automata", + "regex-automata 0.4.7", "regex-syntax 0.8.4", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + [[package]] name = "regex-automata" version = "0.4.7" @@ -5004,6 +5105,28 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "test-log" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" +dependencies = [ + "env_logger 0.11.5", + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "textwrap" version = "0.16.1" @@ -5312,10 +5435,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -5494,6 +5621,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86bd8d4e895da8537e5315b8254664e6b769c4ff3db18321b297a1e7004392e3" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index b066dace61..c3af87b59e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,6 +161,7 @@ sketches-ddsketch = {version = "0.2.2", features = ["use_serde"]} snafu = {version = "0.7.4", features = ["futures"]} sqlparser = "0.49.0" sysinfo = "0.30.12" +test-log = "0.2.16" tiktoken-rs = "0.5.9" tokio = {version = "1.37.0", features = [ "net", diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml index 8ed445b5f1..ca6e0749c5 100644 --- a/src/daft-plan/Cargo.toml +++ b/src/daft-plan/Cargo.toml @@ -35,6 +35,7 @@ snafu = {workspace = true} daft-functions = {path = "../daft-functions", default-features = false} pretty_assertions = {workspace = true} rstest = {workspace = true} +test-log = {workspace = true} [features] python = [ diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 5d4eeb387a..5ea703aff3 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -25,18 +25,26 @@ pub struct SplitActorPoolProjects {} /// Implement SplitActorPoolProjects as an OptimizerRule which will: /// /// 1. Go top-down from the root of the LogicalPlan -/// 2. Whenever it sees a Project with StatefulUDF(s), it will iteratively split it into a chain of sequences like so: -/// `(Project -> ActorPoolProject -> ActorPoolProject -> ...) -> (Project -> ActorPoolProject -> ActorPoolProject -> ...) -> ...` +/// 2. Whenever it sees a Project with StatefulUDF(s), it will split it like so: +/// Project_recursive (optional) -> Project_stateless -> ActorPoolProject(s)... -> Project_final +/// 3. Then it recurses on `Project_recursive` until there is no more `Project_recursive` to split anymore /// -/// The general idea behind the splitting is that this is a greedy algorithm which will: -/// * Skim off the top of every expression in the projection to generate "stages" for every expression -/// * Generate Project/ActorPoolProject nodes based on those stages (coalesce non-stateful stages into a Project, and run the stateful stages as ActorPoolProjects sequentially) -/// * Loop until every expression in the projection has been exhausted +/// Invariants: +/// * `Project_recursive` definitely contains at least 1 stateful UDF, and hence need to be recursively split. If it is not constructed, then this is the base case. +/// * `Project_stateless` contains: [...stateless_projections, ...passthrough_columns_as_colexprs] +/// * Subsequent `ActorPoolProject(s)` contain: [Single StatefulUDF, ...passthrough_columns_as_colexprs] +/// * `Project_final` contains only Expr::Columns, and has the same column names (and column ordering) as the original Projection +/// * At the end of splitting, all Project nodes will never contain a StatefulUDF, and all ActorPoolProject nodes will contain one-and-only-one StatefulUDF /// -/// For a given expression tree, skimming a Stage off the top entails: -/// 1. Iterate down the root of the tree, stopping whenever we encounter a StatefulUDF expression -/// 2. If the current stage is rooted at a StatefulUDF expression, then replace its children with Expr::Columns and return the StatefulUDF expression as its own stage -/// 3. Otherwise, the current stage is not a StatefulUDF expression: chop off any StatefulUDF children and replace them with Expr::Columns +/// How splitting is performed on a given Project: +/// 1. For every expression in the Project, "skim off the top" +/// * If the root expression is a StatefulUDF, truncate all of its children, alias them, and then add them to `Project_recursive` +/// * If the root expression is not a StatefulUDF, truncate any StatefulUDF children, alias them, and add them to `Project_recursive` +/// 2. Recursively perform splitting on `Project_recursive` +/// 3. Now for the current truncated expressions, split them into stateless vs stateful expressions: +/// * All stateless expressions go into a single `Project_stateless` node +/// * For each stateful expression, they go into their own dedicated `ActorPoolProject` node +/// * The final `Project_final` node is naively constructed using the names of the original Project impl OptimizerRule for SplitActorPoolProjects { fn apply_order(&self) -> ApplyOrder { ApplyOrder::TopDown @@ -59,7 +67,7 @@ impl OptimizerRule for SplitActorPoolProjects { struct SplitExprByStatefulUDF { // Initialized to True, but once we encounter non-aliases this will be set to false is_parsing_stateful_udf: bool, - next_exprs: Vec, + remaining_exprs: Vec, stage_id: usize, monotonically_increasing_expr_identifier: usize, } @@ -68,7 +76,7 @@ impl SplitExprByStatefulUDF { fn new(stage_id: usize) -> Self { Self { is_parsing_stateful_udf: true, - next_exprs: Vec::new(), + remaining_exprs: Vec::new(), stage_id, monotonically_increasing_expr_identifier: 0, } @@ -80,7 +88,7 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { fn f_down(&mut self, node: Self::Node) -> DaftResult> { match node.as_ref() { - // Encountered alias: keep going if we are ignoring aliases + // Encountered alias: keep going if we are parsing stateful UDFs because we should ignoring aliases Expr::Alias { .. } if self.is_parsing_stateful_udf => { Ok(common_treenode::Transformed::no(node)) } @@ -101,7 +109,7 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { self.monotonically_increasing_expr_identifier += 1; // Truncate the child and push it onto the stack to indicate that it needs computation in a different stage - self.next_exprs + self.remaining_exprs .push(e.clone().alias(intermediate_expr_name.as_str())); Expr::Column(intermediate_expr_name.as_str().into()).arced() } else { @@ -117,7 +125,7 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { )) } Expr::Column(_) => { - self.next_exprs.push(node.clone()); + self.remaining_exprs.push(node.clone()); Ok(common_treenode::Transformed::no(node)) } expr => { @@ -156,7 +164,7 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { self.stage_id, self.monotonically_increasing_expr_identifier ); self.monotonically_increasing_expr_identifier += 1; - self.next_exprs + self.remaining_exprs .push(e.clone().alias(intermediate_expr_name.as_str())); Expr::Column(intermediate_expr_name.as_str().into()).arced() } else { @@ -177,25 +185,13 @@ fn try_optimize_project( num_actors: usize, recursive_count: usize, ) -> DaftResult>> { - // Simple common case: no stateful UDFs at all and we have no transformations + // Base case: no stateful UDFs at all let has_stateful_udfs = projection.projection.iter().any(has_stateful_udf); if !has_stateful_udfs { return Ok(Transformed::No(plan)); } - let (remaining, next_stages): (Vec, Vec) = { - let mut remaining = Vec::new(); - let mut next_stages = Vec::new(); - for expr in projection.projection.iter() { - let mut rewriter = SplitExprByStatefulUDF::new(recursive_count); - let root = expr.clone().rewrite(&mut rewriter)?.data; - next_stages.push(root); - remaining.extend(rewriter.next_exprs); - } - (remaining, next_stages) - }; - - println!( + log::debug!( "Optimizing: {}", projection .projection @@ -203,19 +199,31 @@ fn try_optimize_project( .map(|e| e.as_ref().to_string()) .join(", ") ); - println!( - "Remaining: {}", - remaining.iter().map(|e| e.as_ref().to_string()).join(", ") - ); - println!( - "Next Stages: {}", - next_stages + + // Split the Projection into: + // * remaining: remaining parts of the Project to recurse on + // * truncated_exprs: current parts of the Project to split into (Project -> ActorPoolProjects -> Project) + let (remaining, truncated_exprs): (Vec, Vec) = { + let mut remaining = Vec::new(); + let mut truncated_exprs = Vec::new(); + for expr in projection.projection.iter() { + let mut rewriter = SplitExprByStatefulUDF::new(recursive_count); + let root = expr.clone().rewrite(&mut rewriter)?.data; + truncated_exprs.push(root); + remaining.extend(rewriter.remaining_exprs); + } + (remaining, truncated_exprs) + }; + + log::debug!( + "Truncated Exprs: {}", + truncated_exprs .iter() .map(|e| e.as_ref().to_string()) .join(", ") ); - // Start building the tree back up starting from the children + // Recurse if necessary (if there are any non-noop expressions left to run in `remaining`) let new_plan_child = if remaining .iter() .all(|e| matches!(e.as_ref(), Expr::Column(_))) @@ -234,41 +242,39 @@ fn try_optimize_project( )?; optimized_child_plan.unwrap().clone() }; - println!("New child:\n{}", new_plan_child.repr_ascii(false)); - // Start building a chain of `child -> Project -> ActorPoolProject -> ActorPoolProject -> ...` + // Start building a chain of `child -> Project -> ActorPoolProject -> ActorPoolProject -> ... -> Project` let (stateful_stages, stateless_stages): (Vec<_>, Vec<_>) = - next_stages.into_iter().partition(has_stateful_udf); - let stateless_stages_names: HashSet = stateless_stages - .iter() - .map(|e| e.name().to_string()) - .collect(); + truncated_exprs.into_iter().partition(has_stateful_udf); // Build the new stateless Project: [...stateless_projections, ...all columns that came before it] + let passthrough_columns = { + let stateless_stages_names: HashSet = stateless_stages + .iter() + .map(|e| e.name().to_string()) + .collect(); + new_plan_child + .schema() + .names() + .into_iter() + .filter_map(|name| { + if stateless_stages_names.contains(name.as_str()) { + None + } else { + Some(Expr::Column(name.as_str().into()).arced()) + } + }) + .collect_vec() + }; let stateless_projection = stateless_stages .into_iter() - .chain( - new_plan_child - .schema() - .fields - .iter() - .filter_map(|(name, _)| { - if stateless_stages_names.contains(name) { - None - } else { - Some(Expr::Column(name.clone().into()).arced()) - } - }), - ) + .chain(passthrough_columns) .collect(); - let new_plan = { - LogicalPlan::Project(Project::try_new( - new_plan_child.clone(), - stateless_projection, - )?) - .arced() - }; - println!("With new project:\n{}", new_plan.repr_ascii(false)); + let new_plan = LogicalPlan::Project(Project::try_new( + new_plan_child.clone(), + stateless_projection, + )?) + .arced(); // Iteratively build ActorPoolProject nodes: [StatefulUDF, ...all columns that came before it] let new_plan = { @@ -298,10 +304,6 @@ fn try_optimize_project( } child }; - println!( - "With new actor pool projects:\n{}", - new_plan.repr_ascii(false) - ); // One final project to select just the columns we need // This will help us do the necessary column pruning via projection pushdowns @@ -334,6 +336,7 @@ fn has_stateful_udf(e: &ExprRef) -> bool { #[cfg(test)] mod tests { use std::sync::Arc; + use test_log::test; use common_error::DaftResult; use common_resource_request::ResourceRequest; @@ -407,7 +410,7 @@ mod tests { .with_columns(vec![stateful_project_expr.clone().alias("b")], None)? .build(); - // Project([col("a")]) --> ActorPoolProject([col("a"), foo(col("a")).alias("b")]) + // Project([col("a")]) --> ActorPoolProject([col("a"), foo(col("a")).alias("b")]) --> Project([col("a"), col("b")]) let expected = scan_plan.select(vec![col("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, @@ -415,6 +418,8 @@ mod tests { NUM_ACTORS, )?) .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col("a"), col("b")])?).arced(); assert_optimized_plan_eq(project_plan, expected)?; @@ -445,6 +450,7 @@ mod tests { // Project([col("a").alias("a")]) // --> ActorPoolProject([col("a"), foo(col("a")).alias(factored_columns_name)]) + // --> Project([col("a"), col(factored_columns_name)]) // --> Project([col("a"), col(factored_columns_name).alias("b"), foo(col(factored_columns_name)).alias("c")]) let expected = scan_plan.select(vec![col("a").alias("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -456,6 +462,11 @@ mod tests { NUM_ACTORS, )?) .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col("a"), col(factored_column_name)], + )?) + .arced(); let expected = LogicalPlan::Project(Project::try_new( expected, vec![ @@ -487,7 +498,9 @@ mod tests { // Project([col("a")]) // --> ActorPoolProject([col("a"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")]) // --> Project([col("a"), col("__SplitExprByStatefulUDF_0-0_stateful_child__")]) + // --> Project([col("a"), col("__SplitExprByStatefulUDF_0-0_stateful_child__")]) // --> ActorPoolProject([col("a"), foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__")).alias("b")]) + // --> Project([col("a"), col("b")]) let intermediate_name = "__SplitExprByStatefulUDF_0-0_stateful_child__"; let expected = scan_plan.select(vec![col("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -506,10 +519,16 @@ mod tests { vec![col("a"), col(intermediate_name)], )?) .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col("a"), col(intermediate_name)], + )?) + .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ col("a"), + col(intermediate_name), create_stateful_udf(vec![col(intermediate_name)]) .clone() .alias("b"), @@ -517,6 +536,8 @@ mod tests { NUM_ACTORS, )?) .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col("a"), col("b")])?).arced(); assert_optimized_plan_eq(project_plan, expected)?; Ok(()) @@ -543,13 +564,17 @@ mod tests { // Project([col("a"), col("b")]) // --> ActorPoolProject([col("b"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")]) // --> ActorPoolProject([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), foo(col("b")).alias("__SplitExprByStatefulUDF_0-1_stateful_child__")]) + // --> Project([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")]) + // --> Project([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")]) // --> ActorPoolProject([foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")).alias("c")]) + // --> Project([col("c")]) let intermediate_name_0 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; let intermediate_name_1 = "__SplitExprByStatefulUDF_0-1_stateful_child__"; let expected = scan_plan.select(vec![col("a"), col("b")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ + col("a"), col("b"), create_stateful_udf(vec![col("a")]) .clone() @@ -561,6 +586,8 @@ mod tests { let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ + col("a"), + col("b"), col(intermediate_name_0), create_stateful_udf(vec![col("b")]) .clone() @@ -569,9 +596,21 @@ mod tests { NUM_ACTORS, )?) .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col(intermediate_name_0), col(intermediate_name_1)], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col(intermediate_name_0), col(intermediate_name_1)], + )?) + .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ + col(intermediate_name_0), + col(intermediate_name_1), create_stateful_udf(vec![col(intermediate_name_0), col(intermediate_name_1)]) .clone() .alias("c"), @@ -579,6 +618,7 @@ mod tests { NUM_ACTORS, )?) .arced(); + let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced(); assert_optimized_plan_eq(project_plan, expected)?; Ok(()) From 4adb5ae2278f7282424251f9ad6cea7c5ff2c4f5 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 13 Aug 2024 09:45:18 -0700 Subject: [PATCH 07/18] Fix pathological case with ColumnExpr being added twice --- .../rules/split_actor_pool_projects.rs | 124 +++++++++++++++--- 1 file changed, 107 insertions(+), 17 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 5ea703aff3..d87313b035 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -70,6 +70,7 @@ struct SplitExprByStatefulUDF { remaining_exprs: Vec, stage_id: usize, monotonically_increasing_expr_identifier: usize, + truncated_node_names: HashSet, } impl SplitExprByStatefulUDF { @@ -79,6 +80,7 @@ impl SplitExprByStatefulUDF { remaining_exprs: Vec::new(), stage_id, monotonically_increasing_expr_identifier: 0, + truncated_node_names: HashSet::new(), } } } @@ -107,6 +109,8 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { self.stage_id, self.monotonically_increasing_expr_identifier ); self.monotonically_increasing_expr_identifier += 1; + self.truncated_node_names + .insert(intermediate_expr_name.clone()); // Truncate the child and push it onto the stack to indicate that it needs computation in a different stage self.remaining_exprs @@ -124,10 +128,12 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { common_treenode::TreeNodeRecursion::Jump, )) } - Expr::Column(_) => { + // If we encounter a ColumnExpr, we only add it to the remaining exprs if it wasn't a ColumnExpr that was added by SplitExprByStatefulUDF + Expr::Column(name) if !self.truncated_node_names.contains(name.as_ref()) => { self.remaining_exprs.push(node.clone()); Ok(common_treenode::Transformed::no(node)) } + // If we encounter a stateless expression, we try to truncate any children that are StatefulUDFs expr => { // Indicate that we are now parsing a stateless expression tree self.is_parsing_stateful_udf = false; @@ -164,6 +170,8 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { self.stage_id, self.monotonically_increasing_expr_identifier ); self.monotonically_increasing_expr_identifier += 1; + self.truncated_node_names + .insert(intermediate_expr_name.clone()); self.remaining_exprs .push(e.clone().alias(intermediate_expr_name.as_str())); Expr::Column(intermediate_expr_name.as_str().into()).arced() @@ -172,7 +180,6 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { } }); let new_truncated_node = node.with_new_children(new_inputs.collect()).arced(); - Ok(common_treenode::Transformed::yes(new_truncated_node)) } } @@ -247,7 +254,7 @@ fn try_optimize_project( let (stateful_stages, stateless_stages): (Vec<_>, Vec<_>) = truncated_exprs.into_iter().partition(has_stateful_udf); - // Build the new stateless Project: [...stateless_projections, ...all columns that came before it] + // Build the new stateless Project: [...all columns that came before it, ...stateless_projections] let passthrough_columns = { let stateless_stages_names: HashSet = stateless_stages .iter() @@ -266,9 +273,9 @@ fn try_optimize_project( }) .collect_vec() }; - let stateless_projection = stateless_stages + let stateless_projection = passthrough_columns .into_iter() - .chain(passthrough_columns) + .chain(stateless_stages) .collect(); let new_plan = LogicalPlan::Project(Project::try_new( new_plan_child.clone(), @@ -276,7 +283,7 @@ fn try_optimize_project( )?) .arced(); - // Iteratively build ActorPoolProject nodes: [StatefulUDF, ...all columns that came before it] + // Iteratively build ActorPoolProject nodes: [...all columns that came before it, StatefulUDF] let new_plan = { let mut child = new_plan; @@ -352,7 +359,9 @@ mod tests { use crate::{ logical_ops::{ActorPoolProject, Project}, - logical_optimization::test::assert_optimized_plan_with_rules_eq, + logical_optimization::{ + rules::PushDownProjection, test::assert_optimized_plan_with_rules_eq, + }, test::{dummy_scan_node, dummy_scan_operator}, LogicalPlan, }; @@ -379,7 +388,7 @@ mod tests { func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { name: Arc::new("foo".to_string()), num_expressions: inputs.len(), - return_dtype: daft_core::DataType::Binary, + return_dtype: daft_core::DataType::Int64, resource_request: Some(create_resource_request()), })), inputs, @@ -437,7 +446,7 @@ mod tests { // NOTE: Our common-subtree elimination will build this as 2 project nodes: // Project([col("a").alias("a"), foo(col("a")).alias(factored_column_name)]) // --> Project([col("a"), col(factored_column_name).alias("b"), col(factored_column_name).alias("c")]) - let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Binary, resource_request: Some(ResourceRequest { num_cpus: Some(8.0), num_gpus: Some(1.0), memory_bytes: None }) }))(a)"; + let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Int64, resource_request: Some(ResourceRequest { num_cpus: Some(8.0), num_gpus: Some(1.0), memory_bytes: None }) }))(a)"; let project_plan = scan_plan .with_columns( vec![ @@ -448,10 +457,6 @@ mod tests { )? .build(); - // Project([col("a").alias("a")]) - // --> ActorPoolProject([col("a"), foo(col("a")).alias(factored_columns_name)]) - // --> Project([col("a"), col(factored_columns_name)]) - // --> Project([col("a"), col(factored_columns_name).alias("b"), foo(col(factored_columns_name)).alias("c")]) let expected = scan_plan.select(vec![col("a").alias("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, @@ -476,8 +481,7 @@ mod tests { ], )?) .arced(); - - assert_optimized_plan_eq(project_plan, expected)?; + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; Ok(()) } @@ -521,14 +525,14 @@ mod tests { .arced(); let expected = LogicalPlan::Project(Project::try_new( expected, - vec![col("a"), col(intermediate_name)], + vec![col(intermediate_name), col("a")], )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ - col("a"), col(intermediate_name), + col("a"), create_stateful_udf(vec![col(intermediate_name)]) .clone() .alias("b"), @@ -623,4 +627,90 @@ mod tests { assert_optimized_plan_eq(project_plan, expected)?; Ok(()) } + + #[test] + fn test_multiple_with_column_serial_multiarg_with_intermediate_stateless() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![ + Field::new("a", daft_core::DataType::Int64), + Field::new("b", daft_core::DataType::Int64), + ]); + let scan_plan = dummy_scan_node(scan_op); + let stacked_stateful_project_expr = create_stateful_udf(vec![create_stateful_udf(vec![ + col("a"), + ]) + .add(create_stateful_udf(vec![col("b")]))]); + + // Add a Projection with StatefulUDF and resource request + // Project([foo(foo(col("a")) + foo(col("b"))).alias("c")]) + let project_plan = scan_plan + .select(vec![stacked_stateful_project_expr.clone().alias("c")])? + .build(); + + let intermediate_name_0 = "__SplitExprByStatefulUDF_1-0_stateful__"; + let intermediate_name_1 = "__SplitExprByStatefulUDF_1-1_stateful__"; + let intermediate_name_2 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; + let expected = scan_plan.select(vec![col("a"), col("b")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + col("b"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name_0), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + col("b"), + col(intermediate_name_0), + create_stateful_udf(vec![col("b")]) + .clone() + .alias(intermediate_name_1), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col(intermediate_name_0), col(intermediate_name_1)], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![ + col(intermediate_name_0), + col(intermediate_name_1), + col(intermediate_name_0) + .add(col(intermediate_name_1)) + .alias(intermediate_name_2), + ], + )?) + .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name_2)])?) + .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name_2)])?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name_2), + create_stateful_udf(vec![col(intermediate_name_2)]) + .clone() + .alias("c"), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced(); + + assert_optimized_plan_eq(project_plan, expected)?; + Ok(()) + } } From c9120ce39c72e4b5fdba6d74c90557e9dab4de09 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 13 Aug 2024 10:01:56 -0700 Subject: [PATCH 08/18] Add tests with Projection Pushdown enabled --- .../rules/split_actor_pool_projects.rs | 165 ++++++++++++++++-- 1 file changed, 149 insertions(+), 16 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index d87313b035..86534826c7 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -382,6 +382,23 @@ mod tests { ) } + /// Helper that creates an optimizer with the SplitExprByStatefulUDF rule registered, optimizes + /// the provided plan with said optimizer, and compares the optimized plan with + /// the provided expected plan. + fn assert_optimized_plan_eq_with_projection_pushdown( + plan: Arc, + expected: Arc, + ) -> DaftResult<()> { + assert_optimized_plan_with_rules_eq( + plan, + expected, + vec![ + Box::new(SplitActorPoolProjects {}), + Box::new(PushDownProjection::new()), + ], + ) + } + #[cfg(not(feature = "python"))] fn create_stateful_udf(inputs: Vec) -> ExprRef { Expr::Function { @@ -483,6 +500,28 @@ mod tests { .arced(); assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; + // With Projection Pushdown, elide intermediate Projects and also perform column pushdown + let expected = scan_plan.select(vec![col("a").alias("a")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + stateful_project_expr.clone().alias(factored_column_name), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![ + col("a"), + col(factored_column_name).alias("b"), + col(factored_column_name).alias("c"), + ], + )?) + .arced(); + assert_optimized_plan_eq_with_projection_pushdown(project_plan, expected)?; + Ok(()) } @@ -499,12 +538,6 @@ mod tests { .with_columns(vec![stacked_stateful_project_expr.clone().alias("b")], None)? .build(); - // Project([col("a")]) - // --> ActorPoolProject([col("a"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")]) - // --> Project([col("a"), col("__SplitExprByStatefulUDF_0-0_stateful_child__")]) - // --> Project([col("a"), col("__SplitExprByStatefulUDF_0-0_stateful_child__")]) - // --> ActorPoolProject([col("a"), foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__")).alias("b")]) - // --> Project([col("a"), col("b")]) let intermediate_name = "__SplitExprByStatefulUDF_0-0_stateful_child__"; let expected = scan_plan.select(vec![col("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -542,8 +575,38 @@ mod tests { .arced(); let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("a"), col("b")])?).arced(); + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; - assert_optimized_plan_eq(project_plan, expected)?; + // With Projection Pushdown, elide intermediate Projects and also perform column pushdown + let expected = scan_plan.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col(intermediate_name), col("a")], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("b"), + ], + NUM_ACTORS, + )?) + .arced(); + assert_optimized_plan_eq_with_projection_pushdown(project_plan, expected)?; Ok(()) } @@ -565,13 +628,6 @@ mod tests { .select(vec![stacked_stateful_project_expr.clone().alias("c")])? .build(); - // Project([col("a"), col("b")]) - // --> ActorPoolProject([col("b"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")]) - // --> ActorPoolProject([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), foo(col("b")).alias("__SplitExprByStatefulUDF_0-1_stateful_child__")]) - // --> Project([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")]) - // --> Project([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")]) - // --> ActorPoolProject([foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")).alias("c")]) - // --> Project([col("c")]) let intermediate_name_0 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; let intermediate_name_1 = "__SplitExprByStatefulUDF_0-1_stateful_child__"; let expected = scan_plan.select(vec![col("a"), col("b")])?.build(); @@ -623,8 +679,44 @@ mod tests { )?) .arced(); let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced(); + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; - assert_optimized_plan_eq(project_plan, expected)?; + // With Projection Pushdown, elide intermediate Projects and also perform column pushdown + let expected = scan_plan.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), // TODO: This should be able to be pruned as well, but it seems Projection Pushdown isn't working as intended + col("b"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name_0), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name_0), + create_stateful_udf(vec![col("b")]) + .clone() + .alias(intermediate_name_1), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + create_stateful_udf(vec![col(intermediate_name_0), col(intermediate_name_1)]) + .clone() + .alias("c"), + ], + NUM_ACTORS, + )?) + .arced(); + assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; Ok(()) } @@ -709,8 +801,49 @@ mod tests { )?) .arced(); let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced(); + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; - assert_optimized_plan_eq(project_plan, expected)?; + // With Projection Pushdown, elide intermediate Projects and also perform column pushdown + let expected = scan_plan.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), // TODO: This should be pruned by Projection Pushdown, but isn't for some reason + col("b"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name_0), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name_0), + create_stateful_udf(vec![col("b")]) + .clone() + .alias(intermediate_name_1), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col(intermediate_name_0) + .add(col(intermediate_name_1)) + .alias(intermediate_name_2)], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![create_stateful_udf(vec![col(intermediate_name_2)]) + .clone() + .alias("c")], + NUM_ACTORS, + )?) + .arced(); + assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; Ok(()) } } From d1be209090c3037bd44cdb806081d99f9e83fc4a Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 13 Aug 2024 20:35:22 -0700 Subject: [PATCH 09/18] Add fix for testcase with repeated ColExprs --- .../rules/split_actor_pool_projects.rs | 107 +++++++++++++++++- 1 file changed, 104 insertions(+), 3 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 86534826c7..1e8ab23823 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -22,6 +22,12 @@ use super::{ApplyOrder, OptimizerRule, Transformed}; #[derive(Default, Debug)] pub struct SplitActorPoolProjects {} +impl SplitActorPoolProjects { + pub fn new() -> Self { + Self {} + } +} + /// Implement SplitActorPoolProjects as an OptimizerRule which will: /// /// 1. Go top-down from the root of the LogicalPlan @@ -130,7 +136,14 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { } // If we encounter a ColumnExpr, we only add it to the remaining exprs if it wasn't a ColumnExpr that was added by SplitExprByStatefulUDF Expr::Column(name) if !self.truncated_node_names.contains(name.as_ref()) => { - self.remaining_exprs.push(node.clone()); + if !self + .remaining_exprs + .iter() + .map(|e| e.name()) + .contains(&name.as_ref()) + { + self.remaining_exprs.push(node.clone()); + } Ok(common_treenode::Transformed::no(node)) } // If we encounter a stateless expression, we try to truncate any children that are StatefulUDFs @@ -211,13 +224,24 @@ fn try_optimize_project( // * remaining: remaining parts of the Project to recurse on // * truncated_exprs: current parts of the Project to split into (Project -> ActorPoolProjects -> Project) let (remaining, truncated_exprs): (Vec, Vec) = { - let mut remaining = Vec::new(); + let mut remaining: Vec = Vec::new(); let mut truncated_exprs = Vec::new(); for expr in projection.projection.iter() { let mut rewriter = SplitExprByStatefulUDF::new(recursive_count); let root = expr.clone().rewrite(&mut rewriter)?.data; truncated_exprs.push(root); - remaining.extend(rewriter.remaining_exprs); + + let filtered_remaining_exprs = rewriter + .remaining_exprs + .into_iter() + .filter(|new| { + !remaining + .iter() + .map(|existing| existing.name()) + .contains(&new.name()) + }) + .collect_vec(); + remaining.extend(filtered_remaining_exprs); } (remaining, truncated_exprs) }; @@ -229,6 +253,10 @@ fn try_optimize_project( .map(|e| e.as_ref().to_string()) .join(", ") ); + log::debug!( + "Remaining: {}", + remaining.iter().map(|e| e.as_ref().to_string()).join(", ") + ); // Recurse if necessary (if there are any non-noop expressions left to run in `remaining`) let new_plan_child = if remaining @@ -846,4 +874,77 @@ mod tests { assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; Ok(()) } + + #[test] + fn test_nested_with_column_same_names() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Int64)]); + let scan_plan = dummy_scan_node(scan_op); + let stacked_stateful_project_expr = + create_stateful_udf(vec![col("a").add(create_stateful_udf(vec![col("a")]))]); + + // Add a Projection with StatefulUDF and resource request + // Project([foo(col("a") + foo(col("a"))).alias("c")]) + let project_plan = scan_plan + .select(vec![ + col("a"), + stacked_stateful_project_expr.clone().alias("c"), + ])? + .build(); + + let intermediate_name_0 = "__SplitExprByStatefulUDF_1-0_stateful__"; + let intermediate_name_1 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; + let expected = scan_plan.build(); + let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("a")])?).arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col("a")]).alias(intermediate_name_0), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col("a"), col(intermediate_name_0)], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![ + col(intermediate_name_0), + col("a"), + col("a") + .add(col(intermediate_name_0)) + .alias(intermediate_name_1), + ], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col("a"), col(intermediate_name_1)], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col(intermediate_name_1), col("a")], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name_1), + col("a"), + create_stateful_udf(vec![col(intermediate_name_1)]).alias("c"), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col("a"), col("c")])?).arced(); + + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; + + Ok(()) + } } From 3c3e009a30db191f3b70a7b338ff1b819d6564fd Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Wed, 14 Aug 2024 17:19:16 -0700 Subject: [PATCH 10/18] Refactor for code cleanup and add new unit test --- .../rules/split_actor_pool_projects.rs | 340 +++++++++++++----- 1 file changed, 259 insertions(+), 81 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 1e8ab23823..616c4c0e68 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -1,13 +1,13 @@ use std::{collections::HashSet, iter, sync::Arc}; use common_error::DaftResult; -use common_treenode::{TreeNode, TreeNodeRewriter}; +use common_treenode::{TreeNode, TreeNodeRecursion, TreeNodeRewriter}; use daft_dsl::{ functions::{ python::{PythonUDF, StatefulPythonUDF}, FunctionExpr, }, - optimization::requires_computation, + optimization::{get_required_columns, requires_computation}, Expr, ExprRef, }; use itertools::Itertools; @@ -70,56 +70,75 @@ impl OptimizerRule for SplitActorPoolProjects { } } -struct SplitExprByStatefulUDF { - // Initialized to True, but once we encounter non-aliases this will be set to false - is_parsing_stateful_udf: bool, - remaining_exprs: Vec, - stage_id: usize, - monotonically_increasing_expr_identifier: usize, - truncated_node_names: HashSet, +// TreeNodeRewriter that assumes the Expression tree is rooted at a StatefulUDF (or alias of a StatefulUDF) +// and its children need to be truncated + replaced with Expr::Columns +struct TruncateRootStatefulUDF { + pub(crate) new_children: Vec, + stage_idx: usize, + expr_idx: usize, } -impl SplitExprByStatefulUDF { - fn new(stage_id: usize) -> Self { +impl TruncateRootStatefulUDF { + fn new(stage_idx: usize, expr_idx: usize) -> Self { Self { - is_parsing_stateful_udf: true, - remaining_exprs: Vec::new(), - stage_id, - monotonically_increasing_expr_identifier: 0, - truncated_node_names: HashSet::new(), + new_children: Vec::new(), + stage_idx, + expr_idx, } } } -impl TreeNodeRewriter for SplitExprByStatefulUDF { +// TreeNodeRewriter that assumes the Expression tree has some children which are StatefulUDFs +// which needs to be truncated and replaced with Expr::Columns +struct TruncateAnyStatefulUDFChildren { + pub(crate) new_children: Vec, + stage_idx: usize, + expr_idx: usize, +} + +impl TruncateAnyStatefulUDFChildren { + fn new(stage_idx: usize, expr_idx: usize) -> Self { + Self { + new_children: Vec::new(), + stage_idx, + expr_idx, + } + } +} + +impl TreeNodeRewriter for TruncateRootStatefulUDF { type Node = ExprRef; fn f_down(&mut self, node: Self::Node) -> DaftResult> { match node.as_ref() { - // Encountered alias: keep going if we are parsing stateful UDFs because we should ignoring aliases - Expr::Alias { .. } if self.is_parsing_stateful_udf => { + // If we encounter a ColumnExpr, we add it to new_children only if it hasn't already been accounted for + Expr::Column(name) => { + if !self + .new_children + .iter() + .map(|e| e.name()) + .contains(&name.as_ref()) + { + self.new_children.push(node.clone()); + } Ok(common_treenode::Transformed::no(node)) } - // Encountered stateful UDF: chop off all children and add to self.next_exprs + // Encountered stateful UDF: chop off all children and add to self.next_children Expr::Function { func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { .. })), inputs, } => { - assert!(self.is_parsing_stateful_udf, "SplitExprByStatefulUDF.is_parsing_stateful_udf should be True if we encounter a stateful UDF expression"); - + let mut monotonically_increasing_expr_identifier = 0; let new_inputs = inputs.iter().map(|e| { if requires_computation(e.as_ref()) { // Give the new child a deterministic name let intermediate_expr_name = format!( - "__SplitExprByStatefulUDF_{}-{}_stateful_child__", - self.stage_id, self.monotonically_increasing_expr_identifier + "__TruncateRootStatefulUDF_{}-{}-{}__", + self.stage_idx, self.expr_idx, monotonically_increasing_expr_identifier ); - self.monotonically_increasing_expr_identifier += 1; - self.truncated_node_names - .insert(intermediate_expr_name.clone()); + monotonically_increasing_expr_identifier += 1; - // Truncate the child and push it onto the stack to indicate that it needs computation in a different stage - self.remaining_exprs + self.new_children .push(e.clone().alias(intermediate_expr_name.as_str())); Expr::Column(intermediate_expr_name.as_str().into()).arced() } else { @@ -127,30 +146,41 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { } }); let new_truncated_node = node.with_new_children(new_inputs.collect()).arced(); + Ok(common_treenode::Transformed::yes(new_truncated_node)) + } + _ => Ok(common_treenode::Transformed::no(node)), + } + } +} + +impl TreeNodeRewriter for TruncateAnyStatefulUDFChildren { + type Node = ExprRef; - Ok(common_treenode::Transformed::new( - new_truncated_node, - true, - common_treenode::TreeNodeRecursion::Jump, - )) + fn f_down(&mut self, node: Self::Node) -> DaftResult> { + match node.as_ref() { + // This rewriter should never encounter a StatefulUDF expression (they should always be truncated and replaced) + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { .. })), + .. + } => { + unreachable!( + "TruncateAnyStatefulUDFChildren should never run on a StatefulUDF expression" + ); } - // If we encounter a ColumnExpr, we only add it to the remaining exprs if it wasn't a ColumnExpr that was added by SplitExprByStatefulUDF - Expr::Column(name) if !self.truncated_node_names.contains(name.as_ref()) => { + // If we encounter a ColumnExpr, we add it to new_children only if it hasn't already been accounted for + Expr::Column(name) => { if !self - .remaining_exprs + .new_children .iter() .map(|e| e.name()) .contains(&name.as_ref()) { - self.remaining_exprs.push(node.clone()); + self.new_children.push(node.clone()); } Ok(common_treenode::Transformed::no(node)) } - // If we encounter a stateless expression, we try to truncate any children that are StatefulUDFs + // Attempt to truncate any children that are StatefulUDFs, replacing them with a Expr::Column expr => { - // Indicate that we are now parsing a stateless expression tree - self.is_parsing_stateful_udf = false; - // None of the direct children are stateful UDFs, so we keep going if node.children().iter().all(|e| { !matches!( @@ -166,7 +196,7 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { return Ok(common_treenode::Transformed::no(node)); } - // If any children are stateful UDFs, we truncate + let mut monotonically_increasing_expr_identifier = 0; let inputs = expr.children(); let new_inputs = inputs.iter().map(|e| { if matches!( @@ -179,13 +209,12 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { } ) { let intermediate_expr_name = format!( - "__SplitExprByStatefulUDF_{}-{}_stateful__", - self.stage_id, self.monotonically_increasing_expr_identifier + "__TruncateAnyStatefulUDFChildren_{}-{}-{}__", + self.stage_idx, self.expr_idx, monotonically_increasing_expr_identifier ); - self.monotonically_increasing_expr_identifier += 1; - self.truncated_node_names - .insert(intermediate_expr_name.clone()); - self.remaining_exprs + monotonically_increasing_expr_identifier += 1; + + self.new_children .push(e.clone().alias(intermediate_expr_name.as_str())); Expr::Column(intermediate_expr_name.as_str().into()).arced() } else { @@ -199,6 +228,82 @@ impl TreeNodeRewriter for SplitExprByStatefulUDF { } } +/// Splits a projection down into two sets of new projections: (truncated_exprs, new_children) +/// +/// `truncated_exprs` are the newly truncated expressions from `projection`. This has the same +/// length as `projection`, and also the same names as each Expr in `projection`. However, their +/// children are (potentially) truncated and replaced with Expr::Column nodes, which refer to +/// Exprs in `new_children`. +/// +/// `new_children` are the new children of `truncated_exprs`. Every Expr::Column leaf node in +/// `truncated_exprs` should have a corresponding expression in `new_children` with the appropriate +/// name. +fn split_projection( + projection: &[ExprRef], + stage_idx: usize, +) -> DaftResult<(Vec, Vec)> { + let mut truncated_exprs = Vec::new(); + let (mut new_children_seen, mut new_children): (HashSet, Vec) = + (HashSet::new(), Vec::new()); + + fn _is_stateful_udf_and_should_truncate_children(expr: &ExprRef) -> bool { + let mut is_stateful_udf = true; + expr.apply(|e| match e.as_ref() { + Expr::Alias(..) => Ok(TreeNodeRecursion::Continue), + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { .. })), + .. + } => Ok(TreeNodeRecursion::Stop), + _ => { + is_stateful_udf = false; + Ok(TreeNodeRecursion::Stop) + } + }) + .unwrap(); + is_stateful_udf + } + + for (expr_idx, expr) in projection.iter().enumerate() { + // Run the TruncateRootStatefulUDF TreeNodeRewriter + if _is_stateful_udf_and_should_truncate_children(expr) { + let mut rewriter = TruncateRootStatefulUDF::new(stage_idx, expr_idx); + let rewritten_root = expr.clone().rewrite(&mut rewriter)?.data; + truncated_exprs.push(rewritten_root); + for new_child in rewriter.new_children { + if !new_children_seen.contains(new_child.name()) { + new_children_seen.insert(new_child.name().to_string()); + new_children.push(new_child.clone()); + } + } + + // Run the TruncateAnyStatefulUDFChildren TreeNodeRewriter + } else if has_stateful_udf(expr) { + let mut rewriter = TruncateAnyStatefulUDFChildren::new(stage_idx, expr_idx); + let rewritten_root = expr.clone().rewrite(&mut rewriter)?.data; + truncated_exprs.push(rewritten_root); + for new_child in rewriter.new_children { + if !new_children_seen.contains(new_child.name()) { + new_children_seen.insert(new_child.name().to_string()); + new_children.push(new_child.clone()); + } + } + + // No need to rewrite the tree + } else { + truncated_exprs.push(expr.clone()); + for required_col_name in get_required_columns(expr) { + if !new_children_seen.contains(&required_col_name) { + let colexpr = Expr::Column(required_col_name.as_str().into()).arced(); + new_children_seen.insert(required_col_name); + new_children.push(colexpr); + } + } + } + } + + Ok((truncated_exprs, new_children)) +} + fn try_optimize_project( projection: &Project, plan: Arc, @@ -223,28 +328,8 @@ fn try_optimize_project( // Split the Projection into: // * remaining: remaining parts of the Project to recurse on // * truncated_exprs: current parts of the Project to split into (Project -> ActorPoolProjects -> Project) - let (remaining, truncated_exprs): (Vec, Vec) = { - let mut remaining: Vec = Vec::new(); - let mut truncated_exprs = Vec::new(); - for expr in projection.projection.iter() { - let mut rewriter = SplitExprByStatefulUDF::new(recursive_count); - let root = expr.clone().rewrite(&mut rewriter)?.data; - truncated_exprs.push(root); - - let filtered_remaining_exprs = rewriter - .remaining_exprs - .into_iter() - .filter(|new| { - !remaining - .iter() - .map(|existing| existing.name()) - .contains(&new.name()) - }) - .collect_vec(); - remaining.extend(filtered_remaining_exprs); - } - (remaining, truncated_exprs) - }; + let (truncated_exprs, remaining): (Vec, Vec) = + split_projection(projection.projection.as_slice(), recursive_count)?; log::debug!( "Truncated Exprs: {}", @@ -452,7 +537,6 @@ mod tests { // TODO: need to figure out how users will pass this in static NUM_ACTORS: usize = 1; - #[cfg(not(feature = "python"))] #[test] fn test_with_column_stateful_udf_happypath() -> DaftResult<()> { let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); @@ -480,9 +564,103 @@ mod tests { Ok(()) } - #[cfg(not(feature = "python"))] #[test] fn test_multiple_with_column_parallel() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![ + Field::new("a", daft_core::DataType::Utf8), + Field::new("b", daft_core::DataType::Utf8), + ]); + let scan_plan = dummy_scan_node(scan_op); + let project_plan = scan_plan + .with_columns( + vec![ + create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]).alias("a_prime"), + create_stateful_udf(vec![create_stateful_udf(vec![col("b")])]).alias("b_prime"), + ], + None, + )? + .build(); + + let intermediate_column_name_0 = "__TruncateRootStatefulUDF_0-2-0__"; + let intermediate_column_name_1 = "__TruncateRootStatefulUDF_0-3-0__"; + let expected = scan_plan.select(vec![col("a"), col("b")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + col("b"), + create_stateful_udf(vec![col("a")]).alias(intermediate_column_name_0), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + col("b"), + col(intermediate_column_name_0), + create_stateful_udf(vec![col("b")]).alias(intermediate_column_name_1), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![ + col("a"), + col("b"), + col(intermediate_column_name_0), + col(intermediate_column_name_1), + ], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![ + col(intermediate_column_name_0), + col(intermediate_column_name_1), + col("a"), + col("b"), + ], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_column_name_0), + col(intermediate_column_name_1), + col("a"), + col("b"), + create_stateful_udf(vec![col(intermediate_column_name_0)]).alias("a_prime"), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_column_name_0), + col(intermediate_column_name_1), + col("a"), + col("b"), + col("a_prime"), + create_stateful_udf(vec![col(intermediate_column_name_1)]).alias("b_prime"), + ], + NUM_ACTORS, + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col("a"), col("b"), col("a_prime"), col("b_prime")], + )?) + .arced(); + assert_optimized_plan_eq(project_plan, expected)?; + Ok(()) + } + + #[test] + fn test_multiple_with_column_parallel_common_subtree_eliminated() -> DaftResult<()> { let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); let scan_plan = dummy_scan_node(scan_op); let stateful_project_expr = create_stateful_udf(vec![col("a")]); @@ -566,7 +744,7 @@ mod tests { .with_columns(vec![stacked_stateful_project_expr.clone().alias("b")], None)? .build(); - let intermediate_name = "__SplitExprByStatefulUDF_0-0_stateful_child__"; + let intermediate_name = "__TruncateRootStatefulUDF_0-1-0__"; let expected = scan_plan.select(vec![col("a")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, @@ -656,8 +834,8 @@ mod tests { .select(vec![stacked_stateful_project_expr.clone().alias("c")])? .build(); - let intermediate_name_0 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; - let intermediate_name_1 = "__SplitExprByStatefulUDF_0-1_stateful_child__"; + let intermediate_name_0 = "__TruncateRootStatefulUDF_0-0-0__"; + let intermediate_name_1 = "__TruncateRootStatefulUDF_0-0-1__"; let expected = scan_plan.select(vec![col("a"), col("b")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, @@ -766,9 +944,9 @@ mod tests { .select(vec![stacked_stateful_project_expr.clone().alias("c")])? .build(); - let intermediate_name_0 = "__SplitExprByStatefulUDF_1-0_stateful__"; - let intermediate_name_1 = "__SplitExprByStatefulUDF_1-1_stateful__"; - let intermediate_name_2 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; + let intermediate_name_0 = "__TruncateAnyStatefulUDFChildren_1-0-0__"; + let intermediate_name_1 = "__TruncateAnyStatefulUDFChildren_1-0-1__"; + let intermediate_name_2 = "__TruncateRootStatefulUDF_0-0-0__"; let expected = scan_plan.select(vec![col("a"), col("b")])?.build(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, @@ -891,8 +1069,8 @@ mod tests { ])? .build(); - let intermediate_name_0 = "__SplitExprByStatefulUDF_1-0_stateful__"; - let intermediate_name_1 = "__SplitExprByStatefulUDF_0-0_stateful_child__"; + let intermediate_name_0 = "__TruncateAnyStatefulUDFChildren_1-1-0__"; + let intermediate_name_1 = "__TruncateRootStatefulUDF_0-1-0__"; let expected = scan_plan.build(); let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("a")])?).arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( From 252726c1d676772981c7370dcb6631955edc9b8a Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Wed, 14 Aug 2024 19:09:50 -0700 Subject: [PATCH 11/18] Rebase with new main changes --- .../rules/split_actor_pool_projects.rs | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 616c4c0e68..a2501bac3d 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -520,6 +520,7 @@ mod tests { num_expressions: inputs.len(), return_dtype: daft_core::DataType::Int64, resource_request: Some(create_resource_request()), + batch_size: None, })), inputs, } @@ -545,7 +546,7 @@ mod tests { // Add a Projection with StatefulUDF and resource request let project_plan = scan_plan - .with_columns(vec![stateful_project_expr.clone().alias("b")], None)? + .with_columns(vec![stateful_project_expr.clone().alias("b")])? .build(); // Project([col("a")]) --> ActorPoolProject([col("a"), foo(col("a")).alias("b")]) --> Project([col("a"), col("b")]) @@ -572,13 +573,10 @@ mod tests { ]); let scan_plan = dummy_scan_node(scan_op); let project_plan = scan_plan - .with_columns( - vec![ - create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]).alias("a_prime"), - create_stateful_udf(vec![create_stateful_udf(vec![col("b")])]).alias("b_prime"), - ], - None, - )? + .with_columns(vec![ + create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]).alias("a_prime"), + create_stateful_udf(vec![create_stateful_udf(vec![col("b")])]).alias("b_prime"), + ])? .build(); let intermediate_column_name_0 = "__TruncateRootStatefulUDF_0-2-0__"; @@ -669,15 +667,12 @@ mod tests { // NOTE: Our common-subtree elimination will build this as 2 project nodes: // Project([col("a").alias("a"), foo(col("a")).alias(factored_column_name)]) // --> Project([col("a"), col(factored_column_name).alias("b"), col(factored_column_name).alias("c")]) - let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Int64, resource_request: Some(ResourceRequest { num_cpus: Some(8.0), num_gpus: Some(1.0), memory_bytes: None }) }))(a)"; + let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Int64, resource_request: Some(ResourceRequest { num_cpus: Some(8.0), num_gpus: Some(1.0), memory_bytes: None }), batch_size: None }))(a)"; let project_plan = scan_plan - .with_columns( - vec![ - stateful_project_expr.clone().alias("b"), - stateful_project_expr.clone().alias("c"), - ], - None, - )? + .with_columns(vec![ + stateful_project_expr.clone().alias("b"), + stateful_project_expr.clone().alias("c"), + ])? .build(); let expected = scan_plan.select(vec![col("a").alias("a")])?.build(); @@ -741,7 +736,7 @@ mod tests { // Add a Projection with StatefulUDF and resource request // Project([col("a"), foo(foo(col("a"))).alias("b")]) let project_plan = scan_plan - .with_columns(vec![stacked_stateful_project_expr.clone().alias("b")], None)? + .with_columns(vec![stacked_stateful_project_expr.clone().alias("b")])? .build(); let intermediate_name = "__TruncateRootStatefulUDF_0-1-0__"; From afccf9e7cdd0f58ce6c368f6bcd565125a00348c Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Thu, 15 Aug 2024 15:31:18 -0700 Subject: [PATCH 12/18] Rebase --- .../rules/push_down_projection.rs | 2 - .../rules/split_actor_pool_projects.rs | 48 ++----------------- 2 files changed, 5 insertions(+), 45 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs index 7f5341cbf1..b6c0e3e58f 100644 --- a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs @@ -796,7 +796,6 @@ mod tests { } /// Projection<-ActorPoolProject prunes columns from the ActorPoolProject - #[cfg(not(feature = "python"))] #[test] fn test_projection_pushdown_into_actorpoolproject() -> DaftResult<()> { use crate::logical_ops::ActorPoolProject; @@ -848,7 +847,6 @@ mod tests { } /// Projection<-ActorPoolProject prunes ActorPoolProject entirely if the stateful projection column is pruned - #[cfg(not(feature = "python"))] #[test] fn test_projection_pushdown_into_actorpoolproject_completely_removed() -> DaftResult<()> { use crate::logical_ops::ActorPoolProject; diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index a2501bac3d..ba65678540 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -57,13 +57,8 @@ impl OptimizerRule for SplitActorPoolProjects { } fn try_optimize(&self, plan: Arc) -> DaftResult>> { - // TODO: Figure out num_actors! How do we propagate this correctly? - let num_actors = 1; - match plan.as_ref() { - LogicalPlan::Project(projection) => { - try_optimize_project(projection, plan.clone(), num_actors, 0) - } + LogicalPlan::Project(projection) => try_optimize_project(projection, plan.clone(), 0), // TODO: Figure out how to split other nodes as well such as Filter, Agg etc _ => Ok(Transformed::No(plan)), } @@ -307,7 +302,6 @@ fn split_projection( fn try_optimize_project( projection: &Project, plan: Arc, - num_actors: usize, recursive_count: usize, ) -> DaftResult>> { // Base case: no stateful UDFs at all @@ -354,12 +348,8 @@ fn try_optimize_project( // Recursively run the rule on the new child Project let new_project = Project::try_new(plan.children()[0].clone(), remaining)?; let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); - let optimized_child_plan = try_optimize_project( - &new_project, - new_child_project.clone(), - num_actors, - recursive_count + 1, - )?; + let optimized_child_plan = + try_optimize_project(&new_project, new_child_project.clone(), recursive_count + 1)?; optimized_child_plan.unwrap().clone() }; @@ -418,7 +408,6 @@ fn try_optimize_project( child = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( child, stateful_projection, - num_actors, )?) .arced(); } @@ -521,6 +510,7 @@ mod tests { return_dtype: daft_core::DataType::Int64, resource_request: Some(create_resource_request()), batch_size: None, + concurrency: Some(8), })), inputs, } @@ -535,9 +525,6 @@ mod tests { } } - // TODO: need to figure out how users will pass this in - static NUM_ACTORS: usize = 1; - #[test] fn test_with_column_stateful_udf_happypath() -> DaftResult<()> { let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Utf8)]); @@ -554,7 +541,6 @@ mod tests { let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![col("a"), stateful_project_expr.clone().alias("b")], - NUM_ACTORS, )?) .arced(); let expected = @@ -589,7 +575,6 @@ mod tests { col("b"), create_stateful_udf(vec![col("a")]).alias(intermediate_column_name_0), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -600,7 +585,6 @@ mod tests { col(intermediate_column_name_0), create_stateful_udf(vec![col("b")]).alias(intermediate_column_name_1), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -632,7 +616,6 @@ mod tests { col("b"), create_stateful_udf(vec![col(intermediate_column_name_0)]).alias("a_prime"), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -645,7 +628,6 @@ mod tests { col("a_prime"), create_stateful_udf(vec![col(intermediate_column_name_1)]).alias("b_prime"), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -667,7 +649,7 @@ mod tests { // NOTE: Our common-subtree elimination will build this as 2 project nodes: // Project([col("a").alias("a"), foo(col("a")).alias(factored_column_name)]) // --> Project([col("a"), col(factored_column_name).alias("b"), col(factored_column_name).alias("c")]) - let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Int64, resource_request: Some(ResourceRequest { num_cpus: Some(8.0), num_gpus: Some(1.0), memory_bytes: None }), batch_size: None }))(a)"; + let factored_column_name = "Function_Python(Stateful(StatefulPythonUDF { name: \"foo\", num_expressions: 1, return_dtype: Int64, resource_request: Some(ResourceRequest { num_cpus: Some(8.0), num_gpus: Some(1.0), memory_bytes: None }), batch_size: None, concurrency: Some(8) }))(a)"; let project_plan = scan_plan .with_columns(vec![ stateful_project_expr.clone().alias("b"), @@ -682,7 +664,6 @@ mod tests { col("a"), stateful_project_expr.clone().alias(factored_column_name), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -709,7 +690,6 @@ mod tests { col("a"), stateful_project_expr.clone().alias(factored_column_name), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -749,7 +729,6 @@ mod tests { .clone() .alias(intermediate_name), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -771,7 +750,6 @@ mod tests { .clone() .alias("b"), ], - NUM_ACTORS, )?) .arced(); let expected = @@ -788,7 +766,6 @@ mod tests { .clone() .alias(intermediate_name), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -804,7 +781,6 @@ mod tests { .clone() .alias("b"), ], - NUM_ACTORS, )?) .arced(); assert_optimized_plan_eq_with_projection_pushdown(project_plan, expected)?; @@ -841,7 +817,6 @@ mod tests { .clone() .alias(intermediate_name_0), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -854,7 +829,6 @@ mod tests { .clone() .alias(intermediate_name_1), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -876,7 +850,6 @@ mod tests { .clone() .alias("c"), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced(); @@ -893,7 +866,6 @@ mod tests { .clone() .alias(intermediate_name_0), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -904,7 +876,6 @@ mod tests { .clone() .alias(intermediate_name_1), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -914,7 +885,6 @@ mod tests { .clone() .alias("c"), ], - NUM_ACTORS, )?) .arced(); assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; @@ -952,7 +922,6 @@ mod tests { .clone() .alias(intermediate_name_0), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -965,7 +934,6 @@ mod tests { .clone() .alias(intermediate_name_1), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -998,7 +966,6 @@ mod tests { .clone() .alias("c"), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced(); @@ -1015,7 +982,6 @@ mod tests { .clone() .alias(intermediate_name_0), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( @@ -1026,7 +992,6 @@ mod tests { .clone() .alias(intermediate_name_1), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -1041,7 +1006,6 @@ mod tests { vec![create_stateful_udf(vec![col(intermediate_name_2)]) .clone() .alias("c")], - NUM_ACTORS, )?) .arced(); assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; @@ -1074,7 +1038,6 @@ mod tests { col("a"), create_stateful_udf(vec![col("a")]).alias(intermediate_name_0), ], - NUM_ACTORS, )?) .arced(); let expected = LogicalPlan::Project(Project::try_new( @@ -1110,7 +1073,6 @@ mod tests { col("a"), create_stateful_udf(vec![col(intermediate_name_1)]).alias("c"), ], - NUM_ACTORS, )?) .arced(); let expected = From fe3645ee963da20290714860837261e478abecec Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Thu, 15 Aug 2024 16:33:43 -0700 Subject: [PATCH 13/18] Enable optimization behind feature flag --- .../src/logical_optimization/optimizer.rs | 73 ++++++++++++------- .../src/logical_optimization/rules/mod.rs | 1 + .../rules/split_actor_pool_projects.rs | 1 - 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/optimizer.rs b/src/daft-plan/src/logical_optimization/optimizer.rs index fd68fe5bc2..43bab4e30c 100644 --- a/src/daft-plan/src/logical_optimization/optimizer.rs +++ b/src/daft-plan/src/logical_optimization/optimizer.rs @@ -8,7 +8,7 @@ use super::{ logical_plan_tracker::LogicalPlanTracker, rules::{ ApplyOrder, DropRepartition, OptimizerRule, PushDownFilter, PushDownLimit, - PushDownProjection, Transformed, + PushDownProjection, SplitActorPoolProjects, Transformed, }, }; use common_treenode::DynTreeNode; @@ -18,12 +18,15 @@ use common_treenode::DynTreeNode; pub struct OptimizerConfig { // Default maximum number of optimization passes the optimizer will make over a fixed-point RuleBatch. pub default_max_optimizer_passes: usize, + // Feature flag for enabling creating ActorPoolProject nodes during plan optimization + pub enable_actor_pool_projections: bool, } impl OptimizerConfig { - fn new(max_optimizer_passes: usize) -> Self { + fn new(max_optimizer_passes: usize, enable_actor_pool_projections: bool) -> Self { OptimizerConfig { default_max_optimizer_passes: max_optimizer_passes, + enable_actor_pool_projections, } } } @@ -31,7 +34,7 @@ impl OptimizerConfig { impl Default for OptimizerConfig { fn default() -> Self { // Default to a max of 5 optimizer passes for a given batch. - OptimizerConfig::new(5) + OptimizerConfig::new(5, false) } } @@ -129,30 +132,44 @@ pub struct Optimizer { impl Optimizer { pub fn new(config: OptimizerConfig) -> Self { - // Default rule batches. - let rule_batches: Vec = vec![ - RuleBatch::new( + let mut rule_batches = Vec::new(); + + // --- Split ActorPoolProjection nodes from Project nodes --- + // This is feature-flagged behind DAFT_ENABLE_ACTOR_POOL_PROJECTIONS=1 + if config.enable_actor_pool_projections { + rule_batches.push(RuleBatch::new( vec![ - Box::new(DropRepartition::new()), - Box::new(PushDownFilter::new()), + Box::new(PushDownProjection::new()), + Box::new(SplitActorPoolProjects::new()), Box::new(PushDownProjection::new()), ], - // Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node - // at the current node, which would require another batch application in order to have a chance to push - // that Filter node through upstream nodes. - // TODO(Clark): Refine this fixed-point policy. - RuleExecutionStrategy::FixedPoint(Some(3)), - ), - RuleBatch::new( - vec![ - // This needs to be separate from PushDownProjection because otherwise the limit and - // projection just keep swapping places, preventing optimization - // (see https://github.com/Eventual-Inc/Daft/issues/2616) - Box::new(PushDownLimit::new()), - ], - RuleExecutionStrategy::FixedPoint(Some(3)), - ), - ]; + RuleExecutionStrategy::Once, + )); + } + + // --- Bulk of our rules --- + rule_batches.push(RuleBatch::new( + vec![ + Box::new(DropRepartition::new()), + Box::new(PushDownFilter::new()), + Box::new(PushDownProjection::new()), + ], + // Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node + // at the current node, which would require another batch application in order to have a chance to push + // that Filter node through upstream nodes. + // TODO(Clark): Refine this fixed-point policy. + RuleExecutionStrategy::FixedPoint(Some(3)), + )); + + // --- Limit pushdowns --- + // This needs to be separate from PushDownProjection because otherwise the limit and + // projection just keep swapping places, preventing optimization + // (see https://github.com/Eventual-Inc/Daft/issues/2616) + rule_batches.push(RuleBatch::new( + vec![Box::new(PushDownLimit::new())], + RuleExecutionStrategy::FixedPoint(Some(3)), + )); + Self::with_rule_batches(rule_batches, config) } @@ -344,7 +361,7 @@ mod tests { vec![Box::new(NoOp::new())], RuleExecutionStrategy::Once, )], - OptimizerConfig::new(5), + OptimizerConfig::new(5, false), ); let plan: Arc = dummy_scan_node(dummy_scan_operator(vec![Field::new("a", DataType::Int64)])).build(); @@ -395,7 +412,7 @@ mod tests { vec![Box::new(RotateProjection::new(false))], RuleExecutionStrategy::FixedPoint(Some(20)), )], - OptimizerConfig::new(20), + OptimizerConfig::new(20, false), ); let proj_exprs = vec![ col("a").add(lit(1)), @@ -430,7 +447,7 @@ mod tests { vec![Box::new(RotateProjection::new(true))], RuleExecutionStrategy::FixedPoint(Some(20)), )], - OptimizerConfig::new(20), + OptimizerConfig::new(20, false), ); let proj_exprs = vec![ col("a").add(lit(1)), @@ -481,7 +498,7 @@ mod tests { RuleExecutionStrategy::Once, ), ], - OptimizerConfig::new(20), + OptimizerConfig::new(20, false), ); let proj_exprs = vec![ col("a").add(lit(1)), diff --git a/src/daft-plan/src/logical_optimization/rules/mod.rs b/src/daft-plan/src/logical_optimization/rules/mod.rs index 3854e2f503..fc137589c5 100644 --- a/src/daft-plan/src/logical_optimization/rules/mod.rs +++ b/src/daft-plan/src/logical_optimization/rules/mod.rs @@ -10,3 +10,4 @@ pub use push_down_filter::PushDownFilter; pub use push_down_limit::PushDownLimit; pub use push_down_projection::PushDownProjection; pub use rule::{ApplyOrder, OptimizerRule, Transformed}; +pub use split_actor_pool_projects::SplitActorPoolProjects; diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index ba65678540..62c0e5827a 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -501,7 +501,6 @@ mod tests { ) } - #[cfg(not(feature = "python"))] fn create_stateful_udf(inputs: Vec) -> ExprRef { Expr::Function { func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { From cd3c4866b8819991e9c65e46eeb685293495e575 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 19 Aug 2024 09:46:33 -0500 Subject: [PATCH 14/18] Add ASCII art documentation for clarity --- .../rules/split_actor_pool_projects.rs | 132 ++++++++++++++---- 1 file changed, 103 insertions(+), 29 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 62c0e5827a..7d0a47a76c 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -28,29 +28,98 @@ impl SplitActorPoolProjects { } } -/// Implement SplitActorPoolProjects as an OptimizerRule which will: +/// Implement SplitActorPoolProjects as an OptimizerRule +/// * Splits PROJECT nodes into chains of (PROJECT -> ...ACTOR_POOL_PROJECTS -> PROJECT) ... +/// * Resultant PROJECT nodes will never contain any StatefulUDF expressions +/// * Each ACTOR_POOL_PROJECT node only contains a single StatefulUDF expression /// -/// 1. Go top-down from the root of the LogicalPlan -/// 2. Whenever it sees a Project with StatefulUDF(s), it will split it like so: -/// Project_recursive (optional) -> Project_stateless -> ActorPoolProject(s)... -> Project_final -/// 3. Then it recurses on `Project_recursive` until there is no more `Project_recursive` to split anymore +/// Given a projection with 3 expressions that look like the following: /// -/// Invariants: -/// * `Project_recursive` definitely contains at least 1 stateful UDF, and hence need to be recursively split. If it is not constructed, then this is the base case. -/// * `Project_stateless` contains: [...stateless_projections, ...passthrough_columns_as_colexprs] -/// * Subsequent `ActorPoolProject(s)` contain: [Single StatefulUDF, ...passthrough_columns_as_colexprs] -/// * `Project_final` contains only Expr::Columns, and has the same column names (and column ordering) as the original Projection -/// * At the end of splitting, all Project nodes will never contain a StatefulUDF, and all ActorPoolProject nodes will contain one-and-only-one StatefulUDF +/// ┌─────────────────────────────────────────────── PROJECTION ────┐ +/// │ │ +/// │ ┌─────┐ ┌─────┐ ┌─────┐ │ +/// │ │ E1 │ │ E2 │ │ E3 │ │ +/// │ │ │ │ │ │ │ │ +/// │ StatefulUDF│ Stateless│ Stateless│ │ +/// │ └──┬──┘ └─┬┬──┘ └──┬──┘ │ +/// │ │ ┌──┘└──┐ │ │ +/// │ ┌──▼──┐ ┌───▼─┐ ┌─▼───┐ ┌──▼────┐ │ +/// │ │ E1a │ │ E2a │ │ E2b │ │col(E3)│ │ +/// │ │ │ │ │ │ │ └───────┘ │ +/// │ Any │ StatefulUDF│ │ Stateless │ +/// │ └─────┘ └─────┘ └─────┘ │ +/// │ │ +/// └───────────────────────────────────────────────────────────────┘ /// -/// How splitting is performed on a given Project: -/// 1. For every expression in the Project, "skim off the top" -/// * If the root expression is a StatefulUDF, truncate all of its children, alias them, and then add them to `Project_recursive` -/// * If the root expression is not a StatefulUDF, truncate any StatefulUDF children, alias them, and add them to `Project_recursive` -/// 2. Recursively perform splitting on `Project_recursive` -/// 3. Now for the current truncated expressions, split them into stateless vs stateful expressions: -/// * All stateless expressions go into a single `Project_stateless` node -/// * For each stateful expression, they go into their own dedicated `ActorPoolProject` node -/// * The final `Project_final` node is naively constructed using the names of the original Project +/// We will attempt to split this recursively into "stages". We split a given projection by truncating each expression as follows: +/// +/// 1. (See E1 -> E1') Expressions with (aliased) StatefulUDFs as root nodes have all their children truncated +/// 2. (See E2 -> E2') Expressions with children StatefulUDFs have each child StatefulUDF truncated +/// 3. (See E3) Expressions without any StatefulUDFs at all are not modified +/// +/// The truncated children as well as any required `col` references are collected into a new set of [`remaining`] +/// expressions. The new [`truncated_exprs`] make up current stage, and the [`remaining`] exprs represent the projections +/// from prior stages that will need to be recursively split into more stages. +/// +/// ┌───────────────────────────────────────────────────────────SPLIT: split_projection() +/// │ │ +/// │ TruncateRootStatefulUDF TruncateAnyStatefulUDFChildren No-Op │ +/// │ ======================= ============================== ===== │ +/// │ ┌─────┐ ┌─────┐ ┌─────┐ │ +/// │ │ E1' │ │ E2' │ │ E3 │ │ +/// │ StatefulUDF│ Stateless│ Stateless│ │ +/// │ └───┬─┘ └─┬┬──┘ └──┬──┘ │ +/// │ │ ┌───┘└───┐ │ │ +/// │ *--- ▼---* *--- ▼---* ┌──▼──┐ ┌──▼────┐ │ +/// │ / col(x) / / col(y) / │ E2b │ │col(E3)│ │ +/// │ *--------* *--------* │ Stateless └───────┘ │ +/// │ └─────┘ │ +/// │ │ +/// │ [`truncated_exprs`] │ +/// ├─- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -─┤ +/// │ [`remaining`] │ +/// │ │ +/// │ *----------* *----------* ┌────────┐ ┌───────┐ │ +/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │ +/// │ *-----│----* *-------│--* └────────┘ └───────┘ │ +/// │ ┌──▼──┐ ┌───▼─┐ │ +/// │ │ E1a │ │ E2a │ │ +/// │ │ │ │ │ │ +/// │ Any │ StatefulUDF│ │ +/// │ └─────┘ └─────┘ │ +/// └─────────────────────────────────────────────────────────────────────────────────┘ +/// +/// We then perform recursion on [`remaining`] until [`remaining`] becomes only `col` references, +/// as this would indicate that no further work needs to be performed by the projection. +/// +/// ┌───────────────────────────── Recursively split [`remaining`] ─┐ +/// │ │ +/// │ *----------* *----------* ┌────────┐ ┌───────┐ │ +/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │ +/// │ *-----│----* *-------│--* └────────┘ └───────┘ │ +/// │ ┌──▼──┐ ┌───▼─┐ │ +/// │ │ E1a │ │ E2a │ │ +/// │ │ │ │ │ │ +/// │ Any │ StatefulUDF│ │ +/// │ └─────┘ └─────┘ │ +/// │ │ +/// └─┬─────────────────────────────────────────────────────────────┘ +/// | +/// │ Then, we link this up with our current stage, which will be resolved into a chain of logical nodes: +/// | * The first PROJECT contains all the stateless expressions (E2' and E3) and passes through all required columns. +/// | * Subsequent ACTOR_POOL_PROJECT nodes each contain only one StatefulUDF, and passes through all required columns. +/// | * The last PROJECT contains only `col` references, and correctly orders/prunes columns according to the original projection. +/// | +/// │ +/// │ [`truncated_exprs`] resolved as a chain of logical nodes: +/// │ ┌─────────────────┐ ┌────────────────────┐ ┌───────────┐ +/// │ │ PROJECT │ │ ACTOR_POOL_PROJECT │ │ PROJECT │ +/// │ │ ------- │ │ ------------------ │ ...ACTOR_PPs, │ ----------│ +/// └───►│ E2', E3, col(*) ├─►│ E1', col(*) ├─ 1 per each ─►│ col("e1") │ +/// │ │ │ │ StatefulUDF │ col("e2") │ +/// │ │ │ │ │ col("e3") │ +/// │ │ │ │ │ │ +/// └─────────────────┘ └────────────────────┘ └───────────┘ impl OptimizerRule for SplitActorPoolProjects { fn apply_order(&self) -> ApplyOrder { ApplyOrder::TopDown @@ -101,6 +170,13 @@ impl TruncateAnyStatefulUDFChildren { } } +/// Performs truncation of Expressions which are assumed to be rooted at a StatefulUDF expression +/// +/// This TreeNodeRewriter will truncate all children of the StatefulUDF expression like so: +/// +/// 1. Add an `alias(...)` to the child and push it onto `self.new_children` +/// 2. Replace the child with a `col("...")` +/// 3. Add any `col("...")` leaf nodes to `self.new_children` (only once per unique column name) impl TreeNodeRewriter for TruncateRootStatefulUDF { type Node = ExprRef; @@ -148,6 +224,13 @@ impl TreeNodeRewriter for TruncateRootStatefulUDF { } } +/// Performs truncation of Expressions which are assumed to have some subtrees which contain StatefulUDF expressions +/// +/// This TreeNodeRewriter will truncate StatefulUDF expressions from the tree like so: +/// +/// 1. Add an `alias(...)` to any StatefulUDF child and push it onto `self.new_children` +/// 2. Replace the child with a `col("...")` +/// 3. Add any `col("...")` leaf nodes to `self.new_children` (only once per unique column name) impl TreeNodeRewriter for TruncateAnyStatefulUDFChildren { type Node = ExprRef; @@ -224,15 +307,6 @@ impl TreeNodeRewriter for TruncateAnyStatefulUDFChildren { } /// Splits a projection down into two sets of new projections: (truncated_exprs, new_children) -/// -/// `truncated_exprs` are the newly truncated expressions from `projection`. This has the same -/// length as `projection`, and also the same names as each Expr in `projection`. However, their -/// children are (potentially) truncated and replaced with Expr::Column nodes, which refer to -/// Exprs in `new_children`. -/// -/// `new_children` are the new children of `truncated_exprs`. Every Expr::Column leaf node in -/// `truncated_exprs` should have a corresponding expression in `new_children` with the appropriate -/// name. fn split_projection( projection: &[ExprRef], stage_idx: usize, From dc7e648731ce701443828bd0c95e755ac264b1f6 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 19 Aug 2024 09:59:04 -0500 Subject: [PATCH 15/18] Rebase on new changes --- .../logical_optimization/rules/split_actor_pool_projects.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 7d0a47a76c..00496e1087 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -417,10 +417,10 @@ fn try_optimize_project( .all(|e| matches!(e.as_ref(), Expr::Column(_))) { // Nothing remaining, we're done splitting and should wire the new node up with the child of the Project - plan.children()[0].clone() + projection.input.clone() } else { // Recursively run the rule on the new child Project - let new_project = Project::try_new(plan.children()[0].clone(), remaining)?; + let new_project = Project::try_new(projection.input.clone(), remaining)?; let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); let optimized_child_plan = try_optimize_project(&new_project, new_child_project.clone(), recursive_count + 1)?; From 77ada3339ad167cf37984e28d86f93c4e5a254e1 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Thu, 22 Aug 2024 17:16:00 -0700 Subject: [PATCH 16/18] Update src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs Co-authored-by: Desmond Cheong --- .../src/logical_optimization/rules/split_actor_pool_projects.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 00496e1087..18506e4711 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -80,7 +80,7 @@ impl SplitActorPoolProjects { /// │ [`remaining`] │ /// │ │ /// │ *----------* *----------* ┌────────┐ ┌───────┐ │ -/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │ +/// │ / alias(x) / / alias(y) / │col(E2b)│ │col(E3)│ │ /// │ *-----│----* *-------│--* └────────┘ └───────┘ │ /// │ ┌──▼──┐ ┌───▼─┐ │ /// │ │ E1a │ │ E2a │ │ From 19fe229657cae4a214fa3f872a6db74ec5117fdc Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Thu, 22 Aug 2024 17:16:18 -0700 Subject: [PATCH 17/18] Update src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs Co-authored-by: Desmond Cheong --- .../src/logical_optimization/rules/split_actor_pool_projects.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 18506e4711..0d16fe79bd 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -95,7 +95,7 @@ impl SplitActorPoolProjects { /// ┌───────────────────────────── Recursively split [`remaining`] ─┐ /// │ │ /// │ *----------* *----------* ┌────────┐ ┌───────┐ │ -/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │ +/// │ / alias(x) / / alias(y) / │col(E2b)│ │col(E3)│ │ /// │ *-----│----* *-------│--* └────────┘ └───────┘ │ /// │ ┌──▼──┐ ┌───▼─┐ │ /// │ │ E1a │ │ E2a │ │ From 1fe85eb44aa53d444156afabe9fe080739ccf184 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 23 Aug 2024 13:33:59 -0700 Subject: [PATCH 18/18] Add another unit test with stateless-(statefulchild, statelesschild) --- .../rules/split_actor_pool_projects.rs | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 0d16fe79bd..568c511519 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -1155,4 +1155,56 @@ mod tests { Ok(()) } + + #[test] + fn test_stateless_expr_with_only_some_stateful_children() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![Field::new("a", daft_core::DataType::Int64)]); + let scan_plan = dummy_scan_node(scan_op); + + // (col("a") + col("a")) + foo(col("a")) + let stateful_project_expr = col("a") + .add(col("a")) + .add(create_stateful_udf(vec![col("a")])) + .alias("result"); + let project_plan = scan_plan + .select(vec![col("a"), stateful_project_expr])? + .build(); + + let intermediate_name_0 = "__TruncateAnyStatefulUDFChildren_0-1-0__"; + // let intermediate_name_1 = "__TruncateRootStatefulUDF_0-1-0__"; + let expected = scan_plan.build(); + let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("a")])?).arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col("a")]).alias(intermediate_name_0), + ], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![col("a"), col(intermediate_name_0)], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new( + expected, + vec![ + col(intermediate_name_0), + col("a"), + col("a") + .add(col("a")) + .add(col(intermediate_name_0)) + .alias("result"), + ], + )?) + .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col("a"), col("result")])?) + .arced(); + + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; + + Ok(()) + } }