-
Notifications
You must be signed in to change notification settings - Fork 3k
Data: Fix equality delete set save DATE/TIMESTAMP type of data throw IllegalStateException #3135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Data: Fix equality delete set save DATE/TIMESTAMP type of data throw IllegalStateException #3135
Conversation
…hrow IllegalStateException
|
Running CI |
data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
Outdated
Show resolved
Hide resolved
data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
Outdated
Show resolved
Hide resolved
data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
Outdated
Show resolved
Hide resolved
flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
Outdated
Show resolved
Hide resolved
|
|
||
| Iterables.addAll(actualSet, actual.stream() | ||
| .map(record -> new InternalRecordWrapper(table2.schema().asStruct()).wrap(record)) | ||
| .collect(Collectors.toList())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you using InternalRecordWrapper here and for the expected set? I think that StructLikeSet should work without it, right? If not, then we need to update the set that is produced by rowSet to do the wrapping since future tests may not know to wrap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xloya, can you briefly describe what you did to address this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdblue OK. The original rowSetWithoutIds()method of DeleteReadTests and the original rowSet() method of TestGenericReaderDeletes returns both the GenericRecord class's data. In TestFlinkInputFormatReaderDeletes the rowSet() method returns RowDataWrapper class's data, and in TestSparkReaderDeletes the rowSet() method returns SparkStructLike class's data.Calling the Assert.assertEquals() method will compare the two StructLikeSets. If it is DATE/TIMESTAMP type of the data of GenericRecord, it will compare(GenericRecord, GenericRecord) and throw an exception of Not a instance of Integer/Long. If actual data class is RowDataWrapper or SparkStructLike, it will compare(GenericRecord, RowDataWrapper/SparkStructLike) and their class mismatch. so they both need to be converted to the InternalRecordWrapper class for comparison
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think either the conversion at the assertion or the conversion at rowSetWithoutIds() and rowSet() is fine, but it must be converted.
| ); | ||
|
|
||
| DeleteFile eqDeletes = FileHelpers.writeDeleteFile( | ||
| table2, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see how this delete file would be applied. The partition you're writing it to is Row.of(0), which ends up being 1970-01-01. That should prevent the delete file from being found when planning, so none of the deletes would be applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xloya, what was the resolution to this? Were the deletes not applied? Was the test incorrect and not working?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdblue Previously, the data file was written using Row.of(0), which means that all data is written to the 1970-01-01 partition. So all the equality delete data before is also written to Row.of(0). Now it is changed that the data is written to the corresponding date time partition, and then the equality delete data is also written to the corresponding partition.
If keep the data file written to the partition as 1970-01-01, but the partition where the equality delete data is actually written is changed to 2021-09-01, it will not work.
Found spec on official website:
http://iceberg.apache.org/spec/
Like data files, delete files are tracked by partition. In general, a delete file must be applied to older data files with the same partition; see Scan Planning for details. Column metrics can be used to determine whether a delete file’s rows overlap the contents of a data file or a scan range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Thanks!
|
Thanks, @xloya! The implementation looks correct to me, but there are a few things to fix in the tests. |
|
Thanks @rdblue for your particular review! I will fix them according to your suggestion these days |
…f github.com:xloya/iceberg into fix-equality-delete-set-save-date/timestamp/etc-data
| return set; | ||
| } | ||
|
|
||
| private StructLikeSet rowSetWithoutIds(Set<Integer> idSet, Table iTable, List<Record> recordList) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: the name iTable is odd. Why not just table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally used table, but code style check would report a hidden field error.Maybe there is a conflict with a property called table. iTable means iceberg table.
| .map(record -> new InternalRecordWrapper(iTable.schema().asStruct()).wrap(record)) | ||
| .forEach(set::add); | ||
| return set; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the other rowSetWithoutIds method should be rewritten to call this one since it is more generic. That will also avoid the wrapper problem in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok i'll rewrite the origin rowSetWithoutIds() method
| } | ||
|
|
||
| private StructLikeSet rowSetWithoutIds(int... idsToRemove) { | ||
| private StructLikeSet rowSetWithoutIds(Table iTable, List<Record> recordList, int... idsToRemove) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you're passing the table in, can you make this a static method? You may also be able to use the name table instead of iTable if it is static.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A helpful suggestion, it works, thanks!
|
Looks good to me. Running CI. |
|
Thanks, @xloya! Merged. |
|
Thanks for your time, @rdblue ! |
Co-authored-by: xiaojiebao <[email protected]>
Co-authored-by: xiaojiebao <[email protected]>
Co-authored-by: xiaojiebao <[email protected]>
Co-authored-by: xiaojiebao <[email protected]>
…IllegalStateException apache#3135(merged)
…IllegalStateException apache#3135(merged)
1. What this MR does:
When the equality delete set in DeleteFilter saves a piece of data read from an
equal deleteparquet file in memory, it will call theget()method of theGenericRecordclass for data comparison.If the table schema definition of the current position ofGenericRecordisDATE/TIMESTAMP/TIMEtype, it will be converted toLocalDate/LocalDateTime/LocalTimetype in memory. When theget()method ofGenericRecordis called, the java classInteger/Long/Longdefined in theTypesenum of theDATE/TIMESTAMP/TIMEtype is passed in. Because the type data ofLocalDate/LocalDateTime/LocalTimeis not an instance ofInteger/Long/Longtype, an IllegalStateException with Not an intance of xxx is thrown.The pr first converts the data to the
InternalRecordWrapperbefore saving the data in the equality delete set, and calls itsget()method during comparison. If it is a type that needs to be converted such asDATE/TIMESTAMP/TIME, use thetransformsparameter to convert, otherwise directly Output raw data type.2. Which issue this PR fixes:
Fix issue #3119.
@rdblue @chenjunjiedada @openinx Could you help to take a look at this PR? Thanks in advance!