Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
5aa6703
add meta store in sink coordinator
wcy-fdu Dec 3, 2024
4aba17b
iceberg sink write meta store
wcy-fdu Dec 3, 2024
790db0b
save work
wcy-fdu Dec 3, 2024
22e7fc3
save work
wcy-fdu Dec 5, 2024
f75b2c2
save work: serialize and deserialize pre commit metadata
wcy-fdu Dec 6, 2024
a0e6691
put recommit logic in coordinator.init()
wcy-fdu Dec 6, 2024
9525615
clippy happy
wcy-fdu Dec 9, 2024
7c938b1
delete meta store right after iceberg commit
wcy-fdu Dec 9, 2024
6af8331
save work, todo: store sink metadata pb in meta store
wcy-fdu Dec 9, 2024
45e53ce
save work, todo: add log_store_rewind_start_epoch in StartCoordinatio…
wcy-fdu Dec 11, 2024
10b4d0d
save work
wcy-fdu Dec 11, 2024
375895c
rewind log store
wcy-fdu Dec 12, 2024
350d76e
update aws-lc-rs
wcy-fdu Dec 12, 2024
6810374
save work
wcy-fdu Dec 20, 2024
9cfb39a
fix meta store table name, verify normal write path is right
wcy-fdu Dec 20, 2024
64ce858
remove unrelated file
wcy-fdu Dec 20, 2024
0f17d73
resolve conflict
wcy-fdu Jan 3, 2025
b315722
save work
wcy-fdu Jan 6, 2025
7ada259
save work
wcy-fdu Jan 8, 2025
a94da79
resolve conflict
wcy-fdu Mar 3, 2025
d1b49de
update migration script to latest
wcy-fdu Mar 3, 2025
59dd44e
resolve conflict
wcy-fdu Mar 3, 2025
2d58f12
resolve some comments: add index, use snapshot_id to check file exist.
wcy-fdu Mar 5, 2025
312e313
clean system table when drop sink
wcy-fdu Mar 6, 2025
6f0db4e
add new interface for log reader to avoid change in rewind interface
wcy-fdu Mar 6, 2025
63d9973
add change in log store for exactly once sink
wcy-fdu Mar 6, 2025
7d4e7bc
minor
wcy-fdu Mar 7, 2025
e833faf
add comments
wcy-fdu Mar 7, 2025
31cc794
change the semantics of rewind
wcy-fdu Mar 10, 2025
a70dd05
do not build stream when log_reader init()
wcy-fdu Mar 10, 2025
0535d41
make clippy happy
wcy-fdu Mar 11, 2025
169f277
do nothing for in memory log reader start_offset
wcy-fdu Mar 11, 2025
5fa1177
merge change in pr 20772
wcy-fdu Mar 11, 2025
7c8b64e
remove some old test
wcy-fdu Mar 11, 2025
c3f226c
sleep before and after iceberg commit for chaos mesh test
wcy-fdu Mar 12, 2025
66393cd
skip rewind when all data already sinked and recovery happened
wcy-fdu Mar 13, 2025
deb49fc
save work
wcy-fdu Mar 13, 2025
e0740a8
add delete marker, fix some corner case, WIP, gc table
wcy-fdu Mar 13, 2025
7863cea
delete previous item when commit
wcy-fdu Mar 13, 2025
fd9900d
add wait epoch before coordinator.commit
wcy-fdu Mar 17, 2025
ea3aa1f
do not sleep when re commit
wcy-fdu Mar 18, 2025
b163968
do not write system table when re commit, otherwise pk will conflict
wcy-fdu Mar 18, 2025
9be29b7
tune test
wcy-fdu Mar 18, 2025
6231d89
resolve some comments
wcy-fdu Mar 19, 2025
65f91ca
resolve some comments
wcy-fdu Mar 19, 2025
f8ee260
resolve some comments
wcy-fdu Mar 19, 2025
abddb71
resolve comment
wcy-fdu Mar 21, 2025
d76108e
fix test
wcy-fdu Mar 21, 2025
3348332
fix test
wcy-fdu Mar 21, 2025
809052c
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu Mar 21, 2025
308c8fb
minor fix
wcy-fdu Mar 21, 2025
62b7e7f
minor refactor
wcy-fdu Mar 25, 2025
2de4098
merge #20772
wcy-fdu Mar 25, 2025
6a242c4
enhance tracing
wcy-fdu Mar 25, 2025
aa50691
keep migration script latest
wcy-fdu Mar 25, 2025
03c6a56
minor
wcy-fdu Mar 25, 2025
72d7739
sort cargo.toml
wcy-fdu Mar 25, 2025
841e96a
minor
wcy-fdu Mar 25, 2025
4b7a4cf
resolve conflict
wcy-fdu Mar 25, 2025
fc7a84c
only exactly once iceberg sink wait epoch before committing
wcy-fdu Mar 25, 2025
1909886
Merge branch 'main' into wcy/exactlt_once_iceberg_sink.pr
wcy-fdu Mar 25, 2025
d6e2eff
Merge branch 'main' into wcy/exactlt_once_iceberg_sink.pr
wcy-fdu Mar 26, 2025
21fd519
change behavior when drop sink
wcy-fdu Mar 26, 2025
bb68fa4
Merge branch 'wcy/exactlt_once_iceberg_sink.pr' of https://github.com…
wcy-fdu Mar 26, 2025
09466b1
minor
wcy-fdu Mar 26, 2025
94c3227
clippy happy
wcy-fdu Mar 26, 2025
12e56dd
Merge branch 'main' into wcy/exactlt_once_iceberg_sink.pr
wcy-fdu Mar 26, 2025
0017444
empty commit for retry
wcy-fdu Mar 26, 2025
f243572
Merge branch 'wcy/exactlt_once_iceberg_sink.pr' of https://github.com…
wcy-fdu Mar 26, 2025
639217a
do not handle system table when drop sink to wish CI pass
wcy-fdu Mar 26, 2025
9348a0e
add #[cfg(not(madsim))] for drop sink
wcy-fdu Mar 26, 2025
57fef17
resolve comments
wcy-fdu Mar 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev =
prost = { version = "0.13" }
prost-build = { version = "0.13" }
# branch dev_rebase_main_20250325
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4", features = [
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3", features = [
"storage-s3",
"storage-gcs",
] }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3" }
opendal = "0.49"
arrow-udf-js = "0.6.1"
arrow-udf-wasm = { version = "0.5.1", features = ["build"] }
Expand Down
1 change: 1 addition & 0 deletions src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ plotters = { version = "0.3.5", default-features = false, features = [
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_stream = { workspace = true }
sea-orm = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
thiserror-ext = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use risingwave_connector::source::{
};
use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
use sea_orm::DatabaseConnection;
use serde::{Deserialize, Deserializer};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
Expand Down Expand Up @@ -382,7 +383,7 @@ where
<S as risingwave_connector::sink::Sink>::Coordinator: std::marker::Send,
<S as risingwave_connector::sink::Sink>::Coordinator: 'static,
{
if let Ok(coordinator) = sink.new_coordinator().await {
if let Ok(coordinator) = sink.new_coordinator(DatabaseConnection::Disconnected).await {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will passing DatabaseConnection::Disconnected cause the iceberg sink bench to fail?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iceberg sink writes to the system table in only one case: when is_exactly_once = true is set in the create sink statement. Otherwise, this DatabaseConnection is not used.

sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(MockMetaClient::new(
Box::new(coordinator),
)));
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ risingwave_common_estimate_size = { workspace = true }
risingwave_common_rate_limit = { workspace = true }
risingwave_connector_codec = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_meta_model = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rumqttc = { version = "0.24.0", features = ["url"] }
Expand All @@ -120,6 +121,7 @@ rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
scopeguard = "1"
sea-orm = { workspace = true }
sea-schema = { version = "0.16", default-features = false, features = [
"discovery",
"sqlx-postgres",
Expand Down
8 changes: 6 additions & 2 deletions src/connector/src/sink/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_pb::connector_service::SinkMetadata;

use super::SinkCommittedEpochSubscriber;
use crate::sink::{SinkCommitCoordinator, SinkWriter};

pub type BoxWriter<CM> = Box<dyn SinkWriter<CommitMetadata = CM> + Send + 'static>;
Expand Down Expand Up @@ -52,8 +53,11 @@ impl<CM: 'static + Send> SinkWriter for BoxWriter<CM> {

#[async_trait]
impl SinkCommitCoordinator for BoxCoordinator {
async fn init(&mut self) -> crate::sink::Result<Option<u64>> {
self.deref_mut().init().await
async fn init(
&mut self,
subscriber: SinkCommittedEpochSubscriber,
) -> crate::sink::Result<Option<u64>> {
self.deref_mut().init(subscriber).await
}

async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> crate::sink::Result<()> {
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use with_options::WithOptions;
Expand All @@ -48,7 +49,7 @@ use super::decouple_checkpoint_log_sink::{
use super::writer::SinkWriter;
use super::{
Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
};
use crate::connector_common::AwsAuthProps;

Expand Down Expand Up @@ -384,7 +385,7 @@ impl Sink for DeltaLakeSink {
true
}

async fn new_coordinator(&self) -> Result<Self::Coordinator> {
async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
Ok(DeltaLakeSinkCommitter {
table: self.config.common.create_deltalake_client().await?,
})
Expand Down Expand Up @@ -496,7 +497,7 @@ pub struct DeltaLakeSinkCommitter {

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
async fn init(&mut self) -> crate::sink::Result<Option<u64>> {
async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
tracing::info!("DeltaLake commit coordinator inited.");
Ok(None)
}
Expand Down
Loading
Loading