From 6370687591ad54730468ad4a32d11acc4bc4d79b Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 15 Nov 2024 14:42:08 +0800 Subject: [PATCH 1/7] feat: Replace flow --- src/common/meta/src/ddl/create_flow.rs | 60 ++++++-- src/common/meta/src/ddl_manager.rs | 15 +- src/common/meta/src/key/flow.rs | 87 +++++++++++ src/common/meta/src/key/flow/flow_info.rs | 28 +++- src/common/meta/src/key/flow/flow_name.rs | 35 ++++- src/flow/src/adapter.rs | 76 ++++++++-- src/flow/src/adapter/flownode_impl.rs | 31 ++-- src/flow/src/server.rs | 36 ++--- .../common/flow/show_create_flow.result | 143 ++++++++++++++++++ .../common/flow/show_create_flow.sql | 65 ++++++++ 10 files changed, 510 insertions(+), 66 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index afaf606078c0..951c47e470ae 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -28,6 +28,7 @@ use common_procedure::{ use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; use futures::future::join_all; +use futures::TryStreamExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -75,6 +76,7 @@ impl CreateFlowProcedure { source_table_ids: vec![], query_context, state: CreateFlowState::Prepare, + flow_already_exists: false, }, } } @@ -90,6 +92,7 @@ impl CreateFlowProcedure { let flow_name = &self.data.task.flow_name; let sink_table_name = &self.data.task.sink_table_name; let create_if_not_exists = self.data.task.create_if_not_exists; + let or_replace = self.data.task.or_replace; let flow_name_value = self .context @@ -98,16 +101,40 @@ impl CreateFlowProcedure { .get(catalog_name, flow_name) .await?; + if create_if_not_exists && or_replace { + return error::UnsupportedSnafu { + operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(), + } + .fail(); + } + if let Some(value) = flow_name_value { ensure!( - create_if_not_exists, + create_if_not_exists || or_replace, error::FlowAlreadyExistsSnafu { flow_name: format_full_flow_name(catalog_name, flow_name), } ); let flow_id = value.flow_id(); - return Ok(Status::done_with_output(flow_id)); + if create_if_not_exists { + info!("Flow already exists, flow_id: {}", flow_id); + return Ok(Status::done_with_output(flow_id)); + } + + let flow_id = value.flow_id(); + let peers = self + .context + .flow_metadata_manager + .flow_route_manager() + .routes(flow_id) + .map_ok(|(_, value)| value.peer) + .try_collect::>() + .await?; + self.data.flow_id = Some(flow_id); + self.data.peers = peers; + info!("Replacing flow, flow_id: {}", flow_id); + self.data.flow_already_exists = true; } // Ensures sink table doesn't exist. @@ -128,7 +155,9 @@ impl CreateFlowProcedure { } self.collect_source_tables().await?; - self.allocate_flow_id().await?; + if self.data.flow_id.is_none() { + self.allocate_flow_id().await?; + } self.data.state = CreateFlowState::CreateFlows; Ok(Status::executing(true)) @@ -170,13 +199,21 @@ impl CreateFlowProcedure { async fn on_create_metadata(&mut self) -> Result { // Safety: The flow id must be allocated. let flow_id = self.data.flow_id.unwrap(); - // TODO(weny): Support `or_replace`. let (flow_info, flow_routes) = (&self.data).into(); - self.context - .flow_metadata_manager - .create_flow_metadata(flow_id, flow_info, flow_routes) - .await?; - info!("Created flow metadata for flow {flow_id}"); + if self.data.flow_already_exists { + self.context + .flow_metadata_manager + .update_flow_metadata(flow_id, flow_info, flow_routes) + .await?; + info!("Replaced flow metadata for flow {flow_id}"); + } else { + self.context + .flow_metadata_manager + .create_flow_metadata(flow_id, flow_info, flow_routes) + .await?; + info!("Created flow metadata for flow {flow_id}"); + } + self.data.state = CreateFlowState::InvalidateFlowCache; Ok(Status::executing(true)) } @@ -270,6 +307,7 @@ pub struct CreateFlowData { pub(crate) peers: Vec, pub(crate) source_table_ids: Vec, pub(crate) query_context: QueryContext, + pub(crate) flow_already_exists: bool, } impl From<&CreateFlowData> for CreateRequest { @@ -284,9 +322,9 @@ impl From<&CreateFlowData> for CreateRequest { .map(|table_id| api::v1::TableId { id: *table_id }) .collect_vec(), sink_table_name: Some(value.task.sink_table_name.clone().into()), - // Always be true + // Always be true to ensure idempotent in case of retry create_if_not_exists: true, - or_replace: true, + or_replace: value.task.or_replace, expire_after: value.task.expire_after.map(|value| ExpireAfter { value }), comment: value.task.comment.clone(), sql: value.task.sql.clone(), diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1ee148406d04..617a8c574d7e 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -655,10 +655,17 @@ async fn handle_create_flow_task( procedure_id: &procedure_id, err_msg: "downcast to `u32`", })?); - info!( - "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", - create_flow_task.catalog_name, create_flow_task.flow_name, - ); + if !create_flow_task.or_replace { + info!( + "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", + create_flow_task.catalog_name, create_flow_task.flow_name, + ); + } else { + info!( + "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}", + create_flow_task.catalog_name, create_flow_task.flow_name, + ); + } Ok(SubmitDdlTaskResponse { key: procedure_id.into(), diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 41a5058a331f..e92255d0afe9 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -230,6 +230,93 @@ impl FlowMetadataManager { Ok(()) } + /// Update metadata for flow and returns an error if old metadata IS NOT exists. + pub async fn update_flow_metadata( + &self, + flow_id: FlowId, + flow_info: FlowInfoValue, + flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>, + ) -> Result<()> { + let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self + .flow_name_manager + .build_update_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?; + + let (create_flow_txn, on_create_flow_failure) = self + .flow_info_manager + .build_update_txn(flow_id, &flow_info)?; + + let create_flow_routes_txn = self + .flow_route_manager + .build_create_txn(flow_id, flow_routes.clone())?; + + let create_flownode_flow_txn = self + .flownode_flow_manager + .build_create_txn(flow_id, flow_info.flownode_ids().clone()); + + let create_table_flow_txn = self.table_flow_manager.build_create_txn( + flow_id, + flow_routes + .into_iter() + .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer })) + .collect(), + flow_info.source_table_ids(), + )?; + + let txn = Txn::merge_all(vec![ + create_flow_flow_name_txn, + create_flow_txn, + create_flow_routes_txn, + create_flownode_flow_txn, + create_table_flow_txn, + ]); + info!( + "Creating flow {}.{}({}), with {} txn operations", + flow_info.catalog_name, + flow_info.flow_name, + flow_id, + txn.max_operations() + ); + + let mut resp = self.kv_backend.txn(txn).await?; + if !resp.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut resp.responses); + let remote_flow_flow_name = + on_create_flow_flow_name_failure(&mut set)?.with_context(|| { + error::UnexpectedSnafu { + err_msg: format!( + "Reads the empty flow name during the creating flow, flow_id: {flow_id}" + ), + } + })?; + + if remote_flow_flow_name.flow_id() != flow_id { + info!( + "Trying to create flow {}.{}({}), but flow({}) already exists", + flow_info.catalog_name, + flow_info.flow_name, + flow_id, + remote_flow_flow_name.flow_id() + ); + + return error::FlowAlreadyExistsSnafu { + flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name), + } + .fail(); + } + + let remote_flow = + on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu { + err_msg: format!( + "Reads the empty flow during the creating flow, flow_id: {flow_id}" + ), + })?; + let op_name = "creating flow"; + ensure_values!(*remote_flow, flow_info, op_name); + } + + Ok(()) + } + fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec> { let source_table_ids = flow_value.source_table_ids(); let mut keys = diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 55f982af936b..c3f718870f9a 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -26,7 +26,7 @@ use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue}; -use crate::kv_backend::txn::Txn; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::FlownodeId; @@ -215,6 +215,32 @@ impl FlowInfoManager { TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), )) } + + /// Builds a update flow transaction. + /// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied. + /// Otherwise, the transaction will retrieve existing value and fail. + pub(crate) fn build_update_txn( + &self, + flow_id: FlowId, + flow_value: &FlowInfoValue, + ) -> Result<( + Txn, + impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult, + )> { + let key = FlowInfoKey::new(flow_id).to_bytes(); + let txn = Txn::new() + .when(vec![Compare::new(key.clone(), CompareOp::NotEqual, None)]) + .and_then(vec![TxnOp::Put( + key.clone(), + flow_value.try_as_raw_value()?, + )]) + .or_else(vec![TxnOp::Get(key.clone())]); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), + )) + } } #[cfg(test)] diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 0db86b05fa5e..4a829851c50f 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -26,7 +26,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN, }; -use crate::kv_backend::txn::Txn; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; @@ -237,6 +237,39 @@ impl FlowNameManager { TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), )) } + + /// Builds a update flow name transaction. + /// It's expected that the `__flow/name/{catalog}/{flow_name}` IS already occupied. + /// Otherwise, the transaction will retrieve existing value(and fail). + pub fn build_update_txn( + &self, + catalog_name: &str, + flow_name: &str, + flow_id: FlowId, + ) -> Result<( + Txn, + impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult, + )> { + let key = FlowNameKey::new(catalog_name, flow_name); + let raw_key = key.to_bytes(); + let flow_flow_name_value = FlowNameValue::new(flow_id); + let txn = Txn::new() + .when(vec![Compare::new( + raw_key.clone(), + CompareOp::NotEqual, + None, + )]) + .and_then(vec![TxnOp::Put( + raw_key.clone(), + flow_flow_name_value.try_as_raw_value()?, + )]) + .or_else(vec![TxnOp::Get(raw_key.clone())]); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), + )) + } } #[cfg(test)] diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index dadc99f8ec5d..64d53888e7fd 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -50,7 +50,10 @@ use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; -use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; +use crate::error::{ + EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu, + UnexpectedSnafu, +}; use crate::expr::{Batch, GlobalId}; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; @@ -673,6 +676,20 @@ impl FlowWorkerManager { } } +#[derive(Debug, Clone)] +pub struct CreateFlowArgs { + pub flow_id: FlowId, + pub sink_table_name: TableName, + pub source_table_ids: Vec, + pub create_if_not_exists: bool, + pub or_replace: bool, + pub expire_after: Option, + pub comment: Option, + pub sql: String, + pub flow_options: HashMap, + pub query_ctx: Option, +} + /// Create&Remove flow impl FlowWorkerManager { /// remove a flow by it's id @@ -694,18 +711,47 @@ impl FlowWorkerManager { /// 1. parse query into typed plan(and optional parse expire_after expr) /// 2. render source/sink with output table id and used input table id #[allow(clippy::too_many_arguments)] - pub async fn create_flow( - &self, - flow_id: FlowId, - sink_table_name: TableName, - source_table_ids: &[TableId], - create_if_not_exists: bool, - expire_after: Option, - comment: Option, - sql: String, - flow_options: HashMap, - query_ctx: Option, - ) -> Result, Error> { + pub async fn create_flow(&self, args: CreateFlowArgs) -> Result, Error> { + let CreateFlowArgs { + flow_id, + sink_table_name, + source_table_ids, + create_if_not_exists, + or_replace, + expire_after, + comment, + sql, + flow_options, + query_ctx, + } = args; + + let already_exist = { + let mut flag = false; + + // check if the task already exists + for handle in self.worker_handles.iter() { + if handle.lock().await.contains_flow(flow_id).await? { + flag = true; + } + } + flag + }; + match (create_if_not_exists, or_replace, already_exist) { + // do replace + (_, true, true) => { + info!("Replacing flow with id={}", flow_id); + self.remove_flow(flow_id).await?; + } + (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, + // do nothing if exists + (true, false, true) => { + info!("Flow with id={} already exists, do nothing", flow_id); + return Ok(None); + } + // create if not exists + (_, _, false) => (), + } + if create_if_not_exists { // check if the task already exists for handle in self.worker_handles.iter() { @@ -717,7 +763,7 @@ impl FlowWorkerManager { let mut node_ctx = self.node_context.write().await; // assign global id to source and sink table - for source in source_table_ids { + for source in &source_table_ids { node_ctx .assign_global_id_to_table(&self.table_info_source, None, Some(*source)) .await?; @@ -726,7 +772,7 @@ impl FlowWorkerManager { .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None) .await?; - node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone()); + node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone()); node_ctx.query_context = query_ctx.map(Arc::new); // construct a active dataflow state with it diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 8e5a02858820..a438cf4ecca9 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use crate::adapter::FlowWorkerManager; +use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::error::InternalSnafu; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -57,7 +57,7 @@ impl Flownode for FlowWorkerManager { comment, sql, flow_options, - or_replace: _, + or_replace, })) => { let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); let sink_table_name = [ @@ -66,20 +66,19 @@ impl Flownode for FlowWorkerManager { sink_table_name.table_name, ]; let expire_after = expire_after.map(|e| e.value); - let ret = self - .create_flow( - task_id.id as u64, - sink_table_name, - &source_table_ids, - create_if_not_exists, - expire_after, - Some(comment), - sql, - flow_options, - query_ctx, - ) - .await - .map_err(to_meta_err)?; + let args = CreateFlowArgs { + flow_id: task_id.id as u64, + sink_table_name, + source_table_ids: source_table_ids, + create_if_not_exists, + or_replace, + expire_after, + comment: Some(comment), + sql, + flow_options, + query_ctx, + }; + let ret = self.create_flow(args).await.map_err(to_meta_err)?; METRIC_FLOW_TASK_COUNT.inc(); Ok(FlowResponse { affected_flows: ret diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index c9d75a929922..8d639cc03470 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -48,7 +48,7 @@ use tonic::codec::CompressionEncoding; use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; -use crate::adapter::FlowWorkerManagerRef; +use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, @@ -355,23 +355,23 @@ impl FlownodeBuilder { info.sink_table_name().schema_name.clone(), info.sink_table_name().table_name.clone(), ]; - manager - .create_flow( - flow_id as _, - sink_table_name, - info.source_table_ids(), - true, - info.expire_after(), - Some(info.comment().clone()), - info.raw_sql().clone(), - info.options().clone(), - Some( - QueryContextBuilder::default() - .current_catalog(info.catalog_name().clone()) - .build(), - ), - ) - .await?; + let args = CreateFlowArgs { + flow_id: flow_id as _, + sink_table_name, + source_table_ids: info.source_table_ids().to_vec(), + create_if_not_exists: true, + or_replace: true, + expire_after: info.expire_after(), + comment: Some(info.comment().clone()), + sql: info.raw_sql().clone(), + flow_options: info.options().clone(), + query_ctx: Some( + QueryContextBuilder::default() + .current_catalog(info.catalog_name().clone()) + .build(), + ), + }; + manager.create_flow(args).await?; } Ok(cnt) diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index a91930ee7620..f622fc55f95f 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -67,6 +67,149 @@ SHOW FLOWS LIKE 'filter_numbers_show'; ++ ++ +-- also test `CREATE OR REPLACE` and `IF NOT EXISTS` +-- (flow exists, replace, if not exists)=(false, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+---------------------------------------------------------+ + +-- this one should error out +-- (flow exists, replace, if not exists)=(true, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 15; + +Error: 8000(FlowAlreadyExists), Flow already exists: greptime.filter_numbers_show + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+---------------------------------------------------------+ + +-- after this one, the flow SHOULD NOT be replaced +-- (flow exists, replace, if not exists)=(true, false, true) +CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 5; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+---------------------------------------------------------+ + +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (10, now()); + +Affected Rows: 1 + +SELECT number FROM out_num_cnt_show; + +++ +++ + +-- after this, the flow SHOULD be replaced +-- (flow exists, replace, if not exists)=(true, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 3; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+---------------------------------------------------------+ + +-- makesure it's replaced in flownode +INSERT INTO numbers_input_show VALUES (10, now()); + +Affected Rows: 1 + +SELECT number FROM out_num_cnt_show; + +++ +++ + +-- after this, the flow SHOULD error out since having both `replace` and `if not exists` +-- (flow exists, replace, if not exists)=(true, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 0; + +Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE` + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+---------------------------------------------------------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + +-- following always create since didn't exist +-- (flow exists, replace, if not exists)=(false, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -1; + +Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE` + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +++ +++ + +DROP FLOW filter_numbers_show; + +Error: 8001(FlowNotFound), Flow not found: greptime.filter_numbers_show + +-- (flow exists, replace, if not exists)=(false, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -2; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > -2 | ++---------------------+---------------+---------------------------------------------------------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + +-- (flow exists, replace, if not exists)=(false, false, true) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -3; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > -3 | ++---------------------+---------------+---------------------------------------------------------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + drop table out_num_cnt_show; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 3cf84ed0f335..87a601955164 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -27,6 +27,71 @@ SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS W SHOW FLOWS LIKE 'filter_numbers_show'; +-- also test `CREATE OR REPLACE` and `IF NOT EXISTS` + +-- (flow exists, replace, if not exists)=(false, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- this one should error out +-- (flow exists, replace, if not exists)=(true, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 15; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- after this one, the flow SHOULD NOT be replaced +-- (flow exists, replace, if not exists)=(true, false, true) +CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 5; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (10, now()); + +SELECT number FROM out_num_cnt_show; + +-- after this, the flow SHOULD be replaced +-- (flow exists, replace, if not exists)=(true, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 3; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- makesure it's replaced in flownode +INSERT INTO numbers_input_show VALUES (10, now()); + +SELECT number FROM out_num_cnt_show; + +-- after this, the flow SHOULD error out since having both `replace` and `if not exists` +-- (flow exists, replace, if not exists)=(true, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 0; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +DROP FLOW filter_numbers_show; + +-- following always create since didn't exist +-- (flow exists, replace, if not exists)=(false, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -1; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +DROP FLOW filter_numbers_show; + +-- (flow exists, replace, if not exists)=(false, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -2; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +DROP FLOW filter_numbers_show; + +-- (flow exists, replace, if not exists)=(false, false, true) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -3; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +DROP FLOW filter_numbers_show; + drop table out_num_cnt_show; drop table numbers_input_show; From 9a7fb985bb6e4c4f2acc253c626f3d65e2c167ef Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 15 Nov 2024 16:01:44 +0800 Subject: [PATCH 2/7] refactor: better show create flow&tests: better check --- src/common/meta/src/ddl/create_flow.rs | 15 +- src/flow/src/adapter/flownode_impl.rs | 2 +- src/query/src/sql.rs | 4 +- .../common/flow/show_create_flow.result | 167 ++++++++++++------ .../common/flow/show_create_flow.sql | 44 +++-- 5 files changed, 155 insertions(+), 77 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 951c47e470ae..0d984ceae622 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -102,6 +102,7 @@ impl CreateFlowProcedure { .await?; if create_if_not_exists && or_replace { + // this is forbidden because not clear what does that mean exactly return error::UnsupportedSnafu { operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(), } @@ -182,7 +183,7 @@ impl CreateFlowProcedure { .map_err(add_peer_context_if_needed(peer.clone())) }); } - + info!("Creating flow({}) on flownodes", self.data.flow_id.unwrap()); join_all(create_flow) .await .into_iter() @@ -219,6 +220,7 @@ impl CreateFlowProcedure { } async fn on_broadcast(&mut self) -> Result { + debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache); // Safety: The flow id must be allocated. let flow_id = self.data.flow_id.unwrap(); let ctx = Context { @@ -229,10 +231,13 @@ impl CreateFlowProcedure { .cache_invalidator .invalidate( &ctx, - &[CacheIdent::CreateFlow(CreateFlow { - source_table_ids: self.data.source_table_ids.clone(), - flownodes: self.data.peers.clone(), - })], + &[ + CacheIdent::CreateFlow(CreateFlow { + source_table_ids: self.data.source_table_ids.clone(), + flownodes: self.data.peers.clone(), + }), + CacheIdent::FlowId(flow_id), + ], ) .await?; diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index a438cf4ecca9..3841d08914c5 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -69,7 +69,7 @@ impl Flownode for FlowWorkerManager { let args = CreateFlowArgs { flow_id: task_id.id as u64, sink_table_name, - source_table_ids: source_table_ids, + source_table_ids, create_if_not_exists, or_replace, expire_after, diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 172961d50a1f..062bd8e14e18 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -865,7 +865,9 @@ pub fn show_create_flow( value: flow_val.sink_table_name().table_name.clone(), quote_style: None, }]), - or_replace: true, + // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do + // so we set `or_replace` to false. + or_replace: false, if_not_exists: true, expire_after: flow_val.expire_after(), comment, diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index f622fc55f95f..5ef5b0a83d76 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -9,7 +9,9 @@ Affected Rows: 0 create table out_num_cnt_show ( number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); Affected Rows: 0 @@ -32,7 +34,7 @@ SHOW CREATE FLOW filter_numbers_show; +---------------------+------------------------------------------------------------+ | Flow | Create Flow | +---------------------+------------------------------------------------------------+ -| filter_numbers_show | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show | +| filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show | | | SINK TO out_num_cnt_show | | | AS SELECT number FROM numbers_input_show WHERE number > 10 | +---------------------+------------------------------------------------------------+ @@ -69,101 +71,153 @@ SHOW FLOWS LIKE 'filter_numbers_show'; -- also test `CREATE OR REPLACE` and `IF NOT EXISTS` -- (flow exists, replace, if not exists)=(false, false, false) -CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 10; Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+---------------------+---------------+---------------------------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | -+---------------------+---------------+---------------------------------------------------------+ ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+-------------------------------------------------------------+ -- this one should error out -- (flow exists, replace, if not exists)=(true, false, false) -CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 15; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 15; Error: 8000(FlowAlreadyExists), Flow already exists: greptime.filter_numbers_show SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+---------------------+---------------+---------------------------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | -+---------------------+---------------+---------------------------------------------------------+ ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+-------------------------------------------------------------+ + +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number, ts FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ -- after this one, the flow SHOULD NOT be replaced -- (flow exists, replace, if not exists)=(true, false, true) -CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 5; +CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 5; Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+---------------------+---------------+---------------------------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | -+---------------------+---------------+---------------------------------------------------------+ ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+-------------------------------------------------------------+ -- makesure it's not replaced in flownode -INSERT INTO numbers_input_show VALUES (10, now()); +INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4); -Affected Rows: 1 +Affected Rows: 4 -SELECT number FROM out_num_cnt_show; +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); -++ -++ ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number, ts FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ -- after this, the flow SHOULD be replaced -- (flow exists, replace, if not exists)=(true, true, false) -CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 3; +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 3; Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+---------------------+---------------+---------------------------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | -+---------------------+---------------+---------------------------------------------------------+ ++---------------------+---------------+------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 3 | ++---------------------+---------------+------------------------------------------------------------+ -- makesure it's replaced in flownode -INSERT INTO numbers_input_show VALUES (10, now()); +INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4); -Affected Rows: 1 +Affected Rows: 4 -SELECT number FROM out_num_cnt_show; +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); -++ -++ ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number, ts FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 4 | 1970-01-01T00:00:00.002 | +| 10 | 1970-01-01T00:00:00.003 | +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ -- after this, the flow SHOULD error out since having both `replace` and `if not exists` -- (flow exists, replace, if not exists)=(true, true, true) -CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 0; +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 0; Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE` SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+---------------------+---------------+---------------------------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | -+---------------------+---------------+---------------------------------------------------------+ ++---------------------+---------------+------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 3 | ++---------------------+---------------+------------------------------------------------------------+ DROP FLOW filter_numbers_show; Affected Rows: 0 --- following always create since didn't exist -- (flow exists, replace, if not exists)=(false, true, true) -CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -1; +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -1; Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE` @@ -176,35 +230,36 @@ DROP FLOW filter_numbers_show; Error: 8001(FlowNotFound), Flow not found: greptime.filter_numbers_show +-- following always create since didn't exist -- (flow exists, replace, if not exists)=(false, true, false) -CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -2; +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -2; Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+---------------------+---------------+---------------------------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > -2 | -+---------------------+---------------+---------------------------------------------------------+ ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -2 | ++---------------------+---------------+-------------------------------------------------------------+ DROP FLOW filter_numbers_show; Affected Rows: 0 -- (flow exists, replace, if not exists)=(false, false, true) -CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -3; +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -3; Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+---------------------+---------------+---------------------------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > -3 | -+---------------------+---------------+---------------------------------------------------------+ ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 | ++---------------------+---------------+-------------------------------------------------------------+ DROP FLOW filter_numbers_show; diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 87a601955164..887558613857 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -6,7 +6,9 @@ CREATE TABLE numbers_input_show ( ); create table out_num_cnt_show ( number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; @@ -30,63 +32,77 @@ SHOW FLOWS LIKE 'filter_numbers_show'; -- also test `CREATE OR REPLACE` and `IF NOT EXISTS` -- (flow exists, replace, if not exists)=(false, false, false) -CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 10; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -- this one should error out -- (flow exists, replace, if not exists)=(true, false, false) -CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 15; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 15; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT number, ts FROM out_num_cnt_show; + -- after this one, the flow SHOULD NOT be replaced -- (flow exists, replace, if not exists)=(true, false, true) -CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 5; +CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 5; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -- makesure it's not replaced in flownode -INSERT INTO numbers_input_show VALUES (10, now()); +INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); -SELECT number FROM out_num_cnt_show; +SELECT number, ts FROM out_num_cnt_show; -- after this, the flow SHOULD be replaced -- (flow exists, replace, if not exists)=(true, true, false) -CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 3; +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 3; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -- makesure it's replaced in flownode -INSERT INTO numbers_input_show VALUES (10, now()); +INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); -SELECT number FROM out_num_cnt_show; +SELECT number, ts FROM out_num_cnt_show; -- after this, the flow SHOULD error out since having both `replace` and `if not exists` -- (flow exists, replace, if not exists)=(true, true, true) -CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 0; +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 0; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; DROP FLOW filter_numbers_show; --- following always create since didn't exist -- (flow exists, replace, if not exists)=(false, true, true) -CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -1; +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -1; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; DROP FLOW filter_numbers_show; +-- following always create since didn't exist -- (flow exists, replace, if not exists)=(false, true, false) -CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -2; +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -2; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; DROP FLOW filter_numbers_show; -- (flow exists, replace, if not exists)=(false, false, true) -CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > -3; +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -3; SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; From 12dd5456d06ca5823870e6eae8ac4b8263f061df Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 15 Nov 2024 16:28:33 +0800 Subject: [PATCH 3/7] tests: sqlness result update --- tests/cases/standalone/common/flow/flow_basic.result | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index d3cc7f66800e..4c6095d2529c 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -259,7 +259,7 @@ SHOW CREATE FLOW filter_numbers_basic; +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ | Flow | Create Flow | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ -| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic | +| filter_numbers_basic | CREATE FLOW IF NOT EXISTS filter_numbers_basic | | | SINK TO out_num_cnt_basic | | | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ From e3c9e2e3bffbdf5432a7a0210acd6608a98fada5 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 18 Nov 2024 15:10:56 +0800 Subject: [PATCH 4/7] tests: unit test for update --- src/common/meta/src/ddl/create_flow.rs | 31 ++- src/common/meta/src/key/flow.rs | 246 +++++++++++++++++++++- src/common/meta/src/key/flow/flow_info.rs | 16 +- src/common/meta/src/key/flow/flow_name.rs | 11 +- 4 files changed, 277 insertions(+), 27 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 0d984ceae622..1ea5837019a5 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -76,7 +76,7 @@ impl CreateFlowProcedure { source_table_ids: vec![], query_context, state: CreateFlowState::Prepare, - flow_already_exists: false, + prev_flow_info_value: None, }, } } @@ -135,7 +135,22 @@ impl CreateFlowProcedure { self.data.flow_id = Some(flow_id); self.data.peers = peers; info!("Replacing flow, flow_id: {}", flow_id); - self.data.flow_already_exists = true; + + let flow_info_value = self + .context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await?; + + ensure!( + flow_info_value.is_some(), + error::UnexpectedSnafu { + err_msg: format!("Flow info value not found for flow_id: {}", flow_id) + } + ); + + self.data.prev_flow_info_value = flow_info_value; } // Ensures sink table doesn't exist. @@ -183,7 +198,10 @@ impl CreateFlowProcedure { .map_err(add_peer_context_if_needed(peer.clone())) }); } - info!("Creating flow({}) on flownodes", self.data.flow_id.unwrap()); + info!( + "Creating flow({:?}) on flownodes with peers={:?}", + self.data.flow_id, self.data.peers + ); join_all(create_flow) .await .into_iter() @@ -201,10 +219,10 @@ impl CreateFlowProcedure { // Safety: The flow id must be allocated. let flow_id = self.data.flow_id.unwrap(); let (flow_info, flow_routes) = (&self.data).into(); - if self.data.flow_already_exists { + if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref() { self.context .flow_metadata_manager - .update_flow_metadata(flow_id, flow_info, flow_routes) + .update_flow_metadata(flow_id, flow_info, prev_flow_value.clone(), flow_routes) .await?; info!("Replaced flow metadata for flow {flow_id}"); } else { @@ -312,7 +330,8 @@ pub struct CreateFlowData { pub(crate) peers: Vec, pub(crate) source_table_ids: Vec, pub(crate) query_context: QueryContext, - pub(crate) flow_already_exists: bool, + /// For verify if prev value is consistent when need to update flow metadata. + pub(crate) prev_flow_info_value: Option, } impl From<&CreateFlowData> for CreateRequest { diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index e92255d0afe9..461b6dee2afe 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -235,15 +235,16 @@ impl FlowMetadataManager { &self, flow_id: FlowId, flow_info: FlowInfoValue, + prev_flow_info: FlowInfoValue, flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>, ) -> Result<()> { let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self .flow_name_manager .build_update_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?; - let (create_flow_txn, on_create_flow_failure) = self - .flow_info_manager - .build_update_txn(flow_id, &flow_info)?; + let (create_flow_txn, on_create_flow_failure) = + self.flow_info_manager + .build_update_txn(flow_id, &flow_info, &prev_flow_info)?; let create_flow_routes_txn = self .flow_route_manager @@ -284,33 +285,38 @@ impl FlowMetadataManager { on_create_flow_flow_name_failure(&mut set)?.with_context(|| { error::UnexpectedSnafu { err_msg: format!( - "Reads the empty flow name during the creating flow, flow_id: {flow_id}" + "Reads the empty flow name during the updating flow, flow_id: {flow_id}" ), } })?; if remote_flow_flow_name.flow_id() != flow_id { info!( - "Trying to create flow {}.{}({}), but flow({}) already exists", + "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id", flow_info.catalog_name, flow_info.flow_name, flow_id, remote_flow_flow_name.flow_id() ); - return error::FlowAlreadyExistsSnafu { - flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name), - } - .fail(); + return error::UnexpectedSnafu { + err_msg: format!( + "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}", + remote_flow_flow_name.flow_id(), + flow_id, + flow_info.catalog_name, + flow_info.flow_name, + ), + }.fail(); } let remote_flow = on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu { err_msg: format!( - "Reads the empty flow during the creating flow, flow_id: {flow_id}" + "Reads the empty flow during the updating flow, flow_id: {flow_id}" ), })?; - let op_name = "creating flow"; + let op_name = "updating flow"; ensure_values!(*remote_flow, flow_info, op_name); } @@ -647,4 +653,222 @@ mod tests { // Ensures all keys are deleted assert!(mem_kv.is_empty()) } + + #[tokio::test] + async fn test_update_flow_metadata() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); + let flow_id = 10; + let flow_value = test_flow_info_value( + "flow", + [(0, 1u64), (1, 2u64)].into(), + vec![1024, 1025, 1026], + ); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; + flow_metadata_manager + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) + .await + .unwrap(); + + let new_flow_value = { + let mut tmp = flow_value.clone(); + tmp.raw_sql = "new".to_string(); + tmp + }; + + // Update flow instead + flow_metadata_manager + .update_flow_metadata( + flow_id, + new_flow_value.clone(), + flow_value.clone(), + flow_routes.clone(), + ) + .await + .unwrap(); + + let got = flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + let routes = flow_metadata_manager + .flow_route_manager() + .routes(flow_id) + .try_collect::>() + .await + .unwrap(); + assert_eq!( + routes, + vec![ + ( + FlowRouteKey::new(flow_id, 1), + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + FlowRouteKey::new(flow_id, 2), + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ] + ); + assert_eq!(got, new_flow_value); + let flows = flow_metadata_manager + .flownode_flow_manager() + .flows(1) + .try_collect::>() + .await + .unwrap(); + assert_eq!(flows, vec![(flow_id, 0)]); + for table_id in [1024, 1025, 1026] { + let nodes = flow_metadata_manager + .table_flow_manager() + .flows(table_id) + .try_collect::>() + .await + .unwrap(); + assert_eq!( + nodes, + vec![ + ( + TableFlowKey::new(table_id, 1, flow_id, 1), + TableFlowValue { + peer: Peer::empty(1) + } + ), + ( + TableFlowKey::new(table_id, 2, flow_id, 2), + TableFlowValue { + peer: Peer::empty(2) + } + ) + ] + ); + } + } + + #[tokio::test] + async fn test_update_flow_metadata_flow_replace_diff_id_err() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowMetadataManager::new(mem_kv); + let flow_id = 10; + let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; + flow_metadata_manager + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) + .await + .unwrap(); + // update again with same flow id + flow_metadata_manager + .update_flow_metadata( + flow_id, + flow_value.clone(), + flow_value.clone(), + flow_routes.clone(), + ) + .await + .unwrap(); + // update again with wrong flow id, expected error + let err = flow_metadata_manager + .update_flow_metadata( + flow_id + 1, + flow_value.clone(), + flow_value.clone(), + flow_routes, + ) + .await + .unwrap_err(); + assert_matches!(err, error::Error::Unexpected { .. }); + assert!(err + .to_string() + .contains("Reads different flow id when updating flow")); + } + + #[tokio::test] + async fn test_update_flow_metadata_unexpected_err_prev_value_diff() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowMetadataManager::new(mem_kv); + let flow_id = 10; + let catalog_name = "greptime"; + let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; + flow_metadata_manager + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) + .await + .unwrap(); + // Creates again. + let another_sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "another_sink_table".to_string(), + }; + let flow_value = FlowInfoValue { + catalog_name: "greptime".to_string(), + flow_name: "flow".to_string(), + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: another_sink_table_name, + flownode_ids: [(0, 1u64)].into(), + raw_sql: "raw".to_string(), + expire_after: Some(300), + comment: "hi".to_string(), + options: Default::default(), + }; + let err = flow_metadata_manager + .update_flow_metadata( + flow_id, + flow_value.clone(), + flow_value.clone(), + flow_routes.clone(), + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("Reads the different value"), + "error: {:?}", + err + ); + } } diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index c3f718870f9a..108bb5977afa 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -217,23 +217,27 @@ impl FlowInfoManager { } /// Builds a update flow transaction. - /// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied. + /// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied and equal to `prev_flow_value`, + /// but the new value can be the same, so to allow replace operation to happen even when the value is the same. /// Otherwise, the transaction will retrieve existing value and fail. pub(crate) fn build_update_txn( &self, flow_id: FlowId, flow_value: &FlowInfoValue, + prev_flow_value: &FlowInfoValue, ) -> Result<( Txn, impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult, )> { let key = FlowInfoKey::new(flow_id).to_bytes(); + let raw_value = flow_value.try_as_raw_value()?; + let prev_value = prev_flow_value.try_as_raw_value()?; let txn = Txn::new() - .when(vec![Compare::new(key.clone(), CompareOp::NotEqual, None)]) - .and_then(vec![TxnOp::Put( - key.clone(), - flow_value.try_as_raw_value()?, - )]) + .when(vec![ + Compare::new(key.clone(), CompareOp::NotEqual, None), + Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)), + ]) + .and_then(vec![TxnOp::Put(key.clone(), raw_value.clone())]) .or_else(vec![TxnOp::Get(key.clone())]); Ok(( diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 4a829851c50f..ae93e56e13bf 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -238,8 +238,9 @@ impl FlowNameManager { )) } - /// Builds a update flow name transaction. - /// It's expected that the `__flow/name/{catalog}/{flow_name}` IS already occupied. + /// Builds a update flow name transaction. Which doesn't change either the name or id, just checking if they are the same. + /// It's expected that the `__flow/name/{catalog}/{flow_name}` IS already occupied, + /// and both flow name and flow id is the same. /// Otherwise, the transaction will retrieve existing value(and fail). pub fn build_update_txn( &self, @@ -253,12 +254,14 @@ impl FlowNameManager { let key = FlowNameKey::new(catalog_name, flow_name); let raw_key = key.to_bytes(); let flow_flow_name_value = FlowNameValue::new(flow_id); + let raw_value = flow_flow_name_value.try_as_raw_value()?; let txn = Txn::new() .when(vec![Compare::new( raw_key.clone(), - CompareOp::NotEqual, - None, + CompareOp::Equal, + Some(raw_value), )]) + // TODO(discord9): make this a no op since it's the same value .and_then(vec![TxnOp::Put( raw_key.clone(), flow_flow_name_value.try_as_raw_value()?, From 10d417af4ef9f3ebb1d96b9927a940a909685960 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 18 Nov 2024 16:32:08 +0800 Subject: [PATCH 5/7] refactor: cmp with raw bytes --- src/common/meta/src/ddl/create_flow.rs | 17 +++++++++------ src/common/meta/src/key/flow.rs | 26 +++++++++++------------ src/common/meta/src/key/flow/flow_info.rs | 17 +++++++++++++-- src/common/meta/src/key/flow/flow_name.rs | 5 ----- 4 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 1ea5837019a5..043d9915e75c 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -44,7 +44,7 @@ use crate::instruction::{CacheIdent, CreateFlow}; use crate::key::flow::flow_info::FlowInfoValue; use crate::key::flow::flow_route::FlowRouteValue; use crate::key::table_name::TableNameKey; -use crate::key::{FlowId, FlowPartitionId}; +use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::{CreateFlowTask, QueryContext}; @@ -140,13 +140,13 @@ impl CreateFlowProcedure { .context .flow_metadata_manager .flow_info_manager() - .get(flow_id) + .get_raw(flow_id) .await?; ensure!( flow_info_value.is_some(), - error::UnexpectedSnafu { - err_msg: format!("Flow info value not found for flow_id: {}", flow_id) + error::FlowNotFoundSnafu { + flow_name: format_full_flow_name(catalog_name, flow_name), } ); @@ -219,10 +219,12 @@ impl CreateFlowProcedure { // Safety: The flow id must be allocated. let flow_id = self.data.flow_id.unwrap(); let (flow_info, flow_routes) = (&self.data).into(); - if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref() { + if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref() + && self.data.task.or_replace + { self.context .flow_metadata_manager - .update_flow_metadata(flow_id, flow_info, prev_flow_value.clone(), flow_routes) + .update_flow_metadata(flow_id, &flow_info, prev_flow_value, flow_routes) .await?; info!("Replaced flow metadata for flow {flow_id}"); } else { @@ -331,7 +333,8 @@ pub struct CreateFlowData { pub(crate) source_table_ids: Vec, pub(crate) query_context: QueryContext, /// For verify if prev value is consistent when need to update flow metadata. - pub(crate) prev_flow_info_value: Option, + /// only set when `or_replace` is true. + pub(crate) prev_flow_info_value: Option>, } impl From<&CreateFlowData> for CreateRequest { diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 461b6dee2afe..a10f909bc3cd 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -38,7 +38,7 @@ use crate::key::flow::flow_name::FlowNameManager; use crate::key::flow::flownode_flow::FlownodeFlowManager; pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef}; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{FlowId, MetadataKey}; +use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::rpc::store::BatchDeleteRequest; @@ -234,8 +234,8 @@ impl FlowMetadataManager { pub async fn update_flow_metadata( &self, flow_id: FlowId, - flow_info: FlowInfoValue, - prev_flow_info: FlowInfoValue, + flow_info: &FlowInfoValue, + prev_flow_info: &DeserializedValueWithBytes, flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>, ) -> Result<()> { let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self @@ -244,7 +244,7 @@ impl FlowMetadataManager { let (create_flow_txn, on_create_flow_failure) = self.flow_info_manager - .build_update_txn(flow_id, &flow_info, &prev_flow_info)?; + .build_update_txn(flow_id, flow_info, prev_flow_info)?; let create_flow_routes_txn = self .flow_route_manager @@ -317,7 +317,7 @@ impl FlowMetadataManager { ), })?; let op_name = "updating flow"; - ensure_values!(*remote_flow, flow_info, op_name); + ensure_values!(*remote_flow, flow_info.clone(), op_name); } Ok(()) @@ -693,8 +693,8 @@ mod tests { flow_metadata_manager .update_flow_metadata( flow_id, - new_flow_value.clone(), - flow_value.clone(), + &new_flow_value, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), flow_routes.clone(), ) .await @@ -792,8 +792,8 @@ mod tests { flow_metadata_manager .update_flow_metadata( flow_id, - flow_value.clone(), - flow_value.clone(), + &flow_value, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), flow_routes.clone(), ) .await @@ -802,8 +802,8 @@ mod tests { let err = flow_metadata_manager .update_flow_metadata( flow_id + 1, - flow_value.clone(), - flow_value.clone(), + &flow_value, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), flow_routes, ) .await @@ -859,8 +859,8 @@ mod tests { let err = flow_metadata_manager .update_flow_metadata( flow_id, - flow_value.clone(), - flow_value.clone(), + &flow_value, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), flow_routes.clone(), ) .await diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 108bb5977afa..0e53d8240c64 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -196,6 +196,19 @@ impl FlowInfoManager { .transpose() } + /// Returns the [FlowInfoValue] with original bytes of specified `flow_id`. + pub async fn get_raw( + &self, + flow_id: FlowId, + ) -> Result>> { + let key = FlowInfoKey::new(flow_id).to_bytes(); + self.kv_backend + .get(&key) + .await? + .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) + .transpose() + } + /// Builds a create flow transaction. /// It is expected that the `__flow/info/{flow_id}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. @@ -224,14 +237,14 @@ impl FlowInfoManager { &self, flow_id: FlowId, flow_value: &FlowInfoValue, - prev_flow_value: &FlowInfoValue, + prev_flow_value: &DeserializedValueWithBytes, ) -> Result<( Txn, impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult, )> { let key = FlowInfoKey::new(flow_id).to_bytes(); let raw_value = flow_value.try_as_raw_value()?; - let prev_value = prev_flow_value.try_as_raw_value()?; + let prev_value = prev_flow_value.get_raw_bytes(); let txn = Txn::new() .when(vec![ Compare::new(key.clone(), CompareOp::NotEqual, None), diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index ae93e56e13bf..79c87c7360ea 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -261,11 +261,6 @@ impl FlowNameManager { CompareOp::Equal, Some(raw_value), )]) - // TODO(discord9): make this a no op since it's the same value - .and_then(vec![TxnOp::Put( - raw_key.clone(), - flow_flow_name_value.try_as_raw_value()?, - )]) .or_else(vec![TxnOp::Get(raw_key.clone())]); Ok(( From 8a94c5b3ea023ad5d97293aafee155c793db5e66 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 18 Nov 2024 16:54:23 +0800 Subject: [PATCH 6/7] refactor: rename --- src/common/meta/src/ddl/create_flow.rs | 2 +- src/common/meta/src/key/flow.rs | 41 ++++++++++++----------- src/common/meta/src/key/flow/flow_info.rs | 8 ++--- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 043d9915e75c..177bdf6b716a 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -224,7 +224,7 @@ impl CreateFlowProcedure { { self.context .flow_metadata_manager - .update_flow_metadata(flow_id, &flow_info, prev_flow_value, flow_routes) + .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes) .await?; info!("Replaced flow metadata for flow {flow_id}"); } else { diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index a10f909bc3cd..24fed99b8fc5 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -234,17 +234,20 @@ impl FlowMetadataManager { pub async fn update_flow_metadata( &self, flow_id: FlowId, - flow_info: &FlowInfoValue, - prev_flow_info: &DeserializedValueWithBytes, + current_flow_info: &DeserializedValueWithBytes, + new_flow_info: &FlowInfoValue, flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>, ) -> Result<()> { - let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self - .flow_name_manager - .build_update_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?; + let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = + self.flow_name_manager.build_update_txn( + &new_flow_info.catalog_name, + &new_flow_info.flow_name, + flow_id, + )?; let (create_flow_txn, on_create_flow_failure) = self.flow_info_manager - .build_update_txn(flow_id, flow_info, prev_flow_info)?; + .build_update_txn(flow_id, current_flow_info, new_flow_info)?; let create_flow_routes_txn = self .flow_route_manager @@ -252,7 +255,7 @@ impl FlowMetadataManager { let create_flownode_flow_txn = self .flownode_flow_manager - .build_create_txn(flow_id, flow_info.flownode_ids().clone()); + .build_create_txn(flow_id, new_flow_info.flownode_ids().clone()); let create_table_flow_txn = self.table_flow_manager.build_create_txn( flow_id, @@ -260,7 +263,7 @@ impl FlowMetadataManager { .into_iter() .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer })) .collect(), - flow_info.source_table_ids(), + new_flow_info.source_table_ids(), )?; let txn = Txn::merge_all(vec![ @@ -272,8 +275,8 @@ impl FlowMetadataManager { ]); info!( "Creating flow {}.{}({}), with {} txn operations", - flow_info.catalog_name, - flow_info.flow_name, + new_flow_info.catalog_name, + new_flow_info.flow_name, flow_id, txn.max_operations() ); @@ -293,8 +296,8 @@ impl FlowMetadataManager { if remote_flow_flow_name.flow_id() != flow_id { info!( "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id", - flow_info.catalog_name, - flow_info.flow_name, + new_flow_info.catalog_name, + new_flow_info.flow_name, flow_id, remote_flow_flow_name.flow_id() ); @@ -304,8 +307,8 @@ impl FlowMetadataManager { "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}", remote_flow_flow_name.flow_id(), flow_id, - flow_info.catalog_name, - flow_info.flow_name, + new_flow_info.catalog_name, + new_flow_info.flow_name, ), }.fail(); } @@ -317,7 +320,7 @@ impl FlowMetadataManager { ), })?; let op_name = "updating flow"; - ensure_values!(*remote_flow, flow_info.clone(), op_name); + ensure_values!(*remote_flow, new_flow_info.clone(), op_name); } Ok(()) @@ -693,8 +696,8 @@ mod tests { flow_metadata_manager .update_flow_metadata( flow_id, - &new_flow_value, &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &new_flow_value, flow_routes.clone(), ) .await @@ -792,8 +795,8 @@ mod tests { flow_metadata_manager .update_flow_metadata( flow_id, - &flow_value, &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &flow_value, flow_routes.clone(), ) .await @@ -802,8 +805,8 @@ mod tests { let err = flow_metadata_manager .update_flow_metadata( flow_id + 1, - &flow_value, &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &flow_value, flow_routes, ) .await @@ -859,8 +862,8 @@ mod tests { let err = flow_metadata_manager .update_flow_metadata( flow_id, - &flow_value, &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &flow_value, flow_routes.clone(), ) .await diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 0e53d8240c64..0250ccdbdc7a 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -236,15 +236,15 @@ impl FlowInfoManager { pub(crate) fn build_update_txn( &self, flow_id: FlowId, - flow_value: &FlowInfoValue, - prev_flow_value: &DeserializedValueWithBytes, + current_flow_value: &DeserializedValueWithBytes, + new_flow_value: &FlowInfoValue, ) -> Result<( Txn, impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult, )> { let key = FlowInfoKey::new(flow_id).to_bytes(); - let raw_value = flow_value.try_as_raw_value()?; - let prev_value = prev_flow_value.get_raw_bytes(); + let raw_value = new_flow_value.try_as_raw_value()?; + let prev_value = current_flow_value.get_raw_bytes(); let txn = Txn::new() .when(vec![ Compare::new(key.clone(), CompareOp::NotEqual, None), From 5a73a9507751cd9355fe08418200859f1c220633 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 19 Nov 2024 16:28:56 +0800 Subject: [PATCH 7/7] refactor: per review --- src/common/meta/src/key/flow/flow_info.rs | 2 +- src/flow/src/adapter.rs | 2 + src/flow/src/server.rs | 3 ++ .../common/flow/show_create_flow.result | 49 +++++++++++++++++++ .../common/flow/show_create_flow.sql | 14 ++++++ 5 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 0250ccdbdc7a..67d663625db3 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -250,7 +250,7 @@ impl FlowInfoManager { Compare::new(key.clone(), CompareOp::NotEqual, None), Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)), ]) - .and_then(vec![TxnOp::Put(key.clone(), raw_value.clone())]) + .and_then(vec![TxnOp::Put(key.clone(), raw_value)]) .or_else(vec![TxnOp::Get(key.clone())]); Ok(( diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 64d53888e7fd..80d03e27706b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -676,6 +676,7 @@ impl FlowWorkerManager { } } +/// The arguments to create a flow in [`FlowWorkerManager`]. #[derive(Debug, Clone)] pub struct CreateFlowArgs { pub flow_id: FlowId, @@ -732,6 +733,7 @@ impl FlowWorkerManager { for handle in self.worker_handles.iter() { if handle.lock().await.contains_flow(flow_id).await? { flag = true; + break; } } flag diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 8d639cc03470..87b6bbdc09ed 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -359,6 +359,9 @@ impl FlownodeBuilder { flow_id: flow_id as _, sink_table_name, source_table_ids: info.source_table_ids().to_vec(), + // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist) + // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true + // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that) create_if_not_exists: true, or_replace: true, expire_after: info.expire_after(), diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 5ef5b0a83d76..38fa609c960d 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -261,6 +261,55 @@ SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS W | filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 | +---------------------+---------------+-------------------------------------------------------------+ +-- makesure after recover should be the same +-- SQLNESS ARG restart=true +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 | ++---------------------+---------------+-------------------------------------------------------------+ + +SELECT * FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 4 | 1970-01-01T00:00:00.002 | +| 10 | 1970-01-01T00:00:00.003 | +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ + +INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3); + +Affected Rows: 4 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT * FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| -2 | 1970-01-01T00:00:00.002 | +| -1 | 1970-01-01T00:00:00.003 | +| 4 | 1970-01-01T00:00:00.002 | +| 10 | 1970-01-01T00:00:00.003 | +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ + DROP FLOW filter_numbers_show; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 887558613857..7348a83b5103 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -106,6 +106,20 @@ CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT nu SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; +-- makesure after recover should be the same +-- SQLNESS ARG restart=true + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +SELECT * FROM out_num_cnt_show; + +INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT * FROM out_num_cnt_show; + DROP FLOW filter_numbers_show; drop table out_num_cnt_show;