Skip to content

Commit

Permalink
feat: CREATE OR REPLACE FLOW (#5001)
Browse files Browse the repository at this point in the history
* feat: Replace flow

* refactor: better show create flow&tests: better check

* tests: sqlness result update

* tests: unit test for update

* refactor: cmp with raw bytes

* refactor: rename

* refactor: per review
  • Loading branch information
discord9 authored Nov 19, 2024
1 parent dbb3f2d commit 4d8fe29
Show file tree
Hide file tree
Showing 12 changed files with 932 additions and 78 deletions.
99 changes: 82 additions & 17 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_procedure::{
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
Expand All @@ -43,7 +44,7 @@ use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{FlowId, FlowPartitionId};
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::{CreateFlowTask, QueryContext};
Expand Down Expand Up @@ -75,6 +76,7 @@ impl CreateFlowProcedure {
source_table_ids: vec![],
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
},
}
}
Expand All @@ -90,6 +92,7 @@ impl CreateFlowProcedure {
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
let create_if_not_exists = self.data.task.create_if_not_exists;
let or_replace = self.data.task.or_replace;

let flow_name_value = self
.context
Expand All @@ -98,16 +101,56 @@ impl CreateFlowProcedure {
.get(catalog_name, flow_name)
.await?;

if create_if_not_exists && or_replace {
// this is forbidden because not clear what does that mean exactly
return error::UnsupportedSnafu {
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(),
}
.fail();
}

if let Some(value) = flow_name_value {
ensure!(
create_if_not_exists,
create_if_not_exists || or_replace,
error::FlowAlreadyExistsSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);

let flow_id = value.flow_id();
return Ok(Status::done_with_output(flow_id));
if create_if_not_exists {
info!("Flow already exists, flow_id: {}", flow_id);
return Ok(Status::done_with_output(flow_id));
}

let flow_id = value.flow_id();
let peers = self
.context
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.map_ok(|(_, value)| value.peer)
.try_collect::<Vec<_>>()
.await?;
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
info!("Replacing flow, flow_id: {}", flow_id);

let flow_info_value = self
.context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await?;

ensure!(
flow_info_value.is_some(),
error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);

self.data.prev_flow_info_value = flow_info_value;
}

// Ensures sink table doesn't exist.
Expand All @@ -128,7 +171,9 @@ impl CreateFlowProcedure {
}

self.collect_source_tables().await?;
self.allocate_flow_id().await?;
if self.data.flow_id.is_none() {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;

Ok(Status::executing(true))
Expand All @@ -153,7 +198,10 @@ impl CreateFlowProcedure {
.map_err(add_peer_context_if_needed(peer.clone()))
});
}

info!(
"Creating flow({:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.peers
);
join_all(create_flow)
.await
.into_iter()
Expand All @@ -170,18 +218,29 @@ impl CreateFlowProcedure {
async fn on_create_metadata(&mut self) -> Result<Status> {
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
let (flow_info, flow_routes) = (&self.data).into();
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
&& self.data.task.or_replace
{
self.context
.flow_metadata_manager
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
} else {
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
}

self.data.state = CreateFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}

async fn on_broadcast(&mut self) -> Result<Status> {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let ctx = Context {
Expand All @@ -192,10 +251,13 @@ impl CreateFlowProcedure {
.cache_invalidator
.invalidate(
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
})],
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.await?;

Expand Down Expand Up @@ -270,6 +332,9 @@ pub struct CreateFlowData {
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
pub(crate) query_context: QueryContext,
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
}

impl From<&CreateFlowData> for CreateRequest {
Expand All @@ -284,9 +349,9 @@ impl From<&CreateFlowData> for CreateRequest {
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
// Always be true to ensure idempotent in case of retry
create_if_not_exists: true,
or_replace: true,
or_replace: value.task.or_replace,
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
Expand Down
15 changes: 11 additions & 4 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,10 +655,17 @@ async fn handle_create_flow_task(
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
if !create_flow_task.or_replace {
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
} else {
info!(
"Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
}

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
Expand Down
Loading

0 comments on commit 4d8fe29

Please sign in to comment.