From 772c9a25f36388c79744e2b350dfc88a108c858d Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 19 Aug 2024 09:46:33 -0500 Subject: [PATCH] Add ASCII art documentation for clarity --- .../rules/split_actor_pool_projects.rs | 132 ++++++++++++++---- 1 file changed, 103 insertions(+), 29 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 62c0e5827a..7d0a47a76c 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -28,29 +28,98 @@ impl SplitActorPoolProjects { } } -/// Implement SplitActorPoolProjects as an OptimizerRule which will: +/// Implement SplitActorPoolProjects as an OptimizerRule +/// * Splits PROJECT nodes into chains of (PROJECT -> ...ACTOR_POOL_PROJECTS -> PROJECT) ... +/// * Resultant PROJECT nodes will never contain any StatefulUDF expressions +/// * Each ACTOR_POOL_PROJECT node only contains a single StatefulUDF expression /// -/// 1. Go top-down from the root of the LogicalPlan -/// 2. Whenever it sees a Project with StatefulUDF(s), it will split it like so: -/// Project_recursive (optional) -> Project_stateless -> ActorPoolProject(s)... -> Project_final -/// 3. Then it recurses on `Project_recursive` until there is no more `Project_recursive` to split anymore +/// Given a projection with 3 expressions that look like the following: /// -/// Invariants: -/// * `Project_recursive` definitely contains at least 1 stateful UDF, and hence need to be recursively split. If it is not constructed, then this is the base case. -/// * `Project_stateless` contains: [...stateless_projections, ...passthrough_columns_as_colexprs] -/// * Subsequent `ActorPoolProject(s)` contain: [Single StatefulUDF, ...passthrough_columns_as_colexprs] -/// * `Project_final` contains only Expr::Columns, and has the same column names (and column ordering) as the original Projection -/// * At the end of splitting, all Project nodes will never contain a StatefulUDF, and all ActorPoolProject nodes will contain one-and-only-one StatefulUDF +/// ┌─────────────────────────────────────────────── PROJECTION ────┐ +/// │ │ +/// │ ┌─────┐ ┌─────┐ ┌─────┐ │ +/// │ │ E1 │ │ E2 │ │ E3 │ │ +/// │ │ │ │ │ │ │ │ +/// │ StatefulUDF│ Stateless│ Stateless│ │ +/// │ └──┬──┘ └─┬┬──┘ └──┬──┘ │ +/// │ │ ┌──┘└──┐ │ │ +/// │ ┌──▼──┐ ┌───▼─┐ ┌─▼───┐ ┌──▼────┐ │ +/// │ │ E1a │ │ E2a │ │ E2b │ │col(E3)│ │ +/// │ │ │ │ │ │ │ └───────┘ │ +/// │ Any │ StatefulUDF│ │ Stateless │ +/// │ └─────┘ └─────┘ └─────┘ │ +/// │ │ +/// └───────────────────────────────────────────────────────────────┘ /// -/// How splitting is performed on a given Project: -/// 1. For every expression in the Project, "skim off the top" -/// * If the root expression is a StatefulUDF, truncate all of its children, alias them, and then add them to `Project_recursive` -/// * If the root expression is not a StatefulUDF, truncate any StatefulUDF children, alias them, and add them to `Project_recursive` -/// 2. Recursively perform splitting on `Project_recursive` -/// 3. Now for the current truncated expressions, split them into stateless vs stateful expressions: -/// * All stateless expressions go into a single `Project_stateless` node -/// * For each stateful expression, they go into their own dedicated `ActorPoolProject` node -/// * The final `Project_final` node is naively constructed using the names of the original Project +/// We will attempt to split this recursively into "stages". We split a given projection by truncating each expression as follows: +/// +/// 1. (See E1 -> E1') Expressions with (aliased) StatefulUDFs as root nodes have all their children truncated +/// 2. (See E2 -> E2') Expressions with children StatefulUDFs have each child StatefulUDF truncated +/// 3. (See E3) Expressions without any StatefulUDFs at all are not modified +/// +/// The truncated children as well as any required `col` references are collected into a new set of [`remaining`] +/// expressions. The new [`truncated_exprs`] make up current stage, and the [`remaining`] exprs represent the projections +/// from prior stages that will need to be recursively split into more stages. +/// +/// ┌───────────────────────────────────────────────────────────SPLIT: split_projection() +/// │ │ +/// │ TruncateRootStatefulUDF TruncateAnyStatefulUDFChildren No-Op │ +/// │ ======================= ============================== ===== │ +/// │ ┌─────┐ ┌─────┐ ┌─────┐ │ +/// │ │ E1' │ │ E2' │ │ E3 │ │ +/// │ StatefulUDF│ Stateless│ Stateless│ │ +/// │ └───┬─┘ └─┬┬──┘ └──┬──┘ │ +/// │ │ ┌───┘└───┐ │ │ +/// │ *--- ▼---* *--- ▼---* ┌──▼──┐ ┌──▼────┐ │ +/// │ / col(x) / / col(y) / │ E2b │ │col(E3)│ │ +/// │ *--------* *--------* │ Stateless └───────┘ │ +/// │ └─────┘ │ +/// │ │ +/// │ [`truncated_exprs`] │ +/// ├─- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -─┤ +/// │ [`remaining`] │ +/// │ │ +/// │ *----------* *----------* ┌────────┐ ┌───────┐ │ +/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │ +/// │ *-----│----* *-------│--* └────────┘ └───────┘ │ +/// │ ┌──▼──┐ ┌───▼─┐ │ +/// │ │ E1a │ │ E2a │ │ +/// │ │ │ │ │ │ +/// │ Any │ StatefulUDF│ │ +/// │ └─────┘ └─────┘ │ +/// └─────────────────────────────────────────────────────────────────────────────────┘ +/// +/// We then perform recursion on [`remaining`] until [`remaining`] becomes only `col` references, +/// as this would indicate that no further work needs to be performed by the projection. +/// +/// ┌───────────────────────────── Recursively split [`remaining`] ─┐ +/// │ │ +/// │ *----------* *----------* ┌────────┐ ┌───────┐ │ +/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │ +/// │ *-----│----* *-------│--* └────────┘ └───────┘ │ +/// │ ┌──▼──┐ ┌───▼─┐ │ +/// │ │ E1a │ │ E2a │ │ +/// │ │ │ │ │ │ +/// │ Any │ StatefulUDF│ │ +/// │ └─────┘ └─────┘ │ +/// │ │ +/// └─┬─────────────────────────────────────────────────────────────┘ +/// | +/// │ Then, we link this up with our current stage, which will be resolved into a chain of logical nodes: +/// | * The first PROJECT contains all the stateless expressions (E2' and E3) and passes through all required columns. +/// | * Subsequent ACTOR_POOL_PROJECT nodes each contain only one StatefulUDF, and passes through all required columns. +/// | * The last PROJECT contains only `col` references, and correctly orders/prunes columns according to the original projection. +/// | +/// │ +/// │ [`truncated_exprs`] resolved as a chain of logical nodes: +/// │ ┌─────────────────┐ ┌────────────────────┐ ┌───────────┐ +/// │ │ PROJECT │ │ ACTOR_POOL_PROJECT │ │ PROJECT │ +/// │ │ ------- │ │ ------------------ │ ...ACTOR_PPs, │ ----------│ +/// └───►│ E2', E3, col(*) ├─►│ E1', col(*) ├─ 1 per each ─►│ col("e1") │ +/// │ │ │ │ StatefulUDF │ col("e2") │ +/// │ │ │ │ │ col("e3") │ +/// │ │ │ │ │ │ +/// └─────────────────┘ └────────────────────┘ └───────────┘ impl OptimizerRule for SplitActorPoolProjects { fn apply_order(&self) -> ApplyOrder { ApplyOrder::TopDown @@ -101,6 +170,13 @@ impl TruncateAnyStatefulUDFChildren { } } +/// Performs truncation of Expressions which are assumed to be rooted at a StatefulUDF expression +/// +/// This TreeNodeRewriter will truncate all children of the StatefulUDF expression like so: +/// +/// 1. Add an `alias(...)` to the child and push it onto `self.new_children` +/// 2. Replace the child with a `col("...")` +/// 3. Add any `col("...")` leaf nodes to `self.new_children` (only once per unique column name) impl TreeNodeRewriter for TruncateRootStatefulUDF { type Node = ExprRef; @@ -148,6 +224,13 @@ impl TreeNodeRewriter for TruncateRootStatefulUDF { } } +/// Performs truncation of Expressions which are assumed to have some subtrees which contain StatefulUDF expressions +/// +/// This TreeNodeRewriter will truncate StatefulUDF expressions from the tree like so: +/// +/// 1. Add an `alias(...)` to any StatefulUDF child and push it onto `self.new_children` +/// 2. Replace the child with a `col("...")` +/// 3. Add any `col("...")` leaf nodes to `self.new_children` (only once per unique column name) impl TreeNodeRewriter for TruncateAnyStatefulUDFChildren { type Node = ExprRef; @@ -224,15 +307,6 @@ impl TreeNodeRewriter for TruncateAnyStatefulUDFChildren { } /// Splits a projection down into two sets of new projections: (truncated_exprs, new_children) -/// -/// `truncated_exprs` are the newly truncated expressions from `projection`. This has the same -/// length as `projection`, and also the same names as each Expr in `projection`. However, their -/// children are (potentially) truncated and replaced with Expr::Column nodes, which refer to -/// Exprs in `new_children`. -/// -/// `new_children` are the new children of `truncated_exprs`. Every Expr::Column leaf node in -/// `truncated_exprs` should have a corresponding expression in `new_children` with the appropriate -/// name. fn split_projection( projection: &[ExprRef], stage_idx: usize,