🐛 fix Flink Read support for parquet int96 timestamps #3987
Conversation
… int96

After we use the `add_files` procedure to migrate a Hive table to an Iceberg table, reading the target table with Flink fails with:

java.lang.UnsupportedOperationException: Unsupported type: optional int96 wafer_start_time = 4
    at org.apache.iceberg.flink.data.FlinkParquetReaders$ReadBuilder.primitive(FlinkParquetReaders.java:268)
    at org.apache.iceberg.flink.data.FlinkParquetReaders$ReadBuilder.primitive(FlinkParquetReaders.java:73)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:52)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:155)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:169)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:47)
    at org.apache.iceberg.flink.data.FlinkParquetReaders.buildReader(FlinkParquetReaders.java:68)
    at org.apache.iceberg.flink.source.RowDataFileScanTaskReader.lambda$newParquetIterable$1(RowDataFileScanTaskReader.java:138)
    at org.apache.iceberg.parquet.ReadConf.&lt;init&gt;(ReadConf.java:118)
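For context, a Parquet INT96 timestamp packs nanoseconds-of-day (8 bytes, little-endian) followed by a Julian day number (4 bytes, little-endian) into twelve bytes. Below is a minimal sketch of the conversion a reader has to perform; the class and method names are illustrative, not the PR's actual code:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper illustrating the standard INT96 timestamp layout.
public class Int96Conversions {
  private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L;
  private static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

  // Convert a raw 12-byte INT96 value to microseconds since the Unix epoch.
  public static long int96ToMicros(ByteBuffer int96) {
    ByteBuffer buf = int96.duplicate().order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong();                    // first 8 bytes
    long julianDay = Integer.toUnsignedLong(buf.getInt()); // last 4 bytes
    return (julianDay - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY + nanosOfDay / 1_000L;
  }
}
```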
I like this 🐛
kbendick left a comment:
Thanks for the contribution @kingeasternsun!
For the systems I'm used to, usually reading int96 as timestamp is a configurable option. I'm not sure with respect to Flink if it's configurable or not, but in either case it's something we might consider.
…as little as possible
    List<RowData> readDataRows = rowDatasFromFile(parquetInputFile, schema);
    Assert.assertEquals(rows.size(), readDataRows.size());
    for (int i = 0; i < rows.size(); i += 1) {
      Assert.assertEquals(rows.get(i).getLong(0), readDataRows.get(i).getLong(0));
I thought that Flink used millisecond precision for timestamps? Spark uses microsecond. Should these match?
I'll reconsider it
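On the precision question: Flink's TimestampData actually carries sub-millisecond detail through a nanos-of-millisecond component, so microsecond values need not be truncated. A minimal sketch, with a method name of our own choosing:

```java
import org.apache.flink.table.data.TimestampData;

// Sketch (method name is ours, not the PR's): carry microsecond precision
// into Flink's TimestampData. floorDiv/floorMod keep pre-epoch values correct.
class MicrosToFlink {
  static TimestampData microsToTimestampData(long micros) {
    long millis = Math.floorDiv(micros, 1_000L);
    int nanosOfMilli = (int) Math.floorMod(micros, 1_000L) * 1_000;
    return TimestampData.fromEpochMillis(millis, nanosOfMilli);
  }
}
```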
    exclude group: 'org.apache.hive', module: 'hive-storage-api'
  }

  testImplementation "org.apache.spark:spark-sql_2.12:3.2.0"
It seems like this dependency was added for writing the timestamp as int96 in the unit test, but in fact Apache Flink's ParquetRowDataWriter supports writing a timestamp_with_local_time_zone as INT96. So I'd suggest using the Flink parquet writer rather than the Spark parquet writer. (It seems strange to me to introduce a Spark module into the Flink module.)
I actually prefer using the Spark module, unless Flink natively supports writing INT96 timestamps to Parquet. The benefit of using the Spark module is that support has been around for a long time and is relatively trusted to produce correct INT96 timestamp values.
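For illustration, this is roughly how a test could have Spark emit INT96 files; the `spark.sql.parquet.outputTimestampType` setting is Spark's documented switch, while the session setup and output path here are placeholders:

```java
import org.apache.spark.sql.SparkSession;

// Sketch: ask Spark to write timestamps as INT96 rather than TIMESTAMP_MICROS.
public class WriteInt96WithSpark {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .config("spark.sql.parquet.outputTimestampType", "INT96")
        .getOrCreate();

    spark.sql("SELECT TIMESTAMP '2022-01-01 00:00:00' AS ts")
        .write()
        .parquet("/tmp/int96-test");  // placeholder path
    spark.stop();
  }
}
```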
  }

  @Test
  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
I'd suggest writing a few rows using the Flink native writers, and then using the following readers to assert their results (see the sketch after this list):
- Flink native parquet reader;
- Iceberg generic parquet reader;
- Iceberg Flink parquet reader.
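A rough shape for that cross-reader assertion; `writeRowsWithFlinkNativeWriter` and the two read helpers are hypothetical stand-ins for the writers and readers named above, and the Iceberg generic reader would be compared the same way:

```java
import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.flink.table.data.RowData;
import org.junit.Assert;
import org.junit.Test;

public class TestInt96ReadersAgree {
  @Test
  public void testInt96ReadersAgree() throws IOException {
    // All helpers below are hypothetical stand-ins, not existing test utilities.
    File parquetFile = writeRowsWithFlinkNativeWriter();

    List<RowData> flinkNative = readWithFlinkNativeReader(parquetFile);
    List<RowData> icebergFlink = readWithIcebergFlinkReader(parquetFile);

    Assert.assertEquals(flinkNative.size(), icebergFlink.size());
    for (int i = 0; i < flinkNative.size(); i += 1) {
      // Compare the timestamp column (position 0, microsecond precision).
      Assert.assertEquals(
          flinkNative.get(i).getTimestamp(0, 6),
          icebergFlink.get(i).getTimestamp(0, 6));
    }
  }
}
```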
… use GenericParquetReaders to read int96 file
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
In Spark we use the `add_files` procedure to migrate a Hive table to an Iceberg table, but when we use Flink to read the target table, it fails with the errors above. The solution was inspired by #1184.
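For reference, the migration step looks roughly like this, using Iceberg's `add_files` Spark procedure; the catalog and table names are placeholders:

```java
// Sketch of the Hive-to-Iceberg migration referenced above. This registers
// the Hive table's existing data files with the Iceberg table's metadata,
// so the original (possibly INT96-bearing) parquet files are read in place.
spark.sql(
    "CALL spark_catalog.system.add_files("
        + "table => 'db.iceberg_table', "
        + "source_table => 'db.hive_table')");
```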