Skip to content

feat: add distributed merge into metrics #13594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Nov 6, 2023
68 changes: 68 additions & 0 deletions src/common/storage/src/metrics/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,35 @@ lazy_static! {
register_counter(key!("merge_into_deleted_blocks_rows_counter"));
static ref MERGE_INTO_APPEND_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_counter"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_FETCH_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_hashtable_fetch_row_number"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_EMPTY_BLOCK: Counter =
register_counter(key!("merge_into_distributed_hashtable_empty_block"));
static ref MERGE_INTO_DISTRIBUTED_GENERATE_ROW_NUMBERS: Counter =
register_counter(key!("merge_into_distributed_generate_row_numbers"));
static ref MERGE_INTO_DISTRIBUTED_INIT_UNIQUE_NUMBER: Counter =
register_counter(key!("merge_into_distributed_init_unique_number"));
static ref MERGE_INTO_DISTRIBUTED_NEW_SET_LEN: Counter =
register_counter(key!("merge_into_distributed_new_set_len"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_EMPTY_NULL_BLOCK: Counter = register_counter(
key!("merge_into_distributed_hashtable_push_empty_null_block")
);
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK: Counter =
register_counter(key!("merge_into_distributed_hashtable_push_null_block"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK_ROWS: Counter = register_counter(
key!("merge_into_distributed_hashtable_push_null_block_rows")
);
static ref MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_rows_counter"));
static ref MERGE_INTO_MATCHED_ROWS: Counter = register_counter(key!("merge_into_matched_rows"));
static ref MERGE_INTO_UNMATCHED_ROWS: Counter =
register_counter(key!("merge_into_unmatched_rows"));
static ref MERGE_INTO_DISTRIBUTED_DEDUPLICATE_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_deduplicate_row_number"));
static ref MERGE_INTO_DISTRIBUTED_EMPTY_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_empty_row_number"));
static ref MERGE_INTO_DISTRIBUTED_APPLY_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_apply_row_number"));
static ref MERGE_INTO_ACCUMULATE_MILLISECONDS: Histogram =
register_histogram_in_milliseconds(key!("merge_into_accumulate_milliseconds"));
static ref MERGE_INTO_APPLY_MILLISECONDS: Histogram =
Expand Down Expand Up @@ -73,6 +97,50 @@ pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) {
MERGE_INTO_APPEND_BLOCKS_COUNTER.inc_by(c as u64);
}

/// Adds `c` to the `merge_into_distributed_deduplicate_row_number` counter.
///
/// The `DeduplicateRowNumber` processor reports the row count of each
/// non-empty block it receives through this function.
pub fn merge_into_distributed_deduplicate_row_number(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_DEDUPLICATE_ROWNUMBER.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_empty_row_number` counter.
///
/// The `DeduplicateRowNumber` processor bumps this by one for every empty
/// block it receives.
pub fn merge_into_distributed_empty_row_number(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_EMPTY_ROWNUMBER.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_apply_row_number` counter.
///
/// The `DeduplicateRowNumber` processor reports how many row numbers it emits
/// from `apply` through this function.
pub fn merge_into_distributed_apply_row_number(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_APPLY_ROWNUMBER.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_hashtable_fetch_row_number` counter.
///
/// `ExtractHashTableByRowNumber::process` reports the row count of each
/// non-empty row-number block it handles through this function.
pub fn merge_into_distributed_hashtable_fetch_row_number(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_HASHTABLE_FETCH_ROWNUMBER.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_hashtable_empty_block` counter.
///
/// `ExtractHashTableByRowNumber::process` bumps this by one whenever the block
/// it takes from its input is empty.
pub fn merge_into_distributed_hashtable_empty_block(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_HASHTABLE_EMPTY_BLOCK.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_generate_row_numbers` counter.
///
/// `TransformAddRowNumberColumnProcessor` reports how many row numbers it
/// generated for a block through this function.
pub fn merge_into_distributed_generate_row_numbers(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_GENERATE_ROW_NUMBERS.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_init_unique_number` counter.
///
/// The `DeduplicateRowNumber` processor reports the size of its initial
/// unique row-number set through this function.
pub fn merge_into_distributed_init_unique_number(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_INIT_UNIQUE_NUMBER.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_new_set_len` counter.
///
/// The `DeduplicateRowNumber` processor reports the size of the rebuilt
/// row-number set after processing a follow-up block through this function.
pub fn merge_into_distributed_new_set_len(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_NEW_SET_LEN.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_hashtable_push_empty_null_block`
/// counter.
///
/// `ExtractHashTableByRowNumber::process` bumps this by one when the null
/// block it is about to push turns out to be empty.
pub fn merge_into_distributed_hashtable_push_empty_null_block(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_EMPTY_NULL_BLOCK.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_hashtable_push_null_block` counter.
///
/// `ExtractHashTableByRowNumber::process` bumps this by one for every
/// non-empty null block it pushes to its output.
pub fn merge_into_distributed_hashtable_push_null_block(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK.inc_by(delta);
}

/// Adds `c` to the `merge_into_distributed_hashtable_push_null_block_rows`
/// counter.
///
/// `ExtractHashTableByRowNumber::process` reports the row count of every
/// non-empty null block it pushes through this function.
pub fn merge_into_distributed_hashtable_push_null_block_rows(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK_ROWS.inc_by(delta);
}

/// Adds `c` to the `merge_into_append_blocks_rows_counter` counter.
pub fn metrics_inc_merge_into_append_blocks_rows_counter(c: u32) {
    // Widening u32 -> u64 is lossless.
    let delta = u64::from(c);
    MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER.inc_by(delta);
}
Expand Down
15 changes: 15 additions & 0 deletions src/query/service/src/pipelines/builders/builder_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use common_storages_fuse::operations::merge_into::TransformAddRowNumberColumnPro
use common_storages_fuse::operations::TransformSerializeBlock;
use common_storages_fuse::FuseTable;

use crate::pipelines::processors::transforms::AccumulateRowNumber;
use crate::pipelines::processors::transforms::ExtractHashTableByRowNumber;
use crate::pipelines::processors::transforms::TransformAddComputedColumns;
use crate::pipelines::processors::DeduplicateRowNumber;
Expand Down Expand Up @@ -638,6 +639,20 @@ impl PipelineBuilder {
pipe_items,
));

// accumulate row_number
if *distributed {
let pipe_items = vec![
create_dummy_item(),
create_dummy_item(),
AccumulateRowNumber::create()?.into_pipe_item(),
];
self.main_pipeline.add_pipe(Pipe::create(
self.main_pipeline.output_len(),
get_output_len(&pipe_items),
pipe_items,
));
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod aggregator;
pub mod group_by;
pub(crate) mod hash_join;
mod metrics;
mod processor_accumulate_row_number;
mod processor_deduplicate_row_number;
mod processor_extract_hash_table_by_row_number;
pub(crate) mod range_join;
Expand Down Expand Up @@ -59,6 +60,7 @@ use common_pipeline_transforms::processors::transforms::transform_sort_partial;
pub use hash_join::FixedKeyHashJoinHashTable;
pub use hash_join::HashJoinDesc;
pub use hash_join::HashJoinState;
pub use processor_accumulate_row_number::AccumulateRowNumber;
pub use processor_deduplicate_row_number::DeduplicateRowNumber;
pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber;
pub use range_join::RangeJoinState;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
use common_expression::DataBlock;
use common_pipeline_core::pipe::PipeItem;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;

/// State for an accumulating transform that buffers every incoming block and
/// emits them as one concatenated block once the input is finished.
pub struct AccumulateRowNumber {
// Blocks received so far, in arrival order; `apply` concatenates them.
data_blocks: Vec<DataBlock>,
}

#[async_trait::async_trait]
impl AsyncAccumulatingTransform for AccumulateRowNumber {
    const NAME: &'static str = "AccumulateRowNumber";

    /// Buffers `data` and never produces intermediate output; everything is
    /// emitted from `on_finish`.
    #[async_backtrace::framed]
    async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
        // A successful accumulate maps to "no partial output".
        self.accumulate(data).await.map(|()| None)
    }

    /// Emits the single block produced by `apply` once the input is exhausted.
    #[async_backtrace::framed]
    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
        let merged = self.apply().await?;
        Ok(merged)
    }
}

impl AccumulateRowNumber {
    /// Stores `data_block` for later concatenation.
    ///
    /// # Panics
    ///
    /// Panics if a non-empty block does not consist of exactly one `UInt64`
    /// column.
    #[async_backtrace::framed]
    pub async fn accumulate(&mut self, data_block: DataBlock) -> Result<()> {
        // When every source row matched, an empty block arrives that still
        // carries the full source-join schema rather than only the row_number
        // column (the compound-block projection does nothing for an empty
        // block), so the shape checks apply to non-empty blocks only.
        let carries_rows = !data_block.is_empty();
        if carries_rows {
            assert_eq!(data_block.num_columns(), 1);

            let row_number_type = DataType::Number(NumberDataType::UInt64);
            assert_eq!(data_block.get_by_offset(0).data_type, row_number_type);
        }

        self.data_blocks.push(data_block);
        Ok(())
    }

    /// Concatenates every buffered block into one and returns it.
    #[async_backtrace::framed]
    pub async fn apply(&mut self) -> Result<Option<DataBlock>> {
        // The row-number blocks are small, so a single concat is fine.
        let merged = DataBlock::concat(&self.data_blocks)?;
        Ok(Some(merged))
    }
}

impl AccumulateRowNumber {
    /// Builds an accumulator with an empty buffer pre-sized for ten blocks.
    pub fn create() -> Result<Self> {
        let data_blocks = Vec::with_capacity(10);
        Ok(Self { data_blocks })
    }

    /// Wraps `self` in an `AsyncAccumulatingTransformer` with one input port
    /// and one output port, and returns the resulting pipe item.
    pub fn into_pipe_item(self) -> PipeItem {
        let (input, output) = (InputPort::create(), OutputPort::create());
        let transformer =
            AsyncAccumulatingTransformer::create(input.clone(), output.clone(), self);
        let processor = ProcessorPtr::create(transformer);
        PipeItem::create(processor, vec![input], vec![output])
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::collections::HashSet;

use common_arrow::arrow::buffer::Buffer;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
use common_expression::types::UInt64Type;
use common_expression::DataBlock;
use common_expression::FromData;
Expand All @@ -25,7 +27,13 @@ use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;
use common_storage::metrics::merge_into::merge_into_distributed_apply_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_deduplicate_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_empty_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_init_unique_number;
use common_storage::metrics::merge_into::merge_into_distributed_new_set_len;
use itertools::Itertools;
use log::info;

pub struct DeduplicateRowNumber {
unique_row_number: HashSet<u64>,
Expand Down Expand Up @@ -61,15 +69,21 @@ impl DeduplicateRowNumber {
// but if there is still also some data unmatched, we won't receive
// an empty block.
if data_block.is_empty() {
merge_into_distributed_empty_row_number(1);
self.unique_row_number.clear();
self.accepted_data = true;
return Ok(());
}

let row_number_vec = get_row_number(&data_block, 0);

merge_into_distributed_deduplicate_row_number(data_block.num_rows() as u32);
if !self.accepted_data {
self.unique_row_number = row_number_vec.into_iter().collect();
merge_into_distributed_init_unique_number(self.unique_row_number.len() as u32);
info!(
"init unique_row_number_len:{}",
self.unique_row_number.len(),
);
self.accepted_data = true;
return Ok(());
}
Expand All @@ -80,13 +94,16 @@ impl DeduplicateRowNumber {
new_set.insert(number);
}
}
merge_into_distributed_new_set_len(new_set.len() as u32);
info!("init new_set_len:{}", new_set.len());
self.unique_row_number = new_set;
Ok(())
}

#[async_backtrace::framed]
pub async fn apply(&mut self) -> Result<Option<DataBlock>> {
let row_number_vecs = self.unique_row_number.clone().into_iter().collect_vec();
merge_into_distributed_apply_row_number(row_number_vecs.len() as u32);
Ok(Some(DataBlock::new_from_columns(vec![
UInt64Type::from_data(row_number_vecs),
])))
Expand All @@ -95,6 +112,10 @@ impl DeduplicateRowNumber {

pub(crate) fn get_row_number(data_block: &DataBlock, row_number_idx: usize) -> Buffer<u64> {
let row_number_col = data_block.get_by_offset(row_number_idx);
assert_eq!(
row_number_col.data_type,
DataType::Number(NumberDataType::UInt64)
);
let value = row_number_col.value.try_downcast::<UInt64Type>().unwrap();
match value {
common_expression::Value::Scalar(scalar) => Buffer::from(vec![scalar]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::processors::Processor;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_empty_block;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_fetch_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_empty_null_block;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_null_block;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_null_block_rows;

use super::hash_join::HashJoinBuildState;
use super::processor_deduplicate_row_number::get_row_number;

pub struct ExtractHashTableByRowNumber {
input_port: Arc<InputPort>,
output_port: Arc<OutputPort>,
Expand Down Expand Up @@ -112,9 +116,11 @@ impl Processor for ExtractHashTableByRowNumber {
fn process(&mut self) -> Result<()> {
if let Some(data_block) = self.input_data.take() {
if data_block.is_empty() {
merge_into_distributed_hashtable_empty_block(1);
return Ok(());
}

merge_into_distributed_hashtable_fetch_row_number(data_block.num_rows() as u32);
let row_number_vec = get_row_number(&data_block, 0);
let length = row_number_vec.len();
let row_number_set: HashSet<u64> = row_number_vec.into_iter().collect();
Expand Down Expand Up @@ -148,6 +154,14 @@ impl Processor for ExtractHashTableByRowNumber {
filtered_block.num_rows(),
);
null_block.merge_block(filtered_block);
if null_block.is_empty() {
merge_into_distributed_hashtable_push_empty_null_block(1);
} else {
merge_into_distributed_hashtable_push_null_block(1);
merge_into_distributed_hashtable_push_null_block_rows(
null_block.num_rows() as u32,
);
}
self.output_data.push(null_block);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::Transform;
use common_pipeline_transforms::processors::transforms::Transformer;
use common_storage::metrics::merge_into::merge_into_distributed_generate_row_numbers;

const PREFIX_OFFSET: usize = 48;

Expand Down Expand Up @@ -74,11 +75,13 @@ impl Transform for TransformAddRowNumberColumnProcessor {
for number in row_number..row_number + num_rows {
row_numbers.push(number);
}
merge_into_distributed_generate_row_numbers(row_numbers.len() as u32);
let mut data_block = data;
let row_number_entry = BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Column(UInt64Type::from_data(row_numbers)),
);

data_block.add_column(row_number_entry);
Ok(data_block)
}
Expand Down