Skip to content

Conversation

@danielcweeks
Copy link
Contributor

This PR depends on #14503 which adds commit validation.

The changes in this PR ensure that the expected commit offsets are not updated when table is refreshed during the commit process. This prevents a concurrent KC committer from updating the offsets resulting in duplicate processing of events.

@bryanck
Copy link
Contributor

bryanck commented Nov 5, 2025

@kumarpritam863 you might be interested in this PR also

Comment on lines +327 to +332
private Snapshot latestSnapshot(TableMetadata metadata, String branch) {
if (branch == null) {
return metadata.currentSnapshot();
}
return metadata.snapshot(metadata.ref(branch).snapshotId());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably we can move this to SnapshotUtil


private Map<Integer, Long> lastCommittedOffsetsForTable(TableMetadata metadata, String branch) {
Snapshot snapshot = latestSnapshot(metadata, branch);
while (snapshot != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] we can get an iterable of ancestors from the SnapshotUtils

Comment on lines +296 to +306
Map<Integer, Long> lastCommittedOffsets = lastCommittedOffsetsForTable(base, branch);

if (expectedOffsets == null || expectedOffsets.isEmpty()) {
return; // there are no stored offsets, so assume we're starting with new offsets
}

if (!expectedOffsets.equals(lastCommittedOffsets)) {
throw new CommitFailedException(
"Latest offsets do not match expected offsets for this commit.");
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[doubt] wouldn't this be lastCommittedOffset ?
if expectedOffset is null and lastCommittedOffset is non empty then we should fail ?

Suggested change
Map<Integer, Long> lastCommittedOffsets = lastCommittedOffsetsForTable(base, branch);
if (expectedOffsets == null || expectedOffsets.isEmpty()) {
return; // there are no stored offsets, so assume we're starting with new offsets
}
if (!expectedOffsets.equals(lastCommittedOffsets)) {
throw new CommitFailedException(
"Latest offsets do not match expected offsets for this commit.");
}
};
Map<Integer, Long> lastCommittedOffsets = lastCommittedOffsetsForTable(base, branch);
if (lastCommittedOffsets == null || lastCommittedOffsets.isEmpty()) {
return; // there are no stored offsets, so assume we're starting with new offsets
}
// handle case for expectedOffsets being null too
if (!lastCommittedOffsets.equals(expectedOffsets)) {
throw new CommitFailedException(
"Committed offsets do not match expected offsets for this commit.");
}
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated this, but I'm not sure what the expected behavior should be in that case. It seems likely that we want to error in that case as well, but the scenario around it is less clear to me.

@danielcweeks
Copy link
Contributor Author

Closed in favor of #14510

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants