Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/optimizer/ir/stats/column_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub struct ColumnStat {
/// Count of null values
pub null_count: u64,

pub num_rows: u64,
pub origin_ndv: f64,

/// Histogram of column
pub histogram: Option<Histogram>,
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
]))
// 10. Apply DPhyp algorithm for cost-based join reordering
.add(DPhpyOptimizer::new(opt_ctx.clone()))
.add(RecursiveRuleOptimizer::new(
opt_ctx.clone(),
[RuleID::PushDownAntiJoin].as_slice(),
))
// 11. After join reorder, Convert some single join to inner join.
.add(SingleToInnerOptimizer::new())
// 12. Deduplicate join conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,18 @@ impl<'a> InferFilterOptimizer<'a> {
arguments: vec![
self.exprs[equal_indexes[i]].clone(),
self.exprs[equal_indexes[j]].clone(),
// ScalarExpr::FunctionCall(FunctionCall {
// span: None,
// func_name: "infer_predicate".to_string(),
// params: vec![],
// arguments: vec![self.exprs[equal_indexes[i]].clone()],
// }),
// ScalarExpr::FunctionCall(FunctionCall {
// span: None,
// func_name: "infer_predicate".to_string(),
// params: vec![],
// arguments: vec![self.exprs[equal_indexes[j]].clone()],
// }),
],
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::optimizer::optimizers::rule::RulePushDownRankLimitAggregate;
use crate::optimizer::optimizers::rule::RulePushDownSortEvalScalar;
use crate::optimizer::optimizers::rule::RulePushDownSortFilterScan;
use crate::optimizer::optimizers::rule::RulePushDownSortScan;
use crate::optimizer::optimizers::rule::RulePushdownAntiJoin;
use crate::optimizer::optimizers::rule::RuleSemiToInnerJoin;
use crate::optimizer::optimizers::rule::RuleSplitAggregate;
use crate::optimizer::optimizers::rule::RuleTryApplyAggIndex;
Expand Down Expand Up @@ -130,6 +131,7 @@ impl RuleFactory {
RuleID::MergeFilterIntoMutation => {
Ok(Box::new(RuleMergeFilterIntoMutation::new(metadata)))
}
RuleID::PushDownAntiJoin => Ok(Box::new(RulePushdownAntiJoin::new())),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use crate::optimizer::ir::Matcher;
use crate::optimizer::ir::RelExpr;
use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::operator::EquivalentConstantsVisitor;
use crate::optimizer::optimizers::operator::InferFilterOptimizer;
use crate::optimizer::optimizers::operator::JoinProperty;
use crate::optimizer::optimizers::rule::can_filter_null;
use crate::optimizer::optimizers::rule::constant::false_constant;
use crate::optimizer::optimizers::rule::constant::is_falsy;
Expand Down Expand Up @@ -242,9 +240,9 @@ pub fn try_push_down_filter_join(s_expr: &SExpr, metadata: MetadataRef) -> Resul
right_push_down = vec![];
}
}
let join_prop = JoinProperty::new(&left_prop.output_columns, &right_prop.output_columns);
let mut infer_filter = InferFilterOptimizer::new(Some(join_prop));
push_down_predicates = infer_filter.optimize(push_down_predicates)?;
// let join_prop = JoinProperty::new(&left_prop.output_columns, &right_prop.output_columns);
// let mut infer_filter = InferFilterOptimizer::new(Some(join_prop));
// push_down_predicates = infer_filter.optimize(push_down_predicates)?;
}

let mut all_push_down = vec![];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ mod push_down_filter_join;
mod rule_commute_join;
mod rule_commute_join_base_table;
mod rule_left_exchange_join;
mod rule_push_down_anti_join;
mod rule_semi_to_inner_join;
mod util;

pub use push_down_filter_join::*;
pub use rule_commute_join::RuleCommuteJoin;
pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
pub use rule_left_exchange_join::RuleLeftExchangeJoin;
pub use rule_push_down_anti_join::RulePushdownAntiJoin;
pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
pub use util::get_join_predicates;
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;

