@aiborodin
Contributor

@aiborodin aiborodin commented Nov 6, 2025

This change uses the new API in #14514 to validate that no concurrent commit has moved the Kafka offsets. Without this check, the committer would not see such a commit because the table is refreshed during the commit process.

It is an alternative solution to #14510.

Change-Id: Ie7e2bc0920bd855188c5bcc2435ace947db9af21
Change-Id: Ib7c367e90f42a8184772fb17194bb4e5427804db
@aiborodin
Contributor Author

@rdblue What do you think?

private TableMetadata base;
private boolean stageOnly = false;
private Consumer<String> deleteFunc = defaultDelete;
@Nullable private Long startingSnapshotId = null; // check all versions by default
Contributor

I like the original PR's approach to iterate backward through history rather than this approach that iterates through more by default. I also like that the other approach can reuse work by running the validation once with a loop, rather than calling the validation in a loop.

Contributor Author

> I like the original PR's approach to iterate backward through history rather than this approach that iterates through more by default.

Both the original PR and this approach iterate backward. The original PR loops through the whole history for every validation run. This PR only checks new parent Snapshots.

> I also like that the other approach can reuse work by running the validation once with a loop, rather than calling the validation in a loop.

As I said above, the other approach runs validations for all parent snapshots in a loop. This PR limits the scope of validations to new snapshots.

private boolean stageOnly = false;
private Consumer<String> deleteFunc = defaultDelete;
@Nullable private Long startingSnapshotId = null; // check all versions by default
@Nullable private Consumer<Snapshot> snapshotValidator = null;
Contributor

As I commented on the original PR, we don't want to leave the exception class to the implementation. A validation should return a boolean to indicate that something was wrong. That way, the exception class is uniform (ValidationException) and acceptance is explicit.
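
A minimal sketch of the boolean-returning shape described above, not the API from either PR. `SnapshotStub`, `UniformValidationException`, and `BooleanValidatorSketch` are hypothetical stand-ins so the example compiles without Iceberg; in Iceberg itself the uniform exception would be `ValidationException`.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for a snapshot; not the Iceberg Snapshot interface.
final class SnapshotStub {
  final long snapshotId;
  final Map<String, String> summary;

  SnapshotStub(long snapshotId, Map<String, String> summary) {
    this.snapshotId = snapshotId;
    this.summary = summary;
  }
}

// Hypothetical stand-in for the single exception type raised on rejection.
class UniformValidationException extends RuntimeException {
  UniformValidationException(String message) {
    super(message);
  }
}

final class BooleanValidatorSketch {
  private BooleanValidatorSketch() {}

  // The predicate only accepts (true) or rejects (false);
  // the caller decides which exception is thrown and what it says.
  static void validateAll(List<SnapshotStub> snapshots, Predicate<SnapshotStub> isValid) {
    for (SnapshotStub snapshot : snapshots) {
      if (!isValid.test(snapshot)) {
        throw new UniformValidationException(
            String.format("Cannot commit: snapshot %d failed validation", snapshot.snapshotId));
      }
    }
  }
}
```

With this shape a user-supplied validator cannot pick its own exception class, and a `true` result is an explicit acceptance.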

operation.set(snapshotOffsetsProp, offsetsJson);
operation.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
if (validThroughTs != null) {
operation.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
Contributor

It is generally better to avoid unnecessary changes and refactoring. If you want to refactor this to remove the duplication, then we prefer opening a separate PR.

Contributor Author

Agreed. Unfortunately, without the refactoring the change failed the cyclomatic complexity check, so I had to change it.

@Override
public void accept(Snapshot snapshot) {
if (stagedWapId.equals(snapshot.summary().get(SnapshotSummary.STAGED_WAP_ID_PROP))) {
throw new ValidationException(validationErrorMessage);
Contributor

Minor: Use ValidationException.check instead.

In addition, exception messages should have all relevant context. In this case, I'd expect it to contain the WAP ID. I know this is just test code, but it seems like a good rule to follow in all cases.
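
A rough sketch of both points, assuming a `check(boolean, String, Object...)` helper of the kind the comment refers to. `CheckedValidationException` is a hypothetical stand-in written so the example is self-contained, and `WAP_ID_KEY` is an assumed summary key used only for illustration.

```java
import java.util.Map;

// Hypothetical stand-in that mimics a check(boolean, String, Object...) helper.
class CheckedValidationException extends RuntimeException {
  CheckedValidationException(String message) {
    super(message);
  }

  // Throws with a formatted message when the condition does not hold.
  static void check(boolean test, String message, Object... args) {
    if (!test) {
      throw new CheckedValidationException(String.format(message, args));
    }
  }
}

final class WapIdCheckSketch {
  // Assumed summary key for illustration only.
  static final String WAP_ID_KEY = "wap.id";

  private WapIdCheckSketch() {}

  // Rejects a snapshot whose summary already carries the staged WAP ID.
  // The message is built here, so it always includes the ID and the snapshot.
  static void validate(String stagedWapId, long snapshotId, Map<String, String> summary) {
    CheckedValidationException.check(
        !stagedWapId.equals(summary.get(WAP_ID_KEY)),
        "Cannot commit: WAP ID %s already staged in snapshot %s",
        stagedWapId,
        snapshotId);
  }
}
```

Building the message inside the validator, rather than passing it in, keeps the failing WAP ID in the exception text.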


private WapIdValidator(String stagedWapId, String validationErrorMessage) {
this.stagedWapId = stagedWapId;
this.validationErrorMessage = validationErrorMessage;
Contributor

This is a test, but I would not want people looking at this to copy the convention used here of passing an error message into the validator. I'd expect this to be embedded in the exception produced by validation. This is probably related to the issue below, where the actual ID is missing.


@Override
protected void validate(TableMetadata base, Snapshot parent) {
super.validate(base, parent);
Contributor

I considered whether it is a good idea to override validate and concluded that I wouldn't recommend this option. Part of the problem is that this touches a lot of files it doesn't need to, but it is also more brittle than adding a call because we may miss an implementation. And there are some custom operations out there that extend Iceberg classes that this would break.

* @param snapshotValidator a user function to validate parent snapshots
* @return this for method chaining
*/
default ThisT validateWith(Consumer<Snapshot> snapshotValidator) {
Contributor

I think the use of the ancestor iterable is a more elegant solution than making a large refactor and calling the validator in a loop.
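
To illustrate the idea, here is a minimal sketch of walking the ancestors backward from the current head and stopping at a starting snapshot. `AncestorNode` and `AncestorWalkSketch` are hypothetical stand-ins, not Iceberg classes, and the newest-first ordering is an assumption of this sketch rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a snapshot with a link to its parent.
final class AncestorNode {
  final long id;
  final Long parentId; // null for the first snapshot in the history

  AncestorNode(long id, Long parentId) {
    this.id = id;
    this.parentId = parentId;
  }
}

final class AncestorWalkSketch {
  private AncestorWalkSketch() {}

  // Collects ancestors newest first, starting at headId and stopping before
  // startingId (exclusive). A null startingId means the whole history is walked.
  static List<AncestorNode> ancestorsAfter(
      Map<Long, AncestorNode> byId, long headId, Long startingId) {
    List<AncestorNode> result = new ArrayList<>();
    Long current = headId;
    while (current != null && !current.equals(startingId)) {
      AncestorNode node = byId.get(current);
      if (node == null) {
        break; // parent is no longer available, e.g. expired history
      }
      result.add(node);
      current = node.parentId;
    }
    return result;
  }
}
```

A validator can then run once over the returned list instead of being invoked from inside every operation's `validate` override.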

if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(
- currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
+ currentMetadata, startingSnapshotId(), Expressions.alwaysTrue(), parent);
Contributor

This is a good example of what I mean in the comment above. There are quite a few changes here (and in other classes) to refactor so that startingSnapshotId can be reused.


ValidationException.check(
expectedOffsets.equals(lastCommittedOffsets),
"Latest offsets do not match expected offsets for this commit. Table: %s, Expected: %s, Last Committed: %s",
Contributor

Error messages in Iceberg aim to be direct and simple, rather than using full sentences and punctuation. In this case, I'd expect something more like "Cannot commit to %s: expected offsets %s changed to %s"


/**
* Enables snapshot validation with a user-provided function, which must throw a {@link
* org.apache.iceberg.exceptions.ValidationException} on validation failures.
Contributor

I think this should document how the validator is called. Is the latest snapshot first or last? I think that affects how validations are written so it seems important.

@aiborodin
Contributor Author

Thank you for the review @rdblue, I appreciate your feedback!
Given that #14509 has been merged, we should probably go ahead with that API.
I still think there's value in adding validateFromSnapshot(long snapshotId) to allow clients to limit the checked history. I can raise a separate PR for this feature.

@github-actions

github-actions bot commented Dec 8, 2025

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Dec 8, 2025
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Dec 15, 2025