feat: change tracking enabled table support replace into (#14831)
* support REPLACE INTO on tables with change tracking enabled

* add sqllogic test

* update test
zhyass authored Mar 5, 2024
1 parent d6b0a07 commit 4f2e1d0
Showing 7 changed files with 140 additions and 22 deletions.
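
In short: REPLACE INTO used to be rejected up front on tables with change tracking enabled (the old sqllogictest below expected error 1002). This commit removes that guard and makes the replace pipeline maintain the hidden stream columns itself: blocks are serialized against the table's schema-with-stream, and the merge-into aggregator regenerates _origin_version, _origin_block_id, _origin_block_row_num, and _row_version for every block it rewrites.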
7 changes: 0 additions & 7 deletions src/query/service/src/interpreters/interpreter_replace.rs
@@ -132,13 +132,6 @@ impl ReplaceInterpreter {

// check mutability
table.check_mutable()?;
// check change tracking
if table.change_tracking_enabled() {
return Err(ErrorCode::Unimplemented(format!(
"change tracking is enabled for table '{}', does not support REPLACE",
table.name(),
)));
}

let catalog = self.ctx.get_catalog(&plan.catalog).await?;
let schema = table.schema();
@@ -120,7 +120,8 @@ impl PipelineBuilder {
cluster_stats_gen,
MutationKind::Replace,
)?;
let block_builder = serialize_block_transform.get_block_builder();
let mut block_builder = serialize_block_transform.get_block_builder();
block_builder.source_schema = table.schema_with_stream();

let serialize_segment_transform = TransformSerializeSegment::new(
self.ctx.clone(),
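
The added source_schema assignment makes the serializer emit blocks that include the table's hidden stream columns. Below is a minimal, dependency-free sketch of what schema_with_stream() is assumed to do, with toy types standing in for databend's TableField/TableSchema (the real method lives on FuseTable and appends the fields from table.stream_columns(), as the next hunk suggests):

```rust
/// Toy stand-ins for databend's TableField / TableSchema, used only to
/// illustrate the assumed behaviour of schema_with_stream().
#[derive(Clone, Debug)]
struct TableField {
    name: String,
}

#[derive(Debug)]
struct TableSchema {
    fields: Vec<TableField>,
}

/// Sketch: the base table schema extended with the hidden stream columns
/// (_origin_version, _origin_block_id, _origin_block_row_num, _row_version)
/// that change tracking maintains.
fn schema_with_stream(base: &TableSchema, stream_fields: &[TableField]) -> TableSchema {
    let mut fields = base.fields.clone();
    fields.extend_from_slice(stream_fields);
    TableSchema { fields }
}

fn main() {
    let base = TableSchema {
        fields: vec![TableField { name: "a".into() }],
    };
    let stream = [TableField { name: "_origin_version".into() }];
    println!("{:?}", schema_with_stream(&base, &stream));
}
```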
@@ -88,7 +88,7 @@ impl TransformSerializeBlock {
.filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
.cloned()
.collect::<Vec<_>>();
if !matches!(kind, MutationKind::Insert) {
if !matches!(kind, MutationKind::Insert | MutationKind::Replace) {
// add stream fields.
for stream_column in table.stream_columns().iter() {
fields.push(stream_column.table_field());
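
Because REPLACE now reaches the serializer with source_schema already set to schema_with_stream() (previous hunk), it is grouped with Insert here so the stream fields are not appended a second time; their values for rewritten blocks are produced by the merge-into aggregator below.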
4 changes: 1 addition & 3 deletions src/query/storages/fuse/src/operations/replace.rs
@@ -114,9 +114,7 @@ impl FuseTable {
bloom_filter_column_indexes.clone(),
chunk_of_segment_locations,
block_slots.clone(),
self.operator.clone(),
self.table_info.schema(),
self.get_write_settings(),
self,
read_settings,
block_builder.clone(),
io_request_semaphore.clone(),
@@ -22,19 +22,29 @@ use databend_common_base::base::tokio::sync::Semaphore;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::plan::gen_mutation_stream_meta;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::plan::StreamColumn;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::UInt64Type;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::ColumnId;
use databend_common_expression::ComputedExpr;
use databend_common_expression::DataBlock;
use databend_common_expression::FieldIndex;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchema;
use databend_common_expression::Value;
use databend_common_metrics::storage::*;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::executor::physical_plans::OnConflictField;
use databend_common_sql::gen_mutation_stream_operator;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_index::filters::Filter;
use databend_storages_common_index::filters::Xor8Filter;
@@ -68,6 +78,7 @@ use crate::operations::replace_into::meta::MergeIntoOperation;
use crate::operations::replace_into::meta::UniqueKeyDigest;
use crate::operations::replace_into::mutator::row_hash_of_columns;
use crate::operations::replace_into::mutator::DeletionAccumulator;
use crate::FuseTable;

struct AggregationContext {
segment_locations: AHashMap<SegmentIndex, Location>,
@@ -90,6 +101,8 @@ struct AggregationContext {
block_builder: BlockBuilder,
io_request_semaphore: Arc<Semaphore>,
query_id: String,
stream_columns: Vec<StreamColumn>,
stream_operators: Vec<BlockOperator>,
}

// Apply MergeIntoOperations to segments
@@ -107,13 +120,16 @@ impl MergeIntoOperationAggregator {
bloom_filter_column_indexes: Vec<FieldIndex>,
segment_locations: Vec<(SegmentIndex, Location)>,
block_slots: Option<BlockSlotDescription>,
data_accessor: Operator,
table_schema: Arc<TableSchema>,
write_settings: WriteSettings,
table: &FuseTable,
read_settings: ReadSettings,
block_builder: BlockBuilder,
io_request_semaphore: Arc<Semaphore>,
) -> Result<Self> {
let data_accessor = table.get_operator();
let table_schema = table.schema_with_stream();
let write_settings = table.get_write_settings();
let update_stream_columns = table.change_tracking_enabled();

let deletion_accumulator = DeletionAccumulator::default();
let segment_reader =
MetaReaders::segment_info_reader(data_accessor.clone(), table_schema.clone());
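
The constructor now borrows the whole FuseTable and derives the operator, schema-with-stream, and write settings from it; change_tracking_enabled() feeds the update_stream_columns flag that is passed to the two block-reader constructions below.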
@@ -145,7 +161,7 @@
table_schema.clone(),
projection,
false,
false,
update_stream_columns,
false,
)
}?;
@@ -158,16 +174,23 @@
let reader = BlockReader::create(
ctx.clone(),
data_accessor.clone(),
table_schema,
table_schema.clone(),
projection,
false,
false,
update_stream_columns,
false,
)?;
Some(reader)
}
};
let query_id = ctx.get_id();

let (stream_columns, stream_operators) = if update_stream_columns {
gen_mutation_stream_operator(table_schema, table.get_table_info().ident.seq, true)?
} else {
(vec![], vec![])
};

Ok(Self {
ctx,
deletion_accumulator,
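
When change tracking is enabled, the stream columns and the BlockOperators that populate them are generated once up front via gen_mutation_stream_operator, rather than per rewritten block.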
@@ -186,6 +209,8 @@
block_builder,
io_request_semaphore,
query_id,
stream_columns,
stream_operators,
}),
})
}
@@ -471,7 +496,7 @@ impl AggregationContext {
let bitmap = bitmap.into();
let mut key_columns_data_after_deletion = key_columns_data.filter_with_bitmap(&bitmap)?;

let new_block = match &self.remain_column_reader {
let mut new_block = match &self.remain_column_reader {
None => key_columns_data_after_deletion,
Some(remain_columns_reader) => {
metrics_inc_replace_block_number_totally_loaded(1);
@@ -505,6 +530,31 @@
}
};

if self.key_column_reader.update_stream_columns {
// generate row id column
let mut row_ids = Vec::with_capacity(num_rows);
for i in 0..num_rows {
row_ids.push(i as u64);
}
let value = Value::Column(Column::filter(&UInt64Type::from_data(row_ids), &bitmap));
let row_num = BlockEntry::new(
DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
value.wrap_nullable(None),
);
new_block.add_column(row_num);

let stream_meta = gen_mutation_stream_meta(None, &block_meta.location.0)?;
for stream_column in self.stream_columns.iter() {
let entry = stream_column.generate_column_values(&stream_meta, num_rows);
new_block.add_column(entry);
}
let func_ctx = self.block_builder.ctx.get_function_context()?;
new_block = self
.stream_operators
.iter()
.try_fold(new_block, |input, op| op.execute(&func_ctx, input))?;
}

// serialization and compression is cpu intensive, send them to dedicated thread pool
// and wait (asyncly, which will NOT block the executor thread)
let block_builder = self.block_builder.clone();
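
The new branch stamps every rewritten block with its origin: a row-id column enumerates rows 0..num_rows and is filtered by the same keep-bitmap as the data, so each surviving row keeps its original in-block ordinal, and the precomputed stream columns and operators then fill in the remaining change-tracking metadata. A dependency-free sketch of the row-id logic (the real code builds a nullable UInt64 column via UInt64Type::from_data and Column::filter):

```rust
/// Sketch: keep the original in-block ordinal of every row that survives
/// the deletion bitmap (true = keep), mirroring the row-id column above.
fn surviving_row_ids(num_rows: usize, keep: &[bool]) -> Vec<u64> {
    assert_eq!(keep.len(), num_rows);
    (0..num_rows as u64)
        .zip(keep.iter())
        .filter_map(|(id, &k)| k.then_some(id))
        .collect()
}

fn main() {
    // Rows 0, 2 and 3 survive and keep their original ordinals.
    assert_eq!(surviving_row_ids(4, &[true, false, true, true]), vec![0, 2, 3]);
}
```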
@@ -749,6 +799,7 @@ mod tests {
use databend_common_expression::types::NumberScalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;

use super::*;

@@ -71,8 +71,18 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
statement error 1065
select change$is_update from t

statement error 1002
replace into t on(a) values(7)
statement ok
replace into t on(a) values(6),(7)

query IBBII
select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
----
1 0 0 0 0
2 0 0 1 0
3 0 0 1 1
5 0 0 0 0
6 0 0 0 0
7 0 0 1 0

statement ok
create table t2(a int)
65 changes: 65 additions & 0 deletions tests/sqllogictests/suites/ee/01_ee_system/01_0003_stream.test
@@ -420,13 +420,78 @@ select a, `B`, c, change$action, change$is_update from s6 order by a, `B`
3 3 3 INSERT 0
4 4 4 INSERT 0

# test truncate
statement ok
truncate table t6

query IIITB
select a, `B`, c, change$action, change$is_update from s6 order by a, `B`
----
1 1 1 DELETE 0
2 2 2 DELETE 0

statement ok
drop stream s6

######################
# end of issue 14506 #
######################

statement ok
create table t7(a int, b int)

statement ok
insert into t7 values(1, 1), (2, 2), (3, 3)

statement ok
create stream s7 on table t7 append_only = false

statement ok
create table t8(a int, b int, c int)

statement ok
insert into t8 values(2, 2, 2), (3, 3, 3), (4, 4, 4)

# test replace into
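# (2, 2) is rewritten in place, (3, 3) is dropped by the DELETE WHEN branch,
# and (4, 4) is newly inserted; the stream records the rewrite as DELETE + INSERT.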
statement ok
replace into t7 on(a, b) delete when a = 3 select * from t8

query IITB
select a, b, change$action, change$is_update from s7 order by a, b
----
2 2 DELETE 0
2 2 INSERT 0
3 3 DELETE 0
4 4 INSERT 0

query IIBBII
select a, b, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t7 order by a
----
1 1 0 0 0 0
2 2 0 0 0 0
4 4 0 0 1 0

statement ok
create stream s7_1 on table t7 at(stream => s7)

query IITB
select a, b, change$action, change$is_update from s7_1 order by a, b
----
2 2 INSERT 0
4 4 INSERT 0

statement ok
drop stream s7

statement ok
drop stream s7_1

statement ok
drop table t7 all

statement ok
drop table t8 all

statement error 2733
create stream s_err on table t4 at (stream => s3)

