Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CREATE OR REPLACE FLOW #5001

Merged
merged 7 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 59 additions & 16 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_procedure::{
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -75,6 +76,7 @@ impl CreateFlowProcedure {
source_table_ids: vec![],
query_context,
state: CreateFlowState::Prepare,
flow_already_exists: false,
},
}
}
Expand All @@ -90,6 +92,7 @@ impl CreateFlowProcedure {
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
let create_if_not_exists = self.data.task.create_if_not_exists;
let or_replace = self.data.task.or_replace;

let flow_name_value = self
.context
Expand All @@ -98,16 +101,41 @@ impl CreateFlowProcedure {
.get(catalog_name, flow_name)
.await?;

if create_if_not_exists && or_replace {
// this is forbidden because not clear what does that mean exactly
return error::UnsupportedSnafu {
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(),
}
.fail();
}

if let Some(value) = flow_name_value {
ensure!(
create_if_not_exists,
create_if_not_exists || or_replace,
error::FlowAlreadyExistsSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);

let flow_id = value.flow_id();
return Ok(Status::done_with_output(flow_id));
if create_if_not_exists {
info!("Flow already exists, flow_id: {}", flow_id);
return Ok(Status::done_with_output(flow_id));
}

let flow_id = value.flow_id();
let peers = self
.context
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.map_ok(|(_, value)| value.peer)
.try_collect::<Vec<_>>()
.await?;
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
info!("Replacing flow, flow_id: {}", flow_id);
self.data.flow_already_exists = true;
}

// Ensures sink table doesn't exist.
Expand All @@ -128,7 +156,9 @@ impl CreateFlowProcedure {
}

self.collect_source_tables().await?;
self.allocate_flow_id().await?;
if self.data.flow_id.is_none() {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;

Ok(Status::executing(true))
Expand All @@ -153,7 +183,7 @@ impl CreateFlowProcedure {
.map_err(add_peer_context_if_needed(peer.clone()))
});
}

info!("Creating flow({}) on flownodes", self.data.flow_id.unwrap());
discord9 marked this conversation as resolved.
Show resolved Hide resolved
join_all(create_flow)
.await
.into_iter()
Expand All @@ -170,18 +200,27 @@ impl CreateFlowProcedure {
async fn on_create_metadata(&mut self) -> Result<Status> {
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
let (flow_info, flow_routes) = (&self.data).into();
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
if self.data.flow_already_exists {
self.context
.flow_metadata_manager
.update_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
} else {
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
}

self.data.state = CreateFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}

async fn on_broadcast(&mut self) -> Result<Status> {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let ctx = Context {
Expand All @@ -192,10 +231,13 @@ impl CreateFlowProcedure {
.cache_invalidator
.invalidate(
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
})],
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.await?;

Expand Down Expand Up @@ -270,6 +312,7 @@ pub struct CreateFlowData {
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
pub(crate) query_context: QueryContext,
pub(crate) flow_already_exists: bool,
}

impl From<&CreateFlowData> for CreateRequest {
Expand All @@ -284,9 +327,9 @@ impl From<&CreateFlowData> for CreateRequest {
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
// Always be true to ensure idempotent in case of retry
create_if_not_exists: true,
or_replace: true,
or_replace: value.task.or_replace,
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
Expand Down
15 changes: 11 additions & 4 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,10 +655,17 @@ async fn handle_create_flow_task(
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
if !create_flow_task.or_replace {
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
} else {
info!(
"Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
}

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
Expand Down
87 changes: 87 additions & 0 deletions src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,93 @@ impl FlowMetadataManager {
Ok(())
}

/// Update metadata for flow and returns an error if old metadata IS NOT exists.
pub async fn update_flow_metadata(
discord9 marked this conversation as resolved.
Show resolved Hide resolved
&self,
flow_id: FlowId,
flow_info: FlowInfoValue,
flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
) -> Result<()> {
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
.flow_name_manager
.build_update_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?;

let (create_flow_txn, on_create_flow_failure) = self
.flow_info_manager
.build_update_txn(flow_id, &flow_info)?;

let create_flow_routes_txn = self
.flow_route_manager
.build_create_txn(flow_id, flow_routes.clone())?;

let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, flow_info.flownode_ids().clone());

let create_table_flow_txn = self.table_flow_manager.build_create_txn(
flow_id,
flow_routes
.into_iter()
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
.collect(),
flow_info.source_table_ids(),
)?;

let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
create_flow_txn,
create_flow_routes_txn,
create_flownode_flow_txn,
create_table_flow_txn,
]);
info!(
"Creating flow {}.{}({}), with {} txn operations",
flow_info.catalog_name,
flow_info.flow_name,
flow_id,
txn.max_operations()
);

let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_flow_flow_name =
on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow name during the creating flow, flow_id: {flow_id}"
discord9 marked this conversation as resolved.
Show resolved Hide resolved
),
}
})?;

if remote_flow_flow_name.flow_id() != flow_id {
info!(
"Trying to create flow {}.{}({}), but flow({}) already exists",
flow_info.catalog_name,
flow_info.flow_name,
flow_id,
remote_flow_flow_name.flow_id()
);

return error::FlowAlreadyExistsSnafu {
flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name),
}
.fail();
}

let remote_flow =
on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow during the creating flow, flow_id: {flow_id}"
discord9 marked this conversation as resolved.
Show resolved Hide resolved
),
})?;
let op_name = "creating flow";
discord9 marked this conversation as resolved.
Show resolved Hide resolved
ensure_values!(*remote_flow, flow_info, op_name);
}

Ok(())
}

fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
let source_table_ids = flow_value.source_table_ids();
let mut keys =
Expand Down
28 changes: 27 additions & 1 deletion src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::FlownodeId;

Expand Down Expand Up @@ -215,6 +215,32 @@ impl FlowInfoManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}

/// Builds a update flow transaction.
/// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied.
/// Otherwise, the transaction will retrieve existing value and fail.
pub(crate) fn build_update_txn(
&self,
flow_id: FlowId,
discord9 marked this conversation as resolved.
Show resolved Hide resolved
flow_value: &FlowInfoValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
)> {
let key = FlowInfoKey::new(flow_id).to_bytes();
let txn = Txn::new()
.when(vec![Compare::new(key.clone(), CompareOp::NotEqual, None)])
discord9 marked this conversation as resolved.
Show resolved Hide resolved
.and_then(vec![TxnOp::Put(
key.clone(),
flow_value.try_as_raw_value()?,
)])
.or_else(vec![TxnOp::Get(key.clone())]);

Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
}

#[cfg(test)]
Expand Down
35 changes: 34 additions & 1 deletion src/common/meta/src/key/flow/flow_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
Expand Down Expand Up @@ -237,6 +237,39 @@ impl FlowNameManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}

/// Builds a update flow name transaction.
/// It's expected that the `__flow/name/{catalog}/{flow_name}` IS already occupied.
/// Otherwise, the transaction will retrieve existing value(and fail).
pub fn build_update_txn(
&self,
catalog_name: &str,
flow_name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
)> {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let txn = Txn::new()
.when(vec![Compare::new(
raw_key.clone(),
CompareOp::NotEqual,
None,
)])
discord9 marked this conversation as resolved.
Show resolved Hide resolved
.and_then(vec![TxnOp::Put(
raw_key.clone(),
flow_flow_name_value.try_as_raw_value()?,
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);

Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
}

#[cfg(test)]
Expand Down
Loading