use crate::binder::JoinPredicate;
use crate::optimizer::ir::Matcher;
use crate::optimizer::ir::RelExpr;
use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::rule::Rule;
use crate::optimizer::optimizers::rule::RuleID;
use crate::optimizer::optimizers::rule::TransformResult;
use crate::plans::Join;
use crate::plans::JoinType;
use crate::plans::RelOp;
use crate::plans::RelOperator;
use crate::ColumnSet;

/// Push `Left/Right Semi|Anti` join closer to the base table that participates
/// in the predicate so that fewer rows stay in the join tree.
pub struct RulePushdownAntiJoin {
id: RuleID,
matchers: Vec<Matcher>,
}

impl RulePushdownAntiJoin {
pub fn new() -> Self {
Self {
id: RuleID::PushDownAntiJoin,
matchers: vec![Matcher::MatchOp {
op_type: RelOp::Join,
children: vec![Matcher::Leaf, Matcher::Leaf],
}],
}
}

fn try_push_down(&self, left: &SExpr, right: &SExpr, join: Join) -> Result<Option<SExpr>> {
let right_rel_expr = RelExpr::with_s_expr(right);

if let Some(inner_join) = extract_inner_join(left)? {
let inner_join_rel_expr = RelExpr::with_s_expr(&inner_join);
let inner_join_left_prop = inner_join_rel_expr.derive_relational_prop_child(0)?;
let inner_join_right_prop = inner_join_rel_expr.derive_relational_prop_child(1)?;

let equi_conditions = join
.equi_conditions
.iter()
.map(|condition| {
JoinPredicate::new(
&condition.left,
&inner_join_left_prop,
&inner_join_right_prop,
)
})
.collect::<Vec<_>>();

if equi_conditions.iter().all(left_predicate) {
// let mut new_equi_conditions = Vec::with_capacity(equi_conditions.len());

// for (idx, (inferred, predicate)) in equi_conditions.into_iter().enumerate() {
// if !inferred || matches!(predicate, JoinPredicate::Left(_)) {
// new_equi_conditions.push(join.equi_conditions[idx].clone());
// }
// }

// join.equi_conditions = new_equi_conditions;
let right_prop = right_rel_expr.derive_relational_prop()?;
let mut union_output_columns = ColumnSet::new();
union_output_columns.extend(right_prop.output_columns.clone());
union_output_columns.extend(inner_join_left_prop.output_columns.clone());

if join
.non_equi_conditions
.iter()
.all(|x| x.used_columns().is_subset(&union_output_columns))
{
let new_inner_join = inner_join.replace_children([
Arc::new(SExpr::create_binary(
RelOperator::Join(join.clone()),
inner_join.child(0)?.clone(),
right.clone(),
)),
Arc::new(inner_join.child(1)?.clone()),
]);

return replace_inner_join(left, new_inner_join);
}
} else if equi_conditions.iter().all(right_predicate) {
// let mut new_equi_conditions = Vec::with_capacity(equi_conditions.len());

// for (idx, (inferred, predicate)) in equi_conditions.into_iter().enumerate() {
// if !inferred || matches!(predicate, JoinPredicate::Left(_)) {
// new_equi_conditions.push(join.equi_conditions[idx].clone());
// }
// }

// join.equi_conditions = new_equi_conditions;
let right_prop = right_rel_expr.derive_relational_prop()?;
let mut union_output_columns = ColumnSet::new();
union_output_columns.extend(right_prop.output_columns.clone());
union_output_columns.extend(inner_join_right_prop.output_columns.clone());

if join
.non_equi_conditions
.iter()
.all(|x| x.used_columns().is_subset(&union_output_columns))
{
let new_inner_join = inner_join.replace_children([
Arc::new(inner_join.child(0)?.clone()),
Arc::new(SExpr::create_binary(
RelOperator::Join(join.clone()),
inner_join.child(1)?.clone(),
right.clone(),
)),
]);

return replace_inner_join(left, new_inner_join);
}
}
}

Ok(None)
}
}

