Skip to content

Commit

Permalink
chore: remove old runtime filter (#13896)
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 authored Dec 1, 2023
1 parent 7963416 commit 8dde650
Show file tree
Hide file tree
Showing 40 changed files with 9 additions and 2,580 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ impl MergeIntoInterpreter {
RelOperator::Exchange(_) => {}
RelOperator::UnionAll(_) => {}
RelOperator::DummyTableScan(_) => {}
RelOperator::RuntimeFilterSource(_) => {}
RelOperator::Window(_) => {}
RelOperator::ProjectSet(_) => {}
RelOperator::MaterializedCte(_) => {}
Expand Down
98 changes: 4 additions & 94 deletions src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ use std::sync::Arc;

use common_base::base::tokio::sync::Barrier;
use common_exception::Result;
use common_pipeline_core::processors::InputPort;
use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_core::Pipe;
use common_pipeline_core::PipeItem;
use common_pipeline_sinks::Sinker;
use common_pipeline_transforms::processors::ProcessorProfileWrapper;
use common_pipeline_transforms::processors::ProfileStub;
use common_pipeline_transforms::processors::Transformer;
use common_sql::executor::physical_plans::HashJoin;
use common_sql::executor::physical_plans::MaterializedCte;
use common_sql::executor::physical_plans::RangeJoin;
use common_sql::executor::physical_plans::RuntimeFilterSource;
use common_sql::executor::PhysicalPlan;
use common_sql::ColumnBinding;
use common_sql::IndexType;
Expand All @@ -42,13 +38,10 @@ use crate::pipelines::processors::transforms::HashJoinProbeState;
use crate::pipelines::processors::transforms::MaterializedCteSink;
use crate::pipelines::processors::transforms::MaterializedCteState;
use crate::pipelines::processors::transforms::ProbeSpillState;
use crate::pipelines::processors::transforms::RuntimeFilterState;
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
use crate::pipelines::processors::HashJoinDesc;
use crate::pipelines::processors::HashJoinState;
use crate::pipelines::processors::SinkRuntimeFilterSource;
use crate::pipelines::processors::TransformRuntimeFilter;
use crate::pipelines::PipelineBuilder;
use crate::sessions::QueryContext;

Expand Down Expand Up @@ -202,17 +195,11 @@ impl PipelineBuilder {
Ok(ProcessorPtr::create(transform))
}
};
if hash_join_plan.contain_runtime_filter {
build_res.main_pipeline.duplicate(false)?;
self.join_state = Some(build_state.clone());
self.index = Some(self.pipelines.len());
} else {
// for merge into
if hash_join_plan.need_hold_hash_table {
self.join_state = Some(build_state.clone())
}
build_res.main_pipeline.add_sink(create_sink_processor)?;
// for merge into
if hash_join_plan.need_hold_hash_table {
self.join_state = Some(build_state.clone())
}
build_res.main_pipeline.add_sink(create_sink_processor)?;

self.pipelines.push(build_res.main_pipeline.finalize());
self.pipelines.extend(build_res.sources_pipelines);
Expand Down Expand Up @@ -301,83 +288,6 @@ impl PipelineBuilder {
Ok(())
}

pub fn build_runtime_filter_source(
&mut self,
runtime_filter_source: &RuntimeFilterSource,
) -> Result<()> {
let state = self.build_runtime_filter_state(self.ctx.clone(), runtime_filter_source)?;
self.expand_runtime_filter_source(&runtime_filter_source.right_side, state.clone())?;
self.build_runtime_filter(&runtime_filter_source.left_side, state)?;
Ok(())
}

fn expand_runtime_filter_source(
&mut self,
_right_side: &PhysicalPlan,
state: Arc<RuntimeFilterState>,
) -> Result<()> {
let pipeline = &mut self.pipelines[self.index.unwrap()];
let output_size = pipeline.output_len();
debug_assert!(output_size % 2 == 0);

let mut items = Vec::with_capacity(output_size);
// Join
// / \
// / \
// RFSource \
// / \ \
// / \ \
// scan t1 scan t2
for _ in 0..output_size / 2 {
let input = InputPort::create();
items.push(PipeItem::create(
ProcessorPtr::create(TransformHashJoinBuild::try_create(
input.clone(),
self.join_state.as_ref().unwrap().clone(),
None,
)?),
vec![input],
vec![],
));
let input = InputPort::create();
items.push(PipeItem::create(
ProcessorPtr::create(Sinker::<SinkRuntimeFilterSource>::create(
input.clone(),
SinkRuntimeFilterSource::new(state.clone()),
)),
vec![input],
vec![],
));
}
pipeline.add_pipe(Pipe::create(output_size, 0, items));
Ok(())
}

fn build_runtime_filter(
&mut self,
left_side: &PhysicalPlan,
state: Arc<RuntimeFilterState>,
) -> Result<()> {
self.build_pipeline(left_side)?;
self.main_pipeline.add_transform(|input, output| {
let processor = TransformRuntimeFilter::create(input, output, state.clone());
Ok(ProcessorPtr::create(processor))
})?;
Ok(())
}

fn build_runtime_filter_state(
&self,
ctx: Arc<QueryContext>,
runtime_filter_source: &RuntimeFilterSource,
) -> Result<Arc<RuntimeFilterState>> {
Ok(Arc::new(RuntimeFilterState::new(
ctx,
runtime_filter_source.left_runtime_filters.clone(),
runtime_filter_source.right_runtime_filters.clone(),
)))
}

pub(crate) fn build_materialized_cte(
&mut self,
materialized_cte: &MaterializedCte,
Expand Down
9 changes: 1 addition & 8 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ pub struct PipelineBuilder {

// probe data_fields for merge into
pub probe_data_fields: Option<Vec<DataField>>,
// Used in runtime filter source
pub join_state: Option<Arc<HashJoinBuildState>>,
// record the index of join build side pipeline in `pipelines`
pub index: Option<usize>,

// Cte -> state, each cte has it's own state
pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,
Expand All @@ -72,13 +69,12 @@ impl PipelineBuilder {
func_ctx,
settings,
pipelines: vec![],
join_state: None,
main_pipeline: Pipeline::with_scopes(scopes),
proc_profs: prof_span_set,
exchange_injector: DefaultExchangeInjector::create(),
index: None,
cte_state: HashMap::new(),
probe_data_fields: None,
join_state: None,
}
}

Expand Down Expand Up @@ -134,9 +130,6 @@ impl PipelineBuilder {
PhysicalPlan::Exchange(_) => Err(ErrorCode::Internal(
"Invalid physical plan with PhysicalPlan::Exchange",
)),
PhysicalPlan::RuntimeFilterSource(runtime_filter_source) => {
self.build_runtime_filter_source(runtime_filter_source)
}
PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
PhysicalPlan::MaterializedCte(materialized_cte) => {
self.build_materialized_cte(materialized_cte)
Expand Down
2 changes: 0 additions & 2 deletions src/query/service/src/pipelines/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ pub(crate) mod transforms;
pub use transforms::DeduplicateRowNumber;
pub use transforms::HashJoinDesc;
pub use transforms::HashJoinState;
pub use transforms::SinkRuntimeFilterSource;
pub use transforms::TransformAddStreamColumns;
pub use transforms::TransformCastSchema;
pub use transforms::TransformCreateSets;
pub use transforms::TransformLimit;
pub use transforms::TransformResortAddOn;
pub use transforms::TransformResortAddOnWithoutSourceSchema;
pub use transforms::TransformRuntimeFilter;
pub use transforms::TransformWindow;
5 changes: 0 additions & 5 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod processor_accumulate_row_number;
mod processor_deduplicate_row_number;
mod processor_extract_hash_table_by_row_number;
pub(crate) mod range_join;
mod runtime_filter;
mod transform_add_computed_columns;
mod transform_add_const_columns;
mod transform_add_stream_columns;
Expand All @@ -31,7 +30,6 @@ mod transform_merge_block;
mod transform_resort_addon;
mod transform_resort_addon_without_source_schema;
mod transform_runtime_cast_schema;
mod transform_runtime_filter;
mod transform_srf;
mod transform_udf;
mod window;
Expand All @@ -41,7 +39,6 @@ pub use processor_accumulate_row_number::AccumulateRowNumber;
pub use processor_deduplicate_row_number::DeduplicateRowNumber;
pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber;
pub use range_join::RangeJoinState;
pub use runtime_filter::RuntimeFilterState;
pub use transform_add_computed_columns::TransformAddComputedColumns;
pub use transform_add_const_columns::TransformAddConstColumns;
pub use transform_add_stream_columns::TransformAddStreamColumns;
Expand All @@ -56,8 +53,6 @@ pub use transform_merge_block::TransformMergeBlock;
pub use transform_resort_addon::TransformResortAddOn;
pub use transform_resort_addon_without_source_schema::TransformResortAddOnWithoutSourceSchema;
pub use transform_runtime_cast_schema::TransformRuntimeCastSchema;
pub use transform_runtime_filter::SinkRuntimeFilterSource;
pub use transform_runtime_filter::TransformRuntimeFilter;
pub use transform_srf::TransformSRF;
pub use transform_udf::TransformUdf;
pub use window::FrameBound;
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 8dde650

Please sign in to comment.