fix rowid split for new distributed execution

JackTan25 committed Dec 15, 2023
1 parent a4b2d27 commit df155ee

Showing 8 changed files with 57 additions and 23 deletions.
20 changes: 15 additions & 5 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -23,12 +23,14 @@ use common_exception::Result;
 use common_expression::types::UInt32Type;
 use common_expression::ConstantFolder;
 use common_expression::DataBlock;
+use common_expression::DataField;
 use common_expression::DataSchema;
 use common_expression::DataSchemaRef;
 use common_expression::FieldIndex;
 use common_expression::FromData;
 use common_expression::RemoteExpr;
 use common_expression::SendableDataBlockStream;
+use common_expression::ROW_ID_COL_NAME;
 use common_expression::ROW_NUMBER_COL_NAME;
 use common_functions::BUILTIN_FUNCTIONS;
 use common_meta_app::schema::TableInfo;
@@ -227,7 +229,7 @@ impl MergeIntoInterpreter {
             }
         }

-        if *distributed {
+        if *distributed && !*change_join_order {
             row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
         }

@@ -238,7 +240,7 @@
             ));
         }

-        if *distributed && row_number_idx.is_none() {
+        if *distributed && row_number_idx.is_none() && !*change_join_order {
             return Err(ErrorCode::InvalidRowIdIndex(
                 "can't get internal row_number_idx when running merge into",
             ));
Expand Down Expand Up @@ -412,9 +414,17 @@ impl MergeIntoInterpreter {
row_id_idx,
segments: segments.clone(),
distributed: true,
output_schema: DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_number_idx.unwrap()].clone(),
])),
output_schema: match *change_join_order {
false => DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_number_idx.unwrap()].clone(),
])),
true => DataSchemaRef::new(DataSchema::new(vec![DataField::new(
ROW_ID_COL_NAME,
common_expression::types::DataType::Number(
common_expression::types::NumberDataType::UInt64,
),
)])),
},
merge_type: merge_type.clone(),
change_join_order: *change_join_order,
}));
Expand Down
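The interpreter change above picks which single-column schema crosses the network exchange: with the original join order (source as build side) each node ships the row_number column, while with the changed join order (target as build side) it ships a UInt64 row_id column instead. A minimal, self-contained sketch of that decision, where ExchangeColumn and exchange_column are hypothetical names standing in for the real databend types:

// Hypothetical model of the schema switch above; ExchangeColumn is
// not a databend type, just an illustration of the three cases.
#[derive(Debug, PartialEq)]
enum ExchangeColumn {
    RowNumber, // distributed + original join order: source as build side
    RowId,     // distributed + changed join order: target as build side
}

fn exchange_column(distributed: bool, change_join_order: bool) -> Option<ExchangeColumn> {
    match (distributed, change_join_order) {
        (false, _) => None, // local execution: no exchange column at all
        (true, false) => Some(ExchangeColumn::RowNumber),
        (true, true) => Some(ExchangeColumn::RowId),
    }
}

fn main() {
    assert_eq!(exchange_column(true, false), Some(ExchangeColumn::RowNumber));
    assert_eq!(exchange_column(true, true), Some(ExchangeColumn::RowId));
}

This also explains the relaxed guards earlier in the file: row_number_idx is only looked up, and only required, when the join order is unchanged.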
24 changes: 24 additions & 0 deletions src/query/service/src/pipelines/builders/builder_merge_into.rs
@@ -44,6 +44,8 @@ use common_storages_fuse::operations::MergeIntoNotMatchedProcessor;
 use common_storages_fuse::operations::MergeIntoSplitProcessor;
 use common_storages_fuse::operations::RowNumberAndLogSplitProcessor;
 use common_storages_fuse::operations::TransformAddRowNumberColumnProcessor;
+use common_storages_fuse::operations::TransformDistributedMergeIntoBlockDeserialize;
+use common_storages_fuse::operations::TransformDistributedMergeIntoBlockSerialize;
 use common_storages_fuse::operations::TransformSerializeBlock;
 use common_storages_fuse::FuseTable;

@@ -117,6 +119,17 @@ impl PipelineBuilder {
         self.build_pipeline(input)?;
         self.main_pipeline.try_resize(1)?;

+        // deserialize MixRowIdKindAndLog
+        if *change_join_order {
+            self.main_pipeline
+                .add_transform(|transform_input_port, transform_output_port| {
+                    Ok(TransformDistributedMergeIntoBlockDeserialize::create(
+                        transform_input_port,
+                        transform_output_port,
+                    ))
+                })?;
+        }
+
         let tbl = self
             .ctx
             .build_table_by_table_info(catalog_info, table_info, None)?;
@@ -878,6 +891,17 @@ impl PipelineBuilder {
             ));
         }

+        // add distributed_merge_into_block_serialize:
+        // wrap row ids and mutation logs as MixRowIdKindAndLog
+        if *distributed && *change_join_order {
+            self.main_pipeline
+                .add_transform(|transform_input_port, transform_output_port| {
+                    Ok(TransformDistributedMergeIntoBlockSerialize::create(
+                        transform_input_port,
+                        transform_output_port,
+                    ))
+                })?;
+        }
         Ok(())
     }
 }
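The two hooks are symmetric around the exchange: the serialize transform is appended at the end of the sending pipeline so row-id blocks and mutation logs leave a node wrapped as MixRowIdKindAndLog, and the deserialize transform is added first on the receiving side to unwrap them again. A toy sketch of that topology, assuming string stage names in place of the real processors (the actual Pipeline API works on ports and processor objects, and the surrounding stage names here are placeholders):

// Toy topology model; Stage is a stand-in for real pipeline processors.
type Stage = &'static str;

fn sender_pipeline(mut stages: Vec<Stage>) -> Vec<Stage> {
    // serialize runs last, just before blocks hit the network exchange
    stages.push("TransformDistributedMergeIntoBlockSerialize");
    stages
}

fn receiver_pipeline(mut stages: Vec<Stage>) -> Vec<Stage> {
    // deserialize runs first, right after blocks arrive from the exchange
    stages.insert(0, "TransformDistributedMergeIntoBlockDeserialize");
    stages
}

fn main() {
    let tx = sender_pipeline(vec!["MatchedSplit", "SerializeBlock"]);
    let rx = receiver_pipeline(vec!["CommitSink"]);
    assert_eq!(*tx.last().unwrap(), "TransformDistributedMergeIntoBlockSerialize");
    assert_eq!(rx[0], "TransformDistributedMergeIntoBlockDeserialize");
}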
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/merge_into/mod.rs
@@ -19,7 +19,7 @@ pub use mutator::MatchedAggregator;
 pub use processors::MatchedSplitProcessor;
 pub use processors::MergeIntoNotMatchedProcessor;
 pub use processors::MergeIntoSplitProcessor;
-pub use processors::MixRowNumberKindAndLog;
+pub use processors::MixRowIdKindAndLog;
 pub use processors::RowNumberAndLogSplitProcessor;
 pub use processors::SourceFullMatched;
 pub use processors::TransformAddRowNumberColumnProcessor;
@@ -24,7 +24,7 @@ mod transform_matched_mutation_aggregator;
 pub use processor_distributed_merge_into_block_deserialize::TransformDistributedMergeIntoBlockDeserialize;
 pub use processor_distributed_merge_into_block_serialize::TransformDistributedMergeIntoBlockSerialize;
 pub use processor_merge_into_matched_and_split::MatchedSplitProcessor;
-pub use processor_merge_into_matched_and_split::MixRowNumberKindAndLog;
+pub use processor_merge_into_matched_and_split::MixRowIdKindAndLog;
 pub(crate) use processor_merge_into_matched_and_split::RowIdKind;
 pub use processor_merge_into_matched_and_split::SourceFullMatched;
 pub use processor_merge_into_not_matched::MergeIntoNotMatchedProcessor;
@@ -26,7 +26,7 @@ use common_pipeline_core::PipeItem;
 use common_pipeline_transforms::processors::Transform;
 use common_pipeline_transforms::processors::Transformer;

-use super::processor_merge_into_matched_and_split::MixRowNumberKindAndLog;
+use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog;
 use super::RowIdKind;

 // It will receive MutationLogs or RowIds.
@@ -37,7 +37,7 @@ pub struct TransformDistributedMergeIntoBlockDeserialize;

 /// this processor will be used in the future for merge into based on shuffle hash join.
 impl TransformDistributedMergeIntoBlockDeserialize {
-    fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
+    pub fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
         ProcessorPtr::create(Transformer::create(
             input,
             output,
@@ -66,7 +66,7 @@ impl Transform for TransformDistributedMergeIntoBlockDeserialize {
     const NAME: &'static str = "TransformDistributedMergeIntoBlockDeserialize";

     fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
-        let mix_kind = MixRowNumberKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap();
+        let mix_kind = MixRowIdKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap();
         match mix_kind.kind {
             0 => Ok(DataBlock::new_with_meta(
                 data.columns().to_vec(),
@@ -84,7 +84,7 @@ impl Transform for TransformDistributedMergeIntoBlockDeserialize {
                 data.num_rows(),
                 Some(Box::new(RowIdKind::Delete)),
             )),
-            _ => Err(ErrorCode::BadBytes("get error MixRowNumberKindAndLog kind")),
+            _ => Err(ErrorCode::BadBytes("get error MixRowIdKindAndLog kind")),
         }
     }
 }
@@ -30,7 +30,7 @@ use common_pipeline_core::PipeItem;
 use common_pipeline_transforms::processors::Transform;
 use common_pipeline_transforms::processors::Transformer;

-use super::processor_merge_into_matched_and_split::MixRowNumberKindAndLog;
+use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog;
 use super::RowIdKind;
 use crate::operations::common::MutationLogs;

@@ -42,7 +42,7 @@ pub struct TransformDistributedMergeIntoBlockSerialize;

 /// this processor will be used in the future for merge into based on shuffle hash join.
 impl TransformDistributedMergeIntoBlockSerialize {
-    fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
+    pub fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
         ProcessorPtr::create(Transformer::create(
             input,
             output,
@@ -79,7 +79,7 @@ impl Transform for TransformDistributedMergeIntoBlockSerialize {
             Ok(DataBlock::new_with_meta(
                 vec![entry],
                 1,
-                Some(Box::new(MixRowNumberKindAndLog {
+                Some(Box::new(MixRowIdKindAndLog {
                     log: Some(log),
                     kind: 0,
                 })),
@@ -90,7 +90,7 @@ impl Transform for TransformDistributedMergeIntoBlockSerialize {
             Ok(DataBlock::new_with_meta(
                 data.columns().to_vec(),
                 data.num_rows(),
-                Some(Box::new(MixRowNumberKindAndLog {
+                Some(Box::new(MixRowIdKindAndLog {
                     log: None,
                     kind: match row_id_kind {
                         RowIdKind::Update => 1,
@@ -66,17 +66,17 @@ enum MutationKind {
 // if we use the hash shuffle join strategy, the enum
 // type can't be parsed when transferring data between nodes.
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
-pub struct MixRowNumberKindAndLog {
+pub struct MixRowIdKindAndLog {
     pub log: Option<MutationLogs>,
     // kind is 0, 1, or 2: 0 stands for a mutation log,
     // 1 for row_id_update, and 2 for row_id_delete.
     pub kind: usize,
 }

 #[typetag::serde(name = "mix_row_id_kind_and_log")]
-impl BlockMetaInfo for MixRowNumberKindAndLog {
+impl BlockMetaInfo for MixRowIdKindAndLog {
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
-        MixRowNumberKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other)
+        MixRowIdKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other)
     }

     fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
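Since the meta must round-trip through serde when it crosses nodes, the trait-object distinction between a mutation log and a row-id block is flattened into the integer kind field that the serialize/deserialize pair above writes and reads back. A self-contained sketch of that encoding and its round trip, with a hypothetical Payload enum in place of the real types (the real struct also carries the Option<MutationLogs> alongside the kind):

// Sketch of the kind encoding: 0 = mutation log, 1 = row_id update,
// 2 = row_id delete. Payload and RowIdKind here are illustrations,
// not the databend definitions.
#[derive(Debug, PartialEq)]
enum RowIdKind { Update, Delete }

#[derive(Debug, PartialEq)]
enum Payload { Log, RowId(RowIdKind) }

fn encode(p: &Payload) -> usize {
    match p {
        Payload::Log => 0,
        Payload::RowId(RowIdKind::Update) => 1,
        Payload::RowId(RowIdKind::Delete) => 2,
    }
}

fn decode(kind: usize) -> Result<Payload, String> {
    match kind {
        0 => Ok(Payload::Log),
        1 => Ok(Payload::RowId(RowIdKind::Update)),
        2 => Ok(Payload::RowId(RowIdKind::Delete)),
        k => Err(format!("unknown MixRowIdKindAndLog kind: {k}")),
    }
}

fn main() {
    let all = [Payload::Log, Payload::RowId(RowIdKind::Update), Payload::RowId(RowIdKind::Delete)];
    for p in all {
        assert_eq!(decode(encode(&p)).unwrap(), p); // the round trip is lossless
    }
}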
@@ -27,6 +27,7 @@ use common_pipeline_core::Pipe;
 use common_pipeline_core::PipeItem;

 use super::processor_merge_into_matched_and_split::SourceFullMatched;
+use crate::operations::merge_into::processors::RowIdKind;

 pub struct RowNumberAndLogSplitProcessor {
     input_port: Arc<InputPort>,
@@ -133,19 +134,18 @@ impl Processor for RowNumberAndLogSplitProcessor {

     fn process(&mut self) -> Result<()> {
         if let Some(data_block) = self.input_data.take() {
-            // all matched or logs
-            // if it's rowid, the meta will be none
             if data_block.get_meta().is_some() {
                 if SourceFullMatched::downcast_ref_from(data_block.get_meta().unwrap()).is_some() {
+                    // distributed mode: source as build side
                     self.output_data_row_number = Some(data_block)
+                } else if RowIdKind::downcast_ref_from(data_block.get_meta().unwrap()).is_some() {
+                    // distributed mode: target as build side
+                    self.output_data_row_number = Some(data_block)
                 } else {
                     // mutation logs
                     self.output_data_log = Some(data_block);
                 }
             } else {
-                // when we use source as probe side and do distributed
-                // execution,it could be rowid but we use output_data_row_number.
-                // it doesn't matter.
                 self.output_data_row_number = Some(data_block)
             }
         }
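With the changed join order, blocks arriving at this splitter can now carry RowIdKind meta in addition to SourceFullMatched and mutation logs, so the routing keys on which meta type downcasts successfully, and bare blocks with no meta are row identifiers. A standalone model of that dispatch, with a plain enum standing in for the BlockMetaInfo downcast checks:

// Standalone routing model: Meta mirrors the three downcast checks,
// Port mirrors the processor's two outputs. Both are illustrations.
#[derive(Debug)]
enum Meta {
    SourceFullMatched, // distributed mode: source as build side
    RowIdKind,         // distributed mode: target as build side
    MutationLog,
}

#[derive(Debug, PartialEq)]
enum Port {
    RowNumberOrRowId,
    Log,
}

fn route(meta: Option<&Meta>) -> Port {
    match meta {
        None => Port::RowNumberOrRowId, // bare block: row ids (or row numbers)
        Some(Meta::SourceFullMatched) | Some(Meta::RowIdKind) => Port::RowNumberOrRowId,
        Some(Meta::MutationLog) => Port::Log,
    }
}

fn main() {
    assert_eq!(route(None), Port::RowNumberOrRowId);
    assert_eq!(route(Some(&Meta::RowIdKind)), Port::RowNumberOrRowId);
    assert_eq!(route(Some(&Meta::MutationLog)), Port::Log);
}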
