[FEAT] Fix projection pushdowns in actor pool project #2680

Merged · 4 commits · Aug 24, 2024
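For orientation before the diff: projection pushdown rewrites a logical plan so that column selection happens as close to the data source as possible, and this PR extends that rewrite to `ActorPoolProject` nodes. The following is a minimal, self-contained toy sketch of the general technique; the `Plan` enum and `push_down_projection` function are illustrative stand-ins, not Daft's actual types or APIs.

```rust
use std::collections::BTreeSet;

// Toy logical plan: a scan that reads a set of columns, optionally wrapped
// in a projection that keeps only a subset of them.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    Project { columns: Vec<String>, input: Box<Plan> },
}

// Push a pure column-selection Project into the Scan beneath it, so unused
// columns are never materialized.
fn push_down_projection(plan: Plan) -> Plan {
    match plan {
        Plan::Project { columns, input } => match *input {
            Plan::Scan { columns: scan_cols } => {
                let wanted: BTreeSet<String> = columns.into_iter().collect();
                let pruned: Vec<String> = scan_cols
                    .into_iter()
                    .filter(|c| wanted.contains(c))
                    .collect();
                Plan::Scan { columns: pruned }
            }
            other => Plan::Project {
                columns,
                input: Box::new(other),
            },
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::Project {
        columns: vec!["a".to_string()],
        input: Box::new(Plan::Scan {
            columns: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        }),
    };
    // Only column "a" survives the pushdown; "b" and "c" are pruned at the scan.
    assert_eq!(
        push_down_projection(plan),
        Plan::Scan {
            columns: vec!["a".to_string()]
        }
    );
}
```

The diff below applies this same idea when the node in the middle is an `ActorPoolProject` running a stateful Python UDF.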
29 changes: 28 additions & 1 deletion src/daft-plan/src/logical_ops/actor_pool_project.rs
@@ -1,5 +1,6 @@
use std::sync::Arc;

use common_error::DaftError;
use common_resource_request::ResourceRequest;
use common_treenode::TreeNode;
use daft_core::schema::{Schema, SchemaRef};
@@ -14,7 +15,7 @@ use itertools::Itertools;
use snafu::ResultExt;

use crate::{
logical_plan::{CreationSnafu, Result},
logical_plan::{CreationSnafu, Error, Result},
LogicalPlan,
};

@@ -30,7 +31,33 @@ impl ActorPoolProject {
pub(crate) fn try_new(input: Arc<LogicalPlan>, projection: Vec<ExprRef>) -> Result<Self> {
let (projection, fields) =
resolve_exprs(projection, input.schema().as_ref()).context(CreationSnafu)?;

let num_stateful_udf_exprs: usize = projection
.iter()
.map(|expr| {
let mut num_stateful_udfs = 0;
expr.apply(|e| {
if matches!(
e.as_ref(),
Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(_)),
..
}
) {
num_stateful_udfs += 1;
}
Ok(common_treenode::TreeNodeRecursion::Continue)
})
.unwrap();
num_stateful_udfs
})
.sum();
if num_stateful_udf_exprs != 1 {
return Err(Error::CreationError { source: DaftError::InternalError(format!("Expected ActorPoolProject to have exactly 1 stateful UDF expression but found: {num_stateful_udf_exprs}")) });
}

let projected_schema = Schema::new(fields).context(CreationSnafu)?.into();

Ok(ActorPoolProject {
input,
projection,
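The new check in `try_new` above walks each projection expression with `expr.apply` and requires exactly one stateful UDF across the whole projection list. Here is a self-contained sketch of that counting pattern on a toy expression tree; the `Expr` enum below is hypothetical, not `daft_dsl::Expr` or its `TreeNode` machinery.

```rust
// Toy expression tree standing in for daft_dsl::Expr.
enum Expr {
    Column(&'static str),
    StatefulUdf(Vec<Expr>),
    Alias(Box<Expr>, &'static str),
}

// Recursively count stateful-UDF nodes in one expression, mirroring what the
// `expr.apply(..)` traversal in the diff does with a running counter.
fn count_stateful_udfs(expr: &Expr) -> usize {
    match expr {
        Expr::Column(_) => 0,
        Expr::StatefulUdf(children) => {
            1 + children.iter().map(count_stateful_udfs).sum::<usize>()
        }
        Expr::Alias(inner, _) => count_stateful_udfs(inner),
    }
}

fn main() {
    // An ActorPoolProject-style projection list: one aliased stateful UDF
    // plus a pass-through column.
    let projection = vec![
        Expr::Alias(
            Box::new(Expr::StatefulUdf(vec![Expr::Column("a")])),
            "udf_results",
        ),
        Expr::Column("b"),
    ];
    let total: usize = projection.iter().map(count_stateful_udfs).sum();
    // The invariant enforced above: exactly one stateful UDF per ActorPoolProject.
    assert_eq!(total, 1);
}
```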
203 changes: 199 additions & 4 deletions src/daft-plan/src/logical_optimization/rules/push_down_projection.rs
@@ -4,8 +4,14 @@ use common_error::DaftResult;

use common_treenode::TreeNode;
use daft_core::{schema::Schema, JoinType};
use daft_dsl::{col, optimization::replace_columns_with_expressions, Expr, ExprRef};
use daft_dsl::{
col,
functions::{python::PythonUDF, FunctionExpr},
optimization::{get_required_columns, replace_columns_with_expressions, requires_computation},
Expr, ExprRef,
};
use indexmap::IndexSet;
use itertools::Itertools;

use crate::{
logical_ops::{ActorPoolProject, Aggregate, Join, Pivot, Project, Source},
@@ -230,6 +236,81 @@ impl PushDownProjection {
}
}
LogicalPlan::ActorPoolProject(upstream_actor_pool_projection) => {
// Attempt to merge the current Projection into the upstream ActorPoolProject
// if there aren't any actual computations being performed in the Projection, and
// if each upstream column is used only once (no common subtrees)
if projection
.projection
.iter()
.all(|e| !requires_computation(e))
{
// Only perform this optimization if all required column names are distinct
let required_column_names = projection
.projection
.iter()
.flat_map(get_required_columns)
.collect_vec();
let mut all_required_column_names_distinct = true;
let mut distinct_required_column_names = IndexSet::new();
for required_col_name in required_column_names {
if distinct_required_column_names.contains(&required_col_name) {
all_required_column_names_distinct = false;
break;
} else {
distinct_required_column_names.insert(required_col_name);
}
}

if all_required_column_names_distinct {
let actor_pool_projection_map = upstream_actor_pool_projection
.projection
.iter()
.map(|e| (e.name().to_string(), e.clone()))
.collect::<HashMap<String, ExprRef>>();
let new_actor_pool_projections = projection
.projection
.iter()
.map(|p| {
replace_columns_with_expressions(
p.clone(),
&actor_pool_projection_map,
)
})
.collect_vec();

// Construct either a new ActorPoolProject or Project, depending on whether the pruned projection still has StatefulUDFs
let new_plan = if new_actor_pool_projections.iter().any(|e| {
e.exists(|e| {
matches!(
e.as_ref(),
Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(_)),
..
}
)
})
}) {
LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
upstream_actor_pool_projection.input.clone(),
new_actor_pool_projections,
)?)
.arced()
} else {
LogicalPlan::Project(Project::try_new(
upstream_actor_pool_projection.input.clone(),
new_actor_pool_projections,
)?)
.arced()
};

// Retry optimization now that the node is different.
let new_plan = self
.try_optimize(new_plan.clone())?
.or(Transformed::Yes(new_plan));
return Ok(new_plan);
}
}

// Prune columns from the child ActorPoolProjection that are not used in this projection.
let required_columns = &plan.required_columns()[0];
if required_columns.len() < upstream_schema.names().len() {
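The merge branch above only fires when the downstream Projection performs no real computation and references each upstream column at most once (so no upstream expression gets duplicated); it then splices the upstream ActorPoolProject's expressions into the Projection through a name-to-expression map. Below is a self-contained sketch of that substitution step using toy types, not `replace_columns_with_expressions` or Daft's `Expr`.

```rust
use std::collections::HashMap;

// Toy expressions: plain column references, aliases, or an opaque computed
// expression standing in for a stateful UDF call.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Alias(Box<Expr>, String),
    Computed(String),
}

// Replace every column reference with the upstream expression that produced
// it, if the upstream projection defines one.
fn substitute(expr: &Expr, upstream: &HashMap<String, Expr>) -> Expr {
    match expr {
        Expr::Column(name) => upstream
            .get(name)
            .cloned()
            .unwrap_or_else(|| expr.clone()),
        Expr::Alias(inner, name) => {
            Expr::Alias(Box::new(substitute(inner, upstream)), name.clone())
        }
        Expr::Computed(_) => expr.clone(),
    }
}

fn main() {
    // Upstream ActorPoolProject produced: udf(a) AS udf_results, plus a
    // pass-through column b.
    let mut upstream = HashMap::new();
    upstream.insert(
        "udf_results".to_string(),
        Expr::Computed("udf(a)".to_string()),
    );
    upstream.insert("b".to_string(), Expr::Column("b".to_string()));

    // Downstream Projection: SELECT udf_results AS out
    let downstream = Expr::Alias(
        Box::new(Expr::Column("udf_results".to_string())),
        "out".to_string(),
    );

    // After merging, the alias wraps the upstream computation directly and
    // column b is no longer required at all, so it can be pruned upstream.
    assert_eq!(
        substitute(&downstream, &upstream),
        Expr::Alias(
            Box::new(Expr::Computed("udf(a)".to_string())),
            "out".to_string()
        )
    );
}
```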
@@ -437,6 +518,34 @@ impl PushDownProjection {
}
}

fn try_optimize_actor_pool_project(
&self,
actor_pool_project: &ActorPoolProject,
plan: Arc<LogicalPlan>,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
// If this ActorPoolProject prunes columns from its upstream,
// then explicitly create a projection to do so.
let upstream_plan = &actor_pool_project.input;
let upstream_schema = upstream_plan.schema();

let actor_pool_project_required_cols = &plan.required_columns()[0];
if actor_pool_project_required_cols.len() < upstream_schema.names().len() {
let new_subprojection: LogicalPlan = {
let pushdown_column_exprs = actor_pool_project_required_cols
.iter()
.map(|s| col(s.as_str()))
.collect::<Vec<_>>();

Project::try_new(upstream_plan.clone(), pushdown_column_exprs)?.into()
};

let new_actor_pool_project = plan.with_new_children(&[new_subprojection.into()]);
Ok(Transformed::Yes(new_actor_pool_project.into()))
} else {
Ok(Transformed::No(plan))
}
}
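Rather than rewriting the source node directly, the helper above inserts an explicit pruning `Project` between the ActorPoolProject and its input whenever fewer columns are needed; later optimizer passes then push that Project further down (into the scan's pushdowns, as the tests below verify). A toy sketch of that single step, with the same caveat that these types are illustrative rather than Daft's:

```rust
use std::collections::BTreeSet;

#[derive(Debug)]
enum Plan {
    Scan { columns: Vec<String> },
    Project { columns: Vec<String>, input: Box<Plan> },
    ActorPoolProject { required: Vec<String>, input: Box<Plan> },
}

// Columns exposed by a node's output schema (toy version).
fn output_columns(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Scan { columns } => columns.clone(),
        Plan::Project { columns, .. } => columns.clone(),
        Plan::ActorPoolProject { required, .. } => required.clone(),
    }
}

// If the ActorPoolProject needs fewer columns than its input provides, insert
// an explicit pruning Project below it; a later pass can push that Project
// into the scan itself.
fn prune_actor_pool_project_input(plan: Plan) -> Plan {
    match plan {
        Plan::ActorPoolProject { required, input } => {
            let upstream: BTreeSet<String> = output_columns(&input).into_iter().collect();
            if required.len() < upstream.len() {
                let pruned = Plan::Project {
                    columns: required.clone(),
                    input,
                };
                Plan::ActorPoolProject {
                    required,
                    input: Box::new(pruned),
                }
            } else {
                Plan::ActorPoolProject { required, input }
            }
        }
        other => other,
    }
}

fn main() {
    let plan = Plan::ActorPoolProject {
        required: vec!["a".to_string()],
        input: Box::new(Plan::Scan {
            columns: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        }),
    };
    let rewritten = prune_actor_pool_project_input(plan);
    // A Project(["a"]) now sits between the ActorPoolProject and the scan.
    match rewritten {
        Plan::ActorPoolProject { input, .. } => {
            assert!(matches!(*input, Plan::Project { .. }));
        }
        _ => unreachable!(),
    }
}
```

Keeping the rule this local and signalling the change via `Transformed::Yes` lets the existing Project pushdown machinery carry the new projection into scans, filters, and joins instead of duplicating that logic here.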

fn try_optimize_aggregation(
&self,
aggregation: &Aggregate,
@@ -543,6 +652,10 @@ impl OptimizerRule for PushDownProjection {
fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
match plan.as_ref() {
LogicalPlan::Project(projection) => self.try_optimize_project(projection, plan.clone()),
// ActorPoolProjects also do column projection
LogicalPlan::ActorPoolProject(actor_pool_project) => {
self.try_optimize_actor_pool_project(actor_pool_project, plan.clone())
}
// Aggregations also do column projection
LogicalPlan::Aggregate(aggregation) => {
self.try_optimize_aggregation(aggregation, plan.clone())
@@ -810,7 +923,7 @@ mod tests {
Field::new("b", DataType::Boolean),
Field::new("c", DataType::Int64),
]);
let scan_node = dummy_scan_node(scan_op).build();
let scan_node = dummy_scan_node(scan_op.clone());
let mock_stateful_udf = Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF {
name: Arc::new("my-udf".to_string()),
@@ -826,7 +939,7 @@

// Select the `udf_results` column, so the ActorPoolProject should apply column pruning to the other columns
let actor_pool_project = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
scan_node.clone(),
scan_node.build(),
vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results")],
)?)
.arced();
@@ -837,7 +950,11 @@
.arced();

let expected_actor_pool_project = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
scan_node.clone(),
dummy_scan_node_with_pushdowns(
scan_op,
Pushdowns::default().with_columns(Some(Arc::new(vec!["c".to_string()]))),
)
.build(),
vec![mock_stateful_udf.alias("udf_results")],
)?)
.arced();
@@ -846,6 +963,84 @@
Ok(())
}

/// Projection<-ActorPoolProject<-ActorPoolProject prunes columns from both ActorPoolProjects
#[test]
fn test_projection_pushdown_into_double_actorpoolproject() -> DaftResult<()> {
use crate::logical_ops::ActorPoolProject;
use crate::logical_ops::Project;
use common_resource_request::ResourceRequest;
use daft_dsl::functions::python::{PythonUDF, StatefulPythonUDF};
use daft_dsl::functions::FunctionExpr;
use daft_dsl::Expr;

let scan_op = dummy_scan_operator(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Boolean),
Field::new("c", DataType::Int64),
]);
let scan_node = dummy_scan_node(scan_op.clone()).build();
let mock_stateful_udf = Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF {
name: Arc::new("my-udf".to_string()),
num_expressions: 1,
return_dtype: DataType::Utf8,
resource_request: Some(ResourceRequest::default_cpu()),
batch_size: None,
concurrency: Some(8),
})),
inputs: vec![col("a")],
}
.arced();

// Select the `udf_results` column, so the ActorPoolProject should apply column pruning to the other columns
let plan = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
scan_node.clone(),
vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results_0")],
)?)
.arced();
let plan = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
plan,
vec![
col("a"),
col("b"),
col("udf_results_0"),
mock_stateful_udf.alias("udf_results_1"),
],
)?)
.arced();
let plan = LogicalPlan::Project(Project::try_new(
plan,
vec![
col("udf_results_0").alias("udf_results_0_alias"),
col("udf_results_1"),
],
)?)
.arced();

let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
dummy_scan_node_with_pushdowns(
scan_op,
Pushdowns::default().with_columns(Some(Arc::new(vec!["a".to_string()]))),
)
.build(),
// col("b") is pruned
vec![mock_stateful_udf.alias("udf_results_0"), col("a")],
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected.clone(),
vec![
// Absorbed a non-computational expression (alias) from the Projection
col("udf_results_0").alias("udf_results_0_alias"),
mock_stateful_udf.alias("udf_results_1"),
],
)?)
.arced();

assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Projection<-ActorPoolProject prunes ActorPoolProject entirely if the stateful projection column is pruned
#[test]
fn test_projection_pushdown_into_actorpoolproject_completely_removed() -> DaftResult<()> {
@@ -834,18 +834,13 @@ mod tests {
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name),
col("a"),
],
)?)
.arced();
let expected = LogicalPlan::Project(Project::try_new(
expected,
vec![col(intermediate_name), col("a")],
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
@@ -933,11 +928,10 @@
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"), // TODO: This should be able to be pruned as well, but it seems Projection Pushdown isn't working as intended
col("b"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name_0),
col("b"),
],
)?)
.arced();
@@ -1049,11 +1043,10 @@
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"), // TODO: This should be pruned by Projection Pushdown, but isn't for some reason
col("b"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name_0),
col("b"),
],
)?)
.arced();