diff --git a/src/common/storage/src/metrics/merge_into.rs b/src/common/storage/src/metrics/merge_into.rs
index c36f452fbef5c..be13fbe0d65ef 100644
--- a/src/common/storage/src/metrics/merge_into.rs
+++ b/src/common/storage/src/metrics/merge_into.rs
@@ -35,11 +35,35 @@ lazy_static! {
         register_counter(key!("merge_into_deleted_blocks_rows_counter"));
     static ref MERGE_INTO_APPEND_BLOCKS_COUNTER: Counter =
         register_counter(key!("merge_into_append_blocks_counter"));
+    static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_FETCH_ROWNUMBER: Counter =
+        register_counter(key!("merge_into_distributed_hashtable_fetch_row_number"));
+    static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_EMPTY_BLOCK: Counter =
+        register_counter(key!("merge_into_distributed_hashtable_empty_block"));
+    static ref MERGE_INTO_DISTRIBUTED_GENERATE_ROW_NUMBERS: Counter =
+        register_counter(key!("merge_into_distributed_generate_row_numbers"));
+    static ref MERGE_INTO_DISTRIBUTED_INIT_UNIQUE_NUMBER: Counter =
+        register_counter(key!("merge_into_distributed_init_unique_number"));
+    static ref MERGE_INTO_DISTRIBUTED_NEW_SET_LEN: Counter =
+        register_counter(key!("merge_into_distributed_new_set_len"));
+    static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_EMPTY_NULL_BLOCK: Counter = register_counter(
+        key!("merge_into_distributed_hashtable_push_empty_null_block")
+    );
+    static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK: Counter =
+        register_counter(key!("merge_into_distributed_hashtable_push_null_block"));
+    static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK_ROWS: Counter = register_counter(
+        key!("merge_into_distributed_hashtable_push_null_block_rows")
+    );
     static ref MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER: Counter =
         register_counter(key!("merge_into_append_blocks_rows_counter"));
     static ref MERGE_INTO_MATCHED_ROWS: Counter =
         register_counter(key!("merge_into_matched_rows"));
     static ref MERGE_INTO_UNMATCHED_ROWS: Counter =
         register_counter(key!("merge_into_unmatched_rows"));
+    static ref MERGE_INTO_DISTRIBUTED_DEDUPLICATE_ROWNUMBER: Counter =
+        register_counter(key!("merge_into_distributed_deduplicate_row_number"));
+    static ref MERGE_INTO_DISTRIBUTED_EMPTY_ROWNUMBER: Counter =
+        register_counter(key!("merge_into_distributed_empty_row_number"));
+    static ref MERGE_INTO_DISTRIBUTED_APPLY_ROWNUMBER: Counter =
+        register_counter(key!("merge_into_distributed_apply_row_number"));
     static ref MERGE_INTO_ACCUMULATE_MILLISECONDS: Histogram =
         register_histogram_in_milliseconds(key!("merge_into_accumulate_milliseconds"));
     static ref MERGE_INTO_APPLY_MILLISECONDS: Histogram =
@@ -73,6 +97,50 @@ pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) {
     MERGE_INTO_APPEND_BLOCKS_COUNTER.inc_by(c as u64);
 }
 