impl Rule for RulePushdownAntiJoin {
fn id(&self) -> RuleID {
self.id
}

fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
let join: Join = s_expr.plan().clone().try_into()?;

if matches!(join.join_type, JoinType::LeftAnti | JoinType::LeftSemi) {
if let Some(mut result) =
self.try_push_down(s_expr.child(0)?, s_expr.child(1)?, join)?
{
result.set_applied_rule(&self.id);
state.add_result(result);
}
}

Ok(())
}

fn matchers(&self) -> &[Matcher] {
&self.matchers
}
}

impl Default for RulePushdownAntiJoin {
fn default() -> Self {
Self::new()
}
}

fn replace_inner_join(expr: &SExpr, new_join: SExpr) -> Result<Option<SExpr>> {
match expr.plan() {
RelOperator::Join(join) if join.join_type == JoinType::Inner => Ok(Some(new_join)),
RelOperator::Filter(_) => match replace_inner_join(expr.child(0)?, new_join)? {
None => Ok(None),
Some(new_child) => Ok(Some(expr.replace_children([Arc::new(new_child)]))),
},
_ => Ok(None),
}
}

fn extract_inner_join(expr: &SExpr) -> Result<Option<SExpr>> {
match expr.plan() {
RelOperator::Join(join) if join.join_type == JoinType::Inner => Ok(Some(expr.clone())),
RelOperator::Filter(_) => extract_inner_join(expr.child(0)?),
_ => Ok(None),
}
}

// struct ColumnMappingRewriter<'a> {
// mapping: &'a HashMap<usize, ColumnBinding>,
// }
//
// impl VisitorMut<'_> for ColumnMappingRewriter {
// fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> {
// if let Some(&new_index) = self.mapping.get(&col.column.index) {
// col.column = new_index.clone();
// }
// Ok(())
// }
// }
//
// fn replace_by_equivalence(
// expr: &ScalarExpr,
// mapping: &HashMap<IndexType, ColumnBinding>,
// ) -> Result<ScalarExpr> {
// if mapping.is_empty() {
// return Ok(expr.clone());
// }
//
// let mut new_expr = expr.clone();
// let mut rewriter = ColumnMappingRewriter { mapping };
// rewriter.visit(&mut new_expr)?;
// Ok(new_expr)
// }
//
// fn collect_mapping(join: &Join) -> Result<Vec<(ColumnBinding, ColumnBinding)>> {
// for equi_condition in &join.equi_conditions {
// match equi_condition.left
// }
// }
//
// fn all_left(
// conditions: &[JoinEquiCondition],
// inner_join: Join,
// left: Arc<RelationalProperty>,
// ) -> Result<bool> {
// }

fn left_predicate(tuple: &JoinPredicate) -> bool {
matches!(&tuple, JoinPredicate::Left(_))
}

fn right_predicate(tuple: &JoinPredicate) -> bool {
matches!(&tuple, JoinPredicate::Right(_))
}
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub enum RuleID {
PushDownSortFilterScan,
PushDownLimitFilterScan,
SemiToInnerJoin,
PushDownAntiJoin,
EliminateEvalScalar,
EliminateFilter,
EliminateSort,
Expand Down Expand Up @@ -194,6 +195,7 @@ impl Display for RuleID {
RuleID::EliminateUnion => write!(f, "EliminateUnion"),

RuleID::MergeFilterIntoMutation => write!(f, "MergeFilterIntoMutation"),
RuleID::PushDownAntiJoin => write!(f, "PushDownAntiJoin"),
}
}
}
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/plans/constant_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ impl Operator for ConstantTableScan {
max,
ndv: ndv as f64,
null_count,
num_rows: self.num_rows as u64,
origin_ndv: ndv as f64,
histogram,
};
column_stats.insert(*index, column_stat);
Expand Down
Loading
Loading