-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Kafka Connect: validate offsets for refreshed table state on commit #14510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Connect: validate offsets for refreshed table state on commit #14510
Conversation
a8192c8 to
0b6e560
Compare
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Outdated
Show resolved
Hide resolved
|
Thank you @danielcweeks for the change and @rdblue for the review. I implemented an alternative API that works in Kafka Connect and reuses most of the existing validation code. It is in the following two PRs:
@rdblue @danielcweeks Could you please take a look and let me know what you think? |
0b6e560 to
1b6145c
Compare
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Outdated
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Show resolved
Hide resolved
...-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
Show resolved
Hide resolved
amogh-jahagirdar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @danielcweeks , think this needs to be rebased but this looks correct to me!
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Outdated
Show resolved
Hide resolved
1b6145c to
c62bd7b
Compare
|
LGTM also |
| @Override | ||
| public String errorMessage() { | ||
| return String.format( | ||
| "Latest offsets do not match expected offsets for this commit. Table: %s, Expected: %s, Last Committed: %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is less important in connector code vs core, but I like to have more direct error messages. For example, "Cannot commit to %s, stale offsets:\nExpected: %s\nCommitted: %s"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we can claim the offsets are stale (they may actually be in the future). All we can tell at this point is they don't match and we expect them to.. Nevermind, I think I understand what you mean by stale here.
| .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}") | ||
| .commit(); | ||
|
|
||
| table.refresh(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't needed after the append right? It's the same table instance.
|
Thanks @rdblue @bryanck @amogh-jahagirdar and @PrabhuJoseph for the reviews! |
This PR depends on #14509 and validates that another commit hasn't moved the kafka offsets that would not be seen by the committer due to the table refreshing during the commit process.