+pub fn merge_into_distributed_deduplicate_row_number(c: u32) {
+    MERGE_INTO_DISTRIBUTED_DEDUPLICATE_ROWNUMBER.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_empty_row_number(c: u32) {
+    MERGE_INTO_DISTRIBUTED_EMPTY_ROWNUMBER.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_apply_row_number(c: u32) {
+    MERGE_INTO_DISTRIBUTED_APPLY_ROWNUMBER.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_hashtable_fetch_row_number(c: u32) {
+    MERGE_INTO_DISTRIBUTED_HASHTABLE_FETCH_ROWNUMBER.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_hashtable_empty_block(c: u32) {
+    MERGE_INTO_DISTRIBUTED_HASHTABLE_EMPTY_BLOCK.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_generate_row_numbers(c: u32) {
+    MERGE_INTO_DISTRIBUTED_GENERATE_ROW_NUMBERS.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_init_unique_number(c: u32) {
+    MERGE_INTO_DISTRIBUTED_INIT_UNIQUE_NUMBER.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_new_set_len(c: u32) {
+    MERGE_INTO_DISTRIBUTED_NEW_SET_LEN.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_hashtable_push_empty_null_block(c: u32) {
+    MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_EMPTY_NULL_BLOCK.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_hashtable_push_null_block(c: u32) {
+    MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK.inc_by(c as u64);
+}
+
+pub fn merge_into_distributed_hashtable_push_null_block_rows(c: u32) {
+    MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK_ROWS.inc_by(c as u64);
+}
+
 pub fn metrics_inc_merge_into_append_blocks_rows_counter(c: u32) {
     MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
 }
diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs
index 3265a4393debf..d915a8556189b 100644
--- a/src/query/service/src/pipelines/builders/builder_merge_into.rs
+++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs
@@ -44,6 +44,7 @@ use common_storages_fuse::operations::merge_into::TransformAddRowNumberColumnPro
 use common_storages_fuse::operations::TransformSerializeBlock;
 use common_storages_fuse::FuseTable;
 
+use crate::pipelines::processors::transforms::AccumulateRowNumber;
 use crate::pipelines::processors::transforms::ExtractHashTableByRowNumber;
 use crate::pipelines::processors::transforms::TransformAddComputedColumns;
 use crate::pipelines::processors::DeduplicateRowNumber;
@@ -638,6 +639,20 @@ impl PipelineBuilder {
             pipe_items,
         ));
 
+        // accumulate row_number
+        if *distributed {
+            let pipe_items = vec![
+                create_dummy_item(),
+                create_dummy_item(),
+                AccumulateRowNumber::create()?.into_pipe_item(),
+            ];
+            self.main_pipeline.add_pipe(Pipe::create(
+                self.main_pipeline.output_len(),
+                get_output_len(&pipe_items),
+                pipe_items,
+            ));
+        }
+
         Ok(())
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs
index ce31f31413947..d7a29454dfbf9 100644
--- a/src/query/service/src/pipelines/processors/transforms/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -16,6 +16,7 @@ mod aggregator;
 pub mod group_by;
 pub(crate) mod hash_join;
 mod metrics;
+mod processor_accumulate_row_number;
 mod processor_deduplicate_row_number;
 mod processor_extract_hash_table_by_row_number;
 pub(crate) mod range_join;
@@ -59,6 +60,7 @@ use common_pipeline_transforms::processors::transforms::transform_sort_partial;
 pub use hash_join::FixedKeyHashJoinHashTable;
 pub use hash_join::HashJoinDesc;
 pub use hash_join::HashJoinState;
+pub use processor_accumulate_row_number::AccumulateRowNumber;
 pub use processor_deduplicate_row_number::DeduplicateRowNumber;
 pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber;
 pub use range_join::RangeJoinState;
diff --git a/src/query/service/src/pipelines/processors/transforms/processor_accumulate_row_number.rs b/src/query/service/src/pipelines/processors/transforms/processor_accumulate_row_number.rs
new file mode 100644
index 0000000000000..f96bc4013ab04
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/processor_accumulate_row_number.rs
@@ -0,0 +1,88 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_exception::Result;
+use common_expression::types::DataType;
+use common_expression::types::NumberDataType;
+use common_expression::DataBlock;
+use common_pipeline_core::pipe::PipeItem;
+use common_pipeline_core::processors::port::InputPort;
+use common_pipeline_core::processors::port::OutputPort;
+use common_pipeline_core::processors::processor::ProcessorPtr;
+use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
+use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;
+
+pub struct AccumulateRowNumber {
+    data_blocks: Vec<DataBlock>,
+}
+
+#[async_trait::async_trait]
+impl AsyncAccumulatingTransform for AccumulateRowNumber {
+    const NAME: &'static str = "AccumulateRowNumber";
+
+    #[async_backtrace::framed]
+    async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
+        self.accumulate(data).await?;
+        // no partial output
+        Ok(None)
+    }
+
+    #[async_backtrace::framed]
+    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        self.apply().await
+    }
+}
+
+impl AccumulateRowNumber {
+    #[async_backtrace::framed]
+    pub async fn accumulate(&mut self, data_block: DataBlock) -> Result<()> {
+        // If all source data is matched, we will receive an empty block that
+        // still carries the source join schema rather than only the row_number
+        // column; the compound-block projection does nothing for an empty block.
+        if !data_block.is_empty() {
+            assert_eq!(data_block.num_columns(), 1);
+            assert_eq!(
+                data_block.get_by_offset(0).data_type,
+                DataType::Number(NumberDataType::UInt64)
+            );
+        }
+
+        self.data_blocks.push(data_block);
+        Ok(())
+    }
+
+    #[async_backtrace::framed]
+    pub async fn apply(&mut self) -> Result<Option<DataBlock>> {
+        // The row_number blocks are small, so a concat here is acceptable.
+        Ok(Some(DataBlock::concat(&self.data_blocks)?))
+    }
+}
+
+impl AccumulateRowNumber {
+    pub fn create() -> Result<Self> {
+        Ok(Self {
+            data_blocks: Vec::with_capacity(10),
+        })
+    }
+
+    pub fn into_pipe_item(self) -> PipeItem {
+        let input = InputPort::create();
+        let output = OutputPort::create();
+        let processor_ptr =
+            AsyncAccumulatingTransformer::create(input.clone(), output.clone(), self);
+        PipeItem::create(ProcessorPtr::create(processor_ptr), vec![input], vec![
+            output,
+        ])
+    }
+}
diff --git a/src/query/service/src/pipelines/processors/transforms/processor_deduplicate_row_number.rs b/src/query/service/src/pipelines/processors/transforms/processor_deduplicate_row_number.rs
index 5b113e7160f64..9508579fc805e 100644
--- a/src/query/service/src/pipelines/processors/transforms/processor_deduplicate_row_number.rs
+++ b/src/query/service/src/pipelines/processors/transforms/processor_deduplicate_row_number.rs
@@ -16,6 +16,8 @@ use std::collections::HashSet;
 
 use common_arrow::arrow::buffer::Buffer;
 use common_exception::Result;
+use common_expression::types::DataType;
+use common_expression::types::NumberDataType;
 use common_expression::types::UInt64Type;
 use common_expression::DataBlock;
 use common_expression::FromData;
@@ -25,7 +27,13 @@ use common_pipeline_core::processors::port::OutputPort;
 use common_pipeline_core::processors::processor::ProcessorPtr;
 use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
 use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;
+use common_storage::metrics::merge_into::merge_into_distributed_apply_row_number;
+use common_storage::metrics::merge_into::merge_into_distributed_deduplicate_row_number;
+use common_storage::metrics::merge_into::merge_into_distributed_empty_row_number;
+use common_storage::metrics::merge_into::merge_into_distributed_init_unique_number;
+use common_storage::metrics::merge_into::merge_into_distributed_new_set_len;
 use itertools::Itertools;
+use log::info;
 
 pub struct DeduplicateRowNumber {
     unique_row_number: HashSet<u64>,
@@ -61,15 +69,21 @@ impl DeduplicateRowNumber {
         // but if there is still also some data unmatched, we won't receive
         // an empty block.
         if data_block.is_empty() {
+            merge_into_distributed_empty_row_number(1);
             self.unique_row_number.clear();
             self.accepted_data = true;
             return Ok(());
         }
 
         let row_number_vec = get_row_number(&data_block, 0);
-
+        merge_into_distributed_deduplicate_row_number(data_block.num_rows() as u32);
         if !self.accepted_data {
             self.unique_row_number = row_number_vec.into_iter().collect();
+            merge_into_distributed_init_unique_number(self.unique_row_number.len() as u32);
+            info!(
+                "init unique_row_number_len:{}",
+                self.unique_row_number.len(),
+            );
             self.accepted_data = true;
             return Ok(());
         }
@@ -80,6 +94,8 @@ impl DeduplicateRowNumber {
                 new_set.insert(number);
             }
         }
+        merge_into_distributed_new_set_len(new_set.len() as u32);
+        info!("new_set_len:{}", new_set.len());
         self.unique_row_number = new_set;
         Ok(())
     }
@@ -87,6 +103,7 @@ impl DeduplicateRowNumber {
     #[async_backtrace::framed]
     pub async fn apply(&mut self) -> Result<Option<DataBlock>> {
         let row_number_vecs = self.unique_row_number.clone().into_iter().collect_vec();
+        merge_into_distributed_apply_row_number(row_number_vecs.len() as u32);
         Ok(Some(DataBlock::new_from_columns(vec![
             UInt64Type::from_data(row_number_vecs),
         ])))
@@ -95,6 +112,10 @@
 
 pub(crate) fn get_row_number(data_block: &DataBlock, row_number_idx: usize) -> Buffer<u64> {
     let row_number_col = data_block.get_by_offset(row_number_idx);
+    assert_eq!(
+        row_number_col.data_type,
+        DataType::Number(NumberDataType::UInt64)
+    );
     let value = row_number_col.value.try_downcast::<UInt64Type>().unwrap();
     match value {
         common_expression::Value::Scalar(scalar) => Buffer::from(vec![scalar]),
diff --git a/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs b/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs
index 2ecd8f65b66c6..cd6cc289be866 100644
--- a/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs
+++ b/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs
@@ -31,10 +31,14 @@ use common_pipeline_core::processors::port::OutputPort;
 use common_pipeline_core::processors::processor::Event;
 use common_pipeline_core::processors::processor::ProcessorPtr;
 use common_pipeline_core::processors::Processor;
+use common_storage::metrics::merge_into::merge_into_distributed_hashtable_empty_block;
+use common_storage::metrics::merge_into::merge_into_distributed_hashtable_fetch_row_number;
+use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_empty_null_block;
+use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_null_block;
+use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_null_block_rows;
 
 use super::hash_join::HashJoinBuildState;
 use super::processor_deduplicate_row_number::get_row_number;
-
 pub struct ExtractHashTableByRowNumber {
     input_port: Arc<InputPort>,
     output_port: Arc<OutputPort>,
@@ -112,9 +116,11 @@ impl Processor for ExtractHashTableByRowNumber {
     fn process(&mut self) -> Result<()> {
         if let Some(data_block) = self.input_data.take() {
             if data_block.is_empty() {
+                merge_into_distributed_hashtable_empty_block(1);
                 return Ok(());
             }
 
+            merge_into_distributed_hashtable_fetch_row_number(data_block.num_rows() as u32);
             let row_number_vec = get_row_number(&data_block, 0);
             let length = row_number_vec.len();
             let row_number_set: HashSet<u64> = row_number_vec.into_iter().collect();
@@ -148,6 +154,14 @@ impl Processor for ExtractHashTableByRowNumber {
                         filtered_block.num_rows(),
                     );
                     null_block.merge_block(filtered_block);
+                    if null_block.is_empty() {
+                        merge_into_distributed_hashtable_push_empty_null_block(1);
+                    } else {
+                        merge_into_distributed_hashtable_push_null_block(1);
+                        merge_into_distributed_hashtable_push_null_block_rows(
+                            null_block.num_rows() as u32,
+                        );
+                    }
                     self.output_data.push(null_block);
                 }
             }
diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/transform_add_rownumber_column.rs b/src/query/storages/fuse/src/operations/merge_into/processors/transform_add_rownumber_column.rs
index e13f689eb0b51..e014082f4e542 100644
--- a/src/query/storages/fuse/src/operations/merge_into/processors/transform_add_rownumber_column.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/processors/transform_add_rownumber_column.rs
@@ -29,6 +29,7 @@ use common_pipeline_core::processors::port::OutputPort;
 use common_pipeline_core::processors::processor::ProcessorPtr;
 use common_pipeline_transforms::processors::transforms::Transform;
 use common_pipeline_transforms::processors::transforms::Transformer;
+use common_storage::metrics::merge_into::merge_into_distributed_generate_row_numbers;
 
 const PREFIX_OFFSET: usize = 48;
 
@@ -74,11 +75,13 @@ impl Transform for TransformAddRowNumberColumnProcessor {
         for number in row_number..row_number + num_rows {
             row_numbers.push(number);
         }
+        merge_into_distributed_generate_row_numbers(row_numbers.len() as u32);
         let mut data_block = data;
         let row_number_entry = BlockEntry::new(
             DataType::Number(NumberDataType::UInt64),
             Value::Column(UInt64Type::from_data(row_numbers)),
         );
+
         data_block.add_column(row_number_entry);
         Ok(data_block)
     }