Skip to content

feat: partition by left most cluster key when building merge into filter #13547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 143 additions & 15 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use std::sync::Arc;
use std::time::Instant;
use std::u64::MAX;

use common_ast::parser::parse_comma_separated_exprs;
use common_ast::parser::tokenize_sql;
use common_ast::Dialect;
use common_catalog::lock::Lock;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
Expand All @@ -30,6 +33,7 @@ use common_expression::SendableDataBlockStream;
use common_expression::ROW_NUMBER_COL_NAME;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::TableInfo;
use common_sql::bind_one_table;
use common_sql::executor::CommitSink;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind::Merge;
Expand All @@ -54,16 +58,20 @@ use common_sql::plans::RelOperator;
use common_sql::plans::ScalarItem;
use common_sql::plans::UpdatePlan;
use common_sql::BindContext;
use common_sql::ColumnBinding;
use common_sql::ColumnBindingBuilder;
use common_sql::IndexType;
use common_sql::MetadataRef;
use common_sql::NameResolutionContext;
use common_sql::ScalarExpr;
use common_sql::TypeCheck;
use common_sql::TypeChecker;
use common_sql::Visibility;
use common_storages_factory::Table;
use common_storages_fuse::FuseTable;
use common_storages_fuse::TableContext;
use itertools::Itertools;
use log::info;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -111,6 +119,20 @@ impl MergeStyleJoin<'_> {
target_sexpr,
}
}

/// Builds a lookup from target-side column name to the source-side column binding
/// it is paired with in the join conditions.
///
/// Target and source conditions are walked in lockstep; a pair contributes an entry
/// only when both sides are plain bound column references. If the same target column
/// name appears more than once, the last pairing wins.
pub fn collect_column_map(&self) -> HashMap<String, ColumnBinding> {
    self.target_conditions
        .iter()
        .zip(self.source_conditions.iter())
        .filter_map(|(target, source)| match (target, source) {
            (ScalarExpr::BoundColumnRef(target_ref), ScalarExpr::BoundColumnRef(source_ref)) => {
                Some((
                    target_ref.column.column_name.clone(),
                    source_ref.column.clone(),
                ))
            }
            // Anything other than column = column cannot be mapped by name.
            _ => None,
        })
        .collect()
}
}

impl MergeIntoInterpreter {
Expand Down Expand Up @@ -196,7 +218,7 @@ impl MergeIntoInterpreter {
};

let optimized_input = self
.build_static_filter(&input, meta_data, self.ctx.clone())
.build_static_filter(&input, meta_data, self.ctx.clone(), check_table)
.await?;
let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);

