Skip to content

Commit

Permalink
[BUG] [Query Planner] Properly track ascending/descending sort order for range partitioning and sorting. (#1862)
Browse files Browse the repository at this point in the history

This PR ensures that we properly track ascending/descending sort order
for range partitioning and sorting ops, to ensure that downstream
operations correctly interpret the partition spec.

For example, before this PR, the planner would always assume that if
both sides of the join are sorted, then they're sorted in ascending
order and could therefore be efficiently joined with our sort-merge join
(which currently only does ascending-order sorts/merges); this would
lead to incorrect results if either side was sorted in descending order.

Closes #1829 

## Follow-ups (future PRs)

- Refactor sort-merge join to work for both ascending and descending
order; this can be done when porting the `Partitioning` abstraction to
Rust to enable better dtype support.
  • Loading branch information
clarkzinzow authored Feb 14, 2024
1 parent 74591c1 commit e47eeda
Show file tree
Hide file tree
Showing 14 changed files with 595 additions and 187 deletions.
32 changes: 29 additions & 3 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,40 @@ class PartitionSpec:
scheme: PartitionScheme
num_partitions: int
by: list[PyExpr]
scheme_config: RangeConfig | HashConfig | RandomConfig | UnknownConfig

def __init__(
self, scheme: PartitionScheme = PartitionScheme.Unknown, num_partitions: int = 0, by: list[PyExpr] | None = None
): ...
@staticmethod
def range(by: list[PyExpr], num_partitions: int, descending: list[bool]) -> PartitionSpec: ...
@staticmethod
def hash(by: list[PyExpr], num_partitions: int) -> PartitionSpec: ...
@staticmethod
def random(num_partitions: int) -> PartitionSpec: ...
@staticmethod
def unknown(num_partitions: int) -> PartitionSpec: ...
def __eq__(self, other: PartitionSpec) -> bool: ... # type: ignore[override]
def __ne__(self, other: PartitionSpec) -> bool: ... # type: ignore[override]
def __str__(self) -> str: ...

class RangeConfig:
"""Configuration of a range partitioning."""

descending: list[bool]

class HashConfig:
"""Configuration of a hash partitioning."""

...

class RandomConfig:
"""Configuration of a random partitioning."""

...

class UnknownConfig:
"""Configuration of an unknown partitioning."""

...

class ResourceRequest:
"""
Resource request for a query fragment task.
Expand Down
38 changes: 35 additions & 3 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
logical_ops,
logical_plan::LogicalPlan,
optimization::Optimizer,
partitioning::PartitionSchemeConfig,
planner::plan,
sink_info::{OutputFileInfo, SinkInfo},
source_info::{
Expand Down Expand Up @@ -162,13 +163,13 @@ impl LogicalPlanBuilder {
&self,
num_partitions: Option<usize>,
partition_by: Vec<Expr>,
scheme: PartitionScheme,
scheme_config: PartitionSchemeConfig,
) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Repartition::try_new(
self.plan.clone(),
num_partitions,
partition_by,
scheme,
scheme_config,
)?
.into();
Ok(logical_plan.into())
Expand Down Expand Up @@ -380,17 +381,48 @@ impl PyLogicalPlanBuilder {

pub fn repartition(
&self,
py: Python<'_>,
partition_by: Vec<PyExpr>,
scheme: PartitionScheme,
num_partitions: Option<usize>,
scheme_config: Option<PyObject>,
) -> PyResult<Self> {
let partition_by_exprs: Vec<Expr> = partition_by
.iter()
.map(|expr| expr.clone().into())
.collect();
let partition_scheme_config = match scheme {
PartitionScheme::Range => {
if let Some(scheme_config) = scheme_config {
PartitionSchemeConfig::Range(scheme_config.extract(py)?)
} else {
return Err(DaftError::ValueError(
"Must provide a scheme config with ascending/descending list if repartitioning by range.".to_string(),
).into());
}
}
PartitionScheme::Hash => PartitionSchemeConfig::Hash(
scheme_config
.map(|c| c.extract(py))
.transpose()?
.unwrap_or_default(),
),
PartitionScheme::Random => PartitionSchemeConfig::Random(
scheme_config
.map(|c| c.extract(py))
.transpose()?
.unwrap_or_default(),
),
PartitionScheme::Unknown => PartitionSchemeConfig::Unknown(
scheme_config
.map(|c| c.extract(py))
.transpose()?
.unwrap_or_default(),
),
};
Ok(self
.builder
.repartition(num_partitions, partition_by_exprs, scheme)?
.repartition(num_partitions, partition_by_exprs, partition_scheme_config)?
.into())
}

Expand Down
6 changes: 4 additions & 2 deletions src/daft-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ use daft_scan::{
};
pub use join::{JoinStrategy, JoinType};
pub use logical_plan::LogicalPlan;
pub use partitioning::{PartitionScheme, PartitionSpec};
pub use partitioning::{PartitionScheme, PartitionSchemeConfig, PartitionSpec};
pub use physical_plan::PhysicalPlanScheduler;
pub use resource_request::ResourceRequest;
pub use source_info::{FileInfo, FileInfos};

#[cfg(feature = "python")]
use daft_scan::storage_config::PythonStorageConfig;
#[cfg(feature = "python")]
use partitioning::PyPartitionSpec;
#[cfg(feature = "python")]
use pyo3::prelude::*;

#[cfg(feature = "python")]
Expand All @@ -46,7 +48,7 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_class::<ParquetSourceConfig>()?;
parent.add_class::<JsonSourceConfig>()?;
parent.add_class::<CsvSourceConfig>()?;
parent.add_class::<PartitionSpec>()?;
parent.add_class::<PyPartitionSpec>()?;
parent.add_class::<PartitionScheme>()?;
parent.add_class::<JoinType>()?;
parent.add_class::<JoinStrategy>()?;
Expand Down
21 changes: 15 additions & 6 deletions src/daft-plan/src/logical_ops/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@ use common_error::{DaftError, DaftResult};
use daft_dsl::Expr;
use itertools::Itertools;

use crate::{LogicalPlan, PartitionScheme};
use crate::{partitioning::PartitionSchemeConfig, LogicalPlan};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Repartition {
// Upstream node.
pub input: Arc<LogicalPlan>,
pub num_partitions: Option<usize>,
pub partition_by: Vec<Expr>,
pub scheme: PartitionScheme,
pub scheme_config: PartitionSchemeConfig,
}

impl Repartition {
pub(crate) fn try_new(
input: Arc<LogicalPlan>,
num_partitions: Option<usize>,
partition_by: Vec<Expr>,
scheme: PartitionScheme,
scheme_config: PartitionSchemeConfig,
) -> DaftResult<Self> {
if matches!(scheme, PartitionScheme::Range) {
if matches!(scheme_config, PartitionSchemeConfig::Range(_)) {
return Err(DaftError::ValueError(
"Repartitioning with the Range partition scheme is not supported.".to_string(),
));
Expand All @@ -31,13 +31,22 @@ impl Repartition {
input,
num_partitions,
partition_by,
scheme,
scheme_config,
})
}

pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
res.push(format!("Repartition: Scheme = {:?}", self.scheme));
let scheme_config = self.scheme_config.multiline_display();
res.push(format!(
"Repartition: Scheme = {}{}",
self.scheme_config.var_name(),
if scheme_config.is_empty() {
"".to_string()
} else {
format!("({})", scheme_config.join(", "))
}
));
res.push(format!(
"Number of partitions = {}",
self.num_partitions
Expand Down
2 changes: 1 addition & 1 deletion src/daft-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl LogicalPlan {
Self::Limit(Limit { limit, eager, .. }) => Self::Limit(Limit::new(input.clone(), *limit, *eager)),
Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()),
Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()),
Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()),
Self::Repartition(Repartition { num_partitions, partition_by, scheme_config: scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()),
Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())),
Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()),
Self::Sink(Sink { sink_info, .. }) => Self::Sink(Sink::try_new(input.clone(), sink_info.clone()).unwrap()),
Expand Down
14 changes: 11 additions & 3 deletions src/daft-plan/src/optimization/rules/drop_repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod tests {
Optimizer,
},
test::dummy_scan_node,
LogicalPlan, PartitionScheme,
LogicalPlan, PartitionSchemeConfig,
};

/// Helper that creates an optimizer with the DropRepartition rule registered, optimizes
Expand Down Expand Up @@ -94,8 +94,16 @@ mod tests {
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
])
.repartition(Some(10), vec![col("a")], PartitionScheme::Hash)?
.repartition(Some(5), vec![col("a")], PartitionScheme::Hash)?
.repartition(
Some(10),
vec![col("a")],
PartitionSchemeConfig::Hash(Default::default()),
)?
.repartition(
Some(5),
vec![col("a")],
PartitionSchemeConfig::Hash(Default::default()),
)?
.build();
let expected = "\
Repartition: Scheme = Hash, Number of partitions = 5, Partition by = col(a)\
Expand Down
8 changes: 6 additions & 2 deletions src/daft-plan/src/optimization/rules/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ mod tests {
Optimizer,
},
test::{dummy_scan_node, dummy_scan_operator_node},
JoinType, LogicalPlan, PartitionScheme,
JoinType, LogicalPlan, PartitionSchemeConfig,
};

/// Helper that creates an optimizer with the PushDownFilter rule registered, optimizes
Expand Down Expand Up @@ -492,7 +492,11 @@ mod tests {
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
])
.repartition(Some(1), vec![col("a")], PartitionScheme::Hash)?
.repartition(
Some(1),
vec![col("a")],
PartitionSchemeConfig::Hash(Default::default()),
)?
.filter(col("a").lt(&lit(2)))?
.build();
let expected = "\
Expand Down
8 changes: 6 additions & 2 deletions src/daft-plan/src/optimization/rules/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ mod tests {
dummy_scan_node, dummy_scan_node_with_pushdowns,
dummy_scan_operator_node_with_pushdowns,
},
LogicalPlan, PartitionScheme,
LogicalPlan, PartitionSchemeConfig,
};

/// Helper that creates an optimizer with the PushDownLimit rule registered, optimizes
Expand Down Expand Up @@ -315,7 +315,11 @@ mod tests {
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
])
.repartition(Some(1), vec![col("a")], PartitionScheme::Hash)?
.repartition(
Some(1),
vec![col("a")],
PartitionSchemeConfig::Hash(Default::default()),
)?
.limit(5, false)?
.build();
let expected = "\
Expand Down
Loading

0 comments on commit e47eeda

Please sign in to comment.