diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index d37b07e9d97f3..ba3609d438874 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -23,12 +23,14 @@ use common_exception::Result; use common_expression::types::UInt32Type; use common_expression::ConstantFolder; use common_expression::DataBlock; +use common_expression::DataField; use common_expression::DataSchema; use common_expression::DataSchemaRef; use common_expression::FieldIndex; use common_expression::FromData; use common_expression::RemoteExpr; use common_expression::SendableDataBlockStream; +use common_expression::ROW_ID_COL_NAME; use common_expression::ROW_NUMBER_COL_NAME; use common_functions::BUILTIN_FUNCTIONS; use common_meta_app::schema::TableInfo; @@ -227,7 +229,7 @@ impl MergeIntoInterpreter { } } - if *distributed { + if *distributed && !*change_join_order { row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?); } @@ -238,7 +240,7 @@ impl MergeIntoInterpreter { )); } - if *distributed && row_number_idx.is_none() { + if *distributed && row_number_idx.is_none() && !*change_join_order { return Err(ErrorCode::InvalidRowIdIndex( "can't get internal row_number_idx when running merge into", )); @@ -412,9 +414,17 @@ impl MergeIntoInterpreter { row_id_idx, segments: segments.clone(), distributed: true, - output_schema: DataSchemaRef::new(DataSchema::new(vec![ - join_output_schema.fields[row_number_idx.unwrap()].clone(), - ])), + output_schema: match *change_join_order { + false => DataSchemaRef::new(DataSchema::new(vec![ + join_output_schema.fields[row_number_idx.unwrap()].clone(), + ])), + true => DataSchemaRef::new(DataSchema::new(vec![DataField::new( + ROW_ID_COL_NAME, + common_expression::types::DataType::Number( + common_expression::types::NumberDataType::UInt64, + ), + )])), + }, merge_type: merge_type.clone(), change_join_order: *change_join_order, })); diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index b3d7f793e8f59..62ae6b4849b21 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -44,6 +44,8 @@ use common_storages_fuse::operations::MergeIntoNotMatchedProcessor; use common_storages_fuse::operations::MergeIntoSplitProcessor; use common_storages_fuse::operations::RowNumberAndLogSplitProcessor; use common_storages_fuse::operations::TransformAddRowNumberColumnProcessor; +use common_storages_fuse::operations::TransformDistributedMergeIntoBlockDeserialize; +use common_storages_fuse::operations::TransformDistributedMergeIntoBlockSerialize; use common_storages_fuse::operations::TransformSerializeBlock; use common_storages_fuse::FuseTable; @@ -117,6 +119,17 @@ impl PipelineBuilder { self.build_pipeline(input)?; self.main_pipeline.try_resize(1)?; + // deserialize MixRowIdKindAndLog + if *change_join_order { + self.main_pipeline + .add_transform(|transform_input_port, transform_output_port| { + Ok(TransformDistributedMergeIntoBlockDeserialize::create( + transform_input_port, + transform_output_port, + )) + })?; + } + let tbl = self .ctx .build_table_by_table_info(catalog_info, table_info, None)?; @@ -878,6 +891,17 @@ impl PipelineBuilder { )); } + // add distributed_merge_into_block_serialize + // we will wrap rowid and log as MixRowIdKindAndLog + if *distributed && *change_join_order { + self.main_pipeline + .add_transform(|transform_input_port, transform_output_port| { + Ok(TransformDistributedMergeIntoBlockSerialize::create( + transform_input_port, + transform_output_port, + )) + })?; + } Ok(()) } } diff --git a/src/query/storages/fuse/src/operations/merge_into/mod.rs b/src/query/storages/fuse/src/operations/merge_into/mod.rs index 52af8dcd8fed8..85510b43a9e9d 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mod.rs @@ -19,7 +19,7 @@ pub use mutator::MatchedAggregator; pub use processors::MatchedSplitProcessor; pub use processors::MergeIntoNotMatchedProcessor; pub use processors::MergeIntoSplitProcessor; -pub use processors::MixRowNumberKindAndLog; +pub use processors::MixRowIdKindAndLog; pub use processors::RowNumberAndLogSplitProcessor; pub use processors::SourceFullMatched; pub use processors::TransformAddRowNumberColumnProcessor; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs index 681a1fdb1dbff..1bdf2ca5c8794 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs @@ -24,7 +24,7 @@ mod transform_matched_mutation_aggregator; pub use processor_distributed_merge_into_block_deserialize::TransformDistributedMergeIntoBlockDeserialize; pub use processor_distributed_merge_into_block_serialize::TransformDistributedMergeIntoBlockSerialize; pub use processor_merge_into_matched_and_split::MatchedSplitProcessor; -pub use processor_merge_into_matched_and_split::MixRowNumberKindAndLog; +pub use processor_merge_into_matched_and_split::MixRowIdKindAndLog; pub(crate) use processor_merge_into_matched_and_split::RowIdKind; pub use processor_merge_into_matched_and_split::SourceFullMatched; pub use processor_merge_into_not_matched::MergeIntoNotMatchedProcessor; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs index 4c1a56e72e5dd..407736a251a21 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs @@ -26,7 +26,7 @@ use common_pipeline_core::PipeItem; use common_pipeline_transforms::processors::Transform; use common_pipeline_transforms::processors::Transformer; -use super::processor_merge_into_matched_and_split::MixRowNumberKindAndLog; +use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog; use super::RowIdKind; // It will receive MutationLogs Or RowIds. @@ -37,7 +37,7 @@ pub struct TransformDistributedMergeIntoBlockDeserialize; /// this processor will be used in the future for merge into based on shuffle hash join. impl TransformDistributedMergeIntoBlockDeserialize { - fn create(input: Arc, output: Arc) -> ProcessorPtr { + pub fn create(input: Arc, output: Arc) -> ProcessorPtr { ProcessorPtr::create(Transformer::create( input, output, @@ -66,7 +66,7 @@ impl Transform for TransformDistributedMergeIntoBlockDeserialize { const NAME: &'static str = "TransformDistributedMergeIntoBlockDeserialize"; fn transform(&mut self, data: DataBlock) -> Result { - let mix_kind = MixRowNumberKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap(); + let mix_kind = MixRowIdKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap(); match mix_kind.kind { 0 => Ok(DataBlock::new_with_meta( data.columns().to_vec(), @@ -84,7 +84,7 @@ impl Transform for TransformDistributedMergeIntoBlockDeserialize { data.num_rows(), Some(Box::new(RowIdKind::Delete)), )), - _ => Err(ErrorCode::BadBytes("get error MixRowNumberKindAndLog kind")), + _ => Err(ErrorCode::BadBytes("get error MixRowIdKindAndLog kind")), } } } diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs index 42a21d512d829..ca6c7466f230c 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs @@ -30,7 +30,7 @@ use common_pipeline_core::PipeItem; use common_pipeline_transforms::processors::Transform; use common_pipeline_transforms::processors::Transformer; -use super::processor_merge_into_matched_and_split::MixRowNumberKindAndLog; +use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog; use super::RowIdKind; use crate::operations::common::MutationLogs; @@ -42,7 +42,7 @@ pub struct TransformDistributedMergeIntoBlockSerialize; /// this processor will be used in the future for merge into based on shuffle hash join. impl TransformDistributedMergeIntoBlockSerialize { - fn create(input: Arc, output: Arc) -> ProcessorPtr { + pub fn create(input: Arc, output: Arc) -> ProcessorPtr { ProcessorPtr::create(Transformer::create( input, output, @@ -79,7 +79,7 @@ impl Transform for TransformDistributedMergeIntoBlockSerialize { Ok(DataBlock::new_with_meta( vec![entry], 1, - Some(Box::new(MixRowNumberKindAndLog { + Some(Box::new(MixRowIdKindAndLog { log: Some(log), kind: 0, })), @@ -90,7 +90,7 @@ impl Transform for TransformDistributedMergeIntoBlockSerialize { Ok(DataBlock::new_with_meta( data.columns().to_vec(), data.num_rows(), - Some(Box::new(MixRowNumberKindAndLog { + Some(Box::new(MixRowIdKindAndLog { log: None, kind: match row_id_kind { RowIdKind::Update => 1, diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index c8b99c6a2dd40..60b75c91d2acd 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -66,7 +66,7 @@ enum MutationKind { // if we use hash shuffle join strategy, the enum // type can't be parser when transform data between nodes. #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] -pub struct MixRowNumberKindAndLog { +pub struct MixRowIdKindAndLog { pub log: Option, // kind's range is [0,1,2], 0 stands for log // 1 stands for row_id_update, 2 stands for row_id_delete, @@ -74,9 +74,9 @@ pub struct MixRowNumberKindAndLog { } #[typetag::serde(name = "mix_row_id_kind_and_log")] -impl BlockMetaInfo for MixRowNumberKindAndLog { +impl BlockMetaInfo for MixRowIdKindAndLog { fn equals(&self, info: &Box) -> bool { - MixRowNumberKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other) + MixRowIdKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other) } fn clone_self(&self) -> Box { diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs index 4ca785e0342b8..5bd6c8ed26e7f 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs @@ -27,6 +27,7 @@ use common_pipeline_core::Pipe; use common_pipeline_core::PipeItem; use super::processor_merge_into_matched_and_split::SourceFullMatched; +use crate::operations::merge_into::processors::RowIdKind; pub struct RowNumberAndLogSplitProcessor { input_port: Arc, @@ -133,19 +134,18 @@ impl Processor for RowNumberAndLogSplitProcessor { fn process(&mut self) -> Result<()> { if let Some(data_block) = self.input_data.take() { - // all matched or logs - // if it's rowid, the meta will be none if data_block.get_meta().is_some() { if SourceFullMatched::downcast_ref_from(data_block.get_meta().unwrap()).is_some() { + // distributed mode: source as build side + self.output_data_row_number = Some(data_block) + } else if RowIdKind::downcast_ref_from(data_block.get_meta().unwrap()).is_some() { + // distributed mode: target as build side self.output_data_row_number = Some(data_block) } else { // mutation logs self.output_data_log = Some(data_block); } } else { - // when we use source as probe side and do distributed - // execution,it could be rowid but we use output_data_row_number. - // it doesn't matter. self.output_data_row_number = Some(data_block) } }