Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/catalog/src/runtime_filter_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct RuntimeFilterEntry {
pub stats: Arc<RuntimeFilterStats>,
pub build_rows: usize,
pub build_table_rows: Option<u64>,
pub probe_table_rows: Option<u64>,
pub enabled: bool,
}

Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,7 @@ impl PhysicalPlanBuilder {
&right_join_conditions,
left_join_conditions_rt,
build_table_indexes,
self.runtime_filter_routing.as_deref(),
)
.await?;

Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/physical_plans/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot;

use crate::physical_plans::explain::PlanStatsInfo;
use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::runtime_filter::RuntimeFilterRouting;

pub struct PhysicalPlanBuilder {
pub metadata: MetadataRef,
Expand All @@ -39,6 +40,7 @@ pub struct PhysicalPlanBuilder {
pub dry_run: bool,
// DataMutation info, used to build MergeInto physical plan
pub mutation_build_info: Option<MutationBuildInfo>,
pub runtime_filter_routing: Option<Arc<RuntimeFilterRouting>>,
}

impl PhysicalPlanBuilder {
Expand All @@ -50,6 +52,7 @@ impl PhysicalPlanBuilder {
func_ctx,
dry_run,
mutation_build_info: None,
runtime_filter_routing: None,
}
}

Expand All @@ -63,6 +66,12 @@ impl PhysicalPlanBuilder {
}

pub async fn build(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result<PhysicalPlan> {
if self.runtime_filter_routing.is_none()
&& self.ctx.get_settings().get_enable_join_runtime_filter()?
{
let routing = RuntimeFilterRouting::build(&self.metadata, s_expr)?;
self.runtime_filter_routing = Some(Arc::new(routing));
}
let mut plan = self.build_physical_plan(s_expr, required).await?;
plan.adjust_plan_id(&mut 0);

Expand Down
227 changes: 51 additions & 176 deletions src/query/service/src/physical_plans/runtime_filter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,62 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::type_check::check_cast;
use databend_common_expression::RemoteExpr;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::plans::Exchange;
use databend_common_sql::plans::Join;
use databend_common_sql::plans::JoinEquiCondition;
use databend_common_sql::plans::JoinType;
use databend_common_sql::plans::RelOperator;
use databend_common_sql::plans::ScalarExpr;
use databend_common_sql::ColumnEntry;
use databend_common_sql::IndexType;
use databend_common_sql::MetadataRef;
use databend_common_sql::TypeCheck;

use super::routing::RuntimeFilterRouting;
use super::routing::RuntimeFilterTarget;
use super::types::PhysicalRuntimeFilter;
use super::types::PhysicalRuntimeFilters;
use super::utils::is_type_supported_for_bloom_filter;
use super::utils::is_type_supported_for_min_max_filter;
use super::utils::supported_join_type_for_runtime_filter;
use crate::physical_plans::runtime_filter::utils::is_valid_probe_key;

/// Type alias for probe keys with runtime filter information
/// Contains: (RemoteExpr, scan_id, table_index, column_idx)
type ProbeKeysWithRuntimeFilter = Vec<Option<(RemoteExpr<String>, usize, usize, IndexType)>>;

/// Reports whether a bloom runtime filter can be built for `data_type`.
///
/// Only numeric and string types qualify at the moment.
pub fn is_type_supported_for_bloom_filter(data_type: &DataType) -> bool {
    if data_type.is_number() {
        return true;
    }
    data_type.is_string()
}

/// Reports whether a min-max runtime filter can be built for `data_type`.
///
/// Numeric, date and string types qualify at the moment.
pub fn is_type_supported_for_min_max_filter(data_type: &DataType) -> bool {
    if data_type.is_number() || data_type.is_date() {
        return true;
    }
    data_type.is_string()
}

/// Reports whether runtime filters may be generated for `join_type`.
///
/// Accepted join types: `Inner`, `Right`, `RightSemi`, `RightAnti` and
/// `LeftMark`. Every other join type yields `false`.
pub fn supported_join_type_for_runtime_filter(join_type: &JoinType) -> bool {
    matches!(
        join_type,
        JoinType::Inner
            | JoinType::LeftMark
            | JoinType::Right
            | JoinType::RightAnti
            | JoinType::RightSemi
    )
}

/// Build runtime filters for a join operation
///
/// This is the legacy method that creates one runtime filter per probe key.
Expand All @@ -91,6 +62,7 @@ pub async fn build_runtime_filter(
build_keys: &[RemoteExpr],
probe_keys: ProbeKeysWithRuntimeFilter,
build_table_indexes: Vec<Option<IndexType>>,
routing: Option<&RuntimeFilterRouting>,
) -> Result<PhysicalRuntimeFilters> {
if !ctx.get_settings().get_enable_join_runtime_filter()? {
return Ok(Default::default());
Expand All @@ -112,11 +84,11 @@ pub async fn build_runtime_filter(
}

let mut filters = Vec::new();

let probe_side = s_expr.probe_side_child();
// Safety: routing is always Some if enable_join_runtime_filter is true.
let routing = routing.unwrap();

// Process each probe key that has runtime filter information
for (build_key, probe_key, scan_id, _table_index, column_idx, build_table_index) in build_keys
for (build_key, probe_key, _scan_id, table_index, column_idx, build_table_index) in build_keys
.iter()
.zip(probe_keys.into_iter())
.zip(build_table_indexes.into_iter())
Expand All @@ -128,26 +100,19 @@ pub async fn build_runtime_filter(
{
// Skip if the probe expression is neither a direct column reference nor a
// cast from not null to nullable type (e.g. CAST(col AS Nullable(T))).
match &probe_key {
RemoteExpr::ColumnRef { .. } => {}
RemoteExpr::Cast {
expr: box RemoteExpr::ColumnRef { data_type, .. },
dest_type,
..
} if &dest_type.remove_nullable() == data_type => {}
_ => continue,
if !is_valid_probe_key(&probe_key) {
continue;
}

let probe_targets =
find_probe_targets(metadata, probe_side, &probe_key, scan_id, column_idx)?;
let targets = routing.find_targets(s_expr, column_idx)?;

let probe_targets = cast_probe_targets(targets, column_idx, &probe_key, build_key)?;

let build_table_rows =
get_build_table_rows(ctx.clone(), metadata, build_table_index).await?;
let build_table_rows = get_table_rows(ctx.clone(), metadata, build_table_index).await?;
let probe_table_rows = get_table_rows(ctx.clone(), metadata, Some(table_index)).await?;

let data_type = build_key
.as_expr(&BUILTIN_FUNCTIONS)
.data_type()
.remove_nullable();
let build_key_expr = build_key.as_expr(&BUILTIN_FUNCTIONS);
let data_type = build_key_expr.data_type().remove_nullable();
let id = metadata.write().next_runtime_filter_id();

let enable_bloom_runtime_filter = is_type_supported_for_bloom_filter(&data_type);
Expand All @@ -160,6 +125,7 @@ pub async fn build_runtime_filter(
build_key: build_key.clone(),
probe_targets,
build_table_rows,
probe_table_rows,
enable_bloom_runtime_filter,
enable_inlist_runtime_filter: true,
enable_min_max_runtime_filter,
Expand All @@ -170,12 +136,12 @@ pub async fn build_runtime_filter(
Ok(PhysicalRuntimeFilters { filters })
}

async fn get_build_table_rows(
async fn get_table_rows(
ctx: Arc<dyn TableContext>,
metadata: &MetadataRef,
build_table_index: Option<IndexType>,
table_index: Option<IndexType>,
) -> Result<Option<u64>> {
if let Some(table_index) = build_table_index {
if let Some(table_index) = table_index {
let table = {
let metadata_read = metadata.read();
metadata_read.table(table_index).table().clone()
Expand All @@ -188,127 +154,36 @@ async fn get_build_table_rows(
Ok(None)
}

fn find_probe_targets(
metadata: &MetadataRef,
s_expr: &SExpr,
probe_key: &RemoteExpr<String>,
probe_scan_id: usize,
probe_key_col_idx: IndexType,
fn cast_probe_targets(
targets: Vec<RuntimeFilterTarget>,
current_column_idx: IndexType,
current_probe_key: &RemoteExpr<String>,
build_key: &RemoteExpr,
) -> Result<Vec<(RemoteExpr<String>, usize)>> {
let mut uf = UnionFind::new();
let mut column_to_remote: HashMap<IndexType, (RemoteExpr<String>, usize)> = HashMap::new();
column_to_remote.insert(probe_key_col_idx, (probe_key.clone(), probe_scan_id));

let equi_conditions = collect_equi_conditions(s_expr)?;
for cond in equi_conditions {
if let (
Some((left_remote, left_scan_id, left_idx)),
Some((right_remote, right_scan_id, right_idx)),
) = (
scalar_to_remote_expr(metadata, &cond.left)?,
scalar_to_remote_expr(metadata, &cond.right)?,
) {
uf.union(left_idx, right_idx);
column_to_remote.insert(left_idx, (left_remote, left_scan_id));
column_to_remote.insert(right_idx, (right_remote, right_scan_id));
}
}

let equiv_class = uf.get_equivalence_class(probe_key_col_idx);

let build_expr = build_key.as_expr(&BUILTIN_FUNCTIONS);
let build_type = build_expr.data_type().clone();
let mut dedup = HashSet::new();
let mut result = Vec::new();
for idx in equiv_class {
if let Some((remote_expr, scan_id)) = column_to_remote.get(&idx) {
result.push((remote_expr.clone(), *scan_id));
}
}

Ok(result)
}

/// Gathers the equi-join conditions of every inner join in the plan tree
/// rooted at `s_expr`.
///
/// The node's own conditions (if it is an inner join) come first, followed by
/// the conditions collected from each child in child order. Joins of any other
/// type contribute nothing themselves, but their children are still visited.
fn collect_equi_conditions(s_expr: &SExpr) -> Result<Vec<JoinEquiCondition>> {
    let mut collected = Vec::new();

    match s_expr.plan() {
        RelOperator::Join(join) if matches!(join.join_type, JoinType::Inner) => {
            collected.extend(join.equi_conditions.clone());
        }
        _ => {}
    }

    // Depth-first walk: append whatever each subtree yields, bailing out on
    // the first error.
    for child in s_expr.children() {
        let from_child = collect_equi_conditions(child)?;
        collected.extend(from_child);
    }

    Ok(collected)
}

/// Converts `scalar` into a remote expression together with the scan id and
/// column index of the first column it uses.
///
/// Returns `Ok(None)` when any used column is not a base table column, when
/// the scalar uses no columns at all, or when the metadata has no scan id for
/// the chosen column. Errors from type checking or column projection are
/// propagated.
fn scalar_to_remote_expr(
    metadata: &MetadataRef,
    scalar: &ScalarExpr,
) -> Result<Option<(RemoteExpr<String>, usize, IndexType)>> {
    let used_columns = scalar.used_columns();

    // Every referenced column has to be a base table column.
    let only_base_table_columns = used_columns.iter().all(|idx| {
        matches!(
            metadata.read().column(*idx),
            ColumnEntry::BaseTableColumn(_)
        )
    });
    if !only_base_table_columns {
        return Ok(None);
    }

    // The first column yielded by the set identifies the target.
    let Some(column_idx) = used_columns.iter().next().copied() else {
        return Ok(None);
    };

    let Some(scan_id) = metadata.read().base_column_scan_id(column_idx) else {
        return Ok(None);
    };

    // Column references are projected to their column names.
    let remote_expr = scalar
        .as_raw_expr()
        .type_check(&*metadata.read())?
        .project_column_ref(|col| Ok(col.column_name.clone()))?
        .as_remote_expr();

    Ok(Some((remote_expr, scan_id, column_idx)))
}

/// Disjoint-set (union-find) structure keyed by column index.
struct UnionFind {
    /// Maps each known index to its parent; an index that maps to itself is
    /// the root of its set.
    parent: HashMap<IndexType, IndexType>,
}

impl UnionFind {
fn new() -> Self {
Self {
parent: HashMap::new(),
}
}

fn find(&mut self, x: IndexType) -> IndexType {
if !self.parent.contains_key(&x) {
self.parent.insert(x, x);
return x;
}

let parent = *self.parent.get(&x).unwrap();
if parent != x {
let root = self.find(parent);
self.parent.insert(x, root);
}
*self.parent.get(&x).unwrap()
}

fn union(&mut self, x: IndexType, y: IndexType) {
let root_x = self.find(x);
let root_y = self.find(y);
if root_x != root_y {
self.parent.insert(root_x, root_y);
for target in targets {
if !dedup.insert((target.scan_id, target.column_idx)) {
continue;
}
let expr = if target.column_idx == current_column_idx {
current_probe_key.clone()
} else {
let target_expr = target.expr.as_expr(&BUILTIN_FUNCTIONS);
let casted = check_cast(
target_expr.span(),
false,
target_expr,
&build_type,
&BUILTIN_FUNCTIONS,
)?;
casted.as_remote_expr()
};
result.push((expr, target.scan_id));
}

fn get_equivalence_class(&mut self, x: IndexType) -> Vec<IndexType> {
let root = self.find(x);
let all_keys: Vec<IndexType> = self.parent.keys().copied().collect();
all_keys
.into_iter()
.filter(|&k| self.find(k) == root)
.collect()
}
Ok(result)
}
3 changes: 3 additions & 0 deletions src/query/service/src/physical_plans/runtime_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
// limitations under the License.

mod builder;
mod routing;
mod types;
mod utils;

pub use builder::build_runtime_filter;
pub use routing::RuntimeFilterRouting;
pub use types::*;
Loading
Loading