Expand Down Expand Up @@ -454,6 +476,7 @@ impl MergeIntoInterpreter {
join: &SExpr,
metadata: &MetadataRef,
ctx: Arc<QueryContext>,
table: Arc<dyn Table>,
) -> Result<Box<SExpr>> {
// 1. collect statistics from the source side
// plan of source table is extended to:
Expand All @@ -469,9 +492,71 @@ impl MergeIntoInterpreter {
let mut eval_scalar_items = Vec::with_capacity(m_join.source_conditions.len());
let mut min_max_binding = Vec::with_capacity(m_join.source_conditions.len() * 2);
let mut min_max_scalar_items = Vec::with_capacity(m_join.source_conditions.len() * 2);
let mut group_items = vec![];
if m_join.source_conditions.is_empty() {
return Ok(Box::new(join.clone()));
}
let column_map = m_join.collect_column_map();
let fuse_table =
table
.as_any()
.downcast_ref::<FuseTable>()
.ok_or(ErrorCode::Unimplemented(format!(
"table {}, engine type {}, does not support MERGE INTO",
table.name(),
table.get_table_info().engine(),
)))?;
let mut group_exprs = vec![];
if let Some(cluster_key_str) = fuse_table.cluster_key_str() {
let sql_dialect = Dialect::MySQL;
let tokens = tokenize_sql(cluster_key_str)?;
let ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?;
let (mut bind_context, metadata) = bind_one_table(table)?;
if !ast_exprs.is_empty() {
let ast_expr = &ast_exprs[0];
let name_resolution_ctx =
NameResolutionContext::try_from(ctx.get_settings().as_ref())?;
let mut type_checker = TypeChecker::new(
&mut bind_context,
ctx.clone(),
&name_resolution_ctx,
metadata.clone(),
&[],
false,
false,
);
let (scalar_expr, _) = *type_checker.resolve(ast_expr).await?;
let projected = scalar_expr.try_project_column_binding(|binding| {
column_map.get(&binding.column_name).cloned()
});
if let Some(p) = projected {
group_exprs.push(p);
}
}
}
for group_expr in group_exprs {
let index = metadata
.write()
.add_derived_column("".to_string(), group_expr.data_type()?);
let evaled = ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: ColumnBindingBuilder::new(
"".to_string(),
index,
Box::new(group_expr.data_type()?),
Visibility::Visible,
)
.build(),
});
eval_scalar_items.push(ScalarItem {
scalar: group_expr.clone(),
index,
});
group_items.push(ScalarItem {
scalar: evaled.clone(),
index,
});
}
for source_side_expr in m_join.source_conditions {
// eval source side join expr
let index = metadata
Expand Down Expand Up @@ -543,8 +628,6 @@ impl MergeIntoInterpreter {
min_max_scalar_items.push(max);
}

let group_item = eval_scalar_items[0].clone();

let eval_source_side_join_expr_op = EvalScalar {
items: eval_scalar_items,
};
Expand All @@ -568,7 +651,7 @@ impl MergeIntoInterpreter {

let agg_partial_op = Aggregate {
mode: AggregateMode::Partial,
group_items: vec![group_item.clone()],
group_items: group_items.clone(),
aggregate_functions: min_max_scalar_items.clone(),
from_distinct: false,
limit: None,
Expand All @@ -580,7 +663,7 @@ impl MergeIntoInterpreter {
);
let agg_final_op = Aggregate {
mode: AggregateMode::Final,
group_items: vec![group_item],
group_items,
aggregate_functions: min_max_scalar_items,
from_distinct: false,
limit: None,
Expand Down Expand Up @@ -643,16 +726,7 @@ impl MergeIntoInterpreter {
filter_parts.push(and);
}
}
let mut or = filter_parts[0].clone();
for filter_part in filter_parts.iter().skip(1) {
or = ScalarExpr::FunctionCall(FunctionCall {
span: None,
func_name: "or".to_string(),
params: vec![],
arguments: vec![or, filter_part.clone()],
});
}
filters.push(or);
filters.extend(Self::combine_filter_parts(&filter_parts).into_iter());
}
let mut target_plan = m_join.target_sexpr.clone();
Self::push_down_filters(&mut target_plan, &filters)?;
Expand All @@ -661,10 +735,64 @@ impl MergeIntoInterpreter {
Ok(Box::new(new_sexpr))
}

/// Folds `filter_parts` into a single `or` expression shaped as a balanced binary tree.
///
/// Returns `None` for an empty slice and a clone of the sole element for a
/// one-element slice. Longer slices are split in half, each half is combined
/// recursively, and the two results are joined with an `or` function call.
fn combine_filter_parts(filter_parts: &[ScalarExpr]) -> Option<ScalarExpr> {
    match filter_parts {
        [] => None,
        [only] => Some(only.clone()),
        _ => {
            let (front, back) = filter_parts.split_at(filter_parts.len() / 2);
            let lhs = Self::combine_filter_parts(front);
            let rhs = Self::combine_filter_parts(back);
            match (lhs, rhs) {
                (Some(lhs), Some(rhs)) => Some(ScalarExpr::FunctionCall(FunctionCall {
                    span: None,
                    func_name: "or".to_string(),
                    params: vec![],
                    arguments: vec![lhs, rhs],
                })),
                (Some(lhs), None) => Some(lhs),
                (None, rhs) => rhs,
            }
        }
    }
}

/// Renders a scalar expression as a compact, human-readable string.
///
/// * Bound column references print as their column name.
/// * Constants print via the `to_string()` of their value.
/// * Calls to `and`, `or`, `gte` or `lte` with exactly two arguments print as
///   `(lhs op rhs)`, recursing into both operands.
/// * Everything else falls back to its `Debug` representation.
fn display_scalar_expr(s: &ScalarExpr) -> String {
    match s {
        ScalarExpr::BoundColumnRef(x) => x.column.column_name.clone(),
        ScalarExpr::ConstantExpr(x) => x.value.to_string(),
        ScalarExpr::WindowFunction(x) => format!("{:?}", x),
        ScalarExpr::AggregateFunction(x) => format!("{:?}", x),
        ScalarExpr::LambdaFunction(x) => format!("{:?}", x),
        // Match on the argument slice instead of indexing: a call with an unexpected
        // number of arguments is then printed via `Debug` rather than panicking
        // (too few operands) or silently dropping operands (too many).
        ScalarExpr::FunctionCall(x) => match (x.func_name.as_str(), &x.arguments[..]) {
            ("and" | "or" | "gte" | "lte", [lhs, rhs]) => format!(
                "({} {} {})",
                Self::display_scalar_expr(lhs),
                x.func_name,
                Self::display_scalar_expr(rhs)
            ),
            _ => format!("{:?}", x),
        },
        ScalarExpr::CastExpr(x) => format!("{:?}", x),
        ScalarExpr::SubqueryExpr(x) => format!("{:?}", x),
        ScalarExpr::UDFServerCall(x) => format!("{:?}", x),
    }
}

fn push_down_filters(s_expr: &mut SExpr, filters: &[ScalarExpr]) -> Result<()> {
match s_expr.plan() {
RelOperator::Scan(s) => {
let mut new_scan = s.clone();
info!("push down {} filters:", filters.len());
for filter in filters {
info!("{}", Self::display_scalar_expr(filter));
}
if let Some(preds) = new_scan.push_down_predicates {
new_scan.push_down_predicates =
Some(preds.iter().chain(filters).cloned().collect());
Expand Down
17 changes: 11 additions & 6 deletions src/query/sql/src/planner/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,11 @@ use crate::BaseTableColumn;
use crate::ColumnEntry;
use crate::IdentifierNormalizer;
use crate::Metadata;
use crate::MetadataRef;
use crate::ScalarExpr;
use crate::Visibility;

pub fn parse_exprs(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
) -> Result<Vec<Expr>> {
let settings = Settings::create("".to_string());
pub fn bind_one_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRef)> {
let mut bind_context = BindContext::new();
let metadata = Arc::new(RwLock::new(Metadata::default()));
let table_index = metadata.write().add_table(
Expand Down Expand Up @@ -111,7 +107,16 @@ pub fn parse_exprs(

bind_context.add_column_binding(column_binding);
}
Ok((bind_context, metadata))
}

pub fn parse_exprs(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
) -> Result<Vec<Expr>> {
let (mut bind_context, metadata) = bind_one_table(table_meta)?;
let settings = Settings::create("".to_string());
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let mut type_checker = TypeChecker::new(
&mut bind_context,
Expand Down
29 changes: 29 additions & 0 deletions src/query/sql/src/planner/plans/scalar_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,35 @@ impl ScalarExpr {
ScalarExpr::CastExpr(expr) => expr.argument.evaluable(),
}
}

/// Rebuilds this expression with every column binding replaced by the result of `f`.
///
/// Only bound column references and function calls (recursively, over their
/// arguments) can be projected. Returns `None` if `f` returns `None` for any
/// referenced column, or if any other kind of expression is encountered. The
/// rebuilt nodes carry no span.
pub fn try_project_column_binding(
    &self,
    f: impl Fn(&ColumnBinding) -> Option<ColumnBinding> + Copy,
) -> Option<Self> {
    match self {
        ScalarExpr::BoundColumnRef(column_ref) => {
            let column = f(&column_ref.column)?;
            Some(ScalarExpr::BoundColumnRef(BoundColumnRef {
                span: None,
                column,
            }))
        }
        ScalarExpr::FunctionCall(call) => {
            let mut arguments = Vec::with_capacity(call.arguments.len());
            for argument in call.arguments.iter() {
                // If any argument cannot be projected, the whole call cannot be either.
                arguments.push(argument.try_project_column_binding(f)?);
            }
            Some(ScalarExpr::FunctionCall(FunctionCall {
                span: None,
                func_name: call.func_name.clone(),
                params: call.params.clone(),
                arguments,
            }))
        }
        _ => None,
    }
}
}

impl From<BoundColumnRef> for ScalarExpr {
Expand Down