Skip to content

Commit

Permalink
fix: consume stream inside explicit txn (#15582)
Browse files Browse the repository at this point in the history
* fix: consume stream inside explicit txn

Consuming a stream should respect the semantics of the enclosing transaction

* only allows "with consume" in query (and explain)

* cargo fmt

* fix logic test

* tweak logic test

* add logic test

* chore: rm http handlers constraints from logic test

since PR #15583 fixes it

---------

Co-authored-by: sky <[email protected]>
  • Loading branch information
dantengsky and SkyFan2002 authored May 20, 2024
1 parent 101ac45 commit 17e412f
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use notification::get_notification_client_config;
pub use query_log::InterpreterQueryLog;
pub use stream::dml_build_update_stream_req;
pub use stream::query_build_update_stream_req;
pub use stream::StreamTableUpdates;
pub use table::check_referenced_computed_columns;
pub use task::get_task_client_config;
pub use task::make_schedule_options;
Expand Down
24 changes: 15 additions & 9 deletions src/query/service/src/interpreters/common/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use chrono::Utc;
use databend_common_exception::Result;
use databend_common_license::license::Feature;
use databend_common_license::license_manager::get_license_manager;
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_types::MatchSeq;
Expand Down Expand Up @@ -95,10 +95,14 @@ where F: Fn(&TableEntry) -> bool {
Ok(streams)
}

/// Pending metadata updates for stream tables consumed by a query
/// (`SELECT ... WITH CONSUME`), collected at plan time and applied when the
/// query pipeline finishes.
///
/// The two vectors are parallel: element `i` of `update_table_metas` is the
/// meta-update request for the stream described by element `i` of
/// `table_infos`. Keeping the raw `TableInfo` alongside each request lets the
/// caller commit either in one batch (auto-commit, via a multi-table meta
/// update) or one table at a time inside an explicit transaction.
pub struct StreamTableUpdates {
/// One `UpdateTableMetaReq` per consumed stream, advancing the stream's
/// offset options to mark its changes as consumed.
pub update_table_metas: Vec<UpdateTableMetaReq>,
/// `TableInfo` of each consumed stream, index-aligned with
/// `update_table_metas`.
pub table_infos: Vec<TableInfo>,
}
pub async fn query_build_update_stream_req(
ctx: &Arc<QueryContext>,
metadata: &MetadataRef,
) -> Result<Option<UpdateMultiTableMetaReq>> {
) -> Result<Option<StreamTableUpdates>> {
let streams = get_stream_table(metadata, |t| {
t.is_consume() && t.table().engine() == STREAM_ENGINE
})?;
Expand All @@ -111,10 +115,13 @@ pub async fn query_build_update_stream_req(
.manager
.check_enterprise_enabled(ctx.get_license_key(), Feature::Stream)?;

let mut update_table_metas = Vec::with_capacity(streams.len());
let cap = streams.len();
let mut update_table_meta_reqs = Vec::with_capacity(cap);
let mut table_infos = Vec::with_capacity(cap);
for table in streams.into_iter() {
let stream = StreamTable::try_from_table(table.as_ref())?;
let stream_info = stream.get_table_info();
table_infos.push(stream_info.clone());

let source_table = stream.source_table(ctx.clone()).await?;
let inner_fuse = FuseTable::try_from_table(source_table.as_ref())?;
Expand All @@ -129,7 +136,7 @@ pub async fn query_build_update_stream_req(
new_table_meta.options = options;
new_table_meta.updated_on = Utc::now();

update_table_metas.push(UpdateTableMetaReq {
update_table_meta_reqs.push(UpdateTableMetaReq {
table_id: stream_info.ident.table_id,
seq: MatchSeq::Exact(stream_info.ident.seq),
new_table_meta,
Expand All @@ -138,10 +145,9 @@ pub async fn query_build_update_stream_req(
deduplicated_label: None,
});
}
Ok(Some(UpdateMultiTableMetaReq {
update_table_metas,
copied_files: vec![],
update_stream_metas: vec![],
deduplicated_labels: vec![],

Ok(Some(StreamTableUpdates {
update_table_metas: update_table_meta_reqs,
table_infos,
}))
}
37 changes: 31 additions & 6 deletions src/query/service/src/interpreters/interpreter_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::TableSchemaRef;
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
use databend_common_meta_store::MetaStore;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
Expand All @@ -45,6 +46,7 @@ use log::error;
use log::info;

use crate::interpreters::common::query_build_update_stream_req;
use crate::interpreters::common::StreamTableUpdates;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline;
Expand Down Expand Up @@ -138,13 +140,16 @@ impl SelectInterpreter {
.await?;

// consume stream
if let Some(req) = query_build_update_stream_req(&self.ctx, &self.metadata).await? {
assert!(!req.update_table_metas.is_empty());
if let Some(StreamTableUpdates {
update_table_metas,
table_infos,
}) = query_build_update_stream_req(&self.ctx, &self.metadata).await?
{
assert!(!update_table_metas.is_empty());

// defensively checks that all catalog names are identical
{
let mut iter = req
.update_table_metas
let mut iter = update_table_metas
.iter()
.map(|item| item.new_table_meta.catalog.as_str());
let first = iter.next().unwrap();
Expand All @@ -158,17 +163,37 @@ impl SelectInterpreter {
}
}

let catalog_name = req.update_table_metas[0].new_table_meta.catalog.as_str();
let catalog_name = update_table_metas[0].new_table_meta.catalog.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let query_id = self.ctx.get_id();
let auto_commit = !self.ctx.txn_mgr().lock().is_active();
build_res.main_pipeline.set_on_finished(
move |(_profiles, may_error)| match may_error {
Ok(_) => GlobalIORuntime::instance().block_on(async move {
info!(
"Updating the stream meta to consume data, query_id: {}",
query_id
);
catalog.update_multi_table_meta(req).await.map(|_| ())

if auto_commit {
info!("(auto) committing stream consumptions");
// commit to meta server directly
let r = UpdateMultiTableMetaReq {
update_table_metas,
copied_files: vec![],
update_stream_metas: vec![],
deduplicated_labels: vec![],
};
catalog.update_multi_table_meta(r).await.map(|_| ())
} else {
info!("(non-auto) committing stream consumptions");
for (req, info) in
update_table_metas.into_iter().zip(table_infos.into_iter())
{
catalog.update_table_meta(&info, req).await?;
}
Ok(())
}
}),
Err(error_code) => Err(error_code.clone()),
},
Expand Down
17 changes: 17 additions & 0 deletions src/query/sql/src/planner/binder/binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_ast::parser::parse_sql;
use databend_common_ast::parser::tokenize_sql;
use databend_common_ast::parser::Dialect;
use databend_common_catalog::catalog::CatalogManager;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -663,6 +664,22 @@ impl<'a> Binder {
self.bind_set_priority(priority, object_id).await?
},
};

match plan.kind() {
QueryKind::Query { .. } | QueryKind::Explain { .. } => {}
_ => {
let meta_data_guard = self.metadata.read();
let tables = meta_data_guard.tables();
for t in tables {
if t.is_consume() {
return Err(ErrorCode::SyntaxException(
"WITH CONSUME only allowed in query",
));
}
}
}
}

Ok(plan)
}

Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::plans::Insert;
use crate::plans::InsertInputSource;
use crate::plans::Plan;
use crate::BindContext;

impl Binder {
pub fn schema_project(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,189 @@ query I
SELECT c FROM s2;
----


#######################################
# consume inside explicit transaction #
#######################################


statement ok
create or replace table t_1 (
str varchar
);

statement ok
create or replace stream s_1 on table t_1 append_only = true;

statement ok
insert into t_1 (str) values ('a'), ('b');


# case 1: WITH CONSUME should be rollback-able (implicitly)

statement ok
begin;

query I
select str from s_1 with consume order by str;
----
a
b

statement error 1006
select 1/0;

statement ok
commit;

query I
select str from s_1 order by str;
----
a
b

# case 2: WITH CONSUME should be rollback-able (explicitly)

statement ok
begin;

query I
select str from s_1 with consume order by str;
----
a
b

# inside the txn, s_1 has been consumed, so an empty result set is expected
query I
select str from s_1;

statement ok
rollback;

query I
select str from s_1 order by str;
----
a
b



# case 3: mixed with DMLs

statement ok
create table tmp_sink like t_1;

statement ok
begin;

# normal dml
statement ok
insert into tmp_sink select str from s_1;

# changes should not be consumed
query I
select str from s_1 order by str;
----
a
b

# but changes should be captured by insert stmt
query I
select count() from tmp_sink
----
2

# dml and stream consume

statement ok
select str from s_1 with consume;

statement ok
truncate table tmp_sink;

statement ok
insert into tmp_sink select str from s_1;

query I
select count() from tmp_sink
----
0

statement ok
commit;

query I
select str from s_1 order by str;
----


# case 4: disallows WITH CONSUME inside dml

statement ok
insert into t_1 (str) values ('a'), ('b');

statement error 1005
insert into tmp_sink select str from s_1 with consume;

statement error 1005
copy into tmp_sink select str from s_1 with consume;

# allows explain

# using statement to ignore the result (which may not be deterministic)

statement ok
explain select str from s_1 with consume;

statement ok
explain select str from s_1;

# explain should not consume the stream
query I
select str from s_1 order by str;
----
a
b

statement ok
create or replace table target_1 (
str varchar
);

statement ok
create or replace table target_2 (
str varchar
);

statement ok
begin;

statement ok
insert into target_1 select str from s_1;

query I
select str from s_1 with consume order by str;
----
a
b

statement ok
insert into target_2 select str from s_1;

statement ok
commit;

query I
select str from target_1 order by str;
----
a
b

query I
select str from target_2 order by str;
----


statement ok
drop database test_txn_stream;

0 comments on commit 17e412f

Please sign in to comment.