Skip to content

Commit

Permalink
Propagate feature flag to optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Aug 19, 2024
1 parent d0bc513 commit 3bd7a41
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ def minhash(
ngram_size: int,
seed: int = 1,
) -> PyExpr: ...
def sql(sql: str, catalog: PyCatalog) -> LogicalPlanBuilder: ...
def sql(sql: str, catalog: PyCatalog, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
def sql_expr(sql: str) -> PyExpr: ...
def utf8_count_matches(expr: PyExpr, patterns: PyExpr, whole_words: bool, case_sensitive: bool) -> PyExpr: ...
def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ...
Expand Down
5 changes: 4 additions & 1 deletion daft/sql/sql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# isort: dont-add-import: from __future__ import annotations

from daft.api_annotations import PublicAPI
from daft.context import get_context
from daft.daft import PyCatalog as _PyCatalog
from daft.daft import sql as _sql
from daft.daft import sql_expr as _sql_expr
Expand Down Expand Up @@ -45,6 +46,8 @@ def sql(sql: str, catalog: SQLCatalog) -> DataFrame:
Returns:
DataFrame: Dataframe containing the results of the query
"""
planning_config = get_context().daft_planning_config

_py_catalog = catalog._catalog
_py_logical = _sql(sql, _py_catalog)
_py_logical = _sql(sql, _py_catalog, planning_config)
return DataFrame(LogicalPlanBuilder(_py_logical))
5 changes: 4 additions & 1 deletion src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub const BOLD_TABLE_HEADERS_IN_DISPLAY: &str = "DAFT_BOLD_TABLE_HEADERS";
/// 1. Creation of a Dataframe including any file listing and schema inference that needs to happen. Note
/// that this does not include the actual scan, which is taken care of by the DaftExecutionConfig.
/// 2. Building of logical plan nodes
#[derive(Clone, Serialize, Deserialize, Default)]
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
pub struct DaftPlanningConfig {
pub default_io_config: IOConfig,
pub enable_actor_pool_projections: bool,
Expand Down Expand Up @@ -109,6 +109,9 @@ mod python;
#[cfg(feature = "python")]
pub use python::PyDaftExecutionConfig;

#[cfg(feature = "python")]
pub use python::PyDaftPlanningConfig;

#[cfg(feature = "python")]
use pyo3::prelude::*;

Expand Down
102 changes: 60 additions & 42 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use crate::{
logical_ops,
logical_optimization::Optimizer,
logical_optimization::{Optimizer, OptimizerConfig},
logical_plan::LogicalPlan,
partitioning::{
HashRepartitionConfig, IntoPartitionsConfig, RandomShuffleConfig, RepartitionSpec,
Expand All @@ -14,6 +14,7 @@ use crate::{
source_info::SourceInfo,
LogicalPlanRef,
};
use common_daft_config::DaftPlanningConfig;
use common_display::DisplayFormat;
use common_error::DaftResult;
use common_io_config::IOConfig;
Expand Down Expand Up @@ -43,36 +44,47 @@ use {
pub struct LogicalPlanBuilder {
// The current root of the logical plan in this builder.
pub plan: Arc<LogicalPlan>,
config: Option<Arc<DaftPlanningConfig>>,
}

impl LogicalPlanBuilder {
pub fn new(plan: Arc<LogicalPlan>) -> Self {
Self { plan }
pub fn new(plan: Arc<LogicalPlan>, config: Option<Arc<DaftPlanningConfig>>) -> Self {
Self { plan, config }
}
}

impl From<LogicalPlan> for LogicalPlanBuilder {
fn from(plan: LogicalPlan) -> Self {
impl From<&LogicalPlanBuilder> for LogicalPlanBuilder {
fn from(builder: &LogicalPlanBuilder) -> Self {
Self {
plan: Arc::new(plan),
plan: builder.plan.clone(),
config: builder.config.clone(),
}
}
}

impl From<LogicalPlanRef> for LogicalPlanBuilder {
fn from(plan: LogicalPlanRef) -> Self {
Self { plan: plan.clone() }
impl From<LogicalPlanBuilder> for LogicalPlanRef {
fn from(value: LogicalPlanBuilder) -> Self {
value.plan
}
}
impl From<&LogicalPlanBuilder> for LogicalPlanBuilder {
fn from(builder: &LogicalPlanBuilder) -> Self {
Self {
plan: builder.plan.clone(),
}

impl From<&LogicalPlanBuilder> for LogicalPlanRef {
fn from(value: &LogicalPlanBuilder) -> Self {
value.plan.clone()
}
}

impl LogicalPlanBuilder {
/// Replace the LogicalPlanBuilder's plan with the provided plan
pub fn with_new_plan<LP: Into<Arc<LogicalPlan>>>(&self, plan: LP) -> Self {
Self::new(plan.into(), self.config.clone())
}

/// Parametrize the LogicalPlanBuilder with a DaftPlanningConfig
pub fn with_config(&self, config: Arc<DaftPlanningConfig>) -> Self {
Self::new(self.plan.clone(), Some(config))
}

#[cfg(feature = "python")]
pub fn in_memory_scan(
partition_key: &str,
Expand All @@ -93,7 +105,7 @@ impl LogicalPlanBuilder {
));
let logical_plan: LogicalPlan =
logical_ops::Source::new(schema.clone(), source_info.into()).into();
Ok(logical_plan.into())
Ok(LogicalPlanBuilder::new(logical_plan.into(), None))
}

pub fn table_scan(
Expand Down Expand Up @@ -127,13 +139,13 @@ impl LogicalPlanBuilder {
};
let logical_plan: LogicalPlan =
logical_ops::Source::new(output_schema, source_info.into()).into();
Ok(logical_plan.into())
Ok(LogicalPlanBuilder::new(logical_plan.into(), None))
}

pub fn select(&self, to_select: Vec<ExprRef>) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Project::try_new(self.plan.clone(), to_select)?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn with_columns(&self, columns: Vec<ExprRef>) -> DaftResult<Self> {
Expand Down Expand Up @@ -166,7 +178,7 @@ impl LogicalPlanBuilder {

let logical_plan: LogicalPlan =
logical_ops::Project::try_new(self.plan.clone(), exprs)?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn exclude(&self, to_exclude: Vec<String>) -> DaftResult<Self> {
Expand All @@ -187,25 +199,25 @@ impl LogicalPlanBuilder {

let logical_plan: LogicalPlan =
logical_ops::Project::try_new(self.plan.clone(), exprs)?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn filter(&self, predicate: ExprRef) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Filter::try_new(self.plan.clone(), predicate)?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn limit(&self, limit: i64, eager: bool) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Limit::new(self.plan.clone(), limit, eager).into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn explode(&self, to_explode: Vec<ExprRef>) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Explode::try_new(self.plan.clone(), to_explode)?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn unpivot(
Expand Down Expand Up @@ -243,13 +255,13 @@ impl LogicalPlanBuilder {
value_name,
)?
.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn sort(&self, sort_by: Vec<ExprRef>, descending: Vec<bool>) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Sort::try_new(self.plan.clone(), sort_by, descending)?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn hash_repartition(
Expand All @@ -262,7 +274,7 @@ impl LogicalPlanBuilder {
RepartitionSpec::Hash(HashRepartitionConfig::new(num_partitions, partition_by)),
)?
.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn random_shuffle(&self, num_partitions: Option<usize>) -> DaftResult<Self> {
Expand All @@ -271,7 +283,7 @@ impl LogicalPlanBuilder {
RepartitionSpec::Random(RandomShuffleConfig::new(num_partitions)),
)?
.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn into_partitions(&self, num_partitions: usize) -> DaftResult<Self> {
Expand All @@ -280,12 +292,12 @@ impl LogicalPlanBuilder {
RepartitionSpec::IntoPartitions(IntoPartitionsConfig::new(num_partitions)),
)?
.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn distinct(&self) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Distinct::new(self.plan.clone()).into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn sample(
Expand All @@ -296,7 +308,7 @@ impl LogicalPlanBuilder {
) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Sample::new(self.plan.clone(), fraction, with_replacement, seed).into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn aggregate(
Expand All @@ -306,7 +318,7 @@ impl LogicalPlanBuilder {
) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Aggregate::try_new(self.plan.clone(), agg_exprs, groupby_exprs)?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn pivot(
Expand All @@ -326,10 +338,10 @@ impl LogicalPlanBuilder {
names,
)?
.into();
Ok(pivot_logical_plan.into())
Ok(self.with_new_plan(pivot_logical_plan))
}

pub fn join<Right: Into<Self>>(
pub fn join<Right: Into<LogicalPlanRef>>(
&self,
right: Right,
left_on: Vec<ExprRef>,
Expand All @@ -339,26 +351,26 @@ impl LogicalPlanBuilder {
) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Join::try_new(
self.plan.clone(),
right.into().plan.clone(),
right.into().clone(),
left_on,
right_on,
join_type,
join_strategy,
)?
.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn concat(&self, other: &Self) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Concat::try_new(self.plan.clone(), other.plan.clone())?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn add_monotonically_increasing_id(&self, column_name: Option<&str>) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::MonotonicallyIncreasingId::new(self.plan.clone(), column_name).into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn table_write(
Expand All @@ -379,7 +391,7 @@ impl LogicalPlanBuilder {

let logical_plan: LogicalPlan =
logical_ops::Sink::try_new(self.plan.clone(), sink_info.into())?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

#[cfg(feature = "python")]
Expand Down Expand Up @@ -408,7 +420,7 @@ impl LogicalPlanBuilder {

let logical_plan: LogicalPlan =
logical_ops::Sink::try_new(self.plan.clone(), sink_info.into())?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

#[cfg(feature = "python")]
Expand Down Expand Up @@ -436,7 +448,7 @@ impl LogicalPlanBuilder {

let logical_plan: LogicalPlan =
logical_ops::Sink::try_new(self.plan.clone(), sink_info.into())?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

#[cfg(feature = "python")]
Expand All @@ -463,7 +475,7 @@ impl LogicalPlanBuilder {

let logical_plan: LogicalPlan =
logical_ops::Sink::try_new(self.plan.clone(), sink_info.into())?.into();
Ok(logical_plan.into())
Ok(self.with_new_plan(logical_plan))
}

pub fn build(&self) -> Arc<LogicalPlan> {
Expand Down Expand Up @@ -780,7 +792,12 @@ impl PyLogicalPlanBuilder {
/// Optimize the underlying logical plan, returning a new plan builder containing the optimized plan.
pub fn optimize(&self, py: Python) -> PyResult<Self> {
py.allow_threads(|| {
let optimizer = Optimizer::new(Default::default());
// Create optimizer
let default_optimizer_config: OptimizerConfig = Default::default();
let optimizer_config = OptimizerConfig { enable_actor_pool_projections: self.builder.config.as_ref().map(|planning_cfg| planning_cfg.enable_actor_pool_projections).unwrap_or(default_optimizer_config.enable_actor_pool_projections), ..default_optimizer_config };
let optimizer = Optimizer::new(optimizer_config);

// Run LogicalPlan optimizations
let unoptimized_plan = self.builder.build();
let optimized_plan = optimizer.optimize(
unoptimized_plan,
Expand All @@ -803,7 +820,8 @@ impl PyLogicalPlanBuilder {
}
},
)?;
let builder = LogicalPlanBuilder::new(optimized_plan);

let builder = LogicalPlanBuilder::new(optimized_plan, self.builder.config.clone());
Ok(builder.into())
})
}
Expand Down
Loading

0 comments on commit 3bd7a41

Please sign in to comment.