Skip to content

Commit

Permalink
Add implementation of eliminate_one_union
Browse files Browse the repository at this point in the history
  • Loading branch information
Evgeny Maruschenko committed Sep 29, 2023
1 parent 2bc53fc commit 7259e9f
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 20 deletions.
125 changes: 125 additions & 0 deletions datafusion/optimizer/src/eliminate_one_union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule that replaces a `Union` having exactly one input with that input.
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{LogicalPlan, Union};

use crate::optimizer::ApplyOrder;

/// An optimization rule that eliminates a union with exactly one input.
///
/// The rule is stateless, so it is modelled as a unit struct; `new()` and
/// `default()` produce the same value.
#[derive(Debug, Default)]
pub struct EliminateOneUnion;

impl EliminateOneUnion {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self
    }
}

impl OptimizerRule for EliminateOneUnion {
    /// Rewrites `plan` when it is a `Union` holding exactly one input.
    ///
    /// Returns `Ok(Some(input))` with a clone of that sole input, and
    /// `Ok(None)` for every other plan, including unions with zero or with
    /// several inputs. The union's own schema is not consulted.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        if let LogicalPlan::Union(Union { inputs, .. }) = plan {
            // A one-element slice pattern matches only when the union has a single input.
            if let [only_input] = inputs.as_slice() {
                return Ok(Some((**only_input).clone()));
            }
        }
        Ok(None)
    }

    /// The identifier under which this rule is reported.
    fn name(&self) -> &str {
        "eliminate_one_union"
    }

    /// Requests `ApplyOrder::TopDown` traversal for this rule.
    fn apply_order(&self) -> Option<ApplyOrder> {
        Some(ApplyOrder::TopDown)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::eliminate_filter::EliminateFilter;
    use crate::propagate_empty_relation::PropagateEmptyRelation;
    use crate::test::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::ScalarValue;
    use datafusion_expr::{logical_plan::table_scan, Expr};
    use std::sync::Arc;

    /// Three-column schema shared by every test in this module.
    fn schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, false),
        ])
    }

    /// Optimizes `plan` with `EliminateFilter`, `PropagateEmptyRelation` and
    /// `EliminateOneUnion` (in that order) and compares the result to `expected`.
    fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
        assert_optimized_plan_eq_with_rules(
            vec![
                Arc::new(EliminateFilter::new()),
                Arc::new(PropagateEmptyRelation::new()),
                Arc::new(EliminateOneUnion::new()),
            ],
            plan,
            expected,
        )
    }

    /// A union with two inputs must be left untouched.
    #[test]
    fn eliminate_nothing() -> Result<()> {
        let plan_builder = table_scan(Some("table"), &schema(), None)?;

        let plan = plan_builder
            .clone()
            .union(plan_builder.clone().build()?)?
            .build()?;

        let expected = "\
        Union\
        \n  TableScan: table\
        \n  TableScan: table";
        assert_optimized_plan_equal(&plan, expected)
    }

    /// A single-input union fed straight to the rule, with no other rule in
    /// the pipeline, must be replaced by its input.
    #[test]
    fn eliminate_one_union() -> Result<()> {
        let scan = table_scan(Some("table"), &schema(), None)?.build()?;
        // Reuse the scan's schema so the helper's schema-equality check holds.
        let union_schema = scan.schema().clone();
        let plan = LogicalPlan::Union(Union {
            inputs: vec![Arc::new(scan)],
            schema: union_schema,
        });

        let expected = "TableScan: table";
        assert_optimized_plan_eq_with_rules(
            vec![Arc::new(EliminateOneUnion::new())],
            &plan,
            expected,
        )
    }

    /// A two-input union whose second branch is filtered by literal `false`
    /// collapses to the remaining scan once the whole pipeline has run.
    // NOTE(review): the earlier rules may already reduce this plan on their
    // own; `eliminate_one_union` above is the test that isolates this rule.
    #[test]
    fn eliminate_one_union_after_empty_propagation() -> Result<()> {
        let plan_builder = table_scan(Some("table"), &schema(), None)?;

        let plan = plan_builder
            .clone()
            .union(
                plan_builder
                    .clone()
                    .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))?
                    .build()?,
            )?
            .build()?;

        let expected = "TableScan: table";
        assert_optimized_plan_equal(&plan, expected)
    }
}
1 change: 1 addition & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod eliminate_filter;
pub mod eliminate_join;
pub mod eliminate_limit;
pub mod eliminate_nested_union;
pub mod eliminate_one_union;
pub mod eliminate_outer_join;
pub mod eliminate_project;
pub mod extract_equijoin_predicate;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_join::EliminateJoin;
use crate::eliminate_limit::EliminateLimit;
use crate::eliminate_nested_union::EliminateNestedUnion;
use crate::eliminate_one_union::EliminateOneUnion;
use crate::eliminate_outer_join::EliminateOuterJoin;
use crate::eliminate_project::EliminateProjection;
use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
Expand Down Expand Up @@ -243,6 +244,7 @@ impl Optimizer {
Arc::new(PropagateEmptyRelation::new()),
Arc::new(FilterNullJoinKeys::default()),
Arc::new(EliminateOuterJoin::new()),
Arc::new(EliminateOneUnion::new()),
// Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
Arc::new(PushDownLimit::new()),
Arc::new(PushDownFilter::new()),
Expand Down
31 changes: 11 additions & 20 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,10 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
mod tests {
use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_nested_union::EliminateNestedUnion;
use crate::optimizer::Optimizer;
use crate::test::{
assert_optimized_plan_eq, test_table_scan, test_table_scan_fields,
test_table_scan_with_name,
assert_optimized_plan_eq, assert_optimized_plan_eq_with_rules, test_table_scan,
test_table_scan_fields, test_table_scan_with_name,
};
use crate::OptimizerContext;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Column, DFField, DFSchema, ScalarValue};
use datafusion_expr::logical_plan::table_scan;
Expand All @@ -207,22 +205,15 @@ mod tests {
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let optimizer = Optimizer::with_rules(vec![
Arc::new(EliminateFilter::new()),
Arc::new(EliminateNestedUnion::new()),
Arc::new(PropagateEmptyRelation::new()),
]);
let config = &mut OptimizerContext::new()
.with_max_passes(1)
.with_skip_failing_rules(false);
let optimized_plan = optimizer
.optimize(plan, config, observe)
.expect("failed to optimize plan");
let formatted_plan = format!("{optimized_plan:?}");
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
Ok(())
assert_optimized_plan_eq_with_rules(
vec![
Arc::new(EliminateFilter::new()),
Arc::new(EliminateNestedUnion::new()),
Arc::new(PropagateEmptyRelation::new()),
],
&plan,
expected,
)
}

#[test]
Expand Down
19 changes: 19 additions & 0 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ pub fn assert_optimized_plan_eq(
Ok(())
}

/// Runs `rules` over `plan` with an [`Optimizer`] and asserts on the outcome.
///
/// The optimizer context is configured with `with_max_passes(1)` and
/// `with_skip_failing_rules(false)`. The `{:?}` rendering of the optimized
/// plan must equal `expected`, and its schema must equal the schema of the
/// input `plan`.
///
/// # Panics
///
/// Panics if optimization returns an error or if either assertion fails.
pub fn assert_optimized_plan_eq_with_rules(
    rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
    plan: &LogicalPlan,
    expected: &str,
) -> Result<()> {
    // `optimize` takes an observer callback; these assertions have no use for one.
    fn ignore_rule_application(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    let context = OptimizerContext::new()
        .with_max_passes(1)
        .with_skip_failing_rules(false);
    let optimized = Optimizer::with_rules(rules)
        .optimize(plan, &context, ignore_rule_application)
        .expect("failed to optimize plan");

    assert_eq!(format!("{optimized:?}"), expected);
    assert_eq!(plan.schema(), optimized.schema());
    Ok(())
}

pub fn assert_optimized_plan_eq_display_indent(
rule: Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
Expand Down

0 comments on commit 7259e9f

Please sign in to comment.