From 24637c0beb7dd85fe32e318f571b3c52fda101a3 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 30 Nov 2023 18:33:59 +0800 Subject: [PATCH] chore: remove old runtime filter --- .../interpreter_merge_into_static_filter.rs | 1 - .../src/pipelines/builders/builder_join.rs | 98 +- .../service/src/pipelines/pipeline_builder.rs | 9 +- .../service/src/pipelines/processors/mod.rs | 2 - .../pipelines/processors/transforms/mod.rs | 5 - .../transforms/runtime_filter/mod.rs | 19 - .../runtime_filter_connector.rs | 35 - .../runtime_filter/runtime_filter_source.rs | 156 --- .../transforms/transform_runtime_filter.rs | 159 --- .../src/schedulers/fragments/fragmenter.rs | 1 - .../tests/it/sql/planner/format/mod.rs | 1 - src/query/sql/src/executor/format.rs | 23 - src/query/sql/src/executor/physical_plan.rs | 12 +- .../sql/src/executor/physical_plan_builder.rs | 4 - .../sql/src/executor/physical_plan_display.rs | 8 - .../sql/src/executor/physical_plan_visitor.rs | 22 - .../sql/src/executor/physical_plans/mod.rs | 2 - .../physical_plans/physical_hash_join.rs | 3 - .../physical_runtime_filter_source.rs | 128 -- src/query/sql/src/executor/profile.rs | 11 - src/query/sql/src/planner/binder/binder.rs | 10 - src/query/sql/src/planner/binder/join.rs | 1 - .../planner/format/display_rel_operator.rs | 1 - src/query/sql/src/planner/optimizer/format.rs | 1 - .../heuristic/decorrelate/decorrelate.rs | 4 - .../heuristic/decorrelate/flatten_plan.rs | 2 - .../optimizer/heuristic/subquery_rewriter.rs | 3 - .../src/planner/optimizer/hyper_dp/dphyp.rs | 7 +- .../planner/optimizer/hyper_dp/join_node.rs | 1 - src/query/sql/src/planner/optimizer/mod.rs | 1 - .../sql/src/planner/optimizer/optimizer.rs | 8 - .../rule/rewrite/rule_commute_join.rs | 4 - .../planner/optimizer/runtime_filter/mod.rs | 102 -- src/query/sql/src/planner/optimizer/s_expr.rs | 2 - src/query/sql/src/planner/plans/join.rs | 3 - src/query/sql/src/planner/plans/mod.rs | 3 - src/query/sql/src/planner/plans/operator.rs | 30 - .../planner/plans/runtime_filter_source.rs | 110 -- .../suites/crdb/runtime_filter_join.test | 1077 ----------------- .../suites/query/runtime_filter_join.test | 520 -------- 40 files changed, 9 insertions(+), 2580 deletions(-) delete mode 100644 src/query/service/src/pipelines/processors/transforms/runtime_filter/mod.rs delete mode 100644 src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_connector.rs delete mode 100644 src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_source.rs delete mode 100644 src/query/service/src/pipelines/processors/transforms/transform_runtime_filter.rs delete mode 100644 src/query/sql/src/executor/physical_plans/physical_runtime_filter_source.rs delete mode 100644 src/query/sql/src/planner/optimizer/runtime_filter/mod.rs delete mode 100644 src/query/sql/src/planner/plans/runtime_filter_source.rs delete mode 100644 tests/sqllogictests/suites/crdb/runtime_filter_join.test delete mode 100644 tests/sqllogictests/suites/query/runtime_filter_join.test diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index 37e9b5056ff7f..aaadf3f1ff9d6 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -293,7 +293,6 @@ impl MergeIntoInterpreter { RelOperator::Exchange(_) => {} RelOperator::UnionAll(_) => {} RelOperator::DummyTableScan(_) => {} - RelOperator::RuntimeFilterSource(_) => {} RelOperator::Window(_) => {} RelOperator::ProjectSet(_) => {} RelOperator::MaterializedCte(_) => {} diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index 5ea3424fc8165..3dc99058a9a36 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -16,10 +16,7 @@ use std::sync::Arc; use common_base::base::tokio::sync::Barrier; use common_exception::Result; -use common_pipeline_core::processors::InputPort; use common_pipeline_core::processors::ProcessorPtr; -use common_pipeline_core::Pipe; -use common_pipeline_core::PipeItem; use common_pipeline_sinks::Sinker; use common_pipeline_transforms::processors::ProcessorProfileWrapper; use common_pipeline_transforms::processors::ProfileStub; @@ -27,7 +24,6 @@ use common_pipeline_transforms::processors::Transformer; use common_sql::executor::physical_plans::HashJoin; use common_sql::executor::physical_plans::MaterializedCte; use common_sql::executor::physical_plans::RangeJoin; -use common_sql::executor::physical_plans::RuntimeFilterSource; use common_sql::executor::PhysicalPlan; use common_sql::ColumnBinding; use common_sql::IndexType; @@ -42,13 +38,10 @@ use crate::pipelines::processors::transforms::HashJoinProbeState; use crate::pipelines::processors::transforms::MaterializedCteSink; use crate::pipelines::processors::transforms::MaterializedCteState; use crate::pipelines::processors::transforms::ProbeSpillState; -use crate::pipelines::processors::transforms::RuntimeFilterState; use crate::pipelines::processors::transforms::TransformHashJoinBuild; use crate::pipelines::processors::transforms::TransformHashJoinProbe; use crate::pipelines::processors::HashJoinDesc; use crate::pipelines::processors::HashJoinState; -use crate::pipelines::processors::SinkRuntimeFilterSource; -use crate::pipelines::processors::TransformRuntimeFilter; use crate::pipelines::PipelineBuilder; use crate::sessions::QueryContext; @@ -202,17 +195,11 @@ impl PipelineBuilder { Ok(ProcessorPtr::create(transform)) } }; - if hash_join_plan.contain_runtime_filter { - build_res.main_pipeline.duplicate(false)?; - self.join_state = Some(build_state.clone()); - self.index = Some(self.pipelines.len()); - } else { - // for merge into - if hash_join_plan.need_hold_hash_table { - self.join_state = Some(build_state.clone()) - } - build_res.main_pipeline.add_sink(create_sink_processor)?; + // for merge into + if hash_join_plan.need_hold_hash_table { + self.join_state = Some(build_state.clone()) } + build_res.main_pipeline.add_sink(create_sink_processor)?; self.pipelines.push(build_res.main_pipeline.finalize()); self.pipelines.extend(build_res.sources_pipelines); @@ -301,83 +288,6 @@ impl PipelineBuilder { Ok(()) } - pub fn build_runtime_filter_source( - &mut self, - runtime_filter_source: &RuntimeFilterSource, - ) -> Result<()> { - let state = self.build_runtime_filter_state(self.ctx.clone(), runtime_filter_source)?; - self.expand_runtime_filter_source(&runtime_filter_source.right_side, state.clone())?; - self.build_runtime_filter(&runtime_filter_source.left_side, state)?; - Ok(()) - } - - fn expand_runtime_filter_source( - &mut self, - _right_side: &PhysicalPlan, - state: Arc, - ) -> Result<()> { - let pipeline = &mut self.pipelines[self.index.unwrap()]; - let output_size = pipeline.output_len(); - debug_assert!(output_size % 2 == 0); - - let mut items = Vec::with_capacity(output_size); - // Join - // / \ - // / \ - // RFSource \ - // / \ \ - // / \ \ - // scan t1 scan t2 - for _ in 0..output_size / 2 { - let input = InputPort::create(); - items.push(PipeItem::create( - ProcessorPtr::create(TransformHashJoinBuild::try_create( - input.clone(), - self.join_state.as_ref().unwrap().clone(), - None, - )?), - vec![input], - vec![], - )); - let input = InputPort::create(); - items.push(PipeItem::create( - ProcessorPtr::create(Sinker::::create( - input.clone(), - SinkRuntimeFilterSource::new(state.clone()), - )), - vec![input], - vec![], - )); - } - pipeline.add_pipe(Pipe::create(output_size, 0, items)); - Ok(()) - } - - fn build_runtime_filter( - &mut self, - left_side: &PhysicalPlan, - state: Arc, - ) -> Result<()> { - self.build_pipeline(left_side)?; - self.main_pipeline.add_transform(|input, output| { - let processor = TransformRuntimeFilter::create(input, output, state.clone()); - Ok(ProcessorPtr::create(processor)) - })?; - Ok(()) - } - - fn build_runtime_filter_state( - &self, - ctx: Arc, - runtime_filter_source: &RuntimeFilterSource, - ) -> Result> { - Ok(Arc::new(RuntimeFilterState::new( - ctx, - runtime_filter_source.left_runtime_filters.clone(), - runtime_filter_source.right_runtime_filters.clone(), - ))) - } - pub(crate) fn build_materialized_cte( &mut self, materialized_cte: &MaterializedCte, diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index f0a4e911317c1..9a8c29a153412 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -44,10 +44,7 @@ pub struct PipelineBuilder { // probe data_fields for merge into pub probe_data_fields: Option>, - // Used in runtime filter source pub join_state: Option>, - // record the index of join build side pipeline in `pipelines` - pub index: Option, // Cte -> state, each cte has it's own state pub cte_state: HashMap>, @@ -72,13 +69,12 @@ impl PipelineBuilder { func_ctx, settings, pipelines: vec![], - join_state: None, main_pipeline: Pipeline::with_scopes(scopes), proc_profs: prof_span_set, exchange_injector: DefaultExchangeInjector::create(), - index: None, cte_state: HashMap::new(), probe_data_fields: None, + join_state: None, } } @@ -134,9 +130,6 @@ impl PipelineBuilder { PhysicalPlan::Exchange(_) => Err(ErrorCode::Internal( "Invalid physical plan with PhysicalPlan::Exchange", )), - PhysicalPlan::RuntimeFilterSource(runtime_filter_source) => { - self.build_runtime_filter_source(runtime_filter_source) - } PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join), PhysicalPlan::MaterializedCte(materialized_cte) => { self.build_materialized_cte(materialized_cte) diff --git a/src/query/service/src/pipelines/processors/mod.rs b/src/query/service/src/pipelines/processors/mod.rs index 8e1e492e67a91..011163f9c02b5 100644 --- a/src/query/service/src/pipelines/processors/mod.rs +++ b/src/query/service/src/pipelines/processors/mod.rs @@ -18,12 +18,10 @@ pub(crate) mod transforms; pub use transforms::DeduplicateRowNumber; pub use transforms::HashJoinDesc; pub use transforms::HashJoinState; -pub use transforms::SinkRuntimeFilterSource; pub use transforms::TransformAddStreamColumns; pub use transforms::TransformCastSchema; pub use transforms::TransformCreateSets; pub use transforms::TransformLimit; pub use transforms::TransformResortAddOn; pub use transforms::TransformResortAddOnWithoutSourceSchema; -pub use transforms::TransformRuntimeFilter; pub use transforms::TransformWindow; diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 74639dc06c69f..4e328cacf150a 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -19,7 +19,6 @@ mod processor_accumulate_row_number; mod processor_deduplicate_row_number; mod processor_extract_hash_table_by_row_number; pub(crate) mod range_join; -mod runtime_filter; mod transform_add_computed_columns; mod transform_add_const_columns; mod transform_add_stream_columns; @@ -31,7 +30,6 @@ mod transform_merge_block; mod transform_resort_addon; mod transform_resort_addon_without_source_schema; mod transform_runtime_cast_schema; -mod transform_runtime_filter; mod transform_srf; mod transform_udf; mod window; @@ -41,7 +39,6 @@ pub use processor_accumulate_row_number::AccumulateRowNumber; pub use processor_deduplicate_row_number::DeduplicateRowNumber; pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber; pub use range_join::RangeJoinState; -pub use runtime_filter::RuntimeFilterState; pub use transform_add_computed_columns::TransformAddComputedColumns; pub use transform_add_const_columns::TransformAddConstColumns; pub use transform_add_stream_columns::TransformAddStreamColumns; @@ -56,8 +53,6 @@ pub use transform_merge_block::TransformMergeBlock; pub use transform_resort_addon::TransformResortAddOn; pub use transform_resort_addon_without_source_schema::TransformResortAddOnWithoutSourceSchema; pub use transform_runtime_cast_schema::TransformRuntimeCastSchema; -pub use transform_runtime_filter::SinkRuntimeFilterSource; -pub use transform_runtime_filter::TransformRuntimeFilter; pub use transform_srf::TransformSRF; pub use transform_udf::TransformUdf; pub use window::FrameBound; diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter/mod.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter/mod.rs deleted file mode 100644 index 4b8ab74376c25..0000000000000 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod runtime_filter_connector; -mod runtime_filter_source; - -pub use runtime_filter_connector::RuntimeFilterConnector; -pub use runtime_filter_source::RuntimeFilterState; diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_connector.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_connector.rs deleted file mode 100644 index 2656ee9b56d07..0000000000000 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_connector.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_exception::Result; -use common_expression::DataBlock; - -// The trait will connect runtime filter source of join build side -// with join probe side -#[async_trait::async_trait] -pub trait RuntimeFilterConnector: Send + Sync { - fn attach(&self); - - fn detach(&self) -> Result<()>; - - fn is_finished(&self) -> Result; - - async fn wait_finish(&self) -> Result<()>; - - // Consume runtime filter for blocks from join probe side - fn consume(&self, data: &DataBlock) -> Result>; - - // Collect runtime filter from join build side - fn collect(&self, data: &DataBlock) -> Result<()>; -} diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_source.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_source.rs deleted file mode 100644 index e49d383ff6901..0000000000000 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_source.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::sync::Arc; - -use common_arrow::arrow::bitmap::MutableBitmap; -use common_base::base::tokio::sync::Notify; -use common_catalog::table_context::TableContext; -use common_exception::Result; -use common_expression::DataBlock; -use common_expression::Evaluator; -use common_expression::RemoteExpr; -use common_functions::BUILTIN_FUNCTIONS; -use common_sql::plans::RuntimeFilterId; -use parking_lot::Mutex; -use parking_lot::RwLock; -use storages_common_index::filters::Filter; -use storages_common_index::filters::FilterBuilder; -use storages_common_index::filters::Xor8Builder; -use storages_common_index::filters::Xor8Filter; - -use crate::pipelines::processors::transforms::runtime_filter::RuntimeFilterConnector; -use crate::sessions::QueryContext; - -pub struct RuntimeFilterState { - pub(crate) ctx: Arc, - pub(crate) channel_filter_builders: RwLock>, - pub(crate) channel_filters: RwLock>, - pub(crate) left_runtime_filters: BTreeMap, - pub(crate) right_runtime_filters: BTreeMap, - pub(crate) sinker_count: Mutex, - pub(crate) finished_notify: Arc, - pub(crate) finished: Mutex, -} - -impl RuntimeFilterState { - pub fn new( - ctx: Arc, - left_runtime_filters: BTreeMap, - right_runtime_filters: BTreeMap, - ) -> Self { - RuntimeFilterState { - ctx, - channel_filter_builders: Default::default(), - channel_filters: Default::default(), - left_runtime_filters, - right_runtime_filters, - sinker_count: Mutex::new(0), - finished_notify: Arc::new(Default::default()), - finished: Default::default(), - } - } -} - -#[async_trait::async_trait] -impl RuntimeFilterConnector for RuntimeFilterState { - fn attach(&self) { - let mut sinker_count = self.sinker_count.lock(); - *sinker_count += 1; - } - - fn detach(&self) -> Result<()> { - let mut sinker_count = self.sinker_count.lock(); - *sinker_count -= 1; - if *sinker_count == 0 { - let mut channel_filters = self.channel_filters.write(); - let mut channel_filter_builders = self.channel_filter_builders.write(); - for (id, filter_builder) in channel_filter_builders.iter_mut() { - channel_filters.insert(id.clone(), filter_builder.build()?); - } - let mut finished = self.finished.lock(); - *finished = true; - self.finished_notify.notify_waiters(); - } - Ok(()) - } - - fn is_finished(&self) -> Result { - Ok(*self.finished.lock()) - } - - #[async_backtrace::framed] - async fn wait_finish(&self) -> Result<()> { - if !self.is_finished()? { - self.finished_notify.notified().await; - } - Ok(()) - } - - fn consume(&self, data: &DataBlock) -> Result> { - let channel_filters = self.channel_filters.read(); - if channel_filters.is_empty() { - return Ok(vec![data.clone()]); - } - // Create a bitmap to filter data - let mut bitmap = MutableBitmap::from_len_set(data.num_rows()); - let func_ctx = self.ctx.get_function_context()?; - for (id, remote_expr) in self.left_runtime_filters.iter() { - let expr = remote_expr.as_expr(&BUILTIN_FUNCTIONS); - let evaluator = Evaluator::new(data, &func_ctx, &BUILTIN_FUNCTIONS); - let column = evaluator - .run(&expr)? - .convert_to_full_column(expr.data_type(), data.num_rows()); - let channel_filter = channel_filters.get(id).unwrap(); - for (idx, val) in column.iter().enumerate() { - if !channel_filter.contains(&val) { - bitmap.set(idx, false); - } - } - } - Ok(vec![data.clone().filter_with_bitmap(&bitmap.into())?]) - } - - fn collect(&self, data: &DataBlock) -> Result<()> { - let func_ctx = self.ctx.get_function_context()?; - for (id, remote_expr) in self.right_runtime_filters.iter() { - let expr = remote_expr.as_expr(&BUILTIN_FUNCTIONS); - // expr represents equi condition in join build side - // Such as: `select * from t1 inner join t2 on t1.a + 1 = t2.a + 2` - // expr is `t2.a + 2` - // First we need get expected values from data by expr - let evaluator = Evaluator::new(data, &func_ctx, &BUILTIN_FUNCTIONS); - let column = evaluator - .run(&expr)? - .convert_to_full_column(expr.data_type(), data.num_rows()); - - // Generate Xor8 filter by column - let mut channel_filter_builders = self.channel_filter_builders.write(); - if let Some(filter_builder) = channel_filter_builders.get_mut(id) { - for val in column.iter() { - filter_builder.add_key(&val); - } - } else { - let mut filter_builder = Xor8Builder::create(); - for val in column.iter() { - filter_builder.add_key(&val); - } - channel_filter_builders.insert(id.clone(), filter_builder); - } - } - Ok(()) - } -} diff --git a/src/query/service/src/pipelines/processors/transforms/transform_runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/transform_runtime_filter.rs deleted file mode 100644 index d5455a26ae74c..0000000000000 --- a/src/query/service/src/pipelines/processors/transforms/transform_runtime_filter.rs +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::collections::VecDeque; -use std::sync::Arc; - -use common_exception::Result; -use common_expression::DataBlock; -use common_pipeline_core::processors::Event; -use common_pipeline_core::processors::InputPort; -use common_pipeline_core::processors::OutputPort; -use common_pipeline_core::processors::Processor; -use common_pipeline_sinks::Sink; - -use crate::pipelines::processors::transforms::runtime_filter::RuntimeFilterConnector; - -// The sinker will contains runtime filter source -// It will be represented as `ChannelFilter -pub struct SinkRuntimeFilterSource { - connector: Arc, -} - -impl SinkRuntimeFilterSource { - pub fn new(connector: Arc) -> Self { - connector.attach(); - Self { connector } - } -} - -impl Sink for SinkRuntimeFilterSource { - const NAME: &'static str = "GenerateRuntimeFilter"; - - fn on_finish(&mut self) -> Result<()> { - self.connector.detach() - } - - fn consume(&mut self, data_block: DataBlock) -> Result<()> { - self.connector.collect(&data_block) - } -} - -enum RuntimeFilterStep { - // Collect runtime filter - Collect, - // Consume runtime filter - Consume, -} - -pub struct TransformRuntimeFilter { - input_data: Option, - output_data_blocks: VecDeque, - input_port: Arc, - output_port: Arc, - pub(crate) connector: Arc, - step: RuntimeFilterStep, -} - -impl TransformRuntimeFilter { - pub fn create( - input_port: Arc, - output_port: Arc, - connector: Arc, - ) -> Box { - Box::new(TransformRuntimeFilter { - input_data: None, - output_data_blocks: Default::default(), - input_port, - output_port, - connector, - step: RuntimeFilterStep::Collect, - }) - } -} - -#[async_trait::async_trait] -impl Processor for TransformRuntimeFilter { - fn name(&self) -> String { - "TransformRuntimeFilter".to_string() - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - match self.step { - RuntimeFilterStep::Collect => Ok(Event::Async), - RuntimeFilterStep::Consume => { - if self.output_port.is_finished() { - self.input_port.finish(); - return Ok(Event::Finished); - } - if !self.output_port.can_push() { - self.input_port.set_not_need_data(); - return Ok(Event::NeedConsume); - } - - if !self.output_data_blocks.is_empty() { - let data = self.output_data_blocks.pop_front().unwrap(); - self.output_port.push_data(Ok(data)); - return Ok(Event::NeedConsume); - } - - if self.input_data.is_some() { - return Ok(Event::Sync); - } - - if self.input_port.has_data() { - let data = self.input_port.pull_data().unwrap()?; - self.input_data = Some(data); - return Ok(Event::Sync); - } - - if self.input_port.is_finished() { - self.output_port.finish(); - return Ok(Event::Finished); - } - - self.input_port.set_need_data(); - Ok(Event::NeedData) - } - } - } - - fn process(&mut self) -> Result<()> { - match self.step { - RuntimeFilterStep::Collect => Ok(()), - RuntimeFilterStep::Consume => { - if let Some(data) = self.input_data.take() { - let data = data.convert_to_full(); - self.output_data_blocks - .extend(self.connector.consume(&data)?); - } - Ok(()) - } - } - } - - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - if let RuntimeFilterStep::Collect = &self.step { - self.connector.wait_finish().await?; - self.step = RuntimeFilterStep::Consume; - } - Ok(()) - } -} diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index c18593e267049..d1f28f1946d56 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -235,7 +235,6 @@ impl PhysicalPlanReplacer for Fragmenter { from_correlated_subquery: plan.from_correlated_subquery, probe_to_build: plan.probe_to_build.clone(), output_schema: plan.output_schema.clone(), - contain_runtime_filter: plan.contain_runtime_filter, need_hold_hash_table: plan.need_hold_hash_table, stat_info: plan.stat_info.clone(), })) diff --git a/src/query/service/tests/it/sql/planner/format/mod.rs b/src/query/service/tests/it/sql/planner/format/mod.rs index 6379e13610dc0..d05f370529a34 100644 --- a/src/query/service/tests/it/sql/planner/format/mod.rs +++ b/src/query/service/tests/it/sql/planner/format/mod.rs @@ -152,7 +152,6 @@ fn test_format() { join_type: JoinType::Inner, marker_index: None, from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, } .into(), diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index 912de92c7544f..e4c650206c42f 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -45,7 +45,6 @@ use crate::executor::physical_plans::RangeJoin; use crate::executor::physical_plans::RangeJoinType; use crate::executor::physical_plans::ReclusterSink; use crate::executor::physical_plans::RowFetch; -use crate::executor::physical_plans::RuntimeFilterSource; use crate::executor::physical_plans::Sort; use crate::executor::physical_plans::TableScan; use crate::executor::physical_plans::Udf; @@ -203,9 +202,6 @@ fn to_format_tree( PhysicalPlan::CommitSink(plan) => commit_sink_to_format_tree(plan, metadata, profs), PhysicalPlan::ProjectSet(plan) => project_set_to_format_tree(plan, metadata, profs), PhysicalPlan::Udf(plan) => udf_to_format_tree(plan, metadata, profs), - PhysicalPlan::RuntimeFilterSource(plan) => { - runtime_filter_source_to_format_tree(plan, metadata, profs) - } PhysicalPlan::RangeJoin(plan) => range_join_to_format_tree(plan, metadata, profs), PhysicalPlan::CopyIntoTable(plan) => copy_into_table(plan), PhysicalPlan::ReplaceAsyncSourcer(_) => { @@ -1160,25 +1156,6 @@ fn udf_to_format_tree( Ok(FormatTreeNode::with_children("Udf".to_string(), children)) } -fn runtime_filter_source_to_format_tree( - plan: &RuntimeFilterSource, - metadata: &Metadata, - prof_span_set: &SharedProcessorProfiles, -) -> Result> { - let children = vec![ - FormatTreeNode::new(format!( - "output columns: [{}]", - format_output_columns(plan.output_schema()?, metadata, true) - )), - to_format_tree(&plan.left_side, metadata, prof_span_set)?, - to_format_tree(&plan.right_side, metadata, prof_span_set)?, - ]; - Ok(FormatTreeNode::with_children( - "RuntimeFilterSource".to_string(), - children, - )) -} - fn materialized_cte_to_format_tree( plan: &MaterializedCte, metadata: &Metadata, diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index d5e8a2d378160..f84643bf99675 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -48,7 +48,6 @@ use crate::executor::physical_plans::ReplaceAsyncSourcer; use crate::executor::physical_plans::ReplaceDeduplicate; use crate::executor::physical_plans::ReplaceInto; use crate::executor::physical_plans::RowFetch; -use crate::executor::physical_plans::RuntimeFilterSource; use crate::executor::physical_plans::Sort; use crate::executor::physical_plans::TableScan; use crate::executor::physical_plans::Udf; @@ -75,7 +74,6 @@ pub enum PhysicalPlan { RangeJoin(RangeJoin), Exchange(Exchange), UnionAll(UnionAll), - RuntimeFilterSource(RuntimeFilterSource), CteScan(CteScan), MaterializedCte(MaterializedCte), ConstantTableScan(ConstantTableScan), @@ -139,7 +137,6 @@ impl PhysicalPlan { PhysicalPlan::RangeJoin(v) => v.plan_id, PhysicalPlan::Exchange(v) => v.plan_id, PhysicalPlan::UnionAll(v) => v.plan_id, - PhysicalPlan::RuntimeFilterSource(v) => v.plan_id, PhysicalPlan::DistributedInsertSelect(v) => v.plan_id, PhysicalPlan::ExchangeSource(v) => v.plan_id, PhysicalPlan::ExchangeSink(v) => v.plan_id, @@ -183,7 +180,6 @@ impl PhysicalPlan { PhysicalPlan::ExchangeSink(plan) => plan.output_schema(), PhysicalPlan::UnionAll(plan) => plan.output_schema(), PhysicalPlan::ProjectSet(plan) => plan.output_schema(), - PhysicalPlan::RuntimeFilterSource(plan) => plan.output_schema(), PhysicalPlan::RangeJoin(plan) => plan.output_schema(), PhysicalPlan::CopyIntoTable(plan) => plan.output_schema(), PhysicalPlan::CteScan(plan) => plan.output_schema(), @@ -227,7 +223,6 @@ impl PhysicalPlan { PhysicalPlan::ExchangeSource(_) => "Exchange Source".to_string(), PhysicalPlan::ExchangeSink(_) => "Exchange Sink".to_string(), PhysicalPlan::ProjectSet(_) => "Unnest".to_string(), - PhysicalPlan::RuntimeFilterSource(_) => "RuntimeFilterSource".to_string(), PhysicalPlan::CompactSource(_) => "CompactBlock".to_string(), PhysicalPlan::DeleteSource(_) => "DeleteSource".to_string(), PhysicalPlan::CommitSink(_) => "CommitSink".to_string(), @@ -285,10 +280,6 @@ impl PhysicalPlan { } PhysicalPlan::CommitSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ProjectSet(plan) => Box::new(std::iter::once(plan.input.as_ref())), - PhysicalPlan::RuntimeFilterSource(plan) => Box::new( - std::iter::once(plan.left_side.as_ref()) - .chain(std::iter::once(plan.right_side.as_ref())), - ), PhysicalPlan::RangeJoin(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), @@ -328,8 +319,7 @@ impl PhysicalPlan { PhysicalPlan::ProjectSet(plan) => plan.input.try_find_single_data_source(), PhysicalPlan::RowFetch(plan) => plan.input.try_find_single_data_source(), PhysicalPlan::Udf(plan) => plan.input.try_find_single_data_source(), - PhysicalPlan::RuntimeFilterSource(_) - | PhysicalPlan::UnionAll(_) + PhysicalPlan::UnionAll(_) | PhysicalPlan::ExchangeSource(_) | PhysicalPlan::HashJoin(_) | PhysicalPlan::RangeJoin(_) diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index c4e764aa86e2d..4d1de21bd6f9b 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -99,10 +99,6 @@ impl PhysicalPlanBuilder { self.build_union_all(s_expr, union_all, required, stat_info) .await } - RelOperator::RuntimeFilterSource(runtime_filter) => { - self.build_runtime_filter(s_expr, runtime_filter, required) - .await - } RelOperator::ProjectSet(project_set) => { self.build_project_set(s_expr, project_set, required, stat_info) .await diff --git a/src/query/sql/src/executor/physical_plan_display.rs b/src/query/sql/src/executor/physical_plan_display.rs index 971815ae4f1d0..1400eb0927113 100644 --- a/src/query/sql/src/executor/physical_plan_display.rs +++ b/src/query/sql/src/executor/physical_plan_display.rs @@ -50,7 +50,6 @@ use crate::executor::physical_plans::ReplaceAsyncSourcer; use crate::executor::physical_plans::ReplaceDeduplicate; use crate::executor::physical_plans::ReplaceInto; use crate::executor::physical_plans::RowFetch; -use crate::executor::physical_plans::RuntimeFilterSource; use crate::executor::physical_plans::Sort; use crate::executor::physical_plans::TableScan; use crate::executor::physical_plans::Udf; @@ -96,7 +95,6 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> { PhysicalPlan::DeleteSource(delete) => write!(f, "{}", delete)?, PhysicalPlan::CommitSink(commit) => write!(f, "{}", commit)?, PhysicalPlan::ProjectSet(unnest) => write!(f, "{}", unnest)?, - PhysicalPlan::RuntimeFilterSource(plan) => write!(f, "{}", plan)?, PhysicalPlan::RangeJoin(plan) => write!(f, "{}", plan)?, PhysicalPlan::CopyIntoTable(copy_into_table) => write!(f, "{}", copy_into_table)?, PhysicalPlan::ReplaceAsyncSourcer(async_sourcer) => write!(f, "{}", async_sourcer)?, @@ -436,12 +434,6 @@ impl Display for CopyIntoTable { } } -impl Display for RuntimeFilterSource { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RuntimeFilterSource") - } -} - impl Display for ProjectSet { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let scalars = self diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 4b563d4abca42..12ef0e976be91 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -48,7 +48,6 @@ use crate::executor::physical_plans::ReplaceAsyncSourcer; use crate::executor::physical_plans::ReplaceDeduplicate; use crate::executor::physical_plans::ReplaceInto; use crate::executor::physical_plans::RowFetch; -use crate::executor::physical_plans::RuntimeFilterSource; use crate::executor::physical_plans::Sort; use crate::executor::physical_plans::TableScan; use crate::executor::physical_plans::Udf; @@ -78,7 +77,6 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::UnionAll(plan) => self.replace_union(plan), PhysicalPlan::DistributedInsertSelect(plan) => self.replace_insert_select(plan), PhysicalPlan::ProjectSet(plan) => self.replace_project_set(plan), - PhysicalPlan::RuntimeFilterSource(plan) => self.replace_runtime_filter_source(plan), PhysicalPlan::CompactSource(plan) => self.replace_compact_source(plan), PhysicalPlan::DeleteSource(plan) => self.replace_delete_source(plan), PhysicalPlan::CommitSink(plan) => self.replace_commit_sink(plan), @@ -233,7 +231,6 @@ pub trait PhysicalPlanReplacer { from_correlated_subquery: plan.from_correlated_subquery, probe_to_build: plan.probe_to_build.clone(), output_schema: plan.output_schema.clone(), - contain_runtime_filter: plan.contain_runtime_filter, need_hold_hash_table: plan.need_hold_hash_table, stat_info: plan.stat_info.clone(), })) @@ -482,21 +479,6 @@ pub trait PhysicalPlanReplacer { })) } - fn replace_runtime_filter_source( - &mut self, - plan: &RuntimeFilterSource, - ) -> Result { - let left_side = self.replace(&plan.left_side)?; - let right_side = self.replace(&plan.right_side)?; - Ok(PhysicalPlan::RuntimeFilterSource(RuntimeFilterSource { - plan_id: plan.plan_id, - left_side: Box::new(left_side), - right_side: Box::new(right_side), - left_runtime_filters: plan.left_runtime_filters.clone(), - right_runtime_filters: plan.right_runtime_filters.clone(), - })) - } - fn replace_udf(&mut self, plan: &Udf) -> Result { let input = self.replace(&plan.input)?; Ok(PhysicalPlan::Udf(Udf { @@ -583,10 +565,6 @@ impl PhysicalPlan { } CopyIntoTableSource::Stage(_) => {} }, - PhysicalPlan::RuntimeFilterSource(plan) => { - Self::traverse(&plan.left_side, pre_visit, visit, post_visit); - Self::traverse(&plan.right_side, pre_visit, visit, post_visit); - } PhysicalPlan::RangeJoin(plan) => { Self::traverse(&plan.left, pre_visit, visit, post_visit); Self::traverse(&plan.right, pre_visit, visit, post_visit); diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index 2a1d0af5f7b70..c878c31365b7a 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -78,8 +78,6 @@ mod physical_replace_into; pub use physical_replace_into::ReplaceInto; mod physical_row_fetch; pub use physical_row_fetch::RowFetch; -mod physical_runtime_filter_source; -pub use physical_runtime_filter_source::RuntimeFilterSource; mod physical_sort; pub use physical_sort::Sort; mod physical_table_scan; diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index 8bb9f025ec1d4..0f89416c33ea6 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -61,8 +61,6 @@ pub struct HashJoin { // (probe index, (is probe column nullable, is build column nullable)) pub probe_to_build: Vec<(usize, (bool, bool))>, pub output_schema: DataSchemaRef, - // It means that join has a corresponding runtime filter - pub contain_runtime_filter: bool, // if we execute distributed merge into, we need to hold the // hash table to get not match data from source. pub need_hold_hash_table: bool, @@ -398,7 +396,6 @@ impl PhysicalPlanBuilder { from_correlated_subquery: join.from_correlated_subquery, probe_to_build, output_schema, - contain_runtime_filter: join.contain_runtime_filter, need_hold_hash_table: join.need_hold_hash_table, stat_info: Some(stat_info), })) diff --git a/src/query/sql/src/executor/physical_plans/physical_runtime_filter_source.rs b/src/query/sql/src/executor/physical_plans/physical_runtime_filter_source.rs deleted file mode 100644 index d022e13e1e0ff..0000000000000 --- a/src/query/sql/src/executor/physical_plans/physical_runtime_filter_source.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; - -use common_exception::ErrorCode; -use common_exception::Result; -use common_expression::type_check; -use common_expression::type_check::common_super_type; -use common_expression::ConstantFolder; -use common_expression::DataSchemaRef; -use common_expression::RemoteExpr; -use common_functions::BUILTIN_FUNCTIONS; - -use crate::executor::PhysicalPlan; -use crate::executor::PhysicalPlanBuilder; -use crate::optimizer::SExpr; -use crate::plans::RuntimeFilterId; -use crate::ColumnSet; -use crate::TypeCheck; - -// Build runtime predicate data from join build side -// Then pass it to runtime filter on join probe side -// It's the children of join node -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct RuntimeFilterSource { - // A unique id of operator in a `PhysicalPlan` tree, only used for display. - pub plan_id: u32, - pub left_side: Box, - pub right_side: Box, - pub left_runtime_filters: BTreeMap, - pub right_runtime_filters: BTreeMap, -} - -impl RuntimeFilterSource { - pub fn output_schema(&self) -> Result { - self.left_side.output_schema() - } -} - -impl PhysicalPlanBuilder { - pub(crate) async fn build_runtime_filter( - &mut self, - s_expr: &SExpr, - runtime_filter: &crate::plans::RuntimeFilterSource, - required: ColumnSet, - ) -> Result { - // 1. Prune unused Columns. - let left_required = runtime_filter - .left_runtime_filters - .iter() - .fold(required.clone(), |acc, v| { - acc.union(&v.1.used_columns()).cloned().collect() - }); - let right_required = runtime_filter - .right_runtime_filters - .iter() - .fold(required, |acc, v| { - acc.union(&v.1.used_columns()).cloned().collect() - }); - - // 2. Build physical plan. - let left_side = Box::new(self.build(s_expr.child(0)?, left_required).await?); - let left_schema = left_side.output_schema()?; - let right_side = Box::new(self.build(s_expr.child(1)?, right_required).await?); - let right_schema = right_side.output_schema()?; - let mut left_runtime_filters = BTreeMap::new(); - let mut right_runtime_filters = BTreeMap::new(); - for (left, right) in runtime_filter - .left_runtime_filters - .iter() - .zip(runtime_filter.right_runtime_filters.iter()) - { - let left_expr = left - .1 - .type_check(left_schema.as_ref())? - .project_column_ref(|index| left_schema.index_of(&index.to_string()).unwrap()); - let right_expr = right - .1 - .type_check(right_schema.as_ref())? - .project_column_ref(|index| right_schema.index_of(&index.to_string()).unwrap()); - - let common_ty = common_super_type(left_expr.data_type().clone(), right_expr.data_type().clone(), &BUILTIN_FUNCTIONS.default_cast_rules) - .ok_or_else(|| ErrorCode::SemanticError(format!("RuntimeFilter's types cannot be matched, left column {:?}, type: {:?}, right column {:?}, type: {:?}", left.0, left_expr.data_type(), right.0, right_expr.data_type())))?; - - let left_expr = type_check::check_cast( - left_expr.span(), - false, - left_expr, - &common_ty, - &BUILTIN_FUNCTIONS, - )?; - let right_expr = type_check::check_cast( - right_expr.span(), - false, - right_expr, - &common_ty, - &BUILTIN_FUNCTIONS, - )?; - - let (left_expr, _) = - ConstantFolder::fold(&left_expr, &self.func_ctx, &BUILTIN_FUNCTIONS); - let (right_expr, _) = - ConstantFolder::fold(&right_expr, &self.func_ctx, &BUILTIN_FUNCTIONS); - - left_runtime_filters.insert(left.0.clone(), left_expr.as_remote_expr()); - right_runtime_filters.insert(right.0.clone(), right_expr.as_remote_expr()); - } - Ok(PhysicalPlan::RuntimeFilterSource(RuntimeFilterSource { - plan_id: self.next_plan_id(), - left_side, - right_side, - left_runtime_filters, - right_runtime_filters, - })) - } -} diff --git a/src/query/sql/src/executor/profile.rs b/src/query/sql/src/executor/profile.rs index e42bfcbd22b53..10e1592b5e416 100644 --- a/src/query/sql/src/executor/profile.rs +++ b/src/query/sql/src/executor/profile.rs @@ -431,17 +431,6 @@ fn flatten_plan_node_profile( }; plan_node_profs.push(prof); } - PhysicalPlan::RuntimeFilterSource(source) => { - let proc_prof = profs.get(&source.plan_id).copied().unwrap_or_default(); - let prof = OperatorProfile { - id: source.plan_id, - operator_type: OperatorType::RuntimeFilter, - execution_info: proc_prof.into(), - children: vec![], - attribute: OperatorAttribute::Empty, - }; - plan_node_profs.push(prof); - } PhysicalPlan::DistributedInsertSelect(select) => { flatten_plan_node_profile(metadata, &select.input, profs, plan_node_profs)?; let proc_prof = profs.get(&select.plan_id).copied().unwrap_or_default(); diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 30a2ed94904c1..fcddf82cf9d29 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -747,16 +747,6 @@ impl<'a> Binder { } f.scalars().is_empty() } - RelOperator::RuntimeFilterSource(runtime_filter_source) => { - f.reset_finder(); - for scalar in runtime_filter_source.left_runtime_filters.values() { - f.visit(scalar)?; - } - for scalar in runtime_filter_source.right_runtime_filters.values() { - f.visit(scalar)?; - } - f.scalars().is_empty() - } RelOperator::Window(window) => { f.reset_finder(); for scalar_item in &window.arguments { diff --git a/src/query/sql/src/planner/binder/join.rs b/src/query/sql/src/planner/binder/join.rs index 27ade60509a6b..1fae3f49c7622 100644 --- a/src/query/sql/src/planner/binder/join.rs +++ b/src/query/sql/src/planner/binder/join.rs @@ -238,7 +238,6 @@ impl Binder { join_type, marker_index: None, from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, }; Ok(SExpr::create_binary( diff --git a/src/query/sql/src/planner/format/display_rel_operator.rs b/src/query/sql/src/planner/format/display_rel_operator.rs index 3719a3599a897..c75cecbdadca1 100644 --- a/src/query/sql/src/planner/format/display_rel_operator.rs +++ b/src/query/sql/src/planner/format/display_rel_operator.rs @@ -73,7 +73,6 @@ impl Display for FormatContext { RelOperator::UnionAll(_) => write!(f, "Union"), RelOperator::Pattern(_) => write!(f, "Pattern"), RelOperator::DummyTableScan(_) => write!(f, "DummyTableScan"), - RelOperator::RuntimeFilterSource(_) => write!(f, "RuntimeFilterSource"), RelOperator::Window(_) => write!(f, "WindowFunc"), RelOperator::ProjectSet(_) => write!(f, "ProjectSet"), RelOperator::CteScan(_) => write!(f, "CteScan"), diff --git a/src/query/sql/src/planner/optimizer/format.rs b/src/query/sql/src/planner/optimizer/format.rs index a86ad1886384b..394ac755060ff 100644 --- a/src/query/sql/src/planner/optimizer/format.rs +++ b/src/query/sql/src/planner/optimizer/format.rs @@ -46,7 +46,6 @@ pub fn display_rel_op(rel_op: &RelOperator) -> String { RelOperator::Exchange(_) => "Exchange".to_string(), RelOperator::Pattern(_) => "Pattern".to_string(), RelOperator::DummyTableScan(_) => "DummyTableScan".to_string(), - RelOperator::RuntimeFilterSource(_) => "RuntimeFilterSource".to_string(), RelOperator::ProjectSet(_) => "ProjectSet".to_string(), RelOperator::Window(_) => "WindowFunc".to_string(), RelOperator::CteScan(_) => "CteScan".to_string(), diff --git a/src/query/sql/src/planner/optimizer/heuristic/decorrelate/decorrelate.rs b/src/query/sql/src/planner/optimizer/heuristic/decorrelate/decorrelate.rs index 596cf6aabdab2..c7bb09b341af8 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/decorrelate/decorrelate.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/decorrelate/decorrelate.rs @@ -220,7 +220,6 @@ impl SubqueryRewriter { }, marker_index: None, from_correlated_subquery: true, - contain_runtime_filter: false, need_hold_hash_table: false, }; @@ -295,7 +294,6 @@ impl SubqueryRewriter { join_type: JoinType::LeftSingle, marker_index: None, from_correlated_subquery: true, - contain_runtime_filter: false, need_hold_hash_table: false, }; let s_expr = SExpr::create_binary( @@ -343,7 +341,6 @@ impl SubqueryRewriter { join_type: JoinType::RightMark, marker_index: Some(marker_index), from_correlated_subquery: true, - contain_runtime_filter: false, need_hold_hash_table: false, }; let s_expr = SExpr::create_binary( @@ -406,7 +403,6 @@ impl SubqueryRewriter { join_type: JoinType::RightMark, marker_index: Some(marker_index), from_correlated_subquery: true, - contain_runtime_filter: false, need_hold_hash_table: false, } .into(); diff --git a/src/query/sql/src/planner/optimizer/heuristic/decorrelate/flatten_plan.rs b/src/query/sql/src/planner/optimizer/heuristic/decorrelate/flatten_plan.rs index 584eed1254c9f..c92602d54e5d9 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/decorrelate/flatten_plan.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/decorrelate/flatten_plan.rs @@ -131,7 +131,6 @@ impl SubqueryRewriter { join_type: JoinType::Cross, marker_index: None, from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, } .into(); @@ -459,7 +458,6 @@ impl SubqueryRewriter { join_type: join.join_type.clone(), marker_index: join.marker_index, from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, } .into(), diff --git a/src/query/sql/src/planner/optimizer/heuristic/subquery_rewriter.rs b/src/query/sql/src/planner/optimizer/heuristic/subquery_rewriter.rs index 202452462b53c..cd9f0262519a1 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/subquery_rewriter.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/subquery_rewriter.rs @@ -475,7 +475,6 @@ impl SubqueryRewriter { join_type: JoinType::Cross, marker_index: None, from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, } .into(); @@ -543,7 +542,6 @@ impl SubqueryRewriter { join_type: JoinType::RightMark, marker_index: Some(marker_index), from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, } .into(); @@ -573,7 +571,6 @@ impl SubqueryRewriter { join_type: JoinType::Cross, marker_index: None, from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, } .into(); diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs index 3d1bfcd0d3959..5474155aa4dc9 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs @@ -252,10 +252,9 @@ impl DPhpy { self.join_relations.push(JoinRelation::new(&new_s_expr)); Ok((new_s_expr, optimized)) } - RelOperator::Exchange(_) - | RelOperator::AddRowNumber(_) - | RelOperator::Pattern(_) - | RelOperator::RuntimeFilterSource(_) => unreachable!(), + RelOperator::Exchange(_) | RelOperator::AddRowNumber(_) | RelOperator::Pattern(_) => { + unreachable!() + } RelOperator::DummyTableScan(_) | RelOperator::ConstantTableScan(_) | RelOperator::CteScan(_) diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs index 9c33c99b37d59..3a3a538009f1b 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/join_node.rs @@ -86,7 +86,6 @@ impl JoinNode { join_type: self.join_type.clone(), marker_index: None, from_correlated_subquery: false, - contain_runtime_filter: false, need_hold_hash_table: false, }); let children = self diff --git a/src/query/sql/src/planner/optimizer/mod.rs b/src/query/sql/src/planner/optimizer/mod.rs index 03fe43c9b6fe0..5512e1300932a 100644 --- a/src/query/sql/src/planner/optimizer/mod.rs +++ b/src/query/sql/src/planner/optimizer/mod.rs @@ -26,7 +26,6 @@ mod optimizer; mod pattern_extractor; mod property; mod rule; -mod runtime_filter; pub mod s_expr; mod util; diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 5b9383f2264bf..aa872f1eac7ee 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -28,7 +28,6 @@ use super::Memo; use crate::optimizer::cascades::CascadesOptimizer; use crate::optimizer::distributed::optimize_distributed_query; use crate::optimizer::hyper_dp::DPhpy; -use crate::optimizer::runtime_filter::try_add_runtime_filter_nodes; use crate::optimizer::util::contains_local_table_scan; use crate::optimizer::HeuristicOptimizer; use crate::optimizer::RuleID; @@ -217,13 +216,6 @@ pub fn optimize_query( // with reading data from local tales(e.g. system tables). let enable_distributed_query = opt_ctx.config.enable_distributed_optimization && !contains_local_table_scan; - // Add runtime filter related nodes after cbo - // Because cbo may change join order and we don't want to - // break optimizer due to new added nodes by runtime filter. - // Currently, we only support standalone. - if !enable_distributed_query && ctx.get_settings().get_runtime_filter()? { - result = try_add_runtime_filter_nodes(&result)?; - } if enable_distributed_query { result = optimize_distributed_query(ctx.clone(), &result)?; } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_commute_join.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_commute_join.rs index bbec090aac195..89e8440b027dd 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_commute_join.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_commute_join.rs @@ -21,7 +21,6 @@ use crate::optimizer::rule::TransformResult; use crate::optimizer::RelExpr; use crate::optimizer::RuleID; use crate::optimizer::SExpr; -use crate::planner::plans::operator::Operator; use crate::plans::Join; use crate::plans::JoinType; use crate::plans::PatternPlan; @@ -65,9 +64,6 @@ impl Rule for RuleCommuteJoin { fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> { let mut join: Join = s_expr.plan().clone().try_into()?; let left_child = s_expr.child(0)?; - if left_child.plan.rel_op() == RelOp::RuntimeFilterSource { - return Ok(()); - } let right_child = s_expr.child(1)?; let left_rel_expr = RelExpr::with_s_expr(left_child); let right_rel_expr = RelExpr::with_s_expr(right_child); diff --git a/src/query/sql/src/planner/optimizer/runtime_filter/mod.rs b/src/query/sql/src/planner/optimizer/runtime_filter/mod.rs deleted file mode 100644 index 8fad75c6868da..0000000000000 --- a/src/query/sql/src/planner/optimizer/runtime_filter/mod.rs +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::sync::Arc; - -use common_exception::Result; - -use crate::optimizer::SExpr; -use crate::plans::Join; -use crate::plans::JoinType; -use crate::plans::Operator; -use crate::plans::RelOp; -use crate::plans::RelOperator; -use crate::plans::RuntimeFilterId; -use crate::plans::RuntimeFilterSource; -use crate::ScalarExpr; - -pub struct RuntimeFilterResult { - pub left_runtime_filters: BTreeMap, - pub right_runtime_filters: BTreeMap, -} - -fn create_runtime_filters(join: &Join) -> Result { - let mut left_runtime_filters = BTreeMap::new(); - let mut right_runtime_filters = BTreeMap::new(); - for (idx, exprs) in join - .right_conditions - .iter() - .zip(join.left_conditions.iter()) - .enumerate() - { - right_runtime_filters.insert(RuntimeFilterId::new(idx), exprs.0.clone()); - left_runtime_filters.insert(RuntimeFilterId::new(idx), exprs.1.clone()); - } - Ok(RuntimeFilterResult { - left_runtime_filters, - right_runtime_filters, - }) -} - -fn wrap_runtime_filter_source( - s_expr: &SExpr, - runtime_filter_result: RuntimeFilterResult, -) -> Result { - let source_node = RuntimeFilterSource { - left_runtime_filters: runtime_filter_result.left_runtime_filters, - right_runtime_filters: runtime_filter_result.right_runtime_filters, - }; - let build_side = s_expr.child(1)?.clone(); - let mut probe_side = s_expr.child(0)?.clone(); - probe_side = SExpr::create_binary( - Arc::new(source_node.into()), - Arc::new(probe_side), - Arc::new(build_side.clone()), - ); - let mut join: Join = s_expr.plan().clone().try_into()?; - join.contain_runtime_filter = true; - let s_expr = s_expr.replace_plan(Arc::new(RelOperator::Join(join))); - Ok(s_expr.replace_children(vec![Arc::new(probe_side), Arc::new(build_side)])) -} - -// Traverse plan tree and check if exists join -// Currently, only support inner join. -pub fn try_add_runtime_filter_nodes(expr: &SExpr) -> Result { - if expr.children().len() == 1 && expr.children()[0].is_pattern() { - return Ok(expr.clone()); - } - let mut new_expr = expr.clone(); - if expr.plan.rel_op() == RelOp::Join { - // Todo(xudong): develop a strategy to decide whether to add runtime filter node - new_expr = add_runtime_filter_nodes(expr)?; - } - - let mut children = vec![]; - - for child in new_expr.children.iter() { - children.push(Arc::new(try_add_runtime_filter_nodes(child)?)); - } - Ok(new_expr.replace_children(children)) -} - -fn add_runtime_filter_nodes(expr: &SExpr) -> Result { - assert_eq!(expr.plan.rel_op(), RelOp::Join); - let join: Join = expr.plan().clone().try_into()?; - if join.join_type != JoinType::Inner { - return Ok(expr.clone()); - } - let runtime_filter_result = create_runtime_filters(&join)?; - wrap_runtime_filter_source(expr, runtime_filter_result) -} diff --git a/src/query/sql/src/planner/optimizer/s_expr.rs b/src/query/sql/src/planner/optimizer/s_expr.rs index bf4773a7b31ed..0d3b88e2b191d 100644 --- a/src/query/sql/src/planner/optimizer/s_expr.rs +++ b/src/query/sql/src/planner/optimizer/s_expr.rs @@ -362,7 +362,6 @@ impl SExpr { | RelOperator::DummyTableScan(_) | RelOperator::CteScan(_) | RelOperator::AddRowNumber(_) - | RelOperator::RuntimeFilterSource(_) | RelOperator::Pattern(_) | RelOperator::MaterializedCte(_) | RelOperator::ConstantTableScan(_) => {} @@ -429,7 +428,6 @@ fn find_subquery(rel_op: &RelOperator) -> bool { | RelOperator::DummyTableScan(_) | RelOperator::CteScan(_) | RelOperator::AddRowNumber(_) - | RelOperator::RuntimeFilterSource(_) | RelOperator::Pattern(_) | RelOperator::MaterializedCte(_) | RelOperator::ConstantTableScan(_) => false, diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index 41946ed5c0cf5..fdab89b126fac 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -148,8 +148,6 @@ pub struct Join { // marker_index is for MarkJoin only. pub marker_index: Option, pub from_correlated_subquery: bool, - // It means that join has a corresponding runtime filter - pub contain_runtime_filter: bool, // if we execute distributed merge into, we need to hold the // hash table to get not match data from source. pub need_hold_hash_table: bool, @@ -164,7 +162,6 @@ impl Default for Join { join_type: JoinType::Cross, marker_index: Default::default(), from_correlated_subquery: Default::default(), - contain_runtime_filter: false, need_hold_hash_table: false, } } diff --git a/src/query/sql/src/planner/plans/mod.rs b/src/query/sql/src/planner/plans/mod.rs index daf0298d31121..dd0b998840ed5 100644 --- a/src/query/sql/src/planner/plans/mod.rs +++ b/src/query/sql/src/planner/plans/mod.rs @@ -42,7 +42,6 @@ mod project_set; mod recluster_table; mod replace; mod revert_table; -mod runtime_filter_source; mod scalar_expr; mod scan; mod setting; @@ -84,8 +83,6 @@ pub use project_set::*; pub use recluster_table::ReclusterTablePlan; pub use replace::Replace; pub use revert_table::RevertTablePlan; -pub use runtime_filter_source::RuntimeFilterId; -pub use runtime_filter_source::RuntimeFilterSource; pub use scalar_expr::*; pub use scan::*; pub use setting::*; diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs index 9539304cb1ae1..aadfced70c0bd 100644 --- a/src/query/sql/src/planner/plans/operator.rs +++ b/src/query/sql/src/planner/plans/operator.rs @@ -35,7 +35,6 @@ use crate::optimizer::RelationalProperty; use crate::optimizer::RequiredProperty; use crate::optimizer::StatInfo; use crate::plans::materialized_cte::MaterializedCte; -use crate::plans::runtime_filter_source::RuntimeFilterSource; use crate::plans::ConstantTableScan; use crate::plans::CteScan; use crate::plans::Exchange; @@ -79,7 +78,6 @@ pub enum RelOp { Exchange, UnionAll, DummyTableScan, - RuntimeFilterSource, Window, ProjectSet, MaterializedCte, @@ -106,7 +104,6 @@ pub enum RelOperator { AddRowNumber(AddRowNumber), UnionAll(UnionAll), DummyTableScan(DummyTableScan), - RuntimeFilterSource(RuntimeFilterSource), Window(Window), ProjectSet(ProjectSet), MaterializedCte(MaterializedCte), @@ -129,7 +126,6 @@ impl Operator for RelOperator { RelOperator::Exchange(rel_op) => rel_op.rel_op(), RelOperator::UnionAll(rel_op) => rel_op.rel_op(), RelOperator::DummyTableScan(rel_op) => rel_op.rel_op(), - RelOperator::RuntimeFilterSource(rel_op) => rel_op.rel_op(), RelOperator::ProjectSet(rel_op) => rel_op.rel_op(), RelOperator::Window(rel_op) => rel_op.rel_op(), RelOperator::CteScan(rel_op) => rel_op.rel_op(), @@ -153,7 +149,6 @@ impl Operator for RelOperator { RelOperator::Exchange(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::UnionAll(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::DummyTableScan(rel_op) => rel_op.derive_relational_prop(rel_expr), - RelOperator::RuntimeFilterSource(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::ProjectSet(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::Window(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::CteScan(rel_op) => rel_op.derive_relational_prop(rel_expr), @@ -177,7 +172,6 @@ impl Operator for RelOperator { RelOperator::Exchange(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::UnionAll(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::DummyTableScan(rel_op) => rel_op.derive_physical_prop(rel_expr), - RelOperator::RuntimeFilterSource(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::ProjectSet(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::Window(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::CteScan(rel_op) => rel_op.derive_physical_prop(rel_expr), @@ -201,7 +195,6 @@ impl Operator for RelOperator { RelOperator::Exchange(rel_op) => rel_op.derive_cardinality(rel_expr), RelOperator::UnionAll(rel_op) => rel_op.derive_cardinality(rel_expr), RelOperator::DummyTableScan(rel_op) => rel_op.derive_cardinality(rel_expr), - RelOperator::RuntimeFilterSource(rel_op) => rel_op.derive_cardinality(rel_expr), RelOperator::ProjectSet(rel_op) => rel_op.derive_cardinality(rel_expr), RelOperator::Window(rel_op) => rel_op.derive_cardinality(rel_expr), RelOperator::CteScan(rel_op) => rel_op.derive_cardinality(rel_expr), @@ -253,9 +246,6 @@ impl Operator for RelOperator { RelOperator::DummyTableScan(rel_op) => { rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) } - RelOperator::RuntimeFilterSource(rel_op) => { - rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) - } RelOperator::Window(rel_op) => { rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) } @@ -542,26 +532,6 @@ impl TryFrom for DummyTableScan { } } -impl From for RelOperator { - fn from(value: RuntimeFilterSource) -> Self { - Self::RuntimeFilterSource(value) - } -} - -impl TryFrom for RuntimeFilterSource { - type Error = ErrorCode; - - fn try_from(value: RelOperator) -> std::result::Result { - if let RelOperator::RuntimeFilterSource(value) = value { - Ok(value) - } else { - Err(ErrorCode::Internal( - "Cannot downcast RelOperator to RuntimeFilterSource", - )) - } - } -} - impl From for RelOperator { fn from(value: ProjectSet) -> Self { Self::ProjectSet(value) diff --git a/src/query/sql/src/planner/plans/runtime_filter_source.rs b/src/query/sql/src/planner/plans/runtime_filter_source.rs deleted file mode 100644 index b0115cea54a36..0000000000000 --- a/src/query/sql/src/planner/plans/runtime_filter_source.rs +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::hash::Hash; -use std::sync::Arc; - -use common_catalog::table_context::TableContext; -use common_exception::Result; - -use crate::optimizer::PhysicalProperty; -use crate::optimizer::RelExpr; -use crate::optimizer::RelationalProperty; -use crate::optimizer::RequiredProperty; -use crate::optimizer::StatInfo; -use crate::optimizer::Statistics; -use crate::plans::Operator; -use crate::plans::RelOp; -use crate::ColumnSet; -use crate::IndexType; -use crate::ScalarExpr; - -#[derive( - Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, Ord, PartialOrd, -)] -pub struct RuntimeFilterId { - id: String, -} - -impl RuntimeFilterId { - pub fn new(id: IndexType) -> Self { - RuntimeFilterId { id: id.to_string() } - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct RuntimeFilterSource { - pub left_runtime_filters: BTreeMap, - pub right_runtime_filters: BTreeMap, -} - -impl RuntimeFilterSource { - pub fn used_columns(&self) -> Result { - let mut used_columns = ColumnSet::new(); - for expr in self - .left_runtime_filters - .iter() - .chain(self.right_runtime_filters.iter()) - { - used_columns.extend(expr.1.used_columns()); - } - Ok(used_columns) - } -} - -impl Operator for RuntimeFilterSource { - fn rel_op(&self) -> RelOp { - RelOp::RuntimeFilterSource - } - - fn derive_relational_prop(&self, rel_expr: &RelExpr) -> Result> { - let left_prop = rel_expr.derive_relational_prop_child(0)?; - // Derive output columns - let output_columns = left_prop.output_columns.clone(); - // Derive outer columns - let outer_columns = left_prop.outer_columns.clone(); - - Ok(Arc::new(RelationalProperty { - output_columns, - outer_columns, - used_columns: self.used_columns()?, - })) - } - - fn derive_physical_prop(&self, _rel_expr: &RelExpr) -> Result { - todo!() - } - - fn derive_cardinality(&self, rel_expr: &RelExpr) -> Result> { - let stat_info = rel_expr.derive_cardinality_child(0)?; - Ok(Arc::new(StatInfo { - cardinality: stat_info.cardinality, - statistics: Statistics { - precise_cardinality: None, - column_stats: stat_info.statistics.column_stats.clone(), - }, - })) - } - - fn compute_required_prop_child( - &self, - _ctx: Arc, - _rel_expr: &RelExpr, - _child_index: usize, - _required: &RequiredProperty, - ) -> Result { - todo!() - } -} diff --git a/tests/sqllogictests/suites/crdb/runtime_filter_join.test b/tests/sqllogictests/suites/crdb/runtime_filter_join.test deleted file mode 100644 index 9b83d4e41aab1..0000000000000 --- a/tests/sqllogictests/suites/crdb/runtime_filter_join.test +++ /dev/null @@ -1,1077 +0,0 @@ -control sortmode rowsort - -statement ok -set enable_runtime_filter = 1; - -statement ok -drop table if exists onecolumn - -statement ok -CREATE TABLE onecolumn (x INT NULL) - -statement ok -INSERT INTO onecolumn(x) VALUES (44), (NULL), (42) - -query II -SELECT * FROM onecolumn AS a(x) CROSS JOIN onecolumn AS b(y) order by x ----- -42 42 -42 44 -42 NULL -44 42 -44 44 -44 NULL -NULL 42 -NULL 44 -NULL NULL - -statement error 1065 -SELECT x FROM onecolumn AS a, onecolumn AS b; - -query II -SELECT * FROM onecolumn AS a(x) JOIN onecolumn AS b(y) ON a.x = b.y order by a.x ----- -42 42 -44 44 - -query I -SELECT * FROM onecolumn AS a JOIN onecolumn as b USING(x) ORDER BY x ----- -42 -44 - -query I -SELECT * FROM onecolumn AS a NATURAL JOIN onecolumn as b order by a.x ----- -42 -44 - -query II -SELECT * FROM onecolumn AS a(x) LEFT OUTER JOIN onecolumn AS b(y) ON a.x = b.y order by a.x ----- -42 42 -44 44 -NULL NULL - -query I -SELECT * FROM onecolumn AS a LEFT OUTER JOIN onecolumn AS b USING(x) ORDER BY x ----- -42 -44 -NULL - -statement error 1065 -SELECT * FROM onecolumn AS a, onecolumn AS b ORDER BY x - -query I -SELECT * FROM onecolumn AS a NATURAL LEFT OUTER JOIN onecolumn AS b order by a.x ----- -42 -44 -NULL - -query II -SELECT * FROM onecolumn AS a(x) RIGHT OUTER JOIN onecolumn AS b(y) ON a.x = b.y order by x ----- -42 42 -44 44 -NULL NULL - -query I -SELECT * FROM onecolumn AS a RIGHT OUTER JOIN onecolumn AS b USING(x) ORDER BY x ----- -42 -44 -NULL - -query I -SELECT * FROM onecolumn AS a NATURAL RIGHT OUTER JOIN onecolumn AS b order by x ----- -42 -44 -NULL - -statement ok -drop table if exists onecolumn_w - -statement ok -CREATE TABLE onecolumn_w(w INT) - -statement ok -INSERT INTO onecolumn_w(w) VALUES (42),(43) - -query II -SELECT * FROM onecolumn AS a NATURAL JOIN onecolumn_w as b ----- -42 42 -42 43 -44 42 -44 43 -NULL 42 -NULL 43 - -statement ok -drop table if exists othercolumn - -statement ok -CREATE TABLE othercolumn (x INT) - -statement ok -INSERT INTO othercolumn(x) VALUES (43),(42),(16) - -query II -SELECT * FROM onecolumn AS a FULL OUTER JOIN othercolumn AS b ON a.x = b.x ORDER BY a.x,b.x ----- -42 42 -44 NULL -NULL 16 -NULL 43 -NULL NULL - -query II -SELECT * FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON a.x = b.x and a.x > 16 order by a.x ----- -42 42 -44 NULL -NULL 16 -NULL 43 -NULL NULL - -query II -SELECT * FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON a.x = b.x and b.x > 16 order by b.x ----- -42 42 -44 NULL -NULL 16 -NULL 43 -NULL NULL - -query II -SELECT * FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON false order by b.x ----- -42 NULL -44 NULL -NULL 16 -NULL 42 -NULL 43 -NULL NULL - -query II -SELECT * FROM onecolumn AS a full OUTER JOIN othercolumn AS b ON true order by b.x ----- -42 16 -42 42 -42 43 -44 16 -44 42 -44 43 -NULL 16 -NULL 42 -NULL 43 - -# query -# SELECT * FROM onecolumn AS a FULL OUTER JOIN othercolumn AS b USING(x) ORDER BY x - -# query -# SELECT x AS s, a.x, b.x FROM onecolumn AS a FULL OUTER JOIN othercolumn AS b USING(x) ORDER BY s - -# query -# SELECT * FROM onecolumn AS a NATURAL FULL OUTER JOIN othercolumn AS b ORDER BY x - -# query -# SELECT * FROM (SELECT x FROM onecolumn ORDER BY x DESC) NATURAL JOIN (VALUES (42)) AS v(x) LIMIT 1 - -statement ok -drop table if exists empty - -statement ok -CREATE TABLE empty (x INT) - -# bug(#7149) fix by https://github.com/datafuselabs/databend/pull/7150 -statement ok -SELECT * FROM onecolumn AS a(x) CROSS JOIN empty AS b(y) - -statement ok -SELECT * FROM empty AS a CROSS JOIN onecolumn AS b - -statement ok -SELECT * FROM onecolumn AS a(x) JOIN empty AS b(y) ON a.x = b.y - -statement ok -SELECT * FROM onecolumn AS a JOIN empty AS b USING(x) - -statement ok -SELECT * FROM empty AS a(x) JOIN onecolumn AS b(y) ON a.x = b.y - -statement ok -SELECT * FROM empty AS a JOIN onecolumn AS b USING(x) - -query IT -SELECT * FROM onecolumn AS a(x) LEFT OUTER JOIN empty AS b(y) ON a.x = b.y ORDER BY a.x ----- -42 NULL -44 NULL -NULL NULL - -query I -SELECT * FROM onecolumn AS a LEFT OUTER JOIN empty AS b USING(x) ORDER BY x ----- -42 -44 -NULL - -statement ok -SELECT * FROM empty AS a(x) LEFT OUTER JOIN onecolumn AS b(y) ON a.x = b.y - -statement ok -SELECT * FROM empty AS a LEFT OUTER JOIN onecolumn AS b USING(x) - -statement ok -SELECT * FROM onecolumn AS a(x) RIGHT OUTER JOIN empty AS b(y) ON a.x = b.y - -statement ok -SELECT * FROM onecolumn AS a RIGHT OUTER JOIN empty AS b USING(x) - -query II -SELECT * FROM empty AS a(x) FULL OUTER JOIN onecolumn AS b(y) ON a.x = b.y ORDER BY b.y ----- -NULL 42 -NULL 44 -NULL NULL - -statement ok -SELECT * FROM empty AS a FULL OUTER JOIN onecolumn AS b USING(x) ORDER BY x - -query II -SELECT * FROM onecolumn AS a(x) FULL OUTER JOIN empty AS b(y) ON a.x = b.y ORDER BY a.x ----- -42 NULL -44 NULL -NULL NULL - -# query -# SELECT * FROM onecolumn AS a FULL OUTER JOIN empty AS b USING(x) ORDER BY x - -query II -SELECT * FROM empty AS a(x) FULL OUTER JOIN onecolumn AS b(y) ON a.x = b.y ORDER BY b.y ----- -NULL 42 -NULL 44 -NULL NULL - -# query -# SELECT * FROM empty AS a FULL OUTER JOIN onecolumn AS b USING(x) ORDER BY x - -statement ok -drop table if exists twocolumn - -statement ok -CREATE TABLE twocolumn (x INT NULL, y INT NULL) - -statement ok -INSERT INTO twocolumn(x, y) VALUES (44,51), (NULL,52), (42,53), (45,45) - -query II -SELECT * FROM onecolumn NATURAL JOIN twocolumn ----- -42 53 -44 51 - -query IIII -SELECT * FROM twocolumn AS a JOIN twocolumn AS b ON a.x = a.y order by a.x ----- -45 45 42 53 -45 45 44 51 -45 45 45 45 -45 45 NULL 52 - -query II -SELECT o.x, t.y FROM onecolumn o INNER JOIN twocolumn t ON (o.x=t.x AND t.y=53) ----- -42 53 - -query IT -SELECT o.x, t.y FROM onecolumn o LEFT OUTER JOIN twocolumn t ON (o.x=t.x AND t.y=53) order by o.x ----- -42 53 -44 NULL -NULL NULL - -query II -SELECT o.x, t.y FROM onecolumn o LEFT OUTER JOIN twocolumn t ON (o.x=t.x AND o.x=44) order by o.x ----- -42 NULL -44 51 -NULL NULL - -query II -SELECT o.x, t.y FROM onecolumn o LEFT OUTER JOIN twocolumn t ON (o.x=t.x AND t.x=44) order by o.x ----- -42 NULL -44 51 -NULL NULL - -# query -# SELECT * FROM (SELECT x, 2 two FROM onecolumn) NATURAL FULL JOIN (SELECT x, y+1 plus1 FROM twocolumn) - -statement ok -drop table if exists a - -statement ok -drop table if exists b - -statement ok -CREATE TABLE a (i int) - -statement ok -INSERT INTO a VALUES (1), (2), (3) - -statement ok -CREATE TABLE b (i int, b bool) - -statement ok -INSERT INTO b VALUES (2, true), (3, true), (4, false) - -query III -SELECT * FROM a INNER JOIN b ON a.i = b.i ----- -2 2 1 -3 3 1 - -query ITT -SELECT * FROM a LEFT OUTER JOIN b ON a.i = b.i ----- -1 NULL NULL -2 2 1 -3 3 1 - -query III -SELECT * FROM a RIGHT OUTER JOIN b ON a.i = b.i order by b ----- -2 2 1 -3 3 1 -NULL 4 0 - -query III -SELECT * FROM a FULL OUTER JOIN b ON a.i = b.i order by b ----- -1 NULL NULL -2 2 1 -3 3 1 -NULL 4 0 - -query III -SELECT * FROM a FULL OUTER JOIN b ON (a.i = b.i and a.i>2) ORDER BY a.i, b.i ----- -1 NULL NULL -2 NULL NULL -3 3 1 -NULL 2 1 -NULL 4 0 - - -statement ok -INSERT INTO b VALUES (3, false) - -query III -SELECT * FROM a RIGHT OUTER JOIN b ON a.i=b.i ORDER BY b.i, b.b ----- -2 2 1 -3 3 0 -3 3 1 -NULL 4 0 - -query III -SELECT * FROM a FULL OUTER JOIN b ON a.i=b.i ORDER BY b.i, b.b ----- -1 NULL NULL -2 2 1 -3 3 0 -3 3 1 -NULL 4 0 - - -query IIIIII -SELECT * FROM (onecolumn CROSS JOIN twocolumn JOIN onecolumn AS a(b) ON a.b=twocolumn.x JOIN twocolumn AS c(d,e) ON a.b=c.d AND c.d=onecolumn.x) ORDER BY 1 LIMIT 1 ----- -42 42 53 42 42 53 - -# query I -# SELECT * FROM onecolumn JOIN twocolumn ON twocolumn.x = onecolumn.x AND onecolumn.x IN (SELECT x FROM twocolumn WHERE y >= 52) - -# ---- -# 42 42 53 - -# query I -# SELECT * FROM onecolumn JOIN (VALUES (41),(42),(43)) AS a(x) USING(x) - -# ---- -# 42 - -query I -SELECT * FROM onecolumn JOIN (SELECT x + 2 AS x FROM onecolumn) USING(x) ----- -44 - -query IIIII -SELECT * FROM (twocolumn AS a JOIN twocolumn AS b USING(x) JOIN twocolumn AS c USING(x)) ORDER BY x LIMIT 1 ----- -42 53 53 53 - -query IIIIII -SELECT a.x AS s, b.x, c.x, a.y, b.y, c.y FROM (twocolumn AS a JOIN twocolumn AS b USING(x) JOIN twocolumn AS c USING(x)) ORDER BY s ----- -42 42 42 53 53 53 -44 44 44 51 51 51 -45 45 45 45 45 45 - -statement error 1065 -SELECT * FROM (onecolumn AS a JOIN onecolumn AS b USING(y)) - - -# TODO column name "x" appears more than once in USING clause ,should be error? -query I -SELECT * FROM (onecolumn AS a JOIN onecolumn AS b USING(x, x)) ----- -42 -44 - -statement ok -drop table if exists othertype - -statement ok -CREATE TABLE othertype (x TEXT) - -query I -SELECT * FROM (onecolumn AS a JOIN othertype AS b USING(x)) ----- - -statement error 1065 -SELECT * FROM (onecolumn JOIN onecolumn USING(x)) - -statement error 1065 -SELECT * FROM (onecolumn JOIN twocolumn USING(x) JOIN onecolumn USING(x)) - -# query II -# SELECT * FROM (SELECT * FROM onecolumn), (SELECT * FROM onecolumn) - -# query I -# SELECT x FROM (onecolumn JOIN othercolumn USING (x)) JOIN (onecolumn AS a JOIN othercolumn AS b USING(x)) USING(x) - -statement error 1065 -SELECT x FROM (SELECT * FROM onecolumn), (SELECT * FROM onecolumn) - -statement error 1065 -SELECT * FROM (onecolumn AS a JOIN onecolumn AS b ON x > 32) - - -statement error 1065 -SELECT * FROM (onecolumn AS a JOIN onecolumn AS b ON a.y > y) - -statement ok -drop table if exists s - -statement ok -CREATE TABLE s(x INT) - -statement ok -INSERT INTO s(x) VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10) - -statement ok -drop table if exists pairs - -statement ok -drop table if exists square - -statement ok -CREATE TABLE square (n INT, sq INT) - -statement ok -INSERT INTO square VALUES (1,1), (2,4), (3,9), (4,16), (5,25), (6,36) - -statement ok -CREATE TABLE pairs (a INT, b INT) - -statement ok -INSERT INTO pairs VALUES (1,1), (1,2), (1,3), (1,4), (1,5), (1,6), (2,3), (2,4), (2,5), (2,6), (3,4), (3,5), (3,6), (4,5), (4,6) - -query IIII -SELECT * FROM pairs, square WHERE pairs.b = square.n order by a ----- -1 1 1 1 -1 2 2 4 -1 3 3 9 -1 4 4 16 -1 5 5 25 -1 6 6 36 -2 3 3 9 -2 4 4 16 -2 5 5 25 -2 6 6 36 -3 4 4 16 -3 5 5 25 -3 6 6 36 -4 5 5 25 -4 6 6 36 - -query IIII -SELECT * FROM pairs, square WHERE pairs.a + pairs.b = square.sq ----- -1 3 2 4 -3 6 3 9 -4 5 3 9 - -# query -# SELECT a, b, n, sq FROM (SELECT a, b, a * b / 2 AS div, n, sq FROM pairs, square) WHERE div = sq - -query IIII -SELECT * FROM pairs FULL OUTER JOIN square ON pairs.a + pairs.b = square.sq order by a ----- -1 1 NULL NULL -1 2 NULL NULL -1 3 2 4 -1 4 NULL NULL -1 5 NULL NULL -1 6 NULL NULL -2 3 NULL NULL -2 4 NULL NULL -2 5 NULL NULL -2 6 NULL NULL -3 4 NULL NULL -3 5 NULL NULL -3 6 3 9 -4 5 3 9 -4 6 NULL NULL -NULL NULL 1 1 -NULL NULL 4 16 -NULL NULL 5 25 -NULL NULL 6 36 - -query IIII -SELECT * FROM pairs FULL OUTER JOIN square ON pairs.a + pairs.b = square.sq WHERE pairs.b%2 <> square.sq%2 order by a ----- -1 3 2 4 -3 6 3 9 - -query IITT -SELECT * FROM (SELECT * FROM pairs LEFT JOIN square ON b = sq AND a > 1 AND n < 6) WHERE b > 1 AND (n IS NULL OR n > 1) AND (n IS NULL OR a < sq) ----- -1 2 NULL NULL -1 3 NULL NULL -1 4 NULL NULL -1 5 NULL NULL -1 6 NULL NULL -2 3 NULL NULL -2 4 2 4 -2 5 NULL NULL -2 6 NULL NULL -3 4 2 4 -3 5 NULL NULL -3 6 NULL NULL -4 5 NULL NULL -4 6 NULL NULL - -onlyif todo -query IIII -SELECT * FROM (SELECT * FROM pairs RIGHT JOIN square ON b = sq AND a > 1 AND n < 6) WHERE (a IS NULL OR a > 2) AND n > 1 AND (a IS NULL OR a < sq) order by n ----- -3 4 2 4 -NULL NULL 3 9 -NULL NULL 4 16 -NULL NULL 5 25 -NULL NULL 6 36 - -statement ok -drop table if exists t1 - -statement ok -drop table if exists t2 - -statement ok -CREATE TABLE t1 (col1 INT, x INT, col2 INT, y INT) - -statement ok -CREATE TABLE t2 (col3 INT, y INT, x INT, col4 INT) - -statement ok -INSERT INTO t1 VALUES (10, 1, 11, 1), (20, 2, 21, 1), (30, 3, 31, 1) - -statement ok -INSERT INTO t2 VALUES (100, 1, 1, 101), (200, 1, 201, 2), (400, 1, 401, 4) - -query IIIIIII -SELECT * FROM t1 JOIN t2 USING(x) ----- -10 1 11 1 100 1 101 - -query IIIIII -SELECT * FROM t1 NATURAL JOIN t2 ----- -10 1 11 1 100 101 - -query IIIIIIII -SELECT * FROM t1 JOIN t2 ON t2.x=t1.x ----- -10 1 11 1 100 1 1 101 - -# query -# SELECT * FROM t1 FULL OUTER JOIN t2 USING(x) - -# query -# SELECT * FROM t1 NATURAL FULL OUTER JOIN t2 - -query III -SELECT t2.x, t1.x, x FROM t1 JOIN t2 USING(x) ----- -1 1 1 - -# query -# SELECT t2.x, t1.x, x FROM t1 FULL OUTER JOIN t2 USING(x) - -query I -SELECT x FROM t1 NATURAL JOIN (SELECT * FROM t2) ----- -1 - -statement ok -drop table if exists pkBA - -statement ok -drop table if exists pkBC - -statement ok -drop table if exists pkBAC - -statement ok -drop table if exists pkBAD - -statement ok -CREATE TABLE pkBA (a INT, b INT, c INT, d INT) - -statement ok -CREATE TABLE pkBC (a INT, b INT, c INT, d INT) - -statement ok -CREATE TABLE pkBAC (a INT, b INT, c INT, d INT) - -statement ok -CREATE TABLE pkBAD (a INT, b INT, c INT, d INT) - -statement ok -drop table if exists str1 - -statement ok -drop table if exists str2 - -statement ok -CREATE TABLE str1 (a INT, s STRING) - -statement ok -INSERT INTO str1 VALUES (1, 'a' ), (2, 'A'), (3, 'c'), (4, 'D') - -statement ok -CREATE TABLE str2 (a INT, s STRING) - -statement ok -INSERT INTO str2 VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'E') - -query TTT -SELECT s, str1.s, str2.s FROM str1 INNER JOIN str2 USING(s) ----- -A A A - -query TTT -SELECT s, str1.s, str2.s FROM str1 LEFT OUTER JOIN str2 USING(s) order by str1.s ----- -A A A -D D NULL -a a NULL -c c NULL - -query TTT -SELECT s, str1.s, str2.s FROM str1 RIGHT OUTER JOIN str2 USING(s) order by str2.s ----- -A A A -B NULL B -C NULL C -E NULL E - -query ITIT -SELECT * FROM str1 LEFT OUTER JOIN str2 ON str1.s = str2.s order by str1.a ----- -1 a NULL NULL -2 A 1 A -3 c NULL NULL -4 D NULL NULL - -statement ok -INSERT INTO str1 VALUES (1, 'a' ), (2, 'A'), (3, 'c'), (4, 'D') - -query ITIT -select * from str1 right join str2 on str1.s = str2.s order by str2.a ----- -2 A 1 A -2 A 1 A -NULL NULL 2 B -NULL NULL 3 C -NULL NULL 4 E - -query ITIT -select * from str1 right join str2 on false order by str2.a ----- -NULL NULL 1 A -NULL NULL 2 B -NULL NULL 3 C -NULL NULL 4 E - -# query -# SELECT s, str1.s, str2.s FROM str1 FULL OUTER JOIN str2 USING(s) - -statement ok -drop table if exists xyu - -statement ok -drop table if exists xyv - -statement ok -CREATE TABLE xyu (x INT, y INT, u INT) - -statement ok -INSERT INTO xyu VALUES (0, 0, 0), (1, 1, 1), (3, 1, 31), (3, 2, 32), (4, 4, 44) - -statement ok -CREATE TABLE xyv (x INT, y INT, v INT) - -statement ok -INSERT INTO xyv VALUES (1, 1, 1), (2, 2, 2), (3, 1, 31), (3, 3, 33), (5, 5, 55) - -query IIII -SELECT * FROM xyu INNER JOIN xyv USING(x, y) WHERE x > 2 ----- -3 1 31 31 - -query IIII -SELECT * FROM xyu LEFT OUTER JOIN xyv USING(x, y) WHERE x > 2 ----- -3 1 31 31 -3 2 32 NULL -4 4 44 NULL - -query IIII -SELECT * FROM xyu RIGHT OUTER JOIN xyv USING(x, y) WHERE x > 2 order by y ----- -31 3 1 31 -NULL 3 3 33 -NULL 5 5 55 - -# statement error 1065 -# SELECT * FROM xyu FULL OUTER JOIN xyv USING(x, y) WHERE x > 2 - -query IIIIII -SELECT * FROM xyu INNER JOIN xyv ON xyu.x = xyv.x AND xyu.y = xyv.y WHERE xyu.x = 1 AND xyu.y < 10 ----- -1 1 1 1 1 1 - -query IIIIII -SELECT * FROM xyu INNER JOIN xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = 1 AND xyu.y < 10 ----- -1 1 1 1 1 1 - -query IIITTT -SELECT * FROM xyu LEFT OUTER JOIN xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = 1 AND xyu.y < 10 ----- -0 0 0 NULL NULL NULL -1 1 1 1 1 1 -3 1 31 NULL NULL NULL -3 2 32 NULL NULL NULL -4 4 44 NULL NULL NULL - -query IIIIII -SELECT * FROM xyu RIGHT OUTER JOIN xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = 1 AND xyu.y < 10 order by v ----- -1 1 1 1 1 1 -NULL NULL NULL 2 2 2 -NULL NULL NULL 3 1 31 -NULL NULL NULL 3 3 33 -NULL NULL NULL 5 5 55 - -query IIII -SELECT * FROM (SELECT * FROM xyu ORDER BY x, y) AS xyu LEFT OUTER JOIN (SELECT * FROM xyv ORDER BY x, y) AS xyv USING(x, y) WHERE x > 2 ----- -3 1 31 31 -3 2 32 NULL -4 4 44 NULL - -query IIII -SELECT * FROM (SELECT * FROM xyu ORDER BY x, y) AS xyu RIGHT OUTER JOIN (SELECT * FROM xyv ORDER BY x, y) AS xyv USING(x, y) WHERE x > 2 order by v ----- -31 3 1 31 -NULL 3 3 33 -NULL 5 5 55 - -# query -# SELECT * FROM (SELECT * FROM xyu ORDER BY x, y) AS xyu FULL OUTER JOIN (SELECT * FROM xyv ORDER BY x, y) AS xyv USING(x, y) WHERE x > 2 - -query IIITTT -SELECT * FROM (SELECT * FROM xyu ORDER BY x, y) AS xyu LEFT OUTER JOIN (SELECT * FROM xyv ORDER BY x, y) AS xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = 1 AND xyu.y < 10 ----- -0 0 0 NULL NULL NULL -1 1 1 1 1 1 -3 1 31 NULL NULL NULL -3 2 32 NULL NULL NULL -4 4 44 NULL NULL NULL - -query IIIIII -SELECT * FROM xyu RIGHT OUTER JOIN (SELECT * FROM xyv ORDER BY x, y) AS xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = 1 AND xyu.y < 10 ORDER BY v ----- -1 1 1 1 1 1 -NULL NULL NULL 2 2 2 -NULL NULL NULL 3 1 31 -NULL NULL NULL 3 3 33 -NULL NULL NULL 5 5 55 - -statement ok -drop table if exists l - -statement ok -drop table if exists r - -statement ok -CREATE TABLE l (a INT, b1 INT) - -statement ok -CREATE TABLE r (a INT, b2 INT) - -statement ok -INSERT INTO l VALUES (1, 1), (2, 1), (3, 1) - -statement ok -INSERT INTO r VALUES (2, 1), (3, 1), (4, 1) - -query III -SELECT * FROM l LEFT OUTER JOIN r USING(a) WHERE a = 1 ----- -1 1 NULL - -query III -SELECT * FROM l LEFT OUTER JOIN r USING(a) WHERE a = 2 ----- -2 1 1 - -query III -SELECT * FROM l RIGHT OUTER JOIN r USING(a) WHERE a = 3 ----- -1 3 1 - -query III -SELECT * FROM l RIGHT OUTER JOIN r USING(a) WHERE a = 4 ----- -NULL 4 1 - -statement ok -drop table if exists foo - -statement ok -drop table if exists bar - -statement ok -CREATE TABLE foo ( a INT, b INT, c FLOAT, d FLOAT) - -statement ok -INSERT INTO foo VALUES (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3) - -statement ok -CREATE TABLE bar ( a INT, b FLOAT, c FLOAT, d INT) - -statement ok -INSERT INTO bar VALUES (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3) - -query II?? -SELECT * FROM foo NATURAL JOIN bar ----- -1 1 1.0 1.0 -2 2 2.0 2.0 -3 3 3.0 3.0 - -query II??I?I -SELECT * FROM foo JOIN bar USING (b) ----- -1 1 1.0 1.0 1 1.0 1 -2 2 2.0 2.0 2 2.0 2 -3 3 3.0 3.0 3 3.0 3 - -query II???I -SELECT * FROM foo JOIN bar USING (a, b) ----- -1 1 1.0 1.0 1.0 1 -2 2 2.0 2.0 2.0 2 -3 3 3.0 3.0 3.0 3 - -query II??I -SELECT * FROM foo JOIN bar USING (a, b, c) ----- -1 1 1.0 1.0 1 -2 2 2.0 2.0 2 -3 3 3.0 3.0 3 - -query II??I??I -SELECT * FROM foo JOIN bar ON foo.b = bar.b ----- -1 1 1.0 1.0 1 1.0 1.0 1 -2 2 2.0 2.0 2 2.0 2.0 2 -3 3 3.0 3.0 3 3.0 3.0 3 - -query II??I??I -SELECT * FROM foo JOIN bar ON foo.a = bar.a AND foo.b = bar.b ----- -1 1 1.0 1.0 1 1.0 1.0 1 -2 2 2.0 2.0 2 2.0 2.0 2 -3 3 3.0 3.0 3 3.0 3.0 3 - -query II??I??I -SELECT * FROM foo, bar WHERE foo.b = bar.b ----- -1 1 1.0 1.0 1 1.0 1.0 1 -2 2 2.0 2.0 2 2.0 2.0 2 -3 3 3.0 3.0 3 3.0 3.0 3 - -query II??I??I -SELECT * FROM foo, bar WHERE foo.a = bar.a AND foo.b = bar.b ----- -1 1 1.0 1.0 1 1.0 1.0 1 -2 2 2.0 2.0 2 2.0 2.0 2 -3 3 3.0 3.0 3 3.0 3.0 3 - -query II???I -SELECT * FROM foo JOIN bar USING (a, b) WHERE foo.c = bar.c AND foo.d = bar.d ----- -1 1 1.0 1.0 1.0 1 -2 2 2.0 2.0 2.0 2 -3 3 3.0 3.0 3.0 3 - -query TII -SELECT * FROM onecolumn AS a(x) RIGHT JOIN twocolumn ON false order by y ----- -NULL 42 53 -NULL 44 51 -NULL 45 45 -NULL NULL 52 - -statement ok -SELECT * FROM onecolumn AS a(x) RIGHT JOIN twocolumn ON true where false order by y - -statement ok -SELECT * FROM onecolumn AS a(x) LEFT JOIN twocolumn ON true where twocolumn.x > 50 order by y - -statement ok -insert into onecolumn values(42) - -query II -select * from onecolumn as a right semi join twocolumn as b on a.x = b.x order by b.x ----- -42 53 -44 51 - -query II -select * from onecolumn as a right anti join twocolumn as b on a.x = b.x order by b.x ----- -45 45 -NULL 52 - -query II -select * from onecolumn as a right semi join twocolumn as b on a.x = b.x and a.x > 42 order by b.x ----- -44 51 - -query II -select * from onecolumn as a right anti join twocolumn as b on a.x = b.x and a.x > 42 order by b.x ----- -42 53 -45 45 -NULL 52 - -query II -select * from onecolumn as a right semi join twocolumn as b on a.x = b.x and b.x > 42 order by b.x ----- -44 51 - -query II -select * from onecolumn as a right anti join twocolumn as b on a.x = b.x and b.x > 42 order by b.x ----- -42 53 -45 45 -NULL 52 - -query II -select * from onecolumn as a right semi join twocolumn as b on true order by b.x ----- -42 53 -44 51 -45 45 -NULL 52 - -statement ok -select * from onecolumn as a right anti join twocolumn as b on true order by b.x - -statement ok -select * from onecolumn as a right semi join twocolumn as b on false order by b.x - -query II -select * from onecolumn as a right anti join twocolumn as b on false order by b.x ----- -42 53 -44 51 -45 45 -NULL 52 - -query III -select * from onecolumn as a left join twocolumn as b on a.x = b.x where b.x > 42 ----- -44 44 51 - -query III -select * from onecolumn as a left join twocolumn as b on a.x = b.x where b.x > 44 or b.x < 43 ----- -42 42 53 -42 42 53 - -query III -select * from onecolumn as a left join twocolumn as b on a.x = b.x where b.x > 42 and b.x < 45 ----- -44 44 51 - -# query -# SELECT column1, column1+1FROM (SELECT * FROM (VALUES (NULL, NULL)) AS t NATURAL FULL OUTER JOIN (VALUES (1, 1)) AS u) - -# query -# SELECT * FROM (VALUES (1, 2)) a(a1,a2) FULL JOIN (VALUES (3, 4)) b(b1,b2) ON a1=b1 ORDER BY a2 - -# statement ok -# drop table if exists abcd - -# statement ok -# drop table if exists dxby - -# statement ok -# CREATE TABLE abcd (a INT, b INT, c INT, d INT) - -# statement ok -# INSERT INTO abcd VALUES (1, 1, 1, 1), (2, 2, 2, 2) - -# statement ok -# CREATE TABLE dxby (d INT, x INT, b INT, y INT) - -# statement ok -# INSERT INTO dxby VALUES (2, 2, 2, 2), (3, 3, 3, 3) - -# query -# SELECT * FROM abcd NATURAL FULL OUTER JOIN dxby - -# query -# SELECT abcd.*, dxby.* FROM abcd NATURAL FULL OUTER JOIN dxby - -# query -# SELECT abcd.*, dxby.* FROM abcd INNER JOIN dxby USING (d, b) - -statement ok -set enable_runtime_filter = 0; diff --git a/tests/sqllogictests/suites/query/runtime_filter_join.test b/tests/sqllogictests/suites/query/runtime_filter_join.test deleted file mode 100644 index c04317d784b27..0000000000000 --- a/tests/sqllogictests/suites/query/runtime_filter_join.test +++ /dev/null @@ -1,520 +0,0 @@ -statement ok -use default - -statement ok -set enable_runtime_filter = 1 - -statement ok -drop table if exists t1 - - -statement ok -create table t1(a int, b int) - - -statement ok -insert into t1 values(7, 8), (3, 4), (5, 6) - - -statement ok -drop table if exists t2 - - -statement ok -create table t2(a int, d int) - - -statement ok -insert into t2 values(1, 2), (3, 4), (5, 6) - - -query III -select * from t1 join t2 using(a) order by t1.a, t2.a ----- -3 4 4 -5 6 6 - -query I -select t1.a from t1 join t2 using(a) order by t1.a, t2.a ----- -3 -5 - -query II -select t1.a, t2.a from t1 join t2 using(a) order by t1.a, t2.a ----- -3 3 -5 5 - -query I -select a from t1 join t2 using(a) order by t1.a, t2.a ----- -3 -5 - -query I -select t2.d from t1 join t2 using(a) order by t1.a, t2.a ----- -4 -6 - -query I -select a from (select number as a from numbers(3)) n join (select number as a from numbers(4)) nn using (a) order by a ----- -0 -1 -2 - -query III -select * from t1 natural join t2 order by t1.a, t2.a ----- -3 4 4 -5 6 6 - - -statement ok -drop table t1 - - -statement ok -drop table t2 - -statement ok -create table t1(a int, b int) - - -statement ok -insert into t1 values(1, 2), (1, 3), (2, 4) - - -statement ok -create table t2(c int, d int) - - -statement ok -insert into t2 values(1, 2), (2, 6) - - -query IIII -select * from t2 inner join t1 on t1.a = t2.c order by a, b, c, d ----- -1 2 1 2 -1 2 1 3 -2 6 2 4 - - -statement ok -drop table t1 - - -statement ok -drop table t2 - -statement ok -create table t1(a int, b int) - - -statement ok -create table t2(c int, d int) - - -statement ok -insert into t1 values(1, 2), (3 ,4), (7, 8) - - -statement ok -insert into t2 values(1, 4), (2, 3), (6, 8) - - -query IIII -select * from t1 right join t2 on t1.a = t2.c order by t2.c ----- -1 2 1 4 -NULL NULL 2 3 -NULL NULL 6 8 - -query IIII -select * from t1 right join t2 on t1.a > t2.c order by t2.c ----- -7 8 1 4 -3 4 1 4 -7 8 2 3 -3 4 2 3 -7 8 6 8 - -query IIII -select * from t1 left join t2 on t1.a = t2.c order by t2.c ----- -1 2 1 4 -3 4 NULL NULL -7 8 NULL NULL - - -query IIII -select * from t1 left outer join t2 on t1.a = t2.c and t1.a > 3 order by a,b,c,d ----- -1 2 NULL NULL -3 4 NULL NULL -7 8 NULL NULL - - -query IIII -select * from t1 left outer join t2 on t1.a = t2.c and t2.c > 4 order by a,b,c,d ----- -1 2 NULL NULL -3 4 NULL NULL -7 8 NULL NULL - - -query IIII -select * from t1 left outer join t2 on t2.c > 4 and t1.a > 3 order by a,b,c,d ----- -1 2 NULL NULL -3 4 NULL NULL -7 8 6 8 - - -query IIII -select * from t1 left outer join t2 on t1.a > 3 order by a,b,c,d ----- -1 2 NULL NULL -3 4 NULL NULL -7 8 1 4 -7 8 2 3 -7 8 6 8 - - -query IIII -select * from t1 left outer join t2 on t2.c > 4 order by a,b,c,d ----- -1 2 6 8 -3 4 6 8 -7 8 6 8 - - -query IIII -select * from t1 left outer join t2 on t1.a > t2.c order by a,b,c,d ----- -1 2 NULL NULL -3 4 1 4 -3 4 2 3 -7 8 1 4 -7 8 2 3 -7 8 6 8 - - -query III -select t1.a, t2.c, t2.d from t1 left join t2 on t2.c in ( -t2.c ); ----- -1 NULL NULL -3 NULL NULL -7 NULL NULL - - -statement ok -drop table t1 - - -statement ok -drop table t2 - -statement ok -drop table if exists t - -statement ok -create table t(a int) - -statement ok -insert into t values(1),(2),(3) - -statement ok -create table t1(b float) - - -statement ok -insert into t1 values(1.0),(2.0),(3.0) - -statement ok -create table t2(c smallint unsigned null) - - -statement ok -insert into t2 values(1),(2),(null) - -query T -select TRY_CAST((c + 1) AS Int64 NULL) from t2; ----- -2 -3 -NULL - - -query IF -select * from t inner join t1 on t.a = t1.b order by a, b ----- -1 1.0 -2 2.0 -3 3.0 - - -query II -select * from t inner join t2 on t.a = t2.c order by a, c ----- -1 1 -2 2 - - -query II -select * from t inner join t2 on t.a = t2.c + 1 order by a, c ----- -2 1 -3 2 - - -query II -select * from t inner join t2 on t.a = t2.c + 1 and t.a - 1 = t2.c order by a, c ----- -2 1 -3 2 - - -query FI -select * from t1 inner join t on t.a = t1.b order by a, b ----- -1.0 1 -2.0 2 -3.0 3 - - -query II -select * from t2 inner join t on t.a = t2.c order by a, c ----- -1 1 -2 2 - - -query II -select * from t2 inner join t on t.a = t2.c + 1 order by a, c ----- -1 2 -2 3 - - -query II -select * from t2 inner join t on t.a = t2.c + 1 and t.a - 1 = t2.c order by a, c ----- -1 2 -2 3 - - -query I -select count(*) from numbers(1000) as t inner join numbers(1000) as t1 on t.number = t1.number ----- -1000 - - -query I -select t.number from numbers(10000) as t inner join numbers(1000) as t1 on t.number % 1000 = t1.number order by number limit 5 ----- -0 -1 -2 -3 -4 - -statement ok -drop table t - -statement ok -drop table if exists t3 - -statement ok -CREATE TABLE t3(c0 BIGINT NULL, c1 DOUBLE NULL) - -statement ok -drop table if exists t4 - -statement ok -CREATE TABLE t4(c0 FLOAT NULL) - -query I -SELECT SUM(count) FROM (SELECT ((false IS NOT NULL AND false) ::INT64)as count FROM t4 NATURAL LEFT JOIN t3) as res ----- -NULL - -statement ok -drop table t1 - -statement ok -drop table t2 - -statement ok -drop table t3 - -statement ok -drop table t4 - -statement ok -create table t1_null(a int null , b int null) - - -statement ok -create table t2_null(a int null , b int null) - - -statement ok -insert into t1_null values(1, 2), (2, 3), (null, 1) - - -statement ok -insert into t2_null values(3, 4), (2, 3), (null, 2) - - -query IIII -select * from t1_null inner join t2_null on t1_null.a = t2_null.a ----- -2 3 2 3 - -statement ok -drop table t1_null - -statement ok -drop table t2_null - -query I -select * from (SELECT number AS a FROM numbers(10)) x left join (SELECT number AS a FROM numbers(5)) y using(a) order by x.a ----- -0 -1 -2 -3 -4 -5 -6 -7 -8 -9 - -query I -select * from (SELECT number AS a FROM numbers(10)) x right join (SELECT number AS a FROM numbers(5)) y using(a) order by x.a ----- -0 -1 -2 -3 -4 - -query II -select * from (SELECT number AS a FROM numbers(1000)) x right join (SELECT number AS a FROM numbers(5)) y on x.a = y.a order by x.a ----- -0 0 -1 1 -2 2 -3 3 -4 4 - -query II -select * from numbers(10) x join (select 1::UInt64 number) y on x.number = y.number ----- -1 1 - -statement ok -drop table if exists onecolumn - -statement ok -CREATE TABLE onecolumn (x INT NULL) - -statement ok -INSERT INTO onecolumn(x) VALUES (44), (NULL), (42) - -statement ok -drop table if exists empty - -statement ok -CREATE TABLE empty (x INT) - -statement ok -SELECT * FROM onecolumn AS a(x) CROSS JOIN empty AS b(y) - -statement ok -drop table onecolumn - -statement ok -drop table empty - -statement ok -drop table if exists z0 - -statement ok -drop table if exists z1 - -statement ok -CREATE TABLE z0(c0BOOLEAN BOOLEAN NULL, c1FLOAT DOUBLE NULL) - -statement ok -CREATE TABLE z1(c0BOOLEAN BOOL NULL DEFAULT(true)) - -statement ok -INSERT INTO z0(c1float, c0boolean) VALUES (0.27563244104385376, false), (0.7913353443145752, false) - -statement ok -select * from z1 right join z0 on false limit 0 - - -statement ok -drop table z0 - -statement ok -drop table z1 - -statement ok -CREATE TABLE t0(c0BOOLEAN BOOLEAN NULL DEFAULT(false)); - -statement ok -CREATE TABLE t1(c0BOOLEAN BOOL NULL, c1FLOAT FLOAT NOT NULL DEFAULT(0.4661566913127899)); - -statement ok -CREATE TABLE t2(c0VARCHAR VARCHAR NULL, c1FLOAT DOUBLE NULL DEFAULT(0.954969048500061), c2VARCHAR VARCHAR NULL); - -statement ok -INSERT INTO t0(c0boolean) VALUES (false), (true); - -statement ok -INSERT INTO t0(c0boolean) VALUES (false), (false), (true); - -statement ok -INSERT INTO t1(c1float) VALUES (0.43919482827186584); - -statement ok -INSERT INTO t1(c1float) VALUES (0.2492278516292572); - -statement ok -INSERT INTO t2(c1float) VALUES (0.9702655076980591); - -statement ok -INSERT INTO t2(c1float, c2varchar) VALUES (0.5340723991394043, '02'), (0.4661566913127899, '1261837'); - -statement ok -SELECT t0.c0boolean, t1.c0boolean, t1.c1float FROM t0, t1 RIGHT JOIN t2 ON t1.c0boolean; - - -statement ok -drop table t0; - -statement ok -drop table t1; - -statement ok -drop table t2; - -statement ok -set max_block_size = 1; - -# https://github.com/datafuselabs/databend/pull/10140 -query I -select count(*) from numbers(10000) as t1 inner join numbers(10000) as t2 on t1.number = t2.number ----- -10000 - -statement ok -set enable_runtime_filter = 0