Skip to content

Commit

Permalink
feat: add distributed merge into metrics (#13594)
Browse files Browse the repository at this point in the history
* fix conflict

* add more metrics

* add more metrics

* add more metrics

* add assert

* accumulate row_number

* fix check

* add pipe

* macthed all data

* macthed all data

* macthed all data
  • Loading branch information
JackTan25 authored Nov 6, 2023
1 parent db6728a commit 10f44fe
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 2 deletions.
68 changes: 68 additions & 0 deletions src/common/storage/src/metrics/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,35 @@ lazy_static! {
register_counter(key!("merge_into_deleted_blocks_rows_counter"));
static ref MERGE_INTO_APPEND_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_counter"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_FETCH_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_hashtable_fetch_row_number"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_EMPTY_BLOCK: Counter =
register_counter(key!("merge_into_distributed_hashtable_empty_block"));
static ref MERGE_INTO_DISTRIBUTED_GENERATE_ROW_NUMBERS: Counter =
register_counter(key!("merge_into_distributed_generate_row_numbers"));
static ref MERGE_INTO_DISTRIBUTED_INIT_UNIQUE_NUMBER: Counter =
register_counter(key!("merge_into_distributed_init_unique_number"));
static ref MERGE_INTO_DISTRIBUTED_NEW_SET_LEN: Counter =
register_counter(key!("merge_into_distributed_new_set_len"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_EMPTY_NULL_BLOCK: Counter = register_counter(
key!("merge_into_distributed_hashtable_push_empty_null_block")
);
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK: Counter =
register_counter(key!("merge_into_distributed_hashtable_push_null_block"));
static ref MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK_ROWS: Counter = register_counter(
key!("merge_into_distributed_hashtable_push_null_block_rows")
);
static ref MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_rows_counter"));
static ref MERGE_INTO_MATCHED_ROWS: Counter = register_counter(key!("merge_into_matched_rows"));
static ref MERGE_INTO_UNMATCHED_ROWS: Counter =
register_counter(key!("merge_into_unmatched_rows"));
static ref MERGE_INTO_DISTRIBUTED_DEDUPLICATE_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_deduplicate_row_number"));
static ref MERGE_INTO_DISTRIBUTED_EMPTY_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_empty_row_number"));
static ref MERGE_INTO_DISTRIBUTED_APPLY_ROWNUMBER: Counter =
register_counter(key!("merge_into_distributed_apply_row_number"));
static ref MERGE_INTO_ACCUMULATE_MILLISECONDS: Histogram =
register_histogram_in_milliseconds(key!("merge_into_accumulate_milliseconds"));
static ref MERGE_INTO_APPLY_MILLISECONDS: Histogram =
Expand Down Expand Up @@ -73,6 +97,50 @@ pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) {
MERGE_INTO_APPEND_BLOCKS_COUNTER.inc_by(c as u64);
}

pub fn merge_into_distributed_deduplicate_row_number(c: u32) {
MERGE_INTO_DISTRIBUTED_DEDUPLICATE_ROWNUMBER.inc_by(c as u64);
}

pub fn merge_into_distributed_empty_row_number(c: u32) {
MERGE_INTO_DISTRIBUTED_EMPTY_ROWNUMBER.inc_by(c as u64);
}

pub fn merge_into_distributed_apply_row_number(c: u32) {
MERGE_INTO_DISTRIBUTED_APPLY_ROWNUMBER.inc_by(c as u64);
}

pub fn merge_into_distributed_hashtable_fetch_row_number(c: u32) {
MERGE_INTO_DISTRIBUTED_HASHTABLE_FETCH_ROWNUMBER.inc_by(c as u64);
}

pub fn merge_into_distributed_hashtable_empty_block(c: u32) {
MERGE_INTO_DISTRIBUTED_HASHTABLE_EMPTY_BLOCK.inc_by(c as u64);
}

pub fn merge_into_distributed_generate_row_numbers(c: u32) {
MERGE_INTO_DISTRIBUTED_GENERATE_ROW_NUMBERS.inc_by(c as u64);
}

pub fn merge_into_distributed_init_unique_number(c: u32) {
MERGE_INTO_DISTRIBUTED_INIT_UNIQUE_NUMBER.inc_by(c as u64);
}

pub fn merge_into_distributed_new_set_len(c: u32) {
MERGE_INTO_DISTRIBUTED_NEW_SET_LEN.inc_by(c as u64);
}

pub fn merge_into_distributed_hashtable_push_empty_null_block(c: u32) {
MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_EMPTY_NULL_BLOCK.inc_by(c as u64);
}

pub fn merge_into_distributed_hashtable_push_null_block(c: u32) {
MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK.inc_by(c as u64);
}

pub fn merge_into_distributed_hashtable_push_null_block_rows(c: u32) {
MERGE_INTO_DISTRIBUTED_HASHTABLE_PUSH_NULL_BLOCK_ROWS.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_append_blocks_rows_counter(c: u32) {
MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
}
Expand Down
15 changes: 15 additions & 0 deletions src/query/service/src/pipelines/builders/builder_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use common_storages_fuse::operations::merge_into::TransformAddRowNumberColumnPro
use common_storages_fuse::operations::TransformSerializeBlock;
use common_storages_fuse::FuseTable;

use crate::pipelines::processors::transforms::AccumulateRowNumber;
use crate::pipelines::processors::transforms::ExtractHashTableByRowNumber;
use crate::pipelines::processors::transforms::TransformAddComputedColumns;
use crate::pipelines::processors::DeduplicateRowNumber;
Expand Down Expand Up @@ -638,6 +639,20 @@ impl PipelineBuilder {
pipe_items,
));

// accumulate row_number
if *distributed {
let pipe_items = vec![
create_dummy_item(),
create_dummy_item(),
AccumulateRowNumber::create()?.into_pipe_item(),
];
self.main_pipeline.add_pipe(Pipe::create(
self.main_pipeline.output_len(),
get_output_len(&pipe_items),
pipe_items,
));
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod aggregator;
pub mod group_by;
pub(crate) mod hash_join;
mod metrics;
mod processor_accumulate_row_number;
mod processor_deduplicate_row_number;
mod processor_extract_hash_table_by_row_number;
pub(crate) mod range_join;
Expand Down Expand Up @@ -59,6 +60,7 @@ use common_pipeline_transforms::processors::transforms::transform_sort_partial;
pub use hash_join::FixedKeyHashJoinHashTable;
pub use hash_join::HashJoinDesc;
pub use hash_join::HashJoinState;
pub use processor_accumulate_row_number::AccumulateRowNumber;
pub use processor_deduplicate_row_number::DeduplicateRowNumber;
pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber;
pub use range_join::RangeJoinState;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
use common_expression::DataBlock;
use common_pipeline_core::pipe::PipeItem;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;

pub struct AccumulateRowNumber {
data_blocks: Vec<DataBlock>,
}

#[async_trait::async_trait]
impl AsyncAccumulatingTransform for AccumulateRowNumber {
const NAME: &'static str = "AccumulateRowNumber";

#[async_backtrace::framed]
async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
self.accumulate(data).await?;
// no partial output
Ok(None)
}

#[async_backtrace::framed]
async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
self.apply().await
}
}

impl AccumulateRowNumber {
#[async_backtrace::framed]
pub async fn accumulate(&mut self, data_block: DataBlock) -> Result<()> {
// if matched all source data, we will get an empty block, but which
// has source join schema,not only row_number,for combound_block project,
// it will do nothing for empty block.
if !data_block.is_empty() {
assert_eq!(data_block.num_columns(), 1);
assert_eq!(
data_block.get_by_offset(0).data_type,
DataType::Number(NumberDataType::UInt64)
);
}

self.data_blocks.push(data_block);
Ok(())
}

#[async_backtrace::framed]
pub async fn apply(&mut self) -> Result<Option<DataBlock>> {
// row_numbers is small, so concat is ok.
Ok(Some(DataBlock::concat(&self.data_blocks)?))
}
}

impl AccumulateRowNumber {
pub fn create() -> Result<Self> {
Ok(Self {
data_blocks: Vec::with_capacity(10),
})
}

pub fn into_pipe_item(self) -> PipeItem {
let input = InputPort::create();
let output = OutputPort::create();
let processor_ptr =
AsyncAccumulatingTransformer::create(input.clone(), output.clone(), self);
PipeItem::create(ProcessorPtr::create(processor_ptr), vec![input], vec![
output,
])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::collections::HashSet;

use common_arrow::arrow::buffer::Buffer;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
use common_expression::types::UInt64Type;
use common_expression::DataBlock;
use common_expression::FromData;
Expand All @@ -25,7 +27,13 @@ use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;
use common_storage::metrics::merge_into::merge_into_distributed_apply_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_deduplicate_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_empty_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_init_unique_number;
use common_storage::metrics::merge_into::merge_into_distributed_new_set_len;
use itertools::Itertools;
use log::info;

pub struct DeduplicateRowNumber {
unique_row_number: HashSet<u64>,
Expand Down Expand Up @@ -61,15 +69,21 @@ impl DeduplicateRowNumber {
// but if there is still also some data unmatched, we won't receive
// an empty block.
if data_block.is_empty() {
merge_into_distributed_empty_row_number(1);
self.unique_row_number.clear();
self.accepted_data = true;
return Ok(());
}

let row_number_vec = get_row_number(&data_block, 0);

merge_into_distributed_deduplicate_row_number(data_block.num_rows() as u32);
if !self.accepted_data {
self.unique_row_number = row_number_vec.into_iter().collect();
merge_into_distributed_init_unique_number(self.unique_row_number.len() as u32);
info!(
"init unique_row_number_len:{}",
self.unique_row_number.len(),
);
self.accepted_data = true;
return Ok(());
}
Expand All @@ -80,13 +94,16 @@ impl DeduplicateRowNumber {
new_set.insert(number);
}
}
merge_into_distributed_new_set_len(new_set.len() as u32);
info!("init new_set_len:{}", new_set.len());
self.unique_row_number = new_set;
Ok(())
}

#[async_backtrace::framed]
pub async fn apply(&mut self) -> Result<Option<DataBlock>> {
let row_number_vecs = self.unique_row_number.clone().into_iter().collect_vec();
merge_into_distributed_apply_row_number(row_number_vecs.len() as u32);
Ok(Some(DataBlock::new_from_columns(vec![
UInt64Type::from_data(row_number_vecs),
])))
Expand All @@ -95,6 +112,10 @@ impl DeduplicateRowNumber {

pub(crate) fn get_row_number(data_block: &DataBlock, row_number_idx: usize) -> Buffer<u64> {
let row_number_col = data_block.get_by_offset(row_number_idx);
assert_eq!(
row_number_col.data_type,
DataType::Number(NumberDataType::UInt64)
);
let value = row_number_col.value.try_downcast::<UInt64Type>().unwrap();
match value {
common_expression::Value::Scalar(scalar) => Buffer::from(vec![scalar]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::processors::Processor;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_empty_block;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_fetch_row_number;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_empty_null_block;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_null_block;
use common_storage::metrics::merge_into::merge_into_distributed_hashtable_push_null_block_rows;

use super::hash_join::HashJoinBuildState;
use super::processor_deduplicate_row_number::get_row_number;

pub struct ExtractHashTableByRowNumber {
input_port: Arc<InputPort>,
output_port: Arc<OutputPort>,
Expand Down Expand Up @@ -112,9 +116,11 @@ impl Processor for ExtractHashTableByRowNumber {
fn process(&mut self) -> Result<()> {
if let Some(data_block) = self.input_data.take() {
if data_block.is_empty() {
merge_into_distributed_hashtable_empty_block(1);
return Ok(());
}

merge_into_distributed_hashtable_fetch_row_number(data_block.num_rows() as u32);
let row_number_vec = get_row_number(&data_block, 0);
let length = row_number_vec.len();
let row_number_set: HashSet<u64> = row_number_vec.into_iter().collect();
Expand Down Expand Up @@ -148,6 +154,14 @@ impl Processor for ExtractHashTableByRowNumber {
filtered_block.num_rows(),
);
null_block.merge_block(filtered_block);
if null_block.is_empty() {
merge_into_distributed_hashtable_push_empty_null_block(1);
} else {
merge_into_distributed_hashtable_push_null_block(1);
merge_into_distributed_hashtable_push_null_block_rows(
null_block.num_rows() as u32,
);
}
self.output_data.push(null_block);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::Transform;
use common_pipeline_transforms::processors::transforms::Transformer;
use common_storage::metrics::merge_into::merge_into_distributed_generate_row_numbers;

const PREFIX_OFFSET: usize = 48;

Expand Down Expand Up @@ -74,11 +75,13 @@ impl Transform for TransformAddRowNumberColumnProcessor {
for number in row_number..row_number + num_rows {
row_numbers.push(number);
}
merge_into_distributed_generate_row_numbers(row_numbers.len() as u32);
let mut data_block = data;
let row_number_entry = BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Column(UInt64Type::from_data(row_numbers)),
);

data_block.add_column(row_number_entry);
Ok(data_block)
}
Expand Down

0 comments on commit 10f44fe

Please sign in to comment.