Skip to content

Commit

Permalink
TreeNode Refactor Part 2 (#8653)
Browse files Browse the repository at this point in the history
* Refactor TreeNode's

* Update utils.rs

* Final review

* Remove unnecessary clones, more idiomatic Rust

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
berkaysynnada and ozankabak authored Dec 27, 2023
1 parent 28ca6d1 commit 6403222
Show file tree
Hide file tree
Showing 9 changed files with 872 additions and 1,014 deletions.
767 changes: 339 additions & 428 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

554 changes: 284 additions & 270 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl ExecutionPlan for OutputRequirementExec {
self.input.output_ordering()
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
Expand Down
32 changes: 13 additions & 19 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;

/// The PipelineChecker rule rejects non-runnable query plans that use
/// pipeline-breaking operators on infinite input(s).
Expand Down Expand Up @@ -70,14 +70,14 @@ impl PhysicalOptimizerRule for PipelineChecker {
pub struct PipelineStatePropagator {
pub(crate) plan: Arc<dyn ExecutionPlan>,
pub(crate) unbounded: bool,
pub(crate) children: Vec<PipelineStatePropagator>,
pub(crate) children: Vec<Self>,
}

impl PipelineStatePropagator {
/// Constructs a new, default pipelining state.
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let children = plan.children();
PipelineStatePropagator {
Self {
plan,
unbounded: false,
children: children.into_iter().map(Self::new).collect(),
Expand All @@ -86,10 +86,7 @@ impl PipelineStatePropagator {

/// Returns the children unboundedness information.
pub fn children_unbounded(&self) -> Vec<bool> {
self.children
.iter()
.map(|c| c.unbounded)
.collect::<Vec<_>>()
self.children.iter().map(|c| c.unbounded).collect()
}
}

Expand All @@ -109,26 +106,23 @@ impl TreeNode for PipelineStatePropagator {
Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
if !self.children.is_empty() {
let new_children = self
self.children = self
.children
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
let children_plans = new_children.iter().map(|c| c.plan.clone()).collect();

Ok(PipelineStatePropagator {
plan: with_new_children_if_necessary(self.plan, children_plans)?.into(),
unbounded: self.unbounded,
children: new_children,
})
} else {
Ok(self)
.collect::<Result<_>>()?;
self.plan = with_new_children_if_necessary(
self.plan,
self.children.iter().map(|c| c.plan.clone()).collect(),
)?
.into();
}
Ok(self)
}
}

Expand Down
Loading

0 comments on commit 6403222

Please sign in to comment.