Skip to content

Commit

Permalink
Add ASCII art documentation for clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Aug 19, 2024
1 parent 1fb4fb0 commit 772c9a2
Showing 1 changed file with 103 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,98 @@ impl SplitActorPoolProjects {
}
}

/// Implement SplitActorPoolProjects as an OptimizerRule which will:
/// Implement SplitActorPoolProjects as an OptimizerRule
/// * Splits PROJECT nodes into chains of (PROJECT -> ...ACTOR_POOL_PROJECTS -> PROJECT) ...
/// * Resultant PROJECT nodes will never contain any StatefulUDF expressions
/// * Each ACTOR_POOL_PROJECT node only contains a single StatefulUDF expression
///
/// 1. Go top-down from the root of the LogicalPlan
/// 2. Whenever it sees a Project with StatefulUDF(s), it will split it like so:
/// Project_recursive (optional) -> Project_stateless -> ActorPoolProject(s)... -> Project_final
/// 3. Then it recurses on `Project_recursive` until there is no more `Project_recursive` to split anymore
/// Given a projection with 3 expressions that look like the following:
///
/// Invariants:
/// * `Project_recursive` definitely contains at least 1 stateful UDF, and hence need to be recursively split. If it is not constructed, then this is the base case.
/// * `Project_stateless` contains: [...stateless_projections, ...passthrough_columns_as_colexprs]
/// * Subsequent `ActorPoolProject(s)` contain: [Single StatefulUDF, ...passthrough_columns_as_colexprs]
/// * `Project_final` contains only Expr::Columns, and has the same column names (and column ordering) as the original Projection
/// * At the end of splitting, all Project nodes will never contain a StatefulUDF, and all ActorPoolProject nodes will contain one-and-only-one StatefulUDF
/// ┌─────────────────────────────────────────────── PROJECTION ────┐
/// │ │
/// │ ┌─────┐ ┌─────┐ ┌─────┐ │
/// │ │ E1 │ │ E2 │ │ E3 │ │
/// │ │ │ │ │ │ │ │
/// │ StatefulUDF│ Stateless│ Stateless│ │
/// │ └──┬──┘ └─┬┬──┘ └──┬──┘ │
/// │ │ ┌──┘└──┐ │ │
/// │ ┌──▼──┐ ┌───▼─┐ ┌─▼───┐ ┌──▼────┐ │
/// │ │ E1a │ │ E2a │ │ E2b │ │col(E3)│ │
/// │ │ │ │ │ │ │ └───────┘ │
/// │ Any │ StatefulUDF│ │ Stateless │
/// │ └─────┘ └─────┘ └─────┘ │
/// │ │
/// └───────────────────────────────────────────────────────────────┘
///
/// How splitting is performed on a given Project:
/// 1. For every expression in the Project, "skim off the top"
/// * If the root expression is a StatefulUDF, truncate all of its children, alias them, and then add them to `Project_recursive`
/// * If the root expression is not a StatefulUDF, truncate any StatefulUDF children, alias them, and add them to `Project_recursive`
/// 2. Recursively perform splitting on `Project_recursive`
/// 3. Now for the current truncated expressions, split them into stateless vs stateful expressions:
/// * All stateless expressions go into a single `Project_stateless` node
/// * For each stateful expression, they go into their own dedicated `ActorPoolProject` node
/// * The final `Project_final` node is naively constructed using the names of the original Project
/// We will attempt to split this recursively into "stages". We split a given projection by truncating each expression as follows:
///
/// 1. (See E1 -> E1') Expressions with (aliased) StatefulUDFs as root nodes have all their children truncated
/// 2. (See E2 -> E2') Expressions with children StatefulUDFs have each child StatefulUDF truncated
/// 3. (See E3) Expressions without any StatefulUDFs at all are not modified
///
/// The truncated children as well as any required `col` references are collected into a new set of [`remaining`]
/// expressions. The new [`truncated_exprs`] make up current stage, and the [`remaining`] exprs represent the projections
/// from prior stages that will need to be recursively split into more stages.
///
/// ┌───────────────────────────────────────────────────────────SPLIT: split_projection()
/// │ │
/// │ TruncateRootStatefulUDF TruncateAnyStatefulUDFChildren No-Op │
/// │ ======================= ============================== ===== │
/// │ ┌─────┐ ┌─────┐ ┌─────┐ │
/// │ │ E1' │ │ E2' │ │ E3 │ │
/// │ StatefulUDF│ Stateless│ Stateless│ │
/// │ └───┬─┘ └─┬┬──┘ └──┬──┘ │
/// │ │ ┌───┘└───┐ │ │
/// │ *--- ▼---* *--- ▼---* ┌──▼──┐ ┌──▼────┐ │
/// │ / col(x) / / col(y) / │ E2b │ │col(E3)│ │
/// │ *--------* *--------* │ Stateless └───────┘ │
/// │ └─────┘ │
/// │ │
/// │ [`truncated_exprs`] │
/// ├─- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -─┤
/// │ [`remaining`] │
/// │ │
/// │ *----------* *----------* ┌────────┐ ┌───────┐ │
/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │
/// │ *-----│----* *-------│--* └────────┘ └───────┘ │
/// │ ┌──▼──┐ ┌───▼─┐ │
/// │ │ E1a │ │ E2a │ │
/// │ │ │ │ │ │
/// │ Any │ StatefulUDF│ │
/// │ └─────┘ └─────┘ │
/// └─────────────────────────────────────────────────────────────────────────────────┘
///
/// We then perform recursion on [`remaining`] until [`remaining`] becomes only `col` references,
/// as this would indicate that no further work needs to be performed by the projection.
///
/// ┌───────────────────────────── Recursively split [`remaining`] ─┐
/// │ │
/// │ *----------* *----------* ┌────────┐ ┌───────┐ │
/// │ / alias(y) / / alias(y) / │col(E2b)│ │col(E3)│ │
/// │ *-----│----* *-------│--* └────────┘ └───────┘ │
/// │ ┌──▼──┐ ┌───▼─┐ │
/// │ │ E1a │ │ E2a │ │
/// │ │ │ │ │ │
/// │ Any │ StatefulUDF│ │
/// │ └─────┘ └─────┘ │
/// │ │
/// └─┬─────────────────────────────────────────────────────────────┘
/// |
/// │ Then, we link this up with our current stage, which will be resolved into a chain of logical nodes:
/// | * The first PROJECT contains all the stateless expressions (E2' and E3) and passes through all required columns.
/// | * Subsequent ACTOR_POOL_PROJECT nodes each contain only one StatefulUDF, and passes through all required columns.
/// | * The last PROJECT contains only `col` references, and correctly orders/prunes columns according to the original projection.
/// |
/// │
/// │ [`truncated_exprs`] resolved as a chain of logical nodes:
/// │ ┌─────────────────┐ ┌────────────────────┐ ┌───────────┐
/// │ │ PROJECT │ │ ACTOR_POOL_PROJECT │ │ PROJECT │
/// │ │ ------- │ │ ------------------ │ ...ACTOR_PPs, │ ----------│
/// └───►│ E2', E3, col(*) ├─►│ E1', col(*) ├─ 1 per each ─►│ col("e1") │
/// │ │ │ │ StatefulUDF │ col("e2") │
/// │ │ │ │ │ col("e3") │
/// │ │ │ │ │ │
/// └─────────────────┘ └────────────────────┘ └───────────┘
impl OptimizerRule for SplitActorPoolProjects {
fn apply_order(&self) -> ApplyOrder {
ApplyOrder::TopDown
Expand Down Expand Up @@ -101,6 +170,13 @@ impl TruncateAnyStatefulUDFChildren {
}
}

/// Performs truncation of Expressions which are assumed to be rooted at a StatefulUDF expression
///
/// This TreeNodeRewriter will truncate all children of the StatefulUDF expression like so:
///
/// 1. Add an `alias(...)` to the child and push it onto `self.new_children`
/// 2. Replace the child with a `col("...")`
/// 3. Add any `col("...")` leaf nodes to `self.new_children` (only once per unique column name)
impl TreeNodeRewriter for TruncateRootStatefulUDF {
type Node = ExprRef;

Expand Down Expand Up @@ -148,6 +224,13 @@ impl TreeNodeRewriter for TruncateRootStatefulUDF {
}
}

/// Performs truncation of Expressions which are assumed to have some subtrees which contain StatefulUDF expressions
///
/// This TreeNodeRewriter will truncate StatefulUDF expressions from the tree like so:
///
/// 1. Add an `alias(...)` to any StatefulUDF child and push it onto `self.new_children`
/// 2. Replace the child with a `col("...")`
/// 3. Add any `col("...")` leaf nodes to `self.new_children` (only once per unique column name)
impl TreeNodeRewriter for TruncateAnyStatefulUDFChildren {
type Node = ExprRef;

Expand Down Expand Up @@ -224,15 +307,6 @@ impl TreeNodeRewriter for TruncateAnyStatefulUDFChildren {
}

/// Splits a projection down into two sets of new projections: (truncated_exprs, new_children)
///
/// `truncated_exprs` are the newly truncated expressions from `projection`. This has the same
/// length as `projection`, and also the same names as each Expr in `projection`. However, their
/// children are (potentially) truncated and replaced with Expr::Column nodes, which refer to
/// Exprs in `new_children`.
///
/// `new_children` are the new children of `truncated_exprs`. Every Expr::Column leaf node in
/// `truncated_exprs` should have a corresponding expression in `new_children` with the appropriate
/// name.
fn split_projection(
projection: &[ExprRef],
stage_idx: usize,
Expand Down

0 comments on commit 772c9a2

Please sign in to comment.