Flink: Add tests to check whether should remove meta columns in source reader #3477
Conversation
@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
I think it's a critical bug that we've missed setting the posSupplier on the reader since #1222. But I'm still not sure why this TestFlinkInputFormatReaderDeletes did not cover the Avro case.
Let me dig into this deeper and try to provide unit tests in TestFlinkInputFormatReaderDeletes to address it.
Others from the Chinese Iceberg group have also encountered this bug; their stack traces are the following:
Caused by: java.lang.NullPointerException
at org.apache.iceberg.data.DeleteFilter.pos(DeleteFilter.java:108) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.deletes.Deletes$PositionSetDeleteFilter.shouldKeep(Deletes.java:157) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.util.Filter$Iterator.shouldKeep(Filter.java:49) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:67) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:100) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:84) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:104) ~[iceberg-flink-runtime-0.12.1-20211109.jar:?]
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
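The failure mode behind this stack trace can be sketched as follows. This is a minimal stand-in, not the real Iceberg classes: it only assumes that a position-delete filter asks the reader for the current row position through a `Supplier<Long>`, and that the supplier was never wired up, so the first `pos()` lookup dereferences null.

```java
import java.util.Set;
import java.util.function.Supplier;

public class PosSupplierSketch {
    // Simplified stand-in for the delete-filter side: it consults a
    // position supplier that the reader is supposed to set via
    // setRowPositionSupplier (the method in the diff above).
    static class DeleteFilter {
        private Supplier<Long> posSupplier;  // never set => NPE on first use

        void setRowPositionSupplier(Supplier<Long> supplier) {
            this.posSupplier = supplier;
        }

        boolean shouldKeep(Set<Long> deletedPositions) {
            // Throws NullPointerException when posSupplier was never set,
            // matching DeleteFilter.pos(DeleteFilter.java:108) in the trace.
            return !deletedPositions.contains(posSupplier.get());
        }
    }

    public static void main(String[] args) {
        DeleteFilter broken = new DeleteFilter();
        try {
            broken.shouldKeep(Set.of(0L));
        } catch (NullPointerException e) {
            System.out.println("NPE: posSupplier was never set");
        }

        DeleteFilter fixed = new DeleteFilter();
        fixed.setRowPositionSupplier(() -> 1L);  // position 1 is not deleted
        System.out.println("keep row at pos 1: " + fixed.shouldKeep(Set.of(0L)));
    }
}
```

The fix discussed in this thread is the wiring step: making sure the reader actually calls the setter before the filter iterates.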
Should we separate this fix into another issue? It doesn't seem to be related to the unit tests.
Yes, that's exactly what I plan to do. Thanks @Reo-LEI.
Created a separate PR for this: #3540
Merged #3540.
As the separate PR #3540 has been merged, let's update this PR.
@Test
public void testV1SkipToRemoveMetaColumn() throws IOException {
  testSkipToRemoveMetaColumn(1);
Why don't we move formatVersion to the test parameters too?
Thanks @stevenzwu for the comment. I did not use the parameterized formatVersion approach because not all of the unit tests need to run against both v1 and v2 tables. For example, testV2RemoveMetaColumn should only be checked for v2 tables. If we parameterize formatVersion, it will introduce more unit tests that get ignored, because we would usually add a line in testV2RemoveMetaColumn such as: Assume.assumeTrue("xxx", formatVersion == 2).
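The trade-off described above can be sketched without JUnit. This stand-in replaces `Assume.assumeTrue` with a plain condition check and returns a status string instead of throwing an `AssumptionViolatedException`; the method name and return values are illustrative only.

```java
import java.util.List;

public class ParameterizedFormatVersionSketch {
    // A v2-only test body under a parameterized formatVersion: for v1 the
    // assume-guard fires and the run is reported as skipped/ignored.
    static String runV2OnlyTest(int formatVersion) {
        if (formatVersion != 2) {
            return "skipped";  // Assume.assumeTrue("v2 only", formatVersion == 2)
        }
        return "ran";  // the actual v2 meta-column assertions would go here
    }

    public static void main(String[] args) {
        for (int version : List.of(1, 2)) {
            System.out.println("formatVersion=" + version + ": " + runV2OnlyTest(version));
        }
    }
}
```

This is the cost the comment is weighing: a parameterized suite reports an ignored run for every parameter a v2-only test does not apply to.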
This makes sense.
List<RowData> results = Lists.newArrayList();
TestHelpers.readRowData(input, rowData -> {
  // If projected to remove the meta columns, this would be a RowDataProjection.
  Assert.assertTrue(rowData instanceof GenericRowData);
For a v1 table, we don't need to remove the meta columns because we don't even add them, so all the output rows should be instances of GenericRowData.
List<RowData> results = Lists.newArrayList();
TestHelpers.readRowData(input, rowData -> {
  // Projecting to remove the meta columns yields a RowDataProjection.
  Assert.assertTrue(rowData instanceof RowDataProjection);
For a v2 table with eq-deletes or pos-deletes to merge, we add the _pos, file_path, and is_deleted meta columns. For the Flink engine, we then need to project the row data to remove those extra meta columns for users. The RowDataProjection was introduced here: https://github.com/apache/iceberg/pull/3240/files#diff-aed36b23cf7e6110f84bd3b34307240d2174b2be2edb987dfcd22ff15ab75f72R81-R83
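The projection idea described above can be sketched with a minimal stand-in (this is not Flink's `RowData` or Iceberg's `RowDataProjection`; it only assumes the meta columns are appended after the user columns, so hiding them means exposing a prefix of the row):

```java
import java.util.Arrays;

public class MetaColumnProjectionSketch {
    // A projection exposing only the first n (user) fields of a wider row
    // whose trailing fields are engine-internal meta columns.
    static Object[] project(Object[] rowWithMeta, int userFieldCount) {
        return Arrays.copyOfRange(rowWithMeta, 0, userFieldCount);
    }

    public static void main(String[] args) {
        // 2 user columns plus 3 meta columns appended by the v2 read path.
        Object[] row = {1, "a", 42L /* _pos */, "f.parquet" /* file_path */, false /* is_deleted */};
        System.out.println(Arrays.toString(project(row, 2)));  // [1, a]
    }
}
```

The real RowDataProjection wraps the row lazily rather than copying, but the user-visible effect is the same: readers only see the table schema's columns.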
Reo-LEI left a comment
I think this PR looks good to me overall. Thanks @openinx for adding these tests.
// Create the table with the given format version.
String location = folder.getRoot().getAbsolutePath();
Table table = SimpleDataUtil.createTable(location,
    ImmutableMap.of("format-version", String.valueOf(formatVersion)),
Nit: I think we should use TableProperties.FORMAT_VERSION here.
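The point of this nit can be sketched as follows. The constant name mirrors Iceberg's `TableProperties.FORMAT_VERSION`, but the class here is a minimal stand-in for illustration:

```java
import java.util.Map;

public class FormatVersionConstantSketch {
    // Stand-in for org.apache.iceberg.TableProperties.FORMAT_VERSION.
    static final String FORMAT_VERSION = "format-version";

    static Map<String, String> tableProps(int formatVersion) {
        // Using the named constant instead of a raw "format-version" literal
        // lets the compiler catch typos like "format_version".
        return Map.of(FORMAT_VERSION, String.valueOf(formatVersion));
    }

    public static void main(String[] args) {
        System.out.println(tableProps(2));
    }
}
```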
@rdblue, is there any chance you could take a look at this PR?

Sure, I'll take a look. Can you give me more background on meta columns and when they are excluded?

@rdblue, I think this conversation is good background reading for this PR: https://github.com/apache/iceberg/pull/3240/files#r736304982
input.open(s);
try {
  while (!input.reachedEnd()) {
    RowData row = input.nextRecord(null);
Nit: This is a new addition, but can we add an in-line comment on the null to indicate that it's for reuse, like /* reuse */? I always forget.
kbendick left a comment
This LGTM
Thanks, @openinx! I think we also need to port these tests to 1.14.

Oh whoops, I clicked "request changes" when I meant to click "approve". Sorry about that! It was merged anyway 🙂
This PR is trying to address issue #3431.