Core: Fix Partitions table for evolved partition specs #4560
Conversation
FYI @rdblue @aokolnychyi @RussellSpitzer @szlta, can you help review? Thanks.
```java
  originalPartitionIndex++;
}
...
PartitionData result = new PartitionData(newSchema);
```
result -> normalizedPartition?
Good point, renamed relevant fields in this method.
```java
  return partitions.all();
}
...
private static PartitionData normalizePartition(PartitionData partition, Types.StructType newSchema) {
```
newSchema -> normalizedPartitionSchema?
Maybe that's too long?
```java
PartitionData result = new PartitionData(newSchema);
...
int finalPartitionIndex = 0;
```
normalizedPartitionIndex (or normalizedIndex)
```java
}
...
@Test
public void testPartitionMetadataTable() throws ParseException {
```
As one more thing to test, can you check reordering partition transforms? Going from data, category -> category, data.
I think you have this covered, but I want to make sure we have a test in there, since I think this is a pretty common use case. A sketch of such a setup follows below.
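A minimal sketch of what that test's setup could look like, assuming the Java UpdatePartitionSpec API and the sql/table helpers used elsewhere in this test class (exact removal/re-add semantics and the assertions are left out):

```java
// hypothetical setup: evolve the spec from (data, category) to (category, data)
Table table = validationCatalog.loadTable(tableIdent);

// drop both identity fields, then re-add them in the opposite order;
// split into two commits to keep each update unambiguous
table.updateSpec()
    .removeField("data")
    .removeField("category")
    .commit();
table.updateSpec()
    .addField("category")
    .addField("data")
    .commit();

// write more rows and assert the partitions table groups both orderings together
sql("REFRESH TABLE %s", tableName);
```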
RussellSpitzer left a comment:
One suggested additional test, and I'm not quite sure about the Precondition check, but other than that this looks good.
```java
for (FileScanTask task : tasks) {
  partitions.get(task.file().partition()).update(task.file());
  PartitionData original = (PartitionData) task.file().partition();
  PartitionData normalized = normalizePartition(original, Partitioning.partitionType(table));
```
Should we reuse the result of the Partitioning.partitionType(table) invocation at line 99 rather than calling it in every iteration?
Yea good point
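For illustration, a sketch of the hoisted call against the loop from the diff above (a sketch, not the final diff):

```java
// compute the unified partition type once instead of per file
Types.StructType normalizedPartitionType = Partitioning.partitionType(table);

for (FileScanTask task : tasks) {
  PartitionData original = (PartitionData) task.file().partition();
  PartitionData normalized = normalizePartition(original, normalizedPartitionType);
  partitions.get(normalized).update(task.file());
}
```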
Thanks for catching this @szehon-ho, this change looks good to me, just added a nit comment.
```java
}
...
private static PartitionData normalizePartition(PartitionData partition, Types.StructType normalizedPartitionSchema) {
  Map<Integer, Object> fieldIdToValues = Maps.newHashMap();
```
One thing we may want to consider here is caching these normalizations by spec rather than recomputing the mapping for every partition value, thinking about this code being run on millions of files.
Still wasn't sure how to cache something per spec, so I added a cache per partition for the normalized key; let me know if you have some thoughts?
By the way, I tried to use the un-normalized partition as the map key itself, but that doesn't work: it creates duplicates if a partition field has a name change. So we still need to use the normalized partition as the map key.
Discussed offline; added a cache of positional mappings into the final partition type, keyed by spec ID.
```java
// Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
// In V1, dropped partition fields show separately when field is re-added
// In V2, re-added field currently conflicts with its deleted form
if (formatVersion == 1) {
```
Putting this in a new test with an Assume(formatVersion == 1) would be a bit cleaner.
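A minimal sketch of that guard, using JUnit's Assume (the test name here is hypothetical):

```java
@Test
public void testPartitionsTableReaddedPartitionFieldV1() {
  // Assume skips the test (rather than failing it) when the condition is false
  Assume.assumeTrue("re-added field behavior under test is V1-specific", formatVersion == 1);
  // ... the V1-specific assertions from the if-branch above
}
```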
Done, split this into 3 tests
RussellSpitzer left a comment:
Two suggestions, but I think this is good as is:
- Splitting up the test so that we have individual tests for the different alterations
- Caching the field mapping by partition spec
Force-pushed 8253071 to 3d2860f
@RussellSpitzer added an additional cache of field positional mappings per spec ID as suggested; it's changed a bit, let me know if you can take another look.
```java
LoadingCache<Integer, Integer[]> originalPartitionFieldPositionsBySpec = Caffeine.newBuilder().build(specId ->
    originalPositions(table, specId, normalizedPartitionType));
LoadingCache<Pair<PartitionData, Integer>, PartitionData> normalizedPartitions = Caffeine.newBuilder().build(
```
This I don't think is super important to cache since the getters and setters are pretty fast and you have the integer mapping already. So I probably would drop this.
It was saving the construction of the new PartitionData object, at the cost of memory, but yeah, it might not be worth it; dropped it.
```java
Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap();

LoadingCache<Integer, Integer[]> originalPartitionFieldPositionsBySpec = Caffeine.newBuilder().build(specId ->
```
I think the number of partition specs should be very small, so I would probably just use a Maps.newHashMap() and then .computeIfAbsent. Not that I think there is something wrong with Caffeine; it just seems a bit heavyweight to me in this use case.
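A sketch of the suggested plain-map variant, reusing the names from the diff above:

```java
// one entry per partition spec, a small bounded key space, so a plain map suffices
Map<Integer, Integer[]> originalPartitionFieldPositionsBySpec = Maps.newHashMap();

// inside the scan loop:
Integer[] positions = originalPartitionFieldPositionsBySpec.computeIfAbsent(
    task.spec().specId(),
    specId -> originalPositions(table, specId, normalizedPartitionType));
```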
```java
for (FileScanTask task : tasks) {
  partitions.get(task.file().partition()).update(task.file());
  PartitionData original = (PartitionData) task.file().partition();
  int specId = task.spec().specId();
```
I was thinking this would look something more like:

```java
Integer[] positionMapping = originalPositions.computeIfAbsent(
    specId, id -> originalPositions(table, id, normalizedPartitionType));
// make normalizedPartitionData
for (int originalIndex = 0; originalIndex < original.size(); originalIndex++) {
  normalizedPartitionData.put(positionMapping[originalIndex], original.get(originalIndex));
}
```

This could be a separate function like we have now, but I don't know if it's that bad if we just inline it here.
Done, yeah, your suggestion is cleaner.
It's a bit easier to read that logic in a separate function, so I kept it outside.
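For reference, a sketch of what the kept helper might look like under the position-mapping approach (the exact signature is assumed, not taken from the final diff):

```java
// copy each value from the original partition tuple into its position in the unified type
private static PartitionData normalizePartition(
    PartitionData original, Types.StructType normalizedType, Integer[] positionMapping) {
  PartitionData normalized = new PartitionData(normalizedType);
  for (int i = 0; i < original.size(); i++) {
    normalized.put(positionMapping[i], original.get(i));
  }
  return normalized;
}
```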
Force-pushed 6d11188 to 5129068
```java
Partition partition = partitions.get(key);
if (partition == null) {
  partition = new Partition(key);
  partitions.put(StructLikeWrapper.forType(type).set(key), partition);
```
I'm a bit confused by this change. I believe the issue here is that StructType does not have a well-defined hash function (since implementations can do whatever they like), which is why we use the Wrapper to make sure we have a valid hash (and equals).
Changed to a map keyed by PartitionData (I feel it should have been that way from the beginning).
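A sketch of the reworked lookup, assuming PartitionData supplies value-based equals/hashCode (the Partition(key) constructor is taken from the diff above):

```java
// keyed directly by the normalized PartitionData; no StructLikeWrapper needed
Map<PartitionData, Partition> partitions = Maps.newHashMap();

Partition partition = partitions.computeIfAbsent(normalized, Partition::new);
partition.update(task.file());
```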
| sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " + | ||
| "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName); | ||
| initTable(); | ||
|
|
Unnecessary white space change here
I think if we want to make this change we need to do it in all the tests, and currently this is missing format changes for testFilesMetadataTable and testWithUnknownTransfer. Probably fine to just keep it as is and match it in the new tests.
Removed whitespace
```java
Table table = validationCatalog.loadTable(tableIdent);

table.updateSpec()
```
I think you could just do this in Spark SQL if you like and skip the refresh, but this is fine too.
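For comparison, a sketch of the Spark SQL route, assuming the Iceberg SQL extensions are enabled in the test session:

```java
// the ADD PARTITION FIELD DDL comes from Iceberg's Spark SQL extensions;
// going through the session catalog avoids the explicit refresh
sql("ALTER TABLE %s ADD PARTITION FIELD category", tableName);
```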
Will just keep it for now then; it matches the existing tests.
Force-pushed 2c7d303 to 3cfb65c
RussellSpitzer left a comment:
LGTM! Thanks for bearing with my many comments. Feel free to merge whenever you feel ready.
No problem, thanks for all the suggestions!
(cherry picked from commit 4c3aac2)
#4516 fixes the schema of the partitions table in the case of changed partition specs to use Partitioning.partitionType (a union of all previous partition specs), but the data is still wrong in some cases (see iceberg/core/src/main/java/org/apache/iceberg/PartitionsTable.java, line 93 at a78aa2d).
This fixes the problem by normalizing each file's partition data to the unified partition type before grouping, so that values written under older specs land in the right columns.