Commit 1d7584b
Merge branch 'main' into asof
zenus authored May 24, 2024
2 parents: b501560 + 56f0f17
Showing 10 changed files with 153 additions and 53 deletions.
@@ -106,7 +106,13 @@ impl CopyIntoTableInterpreter {
             .await?;
         let mut update_stream_meta_reqs = vec![];
         let (source, project_columns) = if let Some(ref query) = plan.query {
-            let (query_interpreter, update_stream_meta) = self.build_query(query).await?;
+            let query = if plan.enable_distributed {
+                query.remove_exchange_for_select()
+            } else {
+                *query.clone()
+            };
+
+            let (query_interpreter, update_stream_meta) = self.build_query(&query).await?;
             update_stream_meta_reqs = update_stream_meta;
             let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -216,6 +216,7 @@ impl Interpreter for InsertInterpreter {
         let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
         let catalog_info = catalog.info();
 
+        // here we remove the last exchange merge plan to trigger distributed insert
        let insert_select_plan = match select_plan {
            PhysicalPlan::Exchange(ref mut exchange) => {
                // insert can be dispatched to different nodes
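The body of the match arm above is collapsed in the diff. A minimal sketch of the idea it implements, using purely hypothetical types and field names rather than Databend's actual PhysicalPlan API: when the optimized SELECT ends in a final exchange, the insert is pushed beneath that exchange so every node writes the rows it produced locally, instead of funneling all rows through one coordinator.

// Hypothetical sketch only; names are illustrative, not Databend's API.
enum PhysPlan {
    Exchange { input: Box<PhysPlan> },
    InsertSelect { input: Box<PhysPlan> },
    TableScan,
}

fn build_insert_plan(select_plan: PhysPlan) -> PhysPlan {
    match select_plan {
        // insert can be dispatched to different nodes: keep the exchange on
        // top for scheduling, but let each worker run the insert locally
        PhysPlan::Exchange { input } => PhysPlan::Exchange {
            input: Box::new(PhysPlan::InsertSelect { input }),
        },
        // single-node case: the insert sits directly on top of the select
        other => PhysPlan::InsertSelect {
            input: Box::new(other),
        },
    }
}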
7 changes: 1 addition & 6 deletions src/query/sql/src/planner/binder/ddl/table.rs
@@ -92,8 +92,6 @@ use crate::binder::scalar::ScalarBinder;
 use crate::binder::Binder;
 use crate::binder::ColumnBindingBuilder;
 use crate::binder::Visibility;
-use crate::optimizer::optimize;
-use crate::optimizer::OptimizerContext;
 use crate::parse_computed_expr_to_string;
 use crate::parse_default_expr_to_string;
 use crate::planner::semantic::normalize_identifier;
@@ -661,10 +659,7 @@ impl Binder {
                    let mut bind_context = BindContext::new();
                    let stmt = Statement::Query(Box::new(*query.clone()));
                    let select_plan = self.bind_statement(&mut bind_context, &stmt).await?;
-                   // Don't enable distributed optimization for `CREATE TABLE ... AS SELECT ...` for now
-                   let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
-                   let optimized_plan = optimize(opt_ctx, select_plan).await?;
-                   Some(Box::new(optimized_plan))
+                   Some(Box::new(select_plan))
                } else {
                    None
                },
8 changes: 1 addition & 7 deletions src/query/sql/src/planner/binder/insert.rs
@@ -29,8 +29,6 @@ use databend_common_meta_app::principal::OnErrorMode;
 
 use crate::binder::Binder;
 use crate::normalize_identifier;
-use crate::optimizer::optimize;
-use crate::optimizer::OptimizerContext;
 use crate::plans::insert::InsertValue;
 use crate::plans::CopyIntoTableMode;
 use crate::plans::Insert;
@@ -179,11 +177,7 @@ impl Binder {
            InsertSource::Select { query } => {
                let statement = Statement::Query(query);
                let select_plan = self.bind_statement(bind_context, &statement).await?;
-               let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
-                   .with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty());
-
-               let optimized_plan = optimize(opt_ctx, select_plan).await?;
-               Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
+               Ok(InsertInputSource::SelectPlan(Box::new(select_plan)))
            }
        };
 
8 changes: 1 addition & 7 deletions src/query/sql/src/planner/binder/insert_multi_table.rs
@@ -26,8 +26,6 @@ use databend_common_expression::DataSchemaRef;
 use databend_common_expression::TableSchema;
 
 use crate::binder::ScalarBinder;
-use crate::optimizer::optimize;
-use crate::optimizer::OptimizerContext;
 use crate::plans::Else;
 use crate::plans::InsertMultiTable;
 use crate::plans::Into;
@@ -62,9 +60,6 @@ impl Binder {
            };
 
            let (s_expr, bind_context) = self.bind_single_table(bind_context, &table_ref).await?;
-           let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
-               .with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty());
-
            let select_plan = Plan::Query {
                s_expr: Box::new(s_expr),
                metadata: self.metadata.clone(),
@@ -74,8 +69,7 @@
                ignore_result: false,
            };
 
-           let optimized_plan = optimize(opt_ctx, select_plan).await?;
-           (optimized_plan, bind_context)
+           (select_plan, bind_context)
        };
 
        let source_schema = input_source.schema();
7 changes: 1 addition & 6 deletions src/query/sql/src/planner/binder/replace.rs
@@ -25,8 +25,6 @@ use databend_common_meta_app::principal::OnErrorMode;
 
 use crate::binder::Binder;
 use crate::normalize_identifier;
-use crate::optimizer::optimize;
-use crate::optimizer::OptimizerContext;
 use crate::plans::insert::InsertValue;
 use crate::plans::CopyIntoTableMode;
 use crate::plans::InsertInputSource;
@@ -161,10 +159,7 @@ impl Binder {
            InsertSource::Select { query } => {
                let statement = Statement::Query(query);
                let select_plan = self.bind_statement(bind_context, &statement).await?;
-               let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
-                   .with_enable_distributed_optimization(false);
-               let optimized_plan = optimize(opt_ctx, select_plan).await?;
-               Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
+               Ok(InsertInputSource::SelectPlan(Box::new(select_plan)))
            }
        };
 
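Taken together, the four binder changes above follow one pattern: binders now return the plan unoptimized, and the optimizer decides centrally (see optimizer.rs below) whether and how to recurse into nested SELECTs. A toy, self-contained sketch of that recursion, with deliberately simplified types rather than Databend's:

// Toy sketch: optimize() recursing into a plan embedded in an INSERT,
// mirroring the new match arms added to optimizer.rs below.
enum Plan {
    Query(String),
    Insert { source: Box<Plan> },
}

fn optimize(plan: Plan) -> Plan {
    match plan {
        // a plain query is optimized directly
        Plan::Query(q) => Plan::Query(format!("optimized({q})")),
        // an INSERT ... SELECT carries a nested query: recurse into it
        Plan::Insert { source } => Plan::Insert {
            source: Box::new(optimize(*source)),
        },
    }
}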
58 changes: 58 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizer.rs
@@ -15,6 +15,7 @@
 use std::collections::HashSet;
 use std::sync::Arc;
 
+use async_recursion::async_recursion;
 use databend_common_ast::ast::ExplainKind;
 use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::merge_into_join::MergeIntoJoinType;
@@ -50,6 +51,7 @@ use crate::plans::Join;
 use crate::plans::MergeInto;
 use crate::plans::Plan;
 use crate::plans::RelOperator;
+use crate::InsertInputSource;
 use crate::MetadataRef;
 
 #[derive(Clone, Educe)]
@@ -156,6 +158,7 @@ impl<'a> RecursiveOptimizer<'a> {
 }
 
 #[minitrace::trace]
+#[async_recursion]
 pub async fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
     match plan {
         Plan::Query {
@@ -224,10 +227,65 @@ pub async fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
                 "after optimization enable_distributed_copy? : {}",
                 plan.enable_distributed
             );
 
+            if let Some(p) = &plan.query {
+                let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
+                plan.query = Some(Box::new(optimized_plan));
+            }
             Ok(Plan::CopyIntoTable(plan))
         }
         Plan::MergeInto(plan) => optimize_merge_into(opt_ctx.clone(), plan).await,
+
+        // distributed insert will be optimized in `physical_plan_builder`
+        Plan::Insert(mut plan) => {
+            match plan.source {
+                InsertInputSource::SelectPlan(p) => {
+                    let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
+                    plan.source = InsertInputSource::SelectPlan(Box::new(optimized_plan));
+                }
+                InsertInputSource::Stage(p) => {
+                    let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
+                    plan.source = InsertInputSource::Stage(Box::new(optimized_plan));
+                }
+                _ => {}
+            }
+            Ok(Plan::Insert(plan))
+        }
+        Plan::InsertMultiTable(mut plan) => {
+            plan.input_source = optimize(opt_ctx.clone(), plan.input_source.clone()).await?;
+            Ok(Plan::InsertMultiTable(plan))
+        }
+        Plan::Replace(mut plan) => {
+            match plan.source {
+                InsertInputSource::SelectPlan(p) => {
+                    let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
+                    plan.source = InsertInputSource::SelectPlan(Box::new(optimized_plan));
+                }
+                InsertInputSource::Stage(p) => {
+                    let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
+                    plan.source = InsertInputSource::Stage(Box::new(optimized_plan));
+                }
+                _ => {}
+            }
+            Ok(Plan::Replace(plan))
+        }
+
+        Plan::CreateTable(mut plan) => {
+            if let Some(p) = &plan.as_select {
+                let optimized_plan = optimize(opt_ctx.clone(), *p.clone()).await?;
+                plan.as_select = Some(Box::new(optimized_plan));
+            }
+
+            Ok(Plan::CreateTable(plan))
+        }
+        // Already done in binder
+        // Plan::RefreshIndex(mut plan) => {
+        //     // use fresh index
+        //     let opt_ctx =
+        //         OptimizerContext::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone());
+        //     plan.query_plan = Box::new(optimize(opt_ctx.clone(), *plan.query_plan.clone()).await?);
+        //     Ok(Plan::RefreshIndex(plan))
+        // }
         // Pass through statements.
         _ => Ok(plan),
     }
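Note that optimize now awaits itself on the plans embedded in COPY, INSERT, REPLACE, and CREATE TABLE ... AS SELECT, which is why the #[async_recursion] attribute appears above: a plain async fn cannot call itself, since its future type would be infinitely sized. A minimal standalone illustration of the attribute (independent of Databend's types):

// Standalone example of the async-recursion crate: the macro boxes the
// recursive future so the compiler can size the async fn's state machine.
use async_recursion::async_recursion;

#[async_recursion]
async fn depth(n: u32) -> u32 {
    if n == 0 { 0 } else { depth(n - 1).await + 1 }
}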
27 changes: 27 additions & 0 deletions src/query/sql/src/planner/plans/plan.rs
@@ -24,6 +24,8 @@ use databend_common_expression::DataSchema;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
 
+use super::Exchange;
+use super::RelOperator;
 use crate::binder::ExplainConfig;
 use crate::optimizer::SExpr;
 use crate::plans::copy_into_location::CopyIntoLocationPlan;
@@ -488,4 +490,29 @@ impl Plan {
     pub fn has_result_set(&self) -> bool {
         !self.schema().fields().is_empty()
     }
+
+    pub fn remove_exchange_for_select(&self) -> Self {
+        if let Plan::Query {
+            s_expr,
+            metadata,
+            bind_context,
+            rewrite_kind,
+            formatted_ast,
+            ignore_result,
+        } = self
+        {
+            if let RelOperator::Exchange(Exchange::Merge) = s_expr.plan.as_ref() {
+                let s_expr = Box::new(s_expr.child(0).unwrap().clone());
+                return Plan::Query {
+                    s_expr,
+                    metadata: metadata.clone(),
+                    bind_context: bind_context.clone(),
+                    rewrite_kind: rewrite_kind.clone(),
+                    formatted_ast: formatted_ast.clone(),
+                    ignore_result: *ignore_result,
+                };
+            }
+        }
+        self.clone()
+    }
 }
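remove_exchange_for_select only peels a root Exchange::Merge and otherwise returns a clone unchanged, so the COPY interpreter above can apply it unconditionally. A toy illustration of the same peel-the-top-merge pattern, with illustrative types rather than Databend's SExpr:

// Toy sketch: strip a final "merge to coordinator" node, if present,
// so the caller can schedule its own distribution step on top.
#[derive(Clone)]
enum Node {
    Merge(Box<Node>),
    Scan,
}

fn remove_top_merge(n: &Node) -> Node {
    match n {
        Node::Merge(child) => (**child).clone(), // drop the final gather step
        other => other.clone(),                  // anything else: unchanged
    }
}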
44 changes: 44 additions & 0 deletions tests/sqllogictests/suites/mode/cluster/create_table.test
@@ -0,0 +1,44 @@
query T
explain create or replace table t2 as select number % 400 d, max(number) from numbers(10000000) group by number limit 3;
----
CreateTableAsSelect:
(empty)
EvalScalar
├── output columns: [max(number) (#6), d (#7)]
├── expressions: [numbers.number (#4) % 400]
├── estimated rows: 3.00
└── Limit
├── output columns: [max(number) (#6), numbers.number (#4)]
├── limit: 3
├── offset: 0
├── estimated rows: 3.00
└── Exchange
├── output columns: [max(number) (#6), numbers.number (#4)]
├── exchange type: Merge
└── Limit
├── output columns: [max(number) (#6), numbers.number (#4)]
├── limit: 3
├── offset: 0
├── estimated rows: 3.00
└── AggregateFinal
├── output columns: [max(number) (#6), numbers.number (#4)]
├── group by: [number]
├── aggregate functions: [max(number)]
├── limit: 3
├── estimated rows: 10000000.00
└── Exchange
├── output columns: [max(number) (#6), numbers.number (#4)]
├── exchange type: Hash(0)
└── AggregatePartial
├── group by: [number]
├── aggregate functions: [max(number)]
├── estimated rows: 10000000.00
└── TableScan
├── table: default.system.numbers
├── output columns: [number (#4)]
├── read rows: 10000000
├── read size: 76.29 MiB
├── partitions total: 153
├── partitions scanned: 153
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 10000000.00
@@ -1,26 +1,12 @@
 statement ok
 set enable_distributed_copy_into = 1;
 
-statement ok
-drop table if exists test_order;
-
-statement ok
-drop table if exists random_source;
-
-statement ok
-drop stage if exists test_stage;
-
-statement ok
-drop table if exists parquet_table;
-
-statement ok
-drop stage if exists parquet_stage;
-
 statement ok
-create stage st FILE_FORMAT = (TYPE = CSV);
+create or replace stage st FILE_FORMAT = (TYPE = CSV);
 
 statement ok
-create table table_random(a int not null,b string not null,c string not null) ENGINE = Random;
+create or replace table table_random(a int not null,b string not null,c string not null) ENGINE = Random;
 
 statement ok
 copy into @st from (select a,b,c from table_random limit 1000000);
@@ -47,7 +33,7 @@ statement ok
 copy into @st from (select a,b,c from table_random limit 1000000);
 
 statement ok
-create table t(a int not null,b string not null,c string not null);
+create or replace table t(a int not null,b string not null,c string not null);
 
 statement ok
 copy into t from @st force = true;
@@ -74,10 +60,10 @@ statement ok
 set enable_distributed_copy_into = 1;
 
 statement ok
-create table t_query(a int not null,b string not null,c string not null);
+create or replace table t_query(a int not null,b string not null,c string not null);
 
 statement ok
-create stage st_query FILE_FORMAT = (TYPE = TSV);
+create or replace stage st_query FILE_FORMAT = (TYPE = TSV);
 
 statement ok
 copy into @st_query from (select a,b,c from table_random limit 1000000);
@@ -100,10 +86,10 @@ select count(*) from t_query;
 
 ## add parquet_file_test
 statement ok
-create table parquet_table(a int not null,b string not null,c string not null);
+create or replace table parquet_table(a int not null,b string not null,c string not null);
 
 statement ok
-create stage parquet_stage file_format = (type = parquet);
+create or replace stage parquet_stage file_format = (type = parquet);
 
 statement ok
 copy into @parquet_stage from (select a,b,c from table_random limit 100000);
@@ -148,10 +134,10 @@ select count(*) from parquet_table;
 # make sure it's distributed.
 
 statement ok
-create table t_query2(a int not null,b string not null,c string not null);
+create or replace table t_query2(a int not null,b string not null,c string not null);
 
 statement ok
-create stage st_query2 FILE_FORMAT = (TYPE = TSV);
+create or replace stage st_query2 FILE_FORMAT = (TYPE = TSV);
 
 statement ok
 copy into @st_query2 from (select a,b,c from table_random limit 10);
@@ -178,13 +164,13 @@ select block_count from fuse_snapshot('default','t_query2') limit 1;
 
 #test cluster key
 statement ok
-create table test_order(a int not null,b string not null,c timestamp not null) cluster by(to_yyyymmdd(c),a);
+create or replace table test_order(a int not null,b string not null,c timestamp not null) cluster by(to_yyyymmdd(c),a);
 
 statement ok
-create table random_source like test_order Engine = Random;
+create or replace table random_source like test_order Engine = Random;
 
 statement ok
-create stage test_stage;
+create or replace stage test_stage;
 
 statement ok
 copy into @test_stage from (select * from random_source limit 4000000) FILE_FORMAT=(type=parquet);
