Skip to content

Commit

Permalink
Allow actor pool projects to also perform column pushdowns
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Aug 24, 2024
1 parent bf5c853 commit b35f110
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions src/daft-plan/src/logical_optimization/rules/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,34 @@ impl PushDownProjection {
}
}

/// Makes the column pruning performed by an `ActorPoolProject` explicit.
///
/// If the ActorPoolProject requires fewer columns than its input's schema
/// exposes, a `Project` selecting only the required columns is placed between
/// the ActorPoolProject and its input, and the rewritten plan is returned as
/// `Transformed::Yes`. Otherwise the plan is handed back untouched as
/// `Transformed::No`.
///
/// # Errors
///
/// Propagates any error returned by `Project::try_new` while building the
/// pruning projection.
fn try_optimize_actor_pool_project(
    &self,
    actor_pool_project: &ActorPoolProject,
    plan: Arc<LogicalPlan>,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
    let input = &actor_pool_project.input;
    let input_column_count = input.schema().names().len();

    // NOTE(review): index 0 is presumably the required-column set for the
    // node's single child -- confirm against `LogicalPlan::required_columns`.
    let required_per_child = plan.required_columns();
    let required_cols = &required_per_child[0];

    // Nothing to prune: the node already consumes every upstream column.
    if required_cols.len() >= input_column_count {
        return Ok(Transformed::No(plan));
    }

    // Build an explicit projection over the input that keeps only the
    // columns this node actually reads.
    let pruning_exprs: Vec<_> = required_cols
        .iter()
        .map(|name| col(name.as_str()))
        .collect();
    let pruning_project: LogicalPlan = Project::try_new(input.clone(), pruning_exprs)?.into();

    // Re-parent the ActorPoolProject onto the new, narrower projection.
    let rewritten = plan.with_new_children(&[pruning_project.into()]);
    Ok(Transformed::Yes(rewritten.into()))
}

fn try_optimize_aggregation(
&self,
aggregation: &Aggregate,
Expand Down Expand Up @@ -543,6 +571,10 @@ impl OptimizerRule for PushDownProjection {
fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
match plan.as_ref() {
LogicalPlan::Project(projection) => self.try_optimize_project(projection, plan.clone()),
// ActorPoolProjects also do column projection
LogicalPlan::ActorPoolProject(actor_pool_project) => {
self.try_optimize_actor_pool_project(actor_pool_project, plan.clone())
}
// Aggregations also do column projection
LogicalPlan::Aggregate(aggregation) => {
self.try_optimize_aggregation(aggregation, plan.clone())
Expand Down Expand Up @@ -846,6 +878,75 @@ mod tests {
Ok(())
}

/// Projection<-ActorPoolProject<-ActorPoolProject: columns that the top-level
/// projection does not need are pruned from both ActorPoolProjects.
#[test]
fn test_projection_pushdown_into_double_actorpoolproject() -> DaftResult<()> {
    use crate::logical_ops::{ActorPoolProject, Project};
    use common_resource_request::ResourceRequest;
    use daft_dsl::{
        functions::{
            python::{PythonUDF, StatefulPythonUDF},
            FunctionExpr,
        },
        Expr,
    };

    // Source with three columns; only `a` feeds the UDF.
    let scan_op = dummy_scan_operator(vec![
        Field::new("a", DataType::Int64),
        Field::new("b", DataType::Boolean),
        Field::new("c", DataType::Int64),
    ]);
    let scan_node = dummy_scan_node(scan_op).build();

    // A stateful UDF expression over column `a`, reused under different aliases.
    let stateful_udf = FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF {
        name: Arc::new("my-udf".to_string()),
        num_expressions: 1,
        return_dtype: DataType::Utf8,
        resource_request: Some(ResourceRequest::default_cpu()),
        batch_size: None,
        concurrency: Some(8),
    }));
    let mock_stateful_udf = Expr::Function {
        func: stateful_udf,
        inputs: vec![col("a")],
    }
    .arced();

    // Input plan: two stacked ActorPoolProjects that each carry `a` and `b`
    // through, topped by a projection selecting only the two UDF result columns.
    let inner_actor_pool = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
        scan_node.clone(),
        vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results_0")],
    )?)
    .arced();
    let outer_actor_pool = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
        inner_actor_pool,
        vec![
            col("a"),
            col("b"),
            col("udf_results_0"),
            mock_stateful_udf.alias("udf_results_1"),
        ],
    )?)
    .arced();
    let plan = LogicalPlan::Project(Project::try_new(
        outer_actor_pool,
        vec![col("udf_results_0"), col("udf_results_1")],
    )?)
    .arced();

    // Expected plan: `b` is dropped everywhere, and `a` survives only in the
    // inner ActorPoolProject, where the outer UDF still reads it.
    let expected_inner = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
        scan_node.clone(),
        vec![col("a"), mock_stateful_udf.alias("udf_results_0")],
    )?)
    .arced();
    let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
        expected_inner,
        vec![
            col("udf_results_0"),
            mock_stateful_udf.alias("udf_results_1"),
        ],
    )?)
    .arced();

    assert_optimized_plan_eq(plan, expected)?;
    Ok(())
}

/// Projection<-ActorPoolProject prunes ActorPoolProject entirely if the stateful projection column is pruned
#[test]
fn test_projection_pushdown_into_actorpoolproject_completely_removed() -> DaftResult<()> {
Expand Down

0 comments on commit b35f110

Please sign in to comment.