Skip to content

fix(query): unify optimize plan call in one place #15630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ impl CopyIntoTableInterpreter {
.await?;
let mut update_stream_meta_reqs = vec![];
let (source, project_columns) = if let Some(ref query) = plan.query {
let (query_interpreter, update_stream_meta) = self.build_query(query).await?;
let query = if plan.enable_distributed {
query.remove_exchange_for_select()
} else {
*query.clone()
};

let (query_interpreter, update_stream_meta) = self.build_query(&query).await?;
update_stream_meta_reqs = update_stream_meta;
let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);

Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl Interpreter for InsertInterpreter {
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let catalog_info = catalog.info();

// here we remove the last exchange merge plan to trigger distribute insert
let insert_select_plan = match select_plan {
PhysicalPlan::Exchange(ref mut exchange) => {
// insert can be dispatched to different nodes
Expand Down
6 changes: 1 addition & 5 deletions src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ use derive_visitor::Drive;
use derive_visitor::DriveMut;

use crate::binder::Binder;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerContext;
use crate::plans::CreateIndexPlan;
use crate::plans::CreateTableIndexPlan;
use crate::plans::DropIndexPlan;
Expand Down Expand Up @@ -350,9 +348,7 @@ impl Binder {

bind_context.planning_agg_index = true;
let plan = if let Statement::Query(_) = &stmt {
let select_plan = self.bind_statement(bind_context, &stmt).await?;
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
Ok(optimize(opt_ctx, select_plan).await?)
self.bind_statement(bind_context, &stmt).await
} else {
Err(ErrorCode::UnsupportedIndex("statement is not query"))
};
Expand Down
7 changes: 1 addition & 6 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ use crate::binder::scalar::ScalarBinder;
use crate::binder::Binder;
use crate::binder::ColumnBindingBuilder;
use crate::binder::Visibility;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerContext;
use crate::parse_computed_expr_to_string;
use crate::parse_default_expr_to_string;
use crate::planner::semantic::normalize_identifier;
Expand Down Expand Up @@ -661,10 +659,7 @@ impl Binder {
let mut bind_context = BindContext::new();
let stmt = Statement::Query(Box::new(*query.clone()));
let select_plan = self.bind_statement(&mut bind_context, &stmt).await?;
// Don't enable distributed optimization for `CREATE TABLE ... AS SELECT ...` for now
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
let optimized_plan = optimize(opt_ctx, select_plan).await?;
Some(Box::new(optimized_plan))
Some(Box::new(select_plan))
} else {
None
},
Expand Down
8 changes: 1 addition & 7 deletions src/query/sql/src/planner/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use databend_common_meta_app::principal::OnErrorMode;

use crate::binder::Binder;
use crate::normalize_identifier;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerContext;
use crate::plans::insert::InsertValue;
use crate::plans::CopyIntoTableMode;
use crate::plans::Insert;
Expand Down Expand Up @@ -179,11 +177,7 @@ impl Binder {
InsertSource::Select { query } => {
let statement = Statement::Query(query);
let select_plan = self.bind_statement(bind_context, &statement).await?;
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
.with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty());

let optimized_plan = optimize(opt_ctx, select_plan).await?;
Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
Ok(InsertInputSource::SelectPlan(Box::new(select_plan)))
}
};

Expand Down
8 changes: 1 addition & 7 deletions src/query/sql/src/planner/binder/insert_multi_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use databend_common_expression::DataSchemaRef;
use databend_common_expression::TableSchema;

use crate::binder::ScalarBinder;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerContext;
use crate::plans::Else;
use crate::plans::InsertMultiTable;
use crate::plans::Into;
Expand Down Expand Up @@ -62,9 +60,6 @@ impl Binder {
};

let (s_expr, bind_context) = self.bind_single_table(bind_context, &table_ref).await?;
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
.with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty());

let select_plan = Plan::Query {
s_expr: Box::new(s_expr),
metadata: self.metadata.clone(),
Expand All @@ -74,8 +69,7 @@ impl Binder {
ignore_result: false,
};

let optimized_plan = optimize(opt_ctx, select_plan).await?;
(optimized_plan, bind_context)
(select_plan, bind_context)
};

let source_schema = input_source.schema();
Expand Down
7 changes: 1 addition & 6 deletions src/query/sql/src/planner/binder/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use databend_common_meta_app::principal::OnErrorMode;

use crate::binder::Binder;
use crate::normalize_identifier;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerContext;
use crate::plans::insert::InsertValue;
use crate::plans::CopyIntoTableMode;
use crate::plans::InsertInputSource;
Expand Down Expand Up @@ -161,10 +159,7 @@ impl Binder {
InsertSource::Select { query } => {
let statement = Statement::Query(query);
let select_plan = self.bind_statement(bind_context, &statement).await?;
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
.with_enable_distributed_optimization(false);
let optimized_plan = optimize(opt_ctx, select_plan).await?;
Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
Ok(InsertInputSource::SelectPlan(Box::new(select_plan)))
}
};

Expand Down
57 changes: 57 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::sync::Arc;

use async_recursion::async_recursion;
use databend_common_ast::ast::ExplainKind;
use databend_common_catalog::merge_into_join::MergeIntoJoin;
use databend_common_catalog::merge_into_join::MergeIntoJoinType;
Expand Down Expand Up @@ -50,6 +51,7 @@ use crate::plans::Join;
use crate::plans::MergeInto;
use crate::plans::Plan;
use crate::plans::RelOperator;
use crate::InsertInputSource;
use crate::MetadataRef;

#[derive(Clone, Educe)]
Expand Down Expand Up @@ -156,6 +158,7 @@ impl<'a> RecursiveOptimizer<'a> {
}

#[minitrace::trace]
#[async_recursion]
pub async fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
match plan {
Plan::Query {
Expand Down Expand Up @@ -224,10 +227,64 @@ pub async fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
"after optimization enable_distributed_copy? : {}",
plan.enable_distributed
);

if let Some(p) = &plan.query {
let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
plan.query = Some(Box::new(optimized_plan));
}
Ok(Plan::CopyIntoTable(plan))
}
Plan::MergeInto(plan) => optimize_merge_into(opt_ctx.clone(), plan).await,

// distributed insert will be optimized in `physical_plan_builder`
Plan::Insert(mut plan) => {
match plan.source {
InsertInputSource::SelectPlan(p) => {
let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
plan.source = InsertInputSource::SelectPlan(Box::new(optimized_plan));
}
InsertInputSource::Stage(p) => {
let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
plan.source = InsertInputSource::Stage(Box::new(optimized_plan));
}
_ => {}
}
Ok(Plan::Insert(plan))
}
Plan::InsertMultiTable(mut plan) => {
plan.input_source = optimize(opt_ctx.clone(), plan.input_source.clone()).await?;
Ok(Plan::InsertMultiTable(plan))
}
Plan::Replace(mut plan) => {
match plan.source {
InsertInputSource::SelectPlan(p) => {
let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
plan.source = InsertInputSource::SelectPlan(Box::new(optimized_plan));
}
InsertInputSource::Stage(p) => {
let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
plan.source = InsertInputSource::Stage(Box::new(optimized_plan));
}
_ => {}
}
Ok(Plan::Replace(plan))
}

Plan::CreateTable(mut plan) => {
if let Some(p) = &plan.as_select {
let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
plan.as_select = Some(Box::new(optimized_plan));
}

Ok(Plan::CreateTable(plan))
}
Plan::RefreshIndex(mut plan) => {
// use fresh index
let opt_ctx =
OptimizerContext::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone());
plan.query_plan = Box::new(optimize(opt_ctx.clone(), *plan.query_plan.clone()).await?);
Ok(Plan::RefreshIndex(plan))
}
// Pass through statements.
_ => Ok(plan),
}
Expand Down
27 changes: 27 additions & 0 deletions src/query/sql/src/planner/plans/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;

