Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Add physical plan optimizer and optimization #2557

Merged
merged 13 commits into from
Jul 31, 2024
4 changes: 4 additions & 0 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def set_execution_config(
broadcast_join_size_bytes_threshold: int | None = None,
parquet_split_row_groups_max_files: int | None = None,
sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
hash_join_partition_size_leniency: float | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
Expand Down Expand Up @@ -305,6 +306,9 @@ def set_execution_config(
sort_merge_join_sort_with_aligned_boundaries: Whether to use a specialized algorithm for sorting both sides of a
sort-merge join such that they have aligned boundaries. This can lead to a faster merge-join at the cost of
more skewed sorted join inputs, increasing the risk of OOMs.
hash_join_partition_size_leniency: If the left side of a hash join is already correctly partitioned and the right side isn't,
and the ratio between the left and right size is at least this value, then the right side is repartitioned to have an equal
number of partitions as the left. Defaults to 0.5.
sample_size_for_sort: number of elements to sample from each partition when running sort,
Default is 20.
num_preview_rows: number of rows to show when displaying a dataframe preview,
Expand Down
3 changes: 3 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,7 @@ class PyDaftExecutionConfig:
broadcast_join_size_bytes_threshold: int | None = None,
parquet_split_row_groups_max_files: int | None = None,
sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
hash_join_partition_size_leniency: float | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
Expand All @@ -1695,6 +1696,8 @@ class PyDaftExecutionConfig:
@property
def sort_merge_join_sort_with_aligned_boundaries(self) -> bool: ...
@property
def hash_join_partition_size_leniency(self) -> float: ...
@property
def sample_size_for_sort(self) -> int: ...
@property
def num_preview_rows(self) -> int: ...
Expand Down
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct DaftExecutionConfig {
pub scan_tasks_max_size_bytes: usize,
pub broadcast_join_size_bytes_threshold: usize,
pub sort_merge_join_sort_with_aligned_boundaries: bool,
pub hash_join_partition_size_leniency: f64,
pub sample_size_for_sort: usize,
pub parquet_split_row_groups_max_files: usize,
pub num_preview_rows: usize,
Expand All @@ -51,6 +52,7 @@ impl Default for DaftExecutionConfig {
scan_tasks_max_size_bytes: 384 * 1024 * 1024, // 384MB
broadcast_join_size_bytes_threshold: 10 * 1024 * 1024, // 10 MiB
sort_merge_join_sort_with_aligned_boundaries: false,
hash_join_partition_size_leniency: 0.5,
sample_size_for_sort: 20,
parquet_split_row_groups_max_files: 10,
num_preview_rows: 8,
Expand Down
9 changes: 9 additions & 0 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl PyDaftExecutionConfig {
broadcast_join_size_bytes_threshold: Option<usize>,
parquet_split_row_groups_max_files: Option<usize>,
sort_merge_join_sort_with_aligned_boundaries: Option<bool>,
hash_join_partition_size_leniency: Option<f64>,
sample_size_for_sort: Option<usize>,
num_preview_rows: Option<usize>,
parquet_target_filesize: Option<usize>,
Expand Down Expand Up @@ -122,6 +123,9 @@ impl PyDaftExecutionConfig {
config.sort_merge_join_sort_with_aligned_boundaries =
sort_merge_join_sort_with_aligned_boundaries;
}
if let Some(hash_join_partition_size_leniency) = hash_join_partition_size_leniency {
config.hash_join_partition_size_leniency = hash_join_partition_size_leniency;
}
if let Some(sample_size_for_sort) = sample_size_for_sort {
config.sample_size_for_sort = sample_size_for_sort;
}
Expand Down Expand Up @@ -183,6 +187,11 @@ impl PyDaftExecutionConfig {
Ok(self.config.sort_merge_join_sort_with_aligned_boundaries)
}

#[getter]
fn get_hash_join_partition_size_leniency(&self) -> PyResult<f64> {
Ok(self.config.hash_join_partition_size_leniency)
}

#[getter]
fn get_sample_size_for_sort(&self) -> PyResult<usize> {
Ok(self.config.sample_size_for_sort)
Expand Down
9 changes: 9 additions & 0 deletions src/daft-dsl/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use daft_core::{
schema::Schema,
utils::supertype::try_get_supertype,
};
use itertools::Itertools;

use crate::{
functions::{
Expand Down Expand Up @@ -1253,6 +1254,14 @@ pub fn resolve_aggexprs(
itertools::process_results(resolved_iter, |res| res.unzip())
}

/// Returns true if `a` and `b` reference the same multiset of column names,
/// i.e. one partitioning is merely a reordering of the other.
pub fn is_partition_compatible(a: &[ExprRef], b: &[ExprRef]) -> bool {
    // Differing lengths can never be a reordering of one another.
    if a.len() != b.len() {
        return false;
    }
    // Compare the name multisets by sorting both sides in place.
    let mut left: Vec<&str> = a.iter().map(|e| e.name()).collect();
    let mut right: Vec<&str> = b.iter().map(|e| e.name()).collect();
    left.sort_unstable();
    right.sort_unstable();
    left == right
}

#[cfg(test)]
mod tests {

Expand Down
4 changes: 3 additions & 1 deletion src/daft-dsl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ mod treenode;
pub use common_treenode;
pub use expr::binary_op;
pub use expr::col;
pub use expr::{resolve_aggexpr, resolve_aggexprs, resolve_expr, resolve_exprs};
pub use expr::{
is_partition_compatible, resolve_aggexpr, resolve_aggexprs, resolve_expr, resolve_exprs,
};
pub use expr::{AggExpr, ApproxPercentileParams, Expr, ExprRef, Operator};
pub use lit::{lit, null_lit, Literal, LiteralValue};
#[cfg(feature = "python")]
Expand Down
1 change: 1 addition & 0 deletions src/daft-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod logical_optimization;
mod logical_plan;
mod partitioning;
pub mod physical_ops;
mod physical_optimization;
mod physical_plan;
mod physical_planner;
mod resource_request;
Expand Down
17 changes: 16 additions & 1 deletion src/daft-plan/src/physical_ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,28 @@ pub struct Project {
}

impl Project {
/// Builds a `Project` node, deriving the output clustering spec by
/// translating the input's clustering spec through the projection.
pub(crate) fn try_new(
    input: PhysicalPlanRef,
    projection: Vec<ExprRef>,
    resource_request: ResourceRequest,
) -> DaftResult<Self> {
    Ok(Self {
        // Compute the derived spec before `input`/`projection` are moved
        // into the struct below.
        clustering_spec: translate_clustering_spec(input.clustering_spec(), &projection),
        input,
        projection,
        resource_request,
    })
}

// does not re-create clustering spec, unlike try_new
pub(crate) fn new_with_clustering_spec(
input: PhysicalPlanRef,
projection: Vec<ExprRef>,
resource_request: ResourceRequest,
clustering_spec: Arc<ClusteringSpec>,
) -> DaftResult<Self> {
let clustering_spec = translate_clustering_spec(clustering_spec, &projection);
Ok(Self {
input,
projection,
Expand Down
3 changes: 3 additions & 0 deletions src/daft-plan/src/physical_optimization/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod optimizer;
mod plan_context;
mod rules;
Loading
Loading