Skip to content

Commit

Permalink
Enable optimization behind feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Aug 15, 2024
1 parent 22441fc commit 1fb4fb0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 29 deletions.
73 changes: 45 additions & 28 deletions src/daft-plan/src/logical_optimization/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{
logical_plan_tracker::LogicalPlanTracker,
rules::{
ApplyOrder, DropRepartition, OptimizerRule, PushDownFilter, PushDownLimit,
PushDownProjection, Transformed,
PushDownProjection, SplitActorPoolProjects, Transformed,
},
};

Expand All @@ -17,20 +17,23 @@ use super::{
pub struct OptimizerConfig {
// Default maximum number of optimization passes the optimizer will make over a fixed-point RuleBatch.
pub default_max_optimizer_passes: usize,
// Feature flag that enables the creation of ActorPoolProject nodes during plan optimization.
pub enable_actor_pool_projections: bool,
}

impl OptimizerConfig {
fn new(max_optimizer_passes: usize) -> Self {
fn new(max_optimizer_passes: usize, enable_actor_pool_projections: bool) -> Self {
OptimizerConfig {
default_max_optimizer_passes: max_optimizer_passes,
enable_actor_pool_projections,
}
}
}

impl Default for OptimizerConfig {
fn default() -> Self {
// Default to a max of 5 optimizer passes for a given batch.
OptimizerConfig::new(5)
OptimizerConfig::new(5, false)
}
}

Expand Down Expand Up @@ -128,30 +131,44 @@ pub struct Optimizer {

impl Optimizer {
pub fn new(config: OptimizerConfig) -> Self {
// Default rule batches.
let rule_batches: Vec<RuleBatch> = vec![
RuleBatch::new(
let mut rule_batches = Vec::new();

// --- Split ActorPoolProject nodes out of Project nodes ---
// This batch is gated behind the feature flag DAFT_ENABLE_ACTOR_POOL_PROJECTIONS=1
if config.enable_actor_pool_projections {
rule_batches.push(RuleBatch::new(
vec![
Box::new(DropRepartition::new()),
Box::new(PushDownFilter::new()),
Box::new(PushDownProjection::new()),
Box::new(SplitActorPoolProjects::new()),
Box::new(PushDownProjection::new()),
],
// Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node
// at the current node, which would require another batch application in order to have a chance to push
// that Filter node through upstream nodes.
// TODO(Clark): Refine this fixed-point policy.
RuleExecutionStrategy::FixedPoint(Some(3)),
),
RuleBatch::new(
vec![
// This needs to be separate from PushDownProjection because otherwise the limit and
// projection just keep swapping places, preventing optimization
// (see https://github.com/Eventual-Inc/Daft/issues/2616)
Box::new(PushDownLimit::new()),
],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
];
RuleExecutionStrategy::Once,
));
}

// --- Bulk of our rules ---
rule_batches.push(RuleBatch::new(
vec![
Box::new(DropRepartition::new()),
Box::new(PushDownFilter::new()),
Box::new(PushDownProjection::new()),
],
// Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node
// at the current node, which would require another batch application in order to have a chance to push
// that Filter node through upstream nodes.
// TODO(Clark): Refine this fixed-point policy.
RuleExecutionStrategy::FixedPoint(Some(3)),
));

// --- Limit pushdowns ---
// This needs to be separate from PushDownProjection because otherwise the limit and
// projection just keep swapping places, preventing optimization
// (see https://github.com/Eventual-Inc/Daft/issues/2616)
rule_batches.push(RuleBatch::new(
vec![Box::new(PushDownLimit::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
));

Self::with_rule_batches(rule_batches, config)
}

Expand Down Expand Up @@ -343,7 +360,7 @@ mod tests {
vec![Box::new(NoOp::new())],
RuleExecutionStrategy::Once,
)],
OptimizerConfig::new(5),
OptimizerConfig::new(5, false),
);
let plan: Arc<LogicalPlan> =
dummy_scan_node(dummy_scan_operator(vec![Field::new("a", DataType::Int64)])).build();
Expand Down Expand Up @@ -394,7 +411,7 @@ mod tests {
vec![Box::new(RotateProjection::new(false))],
RuleExecutionStrategy::FixedPoint(Some(20)),
)],
OptimizerConfig::new(20),
OptimizerConfig::new(20, false),
);
let proj_exprs = vec![
col("a").add(lit(1)),
Expand Down Expand Up @@ -429,7 +446,7 @@ mod tests {
vec![Box::new(RotateProjection::new(true))],
RuleExecutionStrategy::FixedPoint(Some(20)),
)],
OptimizerConfig::new(20),
OptimizerConfig::new(20, false),
);
let proj_exprs = vec![
col("a").add(lit(1)),
Expand Down Expand Up @@ -480,7 +497,7 @@ mod tests {
RuleExecutionStrategy::Once,
),
],
OptimizerConfig::new(20),
OptimizerConfig::new(20, false),
);
let proj_exprs = vec![
col("a").add(lit(1)),
Expand Down
1 change: 1 addition & 0 deletions src/daft-plan/src/logical_optimization/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pub use push_down_filter::PushDownFilter;
pub use push_down_limit::PushDownLimit;
pub use push_down_projection::PushDownProjection;
pub use rule::{ApplyOrder, OptimizerRule, Transformed};
pub use split_actor_pool_projects::SplitActorPoolProjects;
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,6 @@ mod tests {
)
}

#[cfg(not(feature = "python"))]
fn create_stateful_udf(inputs: Vec<ExprRef>) -> ExprRef {
Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF {
Expand Down

0 comments on commit 1fb4fb0

Please sign in to comment.