Spark 3.2: Use SnapshotSummary to deduce latestOffset in Spark SS #4517
SreeramGarlapati
left a comment
@singhpk234 - we explicitly opted out of reading the SnapshotSummary - based on discussion with @aokolnychyi, @RussellSpitzer & @rdblue.
Rationale: SnapshotSummary is a free-form dictionary, and ADDED_FILES_PROP, a key in this dictionary, is NOT part of the Iceberg spec and is NOT populated by all engines yet.
Please read through this comment thread for full context:
#2660 (comment)
Thanks @SreeramGarlapati for sharing the #2660 (comment) thread, it really helps. Glad to know this was already considered during the implementation. It looks like we were on the fence about the reliability of this metric and hence decided to revisit it in the future, referring to this comment: #2660 (comment)
Agree with you on this.
As per my understanding, engines don't own the responsibility of calculating / updating the snapshot summary stats; IIUC it's done entirely by the core library. The flow, for example: commit() [from engine] -> apply() in SnapshotProducer (core) -> summary calculation (core).
@singhpk234, this is going to get the size of @SreeramGarlapati makes a good point, but if the added files property is present, then it seems to me like we could use it and fall back to reading the snapshot.
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
12f298c to 8ff5320
long addedFilesCount = PropertyUtil.propertyAsLong(latestSnapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
// If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP, iterate through addedFiles iterator to find
// addedFilesCount.
addedFilesCount = addedFilesCount == -1 ? Iterables.size(latestSnapshot.addedFiles()) : addedFilesCount;
Ah, I see, good point: we cannot just pass Iterables.size(latestSnapshot.addedFiles()) as the default value (the last argument) to propertyAsLong, otherwise it would be evaluated eagerly even when the property is present.
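For readers skimming the thread, here is a minimal, dependency-free sketch of the "use the summary if present, otherwise count the files" pattern discussed above. It is not the code in SparkMicroBatchStream: the class name `AddedFilesCountSketch`, the helper `addedFilesCount`, and the plain `Map<String, String>` standing in for `latestSnapshot.summary()` are all hypothetical, and the key string is an assumed stand-in for `SnapshotSummary.ADDED_FILES_PROP`. The fallback is passed as a `LongSupplier` so that the (potentially expensive) file iteration only runs when the property is missing.

```java
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

public class AddedFilesCountSketch {
  // Assumed stand-in for SnapshotSummary.ADDED_FILES_PROP (illustration only).
  static final String ADDED_FILES_KEY = "added-data-files";

  // Returns the count from the summary when the key is present;
  // otherwise invokes the fallback, which is evaluated lazily.
  static long addedFilesCount(Map<String, String> summary, LongSupplier fallback) {
    String value = summary.get(ADDED_FILES_KEY);
    if (value != null) {
      return Long.parseLong(value);
    }
    return fallback.getAsLong();
  }

  public static void main(String[] args) {
    // Hypothetical list standing in for latestSnapshot.addedFiles().
    List<String> addedFiles = List.of("a.parquet", "b.parquet");

    // Property present: the fallback must never run.
    long fromSummary = addedFilesCount(
        Map.of(ADDED_FILES_KEY, "5"),
        () -> { throw new IllegalStateException("fallback should not run"); });

    // Property absent: fall back to counting the files.
    long fromFallback = addedFilesCount(Map.of(), addedFiles::size);

    System.out.println(fromSummary + " " + fromFallback);
  }
}
```

The ternary in the diff above achieves the same laziness without a supplier, since the `Iterables.size(...)` branch is only evaluated when the sentinel `-1` comes back.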
jackye1995
left a comment
looks good to me!
@SreeramGarlapati @rdblue any additional comments? If not, I think it is good to be merged.
8ff5320 to 73211a1
I think this is good to go, and there have been no further comments for quite a while now. I will merge it, thanks for the reviews! @rdblue @SreeramGarlapati
Came across this while working on
cc @rdblue, @RussellSpitzer, @jackye1995, @kbendick