diff --git a/daft/context.py b/daft/context.py index aa98ff23fc..0b2a053cce 100644 --- a/daft/context.py +++ b/daft/context.py @@ -275,6 +275,7 @@ def set_execution_config( broadcast_join_size_bytes_threshold: int | None = None, parquet_split_row_groups_max_files: int | None = None, sort_merge_join_sort_with_aligned_boundaries: bool | None = None, + hash_join_partition_size_leniency: bool | None = None, sample_size_for_sort: int | None = None, num_preview_rows: int | None = None, parquet_target_filesize: int | None = None, @@ -305,6 +306,9 @@ def set_execution_config( sort_merge_join_sort_with_aligned_boundaries: Whether to use a specialized algorithm for sorting both sides of a sort-merge join such that they have aligned boundaries. This can lead to a faster merge-join at the cost of more skewed sorted join inputs, increasing the risk of OOMs. + hash_join_partition_size_leniency: If the left side of a hash join is already correctly partitioned and the right side isn't, + and the ratio between the left and right size is at least this value, then the right side is repartitioned to have an equal + number of partitions as the left. Defaults to 0.5. sample_size_for_sort: number of elements to sample from each partition when running sort, Default is 20. num_preview_rows: number of rows to when showing a dataframe preview, diff --git a/daft/daft.pyi b/daft/daft.pyi index d9560671df..18f33e9b50 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -1674,6 +1674,7 @@ class PyDaftExecutionConfig: broadcast_join_size_bytes_threshold: int | None = None, parquet_split_row_groups_max_files: int | None = None, sort_merge_join_sort_with_aligned_boundaries: bool | None = None, + hash_join_partition_size_leniency: float | None = None, sample_size_for_sort: int | None = None, num_preview_rows: int | None = None, parquet_target_filesize: int | None = None, @@ -1695,6 +1696,8 @@ class PyDaftExecutionConfig: @property def sort_merge_join_sort_with_aligned_boundaries(self) -> bool: ... @property + def hash_join_partition_size_leniency(self) -> float: ... + @property def sample_size_for_sort(self) -> int: ... @property def num_preview_rows(self) -> int: ... diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index ee87b38e7d..4ccbda0395 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -30,6 +30,7 @@ pub struct DaftExecutionConfig { pub scan_tasks_max_size_bytes: usize, pub broadcast_join_size_bytes_threshold: usize, pub sort_merge_join_sort_with_aligned_boundaries: bool, + pub hash_join_partition_size_leniency: f64, pub sample_size_for_sort: usize, pub parquet_split_row_groups_max_files: usize, pub num_preview_rows: usize, @@ -51,6 +52,7 @@ impl Default for DaftExecutionConfig { scan_tasks_max_size_bytes: 384 * 1024 * 1024, // 384MB broadcast_join_size_bytes_threshold: 10 * 1024 * 1024, // 10 MiB sort_merge_join_sort_with_aligned_boundaries: false, + hash_join_partition_size_leniency: 0.5, sample_size_for_sort: 20, parquet_split_row_groups_max_files: 10, num_preview_rows: 8, diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index 557b977bc5..df76474763 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -90,6 +90,7 @@ impl PyDaftExecutionConfig { broadcast_join_size_bytes_threshold: Option, parquet_split_row_groups_max_files: Option, sort_merge_join_sort_with_aligned_boundaries: Option, + hash_join_partition_size_leniency: Option, sample_size_for_sort: Option, num_preview_rows: Option, parquet_target_filesize: Option, @@ -122,6 +123,9 @@ impl PyDaftExecutionConfig { config.sort_merge_join_sort_with_aligned_boundaries = sort_merge_join_sort_with_aligned_boundaries; } + if let Some(hash_join_partition_size_leniency) = hash_join_partition_size_leniency { + config.hash_join_partition_size_leniency = hash_join_partition_size_leniency; + } if let Some(sample_size_for_sort) = sample_size_for_sort { config.sample_size_for_sort = sample_size_for_sort; } @@ -183,6 +187,11 @@ impl PyDaftExecutionConfig { Ok(self.config.sort_merge_join_sort_with_aligned_boundaries) } + #[getter] + fn get_hash_join_partition_size_leniency(&self) -> PyResult { + Ok(self.config.hash_join_partition_size_leniency) + } + #[getter] fn get_sample_size_for_sort(&self) -> PyResult { Ok(self.config.sample_size_for_sort) diff --git a/src/daft-dsl/src/expr.rs b/src/daft-dsl/src/expr.rs index 73000c6466..331aa5fcb0 100644 --- a/src/daft-dsl/src/expr.rs +++ b/src/daft-dsl/src/expr.rs @@ -4,6 +4,7 @@ use daft_core::{ schema::Schema, utils::supertype::try_get_supertype, }; +use itertools::Itertools; use crate::{ functions::{ @@ -1253,6 +1254,14 @@ pub fn resolve_aggexprs( itertools::process_results(resolved_iter, |res| res.unzip()) } +// Check if one set of columns is a reordering of the other +pub fn is_partition_compatible(a: &[ExprRef], b: &[ExprRef]) -> bool { + // sort a and b by name + let a: Vec<&str> = a.iter().map(|a| a.name()).sorted().collect(); + let b: Vec<&str> = b.iter().map(|a| a.name()).sorted().collect(); + a == b +} + #[cfg(test)] mod tests { diff --git a/src/daft-dsl/src/lib.rs b/src/daft-dsl/src/lib.rs index fb85b1e083..322adec00e 100644 --- a/src/daft-dsl/src/lib.rs +++ b/src/daft-dsl/src/lib.rs @@ -14,7 +14,9 @@ mod treenode; pub use common_treenode; pub use expr::binary_op; pub use expr::col; -pub use expr::{resolve_aggexpr, resolve_aggexprs, resolve_expr, resolve_exprs}; +pub use expr::{ + is_partition_compatible, resolve_aggexpr, resolve_aggexprs, resolve_expr, resolve_exprs, +}; pub use expr::{AggExpr, ApproxPercentileParams, Expr, ExprRef, Operator}; pub use lit::{lit, null_lit, Literal, LiteralValue}; #[cfg(feature = "python")] diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index c17229b4ba..9b53d83eb9 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -10,6 +10,7 @@ mod logical_optimization; mod logical_plan; mod partitioning; pub mod physical_ops; +mod physical_optimization; mod physical_plan; mod physical_planner; mod resource_request; diff --git a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs index 5d46465013..864785722d 100644 --- a/src/daft-plan/src/physical_ops/project.rs +++ b/src/daft-plan/src/physical_ops/project.rs @@ -20,13 +20,28 @@ pub struct Project { } impl Project { + // uses input to create output clustering spec pub(crate) fn try_new( input: PhysicalPlanRef, projection: Vec, resource_request: ResourceRequest, + ) -> DaftResult { + let clustering_spec = translate_clustering_spec(input.clustering_spec(), &projection); + Ok(Self { + input, + projection, + resource_request, + clustering_spec, + }) + } + + // does not re-create clustering spec, unlike try_new + pub(crate) fn new_with_clustering_spec( + input: PhysicalPlanRef, + projection: Vec, + resource_request: ResourceRequest, clustering_spec: Arc, ) -> DaftResult { - let clustering_spec = translate_clustering_spec(clustering_spec, &projection); Ok(Self { input, projection, diff --git a/src/daft-plan/src/physical_optimization/mod.rs b/src/daft-plan/src/physical_optimization/mod.rs new file mode 100644 index 0000000000..16cb38c015 --- /dev/null +++ b/src/daft-plan/src/physical_optimization/mod.rs @@ -0,0 +1,3 @@ +pub mod optimizer; +mod plan_context; +mod rules; diff --git a/src/daft-plan/src/physical_optimization/optimizer.rs b/src/daft-plan/src/physical_optimization/optimizer.rs new file mode 100644 index 0000000000..59ee90471d --- /dev/null +++ b/src/daft-plan/src/physical_optimization/optimizer.rs @@ -0,0 +1,248 @@ +use common_error::DaftResult; + +use crate::PhysicalPlanRef; + +use super::rules::{ + drop_repartition::DropRepartitionPhysical, reorder_partition_keys::ReorderPartitionKeys, + PhysicalOptimizerRuleBatch, PhysicalRuleExecutionStrategy, +}; + +pub struct PhysicalOptimizerConfig { + // The upper bound on the number of passes a rule batch can run. + // Depending on its configuration a rule batch may run fewer passes. + // Default is 5 + pub max_passes: usize, +} + +impl PhysicalOptimizerConfig { + #[allow(dead_code)] // used in test + pub fn new(max_passes: usize) -> Self { + PhysicalOptimizerConfig { max_passes } + } +} + +impl Default for PhysicalOptimizerConfig { + fn default() -> Self { + PhysicalOptimizerConfig { max_passes: 5 } + } +} + +pub struct PhysicalOptimizer { + rule_batches: Vec, + config: PhysicalOptimizerConfig, +} + +impl PhysicalOptimizer { + #[allow(dead_code)] // used in test + pub fn new( + rule_batches: Vec, + config: PhysicalOptimizerConfig, + ) -> Self { + PhysicalOptimizer { + rule_batches, + config, + } + } + + pub fn optimize(&self, mut plan: PhysicalPlanRef) -> DaftResult { + for batch in self.rule_batches.iter() { + plan = batch.optimize(plan, &self.config)?; + } + Ok(plan) + } +} + +impl Default for PhysicalOptimizer { + fn default() -> Self { + PhysicalOptimizer { + rule_batches: vec![PhysicalOptimizerRuleBatch::new( + vec![ + Box::new(ReorderPartitionKeys {}), + Box::new(DropRepartitionPhysical {}), + ], + PhysicalRuleExecutionStrategy::Once, + )], + config: PhysicalOptimizerConfig::default(), + } + } +} + +#[cfg(test)] +mod tests { + use std::{assert_matches::assert_matches, sync::Arc}; + + use common_error::DaftResult; + use common_treenode::Transformed; + use daft_core::{ + datatypes::Field, + schema::{Schema, SchemaRef}, + }; + + use crate::{ + partitioning::UnknownClusteringConfig, + physical_ops::{EmptyScan, Limit}, + physical_optimization::{optimizer::PhysicalOptimizerConfig, rules::PhysicalOptimizerRule}, + ClusteringSpec, PhysicalPlan, PhysicalPlanRef, + }; + + use super::{PhysicalOptimizer, PhysicalOptimizerRuleBatch}; + + fn create_dummy_plan(schema: SchemaRef, num_partitions: usize) -> PhysicalPlanRef { + PhysicalPlan::EmptyScan(EmptyScan::new( + schema, + ClusteringSpec::Unknown(UnknownClusteringConfig::new(num_partitions)).into(), + )) + .into() + } + + // rule that increments Limit's limit by 1 per pass + struct CountingRule { + pub cutoff: i64, + } + + impl PhysicalOptimizerRule for CountingRule { + fn rewrite(&self, plan: PhysicalPlanRef) -> DaftResult> { + match plan.as_ref() { + PhysicalPlan::Limit(Limit { + input, + limit, + eager, + num_partitions, + }) => { + if *limit >= self.cutoff { + Ok(Transformed::no(plan)) + } else { + let new_plan = PhysicalPlan::Limit(Limit::new( + input.clone(), + limit + 1, + *eager, + *num_partitions, + )); + Ok(Transformed::yes(new_plan.arced())) + } + } + _ => panic!("expected Limit"), + } + } + } + + // test that Once rule batches only execute once + #[test] + fn test_rule_batch_once() -> DaftResult<()> { + let plan = create_dummy_plan( + Arc::new(Schema::new(vec![Field::new( + "a", + daft_core::DataType::Int32, + )])?), + 1, + ); + + let plan = PhysicalPlan::Limit(Limit::new(plan, 0, true, 1)); + let optimizer = PhysicalOptimizer::new( + vec![PhysicalOptimizerRuleBatch::new( + vec![Box::new(CountingRule { cutoff: 100 })], + super::PhysicalRuleExecutionStrategy::Once, + )], + PhysicalOptimizerConfig::new(5), + ); + let plan = optimizer.optimize(plan.arced())?; + assert_matches!(plan.as_ref(), PhysicalPlan::Limit(Limit { limit: 1, .. })); + Ok(()) + } + + // make sure that fixed point cuts off when not transformed + #[test] + fn test_rule_batch_fixed_point() -> DaftResult<()> { + let plan = create_dummy_plan( + Arc::new(Schema::new(vec![Field::new( + "a", + daft_core::DataType::Int32, + )])?), + 1, + ); + + let plan = PhysicalPlan::Limit(Limit::new(plan, 0, true, 1)); + let optimizer = PhysicalOptimizer::new( + vec![PhysicalOptimizerRuleBatch::new( + vec![Box::new(CountingRule { cutoff: 2 })], + super::PhysicalRuleExecutionStrategy::FixedPoint(Some(4)), + )], + PhysicalOptimizerConfig::new(5), + ); + let plan = optimizer.optimize(plan.arced())?; + assert_matches!(plan.as_ref(), PhysicalPlan::Limit(Limit { limit: 2, .. })); + Ok(()) + } + + // make sure that fixed point stops at maximum + #[test] + fn test_rule_batch_fixed_point_max() -> DaftResult<()> { + let plan = create_dummy_plan( + Arc::new(Schema::new(vec![Field::new( + "a", + daft_core::DataType::Int32, + )])?), + 1, + ); + + let plan = PhysicalPlan::Limit(Limit::new(plan, 0, true, 1)); + let optimizer = PhysicalOptimizer::new( + vec![PhysicalOptimizerRuleBatch::new( + vec![Box::new(CountingRule { cutoff: 100 })], + super::PhysicalRuleExecutionStrategy::FixedPoint(Some(4)), + )], + PhysicalOptimizerConfig::new(5), + ); + let plan = optimizer.optimize(plan.arced())?; + assert_matches!(plan.as_ref(), PhysicalPlan::Limit(Limit { limit: 4, .. })); + Ok(()) + } + + // make sure that fixed point stops at max_passes + #[test] + fn test_rule_batch_fixed_point_max_passes() -> DaftResult<()> { + let plan = create_dummy_plan( + Arc::new(Schema::new(vec![Field::new( + "a", + daft_core::DataType::Int32, + )])?), + 1, + ); + + let plan = PhysicalPlan::Limit(Limit::new(plan, 0, true, 1)); + let optimizer = PhysicalOptimizer::new( + vec![PhysicalOptimizerRuleBatch::new( + vec![Box::new(CountingRule { cutoff: 100 })], + super::PhysicalRuleExecutionStrategy::FixedPoint(Some(7)), + )], + PhysicalOptimizerConfig::new(5), + ); + let plan = optimizer.optimize(plan.arced())?; + assert_matches!(plan.as_ref(), PhysicalPlan::Limit(Limit { limit: 5, .. })); + Ok(()) + } + + // make sure that fixed point stops at max_passes without a limit + #[test] + fn test_rule_batch_fixed_point_no_limit() -> DaftResult<()> { + let plan = create_dummy_plan( + Arc::new(Schema::new(vec![Field::new( + "a", + daft_core::DataType::Int32, + )])?), + 1, + ); + + let plan = PhysicalPlan::Limit(Limit::new(plan, 0, true, 1)); + let optimizer = PhysicalOptimizer::new( + vec![PhysicalOptimizerRuleBatch::new( + vec![Box::new(CountingRule { cutoff: 100 })], + super::PhysicalRuleExecutionStrategy::FixedPoint(None), + )], + PhysicalOptimizerConfig::new(7), + ); + let plan = optimizer.optimize(plan.arced())?; + assert_matches!(plan.as_ref(), PhysicalPlan::Limit(Limit { limit: 7, .. })); + Ok(()) + } +} diff --git a/src/daft-plan/src/physical_optimization/plan_context.rs b/src/daft-plan/src/physical_optimization/plan_context.rs new file mode 100644 index 0000000000..1115607db7 --- /dev/null +++ b/src/daft-plan/src/physical_optimization/plan_context.rs @@ -0,0 +1,66 @@ +use common_error::DaftResult; +use common_treenode::ConcreteTreeNode; + +use crate::PhysicalPlanRef; + +// This struct allows providing context or state to go along +// with visiting TreeNodes. +pub(super) struct PlanContext { + pub plan: PhysicalPlanRef, + pub context: T, + pub children: Vec, +} + +impl PlanContext { + pub fn new(plan: PhysicalPlanRef, context: T, children: Vec) -> Self { + Self { + plan, + context, + children, + } + } + + pub fn with_plan(self, new_plan: PhysicalPlanRef) -> Self { + Self::new(new_plan, self.context, self.children) + } + + pub fn with_context(self, new_context: T) -> Self { + Self::new(self.plan, new_context, self.children) + } +} + +impl PlanContext { + pub fn new_default(plan: PhysicalPlanRef) -> Self { + let children = plan.children().into_iter().map(Self::new_default).collect(); + Self::new(plan, Default::default(), children) + } +} + +impl PlanContext { + // Clone the context to the children + pub fn propagate(mut self) -> Self { + for child in self.children.iter_mut() { + child.context = self.context.clone(); + } + self + } +} + +impl ConcreteTreeNode for PlanContext { + fn children(&self) -> Vec<&Self> { + self.children.iter().collect() + } + + fn take_children(mut self) -> (Self, Vec) { + let children = std::mem::take(&mut self.children); + (self, children) + } + + fn with_new_children(mut self, children: Vec) -> DaftResult { + self.children = children; + let child_plans: Vec = + self.children.iter().map(|x| x.plan.clone()).collect(); + self.plan = self.plan.with_new_children(&child_plans).arced(); + Ok(self) + } +} diff --git a/src/daft-plan/src/physical_optimization/rules/drop_repartition.rs b/src/daft-plan/src/physical_optimization/rules/drop_repartition.rs new file mode 100644 index 0000000000..fc344b9bab --- /dev/null +++ b/src/daft-plan/src/physical_optimization/rules/drop_repartition.rs @@ -0,0 +1,135 @@ +use common_error::DaftResult; +use common_treenode::{Transformed, TreeNode}; + +use crate::{ + physical_ops::FanoutByHash, physical_optimization::rules::PhysicalOptimizerRule, + ClusteringSpec, PhysicalPlan, PhysicalPlanRef, +}; + +pub struct DropRepartitionPhysical {} + +// if we are repartitioning but the child already has the correct spec, then don't repartition +impl PhysicalOptimizerRule for DropRepartitionPhysical { + fn rewrite(&self, plan: PhysicalPlanRef) -> DaftResult> { + plan.transform_up(|c| { + if c.children().len() != 1 { + return Ok(Transformed::no(c)); + } + let children = c.children(); + let child = children.first().unwrap(); + let cur_spec = child.clustering_spec(); + if !matches!(cur_spec.as_ref(), ClusteringSpec::Hash(..)) { + return Ok(Transformed::no(c)); + } + + match c.as_ref() { + PhysicalPlan::FanoutByHash(FanoutByHash { + partition_by, + num_partitions, + .. + }) => { + if *partition_by == cur_spec.partition_by() + && *num_partitions == cur_spec.num_partitions() + { + Ok(Transformed::yes(child.clone())) + } else { + Ok(Transformed::no(c)) + } + } + // remove extra reducemerge + PhysicalPlan::ReduceMerge(..) => match child.as_ref() { + PhysicalPlan::FanoutByHash(..) + | PhysicalPlan::FanoutByRange(..) + | PhysicalPlan::FanoutRandom(..) => Ok(Transformed::no(c)), + _ => Ok(Transformed::yes(child.clone())), + }, + _ => Ok(Transformed::no(c)), + } + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_error::DaftResult; + use daft_core::{ + datatypes::Field, + schema::{Schema, SchemaRef}, + }; + use daft_dsl::{col, ExprRef}; + + use crate::{ + partitioning::UnknownClusteringConfig, + physical_ops::{EmptyScan, FanoutByHash, ReduceMerge}, + physical_optimization::rules::PhysicalOptimizerRule, + ClusteringSpec, PhysicalPlan, PhysicalPlanRef, + }; + + use super::DropRepartitionPhysical; + + fn create_dummy_plan(schema: SchemaRef, num_partitions: usize) -> PhysicalPlanRef { + PhysicalPlan::EmptyScan(EmptyScan::new( + schema, + ClusteringSpec::Unknown(UnknownClusteringConfig::new(num_partitions)).into(), + )) + .into() + } + + fn add_repartition( + plan: PhysicalPlanRef, + num_partitions: usize, + partition_by: Vec, + ) -> PhysicalPlanRef { + PhysicalPlan::ReduceMerge(ReduceMerge::new( + PhysicalPlan::FanoutByHash(FanoutByHash::new(plan, num_partitions, partition_by)) + .into(), + )) + .into() + } + + // makes sure trivial repartitions are removed + #[test] + fn test_repartition_removed() -> DaftResult<()> { + let base = create_dummy_plan( + Arc::new(Schema::new(vec![ + Field::new("a", daft_core::DataType::Int32), + Field::new("b", daft_core::DataType::Int32), + Field::new("c", daft_core::DataType::Int32), + ])?), + 1, + ); + let plan = add_repartition(base.clone(), 1, vec![col("a"), col("b")]); + let plan = add_repartition(plan, 1, vec![col("a"), col("b")]); + let rule = DropRepartitionPhysical {}; + let res = rule.rewrite(plan)?; + assert!(res.transformed); + + let expected_plan = add_repartition(base, 1, vec![col("a"), col("b")]); + assert_eq!(res.data, expected_plan); + Ok(()) + } + + // makes sure different repartitions are not removed + #[test] + fn test_repartition_not_removed() -> DaftResult<()> { + let plan = create_dummy_plan( + Arc::new(Schema::new(vec![ + Field::new("a", daft_core::DataType::Int32), + Field::new("b", daft_core::DataType::Int32), + Field::new("c", daft_core::DataType::Int32), + ])?), + 1, + ); + let plan = add_repartition(plan, 1, vec![col("a"), col("b")]); + let plan = add_repartition(plan, 1, vec![col("a"), col("c")]); + let plan = add_repartition(plan, 1, vec![col("a"), col("c"), col("b")]); + let plan = add_repartition(plan, 1, vec![col("a")]); + let rule = DropRepartitionPhysical {}; + let res = rule.rewrite(plan.clone())?; + assert!(!res.transformed); + assert_eq!(res.data, plan); + Ok(()) + } +} diff --git a/src/daft-plan/src/physical_optimization/rules/mod.rs b/src/daft-plan/src/physical_optimization/rules/mod.rs new file mode 100644 index 0000000000..487d44b4b0 --- /dev/null +++ b/src/daft-plan/src/physical_optimization/rules/mod.rs @@ -0,0 +1,5 @@ +pub(super) mod drop_repartition; +pub(super) mod reorder_partition_keys; +mod rule; + +pub use rule::*; diff --git a/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs b/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs new file mode 100644 index 0000000000..a5eef7c432 --- /dev/null +++ b/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs @@ -0,0 +1,295 @@ +use common_error::DaftResult; +use common_treenode::{ConcreteTreeNode, Transformed, TreeNode}; +use daft_dsl::{is_partition_compatible, ExprRef}; + +use crate::{ + partitioning::HashClusteringConfig, + physical_ops::{Aggregate, Explode, FanoutByHash, HashJoin, Project, Unpivot}, + physical_optimization::{plan_context::PlanContext, rules::PhysicalOptimizerRule}, + ClusteringSpec, PhysicalPlan, PhysicalPlanRef, +}; + +pub struct ReorderPartitionKeys {} + +type PartitionContext = PlanContext>; + +// Reorders columns in partitions so that they can be removed later. +// This works by maintaining a "canonical" ordering of the columns as we walk +// down the plan tree. +// For instance, if we see a hash partition by [col("b"), col("a")], then if +// we see something that partitions by [col("a"), col("b")], we reorder it +// to also partition by [col("b"), col("a")]. +// This allows us to remove redundant repartitions, which is done in another rule. +impl PhysicalOptimizerRule for ReorderPartitionKeys { + fn rewrite(&self, plan: PhysicalPlanRef) -> DaftResult> { + let plan_context = PartitionContext::new_default(plan); + + let res_transformed = plan_context.transform_down(|c| { + let plan = c.plan.clone(); + match plan.as_ref() { + // 0-input nodes + #[cfg(feature = "python")] + PhysicalPlan::InMemoryScan(..) => return Ok(Transformed::no(c)), + PhysicalPlan::EmptyScan(..) | + PhysicalPlan::TabularScan(..) => return Ok(Transformed::no(c)), + // 2-input nodes + // for concat, hash partitioning shouldn't change + PhysicalPlan::Concat(..) => return Ok(Transformed::no(c.propagate())), + // for hash join, send separate partitionings to children + PhysicalPlan::HashJoin(HashJoin { left_on, right_on, .. }) => { + let (c, old_children) = c.take_children(); + let num_children = old_children.len(); + let Ok([left_child, right_child]) = TryInto::<[_; 2]>::try_into(old_children) else { + panic!("HashJoin has {} children, expected 2", num_children); + }; + let left_child = left_child.with_context(left_on.clone()); + let right_child = right_child.with_context(right_on.clone()); + return Ok(Transformed::no(c.with_new_children(vec![left_child, right_child])?)) + } + // for other joins, hash partitioning doesn't matter + PhysicalPlan::BroadcastJoin(..) | + PhysicalPlan::SortMergeJoin(..) => return Ok(Transformed::no(c)), + _ => {}, + }; + + // check clustering spec for compatibility + let clustering_spec = c.plan.clustering_spec(); + match clustering_spec.as_ref() { + ClusteringSpec::Hash(HashClusteringConfig { by, .. }) => { + if *by == c.context { + // partition is already perfect + return Ok(Transformed::no(c.propagate())); + } + if !is_partition_compatible(&c.context, by) { + // we are hash partitioned, just by something different + return Ok(Transformed::no(c.with_context(by.clone()).propagate())); + } + // otherwise we need to reorder the columns + } + _ => return Ok(Transformed::no(c)), + }; + + let new_spec = ClusteringSpec::Hash(HashClusteringConfig::new( + clustering_spec.num_partitions(), + c.context.clone(), + )); + + // we are hash partitioned but we might need to transform the expression + match c.plan.as_ref() { + // these store their clustering spec inside + PhysicalPlan::Project(Project { input, projection, resource_request, .. }) => { + let new_plan = PhysicalPlan::Project(Project::new_with_clustering_spec( + input.clone(), + projection.clone(), + resource_request.clone(), + new_spec.into(), + )?); + Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate())) + } + PhysicalPlan::Explode(Explode { input, to_explode, .. }) => { + // can't use try_new because we are setting the clustering spec ourselves + let new_plan = PhysicalPlan::Explode(Explode { + input: input.clone(), + to_explode: to_explode.clone(), + clustering_spec: new_spec.into(), + }); + Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate())) + } + PhysicalPlan::Unpivot(Unpivot { input, ids, values, value_name, variable_name, .. }) => { + // can't use new because we are setting the clustering spec ourselves + let new_plan = PhysicalPlan::Unpivot(Unpivot { + input: input.clone(), + ids: ids.clone(), + values: values.clone(), + value_name: value_name.clone(), + variable_name: variable_name.clone(), + clustering_spec: new_spec.into() + }); + Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate())) + } + PhysicalPlan::FanoutByHash(FanoutByHash { input, num_partitions, .. }) => { + let new_plan = PhysicalPlan::FanoutByHash(FanoutByHash { + input: input.clone(), + num_partitions: *num_partitions, + partition_by: c.context.clone() + }); + Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate())) + } + PhysicalPlan::Aggregate(Aggregate { input, aggregations, .. }) => { + let new_plan = PhysicalPlan::Aggregate(Aggregate { + input: input.clone(), + aggregations: aggregations.clone(), + groupby: c.context.clone(), + }); + Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate())) + } + + // these depend solely on their input + PhysicalPlan::Filter(..) | + PhysicalPlan::Limit(..) | + PhysicalPlan::Sample(..) | + PhysicalPlan::MonotonicallyIncreasingId(..) | + PhysicalPlan::Flatten(..) | + PhysicalPlan::ReduceMerge(..) | + PhysicalPlan::Pivot(..) | + PhysicalPlan::TabularWriteCsv(..) | + PhysicalPlan::TabularWriteJson(..) | + PhysicalPlan::TabularWriteParquet(..) => Ok(Transformed::no(c.propagate())), + + // the rest should have been dealt with earlier + PhysicalPlan::Sort(..) | + PhysicalPlan::InMemoryScan(..) | + PhysicalPlan::TabularScan(..) | + PhysicalPlan::EmptyScan(..) | + PhysicalPlan::Split(..) | + PhysicalPlan::Coalesce(..) | + PhysicalPlan::FanoutRandom(..) | + PhysicalPlan::FanoutByRange(..) | + PhysicalPlan::Concat(..) | + PhysicalPlan::HashJoin(..) | + PhysicalPlan::SortMergeJoin(..) | + PhysicalPlan::BroadcastJoin(..) => unreachable!("PhysicalPlan match for ReorderPartitionKeys physical optimizer rule should not be reachable"), + #[cfg(feature = "python")] + PhysicalPlan::IcebergWrite(..) | PhysicalPlan::DeltaLakeWrite(..) | PhysicalPlan::LanceWrite(..) => { + unreachable!("PhysicalPlan match for ReorderPartitionKeys physical optimizer rule should not be reachable") + } + } + })?; + res_transformed.map_data(|c| Ok(c.plan)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_error::DaftResult; + use daft_core::{ + datatypes::Field, + schema::{Schema, SchemaRef}, + }; + use daft_dsl::{col, ExprRef}; + + use crate::{ + partitioning::UnknownClusteringConfig, + physical_ops::{EmptyScan, FanoutByHash, HashJoin, ReduceMerge}, + physical_optimization::{ + rules::reorder_partition_keys::ReorderPartitionKeys, rules::PhysicalOptimizerRule, + }, + ClusteringSpec, PhysicalPlan, PhysicalPlanRef, + }; + + fn create_dummy_plan(schema: SchemaRef, num_partitions: usize) -> PhysicalPlanRef { + PhysicalPlan::EmptyScan(EmptyScan::new( + schema, + ClusteringSpec::Unknown(UnknownClusteringConfig::new(num_partitions)).into(), + )) + .into() + } + + fn add_repartition( + plan: PhysicalPlanRef, + num_partitions: usize, + partition_by: Vec, + ) -> PhysicalPlanRef { + PhysicalPlan::ReduceMerge(ReduceMerge::new( + PhysicalPlan::FanoutByHash(FanoutByHash::new(plan, num_partitions, partition_by)) + .into(), + )) + .into() + } + + // makes sure trivial repartitions are modified + #[test] + fn test_repartition_modified() -> DaftResult<()> { + let base = create_dummy_plan( + Arc::new(Schema::new(vec![ + Field::new("a", daft_core::DataType::Int32), + Field::new("b", daft_core::DataType::Int32), + Field::new("c", daft_core::DataType::Int32), + ])?), + 1, + ); + let plan = add_repartition(base.clone(), 1, vec![col("a"), col("b")]); + let plan = add_repartition(plan, 1, vec![col("b"), col("a")]); + let rule = ReorderPartitionKeys {}; + let res = rule.rewrite(plan)?; + assert!(res.transformed); + + // expected is two repartitions by b, a + let expected_plan = add_repartition(base, 1, vec![col("b"), col("a")]); + let expected_plan = add_repartition(expected_plan, 1, vec![col("b"), col("a")]); + assert_eq!(res.data, expected_plan); + Ok(()) + } + + // makes sure different repartitions are not modified + #[test] + fn test_repartition_not_modified() -> DaftResult<()> { + let plan = create_dummy_plan( + Arc::new(Schema::new(vec![ + Field::new("a", daft_core::DataType::Int32), + Field::new("b", daft_core::DataType::Int32), + Field::new("c", daft_core::DataType::Int32), + ])?), + 1, + ); + let plan = add_repartition(plan, 1, vec![col("a"), col("b")]); + let plan = add_repartition(plan, 1, vec![col("a"), col("c")]); + let plan = add_repartition(plan, 1, vec![col("a"), col("c"), col("b")]); + let plan = add_repartition(plan, 1, vec![col("b")]); + let rule = ReorderPartitionKeys {}; + let res = rule.rewrite(plan.clone())?; + assert!(!res.transformed); + assert_eq!(res.data, plan); + Ok(()) + } + + // makes sure hash joins reorder the columns + #[test] + fn test_repartition_hash_join_reorder() -> DaftResult<()> { + let base1 = create_dummy_plan( + Arc::new(Schema::new(vec![ + Field::new("a", daft_core::DataType::Int32), + Field::new("b", daft_core::DataType::Int32), + Field::new("c", daft_core::DataType::Int32), + ])?), + 1, + ); + let plan1 = add_repartition(base1.clone(), 1, vec![col("a"), col("b")]); + + let base2 = create_dummy_plan( + Arc::new(Schema::new(vec![ + Field::new("x", daft_core::DataType::Int32), + Field::new("y", daft_core::DataType::Int32), + Field::new("z", daft_core::DataType::Int32), + ])?), + 1, + ); + let plan2 = add_repartition(base2.clone(), 1, vec![col("x"), col("y")]); + + let plan = PhysicalPlan::HashJoin(HashJoin::new( + plan1, + plan2, + vec![col("b"), col("a")], + vec![col("x"), col("y")], + daft_core::JoinType::Inner, + )) + .arced(); + + let rule = ReorderPartitionKeys {}; + let res = rule.rewrite(plan)?; + assert!(res.transformed); + + let expected_plan = PhysicalPlan::HashJoin(HashJoin::new( + add_repartition(base1, 1, vec![col("b"), col("a")]), + add_repartition(base2, 1, vec![col("x"), col("y")]), + vec![col("b"), col("a")], + vec![col("x"), col("y")], + daft_core::JoinType::Inner, + )) + .arced(); + assert_eq!(res.data, expected_plan); + Ok(()) + } +} diff --git a/src/daft-plan/src/physical_optimization/rules/rule.rs b/src/daft-plan/src/physical_optimization/rules/rule.rs new file mode 100644 index 0000000000..8d20891424 --- /dev/null +++ b/src/daft-plan/src/physical_optimization/rules/rule.rs @@ -0,0 +1,66 @@ +use common_error::DaftResult; +use common_treenode::{Transformed, TransformedResult}; + +use crate::{physical_optimization::optimizer::PhysicalOptimizerConfig, PhysicalPlanRef}; + +pub trait PhysicalOptimizerRule { + fn rewrite(&self, plan: PhysicalPlanRef) -> DaftResult>; +} + +pub enum PhysicalRuleExecutionStrategy { + // Apply the batch of rules only once. + Once, + // Apply the batch of rules multiple times, to a fixed-point or until the max + // passes is hit. + // If parametrized by Some(n), the batch of rules will be run a maximum + // of n passes; if None, the number of passes is capped by the max passes argument. + #[allow(dead_code)] + FixedPoint(Option), +} + +pub struct PhysicalOptimizerRuleBatch { + rules: Vec>, + strategy: PhysicalRuleExecutionStrategy, +} + +// A batch of PhysicalOptimizerRules, which are run in order until +// the condition specified by the PhysicalRuleExecutionStrategy is satisfied. +impl PhysicalOptimizerRuleBatch { + pub fn new( + rules: Vec>, + strategy: PhysicalRuleExecutionStrategy, + ) -> Self { + PhysicalOptimizerRuleBatch { rules, strategy } + } + + fn optimize_once(&self, plan: PhysicalPlanRef) -> DaftResult> { + self.rules + .iter() + .try_fold(Transformed::no(plan), |plan, rule| { + plan.transform_data(|p| rule.rewrite(p)) + }) + } + + pub fn optimize( + &self, + plan: PhysicalPlanRef, + config: &PhysicalOptimizerConfig, + ) -> DaftResult { + match self.strategy { + PhysicalRuleExecutionStrategy::Once => self.optimize_once(plan).data(), + PhysicalRuleExecutionStrategy::FixedPoint(passes) => { + let passes = + passes.map_or(config.max_passes, |x| std::cmp::min(config.max_passes, x)); + let mut plan = plan; + for _ in 0..passes { + let transformed_plan = self.optimize_once(plan.clone())?; + if !transformed_plan.transformed { + break; + } + plan = transformed_plan.data; + } + Ok(plan) + } + } + } +} diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 9d253055c2..04d0e70071 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -99,6 +99,7 @@ impl PhysicalPlan { } pub fn clustering_spec(&self) -> Arc { + // TODO: add cache or something to avoid excessive recalculation match self { Self::InMemoryScan(InMemoryScan { clustering_spec, .. @@ -172,16 +173,10 @@ impl PhysicalPlan { Self::ReduceMerge(ReduceMerge { input }) => input.clustering_spec(), Self::Aggregate(Aggregate { input, groupby, .. }) => { let input_clustering_spec = input.clustering_spec(); - if input_clustering_spec.num_partitions() == 1 { - input_clustering_spec - } else if groupby.is_empty() { + if groupby.is_empty() { ClusteringSpec::Unknown(Default::default()).into() } else { - ClusteringSpec::Hash(HashClusteringConfig::new( - input.clustering_spec().num_partitions(), - groupby.clone(), - )) - .into() + input_clustering_spec } } Self::Pivot(Pivot { input, .. }) => input.clustering_spec(), @@ -462,13 +457,15 @@ impl PhysicalPlan { Self::InMemoryScan(..) => panic!("Source nodes don't have children, with_new_children() should never be called for source ops"), Self::TabularScan(..) | Self::EmptyScan(..) => panic!("Source nodes don't have children, with_new_children() should never be called for source ops"), - Self::Project(Project { projection, resource_request, clustering_spec, .. }) => Self::Project(Project::try_new( + Self::Project(Project { projection, resource_request, clustering_spec, .. }) => + Self::Project(Project::new_with_clustering_spec( input.clone(), projection.clone(), resource_request.clone(), clustering_spec.clone(), ).unwrap()), Self::Filter(Filter { predicate, .. }) => Self::Filter(Filter::new(input.clone(), predicate.clone())), Self::Limit(Limit { limit, eager, num_partitions, .. }) => Self::Limit(Limit::new(input.clone(), *limit, *eager, *num_partitions)), Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()), Self::Unpivot(Unpivot { ids, values, variable_name, value_name, .. }) => Self::Unpivot(Unpivot::new(input.clone(), ids.clone(), values.clone(), variable_name, value_name)), + Self::Pivot(Pivot { group_by, pivot_column, value_column, names, .. }) => Self::Pivot(Pivot::new(input.clone(), group_by.clone(), pivot_column.clone(), value_column.clone(), names.clone())), Self::Sample(Sample { fraction, with_replacement, seed, .. }) => Self::Sample(Sample::new(input.clone(), *fraction, *with_replacement, *seed)), Self::Sort(Sort { sort_by, descending, num_partitions, .. }) => Self::Sort(Sort::new(input.clone(), sort_by.clone(), descending.clone(), *num_partitions)), Self::Split(Split { input_num_partitions, output_num_partitions, .. }) => Self::Split(Split::new(input.clone(), *input_num_partitions, *output_num_partitions)), @@ -482,10 +479,14 @@ impl PhysicalPlan { Self::TabularWriteParquet(TabularWriteParquet { schema, file_info, .. }) => Self::TabularWriteParquet(TabularWriteParquet::new(schema.clone(), file_info.clone(), input.clone())), Self::TabularWriteCsv(TabularWriteCsv { schema, file_info, .. }) => Self::TabularWriteCsv(TabularWriteCsv::new(schema.clone(), file_info.clone(), input.clone())), Self::TabularWriteJson(TabularWriteJson { schema, file_info, .. }) => Self::TabularWriteJson(TabularWriteJson::new(schema.clone(), file_info.clone(), input.clone())), + Self::MonotonicallyIncreasingId(MonotonicallyIncreasingId { column_name, .. }) => Self::MonotonicallyIncreasingId(MonotonicallyIncreasingId::new(input.clone(), column_name)), #[cfg(feature = "python")] Self::IcebergWrite(IcebergWrite { schema, iceberg_info, .. }) => Self::IcebergWrite(IcebergWrite::new(schema.clone(), iceberg_info.clone(), input.clone())), #[cfg(feature = "python")] Self::DeltaLakeWrite(DeltaLakeWrite {schema, delta_lake_info, .. }) => Self::DeltaLakeWrite(DeltaLakeWrite::new(schema.clone(), delta_lake_info.clone(), input.clone())), + #[cfg(feature = "python")] + Self::LanceWrite(LanceWrite { schema, lance_info, .. }) => Self::LanceWrite(LanceWrite::new(schema.clone(), lance_info.clone(), input.clone())), + // we should really remove this catch-all _ => panic!("Physical op {:?} has two inputs, but got one", self), }, [input1, input2] => match self { diff --git a/src/daft-plan/src/physical_planner/mod.rs b/src/daft-plan/src/physical_planner/mod.rs index 61f8473595..5012034c74 100644 --- a/src/daft-plan/src/physical_planner/mod.rs +++ b/src/daft-plan/src/physical_planner/mod.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use common_daft_config::DaftExecutionConfig; use common_error::DaftResult; -use crate::physical_plan::PhysicalPlanRef; use crate::LogicalPlan; +use crate::{physical_optimization::optimizer::PhysicalOptimizer, physical_plan::PhysicalPlanRef}; use crate::physical_planner::planner::PhysicalPlanTranslator; use common_treenode::TreeNode; @@ -32,5 +32,7 @@ pub fn logical_to_physical( .physical_children .pop() .expect("should have exactly 1 parent"); + let optimizer = PhysicalOptimizer::default(); + let pplan = optimizer.optimize(pplan)?; Ok(pplan) } diff --git a/src/daft-plan/src/physical_planner/translate.rs b/src/daft-plan/src/physical_planner/translate.rs index d38b228100..ec063649c4 100644 --- a/src/daft-plan/src/physical_planner/translate.rs +++ b/src/daft-plan/src/physical_planner/translate.rs @@ -12,8 +12,8 @@ use daft_core::count_mode::CountMode; use daft_core::join::{JoinStrategy, JoinType}; use daft_core::schema::SchemaRef; use daft_core::DataType; -use daft_dsl::ExprRef; use daft_dsl::{col, ApproxPercentileParams}; +use daft_dsl::{is_partition_compatible, ExprRef}; use daft_scan::PhysicalScanInfo; @@ -103,12 +103,10 @@ pub(super) fn translate_single_logical_node( .. }) => { let input_physical = physical_children.pop().expect("requires 1 input"); - let clustering_spec = input_physical.clustering_spec().clone(); Ok(PhysicalPlan::Project(Project::try_new( input_physical, projection.clone(), resource_request.clone(), - clustering_spec, )?) .arced()) } @@ -327,12 +325,10 @@ pub(super) fn translate_single_logical_node( groupby.clone(), )); - let clustering_spec = second_stage_agg.clustering_spec().clone(); PhysicalPlan::Project(Project::try_new( second_stage_agg.into(), final_exprs, Default::default(), - clustering_spec, )?) } }; @@ -404,12 +400,10 @@ pub(super) fn translate_single_logical_node( group_by_with_pivot, )); - let clustering_spec = second_stage_agg.clustering_spec().clone(); PhysicalPlan::Project(Project::try_new( second_stage_agg.into(), final_exprs, Default::default(), - clustering_spec, )?) } }; @@ -445,15 +439,13 @@ pub(super) fn translate_single_logical_node( left_clustering_spec.num_partitions(), right_clustering_spec.num_partitions(), ); - let new_left_hash_clustering_spec = Arc::new(ClusteringSpec::Hash( - HashClusteringConfig::new(num_partitions, left_on.clone()), - )); - let new_right_hash_clustering_spec = Arc::new(ClusteringSpec::Hash( - HashClusteringConfig::new(num_partitions, right_on.clone()), - )); - let is_left_hash_partitioned = left_clustering_spec == new_left_hash_clustering_spec; - let is_right_hash_partitioned = right_clustering_spec == new_right_hash_clustering_spec; + let is_left_hash_partitioned = + matches!(left_clustering_spec.as_ref(), ClusteringSpec::Hash(..)) + && is_partition_compatible(&left_clustering_spec.partition_by(), left_on); + let is_right_hash_partitioned = + matches!(right_clustering_spec.as_ref(), ClusteringSpec::Hash(..)) + && is_partition_compatible(&right_clustering_spec.partition_by(), right_on); // Left-side of join is considered to be sort-partitioned on the join key if it is sort-partitioned on a // sequence of expressions that has the join key as a prefix. @@ -630,9 +622,33 @@ pub(super) fn translate_single_logical_node( .arced()) } JoinStrategy::Hash => { - if (num_partitions > 1 - || left_clustering_spec.num_partitions() != num_partitions) - && !is_left_hash_partitioned + // allow for leniency in partition size to avoid minor repartitions + let num_left_partitions = left_clustering_spec.num_partitions(); + let num_right_partitions = right_clustering_spec.num_partitions(); + + let num_partitions = match ( + is_left_hash_partitioned, + is_right_hash_partitioned, + num_left_partitions, + num_right_partitions, + ) { + (true, true, a, b) | (false, false, a, b) => max(a, b), + (_, _, 1, x) | (_, _, x, 1) => x, + (true, false, a, b) + if (a as f64) >= (b as f64) * cfg.hash_join_partition_size_leniency => + { + a + } + (false, true, a, b) + if (b as f64) >= (a as f64) * cfg.hash_join_partition_size_leniency => + { + b + } + (_, _, a, b) => max(a, b), + }; + + if num_left_partitions != num_partitions + || (num_partitions > 1 && !is_left_hash_partitioned) { let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new( left_physical, @@ -642,9 +658,8 @@ pub(super) fn translate_single_logical_node( left_physical = PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())).arced(); } - if (num_partitions > 1 - || right_clustering_spec.num_partitions() != num_partitions) - && !is_right_hash_partitioned + if num_right_partitions != num_partitions + || (num_partitions > 1 && !is_right_hash_partitioned) { let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new( right_physical, @@ -933,6 +948,9 @@ mod tests { use crate::physical_plan::PhysicalPlan; use crate::physical_planner::logical_to_physical; use crate::test::{dummy_scan_node, dummy_scan_operator}; + use crate::{LogicalPlanBuilder, PhysicalPlanRef}; + + use super::HashJoin; /// Tests that planner drops a simple Repartition (e.g. df.into_partitions()) the child already has the desired number of partitions. /// @@ -1022,4 +1040,199 @@ mod tests { assert_matches!(physical_plan.as_ref(), PhysicalPlan::Project(_)); Ok(()) } + + #[derive(Debug, Clone, Copy)] + enum RepartitionOptions { + Good(usize), + Bad(usize), + Reversed(usize), + } + + impl RepartitionOptions { + pub fn scale_by(&self, x: usize) -> Self { + match self { + Self::Good(v) => Self::Good(v * x), + Self::Bad(v) => Self::Bad(v * x), + Self::Reversed(v) => Self::Reversed(v * x), + } + } + } + + fn force_repartition( + node: LogicalPlanBuilder, + opts: RepartitionOptions, + ) -> DaftResult { + match opts { + RepartitionOptions::Good(x) => node.hash_repartition(Some(x), vec![col("a"), col("b")]), + RepartitionOptions::Bad(x) => node.hash_repartition(Some(x), vec![col("a"), col("c")]), + RepartitionOptions::Reversed(x) => { + node.hash_repartition(Some(x), vec![col("b"), col("a")]) + } + } + } + + /// Helper function to get plan for join repartition tests. + fn get_hash_join_plan( + cfg: Arc, + left_partitions: RepartitionOptions, + right_partitions: RepartitionOptions, + ) -> DaftResult> { + let join_node = dummy_scan_node(dummy_scan_operator(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Int64), + Field::new("c", DataType::Int64), + ])); + let join_node = force_repartition(join_node, right_partitions)?.select(vec![ + col("a"), + col("b"), + col("c").alias("dataR"), + ])?; + + let logical_plan = dummy_scan_node(dummy_scan_operator(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Int64), + Field::new("c", DataType::Int64), + ])); + let logical_plan = force_repartition(logical_plan, left_partitions)? + .select(vec![col("a"), col("b"), col("c").alias("dataL")])? + .join( + &join_node, + vec![col("a"), col("b")], + vec![col("a"), col("b")], + daft_core::JoinType::Inner, + Some(daft_core::JoinStrategy::Hash), + )? + .build(); + logical_to_physical(logical_plan, cfg) + } + + fn check_physical_matches( + plan: PhysicalPlanRef, + left_repartitions: bool, + right_repartitions: bool, + ) -> bool { + match plan.as_ref() { + PhysicalPlan::HashJoin(HashJoin { left, right, .. }) => { + let left_works = match (left.as_ref(), left_repartitions) { + (PhysicalPlan::ReduceMerge(_), true) => true, + (PhysicalPlan::Project(_), false) => true, + _ => false, + }; + let right_works = match (right.as_ref(), right_repartitions) { + (PhysicalPlan::ReduceMerge(_), true) => true, + (PhysicalPlan::Project(_), false) => true, + _ => false, + }; + left_works && right_works + } + _ => false, + } + } + + /// Tests a variety of settings regarding hash join repartitioning. + #[test] + fn repartition_hash_join_tests() -> DaftResult<()> { + use RepartitionOptions::*; + let cases = vec![ + (Good(30), Good(30), false, false), + (Good(30), Good(40), true, false), + (Good(30), Bad(30), false, true), + (Good(30), Bad(60), false, true), + (Good(30), Bad(70), true, true), + (Reversed(30), Good(30), false, false), + (Reversed(30), Good(40), true, false), + (Reversed(30), Bad(30), false, true), + (Reversed(30), Bad(60), false, true), + (Reversed(30), Bad(70), true, true), + (Reversed(30), Reversed(30), false, false), + (Reversed(30), Reversed(40), true, false), + ]; + let cfg: Arc = DaftExecutionConfig::default().into(); + for (l_opts, r_opts, l_exp, r_exp) in cases { + for mult in [1, 10] { + let plan = + get_hash_join_plan(cfg.clone(), l_opts.scale_by(mult), r_opts.scale_by(mult))?; + if !check_physical_matches(plan, l_exp, r_exp) { + panic!( + "Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}", + l_opts, r_opts, l_exp, r_exp, mult + ); + } + + // reversed direction + let plan = + get_hash_join_plan(cfg.clone(), r_opts.scale_by(mult), l_opts.scale_by(mult))?; + if !check_physical_matches(plan, r_exp, l_exp) { + panic!( + "Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}", + r_opts, l_opts, r_exp, l_exp, mult + ); + } + } + } + Ok(()) + } + + /// Tests configuration option for hash join repartition leniency. + #[test] + fn repartition_dropped_hash_join_leniency() -> DaftResult<()> { + let mut cfg = DaftExecutionConfig::default(); + cfg.hash_join_partition_size_leniency = 0.8; + let cfg = Arc::new(cfg); + + let physical_plan = get_hash_join_plan( + cfg.clone(), + RepartitionOptions::Good(20), + RepartitionOptions::Bad(40), + )?; + assert!(check_physical_matches(physical_plan, true, true)); + + let physical_plan = get_hash_join_plan( + cfg.clone(), + RepartitionOptions::Good(20), + RepartitionOptions::Bad(25), + )?; + assert!(check_physical_matches(physical_plan, false, true)); + + let physical_plan = get_hash_join_plan( + cfg.clone(), + RepartitionOptions::Good(20), + RepartitionOptions::Bad(26), + )?; + assert!(check_physical_matches(physical_plan, true, true)); + Ok(()) + } + + /// Tests that single partitions don't repartition. + #[test] + fn hash_join_single_partition_tests() -> DaftResult<()> { + use RepartitionOptions::*; + let cases = vec![ + (Good(1), Good(1), false, false), + (Good(1), Bad(1), false, false), + (Good(1), Reversed(1), false, false), + (Bad(1), Bad(1), false, false), + (Bad(1), Reversed(1), false, false), + ]; + let cfg: Arc = DaftExecutionConfig::default().into(); + for (l_opts, r_opts, l_exp, r_exp) in cases { + let plan = get_hash_join_plan(cfg.clone(), l_opts, r_opts)?; + if !check_physical_matches(plan, l_exp, r_exp) { + panic!( + "Failed single partition hash join test on case ({:?}, {:?}, {}, {})", + l_opts, r_opts, l_exp, r_exp + ); + } + + // reversed direction + let plan = get_hash_join_plan(cfg.clone(), r_opts, l_opts)?; + if !check_physical_matches(plan, r_exp, l_exp) { + panic!( + "Failed single partition hash join test on case ({:?}, {:?}, {}, {})", + r_opts, l_opts, r_exp, l_exp + ); + } + } + Ok(()) + } }