-
Notifications
You must be signed in to change notification settings - Fork 3k
Add mixin interface to support commit validation #14503
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
amogh-jahagirdar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great to me @danielcweeks , will leave it up for @bryanck review!
core/src/main/java/org/apache/iceberg/SupportsCommitValidation.java
Outdated
Show resolved
Hide resolved
bryanck
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
|
@danielcweeks @bryanck @amogh-jahagirdar There's an existing PR, which is currently being reviewed by @pvary, to add this functionality: #14425 (PR - #14445). My main concern with this change is that it doesn't fit into the existing validation framework in |
|
Also, I believe this fixes the stale issue #6514 |
After this change, we will have two places in the code where validation occurs. In my opinion, we should align with the existing validation API in SnapshotProducer and corresponding interfaces in RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referencedFiles);
RowDelta validateDeletedFiles();
RowDelta conflictDetectionFilter(Expression conflictDetectionFilter);What do you think? cc: @pvary |
singhpk234
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM too, Thanks @danielcweeks !
| // this operation retries | ||
| // to ensure that if a concurrent operation assigns the UUID, this operation will | ||
| // not fail. | ||
| validator.accept(base, updated); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional] may be move this above comment since the comment is for line below
| * Commits the updated metadata to the table taking a validator that will be called with the | ||
| * refreshed metadata and pending metadata. | ||
| * <p> | ||
| * The validator should throw a {@link CommitFailedException} if validation fails. Retries will still |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not throw a ValidationException like the rest of the code, overriding SnapshotProducer::validate(TableMetadata currentMetadata, Snapshot snapshot)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We retry on CommitFailedException, so I think there's a question as to whether there's any scenario where a retry could result in a successful commit. I'm not clear whether that is a reasonable assumption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's a question as to whether there's any scenario where a retry could result in a successful commit. I'm not clear whether that is a reasonable assumption.
It makes sense to retry a CommitFailedException in SnapshotProducer::commit() due to the nature of concurrent commits in Iceberg. But why would we want to retry logical validation failures?
I think the CommitValidator should throw a non-retryable ValidationException and the client (Kafka Connect, Flink) should then decide whether to retry or skip the commit.
| long sequenceNumber = base.nextSequenceNumber(); | ||
| Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId(); | ||
|
|
||
| validate(base, parentSnapshot); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse some of the existing code of validate(TableMetadata currentMetadata, Snapshot snapshot) in this change?
| // this operation retries | ||
| // to ensure that if a concurrent operation assigns the UUID, this operation will | ||
| // not fail. | ||
| validator.accept(base, updated); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be close to the other validate(TableMetadata currentMetadata, Snapshot snapshot) call on line 260?
| @FunctionalInterface | ||
| interface CommitValidator extends BiConsumer<TableMetadata, TableMetadata> { | ||
| @Override | ||
| void accept(TableMetadata base, TableMetadata current); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this accept Snapshot as a parameter to avoid depending on core classes?
Or match the existing validation signature in SnapshotProducer:
/**
* Validate the current metadata.
*
* <p>Child operations can override this to add custom validation.
*
* @param currentMetadata current table metadata to validate
* @param snapshot ending snapshot on the lineage which is being validated
*/
protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the change @danielcweeks. I accidentally pressed the request changes button, just leaving my comments here around aligning with the existing validation APIs mentioned in my previous comment.
|
@singhpk234 @bryanck @amogh-jahagirdar I've created a different approach in #14509 that uses a more public API. It's not as powerful as this, but |
aiborodin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replying to the comment.
|
Closed in favor of #14509 |
Currently committing will auto refresh table state before building new table metadata, but this is hidden from the original committer, which makes it impossible for to validate any changes to the table during the commit process.
This PR adds the ability for a committer to provide a validator that can inspect the original table state and the new table state. This can be used in a number of ways such as to provide isolation guarantees or for implementations like kafka connect to validate that the offset state hasn't been updated by a competing commit.