diff --git a/Cargo.lock b/Cargo.lock index 7d22279dd99eb..03b3095ce4dfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6252,7 +6252,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.4.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=c9da916ab29b13adcb137ba40bad9e2dc10309c4#c9da916ab29b13adcb137ba40bad9e2dc10309c4" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=14c3387c388f64e99dd2a811cdd5f554fe7680c3#14c3387c388f64e99dd2a811cdd5f554fe7680c3" dependencies = [ "anyhow", "apache-avro 0.17.0", @@ -6304,7 +6304,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.4.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=c9da916ab29b13adcb137ba40bad9e2dc10309c4#c9da916ab29b13adcb137ba40bad9e2dc10309c4" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=14c3387c388f64e99dd2a811cdd5f554fe7680c3#14c3387c388f64e99dd2a811cdd5f554fe7680c3" dependencies = [ "anyhow", "async-trait", @@ -6321,7 +6321,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.4.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=c9da916ab29b13adcb137ba40bad9e2dc10309c4#c9da916ab29b13adcb137ba40bad9e2dc10309c4" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=14c3387c388f64e99dd2a811cdd5f554fe7680c3#14c3387c388f64e99dd2a811cdd5f554fe7680c3" dependencies = [ "async-trait", "chrono", @@ -10592,6 +10592,7 @@ dependencies = [ "risingwave_common", "risingwave_connector", "risingwave_stream", + "sea-orm", "serde", "serde_yaml", "thiserror-ext", @@ -11083,6 +11084,7 @@ dependencies = [ "risingwave_common_rate_limit", "risingwave_connector_codec", "risingwave_jni_core", + "risingwave_meta_model", "risingwave_pb", "risingwave_rpc_client", "rumqttc", @@ -11092,6 +11094,7 @@ dependencies = [ "rustls-pki-types", "rw_futures_util", "scopeguard", + "sea-orm", "sea-schema", "serde", "serde_derive", diff --git a/Cargo.toml b/Cargo.toml index 52c90f06dde46..9d588f606f740 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,12 +151,12 @@ otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = prost = { version = "0.13" } prost-build = { version = "0.13" } # branch dev_rebase_main_20250325 -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4", features = [ +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3", features = [ "storage-s3", "storage-gcs", ] } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3" } opendal = "0.49" arrow-udf-js = "0.6.1" arrow-udf-wasm = { version = "0.5.1", features = ["build"] } diff --git a/src/bench/Cargo.toml b/src/bench/Cargo.toml index cfa288b6dd535..ff5b4f3cd5b99 100644 --- a/src/bench/Cargo.toml +++ b/src/bench/Cargo.toml @@ -24,6 +24,7 @@ plotters = { version = "0.3.5", default-features = false, features = [ risingwave_common = { workspace = true } risingwave_connector = { workspace = true } 
risingwave_stream = { workspace = true } +sea-orm = { workspace = true } serde = { version = "1", features = ["derive"] } serde_yaml = "0.9" thiserror-ext = { workspace = true } diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 719e96602c02c..b29d4ec0307a5 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -64,6 +64,7 @@ use risingwave_connector::source::{ }; use risingwave_stream::executor::test_utils::prelude::ColumnDesc; use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError}; +use sea_orm::DatabaseConnection; use serde::{Deserialize, Deserializer}; use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; @@ -382,7 +383,7 @@ where ::Coordinator: std::marker::Send, ::Coordinator: 'static, { - if let Ok(coordinator) = sink.new_coordinator().await { + if let Ok(coordinator) = sink.new_coordinator(DatabaseConnection::Disconnected).await { sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(MockMetaClient::new( Box::new(coordinator), ))); diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 17f584d4333f9..f59f8713330ec 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -111,6 +111,7 @@ risingwave_common_estimate_size = { workspace = true } risingwave_common_rate_limit = { workspace = true } risingwave_connector_codec = { workspace = true } risingwave_jni_core = { workspace = true } +risingwave_meta_model = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } rumqttc = { version = "0.24.0", features = ["url"] } @@ -120,6 +121,7 @@ rustls-pemfile = "2" rustls-pki-types = "1" rw_futures_util = { workspace = true } scopeguard = "1" +sea-orm = { workspace = true } sea-schema = { version = "0.16", default-features = false, features = [ "discovery", "sqlx-postgres", diff --git a/src/connector/src/sink/boxed.rs b/src/connector/src/sink/boxed.rs index db8017491e4f5..b7bedd79b0c71 100644 --- a/src/connector/src/sink/boxed.rs +++ b/src/connector/src/sink/boxed.rs @@ -20,6 +20,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::bitmap::Bitmap; use risingwave_pb::connector_service::SinkMetadata; +use super::SinkCommittedEpochSubscriber; use crate::sink::{SinkCommitCoordinator, SinkWriter}; pub type BoxWriter = Box + Send + 'static>; @@ -52,8 +53,11 @@ impl SinkWriter for BoxWriter { #[async_trait] impl SinkCommitCoordinator for BoxCoordinator { - async fn init(&mut self) -> crate::sink::Result> { - self.deref_mut().init().await + async fn init( + &mut self, + subscriber: SinkCommittedEpochSubscriber, + ) -> crate::sink::Result> { + self.deref_mut().init(subscriber).await } async fn commit(&mut self, epoch: u64, metadata: Vec) -> crate::sink::Result<()> { diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 076af013b883b..77706325f9757 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -37,6 +37,7 @@ use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_pb::connector_service::SinkMetadata; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; +use sea_orm::DatabaseConnection; use serde_derive::{Deserialize, Serialize}; use serde_with::{DisplayFromStr, serde_as}; use with_options::WithOptions; @@ -48,7 +49,7 @@ use super::decouple_checkpoint_log_sink::{ use super::writer::SinkWriter; use super::{ 
Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator, - SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam, + SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam, }; use crate::connector_common::AwsAuthProps; @@ -384,7 +385,7 @@ impl Sink for DeltaLakeSink { true } - async fn new_coordinator(&self) -> Result { + async fn new_coordinator(&self, _db: DatabaseConnection) -> Result { Ok(DeltaLakeSinkCommitter { table: self.config.common.create_deltalake_client().await?, }) @@ -496,7 +497,7 @@ pub struct DeltaLakeSinkCommitter { #[async_trait::async_trait] impl SinkCommitCoordinator for DeltaLakeSinkCommitter { - async fn init(&mut self) -> crate::sink::Result> { + async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result> { tracing::info!("DeltaLake commit coordinator inited."); Ok(None) } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 3783b0fad870a..da871af0847e8 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -14,7 +14,6 @@ mod compaction; mod prometheus; - use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use std::num::NonZeroU64; @@ -64,14 +63,20 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter}; use risingwave_common_estimate_size::EstimateSize; +use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model}; use risingwave_pb::connector_service::SinkMetadata; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; +use sea_orm::{ + ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder, + Set, +}; use serde_derive::Deserialize; use serde_json::from_value; use serde_with::{DisplayFromStr, serde_as}; use thiserror_ext::AsReport; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc::{self}; +use tokio::sync::oneshot; use tokio_retry::Retry; use tokio_retry::strategy::{ExponentialBackoff, jitter}; use tracing::warn; @@ -84,7 +89,7 @@ use super::decouple_checkpoint_log_sink::{ }; use super::{ GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, - SinkError, SinkWriterMetrics, SinkWriterParam, + SinkCommittedEpochSubscriber, SinkError, SinkWriterMetrics, SinkWriterParam, }; use crate::connector_common::IcebergCommon; use crate::sink::coordinate::CoordinatedSinkWriter; @@ -131,6 +136,10 @@ pub struct IcebergConfig { #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub create_table_if_not_exists: bool, + /// Whether it is exactly_once, the default is not. + #[serde(default)] + #[serde_as(as = "Option")] + pub is_exactly_once: Option, // Retry commit num when iceberg commit fail. default is 8. // # TODO // Iceberg table may store the retry commit num in table meta. 
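Note on the new option: `is_exactly_once` rides through the same string-typed `WITH`-options deserialization as the rest of `IcebergConfig`. A minimal, standalone sketch of that serde pattern (assuming `serde`, `serde_with`, and `serde_json` as dependencies; `Demo` is a hypothetical stand-in for `IcebergConfig`):

```rust
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

// Hypothetical stand-in for the `is_exactly_once` field of `IcebergConfig`:
// absent by default, parsed from the strings "true"/"false" when present.
#[serde_as]
#[derive(Debug, Deserialize)]
struct Demo {
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    is_exactly_once: Option<bool>,
}

fn main() {
    // WITH (..., is_exactly_once = 'true') arrives as a string-valued map.
    let on: Demo = serde_json::from_str(r#"{"is_exactly_once": "true"}"#).unwrap();
    assert_eq!(on.is_exactly_once, Some(true));

    // Option omitted: the sink keeps the default (non-exactly-once) behavior.
    let off: Demo = serde_json::from_str("{}").unwrap();
    assert_eq!(off.is_exactly_once, None);
}
```

With this in place, a `CREATE SINK ... WITH (connector = 'iceberg', ..., is_exactly_once = 'true')` should opt the sink into the two-phase commit path added below.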
@@ -505,7 +514,6 @@ impl Sink for IcebergSink { inner, ) .await?; - let commit_checkpoint_interval = NonZeroU64::new(self.config.commit_checkpoint_interval).expect( "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", ); @@ -522,16 +530,23 @@ impl Sink for IcebergSink { true } - async fn new_coordinator(&self) -> Result<Self::Coordinator> { + async fn new_coordinator(&self, db: DatabaseConnection) -> Result<Self::Coordinator> { let catalog = self.config.create_catalog().await?; let table = self.create_and_validate_table().await?; // FIXME(Dylan): Disable EMR serverless compaction for now. Ok(IcebergSinkCommitter { catalog, table, + is_exactly_once: self.config.is_exactly_once.unwrap_or_default(), + last_commit_epoch: 0, + sink_id: self.param.sink_id.sink_id(), + config: self.config.clone(), + param: self.param.clone(), + db, commit_notifier: None, _compact_task_guard: None, commit_retry_num: self.config.commit_retry_num, + committed_epoch_subscriber: None, }) } } @@ -1209,7 +1224,7 @@ const SCHEMA_ID: &str = "schema_id"; const PARTITION_SPEC_ID: &str = "partition_spec_id"; const DATA_FILES: &str = "data_files"; -#[derive(Default)] +#[derive(Default, Clone)] struct IcebergCommitResult { schema_id: i32, partition_spec_id: i32, @@ -1269,6 +1284,55 @@ impl IcebergCommitResult { bail!("Can't create iceberg sink write result from empty data!") } } + + fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> { + let mut values = if let serde_json::Value::Object(value) = + serde_json::from_slice::<serde_json::Value>(&value) + .context("Can't parse iceberg sink metadata")? + { + value + } else { + bail!("iceberg sink metadata should be an object"); + }; + + let schema_id; + if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) { + schema_id = value + .as_u64() + .ok_or_else(|| anyhow!("schema_id should be a u64"))?; + } else { + bail!("iceberg sink metadata should have schema_id"); + } + + let partition_spec_id; + if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) { + partition_spec_id = value + .as_u64() + .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?; + } else { + bail!("iceberg sink metadata should have partition_spec_id"); + } + + let data_files: Vec<SerializedDataFile>; + if let serde_json::Value::Array(values) = values + .remove(DATA_FILES) + .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))? + { + data_files = values + .into_iter() + .map(from_value::<SerializedDataFile>) + .collect::<std::result::Result<Vec<SerializedDataFile>, _>>() + .context("Can't parse data files from json")?; + } else { + bail!("iceberg sink metadata should have data_files object"); + } + + Ok(Self { + schema_id: schema_id as i32, + partition_spec_id: partition_spec_id as i32, + data_files, + }) + } } impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata { @@ -1307,12 +1371,49 @@ impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata { } } +impl TryFrom<IcebergCommitResult> for Vec<u8> { + type Error = SinkError; + + fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> { + let json_data_files = serde_json::Value::Array( + value + .data_files + .iter() + .map(serde_json::to_value) + .collect::<std::result::Result<Vec<serde_json::Value>, _>>() + .context("Can't serialize data files to json")?, + ); + let json_value = serde_json::Value::Object( + vec![ + ( + SCHEMA_ID.to_owned(), + serde_json::Value::Number(value.schema_id.into()), + ), + ( + PARTITION_SPEC_ID.to_owned(), + serde_json::Value::Number(value.partition_spec_id.into()), + ), + (DATA_FILES.to_owned(), json_data_files), + ] + .into_iter() + .collect(), + ); + Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
+ } +} pub struct IcebergSinkCommitter { catalog: Arc, table: Table, + pub last_commit_epoch: u64, + pub(crate) is_exactly_once: bool, + pub(crate) sink_id: u32, + pub(crate) config: IcebergConfig, + pub(crate) param: SinkParam, + pub(crate) db: DatabaseConnection, commit_notifier: Option>, commit_retry_num: u32, _compact_task_guard: Option>, + pub(crate) committed_epoch_subscriber: Option, } impl IcebergSinkCommitter { @@ -1348,16 +1449,90 @@ impl IcebergSinkCommitter { #[async_trait::async_trait] impl SinkCommitCoordinator for IcebergSinkCommitter { - async fn init(&mut self) -> crate::sink::Result> { + async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result> { + if self.is_exactly_once { + self.committed_epoch_subscriber = Some(subscriber); + tracing::info!( + "Sink id = {}: iceberg sink coordinator initing.", + self.param.sink_id.sink_id() + ); + if self + .iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()) + .await? + { + let ordered_metadata_list_by_end_epoch = self + .get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()) + .await?; + + let mut last_recommit_epoch = 0; + for (end_epoch, sealized_bytes, snapshot_id, committed) in + ordered_metadata_list_by_end_epoch + { + let write_results_bytes = deserialize_metadata(sealized_bytes); + let mut write_results = vec![]; + + for each in write_results_bytes { + let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?; + write_results.push(write_result); + } + + match ( + committed, + self.is_snapshot_id_in_iceberg(&self.config, snapshot_id) + .await?, + ) { + (true, _) => { + tracing::info!( + "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.", + self.param.sink_id.sink_id() + ); + } + (false, true) => { + // skip + tracing::info!( + "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.", + self.param.sink_id.sink_id() + ); + self.mark_row_is_committed_by_sink_id_and_end_epoch( + &self.db, + self.sink_id, + end_epoch, + ) + .await?; + } + (false, false) => { + tracing::info!( + "Sink id = {}: there are files that were not successfully committed; re-commit these files.", + self.param.sink_id.sink_id() + ); + self.re_commit(end_epoch, write_results, snapshot_id) + .await?; + } + } + + last_recommit_epoch = end_epoch; + } + tracing::info!( + "Sink id = {}: iceberg commit coordinator inited.", + self.param.sink_id.sink_id() + ); + return Ok(Some(last_recommit_epoch)); + } else { + tracing::info!( + "Sink id = {}: init iceberg coodinator, and system table is empty.", + self.param.sink_id.sink_id() + ); + return Ok(None); + } + } + tracing::info!("Iceberg commit coordinator inited."); - // todo(wcy-fdu): The operation of the exactly once sink in the recovery phase will be performed here, and the returned rewind start offset will be updated. 
- // refer to https://github.com/risingwavelabs/risingwave/pull/19771/files#diff-4eafd6e83f9e3fc16b46073e7f3f65261a06c4fac63a997c89abbb1fdd2ad724R1375-R1415 - Ok(None) + return Ok(None); } async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> { tracing::info!("Starting iceberg commit in epoch {epoch}."); - let write_results = metadata + let write_results: Vec<IcebergCommitResult> = metadata .iter() .map(IcebergCommitResult::try_from) .collect::<Result<Vec<IcebergCommitResult>>>()?; @@ -1367,6 +1542,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { tracing::debug!(?epoch, "no data to commit"); return Ok(()); } + // guarantee that all write results have the same schema_id and partition_spec_id if write_results .iter() @@ -1380,6 +1556,84 @@ ))); } + if self.is_exactly_once { + assert!(self.committed_epoch_subscriber.is_some()); + match self.committed_epoch_subscriber.clone() { + Some(committed_epoch_subscriber) => { + // Get the latest committed epoch and the receiver of future committed epochs. + let (committed_epoch, mut committed_epoch_rx) = + committed_epoch_subscriber(self.param.sink_id).await?; + // The exactly-once commit process must start only after the data for the current epoch has been persisted in the log store. + if committed_epoch >= epoch { + self.commit_iceberg_inner(epoch, write_results, None) + .await?; + } else { + tracing::info!( + "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}", + committed_epoch, + epoch + ); + while let Some(next_committed_epoch) = committed_epoch_rx.recv().await { + tracing::info!( + "Received next committed epoch: {}", + next_committed_epoch + ); + // Once the committed epoch catches up, commit immediately. + if next_committed_epoch >= epoch { + self.commit_iceberg_inner(epoch, write_results, None) + .await?; + break; + } + } + } + } + None => unreachable!( + "Exactly-once sink must wait for the epoch before committing; committed_epoch_subscriber is not initialized." + ), + } + } else { + self.commit_iceberg_inner(epoch, write_results, None) + .await?; + } + + Ok(()) + } +} + +/// Methods required to achieve exactly-once semantics. +impl IcebergSinkCommitter { + async fn re_commit( + &mut self, + epoch: u64, + write_results: Vec<IcebergCommitResult>, + snapshot_id: i64, + ) -> Result<()> { + tracing::info!("Starting iceberg re-commit in epoch {epoch}."); + + // Skip if no data to commit + if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) { + tracing::debug!(?epoch, "no data to commit"); + return Ok(()); + } + self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id)) + .await?; + Ok(()) + } + + async fn commit_iceberg_inner( + &mut self, + epoch: u64, + write_results: Vec<IcebergCommitResult>, + snapshot_id: Option<i64>, + ) -> Result<()> { + // If the provided `snapshot_id` is not None, this commit is a re-commit occurring + // during the recovery phase. In that case, we must commit with the `snapshot_id` + // that was previously persisted in the system table. + let is_first_commit = snapshot_id.is_none(); + if !is_first_commit { + tracing::info!("Doing iceberg re-commit."); + } + self.last_commit_epoch = epoch; let expect_schema_id = write_results[0].schema_id; let expect_partition_spec_id = write_results[0].partition_spec_id; @@ -1413,6 +1667,33 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { .partition_type(schema) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; + let txn = Transaction::new(&self.table); + // Only generate a new snapshot id on the first commit.
+ let snapshot_id = match snapshot_id { + Some(previous_snapshot_id) => previous_snapshot_id, + None => txn.generate_unique_snapshot_id(), + }; + if self.is_exactly_once && is_first_commit { + // Persist the pre-commit metadata and snapshot id in the system table. + let mut pre_commit_metadata_bytes = Vec::new(); + for each_parallelism_write_result in write_results.clone() { + let each_parallelism_write_result_bytes: Vec<u8> = + each_parallelism_write_result.try_into()?; + pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes); + } + + let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes); + + self.persist_pre_commit_metadata( + self.db.clone(), + self.last_commit_epoch, + epoch, + pre_commit_metadata_bytes, + snapshot_id, + ) + .await?; + } + let data_files = write_results .into_iter() .flat_map(|r| { @@ -1422,7 +1703,6 @@ }) }) .collect::<Result<Vec<DataFile>>>()?; - // # TODO: // This retry behavior should be reverted and done in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964), because the retry logic involves reapplying the commit metadata. @@ -1443,7 +1723,7 @@ .await?; let txn = Transaction::new(&table); let mut append_action = txn - .fast_append(None, None, vec![]) + .fast_append(Some(snapshot_id), None, vec![]) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; append_action .add_data_files(data_files.clone()) @@ -1466,8 +1746,186 @@ } } tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}."); + + if self.is_exactly_once { + self.mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch) + .await?; + tracing::info!( + "Sink id = {}: succeeded in marking pre-commit metadata in epoch {} as committed.", + self.sink_id, + epoch + ); + + self.delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch) + .await?; + } Ok(()) } + + async fn persist_pre_commit_metadata( + &self, + db: DatabaseConnection, + start_epoch: u64, + end_epoch: u64, + pre_commit_metadata: Vec<u8>, + snapshot_id: i64, + ) -> Result<()> { + let m = exactly_once_iceberg_sink::ActiveModel { + sink_id: Set(self.sink_id as i32), + end_epoch: Set(end_epoch.try_into().unwrap()), + start_epoch: Set(start_epoch.try_into().unwrap()), + metadata: Set(pre_commit_metadata), + committed: Set(false), + snapshot_id: Set(snapshot_id), + }; + match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await { + Ok(_) => Ok(()), + Err(e) => { + tracing::error!("Error inserting into system table: {:?}", e.as_report()); + Err(e.into()) + } + } + } + + async fn mark_row_is_committed_by_sink_id_and_end_epoch( + &self, + db: &DatabaseConnection, + sink_id: u32, + end_epoch: u64, + ) -> Result<()> { + match Entity::update(exactly_once_iceberg_sink::ActiveModel { + sink_id: Set(sink_id as i32), + end_epoch: Set(end_epoch.try_into().unwrap()), + committed: Set(true), + ..Default::default() + }) + .exec(db) + .await + { + Ok(_) => { + tracing::info!( + "Sink id = {}: mark written data status to committed, end_epoch = {}.", + sink_id, + end_epoch + ); + Ok(()) + } + Err(e) => { + tracing::error!( + "Error marking item as committed in iceberg exactly once system table: {:?}", + e.as_report() + ); + Err(e.into()) + } + } + } + + async fn delete_row_by_sink_id_and_end_epoch( + &self, + db: &DatabaseConnection, + sink_id: u32, + end_epoch: u64, + ) -> Result<()> { + let end_epoch_i64: i64 =
end_epoch.try_into().unwrap(); + match Entity::delete_many() + .filter(Column::SinkId.eq(sink_id)) + .filter(Column::EndEpoch.lt(end_epoch_i64)) + .exec(db) + .await + { + Ok(result) => { + let deleted_count = result.rows_affected; + + if deleted_count == 0 { + tracing::info!( + "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.", + sink_id, + end_epoch + ); + } else { + tracing::info!( + "Sink id = {}: deleted {} items in iceberg exactly once system table, end_epoch < {}.", + sink_id, + deleted_count, + end_epoch + ); + } + Ok(()) + } + Err(e) => { + tracing::error!( + "Sink id = {}: error deleting from iceberg exactly once system table: {:?}", + sink_id, + e.as_report() + ); + Err(e.into()) + } + } + } + + async fn iceberg_sink_has_pre_commit_metadata( + &self, + db: &DatabaseConnection, + sink_id: u32, + ) -> Result<bool> { + match exactly_once_iceberg_sink::Entity::find() + .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id as i32)) + .count(db) + .await + { + Ok(count) => Ok(count > 0), + Err(e) => { + tracing::error!( + "Error querying pre-commit metadata from system table: {:?}", + e.as_report() + ); + Err(e.into()) + } + } + } + + async fn get_pre_commit_info_by_sink_id( + &self, + db: &DatabaseConnection, + sink_id: u32, + ) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> { + let models: Vec<Model> = Entity::find() + .filter(Column::SinkId.eq(sink_id as i32)) + .order_by(Column::EndEpoch, Order::Asc) + .all(db) + .await?; + + let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new(); + + for model in models { + result.push(( + model.end_epoch.try_into().unwrap(), + model.metadata, + model.snapshot_id, + model.committed, + )); + } + + Ok(result) + } + + /// During pre-commit, we record the `snapshot_id` corresponding to each batch of files. + /// Therefore, checking whether all files in a batch are present in Iceberg reduces to + /// verifying that the corresponding `snapshot_id` exists in Iceberg. + async fn is_snapshot_id_in_iceberg( + &self, + iceberg_config: &IcebergConfig, + snapshot_id: i64, + ) -> Result<bool> { + let iceberg_common = iceberg_config.common.clone(); + let table = iceberg_common + .load_table(&iceberg_config.java_catalog_props) + .await?; + Ok(table.metadata().snapshot_by_id(snapshot_id).is_some()) + } } const MAP_KEY: &str = "key"; @@ -1571,10 +2029,7 @@ } /// Try to match our schema with iceberg schema.
-pub fn try_matches_arrow_schema( - rw_schema: &Schema, - arrow_schema: &ArrowSchema, -) -> anyhow::Result<()> { +pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> { if rw_schema.fields.len() != arrow_schema.fields().len() { bail!( "Schema length mismatch, risingwave is {}, and iceberg is {}", @@ -1594,6 +2049,14 @@ pub fn try_matches_arrow_schema( Ok(()) } +pub fn serialize_metadata(metadata: Vec>) -> Vec { + serde_json::to_vec(&metadata).unwrap() +} + +pub fn deserialize_metadata(bytes: Vec) -> Vec> { + serde_json::from_slice(&bytes).unwrap() +} + #[cfg(test)] mod test { use std::collections::BTreeMap; @@ -1834,6 +2297,7 @@ mod test { .collect(), commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE, create_table_if_not_exists: false, + is_exactly_once: None, commit_retry_num: 8, }; diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 3c543bda618ae..ef8f2a97973bf 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -47,7 +47,7 @@ pub mod writer; use std::collections::BTreeMap; use std::future::Future; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; use ::clickhouse::error::Error as ClickHouseError; use ::deltalake::DeltaTableError; @@ -60,6 +60,7 @@ use decouple_checkpoint_log_sink::{ DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE, }; use deltalake::DELTALAKE_SINK; +use futures::future::BoxFuture; use iceberg::ICEBERG_SINK; use opendal::Error as OpendalError; use prometheus::Registry; @@ -82,9 +83,11 @@ use risingwave_pb::catalog::PbSinkType; use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::MetaClient; use risingwave_rpc_client::error::RpcError; +use sea_orm::DatabaseConnection; use starrocks::STARROCKS_SINK; use thiserror::Error; use thiserror_ext::AsReport; +use tokio::sync::mpsc::UnboundedReceiver; pub use tracing; use self::catalog::{SinkFormatDesc, SinkType}; @@ -651,7 +654,7 @@ pub trait Sink: TryFrom { } #[expect(clippy::unused_async)] - async fn new_coordinator(&self) -> Result { + async fn new_coordinator(&self, _db: DatabaseConnection) -> Result { Err(SinkError::Coordinator(anyhow!("no coordinator"))) } } @@ -697,11 +700,17 @@ pub trait LogSinker: 'static + Send { // Note: Please rebuild the log reader's read stream before consuming the log store, async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result; } +pub type SinkCommittedEpochSubscriber = Arc< + dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver)>> + + Send + + Sync + + 'static, +>; #[async_trait] pub trait SinkCommitCoordinator { /// Initialize the sink committer coordinator, return the log store rewind start offset. - async fn init(&mut self) -> Result>; + async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result>; /// After collecting the metadata from each sink writer, a coordinator will call `commit` with /// the set of metadata. 
The metadata is serialized into bytes, because the metadata is expected /// to be passed between different gRPC node, so in this general trait, the metadata is @@ -713,7 +722,7 @@ pub struct DummySinkCommitCoordinator; #[async_trait] impl SinkCommitCoordinator for DummySinkCommitCoordinator { - async fn init(&mut self) -> Result> { + async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result> { Ok(None) } @@ -931,6 +940,12 @@ pub enum SinkError { ), } +impl From for SinkError { + fn from(err: sea_orm::DbErr) -> Self { + SinkError::Iceberg(anyhow!(err)) + } +} + impl From for SinkError { fn from(error: OpendalError) -> Self { SinkError::File(error.to_report_string()) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 942a7ff7cc262..138cecff5afeb 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -50,6 +50,7 @@ use risingwave_rpc_client::{ SinkWriterStreamHandle, }; use rw_futures_util::drop_either_future; +use sea_orm::DatabaseConnection; use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, unbounded_channel}; @@ -57,6 +58,7 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; +use super::SinkCommittedEpochSubscriber; use super::elasticsearch_opensearch::elasticsearch_converter::{ StreamChunkConverter, is_remote_es_sink, }; @@ -556,7 +558,7 @@ impl Sink for CoordinatedRemoteSink { true } - async fn new_coordinator(&self) -> Result { + async fn new_coordinator(&self, _db: DatabaseConnection) -> Result { RemoteCoordinator::new::(self.param.clone()).await } } @@ -676,7 +678,7 @@ impl RemoteCoordinator { #[async_trait] impl SinkCommitCoordinator for RemoteCoordinator { - async fn init(&mut self) -> crate::sink::Result> { + async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result> { Ok(None) } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index c0c7610737e99..9317083547b29 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -28,6 +28,7 @@ use risingwave_common::types::DataType; use risingwave_pb::connector_service::SinkMetadata; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; +use sea_orm::DatabaseConnection; use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; @@ -43,8 +44,8 @@ use super::doris_starrocks_connector::{ }; use super::encoder::{JsonEncoder, RowEncoder}; use super::{ - SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkCommitCoordinator, SinkError, - SinkParam, SinkWriterMetrics, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkCommitCoordinator, + SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterMetrics, }; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; @@ -342,7 +343,7 @@ impl Sink for StarrocksSink { true } - async fn new_coordinator(&self) -> Result { + async fn new_coordinator(&self, _db: DatabaseConnection) -> Result { let header = HeaderBuilder::new() .add_common_header() .set_user_password( @@ -896,7 +897,7 @@ pub struct StarrocksSinkCommitter { #[async_trait::async_trait] impl SinkCommitCoordinator for StarrocksSinkCommitter { - async fn init(&mut self) -> crate::sink::Result> { + async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result> { 
tracing::info!("Starrocks commit coordinator inited."); Ok(None) } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index e74dd0a18012b..128df9f3490ba 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -544,6 +544,11 @@ IcebergConfig: field_type: bool required: false default: Default::default + - name: is_exactly_once + field_type: bool + comments: Whether it is exactly_once, the default is not. + required: false + default: Default::default - name: commit_retry_num field_type: u32 required: false diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index df98e19b5dc25..5468b37a3f152 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -36,6 +36,7 @@ mod m20250106_072104_fragment_relation; mod m20250121_085800_change_wasm_udf_identifier; mod m20250210_170743_function_options; mod m20250319_062702_mysql_utf8mb4; +mod m20250325_061743_exactly_once_iceberg_sink_metadata; mod utils; pub struct Migrator; @@ -110,6 +111,7 @@ impl MigratorTrait for Migrator { Box::new(m20250121_085800_change_wasm_udf_identifier::Migration), Box::new(m20250210_170743_function_options::Migration), Box::new(m20250319_062702_mysql_utf8mb4::Migration), + Box::new(m20250325_061743_exactly_once_iceberg_sink_metadata::Migration), ] } } diff --git a/src/meta/model/migration/src/m20250325_061743_exactly_once_iceberg_sink_metadata.rs b/src/meta/model/migration/src/m20250325_061743_exactly_once_iceberg_sink_metadata.rs new file mode 100644 index 0000000000000..6588f8851931e --- /dev/null +++ b/src/meta/model/migration/src/m20250325_061743_exactly_once_iceberg_sink_metadata.rs @@ -0,0 +1,72 @@ +use sea_orm_migration::prelude::*; + +use crate::utils::ColumnDefExt; + +#[derive(DeriveMigrationName)] +pub struct Migration; +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(ExactlyOnceIcebergSinkMetadata::Table) + .if_not_exists() + .col( + ColumnDef::new(ExactlyOnceIcebergSinkMetadata::SinkId) + .integer() + .not_null(), + ) + .col( + ColumnDef::new(ExactlyOnceIcebergSinkMetadata::EndEpoch) + .big_integer() + .not_null(), + ) + .col( + ColumnDef::new(ExactlyOnceIcebergSinkMetadata::StartEpoch) + .big_integer() + .not_null(), + ) + .col( + ColumnDef::new(ExactlyOnceIcebergSinkMetadata::Metadata) + .rw_binary(manager) + .not_null(), + ) + .col( + ColumnDef::new(ExactlyOnceIcebergSinkMetadata::SnapshotId) + .big_integer() + .not_null(), + ) + .col( + ColumnDef::new(ExactlyOnceIcebergSinkMetadata::Committed) + .boolean() + .not_null(), + ) + .primary_key( + Index::create() + .col(ExactlyOnceIcebergSinkMetadata::SinkId) + .col(ExactlyOnceIcebergSinkMetadata::EndEpoch), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + crate::drop_tables!(manager, ExactlyOnceIcebergSinkMetadata); + Ok(()) + } +} + +#[derive(DeriveIden)] +enum ExactlyOnceIcebergSinkMetadata { + Table, + SinkId, + EndEpoch, + StartEpoch, + Metadata, + SnapshotId, + Committed, +} diff --git a/src/meta/model/src/exactly_once_iceberg_sink.rs b/src/meta/model/src/exactly_once_iceberg_sink.rs new file mode 100644 index 0000000000000..c10b9ec2543ad --- /dev/null +++ b/src/meta/model/src/exactly_once_iceberg_sink.rs @@ -0,0 +1,35 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, 
Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::{Epoch, SinkId}; +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "exactly_once_iceberg_sink_metadata")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub sink_id: SinkId, + #[sea_orm(primary_key, auto_increment = false)] + pub end_epoch: Epoch, + pub start_epoch: Epoch, + pub metadata: ::prost::alloc::vec::Vec, + pub snapshot_id: i64, + pub committed: bool, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} diff --git a/src/meta/model/src/lib.rs b/src/meta/model/src/lib.rs index 0266b61efb335..f3e814107c894 100644 --- a/src/meta/model/src/lib.rs +++ b/src/meta/model/src/lib.rs @@ -33,6 +33,7 @@ pub mod compaction_status; pub mod compaction_task; pub mod connection; pub mod database; +pub mod exactly_once_iceberg_sink; pub mod fragment; pub mod fragment_relation; pub mod function; diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 63466e7ea314b..645dcc6e04740 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -459,8 +459,11 @@ pub async fn start_service_as_election_leader( ); tracing::info!("SourceManager started"); - let (sink_manager, shutdown_handle) = - SinkCoordinatorManager::start_worker(hummock_manager.clone(), metadata_manager.clone()); + let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker( + env.meta_store_ref().conn.clone(), + hummock_manager.clone(), + metadata_manager.clone(), + ); tracing::info!("SinkCoordinatorManager started"); // TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks. 
let mut sub_tasks = vec![shutdown_handle]; diff --git a/src/meta/src/controller/catalog/drop_op.rs b/src/meta/src/controller/catalog/drop_op.rs index 103cfe56e3ac5..64d897bd486b6 100644 --- a/src/meta/src/controller/catalog/drop_op.rs +++ b/src/meta/src/controller/catalog/drop_op.rs @@ -14,7 +14,7 @@ use risingwave_pb::catalog::PbTable; use risingwave_pb::telemetry::PbTelemetryDatabaseObject; -use sea_orm::DatabaseTransaction; +use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter}; use super::*; impl CatalogController { @@ -28,6 +28,7 @@ impl CatalogController { ) -> MetaResult<(ReleaseContext, NotificationVersion)> { let mut inner = self.inner.write().await; let txn = inner.db.begin().await?; + let obj: PartialObject = Object::find_by_id(object_id) .into_partial_model() .one(&txn) @@ -86,7 +87,6 @@ impl CatalogController { }, }; removed_objects.push(obj); - let mut removed_object_ids: HashSet<_> = removed_objects.iter().map(|obj| obj.oid).collect(); diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 0633d56d32568..88d717f3eabbf 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -200,3 +200,15 @@ impl From for MetaError { } } } + +impl From for SinkError { + fn from(e: MetaErrorInner) -> Self { + SinkError::Coordinator(e.into()) + } +} + +impl From for SinkError { + fn from(e: MetaError) -> Self { + SinkError::Coordinator(e.into()) + } +} diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index f11397ca7b7a2..8afa46eec7189 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -42,6 +42,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; use tokio::sync::{Mutex, Semaphore}; use tonic::Streaming; +use crate::MetaResult; use crate::hummock::CompactorManagerRef; use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Result; @@ -103,7 +104,6 @@ impl TableCommittedEpochNotifiers { }) } } - // Update to states are performed as follow: // - Initialize ValTransaction for the meta state to update // - Make changes on the ValTransaction. @@ -166,7 +166,6 @@ macro_rules! 
start_measure_real_process_timer { } pub(crate) use start_measure_real_process_timer; -use crate::MetaResult; use crate::controller::SqlMetaStore; use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::manager::worker::HummockManagerEventSender; diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 7c7d654a75bad..d4882283662b9 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -29,7 +29,7 @@ use risingwave_pb::hummock::{ }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use crate::hummock::manager::TableCommittedEpochNotifiers; +use super::TableCommittedEpochNotifiers; use crate::hummock::model::CompactionGroup; use crate::manager::NotificationManager; use crate::model::{ diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index daf4558c8ebc5..f72502542a2e8 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -24,8 +24,11 @@ use futures::pin_mut; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_connector::dispatch_sink; -use risingwave_connector::sink::{Sink, SinkCommitCoordinator, SinkParam, build_sink}; +use risingwave_connector::sink::{ + Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkParam, build_sink, +}; use risingwave_pb::connector_service::SinkMetadata; +use sea_orm::DatabaseConnection; use thiserror_ext::AsReport; use tokio::select; use tokio::sync::mpsc::UnboundedReceiver; @@ -34,7 +37,6 @@ use tonic::Status; use tracing::{error, warn}; use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle; -use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber; async fn run_future_with_periodic_fn( future: F, @@ -197,7 +199,8 @@ impl CoordinatorWorker { pub async fn run( param: SinkParam, request_rx: UnboundedReceiver, - _subscriber: SinkCommittedEpochSubscriber, + db: DatabaseConnection, + subscriber: SinkCommittedEpochSubscriber, ) { let sink = match build_sink(param.clone()) { Ok(sink) => sink, @@ -210,8 +213,9 @@ impl CoordinatorWorker { return; } }; + dispatch_sink!(sink, sink, { - let coordinator = match sink.new_coordinator().await { + let coordinator = match sink.new_coordinator(db).await { Ok(coordinator) => coordinator, Err(e) => { error!( @@ -222,7 +226,7 @@ impl CoordinatorWorker { return; } }; - Self::execute_coordinator(param, request_rx, coordinator).await + Self::execute_coordinator(param, request_rx, coordinator, subscriber).await }); } @@ -230,6 +234,7 @@ impl CoordinatorWorker { param: SinkParam, request_rx: UnboundedReceiver, coordinator: impl SinkCommitCoordinator, + subscriber: SinkCommittedEpochSubscriber, ) { let mut worker = CoordinatorWorker { handle_manager: CoordinationHandleManager { @@ -242,7 +247,7 @@ impl CoordinatorWorker { pending_epochs: Default::default(), }; - if let Err(e) = worker.run_coordination(coordinator).await { + if let Err(e) = worker.run_coordination(coordinator, subscriber).await { for handle in worker.handle_manager.writer_handles.into_values() { handle.abort(Status::internal(format!( "failed to run coordination: {:?}", @@ -255,8 +260,10 @@ impl CoordinatorWorker { async fn run_coordination( &mut self, mut coordinator: impl SinkCommitCoordinator, + subscriber: SinkCommittedEpochSubscriber, ) -> 
anyhow::Result<()> { - self.handle_manager.initial_log_store_rewind_start_epoch = coordinator.init().await?; + self.handle_manager.initial_log_store_rewind_start_epoch = + coordinator.init(subscriber).await?; loop { let (handle_id, vnode_bitmap, epoch, metadata) = self.handle_manager.next_commit_request().await?; @@ -272,7 +279,7 @@ impl CoordinatorWorker { .can_commit() { let (epoch, requests) = self.pending_epochs.pop_first().expect("non-empty"); - // TODO: measure commit time + let start_time = Instant::now(); run_future_with_periodic_fn( coordinator.commit(epoch, requests.metadatas), diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index 14683069df15a..848ccbe32c8f9 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -21,11 +21,12 @@ use futures::future::{BoxFuture, Either, select}; use futures::stream::FuturesUnordered; use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use risingwave_common::bitmap::Bitmap; -use risingwave_connector::sink::SinkParam; use risingwave_connector::sink::catalog::SinkId; +use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam}; use risingwave_pb::connector_service::coordinate_request::Msg; use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request}; use rw_futures_util::pending_on_none; +use sea_orm::DatabaseConnection; use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; @@ -35,7 +36,6 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::Status; use tracing::{debug, error, info, warn}; -use crate::MetaResult; use crate::hummock::HummockManagerRef; use crate::manager::MetadataManager; use crate::manager::sink_coordination::SinkWriterRequestStream; @@ -74,13 +74,6 @@ pub struct SinkCoordinatorManager { request_tx: mpsc::Sender, } -pub(super) type SinkCommittedEpochSubscriber = Arc< - dyn Fn(SinkId) -> BoxFuture<'static, MetaResult<(u64, UnboundedReceiver)>> - + Send - + Sync - + 'static, ->; - fn new_committed_epoch_subscriber( hummock_manager: HummockManagerRef, metadata_manager: MetadataManager, @@ -91,13 +84,15 @@ fn new_committed_epoch_subscriber( async move { let state_table_ids = metadata_manager .get_sink_state_table_ids(sink_id.sink_id as _) - .await?; + .await + .map_err(SinkError::from)?; let Some(table_id) = state_table_ids.first() else { return Err(anyhow!("no state table id in sink: {}", sink_id).into()); }; hummock_manager .subscribe_table_committed_epoch(*table_id) .await + .map_err(SinkError::from) } .boxed() }) @@ -105,6 +100,7 @@ fn new_committed_epoch_subscriber( impl SinkCoordinatorManager { pub fn start_worker( + db: DatabaseConnection, hummock_manager: HummockManagerRef, metadata_manager: MetadataManager, ) -> (Self, (JoinHandle<()>, Sender<()>)) { @@ -114,6 +110,7 @@ impl SinkCoordinatorManager { tokio::spawn(CoordinatorWorker::run( param, manager_request_stream, + db.clone(), subscriber.clone(), )) }) @@ -382,6 +379,7 @@ impl ManagerWorker { mod tests { use std::future::{Future, poll_fn}; use std::pin::pin; + use std::sync::Arc; use std::task::Poll; use anyhow::anyhow; @@ -397,10 +395,12 @@ mod tests { use risingwave_pb::connector_service::SinkMetadata; use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata}; use risingwave_rpc_client::CoordinatorStreamHandle; + use tokio::sync::mpsc::unbounded_channel; use 
tokio_stream::wrappers::ReceiverStream; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker; + use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber; struct MockCoordinator, &mut C) -> Result<(), SinkError>> { context: C, @@ -417,7 +417,10 @@ mod tests { impl, &mut C) -> Result<(), SinkError> + Send> SinkCommitCoordinator for MockCoordinator { - async fn init(&mut self) -> risingwave_connector::sink::Result> { + async fn init( + &mut self, + _subscriber: SinkCommittedEpochSubscriber, + ) -> risingwave_connector::sink::Result> { Ok(None) } @@ -464,6 +467,11 @@ mod tests { [vec![1u8, 2u8], vec![3u8, 4u8]], [vec![5u8, 6u8], vec![7u8, 8u8]], ]; + let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| { + let (_sender, receiver) = unbounded_channel(); + + async move { Ok((1, receiver)) }.boxed() + }); let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ @@ -472,46 +480,54 @@ mod tests { move |param, new_writer_rx| { let metadata = metadata.clone(); let expected_param = expected_param.clone(); - tokio::spawn(async move { - // validate the start request - assert_eq!(param, expected_param); - CoordinatorWorker::execute_coordinator( - param.clone(), - new_writer_rx, - MockCoordinator::new(0, |epoch, metadata_list, count: &mut usize| { - *count += 1; - let mut metadata_list = metadata_list - .into_iter() - .map(|metadata| match metadata { - SinkMetadata { - metadata: - Some(Metadata::Serialized(SerializedMetadata { - metadata, - })), - } => metadata, - _ => unreachable!(), - }) - .collect_vec(); - metadata_list.sort(); - match *count { - 1 => { - assert_eq!(epoch, epoch1); - assert_eq!(2, metadata_list.len()); - assert_eq!(metadata[0][0], metadata_list[0]); - assert_eq!(metadata[0][1], metadata_list[1]); - } - 2 => { - assert_eq!(epoch, epoch2); - assert_eq!(2, metadata_list.len()); - assert_eq!(metadata[1][0], metadata_list[0]); - assert_eq!(metadata[1][1], metadata_list[1]); - } - _ => unreachable!(), - } - Ok(()) - }), - ) - .await; + tokio::spawn({ + let subscriber = mock_subscriber.clone(); + async move { + // validate the start request + assert_eq!(param, expected_param); + CoordinatorWorker::execute_coordinator( + param.clone(), + new_writer_rx, + MockCoordinator::new( + 0, + |epoch, metadata_list, count: &mut usize| { + *count += 1; + let mut metadata_list = + metadata_list + .into_iter() + .map(|metadata| match metadata { + SinkMetadata { + metadata: + Some(Metadata::Serialized( + SerializedMetadata { metadata }, + )), + } => metadata, + _ => unreachable!(), + }) + .collect_vec(); + metadata_list.sort(); + match *count { + 1 => { + assert_eq!(epoch, epoch1); + assert_eq!(2, metadata_list.len()); + assert_eq!(metadata[0][0], metadata_list[0]); + assert_eq!(metadata[0][1], metadata_list[1]); + } + 2 => { + assert_eq!(epoch, epoch2); + assert_eq!(2, metadata_list.len()); + assert_eq!(metadata[1][0], metadata_list[0]); + assert_eq!(metadata[1][1], metadata_list[1]); + } + _ => unreachable!(), + } + Ok(()) + }, + ), + subscriber.clone(), + ) + .await; + } }) } }); @@ -631,7 +647,11 @@ mod tests { let vnode = build_bitmap(&all_vnode); let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]]; + let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| { + let (_sender, receiver) = unbounded_channel(); + async move { Ok((1, receiver)) }.boxed() + }); let (manager, (_join_handle, _stop_tx)) = 
SinkCoordinatorManager::start_worker_with_spawn_worker({ let expected_param = param.clone(); @@ -639,44 +659,52 @@ mod tests { move |param, new_writer_rx| { let metadata = metadata.clone(); let expected_param = expected_param.clone(); - tokio::spawn(async move { - // validate the start request - assert_eq!(param, expected_param); - CoordinatorWorker::execute_coordinator( - param.clone(), - new_writer_rx, - MockCoordinator::new(0, |epoch, metadata_list, count: &mut usize| { - *count += 1; - let mut metadata_list = metadata_list - .into_iter() - .map(|metadata| match metadata { - SinkMetadata { - metadata: - Some(Metadata::Serialized(SerializedMetadata { - metadata, - })), - } => metadata, - _ => unreachable!(), - }) - .collect_vec(); - metadata_list.sort(); - match *count { - 1 => { - assert_eq!(epoch, epoch1); - assert_eq!(1, metadata_list.len()); - assert_eq!(metadata[0], metadata_list[0]); - } - 2 => { - assert_eq!(epoch, epoch2); - assert_eq!(1, metadata_list.len()); - assert_eq!(metadata[1], metadata_list[0]); - } - _ => unreachable!(), - } - Ok(()) - }), - ) - .await; + tokio::spawn({ + let subscriber = mock_subscriber.clone(); + async move { + // validate the start request + assert_eq!(param, expected_param); + CoordinatorWorker::execute_coordinator( + param.clone(), + new_writer_rx, + MockCoordinator::new( + 0, + |epoch, metadata_list, count: &mut usize| { + *count += 1; + let mut metadata_list = + metadata_list + .into_iter() + .map(|metadata| match metadata { + SinkMetadata { + metadata: + Some(Metadata::Serialized( + SerializedMetadata { metadata }, + )), + } => metadata, + _ => unreachable!(), + }) + .collect_vec(); + metadata_list.sort(); + match *count { + 1 => { + assert_eq!(epoch, epoch1); + assert_eq!(1, metadata_list.len()); + assert_eq!(metadata[0], metadata_list[0]); + } + 2 => { + assert_eq!(epoch, epoch2); + assert_eq!(1, metadata_list.len()); + assert_eq!(metadata[1], metadata_list[0]); + } + _ => unreachable!(), + } + Ok(()) + }, + ), + subscriber.clone(), + ) + .await; + } }) } }); @@ -752,20 +780,29 @@ mod tests { let vnode1 = build_bitmap(first); let vnode2 = build_bitmap(second); + let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| { + let (_sender, receiver) = unbounded_channel(); + + async move { Ok((1, receiver)) }.boxed() + }); let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ let expected_param = param.clone(); move |param, new_writer_rx| { let expected_param = expected_param.clone(); - tokio::spawn(async move { - // validate the start request - assert_eq!(param, expected_param); - CoordinatorWorker::execute_coordinator( - param, - new_writer_rx, - MockCoordinator::new((), |_, _, _| unreachable!()), - ) - .await; + tokio::spawn({ + let subscriber = mock_subscriber.clone(); + async move { + // validate the start request + assert_eq!(param, expected_param); + CoordinatorWorker::execute_coordinator( + param, + new_writer_rx, + MockCoordinator::new((), |_, _, _| unreachable!()), + subscriber.clone(), + ) + .await; + } }) } }); @@ -833,23 +870,33 @@ mod tests { }; let vnode1 = build_bitmap(first); let vnode2 = build_bitmap(second); + let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| { + let (_sender, receiver) = unbounded_channel(); + async move { Ok((1, receiver)) }.boxed() + }); let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ let expected_param = param.clone(); move |param, new_writer_rx| { let 
expected_param = expected_param.clone(); - tokio::spawn(async move { - // validate the start request - assert_eq!(param, expected_param); - CoordinatorWorker::execute_coordinator( - param, - new_writer_rx, - MockCoordinator::new((), |_, _, _| { - Err(SinkError::Coordinator(anyhow!("failed to commit"))) - }), - ) - .await; + tokio::spawn({ + let subscriber = mock_subscriber.clone(); + { + async move { + // validate the start request + assert_eq!(param, expected_param); + CoordinatorWorker::execute_coordinator( + param, + new_writer_rx, + MockCoordinator::new((), |_, _, _| { + Err(SinkError::Coordinator(anyhow!("failed to commit"))) + }), + subscriber.clone(), + ) + .await; + } + } }) } }); @@ -940,7 +987,11 @@ mod tests { let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]]; let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]]; + let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| { + let (_sender, receiver) = unbounded_channel(); + async move { Ok((1, receiver)) }.boxed() + }); let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ let expected_param = param.clone(); @@ -952,40 +1003,49 @@ mod tests { let metadata_scale_out = metadata_scale_out.clone(); let metadata_scale_in = metadata_scale_in.clone(); let expected_param = expected_param.clone(); - tokio::spawn(async move { - // validate the start request - assert_eq!(param, expected_param); - CoordinatorWorker::execute_coordinator( - param.clone(), - new_writer_rx, - MockCoordinator::new(0, |epoch, metadata_list, count: &mut usize| { - *count += 1; - let mut metadata_list = metadata_list - .into_iter() - .map(|metadata| match metadata { - SinkMetadata { - metadata: - Some(Metadata::Serialized(SerializedMetadata { - metadata, - })), - } => metadata, - _ => unreachable!(), - }) - .collect_vec(); - metadata_list.sort(); - let (expected_epoch, expected_metadata_list) = match *count { - 1 => (epoch1, metadata[0].as_slice()), - 2 => (epoch2, metadata[1].as_slice()), - 3 => (epoch3, metadata_scale_out.as_slice()), - 4 => (epoch4, metadata_scale_in.as_slice()), - _ => unreachable!(), - }; - assert_eq!(expected_epoch, epoch); - assert_eq!(expected_metadata_list, &metadata_list); - Ok(()) - }), - ) - .await; + tokio::spawn({ + let subscriber = mock_subscriber.clone(); + async move { + // validate the start request + assert_eq!(param, expected_param); + CoordinatorWorker::execute_coordinator( + param.clone(), + new_writer_rx, + MockCoordinator::new( + 0, + |epoch, metadata_list, count: &mut usize| { + *count += 1; + let mut metadata_list = + metadata_list + .into_iter() + .map(|metadata| match metadata { + SinkMetadata { + metadata: + Some(Metadata::Serialized( + SerializedMetadata { metadata }, + )), + } => metadata, + _ => unreachable!(), + }) + .collect_vec(); + metadata_list.sort(); + let (expected_epoch, expected_metadata_list) = match *count + { + 1 => (epoch1, metadata[0].as_slice()), + 2 => (epoch2, metadata[1].as_slice()), + 3 => (epoch3, metadata_scale_out.as_slice()), + 4 => (epoch4, metadata_scale_in.as_slice()), + _ => unreachable!(), + }; + assert_eq!(expected_epoch, epoch); + assert_eq!(expected_metadata_list, &metadata_list); + Ok(()) + }, + ), + subscriber.clone(), + ) + .await; + } }) } }); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 64f59d2e9aee9..9eb76f6cefe73 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -34,6 +34,7 @@ use 
risingwave_connector::connector_common::validate_connection; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY, }; +use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity}; use risingwave_meta_model::object::ObjectType; use risingwave_meta_model::{ ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId, @@ -55,6 +56,7 @@ use risingwave_pb::stream_plan::{ StreamFragmentGraph as StreamFragmentGraphProto, }; use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use thiserror_ext::AsReport; use tokio::sync::{Semaphore, oneshot}; use tokio::time::sleep; @@ -1553,7 +1555,13 @@ impl DdlController { let version = self .drop_object(object_type, object_id, drop_mode, target_replace_info) .await?; - + #[cfg(not(madsim))] + if let StreamingJobId::Sink(sink_id) = job_id { + // delete system table for exactly once iceberg sink + // todo(wcy-fdu): optimize the logic to be Iceberg unique. + let db = self.env.meta_store_ref().conn.clone(); + clean_all_rows_by_sink_id(&db, sink_id).await?; + } Ok(version) } @@ -2179,3 +2187,30 @@ fn report_create_object( attr_info, ); } + +async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> { + match Entity::delete_many() + .filter(Column::SinkId.eq(sink_id)) + .exec(db) + .await + { + Ok(result) => { + let deleted_count = result.rows_affected; + + tracing::info!( + "Deleted {} items for sink_id = {} in iceberg exactly once system table.", + deleted_count, + sink_id + ); + Ok(()) + } + Err(e) => { + tracing::error!( + "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}", + sink_id, + e.as_report() + ); + Err(e.into()) + } + } +}
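For reference, the `SinkCommittedEpochSubscriber` contract added to `mod.rs` (latest committed epoch plus a stream of later committed epochs) can be exercised in isolation, much like the mock subscriber in the manager tests. A minimal sketch, assuming `tokio`, `futures`, and `anyhow` as dependencies, with `SinkId` simplified to `u32`:

```rust
use std::sync::Arc;

use futures::FutureExt;
use futures::future::BoxFuture;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};

// Simplified stand-ins for the real SinkId and sink Result types.
type SinkId = u32;
type Result<T> = anyhow::Result<T>;

// Same shape as SinkCommittedEpochSubscriber: given a sink id, resolve to the
// latest committed epoch plus a receiver of future committed epochs.
type Subscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>> + Send + Sync,
>;

#[tokio::main]
async fn main() -> Result<()> {
    // A mock subscriber like the one in the coordination tests: reports epoch 1
    // as already committed, then streams epochs 2 and 3.
    let subscriber: Subscriber = Arc::new(|_sink_id: SinkId| {
        let (tx, rx) = unbounded_channel::<u64>();
        tx.send(2).unwrap();
        tx.send(3).unwrap();
        async move { Ok((1u64, rx)) }.boxed()
    });

    let (committed, mut rx) = subscriber(42).await?;
    assert_eq!(committed, 1);
    assert_eq!(rx.recv().await, Some(2));
    assert_eq!(rx.recv().await, Some(3));
    Ok(())
}
```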
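The gating step in `commit` is a plain watermark wait: nothing is committed to Iceberg until the log store's committed epoch reaches the epoch being committed. The same control flow, condensed and framework-free (assuming only `tokio`):

```rust
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};

/// Resolve once the committed-epoch watermark reaches `target`.
/// `current` is the latest committed epoch at subscription time and `rx`
/// streams subsequent committed epochs, mirroring the subscriber contract.
async fn wait_committed(current: u64, mut rx: UnboundedReceiver<u64>, target: u64) -> bool {
    if current >= target {
        return true;
    }
    while let Some(epoch) = rx.recv().await {
        if epoch >= target {
            return true;
        }
    }
    // Channel closed before the watermark reached `target`.
    false
}

#[tokio::main]
async fn main() {
    let (tx, rx) = unbounded_channel::<u64>();
    tx.send(5).unwrap();
    tx.send(7).unwrap();
    assert!(wait_committed(3, rx, 6).await); // woken by epoch 7
}
```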
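The pre-commit metadata stored per row is a `Vec<Vec<u8>>` (one serialized `IcebergCommitResult` per writer parallelism) flattened to a single `Vec<u8>`. The round-trip property the recovery path relies on, shown with the two helpers copied from this patch (assuming `serde_json` as a dependency):

```rust
// Copies of the helpers added in this patch.
fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

fn main() {
    // One inner Vec<u8> per sink writer; the contents are opaque here.
    let per_writer = vec![vec![1u8, 2, 3], vec![4u8, 5]];
    let flattened = serialize_metadata(per_writer.clone());
    // What `init` reads back from the system table must match what
    // `commit_iceberg_inner` persisted before the Iceberg commit.
    assert_eq!(deserialize_metadata(flattened), per_writer);
}
```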
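The recovery matrix in `init` boils down to a pure decision over two booleans; the I/O it drives (marking rows, re-committing) is layered on top. A sketch with hypothetical `Action` names:

```rust
/// What `init` does for one persisted pre-commit row during recovery.
#[derive(Debug, PartialEq)]
enum Action {
    /// Row already marked committed: data is fully in the external sink.
    Nothing,
    /// Snapshot already visible in Iceberg but the row was not marked:
    /// the commit succeeded before the crash, so only flip the flag.
    MarkCommitted,
    /// Snapshot absent from Iceberg: replay the commit with the persisted
    /// snapshot id so the retry stays idempotent.
    ReCommit,
}

fn recovery_action(committed: bool, snapshot_in_iceberg: bool) -> Action {
    match (committed, snapshot_in_iceberg) {
        (true, _) => Action::Nothing,
        (false, true) => Action::MarkCommitted,
        (false, false) => Action::ReCommit,
    }
}

fn main() {
    assert_eq!(recovery_action(false, false), Action::ReCommit);
    assert_eq!(recovery_action(false, true), Action::MarkCommitted);
}
```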
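Idempotent re-commit hinges on reusing the persisted snapshot id in `fast_append(Some(snapshot_id), ...)`: if the process crashes between the Iceberg commit and the `committed = true` update, recovery consults the table metadata rather than committing blindly. A sketch of that check against a toy stand-in for the table metadata (the real code calls `table.metadata().snapshot_by_id(...)` from iceberg-rust):

```rust
use std::collections::HashSet;

// Toy stand-in for iceberg's TableMetadata: just the set of snapshot ids.
struct TableMetadata {
    snapshot_ids: HashSet<i64>,
}

impl TableMetadata {
    fn snapshot_by_id(&self, id: i64) -> Option<&i64> {
        self.snapshot_ids.get(&id)
    }
}

/// Re-commit is skipped when the pre-allocated snapshot id already landed.
fn needs_re_commit(meta: &TableMetadata, persisted_snapshot_id: i64) -> bool {
    meta.snapshot_by_id(persisted_snapshot_id).is_none()
}

fn main() {
    let meta = TableMetadata { snapshot_ids: [7001].into_iter().collect() };
    assert!(!needs_re_commit(&meta, 7001)); // commit landed before the crash
    assert!(needs_re_commit(&meta, 7002)); // commit lost; replay with id 7002
}
```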
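On the plumbing side, every `new_coordinator` implementation now takes a `DatabaseConnection`, but only the exactly-once Iceberg path actually queries it, which is why `sink_bench` can pass `DatabaseConnection::Disconnected`. A sketch of that pattern (assuming `sea-orm`, `tokio`, and `anyhow`; `new_coordinator` here is a hypothetical free function, not the trait method):

```rust
use sea_orm::DatabaseConnection;

// Hypothetical constructor mirroring the new trait signature: sinks that
// don't opt into exactly-once semantics simply ignore `_db`.
async fn new_coordinator(_db: DatabaseConnection) -> anyhow::Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() {
    // No meta store in a bench/test context: a disconnected handle is safe
    // because it is never dereferenced on this path.
    let db = DatabaseConnection::Disconnected;
    new_coordinator(db).await.unwrap();
}
```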