diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs index 0e57da36ccae7..865e659c9a5bb 100644 --- a/src/query/catalog/src/runtime_filter_info.rs +++ b/src/query/catalog/src/runtime_filter_info.rs @@ -66,6 +66,7 @@ pub struct RuntimeFilterEntry { pub stats: Arc, pub build_rows: usize, pub build_table_rows: Option, + pub probe_table_rows: Option, pub enabled: bool, } diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 553a462ba688e..5d32f54861896 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -1339,6 +1339,7 @@ impl PhysicalPlanBuilder { &right_join_conditions, left_join_conditions_rt, build_table_indexes, + self.runtime_filter_routing.as_deref(), ) .await?; diff --git a/src/query/service/src/physical_plans/physical_plan_builder.rs b/src/query/service/src/physical_plans/physical_plan_builder.rs index 5190a683affe3..b46327ee3d773 100644 --- a/src/query/service/src/physical_plans/physical_plan_builder.rs +++ b/src/query/service/src/physical_plans/physical_plan_builder.rs @@ -31,6 +31,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::physical_plans::explain::PlanStatsInfo; use crate::physical_plans::physical_plan::PhysicalPlan; +use crate::physical_plans::runtime_filter::RuntimeFilterRouting; pub struct PhysicalPlanBuilder { pub metadata: MetadataRef, @@ -39,6 +40,7 @@ pub struct PhysicalPlanBuilder { pub dry_run: bool, // DataMutation info, used to build MergeInto physical plan pub mutation_build_info: Option, + pub runtime_filter_routing: Option>, } impl PhysicalPlanBuilder { @@ -50,6 +52,7 @@ impl PhysicalPlanBuilder { func_ctx, dry_run, mutation_build_info: None, + runtime_filter_routing: None, } } @@ -63,6 +66,12 @@ impl PhysicalPlanBuilder { } pub async fn build(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result { + if self.runtime_filter_routing.is_none() + && self.ctx.get_settings().get_enable_join_runtime_filter()? + { + let routing = RuntimeFilterRouting::build(&self.metadata, s_expr)?; + self.runtime_filter_routing = Some(Arc::new(routing)); + } let mut plan = self.build_physical_plan(s_expr, required).await?; plan.adjust_plan_id(&mut 0); diff --git a/src/query/service/src/physical_plans/runtime_filter/builder.rs b/src/query/service/src/physical_plans/runtime_filter/builder.rs index 60cf42e3685b5..946b80b723691 100644 --- a/src/query/service/src/physical_plans/runtime_filter/builder.rs +++ b/src/query/service/src/physical_plans/runtime_filter/builder.rs @@ -12,62 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; -use databend_common_expression::types::DataType; +use databend_common_expression::type_check::check_cast; use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::optimizer::ir::SExpr; use databend_common_sql::plans::Exchange; use databend_common_sql::plans::Join; -use databend_common_sql::plans::JoinEquiCondition; -use databend_common_sql::plans::JoinType; -use databend_common_sql::plans::RelOperator; -use databend_common_sql::plans::ScalarExpr; -use databend_common_sql::ColumnEntry; use databend_common_sql::IndexType; use databend_common_sql::MetadataRef; -use databend_common_sql::TypeCheck; +use super::routing::RuntimeFilterRouting; +use super::routing::RuntimeFilterTarget; use super::types::PhysicalRuntimeFilter; use super::types::PhysicalRuntimeFilters; +use super::utils::is_type_supported_for_bloom_filter; +use super::utils::is_type_supported_for_min_max_filter; +use super::utils::supported_join_type_for_runtime_filter; +use crate::physical_plans::runtime_filter::utils::is_valid_probe_key; /// Type alias for probe keys with runtime filter information /// Contains: (RemoteExpr, scan_id, table_index, column_idx) type ProbeKeysWithRuntimeFilter = Vec, usize, usize, IndexType)>>; -/// Check if a data type is supported for bloom filter -/// -/// Currently supports: numbers and strings -pub fn is_type_supported_for_bloom_filter(data_type: &DataType) -> bool { - data_type.is_number() || data_type.is_string() -} - -/// Check if a data type is supported for min-max filter -/// -/// Currently supports: numbers, dates, and strings -pub fn is_type_supported_for_min_max_filter(data_type: &DataType) -> bool { - data_type.is_number() || data_type.is_date() || data_type.is_string() -} - -/// Check if the join type is supported for runtime filter -/// -/// Runtime filters are only applicable to certain join types where -/// filtering the probe side can reduce processing -pub fn supported_join_type_for_runtime_filter(join_type: &JoinType) -> bool { - matches!( - join_type, - JoinType::Inner - | JoinType::Right - | JoinType::RightSemi - | JoinType::RightAnti - | JoinType::LeftMark - ) -} - /// Build runtime filters for a join operation /// /// This is the legacy method that creates one runtime filter per probe key. @@ -91,6 +62,7 @@ pub async fn build_runtime_filter( build_keys: &[RemoteExpr], probe_keys: ProbeKeysWithRuntimeFilter, build_table_indexes: Vec>, + routing: Option<&RuntimeFilterRouting>, ) -> Result { if !ctx.get_settings().get_enable_join_runtime_filter()? { return Ok(Default::default()); @@ -112,11 +84,11 @@ pub async fn build_runtime_filter( } let mut filters = Vec::new(); - - let probe_side = s_expr.probe_side_child(); + // Safety: routing is always Some if enable_join_runtime_filter is true. + let routing = routing.unwrap(); // Process each probe key that has runtime filter information - for (build_key, probe_key, scan_id, _table_index, column_idx, build_table_index) in build_keys + for (build_key, probe_key, _scan_id, table_index, column_idx, build_table_index) in build_keys .iter() .zip(probe_keys.into_iter()) .zip(build_table_indexes.into_iter()) @@ -128,26 +100,19 @@ pub async fn build_runtime_filter( { // Skip if the probe expression is neither a direct column reference nor a // cast from not null to nullable type (e.g. CAST(col AS Nullable(T))). - match &probe_key { - RemoteExpr::ColumnRef { .. } => {} - RemoteExpr::Cast { - expr: box RemoteExpr::ColumnRef { data_type, .. }, - dest_type, - .. - } if &dest_type.remove_nullable() == data_type => {} - _ => continue, + if !is_valid_probe_key(&probe_key) { + continue; } - let probe_targets = - find_probe_targets(metadata, probe_side, &probe_key, scan_id, column_idx)?; + let targets = routing.find_targets(s_expr, column_idx)?; + + let probe_targets = cast_probe_targets(targets, column_idx, &probe_key, build_key)?; - let build_table_rows = - get_build_table_rows(ctx.clone(), metadata, build_table_index).await?; + let build_table_rows = get_table_rows(ctx.clone(), metadata, build_table_index).await?; + let probe_table_rows = get_table_rows(ctx.clone(), metadata, Some(table_index)).await?; - let data_type = build_key - .as_expr(&BUILTIN_FUNCTIONS) - .data_type() - .remove_nullable(); + let build_key_expr = build_key.as_expr(&BUILTIN_FUNCTIONS); + let data_type = build_key_expr.data_type().remove_nullable(); let id = metadata.write().next_runtime_filter_id(); let enable_bloom_runtime_filter = is_type_supported_for_bloom_filter(&data_type); @@ -160,6 +125,7 @@ pub async fn build_runtime_filter( build_key: build_key.clone(), probe_targets, build_table_rows, + probe_table_rows, enable_bloom_runtime_filter, enable_inlist_runtime_filter: true, enable_min_max_runtime_filter, @@ -170,12 +136,12 @@ pub async fn build_runtime_filter( Ok(PhysicalRuntimeFilters { filters }) } -async fn get_build_table_rows( +async fn get_table_rows( ctx: Arc, metadata: &MetadataRef, - build_table_index: Option, + table_index: Option, ) -> Result> { - if let Some(table_index) = build_table_index { + if let Some(table_index) = table_index { let table = { let metadata_read = metadata.read(); metadata_read.table(table_index).table().clone() @@ -188,127 +154,36 @@ async fn get_build_table_rows( Ok(None) } -fn find_probe_targets( - metadata: &MetadataRef, - s_expr: &SExpr, - probe_key: &RemoteExpr, - probe_scan_id: usize, - probe_key_col_idx: IndexType, +fn cast_probe_targets( + targets: Vec, + current_column_idx: IndexType, + current_probe_key: &RemoteExpr, + build_key: &RemoteExpr, ) -> Result, usize)>> { - let mut uf = UnionFind::new(); - let mut column_to_remote: HashMap, usize)> = HashMap::new(); - column_to_remote.insert(probe_key_col_idx, (probe_key.clone(), probe_scan_id)); - - let equi_conditions = collect_equi_conditions(s_expr)?; - for cond in equi_conditions { - if let ( - Some((left_remote, left_scan_id, left_idx)), - Some((right_remote, right_scan_id, right_idx)), - ) = ( - scalar_to_remote_expr(metadata, &cond.left)?, - scalar_to_remote_expr(metadata, &cond.right)?, - ) { - uf.union(left_idx, right_idx); - column_to_remote.insert(left_idx, (left_remote, left_scan_id)); - column_to_remote.insert(right_idx, (right_remote, right_scan_id)); - } - } - - let equiv_class = uf.get_equivalence_class(probe_key_col_idx); - + let build_expr = build_key.as_expr(&BUILTIN_FUNCTIONS); + let build_type = build_expr.data_type().clone(); + let mut dedup = HashSet::new(); let mut result = Vec::new(); - for idx in equiv_class { - if let Some((remote_expr, scan_id)) = column_to_remote.get(&idx) { - result.push((remote_expr.clone(), *scan_id)); - } - } - - Ok(result) -} -fn collect_equi_conditions(s_expr: &SExpr) -> Result> { - let mut conditions = Vec::new(); - - if let RelOperator::Join(join) = s_expr.plan() { - if matches!(join.join_type, JoinType::Inner) { - conditions.extend(join.equi_conditions.clone()); - } - } - - for child in s_expr.children() { - conditions.extend(collect_equi_conditions(child)?); - } - - Ok(conditions) -} - -fn scalar_to_remote_expr( - metadata: &MetadataRef, - scalar: &ScalarExpr, -) -> Result, usize, IndexType)>> { - if scalar.used_columns().iter().all(|idx| { - matches!( - metadata.read().column(*idx), - ColumnEntry::BaseTableColumn(_) - ) - }) { - if let Some(column_idx) = scalar.used_columns().iter().next() { - let scan_id = metadata.read().base_column_scan_id(*column_idx); - - if let Some(scan_id) = scan_id { - let remote_expr = scalar - .as_raw_expr() - .type_check(&*metadata.read())? - .project_column_ref(|col| Ok(col.column_name.clone()))? - .as_remote_expr(); - - return Ok(Some((remote_expr, scan_id, *column_idx))); - } - } - } - - Ok(None) -} - -struct UnionFind { - parent: HashMap, -} - -impl UnionFind { - fn new() -> Self { - Self { - parent: HashMap::new(), - } - } - - fn find(&mut self, x: IndexType) -> IndexType { - if !self.parent.contains_key(&x) { - self.parent.insert(x, x); - return x; - } - - let parent = *self.parent.get(&x).unwrap(); - if parent != x { - let root = self.find(parent); - self.parent.insert(x, root); - } - *self.parent.get(&x).unwrap() - } - - fn union(&mut self, x: IndexType, y: IndexType) { - let root_x = self.find(x); - let root_y = self.find(y); - if root_x != root_y { - self.parent.insert(root_x, root_y); + for target in targets { + if !dedup.insert((target.scan_id, target.column_idx)) { + continue; } + let expr = if target.column_idx == current_column_idx { + current_probe_key.clone() + } else { + let target_expr = target.expr.as_expr(&BUILTIN_FUNCTIONS); + let casted = check_cast( + target_expr.span(), + false, + target_expr, + &build_type, + &BUILTIN_FUNCTIONS, + )?; + casted.as_remote_expr() + }; + result.push((expr, target.scan_id)); } - fn get_equivalence_class(&mut self, x: IndexType) -> Vec { - let root = self.find(x); - let all_keys: Vec = self.parent.keys().copied().collect(); - all_keys - .into_iter() - .filter(|&k| self.find(k) == root) - .collect() - } + Ok(result) } diff --git a/src/query/service/src/physical_plans/runtime_filter/mod.rs b/src/query/service/src/physical_plans/runtime_filter/mod.rs index d28e5ff6a8968..f60a17e5b07b6 100644 --- a/src/query/service/src/physical_plans/runtime_filter/mod.rs +++ b/src/query/service/src/physical_plans/runtime_filter/mod.rs @@ -13,7 +13,10 @@ // limitations under the License. mod builder; +mod routing; mod types; +mod utils; pub use builder::build_runtime_filter; +pub use routing::RuntimeFilterRouting; pub use types::*; diff --git a/src/query/service/src/physical_plans/runtime_filter/routing.rs b/src/query/service/src/physical_plans/runtime_filter/routing.rs new file mode 100644 index 0000000000000..3e58b43405631 --- /dev/null +++ b/src/query/service/src/physical_plans/runtime_filter/routing.rs @@ -0,0 +1,380 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::collections::HashSet; + +use databend_common_exception::Result; +use databend_common_expression::RemoteExpr; +use databend_common_sql::optimizer::ir::SExpr; +use databend_common_sql::plans::Join; +use databend_common_sql::plans::RelOperator; +use databend_common_sql::IndexType; +use databend_common_sql::MetadataRef; + +use super::utils::scalar_to_remote_expr; +use super::utils::supported_join_type_for_runtime_filter; + +#[derive(Clone, Debug)] +pub struct RuntimeFilterTarget { + pub column_idx: IndexType, + pub scan_id: usize, + pub expr: RemoteExpr, +} + +#[derive(Clone)] +struct RuntimeFilterColumn { + column_idx: IndexType, + scan_id: usize, + expr: RemoteExpr, +} + +#[derive(Clone)] +struct ScanInfo { + ancestors: Vec, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum JoinSide { + Probe, + Build, +} + +type JoinPtr = usize; + +#[derive(Clone, Copy)] +struct AncestorInfo { + join_ptr: JoinPtr, + side: JoinSide, +} + +struct JoinNodeInfo { + parent: Option<(JoinPtr, JoinSide)>, + supported: bool, +} + +pub struct RuntimeFilterRouting { + join_nodes: HashMap, + scan_infos: HashMap, + column_classes: HashMap, + class_members: HashMap>, +} + +impl RuntimeFilterRouting { + pub fn build(metadata: &MetadataRef, root: &SExpr) -> Result { + let mut builder = RuntimeFilterRoutingBuilder::new(metadata.clone()); + builder.collect(root, None, &mut Vec::new())?; + builder.finish() + } + + pub fn find_targets( + &self, + join_s_expr: &SExpr, + column_idx: IndexType, + ) -> Result> { + let join_ptr = join_s_expr as *const SExpr as JoinPtr; + let candidate_joins = self.build_chain(join_ptr); + let class_id = self.column_classes.get(&column_idx).copied().unwrap(); + + let mut result = Vec::new(); + if let Some(columns) = self.class_members.get(&class_id) { + for column in columns { + if self.scan_in_probe_subtree(column.scan_id, &candidate_joins) { + result.push(RuntimeFilterTarget { + column_idx: column.column_idx, + scan_id: column.scan_id, + expr: column.expr.clone(), + }); + } + } + } + + Ok(result) + } + + fn scan_in_probe_subtree(&self, scan_id: usize, candidate_set: &HashSet) -> bool { + let Some(info) = self.scan_infos.get(&scan_id) else { + return false; + }; + + info.ancestors.iter().any(|ancestor| { + ancestor.side == JoinSide::Probe && candidate_set.contains(&ancestor.join_ptr) + }) + } + + fn build_chain(&self, join_ptr: JoinPtr) -> HashSet { + let mut chain = HashSet::new(); + let mut current = Some(join_ptr); + while let Some(ptr) = current { + chain.insert(ptr); + let Some(node) = self.join_nodes.get(&ptr) else { + break; + }; + match node.parent { + Some((parent_ptr, JoinSide::Build)) => { + if let Some(parent_node) = self.join_nodes.get(&parent_ptr) { + if parent_node.supported { + current = Some(parent_ptr); + continue; + } + } + break; + } + _ => break, + } + } + chain + } +} + +struct RuntimeFilterRoutingBuilder { + metadata: MetadataRef, + join_nodes: HashMap, + scan_infos: HashMap, + columns: HashMap, + union_find: UnionFind, +} + +impl RuntimeFilterRoutingBuilder { + fn new(metadata: MetadataRef) -> Self { + Self { + metadata, + join_nodes: HashMap::new(), + scan_infos: HashMap::new(), + columns: HashMap::new(), + union_find: UnionFind::new(), + } + } + + fn collect( + &mut self, + s_expr: &SExpr, + parent: Option<(JoinPtr, JoinSide)>, + ancestors: &mut Vec, + ) -> Result<()> { + match s_expr.plan() { + RelOperator::Join(join) => { + let ptr = s_expr as *const SExpr as JoinPtr; + let supported = supported_join_type_for_runtime_filter(&join.join_type); + self.join_nodes + .insert(ptr, JoinNodeInfo { parent, supported }); + self.process_join(join)?; + + let probe_child = s_expr.child(0)?; + ancestors.push(AncestorInfo { + join_ptr: ptr, + side: JoinSide::Probe, + }); + self.collect(probe_child, Some((ptr, JoinSide::Probe)), ancestors)?; + ancestors.pop(); + + let build_child = s_expr.child(1)?; + ancestors.push(AncestorInfo { + join_ptr: ptr, + side: JoinSide::Build, + }); + self.collect(build_child, Some((ptr, JoinSide::Build)), ancestors)?; + ancestors.pop(); + } + RelOperator::Scan(scan) => { + self.scan_infos.insert(scan.scan_id, ScanInfo { + ancestors: ancestors.clone(), + }); + } + _ => { + for child in s_expr.children() { + self.collect(child, parent, ancestors)?; + } + } + } + Ok(()) + } + + fn process_join(&mut self, join: &Join) -> Result<()> { + for condition in join.equi_conditions.iter() { + let left = scalar_to_remote_expr(&self.metadata, &condition.left)?; + let right = scalar_to_remote_expr(&self.metadata, &condition.right)?; + match (left, right) { + (Some(left), Some(right)) => { + self.union_find.union(left.2, right.2); + self.insert_column(left); + self.insert_column(right); + } + (Some(column), None) | (None, Some(column)) => { + self.insert_column(column); + } + (None, None) => {} + }; + } + Ok(()) + } + + fn insert_column( + &mut self, + (expr, scan_id, column_idx): (RemoteExpr, usize, IndexType), + ) { + self.columns.insert(column_idx, RuntimeFilterColumn { + column_idx, + scan_id, + expr, + }); + } + + fn finish(mut self) -> Result { + let mut column_classes = HashMap::new(); + let mut class_members: HashMap> = HashMap::new(); + for (column_idx, column) in self.columns.iter() { + let class_id = self.union_find.find(*column_idx); + column_classes.insert(*column_idx, class_id); + class_members + .entry(class_id) + .or_default() + .push(column.clone()); + } + + Ok(RuntimeFilterRouting { + join_nodes: self.join_nodes, + scan_infos: self.scan_infos, + column_classes, + class_members, + }) + } +} + +struct UnionFind { + parent: HashMap, +} + +impl UnionFind { + fn new() -> Self { + Self { + parent: HashMap::new(), + } + } + + fn find(&mut self, x: IndexType) -> IndexType { + if !self.parent.contains_key(&x) { + self.parent.insert(x, x); + return x; + } + + let parent = *self.parent.get(&x).unwrap(); + if parent != x { + let root = self.find(parent); + self.parent.insert(x, root); + } + *self.parent.get(&x).unwrap() + } + + fn union(&mut self, x: IndexType, y: IndexType) { + let root_x = self.find(x); + let root_y = self.find(y); + if root_x != root_y { + self.parent.insert(root_x, root_y); + } + } +} + +#[cfg(test)] +mod tests { + use databend_common_ast::Span; + use databend_common_expression::types::DataType; + use databend_common_expression::types::NumberDataType; + use databend_common_sql::plans::DummyTableScan; + use databend_common_sql::plans::RelOperator; + + use super::*; + + fn dummy_expr(name: &str) -> RemoteExpr { + RemoteExpr::ColumnRef { + span: Span::default(), + id: name.to_string(), + data_type: DataType::Number(NumberDataType::Int32), + display_name: name.to_string(), + } + } + + #[test] + fn test_find_targets_through_ancestors() { + let join_expr = SExpr::create_leaf(RelOperator::DummyTableScan(DummyTableScan)); + let parent_expr = SExpr::create_leaf(RelOperator::DummyTableScan(DummyTableScan)); + let join_ptr = &join_expr as *const SExpr as JoinPtr; + let parent_ptr = &parent_expr as *const SExpr as JoinPtr; + + let mut routing = RuntimeFilterRouting { + join_nodes: HashMap::new(), + scan_infos: HashMap::new(), + column_classes: HashMap::new(), + class_members: HashMap::new(), + }; + + routing.join_nodes.insert(parent_ptr, JoinNodeInfo { + parent: None, + supported: true, + }); + routing.join_nodes.insert(join_ptr, JoinNodeInfo { + parent: Some((parent_ptr, JoinSide::Build)), + supported: true, + }); + + routing.scan_infos.insert(10, ScanInfo { + ancestors: vec![AncestorInfo { + join_ptr, + side: JoinSide::Probe, + }], + }); + routing.scan_infos.insert(20, ScanInfo { + ancestors: vec![AncestorInfo { + join_ptr: parent_ptr, + side: JoinSide::Probe, + }], + }); + routing.scan_infos.insert(30, ScanInfo { + ancestors: vec![AncestorInfo { + join_ptr, + side: JoinSide::Build, + }], + }); + + routing.column_classes.insert(1, 100); + routing.column_classes.insert(2, 100); + routing.column_classes.insert(3, 100); + routing.class_members.insert(100, vec![ + RuntimeFilterColumn { + column_idx: 1, + scan_id: 10, + expr: dummy_expr("c1"), + }, + RuntimeFilterColumn { + column_idx: 2, + scan_id: 20, + expr: dummy_expr("c2"), + }, + RuntimeFilterColumn { + column_idx: 3, + scan_id: 30, + expr: dummy_expr("c3"), + }, + ]); + + let mut targets = routing.find_targets(&join_expr, 1).unwrap(); + targets.sort_by_key(|t| t.scan_id); + assert_eq!(targets.len(), 2); + assert_eq!(targets[0].scan_id, 10); + assert_eq!(targets[1].scan_id, 20); + + let none = routing.find_targets(&join_expr, 99).unwrap(); + assert!(none.is_empty()); + } +} diff --git a/src/query/service/src/physical_plans/runtime_filter/types.rs b/src/query/service/src/physical_plans/runtime_filter/types.rs index 11a7a9992f5c6..8b67c15019bf2 100644 --- a/src/query/service/src/physical_plans/runtime_filter/types.rs +++ b/src/query/service/src/physical_plans/runtime_filter/types.rs @@ -43,6 +43,7 @@ pub struct PhysicalRuntimeFilter { pub probe_targets: Vec<(RemoteExpr, usize)>, pub build_table_rows: Option, + pub probe_table_rows: Option, /// Enable bloom filter for this runtime filter pub enable_bloom_runtime_filter: bool, diff --git a/src/query/service/src/physical_plans/runtime_filter/utils.rs b/src/query/service/src/physical_plans/runtime_filter/utils.rs new file mode 100644 index 0000000000000..d95406e0b160f --- /dev/null +++ b/src/query/service/src/physical_plans/runtime_filter/utils.rs @@ -0,0 +1,96 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::RemoteExpr; +use databend_common_sql::plans::JoinType; +use databend_common_sql::ColumnEntry; +use databend_common_sql::IndexType; +use databend_common_sql::MetadataRef; +use databend_common_sql::ScalarExpr; +use databend_common_sql::TypeCheck; + +/// Check if a data type is supported for bloom filter +/// +/// Currently supports: numbers and strings +pub fn is_type_supported_for_bloom_filter(data_type: &DataType) -> bool { + data_type.is_number() || data_type.is_string() +} + +/// Check if a data type is supported for min-max filter +/// +/// Currently supports: numbers, dates, and strings +pub fn is_type_supported_for_min_max_filter(data_type: &DataType) -> bool { + data_type.is_number() || data_type.is_date() || data_type.is_string() +} + +/// Check if the join type is supported for runtime filter +/// +/// Runtime filters are only applicable to certain join types where +/// filtering the probe side can reduce processing +pub fn supported_join_type_for_runtime_filter(join_type: &JoinType) -> bool { + matches!( + join_type, + JoinType::Inner + | JoinType::Right + | JoinType::RightSemi + | JoinType::RightAnti + | JoinType::LeftMark + ) +} + +pub fn scalar_to_remote_expr( + metadata: &MetadataRef, + scalar: &ScalarExpr, +) -> Result, usize, IndexType)>> { + if scalar.used_columns().iter().all(|idx| { + matches!( + metadata.read().column(*idx), + ColumnEntry::BaseTableColumn(_) + ) + }) { + if let Some(column_idx) = scalar.used_columns().iter().next() { + let scan_id = metadata.read().base_column_scan_id(*column_idx); + + if let Some(scan_id) = scan_id { + let remote_expr = scalar + .as_raw_expr() + .type_check(&*metadata.read())? + .project_column_ref(|col| Ok(col.column_name.clone()))? + .as_remote_expr(); + + if !is_valid_probe_key(&remote_expr) { + return Ok(None); + } + + return Ok(Some((remote_expr, scan_id, *column_idx))); + } + } + } + + Ok(None) +} + +pub fn is_valid_probe_key(probe_key: &RemoteExpr) -> bool { + match probe_key { + RemoteExpr::ColumnRef { .. } => true, + RemoteExpr::Cast { + expr: box RemoteExpr::ColumnRef { data_type, .. }, + dest_type, + .. + } if &dest_type.remove_nullable() == data_type => true, + _ => false, + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs index ae2a25d06733b..24a34a89fd70c 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs @@ -71,6 +71,7 @@ pub struct RuntimeFilterDesc { pub build_key: Expr, pub probe_targets: Vec<(Expr, usize)>, pub build_table_rows: Option, + pub probe_table_rows: Option, pub enable_bloom_runtime_filter: bool, pub enable_inlist_runtime_filter: bool, pub enable_min_max_runtime_filter: bool, @@ -99,6 +100,7 @@ impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc { .map(|(probe_key, scan_id)| (probe_key.as_expr(&BUILTIN_FUNCTIONS), *scan_id)) .collect(), build_table_rows: runtime_filter.build_table_rows, + probe_table_rows: runtime_filter.probe_table_rows, enable_bloom_runtime_filter: runtime_filter.enable_bloom_runtime_filter, enable_inlist_runtime_filter: runtime_filter.enable_inlist_runtime_filter, enable_min_max_runtime_filter: runtime_filter.enable_min_max_runtime_filter, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs index 9af1bec4b542b..ffd246cd5902c 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs @@ -42,6 +42,7 @@ struct JoinRuntimeFilterPacketBuilder<'a> { bloom_threshold: usize, min_max_threshold: usize, selectivity_threshold: u64, + probe_ratio_threshold: u64, } impl<'a> JoinRuntimeFilterPacketBuilder<'a> { @@ -53,6 +54,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { bloom_threshold: usize, min_max_threshold: usize, selectivity_threshold: u64, + probe_ratio_threshold: u64, ) -> Result { let build_key_column = Self::eval_build_key_column(data_blocks, func_ctx, build_key)?; Ok(Self { @@ -62,6 +64,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { bloom_threshold, min_max_threshold, selectivity_threshold, + probe_ratio_threshold, }) } fn build(&self, desc: &RuntimeFilterDesc) -> Result { @@ -69,6 +72,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { desc, self.build_key_column.len(), self.selectivity_threshold, + self.probe_ratio_threshold, ) { return Ok(RuntimeFilterPacket { id: desc.id, @@ -201,6 +205,7 @@ pub(super) fn should_enable_runtime_filter( desc: &RuntimeFilterDesc, build_num_rows: usize, selectivity_threshold: u64, + probe_ratio_threshold: u64, ) -> bool { if build_num_rows == 0 { return false; @@ -214,6 +219,21 @@ pub(super) fn should_enable_runtime_filter( return false; }; + if let Some(probe_table_rows) = desc.probe_table_rows { + let ratio = probe_table_rows as f64 / build_num_rows as f64; + if ratio < probe_ratio_threshold as f64 { + log::info!( + "RUNTIME-FILTER: Disable runtime filter {} - low probe/build ratio: {:.2} < {} (probe_rows={}, build_rows={})", + desc.id, + ratio, + probe_ratio_threshold, + probe_table_rows, + build_num_rows + ); + return false; + } + } + let selectivity_pct = (build_num_rows as f64 / build_table_rows as f64) * 100.0; if selectivity_pct < selectivity_threshold as f64 { @@ -248,6 +268,7 @@ pub fn build_runtime_filter_packet( bloom_threshold: usize, min_max_threshold: usize, selectivity_threshold: u64, + probe_ratio_threshold: u64, is_spill_happened: bool, ) -> Result { if is_spill_happened { @@ -274,6 +295,7 @@ pub fn build_runtime_filter_packet( bloom_threshold, min_max_threshold, selectivity_threshold, + probe_ratio_threshold, )? .build(rf)?, ); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs index ec607c43f7e05..e79e31d943d7b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs @@ -51,6 +51,7 @@ pub async fn build_runtime_filter_infos( packet: JoinRuntimeFilterPacket, runtime_filter_descs: HashMap, selectivity_threshold: u64, + probe_ratio_threshold: u64, max_threads: usize, ) -> Result> { let total_build_rows = packet.build_rows; @@ -62,7 +63,12 @@ pub async fn build_runtime_filter_infos( // Iterate over all runtime filter packets for packet in packets.into_values() { let desc = runtime_filter_descs.get(&packet.id).unwrap(); - let enabled = should_enable_runtime_filter(desc, total_build_rows, selectivity_threshold); + let enabled = should_enable_runtime_filter( + desc, + total_build_rows, + selectivity_threshold, + probe_ratio_threshold, + ); // Apply this single runtime filter to all probe targets (scan_id, probe_key pairs) // This implements the design goal: "one runtime filter built once, pushed down to multiple scans" @@ -106,6 +112,7 @@ pub async fn build_runtime_filter_infos( stats: Arc::new(RuntimeFilterStats::new()), build_rows: total_build_rows, build_table_rows: desc.build_table_rows, + probe_table_rows: desc.probe_table_rows, enabled, }; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs index 298d1a85ac58e..5ac2a33066ead 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs @@ -53,6 +53,10 @@ pub async fn build_and_push_down_runtime_filter( .ctx .get_settings() .get_join_runtime_filter_selectivity_threshold()?; + let probe_ratio_threshold = join + .ctx + .get_settings() + .get_join_runtime_filter_probe_ratio_threshold()?; let build_start = Instant::now(); let mut packet = build_runtime_filter_packet( @@ -64,6 +68,7 @@ pub async fn build_and_push_down_runtime_filter( bloom_threshold, min_max_threshold, selectivity_threshold, + probe_ratio_threshold, is_spill_happened, )?; let build_time = build_start.elapsed(); @@ -90,6 +95,7 @@ pub async fn build_and_push_down_runtime_filter( packet, runtime_filter_descs, selectivity_threshold, + probe_ratio_threshold, max_threads, ) .await?; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs index 3b1dab84b39a7..880e20d3b1287 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs @@ -103,6 +103,7 @@ impl Join for InnerHashJoin { desc.bloom_threshold, desc.min_max_threshold, desc.selectivity_threshold, + desc.probe_ratio_threshold, false, ) } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs index 8993c49ec38bd..299038d2f5bf6 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs @@ -33,6 +33,7 @@ pub struct RuntimeFiltersDesc { pub inlist_threshold: usize, pub min_max_threshold: usize, pub selectivity_threshold: u64, + pub probe_ratio_threshold: u64, broadcast_id: Option, pub filters_desc: Vec, @@ -46,6 +47,7 @@ impl RuntimeFiltersDesc { let inlist_threshold = settings.get_inlist_runtime_filter_threshold()? as usize; let min_max_threshold = settings.get_min_max_runtime_filter_threshold()? as usize; let selectivity_threshold = settings.get_join_runtime_filter_selectivity_threshold()?; + let probe_ratio_threshold = settings.get_join_runtime_filter_probe_ratio_threshold()?; let mut filters_desc = Vec::with_capacity(join.runtime_filter.filters.len()); let mut runtime_filters_ready = Vec::with_capacity(join.runtime_filter.filters.len()); @@ -71,6 +73,7 @@ impl RuntimeFiltersDesc { inlist_threshold, min_max_threshold, selectivity_threshold, + probe_ratio_threshold, runtime_filters_ready, ctx: ctx.clone(), broadcast_id: join.broadcast_id, @@ -87,6 +90,7 @@ impl RuntimeFiltersDesc { packet, runtime_filter_descs, self.selectivity_threshold, + self.probe_ratio_threshold, self.ctx.get_settings().get_max_threads()? as usize, ) .await?; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 655073e222c06..4259ae1ef3c90 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -557,6 +557,7 @@ impl QueryContext { stats: RuntimeFilterStatsSnapshot, build_rows: usize, build_table_rows: Option, + probe_table_rows: Option, enabled: bool, } @@ -579,6 +580,7 @@ impl QueryContext { stats: entry.stats.snapshot(), build_rows: entry.build_rows, build_table_rows: entry.build_table_rows, + probe_table_rows: entry.probe_table_rows, enabled: entry.enabled, }); } @@ -612,6 +614,7 @@ impl QueryContext { stats, build_rows, build_table_rows, + probe_table_rows, enabled, } = filter; @@ -642,6 +645,12 @@ impl QueryContext { .map(|v| v.to_string()) .unwrap_or_else(|| "unknown".to_string()) )), + FormatTreeNode::new(format!( + "probe table rows: {}", + probe_table_rows + .map(|v| v.to_string()) + .unwrap_or_else(|| "unknown".to_string()) + )), ]; if let Some(column) = bloom_column { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 795c82339870d..144b17e29546f 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -516,6 +516,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(1..=u64::MAX)), }), + ("join_runtime_filter_probe_ratio_threshold", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "Minimum probe/build rows ratio required to enable join runtime filters. Filters are enabled only when (probe_rows / build_rows) >= threshold. Default 3 means probe side must be at least 3x larger.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(1..=u64::MAX)), + }), ("max_execute_time_in_seconds", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Sets the maximum query execution time in seconds. Setting it to 0 means no limit.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 6057058270e25..cfd8417a69fca 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -392,6 +392,10 @@ impl Settings { self.try_get_u64("join_runtime_filter_selectivity_threshold") } + pub fn get_join_runtime_filter_probe_ratio_threshold(&self) -> Result { + self.try_get_u64("join_runtime_filter_probe_ratio_threshold") + } + pub fn get_prefer_broadcast_join(&self) -> Result { Ok(self.try_get_u64("prefer_broadcast_join")? != 0) }