feat: convert in list to subquery #13399

Merged: 2 commits, Oct 23, 2023
@@ -32,6 +32,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
| 'group_by_two_level_threshold' | '20000' | '20000' | 'SESSION' | 'Sets the number of keys in a GROUP BY operation that will trigger a two-level aggregation.' | 'UInt64' |
| 'hide_options_in_show_create_table' | '1' | '1' | 'SESSION' | 'Hides table-relevant information, such as SNAPSHOT_LOCATION and STORAGE_FORMAT, at the end of the result of SHOW TABLE CREATE.' | 'UInt64' |
| 'hive_parquet_chunk_size' | '16384' | '16384' | 'SESSION' | 'the max number of rows each read from parquet to databend processor' | 'UInt64' |
| 'inlist_to_subquery' | '0' | '0' | 'SESSION' | 'Converts IN list to subquery.' | 'UInt64' |
| 'input_read_buffer_size' | '1048576' | '1048576' | 'SESSION' | 'Sets the memory size in bytes allocated to the buffer used by the buffered reader to read data from storage.' | 'UInt64' |
| 'join_spilling_threshold' | '0' | '0' | 'SESSION' | 'Maximum amount of memory can use for hash join, 0 is unlimited.' | 'UInt64' |
| 'lazy_read_threshold' | '1000' | '1000' | 'SESSION' | 'Sets the maximum LIMIT in a query to enable lazy read optimization. Setting it to 0 disables the optimization.' | 'UInt64' |
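The new setting defaults to off (`0`). A minimal usage sketch follows; the table `t` and its column `a` are hypothetical, and it assumes Databend's usual `SET <setting> = <value>` session syntax:

```sql
-- Enable the IN-list-to-subquery rewrite for the current session (0 = off, the default).
SET inlist_to_subquery = 1;

-- With the setting on, the planner may bind the literal list as a constant
-- table and rewrite the IN predicate into a subquery, rather than expanding
-- it into a chain of OR'ed equality comparisons.
SELECT * FROM t WHERE a IN (1, 2, 3, 4, 5);
```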
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -155,6 +155,12 @@ impl DefaultSettings {
            possible_values: None,
            display_in_show_settings: true,
        }),
        ("inlist_to_subquery", DefaultSettingValue {
            value: UserSettingValue::UInt64(0),
            desc: "Converts IN list to subquery.",
            possible_values: None,
            display_in_show_settings: true,
        }),
        ("disable_join_reorder", DefaultSettingValue {
            value: UserSettingValue::UInt64(0),
            desc: "Disable join reorder optimization.",
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -188,6 +188,10 @@ impl Settings {
        Ok(self.try_get_u64("enable_cbo")? != 0)
    }

    pub fn get_inlist_to_subquery(&self) -> Result<bool> {
        Ok(self.try_get_u64("inlist_to_subquery")? != 0)
    }

    pub fn get_disable_join_reorder(&self) -> Result<bool> {
        Ok(self.try_get_u64("disable_join_reorder")? != 0)
    }
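The getter is consumed by the optimizer; from a client, the new row can be checked in the `system.settings` table that the test above exercises (the `name` column is an assumption; the test output shows only row values):

```sql
-- Inspect the new setting; expected value and default are 0 (off),
-- level SESSION, type UInt64, per the updated test expectation.
SELECT * FROM system.settings WHERE name = 'inlist_to_subquery';
```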
1 change: 1 addition & 0 deletions src/query/sql/src/planner/binder/mod.rs
@@ -66,4 +66,5 @@ pub use scalar::ScalarBinder;
pub use scalar_common::*;
pub use scalar_visitor::*;
pub use table::parse_result_scan_args;
pub use values::bind_values;
pub use window::WindowOrderByInfo;
280 changes: 150 additions & 130 deletions src/query/sql/src/planner/binder/values.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use common_ast::ast::Expr as AExpr;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::Span;
@@ -37,6 +38,8 @@ use crate::plans::ConstantTableScan;
use crate::BindContext;
use crate::Binder;
use crate::ColumnBindingBuilder;
use crate::MetadataRef;
use crate::NameResolutionContext;
use crate::ScalarBinder;
use crate::Visibility;

@@ -48,149 +51,166 @@ impl Binder {
        span: Span,
        values: &Vec<Vec<AExpr>>,
    ) -> Result<(SExpr, BindContext)> {
        bind_values(
            self.ctx.clone(),
            &self.name_resolution_ctx,
            self.metadata.clone(),
            bind_context,
            span,
            values,
        )
        .await
    }
}

pub async fn bind_values(
    ctx: Arc<dyn TableContext>,
    name_resolution_ctx: &NameResolutionContext,
    metadata: MetadataRef,
    bind_context: &mut BindContext,
    span: Span,
    values: &Vec<Vec<AExpr>>,
) -> Result<(SExpr, BindContext)> {
    if values.is_empty() {
        return Err(ErrorCode::SemanticError(
            "Values lists must have at least one row".to_string(),
        )
        .set_span(span));
    }
    let same_length = values.windows(2).all(|v| v[0].len() == v[1].len());
    if !same_length {
        return Err(ErrorCode::SemanticError(
            "Values lists must all be the same length".to_string(),
        )
        .set_span(span));
    }

    let num_rows = values.len();
    let num_cols = values[0].len();

    // Assign default column names: col0, col1, etc.
    let names = (0..num_cols)
        .map(|i| format!("col{}", i))
        .collect::<Vec<_>>();

    let mut scalar_binder = ScalarBinder::new(
        bind_context,
        ctx.clone(),
        name_resolution_ctx,
        metadata.clone(),
        &[],
        HashMap::new(),
        Box::new(IndexMap::new()),
    );

    let mut col_scalars = vec![Vec::with_capacity(values.len()); num_cols];
    let mut common_types: Vec<Option<DataType>> = vec![None; num_cols];

    for row_values in values.iter() {
        for (i, value) in row_values.iter().enumerate() {
            let (scalar, data_type) = scalar_binder.bind(value).await?;
            col_scalars[i].push((scalar, data_type.clone()));

            // Compute the common data type for each column.
            match &common_types[i] {
                Some(common_type) => {
                    if common_type != &data_type {
                        let new_common_type = common_super_type(
                            common_type.clone(),
                            data_type.clone(),
                            &BUILTIN_FUNCTIONS.default_cast_rules,
                        );
                        if new_common_type.is_none() {
                            return Err(ErrorCode::SemanticError(format!(
                                "{} and {} don't have common data type",
                                common_type, data_type
                            ))
                            .set_span(span));
                        }
                        common_types[i] = new_common_type;
                    }
                }
                None => {
                    common_types[i] = Some(data_type);
                }
            }
        }
    }

    let mut value_fields = Vec::with_capacity(names.len());
    for (name, common_type) in names.into_iter().zip(common_types.into_iter()) {
        let value_field = DataField::new(&name, common_type.unwrap());
        value_fields.push(value_field);
    }
    let value_schema = DataSchema::new(value_fields);

    let input = DataBlock::empty();
    let func_ctx = ctx.get_function_context()?;
    let evaluator = Evaluator::new(&input, &func_ctx, &BUILTIN_FUNCTIONS);

    // Use the bound values to build columns.
    let mut value_columns = Vec::with_capacity(col_scalars.len());
    for (scalars, value_field) in col_scalars.iter().zip(value_schema.fields().iter()) {
        let mut builder = ColumnBuilder::with_capacity(value_field.data_type(), col_scalars.len());
        for (scalar, value_type) in scalars {
            let scalar = if value_type != value_field.data_type() {
                wrap_cast_scalar(scalar, value_type, value_field.data_type())?
            } else {
                scalar.clone()
            };
            let expr = scalar
                .as_expr()?
                .project_column_ref(|col| value_schema.index_of(&col.index.to_string()).unwrap());
            let result = evaluator.run(&expr)?;

            match result.as_scalar() {
                Some(val) => {
                    builder.push(val.as_ref());
                }
                None => {
                    return Err(ErrorCode::SemanticError(format!(
                        "Value must be a scalar, but get {}",
                        result
                    ))
                    .set_span(span));
                }
            }
        }
        value_columns.push(builder.build());
    }

    // Add column bindings.
    let mut columns = ColumnSet::new();
    let mut fields = Vec::with_capacity(values.len());
    for value_field in value_schema.fields() {
        let index = metadata
            .write()
            .add_derived_column(value_field.name().clone(), value_field.data_type().clone());
        columns.insert(index);

        let column_binding = ColumnBindingBuilder::new(
            value_field.name().clone(),
            index,
            Box::new(value_field.data_type().clone()),
            Visibility::Visible,
        )
        .build();
        bind_context.add_column_binding(column_binding);

        let field = DataField::new(&index.to_string(), value_field.data_type().clone());
        fields.push(field);
    }
    let schema = DataSchemaRefExt::create(fields);

    let s_expr = SExpr::create_leaf(Arc::new(
        ConstantTableScan {
            values: value_columns,
            num_rows,
            schema,
            columns,
        }
        .into(),
    ));

    Ok((s_expr, bind_context.clone()))
}
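`bind_values` is also what gives an IN list a relational shape: literal rows are folded column by column into a `ConstantTableScan`, with default column names `col0`, `col1`, … and each column cast to the common super type of its values. A sketch of the kind of row list it binds (standard `VALUES` derived-table syntax assumed):

```sql
-- Two rows, two columns: col0 takes the common numeric type of 1 and 2,
-- col1 the string type of 'a' and 'b'. Mismatched row lengths, or values
-- with no common super type, raise a SemanticError as coded above.
SELECT * FROM (VALUES (1, 'a'), (2, 'b'));
```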
16 changes: 13 additions & 3 deletions src/query/sql/src/planner/optimizer/heuristic/subquery_rewriter.rs
@@ -169,9 +169,10 @@ impl SubqueryRewriter {
                Arc::new(self.rewrite(s_expr.child(0)?)?),
            )),

            RelOperator::DummyTableScan(_)
            | RelOperator::Scan(_)
            | RelOperator::CteScan(_)
            | RelOperator::ConstantTableScan(_) => Ok(s_expr.clone()),

            _ => Err(ErrorCode::Internal("Invalid plan type")),
        }
@@ -708,6 +709,15 @@ pub fn check_child_expr_in_subquery(
) -> Result<(ScalarExpr, bool)> {
    match child_expr {
        ScalarExpr::BoundColumnRef(_) => Ok((child_expr.clone(), op != &ComparisonOp::Equal)),
        ScalarExpr::FunctionCall(func) => {
            if func.func_name.eq("tuple") {
                return Ok((child_expr.clone(), op != &ComparisonOp::Equal));
            }
            Err(ErrorCode::Internal(format!(
                "Invalid child expr in subquery: {:?}",
                child_expr
            )))
        }
        ScalarExpr::ConstantExpr(_) => Ok((child_expr.clone(), true)),
        ScalarExpr::CastExpr(cast) => {
            let arg = &cast.argument;
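The new `FunctionCall` arm accepts only a `tuple(...)` call as the comparison's child expression, which is the form a multi-column IN list binds to; any other function call remains an internal error. A hedged example of the query shape this admits (table and columns hypothetical):

```sql
-- (a, b) binds to tuple(a, b); with inlist_to_subquery = 1 the row list can
-- become a constant-table subquery that the rewriter turns into a join.
SELECT * FROM t WHERE (a, b) IN ((1, 'x'), (2, 'y'));
```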