feat(sink): introduce exactly once iceberg sink#19771
Merged
Conversation
wcy-fdu
commented
Jan 8, 2025
Contributor
Author
hzxa21
reviewed
Feb 11, 2025
Collaborator
hzxa21
left a comment
There was a problem hiding this comment.
Left some early comments
| <S as risingwave_connector::sink::Sink>::Coordinator: 'static, | ||
| { | ||
| if let Ok(coordinator) = sink.new_coordinator().await { | ||
| if let Ok(coordinator) = sink.new_coordinator(DatabaseConnection::Disconnected).await { |
Collaborator
There was a problem hiding this comment.
Will passing DatabaseConnection::Disconnected cause the iceberg sink bench to fail?
Contributor
Author
There was a problem hiding this comment.
The iceberg sink will only write to the system table in one case, that is, is_exactly_once = true is set in the create sink statement. Otherwise, this DatabaseConnection will not be used.
…/risingwavelabs/risingwave into wcy/exactlt_once_iceberg_sink.pr
…/risingwavelabs/risingwave into wcy/exactlt_once_iceberg_sink.pr
8 tasks
3 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This pr introduce exactly once consistency semantics for iceberg sink.
Related RFC: Exactly Once File Sink
implementation detail
The implementation of this feature can be mainly divided into four parts:
coordinator.commit()interface.)All logic is completed in the
coordinator.init()interface when recovery occurs. This is because all memory states are cleared during recovery, and then the sink writer is built first, and then the sink writer builds the coordinator.re_commit.max_epochcalledlast_recommit_epochin the code, is passed to CN through the existingStartCoordinationResponsefor log store rewind. The rewind here is to avoid repeated consumption of the log store, because this batch of data must have been in the downstream iceberg at this time.test logic
The testing part is still being improved and will be continuously updated in PR.
After some offline discussions, we decided to test it this way: simulate error injection through madsim, and then sink the same batch of source data into iceberg twice, showing two iceberg tables. One of them has errors injected, and the other does not. After comparing the two tables, they are exactly the same.
Notes:
will notbe GC before the next successful commit. In later pr, there-commitstep after recovery will be changed to rewind to the start epoch.Checklist
Documentation
Release note