diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index afaf606078c0..177bdf6b716a 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -28,6 +28,7 @@ use common_procedure::{ use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; use futures::future::join_all; +use futures::TryStreamExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -43,7 +44,7 @@ use crate::instruction::{CacheIdent, CreateFlow}; use crate::key::flow::flow_info::FlowInfoValue; use crate::key::flow::flow_route::FlowRouteValue; use crate::key::table_name::TableNameKey; -use crate::key::{FlowId, FlowPartitionId}; +use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::{CreateFlowTask, QueryContext}; @@ -75,6 +76,7 @@ impl CreateFlowProcedure { source_table_ids: vec![], query_context, state: CreateFlowState::Prepare, + prev_flow_info_value: None, }, } } @@ -90,6 +92,7 @@ impl CreateFlowProcedure { let flow_name = &self.data.task.flow_name; let sink_table_name = &self.data.task.sink_table_name; let create_if_not_exists = self.data.task.create_if_not_exists; + let or_replace = self.data.task.or_replace; let flow_name_value = self .context @@ -98,16 +101,56 @@ impl CreateFlowProcedure { .get(catalog_name, flow_name) .await?; + if create_if_not_exists && or_replace { + // this is forbidden because not clear what does that mean exactly + return error::UnsupportedSnafu { + operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(), + } + .fail(); + } + if let Some(value) = flow_name_value { ensure!( - create_if_not_exists, + create_if_not_exists || or_replace, error::FlowAlreadyExistsSnafu { flow_name: format_full_flow_name(catalog_name, flow_name), } ); let flow_id = value.flow_id(); - return Ok(Status::done_with_output(flow_id)); + if create_if_not_exists { + info!("Flow already exists, flow_id: {}", flow_id); + return Ok(Status::done_with_output(flow_id)); + } + + let flow_id = value.flow_id(); + let peers = self + .context + .flow_metadata_manager + .flow_route_manager() + .routes(flow_id) + .map_ok(|(_, value)| value.peer) + .try_collect::>() + .await?; + self.data.flow_id = Some(flow_id); + self.data.peers = peers; + info!("Replacing flow, flow_id: {}", flow_id); + + let flow_info_value = self + .context + .flow_metadata_manager + .flow_info_manager() + .get_raw(flow_id) + .await?; + + ensure!( + flow_info_value.is_some(), + error::FlowNotFoundSnafu { + flow_name: format_full_flow_name(catalog_name, flow_name), + } + ); + + self.data.prev_flow_info_value = flow_info_value; } // Ensures sink table doesn't exist. @@ -128,7 +171,9 @@ impl CreateFlowProcedure { } self.collect_source_tables().await?; - self.allocate_flow_id().await?; + if self.data.flow_id.is_none() { + self.allocate_flow_id().await?; + } self.data.state = CreateFlowState::CreateFlows; Ok(Status::executing(true)) @@ -153,7 +198,10 @@ impl CreateFlowProcedure { .map_err(add_peer_context_if_needed(peer.clone())) }); } - + info!( + "Creating flow({:?}) on flownodes with peers={:?}", + self.data.flow_id, self.data.peers + ); join_all(create_flow) .await .into_iter() @@ -170,18 +218,29 @@ impl CreateFlowProcedure { async fn on_create_metadata(&mut self) -> Result { // Safety: The flow id must be allocated. let flow_id = self.data.flow_id.unwrap(); - // TODO(weny): Support `or_replace`. let (flow_info, flow_routes) = (&self.data).into(); - self.context - .flow_metadata_manager - .create_flow_metadata(flow_id, flow_info, flow_routes) - .await?; - info!("Created flow metadata for flow {flow_id}"); + if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref() + && self.data.task.or_replace + { + self.context + .flow_metadata_manager + .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes) + .await?; + info!("Replaced flow metadata for flow {flow_id}"); + } else { + self.context + .flow_metadata_manager + .create_flow_metadata(flow_id, flow_info, flow_routes) + .await?; + info!("Created flow metadata for flow {flow_id}"); + } + self.data.state = CreateFlowState::InvalidateFlowCache; Ok(Status::executing(true)) } async fn on_broadcast(&mut self) -> Result { + debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache); // Safety: The flow id must be allocated. let flow_id = self.data.flow_id.unwrap(); let ctx = Context { @@ -192,10 +251,13 @@ impl CreateFlowProcedure { .cache_invalidator .invalidate( &ctx, - &[CacheIdent::CreateFlow(CreateFlow { - source_table_ids: self.data.source_table_ids.clone(), - flownodes: self.data.peers.clone(), - })], + &[ + CacheIdent::CreateFlow(CreateFlow { + source_table_ids: self.data.source_table_ids.clone(), + flownodes: self.data.peers.clone(), + }), + CacheIdent::FlowId(flow_id), + ], ) .await?; @@ -270,6 +332,9 @@ pub struct CreateFlowData { pub(crate) peers: Vec, pub(crate) source_table_ids: Vec, pub(crate) query_context: QueryContext, + /// For verify if prev value is consistent when need to update flow metadata. + /// only set when `or_replace` is true. + pub(crate) prev_flow_info_value: Option>, } impl From<&CreateFlowData> for CreateRequest { @@ -284,9 +349,9 @@ impl From<&CreateFlowData> for CreateRequest { .map(|table_id| api::v1::TableId { id: *table_id }) .collect_vec(), sink_table_name: Some(value.task.sink_table_name.clone().into()), - // Always be true + // Always be true to ensure idempotent in case of retry create_if_not_exists: true, - or_replace: true, + or_replace: value.task.or_replace, expire_after: value.task.expire_after.map(|value| ExpireAfter { value }), comment: value.task.comment.clone(), sql: value.task.sql.clone(), diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1ee148406d04..617a8c574d7e 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -655,10 +655,17 @@ async fn handle_create_flow_task( procedure_id: &procedure_id, err_msg: "downcast to `u32`", })?); - info!( - "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", - create_flow_task.catalog_name, create_flow_task.flow_name, - ); + if !create_flow_task.or_replace { + info!( + "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", + create_flow_task.catalog_name, create_flow_task.flow_name, + ); + } else { + info!( + "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}", + create_flow_task.catalog_name, create_flow_task.flow_name, + ); + } Ok(SubmitDdlTaskResponse { key: procedure_id.into(), diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 41a5058a331f..24fed99b8fc5 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -38,7 +38,7 @@ use crate::key::flow::flow_name::FlowNameManager; use crate::key::flow::flownode_flow::FlownodeFlowManager; pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef}; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{FlowId, MetadataKey}; +use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::rpc::store::BatchDeleteRequest; @@ -230,6 +230,102 @@ impl FlowMetadataManager { Ok(()) } + /// Update metadata for flow and returns an error if old metadata IS NOT exists. + pub async fn update_flow_metadata( + &self, + flow_id: FlowId, + current_flow_info: &DeserializedValueWithBytes, + new_flow_info: &FlowInfoValue, + flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>, + ) -> Result<()> { + let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = + self.flow_name_manager.build_update_txn( + &new_flow_info.catalog_name, + &new_flow_info.flow_name, + flow_id, + )?; + + let (create_flow_txn, on_create_flow_failure) = + self.flow_info_manager + .build_update_txn(flow_id, current_flow_info, new_flow_info)?; + + let create_flow_routes_txn = self + .flow_route_manager + .build_create_txn(flow_id, flow_routes.clone())?; + + let create_flownode_flow_txn = self + .flownode_flow_manager + .build_create_txn(flow_id, new_flow_info.flownode_ids().clone()); + + let create_table_flow_txn = self.table_flow_manager.build_create_txn( + flow_id, + flow_routes + .into_iter() + .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer })) + .collect(), + new_flow_info.source_table_ids(), + )?; + + let txn = Txn::merge_all(vec![ + create_flow_flow_name_txn, + create_flow_txn, + create_flow_routes_txn, + create_flownode_flow_txn, + create_table_flow_txn, + ]); + info!( + "Creating flow {}.{}({}), with {} txn operations", + new_flow_info.catalog_name, + new_flow_info.flow_name, + flow_id, + txn.max_operations() + ); + + let mut resp = self.kv_backend.txn(txn).await?; + if !resp.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut resp.responses); + let remote_flow_flow_name = + on_create_flow_flow_name_failure(&mut set)?.with_context(|| { + error::UnexpectedSnafu { + err_msg: format!( + "Reads the empty flow name during the updating flow, flow_id: {flow_id}" + ), + } + })?; + + if remote_flow_flow_name.flow_id() != flow_id { + info!( + "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id", + new_flow_info.catalog_name, + new_flow_info.flow_name, + flow_id, + remote_flow_flow_name.flow_id() + ); + + return error::UnexpectedSnafu { + err_msg: format!( + "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}", + remote_flow_flow_name.flow_id(), + flow_id, + new_flow_info.catalog_name, + new_flow_info.flow_name, + ), + }.fail(); + } + + let remote_flow = + on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu { + err_msg: format!( + "Reads the empty flow during the updating flow, flow_id: {flow_id}" + ), + })?; + let op_name = "updating flow"; + ensure_values!(*remote_flow, new_flow_info.clone(), op_name); + } + + Ok(()) + } + fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec> { let source_table_ids = flow_value.source_table_ids(); let mut keys = @@ -560,4 +656,222 @@ mod tests { // Ensures all keys are deleted assert!(mem_kv.is_empty()) } + + #[tokio::test] + async fn test_update_flow_metadata() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); + let flow_id = 10; + let flow_value = test_flow_info_value( + "flow", + [(0, 1u64), (1, 2u64)].into(), + vec![1024, 1025, 1026], + ); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; + flow_metadata_manager + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) + .await + .unwrap(); + + let new_flow_value = { + let mut tmp = flow_value.clone(); + tmp.raw_sql = "new".to_string(); + tmp + }; + + // Update flow instead + flow_metadata_manager + .update_flow_metadata( + flow_id, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &new_flow_value, + flow_routes.clone(), + ) + .await + .unwrap(); + + let got = flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + let routes = flow_metadata_manager + .flow_route_manager() + .routes(flow_id) + .try_collect::>() + .await + .unwrap(); + assert_eq!( + routes, + vec![ + ( + FlowRouteKey::new(flow_id, 1), + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + FlowRouteKey::new(flow_id, 2), + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ] + ); + assert_eq!(got, new_flow_value); + let flows = flow_metadata_manager + .flownode_flow_manager() + .flows(1) + .try_collect::>() + .await + .unwrap(); + assert_eq!(flows, vec![(flow_id, 0)]); + for table_id in [1024, 1025, 1026] { + let nodes = flow_metadata_manager + .table_flow_manager() + .flows(table_id) + .try_collect::>() + .await + .unwrap(); + assert_eq!( + nodes, + vec![ + ( + TableFlowKey::new(table_id, 1, flow_id, 1), + TableFlowValue { + peer: Peer::empty(1) + } + ), + ( + TableFlowKey::new(table_id, 2, flow_id, 2), + TableFlowValue { + peer: Peer::empty(2) + } + ) + ] + ); + } + } + + #[tokio::test] + async fn test_update_flow_metadata_flow_replace_diff_id_err() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowMetadataManager::new(mem_kv); + let flow_id = 10; + let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; + flow_metadata_manager + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) + .await + .unwrap(); + // update again with same flow id + flow_metadata_manager + .update_flow_metadata( + flow_id, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &flow_value, + flow_routes.clone(), + ) + .await + .unwrap(); + // update again with wrong flow id, expected error + let err = flow_metadata_manager + .update_flow_metadata( + flow_id + 1, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &flow_value, + flow_routes, + ) + .await + .unwrap_err(); + assert_matches!(err, error::Error::Unexpected { .. }); + assert!(err + .to_string() + .contains("Reads different flow id when updating flow")); + } + + #[tokio::test] + async fn test_update_flow_metadata_unexpected_err_prev_value_diff() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flow_metadata_manager = FlowMetadataManager::new(mem_kv); + let flow_id = 10; + let catalog_name = "greptime"; + let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; + flow_metadata_manager + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) + .await + .unwrap(); + // Creates again. + let another_sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "another_sink_table".to_string(), + }; + let flow_value = FlowInfoValue { + catalog_name: "greptime".to_string(), + flow_name: "flow".to_string(), + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: another_sink_table_name, + flownode_ids: [(0, 1u64)].into(), + raw_sql: "raw".to_string(), + expire_after: Some(300), + comment: "hi".to_string(), + options: Default::default(), + }; + let err = flow_metadata_manager + .update_flow_metadata( + flow_id, + &DeserializedValueWithBytes::from_inner(flow_value.clone()), + &flow_value, + flow_routes.clone(), + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("Reads the different value"), + "error: {:?}", + err + ); + } } diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 55f982af936b..67d663625db3 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -26,7 +26,7 @@ use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue}; -use crate::kv_backend::txn::Txn; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::FlownodeId; @@ -196,6 +196,19 @@ impl FlowInfoManager { .transpose() } + /// Returns the [FlowInfoValue] with original bytes of specified `flow_id`. + pub async fn get_raw( + &self, + flow_id: FlowId, + ) -> Result>> { + let key = FlowInfoKey::new(flow_id).to_bytes(); + self.kv_backend + .get(&key) + .await? + .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) + .transpose() + } + /// Builds a create flow transaction. /// It is expected that the `__flow/info/{flow_id}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. @@ -215,6 +228,36 @@ impl FlowInfoManager { TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), )) } + + /// Builds a update flow transaction. + /// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied and equal to `prev_flow_value`, + /// but the new value can be the same, so to allow replace operation to happen even when the value is the same. + /// Otherwise, the transaction will retrieve existing value and fail. + pub(crate) fn build_update_txn( + &self, + flow_id: FlowId, + current_flow_value: &DeserializedValueWithBytes, + new_flow_value: &FlowInfoValue, + ) -> Result<( + Txn, + impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult, + )> { + let key = FlowInfoKey::new(flow_id).to_bytes(); + let raw_value = new_flow_value.try_as_raw_value()?; + let prev_value = current_flow_value.get_raw_bytes(); + let txn = Txn::new() + .when(vec![ + Compare::new(key.clone(), CompareOp::NotEqual, None), + Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)), + ]) + .and_then(vec![TxnOp::Put(key.clone(), raw_value)]) + .or_else(vec![TxnOp::Get(key.clone())]); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), + )) + } } #[cfg(test)] diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 0db86b05fa5e..79c87c7360ea 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -26,7 +26,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN, }; -use crate::kv_backend::txn::Txn; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; @@ -237,6 +237,37 @@ impl FlowNameManager { TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), )) } + + /// Builds a update flow name transaction. Which doesn't change either the name or id, just checking if they are the same. + /// It's expected that the `__flow/name/{catalog}/{flow_name}` IS already occupied, + /// and both flow name and flow id is the same. + /// Otherwise, the transaction will retrieve existing value(and fail). + pub fn build_update_txn( + &self, + catalog_name: &str, + flow_name: &str, + flow_id: FlowId, + ) -> Result<( + Txn, + impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult, + )> { + let key = FlowNameKey::new(catalog_name, flow_name); + let raw_key = key.to_bytes(); + let flow_flow_name_value = FlowNameValue::new(flow_id); + let raw_value = flow_flow_name_value.try_as_raw_value()?; + let txn = Txn::new() + .when(vec![Compare::new( + raw_key.clone(), + CompareOp::Equal, + Some(raw_value), + )]) + .or_else(vec![TxnOp::Get(raw_key.clone())]); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), + )) + } } #[cfg(test)] diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index dadc99f8ec5d..80d03e27706b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -50,7 +50,10 @@ use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; -use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; +use crate::error::{ + EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu, + UnexpectedSnafu, +}; use crate::expr::{Batch, GlobalId}; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; @@ -673,6 +676,21 @@ impl FlowWorkerManager { } } +/// The arguments to create a flow in [`FlowWorkerManager`]. +#[derive(Debug, Clone)] +pub struct CreateFlowArgs { + pub flow_id: FlowId, + pub sink_table_name: TableName, + pub source_table_ids: Vec, + pub create_if_not_exists: bool, + pub or_replace: bool, + pub expire_after: Option, + pub comment: Option, + pub sql: String, + pub flow_options: HashMap, + pub query_ctx: Option, +} + /// Create&Remove flow impl FlowWorkerManager { /// remove a flow by it's id @@ -694,18 +712,48 @@ impl FlowWorkerManager { /// 1. parse query into typed plan(and optional parse expire_after expr) /// 2. render source/sink with output table id and used input table id #[allow(clippy::too_many_arguments)] - pub async fn create_flow( - &self, - flow_id: FlowId, - sink_table_name: TableName, - source_table_ids: &[TableId], - create_if_not_exists: bool, - expire_after: Option, - comment: Option, - sql: String, - flow_options: HashMap, - query_ctx: Option, - ) -> Result, Error> { + pub async fn create_flow(&self, args: CreateFlowArgs) -> Result, Error> { + let CreateFlowArgs { + flow_id, + sink_table_name, + source_table_ids, + create_if_not_exists, + or_replace, + expire_after, + comment, + sql, + flow_options, + query_ctx, + } = args; + + let already_exist = { + let mut flag = false; + + // check if the task already exists + for handle in self.worker_handles.iter() { + if handle.lock().await.contains_flow(flow_id).await? { + flag = true; + break; + } + } + flag + }; + match (create_if_not_exists, or_replace, already_exist) { + // do replace + (_, true, true) => { + info!("Replacing flow with id={}", flow_id); + self.remove_flow(flow_id).await?; + } + (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, + // do nothing if exists + (true, false, true) => { + info!("Flow with id={} already exists, do nothing", flow_id); + return Ok(None); + } + // create if not exists + (_, _, false) => (), + } + if create_if_not_exists { // check if the task already exists for handle in self.worker_handles.iter() { @@ -717,7 +765,7 @@ impl FlowWorkerManager { let mut node_ctx = self.node_context.write().await; // assign global id to source and sink table - for source in source_table_ids { + for source in &source_table_ids { node_ctx .assign_global_id_to_table(&self.table_info_source, None, Some(*source)) .await?; @@ -726,7 +774,7 @@ impl FlowWorkerManager { .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None) .await?; - node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone()); + node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone()); node_ctx.query_context = query_ctx.map(Arc::new); // construct a active dataflow state with it diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 8e5a02858820..3841d08914c5 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use crate::adapter::FlowWorkerManager; +use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::error::InternalSnafu; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -57,7 +57,7 @@ impl Flownode for FlowWorkerManager { comment, sql, flow_options, - or_replace: _, + or_replace, })) => { let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); let sink_table_name = [ @@ -66,20 +66,19 @@ impl Flownode for FlowWorkerManager { sink_table_name.table_name, ]; let expire_after = expire_after.map(|e| e.value); - let ret = self - .create_flow( - task_id.id as u64, - sink_table_name, - &source_table_ids, - create_if_not_exists, - expire_after, - Some(comment), - sql, - flow_options, - query_ctx, - ) - .await - .map_err(to_meta_err)?; + let args = CreateFlowArgs { + flow_id: task_id.id as u64, + sink_table_name, + source_table_ids, + create_if_not_exists, + or_replace, + expire_after, + comment: Some(comment), + sql, + flow_options, + query_ctx, + }; + let ret = self.create_flow(args).await.map_err(to_meta_err)?; METRIC_FLOW_TASK_COUNT.inc(); Ok(FlowResponse { affected_flows: ret diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index c9d75a929922..87b6bbdc09ed 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -48,7 +48,7 @@ use tonic::codec::CompressionEncoding; use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; -use crate::adapter::FlowWorkerManagerRef; +use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, @@ -355,23 +355,26 @@ impl FlownodeBuilder { info.sink_table_name().schema_name.clone(), info.sink_table_name().table_name.clone(), ]; - manager - .create_flow( - flow_id as _, - sink_table_name, - info.source_table_ids(), - true, - info.expire_after(), - Some(info.comment().clone()), - info.raw_sql().clone(), - info.options().clone(), - Some( - QueryContextBuilder::default() - .current_catalog(info.catalog_name().clone()) - .build(), - ), - ) - .await?; + let args = CreateFlowArgs { + flow_id: flow_id as _, + sink_table_name, + source_table_ids: info.source_table_ids().to_vec(), + // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist) + // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true + // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that) + create_if_not_exists: true, + or_replace: true, + expire_after: info.expire_after(), + comment: Some(info.comment().clone()), + sql: info.raw_sql().clone(), + flow_options: info.options().clone(), + query_ctx: Some( + QueryContextBuilder::default() + .current_catalog(info.catalog_name().clone()) + .build(), + ), + }; + manager.create_flow(args).await?; } Ok(cnt) diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 172961d50a1f..062bd8e14e18 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -865,7 +865,9 @@ pub fn show_create_flow( value: flow_val.sink_table_name().table_name.clone(), quote_style: None, }]), - or_replace: true, + // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do + // so we set `or_replace` to false. + or_replace: false, if_not_exists: true, expire_after: flow_val.expire_after(), comment, diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index d3cc7f66800e..4c6095d2529c 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -259,7 +259,7 @@ SHOW CREATE FLOW filter_numbers_basic; +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ | Flow | Create Flow | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ -| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic | +| filter_numbers_basic | CREATE FLOW IF NOT EXISTS filter_numbers_basic | | | SINK TO out_num_cnt_basic | | | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index a91930ee7620..38fa609c960d 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -9,7 +9,9 @@ Affected Rows: 0 create table out_num_cnt_show ( number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); Affected Rows: 0 @@ -32,7 +34,7 @@ SHOW CREATE FLOW filter_numbers_show; +---------------------+------------------------------------------------------------+ | Flow | Create Flow | +---------------------+------------------------------------------------------------+ -| filter_numbers_show | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show | +| filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show | | | SINK TO out_num_cnt_show | | | AS SELECT number FROM numbers_input_show WHERE number > 10 | +---------------------+------------------------------------------------------------+ @@ -67,6 +69,251 @@ SHOW FLOWS LIKE 'filter_numbers_show'; ++ ++ +-- also test `CREATE OR REPLACE` and `IF NOT EXISTS` +-- (flow exists, replace, if not exists)=(false, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 10; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+-------------------------------------------------------------+ + +-- this one should error out +-- (flow exists, replace, if not exists)=(true, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 15; + +Error: 8000(FlowAlreadyExists), Flow already exists: greptime.filter_numbers_show + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+-------------------------------------------------------------+ + +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number, ts FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ + +-- after this one, the flow SHOULD NOT be replaced +-- (flow exists, replace, if not exists)=(true, false, true) +CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 5; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+-------------------------------------------------------------+ + +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4); + +Affected Rows: 4 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number, ts FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ + +-- after this, the flow SHOULD be replaced +-- (flow exists, replace, if not exists)=(true, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 3; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 3 | ++---------------------+---------------+------------------------------------------------------------+ + +-- makesure it's replaced in flownode +INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4); + +Affected Rows: 4 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number, ts FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 4 | 1970-01-01T00:00:00.002 | +| 10 | 1970-01-01T00:00:00.003 | +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ + +-- after this, the flow SHOULD error out since having both `replace` and `if not exists` +-- (flow exists, replace, if not exists)=(true, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 0; + +Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE` + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > 3 | ++---------------------+---------------+------------------------------------------------------------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + +-- (flow exists, replace, if not exists)=(false, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -1; + +Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE` + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +++ +++ + +DROP FLOW filter_numbers_show; + +Error: 8001(FlowNotFound), Flow not found: greptime.filter_numbers_show + +-- following always create since didn't exist +-- (flow exists, replace, if not exists)=(false, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -2; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -2 | ++---------------------+---------------+-------------------------------------------------------------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + +-- (flow exists, replace, if not exists)=(false, false, true) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -3; + +Affected Rows: 0 + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 | ++---------------------+---------------+-------------------------------------------------------------+ + +-- makesure after recover should be the same +-- SQLNESS ARG restart=true +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + ++---------------------+---------------+-------------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+-------------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 | ++---------------------+---------------+-------------------------------------------------------------+ + +SELECT * FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| 4 | 1970-01-01T00:00:00.002 | +| 10 | 1970-01-01T00:00:00.003 | +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ + +INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3); + +Affected Rows: 4 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT * FROM out_num_cnt_show; + ++--------+-------------------------+ +| number | ts | ++--------+-------------------------+ +| -2 | 1970-01-01T00:00:00.002 | +| -1 | 1970-01-01T00:00:00.003 | +| 4 | 1970-01-01T00:00:00.002 | +| 10 | 1970-01-01T00:00:00.003 | +| 11 | 1970-01-01T00:00:00.004 | +| 15 | 1970-01-01T00:00:00.001 | +| 16 | 1970-01-01T00:00:00.002 | ++--------+-------------------------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + drop table out_num_cnt_show; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 3cf84ed0f335..7348a83b5103 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -6,7 +6,9 @@ CREATE TABLE numbers_input_show ( ); create table out_num_cnt_show ( number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; @@ -27,6 +29,99 @@ SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS W SHOW FLOWS LIKE 'filter_numbers_show'; +-- also test `CREATE OR REPLACE` and `IF NOT EXISTS` + +-- (flow exists, replace, if not exists)=(false, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 10; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- this one should error out +-- (flow exists, replace, if not exists)=(true, false, false) +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 15; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT number, ts FROM out_num_cnt_show; + +-- after this one, the flow SHOULD NOT be replaced +-- (flow exists, replace, if not exists)=(true, false, true) +CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 5; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- makesure it's not replaced in flownode +INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT number, ts FROM out_num_cnt_show; + +-- after this, the flow SHOULD be replaced +-- (flow exists, replace, if not exists)=(true, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 3; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- makesure it's replaced in flownode +INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT number, ts FROM out_num_cnt_show; + +-- after this, the flow SHOULD error out since having both `replace` and `if not exists` +-- (flow exists, replace, if not exists)=(true, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 0; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +DROP FLOW filter_numbers_show; + +-- (flow exists, replace, if not exists)=(false, true, true) +CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -1; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +DROP FLOW filter_numbers_show; + +-- following always create since didn't exist +-- (flow exists, replace, if not exists)=(false, true, false) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -2; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +DROP FLOW filter_numbers_show; + +-- (flow exists, replace, if not exists)=(false, false, true) +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -3; + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +-- makesure after recover should be the same +-- SQLNESS ARG restart=true + +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +SELECT * FROM out_num_cnt_show; + +INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT * FROM out_num_cnt_show; + +DROP FLOW filter_numbers_show; + drop table out_num_cnt_show; drop table numbers_input_show;