Flink 1.20: Support Avro and Parquet timestamp(9), unknown, and defaults #12470
Conversation
I'll also follow up with a PR for Parquet readers, but that depends on changes in #12463.
#12463 was merged and the changes for Parquet were small, so I included them here.
} else {
  return Optional.of(new MicrosToTimestampReader(desc));
}
return Optional.of(new MicrosToTimestampReader(desc));
Previously, the readers converted values to LocalDateTime or OffsetDateTime, and Flink then converted those values back to a (millis, nanosOfMilli) pair. This required a lot of unnecessary date/time logic in both Iceberg and Flink, as well as separate readers to produce the two types.
Now, the conversion to Flink is direct and doesn't go through Java date/time classes. That avoids all time zone calculations and should be quicker.
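A minimal sketch of what the direct conversion looks like, assuming a microseconds-since-epoch long as read from the file; TimestampData.fromEpochMillis is Flink's (millis, nanosOfMilli) factory method, while the class and method names here are illustrative, not the PR's actual reader code:

import org.apache.flink.table.data.TimestampData;

class MicrosConversionSketch {
  static TimestampData microsToTimestampData(long micros) {
    long millis = Math.floorDiv(micros, 1000L);                   // whole milliseconds
    int nanosOfMilli = (int) Math.floorMod(micros, 1000L) * 1000; // leftover micros as nanos
    return TimestampData.fromEpochMillis(millis, nanosOfMilli);   // no LocalDateTime/OffsetDateTime hop
  }
}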
LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
if (annotation != null) {
  Optional<ParquetValueWriter<?>> writer =
      annotation.accept(new LogicalTypeWriterBuilder(fType, desc));
Updated this to use the logical annotation visitor.
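For reference, a minimal sketch of how parquet-column's LogicalTypeAnnotationVisitor dispatch works; ExampleWriterBuilder and the String payload are illustrative stand-ins for the PR's LogicalTypeWriterBuilder and ParquetValueWriter, not the actual classes:

import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;

// visit methods are overridden only for the annotations we handle;
// unhandled annotations fall back to the interface's default Optional.empty()
class ExampleWriterBuilder
    implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<String> {
  @Override
  public Optional<String> visit(
      LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
    // the unit and UTC adjustment are available directly, no string matching needed
    return Optional.of("timestamp writer for " + timestampType.getUnit());
  }
}

// usage: annotation.accept(new ExampleWriterBuilder()) yields Optional.empty()
// for any annotation the visitor does not handle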
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java (resolved comment)
  }
  pos += 1;
}


nit: this is a bit strange. Could we fix the empty lines?
For dense control flow blocks, we often leave extra newlines to make it more readable.
public void testNumericTypes() throws IOException {
  List<Record> expected =
      ImmutableList.of(
          recordNumType(
              2,
              Integer.MAX_VALUE,
              Float.MAX_VALUE,
              Double.MAX_VALUE,
              Long.MAX_VALUE,
              1643811742000L,
              1643811742000L,
              1643811742000L,
              10.24d),
          recordNumType(
              2,
              Integer.MIN_VALUE,
              Float.MIN_VALUE,
              Double.MIN_VALUE,
              Long.MIN_VALUE,
              1643811742000L,
              1643811742000L,
              1643811742000L,
              10.24d));

  writeAndValidate(SCHEMA_NUM_TYPE, expected);
This might be an interesting edge case to test in TestData? Min/max values for different columns could be especially interesting for timestamps, dates, etc.
These cases are tested in the random data generator, so this is duplication.
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java (resolved comment)
Thanks for reviewing, @pvary and @danielcweeks!
This updates Flink's Avro and Parquet readers to support new timestamp(9) and unknown types.
While enabling DataTest cases, I found that supportsDefaultValues was not enabled, so default value tests were not running for Avro. After enabling those tests, I also needed to update the RowData assertions and convert values to match Flink's object model in the readers by calling RowDataUtil.convertConstant.
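For illustration, a minimal sketch of that conversion, assuming a string-typed default value; RowDataUtil.convertConstant is the Iceberg Flink helper named above (assumed signature convertConstant(Type, Object)), and the class and variable names here are hypothetical:

import org.apache.iceberg.flink.data.RowDataUtil;
import org.apache.iceberg.types.Types;

class DefaultValueConversionSketch {
  static Object toFlinkDefault() {
    // a default value stored as a plain Java String in the table schema
    Object javaDefault = "pending";
    // convertConstant maps it into Flink's internal object model (StringData here),
    // matching what the updated readers produce, so RowData assertions compare equal
    return RowDataUtil.convertConstant(Types.StringType.get(), javaDefault);
  }
}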