use super::Exchange;
use super::RelOperator;
use crate::binder::ExplainConfig;
use crate::optimizer::SExpr;
use crate::plans::copy_into_location::CopyIntoLocationPlan;
Expand Down Expand Up @@ -488,4 +490,29 @@ impl Plan {
pub fn has_result_set(&self) -> bool {
!self.schema().fields().is_empty()
}

pub fn remove_exchange_for_select(&self) -> Self {
if let Plan::Query {
s_expr,
metadata,
bind_context,
rewrite_kind,
formatted_ast,
ignore_result,
} = self
{
if let RelOperator::Exchange(Exchange::Merge) = s_expr.plan.as_ref() {
let s_expr = Box::new(s_expr.child(0).unwrap().clone());
return Plan::Query {
s_expr,
metadata: metadata.clone(),
bind_context: bind_context.clone(),
rewrite_kind: rewrite_kind.clone(),
formatted_ast: formatted_ast.clone(),
ignore_result: *ignore_result,
};
}
}
self.clone()
}
}
44 changes: 44 additions & 0 deletions tests/sqllogictests/suites/mode/cluster/create_table.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
query T
explain create or replace table t2 as select number % 400 d, max(number) from numbers(10000000) group by number limit 3;
----
CreateTableAsSelect:
(empty)
EvalScalar
├── output columns: [max(number) (#6), d (#7)]
├── expressions: [numbers.number (#4) % 400]
├── estimated rows: 3.00
└── Limit
├── output columns: [max(number) (#6), numbers.number (#4)]
├── limit: 3
├── offset: 0
├── estimated rows: 3.00
└── Exchange
├── output columns: [max(number) (#6), numbers.number (#4)]
├── exchange type: Merge
└── Limit
├── output columns: [max(number) (#6), numbers.number (#4)]
├── limit: 3
├── offset: 0
├── estimated rows: 3.00
└── AggregateFinal
├── output columns: [max(number) (#6), numbers.number (#4)]
├── group by: [number]
├── aggregate functions: [max(number)]
├── limit: 3
├── estimated rows: 10000000.00
└── Exchange
├── output columns: [max(number) (#6), numbers.number (#4)]
├── exchange type: Hash(0)
└── AggregatePartial
├── group by: [number]
├── aggregate functions: [max(number)]
├── estimated rows: 10000000.00
└── TableScan
├── table: default.system.numbers
├── output columns: [number (#4)]
├── read rows: 10000000
├── read size: 76.29 MiB
├── partitions total: 153
├── partitions scanned: 153
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 10000000.00
Loading
Loading