feat: change tracking enabled table support merge into (#14900)
* change tracking enabled table support merge into

* add sqllogic test

* make lint

* update
zhyass authored Mar 14, 2024
1 parent 8dd06c3 commit 5f1541b
Showing 40 changed files with 788 additions and 643 deletions.
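
The new sqllogic test covers MERGE INTO against a table that has change tracking enabled. The sketch below illustrates that scenario only (it is not the actual test); it assumes change tracking is turned on by creating a stream on the target table, and all table, stream, and column names are hypothetical. Depending on the Databend version, MERGE INTO may first need to be enabled via a setting.

-- Hypothetical names; a minimal scenario sketch, not the committed sqllogic test.
CREATE TABLE target(id INT, val VARCHAR);
CREATE TABLE src(id INT, val VARCHAR);
INSERT INTO target VALUES (1, 'a'), (2, 'b');
INSERT INTO src VALUES (2, 'b2'), (3, 'c');

-- Creating a stream on `target` enables change tracking for it.
CREATE STREAM target_stream ON TABLE target;

-- Before this commit, this statement was rejected with
-- "change tracking is enabled for table 'target', does not support MERGE INTO";
-- with this commit it is expected to succeed.
MERGE INTO target AS t USING src AS s ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET t.val = s.val
    WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val);
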
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion src/query/catalog/Cargo.toml
@@ -9,7 +9,6 @@ doctest = false
test = false

[dependencies]
databend-common-arrow = { path = "../../common/arrow" }
databend-common-base = { path = "../../common/base" }
databend-common-config = { path = "../config" }
databend-common-exception = { path = "../../common/exception" }
1 change: 1 addition & 0 deletions src/query/catalog/src/plan/internal_column.rs
@@ -98,6 +98,7 @@ pub struct InternalColumnMeta {
/// The row offsets in the block.
pub offsets: Option<Vec<usize>>,
pub base_block_ids: Option<Scalar>,
pub inner: Option<BlockMetaInfoPtr>,
}

#[typetag::serde(name = "internal_column_meta")]
21 changes: 8 additions & 13 deletions src/query/catalog/src/plan/stream_column.rs
@@ -16,12 +16,10 @@ use std::any::Any;
use std::path::Path;
use std::sync::Arc;

use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_base::base::uuid::Uuid;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::DecimalScalar;
use databend_common_expression::types::nullable::NullableColumn;
use databend_common_expression::types::AnyType;
use databend_common_expression::types::DataType;
use databend_common_expression::types::DecimalDataType;
@@ -32,7 +30,6 @@ use databend_common_expression::BlockEntry;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::Column;
use databend_common_expression::ColumnId;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
@@ -128,16 +125,17 @@ impl StreamColumnMeta {
}
}

pub fn build_origin_block_row_num(num_rows: usize) -> Value<AnyType> {
pub fn build_origin_block_row_num(num_rows: usize) -> BlockEntry {
let mut row_ids = Vec::with_capacity(num_rows);
for i in 0..num_rows {
row_ids.push(i as u64);
}
let column = UInt64Type::from_data(row_ids);
Value::Column(Column::Nullable(Box::new(NullableColumn {
column,
validity: Bitmap::new_constant(true, num_rows),
})))
let column = Value::Column(UInt64Type::from_data(row_ids));

BlockEntry::new(
DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
column.wrap_nullable(None),
)
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -218,10 +216,7 @@ impl StreamColumn {
)))),
meta.build_origin_block_id(),
),
StreamColumnType::OriginRowNum => BlockEntry::new(
DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
build_origin_block_row_num(num_rows),
),
StreamColumnType::OriginRowNum => build_origin_block_row_num(num_rows),
}
}
}
@@ -149,6 +149,7 @@ impl CopyIntoTableInterpreter {
None,
None,
false,
false,
)
.await?,
);
48 changes: 16 additions & 32 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -47,7 +47,6 @@ use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::plans;
use databend_common_sql::plans::MergeInto as MergePlan;
use databend_common_sql::plans::RelOperator;
use databend_common_sql::plans::UpdatePlan;
use databend_common_sql::IndexType;
use databend_common_sql::ScalarExpr;
use databend_common_sql::TypeCheck;
@@ -207,13 +206,6 @@ impl MergeIntoInterpreter {
// check mutability
let check_table = self.ctx.get_table(catalog, database, table_name).await?;
check_table.check_mutable()?;
// check change tracking
if check_table.change_tracking_enabled() {
return Err(ErrorCode::Unimplemented(format!(
"change tracking is enabled for table '{}', does not support MERGE INTO",
check_table.name(),
)));
}

let update_stream_meta = build_update_stream_meta_seq(self.ctx.clone(), meta_data).await?;

@@ -376,31 +368,23 @@ impl MergeIntoInterpreter {

// update
let update_list = if let Some(update_list) = &item.update {
// use update_plan to get exprs
let update_plan = UpdatePlan {
selection: None,
subquery_desc: vec![],
database: database.clone(),
table: match target_alias {
None => table_name.clone(),
Some(alias) => alias.name.to_string().to_lowercase(),
},
update_list: update_list.clone(),
bind_context: bind_context.clone(),
metadata: self.plan.meta_data.clone(),
catalog: catalog.clone(),
};
// we don't need real col_indices here, just give a
// dummy index, that's ok.
let col_indices = vec![DUMMY_COL_INDEX];
let update_list: Vec<(FieldIndex, RemoteExpr<String>)> = update_plan
.generate_update_list(
self.ctx.clone(),
fuse_table.schema().into(),
col_indices,
Some(PREDICATE_COLUMN_INDEX),
target_alias.is_some(),
)?;
let (database, table) = match target_alias {
None => (Some(database.as_str()), table_name.clone()),
Some(alias) => (None, alias.name.to_string().to_lowercase()),
};
let update_list = plans::generate_update_list(
self.ctx.clone(),
bind_context,
update_list,
fuse_table.schema_with_stream().into(),
col_indices,
Some(PREDICATE_COLUMN_INDEX),
database,
&table,
)?;
let update_list = update_list
.iter()
.map(|(idx, remote_expr)| {
@@ -422,7 +406,7 @@
)
})
.collect_vec();
//
// update
Some(update_list)
} else {
// delete
@@ -452,7 +436,7 @@

let commit_input = if !distributed {
// recv datablocks from matched upstream and unmatched upstream
// transform and append dat
// transform and append data
PhysicalPlan::MergeInto(Box::new(MergeInto {
input: Box::new(merge_into_source),
table_info: table_info.clone(),
2 changes: 0 additions & 2 deletions src/query/service/src/interpreters/interpreter_update.rs
@@ -221,8 +221,6 @@ impl UpdateInterpreter {
self.ctx.clone(),
tbl.schema_with_stream().into(),
col_indices.clone(),
None,
false,
)?;

let computed_list = self
13 changes: 4 additions & 9 deletions src/query/service/src/pipelines/builders/builder_delete.rs
@@ -20,10 +20,9 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_pipeline_sources::EmptySource;
use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::executor::physical_plans::DeleteSource;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::gen_mutation_stream_operator;
use databend_common_sql::StreamContext;
use databend_common_storages_fuse::operations::MutationBlockPruningContext;
use databend_common_storages_fuse::operations::TransformSerializeBlock;
use databend_common_storages_fuse::FuseLazyPartInfo;
@@ -106,8 +105,8 @@ impl PipelineBuilder {
&mut self.main_pipeline,
)?;
if table.change_tracking_enabled() {
let func_ctx = self.ctx.get_function_context()?;
let (stream, operators) = gen_mutation_stream_operator(
let stream_ctx = StreamContext::try_create(
self.ctx.get_function_context()?,
table.schema_with_stream(),
table.get_table_info().ident.seq,
true,
@@ -117,11 +116,7 @@
TransformAddStreamColumns::try_create(
transform_input_port,
transform_output_port,
CompoundBlockOperator {
operators: operators.clone(),
ctx: func_ctx.clone(),
},
stream.clone(),
stream_ctx.clone(),
)
})?;
}
38 changes: 9 additions & 29 deletions src/query/service/src/pipelines/builders/builder_merge_into.rs
@@ -190,7 +190,7 @@ impl PipelineBuilder {
// start to append data

// 1.fill default columns
let table_default_schema = &tbl.schema().remove_computed_fields();
let table_default_schema = &tbl.schema_with_stream().remove_computed_fields();
let mut builder = self.main_pipeline.add_transform_with_specified_len(
|transform_input_port, transform_output_port| {
TransformResortAddOnWithoutSourceSchema::try_create(
@@ -200,7 +200,7 @@
Arc::new(DataSchema::from(table_default_schema)),
unmatched.clone(),
tbl.clone(),
Arc::new(DataSchema::from(tbl.schema())),
Arc::new(DataSchema::from(tbl.schema_with_stream())),
)
},
1,
@@ -209,7 +209,7 @@
self.main_pipeline.add_pipe(builder.finalize());

// 2.fill computed columns
let table_computed_schema = &tbl.schema().remove_virtual_computed_fields();
let table_computed_schema = &tbl.schema_with_stream().remove_virtual_computed_fields();
let default_schema: DataSchemaRef = Arc::new(table_default_schema.into());
let computed_schema: DataSchemaRef = Arc::new(table_computed_schema.into());
if default_schema != computed_schema {
@@ -307,15 +307,6 @@
table.get_block_thresholds(),
None,
)?;
let block_builder = TransformSerializeBlock::try_create(
self.ctx.clone(),
InputPort::create(),
OutputPort::create(),
table,
cluster_stats_gen.clone(),
MutationKind::MergeInto,
)?
.get_block_builder();
let max_threads = self.settings.get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));
// MutationsLogs port0
@@ -324,7 +315,7 @@
self.main_pipeline.add_pipe(Pipe::create(2, 2, vec![
table.rowid_aggregate_mutator(
self.ctx.clone(),
block_builder,
cluster_stats_gen,
io_request_semaphore,
segments.clone(),
false, // we don't support for distributed mode.
@@ -429,17 +420,6 @@ impl PipelineBuilder {
let cluster_stats_gen =
table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds, None)?;

// this TransformSerializeBlock is just used to get block_builder
let block_builder = TransformSerializeBlock::try_create(
self.ctx.clone(),
InputPort::create(),
OutputPort::create(),
table,
cluster_stats_gen.clone(),
MutationKind::MergeInto,
)?
.get_block_builder();

let serialize_segment_transform = TransformSerializeSegment::new(
self.ctx.clone(),
InputPort::create(),
@@ -492,7 +472,7 @@ impl PipelineBuilder {
matched.clone(),
field_index_of_input_schema.clone(),
input.output_schema()?,
Arc::new(DataSchema::from(tbl.schema())),
Arc::new(DataSchema::from(tbl.schema_with_stream())),
merge_into.target_build_optimization,
*can_try_update_column_only,
)?;
@@ -676,7 +656,7 @@ impl PipelineBuilder {
};

// fill default columns
let table_default_schema = &table.schema().remove_computed_fields();
let table_default_schema = &table.schema_with_stream().remove_computed_fields();
let mut builder = self.main_pipeline.add_transform_with_specified_len(
|transform_input_port, transform_output_port| {
TransformResortAddOnWithoutSourceSchema::try_create(
@@ -686,7 +666,7 @@
Arc::new(DataSchema::from(table_default_schema)),
unmatched.clone(),
tbl.clone(),
Arc::new(DataSchema::from(table.schema())),
Arc::new(DataSchema::from(table.schema_with_stream())),
)
},
fill_default_len,
@@ -713,7 +693,7 @@
self.main_pipeline
.add_pipe(add_builder_pipe(builder, distributed));
// fill computed columns
let table_computed_schema = &table.schema().remove_virtual_computed_fields();
let table_computed_schema = &table.schema_with_stream().remove_virtual_computed_fields();
let default_schema: DataSchemaRef = Arc::new(table_default_schema.into());
let computed_schema: DataSchemaRef = Arc::new(table_computed_schema.into());
if default_schema != computed_schema {
@@ -839,7 +819,7 @@ impl PipelineBuilder {
} else {
pipe_items.push(table.rowid_aggregate_mutator(
self.ctx.clone(),
block_builder,
cluster_stats_gen.clone(),
io_request_semaphore,
segments.clone(),
merge_into.target_build_optimization,
17 changes: 8 additions & 9 deletions src/query/service/src/pipelines/builders/builder_recluster.rs
@@ -30,7 +30,7 @@ use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::ReclusterSink;
use databend_common_sql::executor::physical_plans::ReclusterSource;
use databend_common_sql::gen_mutation_stream_operator;
use databend_common_sql::StreamContext;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::operations::CommitSink;
use databend_common_storages_fuse::operations::MutationGenerator;
@@ -100,19 +100,18 @@

let num_input_columns = schema.fields().len();
if table.change_tracking_enabled() {
let func_ctx = self.ctx.get_function_context()?;
let (stream, operators) =
gen_mutation_stream_operator(schema, table_info.ident.seq, false)?;
let stream_ctx = StreamContext::try_create(
self.ctx.get_function_context()?,
schema,
table_info.ident.seq,
false,
)?;
self.main_pipeline.add_transform(
|transform_input_port, transform_output_port| {
TransformAddStreamColumns::try_create(
transform_input_port,
transform_output_port,
CompoundBlockOperator {
operators: operators.clone(),
ctx: func_ctx.clone(),
},
stream.clone(),
stream_ctx.clone(),
)
},
)?;