-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Api: Track partition statistics via TableMetadata #8502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
c1428bb to
a9741f9
Compare
| * <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics | ||
| * support is not required to read the table correctly. | ||
| */ | ||
| public interface PartitionStatisticsFile extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a9741f9 to
cc90a60
Compare
|
Ack, will review this week. |
Thanks. |
|
I started looking on Friday but got distracted. I will try to finish by end of Wed. |
aokolnychyi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did one pass. Overall, this seems solid. I'll need to do another pass with fresh eyes.
Are saving the rest of changes in engines to keep these files during cleanup for future PRs? Shall we test the core expiry of snapshots in this PR? It would be unfortunate to add partition stats files and let the expiry process remove them.
.palantir/revapi.yml
Outdated
| justification: "Static utility class - should not have public constructor" | ||
| "1.4.0": | ||
| org.apache.iceberg:iceberg-api: | ||
| - code: "java.method.addedToInterface" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know we did a similar breaking change when adding table stats. I wonder whether that was the correct decision, however. Why not return an empty list by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added as default
| import java.io.Serializable; | ||
|
|
||
| /** | ||
| * Represents a partition statistics file in the table default format, that can be used to read |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether in the table default format will always be accurate. What about just this?
Represents a partition statistics file that can be used to read table data more efficiently.
| long snapshotId(); | ||
|
|
||
| /** | ||
| * Returns fully qualified path to the file, suitable for constructing a Hadoop Path. Never null. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't encourage using Hadoop Path given that we have our own FileIO and InputFile.
What about dropping suitable for constructing a Hadoop Path?
| * @return this for method chaining | ||
| */ | ||
| UpdatePartitionStatistics setPartitionStatistics( | ||
| long snapshotId, PartitionStatisticsFile partitionStatisticsFile); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, PartitionStatisticsFile already returns snapshotId().
Why also pass it explicitly and validate they are equal in the implementation?
Seems to match what we do for UpdateStatistics, though.
| deleteFiles(manifestListsToDelete, "manifest list"); | ||
|
|
||
| if (!beforeExpiration.statisticsFiles().isEmpty()) { | ||
| if (!beforeExpiration.statisticsFiles().isEmpty() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about a helper method like hasStatsFiles or something to simplify the condition and stay on 1 line?
| deleteFiles(manifestListsToDelete, "manifest list"); | ||
|
|
||
| if (!beforeExpiration.statisticsFiles().isEmpty()) { | ||
| if (!beforeExpiration.statisticsFiles().isEmpty() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A helper method here too?
| * @return the location of statistics files | ||
| * @deprecated use the {@code allStatisticsFilesLocations(table)} instead. | ||
| */ | ||
| @Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the purpose of this method and how it is used, what about simply extending it to return all statistics files? Then we won't need changes in consumers and won't need another method. We can clarify that this returns all stats files in Javadoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the changes in this file as the new APIs added was not called from anywhere (this is needed for spark action).
I will raise a follow up PR for Spark actions to consider partition stats file for remove orphan files and expire snapshots action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since, it was needed only for spark action, moved these changes into a separate PR with the tests for expire snapshots and remove orphan files.
|
|
||
| public class SetPartitionStatistics implements UpdatePartitionStatistics { | ||
| private final TableOperations ops; | ||
| private final Map<Long, Optional<PartitionStatisticsFile>> partitionStatisticsToSet = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure how I feel about using a map and giving Optional a special meaning. Would having a map of stats to set and a separate set of snapshot IDs to remove easier to read/understand? What do you think, @ajantha-bhat?
private TableMetadata internalApply(TableMetadata base) {
TableMetadata.Builder builder = TableMetadata.buildFrom(base);
toSet.forEach(builder::setPartitionStatistics);
toRemove.forEach(builder::removePartitionStatistics);
return builder.build();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
| this.statisticsFiles = | ||
| base.statisticsFiles.stream().collect(Collectors.groupingBy(StatisticsFile::snapshotId)); | ||
| this.partitionStatisticsFiles = | ||
| base.partitionStatisticsFiles.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether a helper method would make it easier to read.
| ], | ||
| "snapshot-log": [], | ||
| "metadata-log": [] | ||
| } No newline at end of file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing empty line?
|
@aokolnychyi: Thanks for the review.
Got a little busy week. I will finish addressing comments and the follow up Spark PR for expire snapshots and remove orphan files by Monday. Meanwhile you can also review the independent PR (Util for partition stats reading and writing) : #9170 |
Tracking `PartitionStatisticsFile` in a same way as how `StatisticsFile` is already tracked.
cc90a60 to
35855ce
Compare
35855ce to
3124544
Compare
|
@aokolnychyi: Fixed all the comments and also opened a new Spark module PR (which is dependent on this) to ensure partition stats are considered for GC (expire snapshots and remove orphan files). I will rebase that PR once this is merged. |
aokolnychyi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks close. I did another pass, I'll need to check the tests with fresh eyes.
| * <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics | ||
| * support is not required to read the table correctly. | ||
| */ | ||
| public interface PartitionStatisticsFile extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure it is a good idea to make PartitionStatisticsFile serializable? I would probably not do that unless there is a good reason right now. None of our existing files are serializable by contract (they may be in practice but not by API).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. We can add it back if required during an end to end implementation.
| this.previousFileLocation = base.metadataFileLocation; | ||
| this.previousFiles = base.previousFiles; | ||
| this.refs = Maps.newHashMap(base.refs); | ||
| this.statisticsFiles = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ajantha-bhat, just to make sure I understand. We try to replace the partition stats for each snapshot but it is not required by the spec so it is technically possible to have multiple files for one snapshot?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently one stats file per snapshot. But I think in future it may track multiple files too.
I followed the same pattern as existing puffin files. I want to keep the interfaces consistent.
| } | ||
|
|
||
| public Builder removePartitionStatistics(long snapshotId) { | ||
| Preconditions.checkNotNull(snapshotId, "snapshotId is null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can it be null if it is primitive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True. I copy pasted from existing removeStatistics which has this problem. I overlooked or assumed things are correct. I will be careful next time.
| this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); | ||
| } | ||
|
|
||
| private static Map<Long, List<StatisticsFile>> statsFileBySnapshotID(TableMetadata base) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Is there a pattern in this class to have static methods at the end? If so, can we put these methods together with other static methods? If not, it is OK to keep them here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved
| .collect(Collectors.groupingBy(StatisticsFile::snapshotId)); | ||
| } | ||
|
|
||
| private static Map<Long, List<PartitionStatisticsFile>> partitionStatsFileBySnapshotID( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is common for this class to call such methods indexSmth and pass actual elements. If so, what about something like below? We can use shorter variables to stay on line but that's totally optional. Up to you, method names are examples.
private static Map<Long, List<StatisticsFile>> indexStatistics(List<StatisticsFile> files) {
return files.stream().collect(Collectors.groupingBy(StatisticsFile::snapshotId));
}
private static Map<Long, List<PartitionStatisticsFile>> indexPartitionStatistics(
List<PartitionStatisticsFile> files) {
return files.stream().collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
| return this; | ||
| } | ||
|
|
||
| public Builder setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: I wonder whether using just file or partitionStats to shorten the lines would make sense.
Completely up to you.
|
|
||
| @Override | ||
| public UpdatePartitionStatistics setPartitionStatistics( | ||
| PartitionStatisticsFile partitionStatisticsFile) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the context in this class is pretty clear and we can call it stats, partitionStats or file to shorten the lines. Then we can also call other variables partitionStatsToSet or statsToSet.
|
|
||
| public class SetPartitionStatistics implements UpdatePartitionStatistics { | ||
| private final TableOperations ops; | ||
| private final Set<PartitionStatisticsFile> partitionStatisticsToSet = Sets.newHashSet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using Map<Long, PartitionStatisticsFile> to make sure the files are overridden if multiple files are passed for the same snapshot? We can then simply call statsToSet.values().forEach(...) below?
| } | ||
|
|
||
| private static List<PartitionStatisticsFile> partitionStatisticsFilesFromJson( | ||
| JsonNode partitionStatisticsFilesList) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mind using longer variables but I wonder whether a shorter version and staying on one line in some places would be more readable in this method.
|
@ajantha-bhat, I'll take a look at other PRs once this is in. I feel this one is almost ready to go. |
|
@aokolnychyi: I have handled all the new comments. Thanks again for the review. |
|
Re-triggering build due to flaky test in Flink |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
|
|
||
| public class SetPartitionStatistics implements UpdatePartitionStatistics { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this have to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me. I had only one comment about the access modifier for the base implementation class.
This is a big PR so I will not hold it because of that. However, can we please follow up, if needed?
Great work, @ajantha-bhat!
|
@aokolnychyi: Thanks for the detailed review and merge. The next small PR to review will be #9284 (Spark 3.5: Ensure that partition stats files are considered for GC procedures)
I will check this in a followup. Again this follows the style from puffin files. |
| List<PartitionStatisticsFile> partitionStatisticsFiles; | ||
| if (node.has(PARTITION_STATISTICS)) { | ||
| partitionStatisticsFiles = partitionStatsFilesFromJson(node.get(PARTITION_STATISTICS)); | ||
| } else { | ||
| partitionStatisticsFiles = ImmutableList.of(); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ajantha-bhat and @aokolnychyi, I have a question about this implementation as I'm exploring to add new fields into TableMetadata. Suppose the table db.table's partition stats is updated by the new version of Iceberg via UpdatePartitionStatistics. After that, some old version of Iceberg library or the PyIceberg client produces a new commit to this table. Per my understanding, that writer will produce TableMetadata without PARTITION_STATISTICS since it knows nothing about PARTITION_STATISTICS, which effectively loses that info for the table.
Do you have any solutions or ideas on how to prevent such cases? I can think of some potential ideas, such as:
- upgrade the format_version to a new one whenever we need to add new fields to table metadata, all the old clients will be rejected by the version check then.
- define a writer_version field, old client can read metadata produced by new client, but it will reject writers with old versions.
- move the check to the REST catalog service?
I feel it's too heavy to do a format upgrade when only adding new fields in TableMetadata.
Do you have any other ideas? Really appreciate your inputs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way partition stats tracked and added to table metadata is same as puffin files right now.
The stats are optional, so even if we lose it. Planner won't return wrong query results.
However these stats can be helpful to improve query performance. We are planning to provide a call procedure, compute partition stats. Which will check the last snapshot that had partition stats and incrementally compute the stats for remaining snapshots.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your reply.
The way partition stats tracked and added to table metadata is same as puffin files right now.
The stats are optional, so even if we lose it. Planner won't return wrong query results.
Yes, I know it's optional and it doesn't affect the correctness of queries. My main concern here is that how can prevent old writers from corrupting new writers' metadata in general. I think it's OK for now as only statistics files are added, but it's quite annoying. And It would be possible that we need to add some required fields in the TableMetadata in the future.
PartitionStatisticsFileas per the Spec.PartitionStatisticsFilein a same way as howStatisticsFileis already tracked.Fixes #8457