Read support for parquet int96 timestamps #1184
Conversation
```java
SparkSession.builder()
    .master("local[2]")
    .config("spark.sql.parquet.int96AsTimestamp", "false")
    .getOrCreate();
```
Is it possible to avoid creating a Spark session just to write a timestamp? What about calling Spark's FileFormat to write directly instead?
We wrap Spark's FileFormat in our DSv2 table implementation: https://github.com/Netflix/iceberg/blob/netflix-spark-2.4/metacat/src/main/java/com/netflix/iceberg/batch/BatchPatternWrite.java#L90
This test would run much faster by using that to create a file instead of creating a Spark context.
Yes, I would much rather avoid creating a SparkSession here if possible. However, looking into ParquetFileFormat it seems like we would still need to pass a SparkSession to create the writer.
I can look at ParquetOutputWriter but I might need to match the configuration there with what Spark uses to write int96.
Another approach would be to check in a Parquet file written by Spark and have the test just read it?
A drawback with that approach is that updating the file would be brittle, but I can check in the code that writes it as an ignored test; that should keep us from creating a Spark session during unit tests. What do you think @rdblue?
At one point, we supported writing to Parquet using Spark's built-in WriteSupport. I think we can probably get that working again to create the files.
Yes, looking at one of the tests, we do support writing Parquet files using Spark's WriteSupport.
To be able to use a FileAppender I had to add a TimestampAsInt96 type (which can only be written using Spark's built-in WriteSupport) so that schema conversion within Iceberg's ParquetWriteSupport knows that these timestamps should be encoded as int96 in the Parquet schema.
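For reference, a sketch of the Parquet file schema this produces: the timestamp column is a bare physical int96 with no logical type annotation, so readers have to know by convention that it holds a timestamp. The field name ts is taken from the tests in this PR; whether the column is required or optional depends on nullability.

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// A bare physical int96 column; nothing in the schema itself says "timestamp".
MessageType int96Schema = MessageTypeParser.parseMessageType(
    "message spark_schema {\n"
        + "  optional int96 ts;\n"
        + "}");
```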
```java
final String parquetPath = temp.getRoot().getAbsolutePath() + "/parquet_int96";
final java.sql.Timestamp ts = java.sql.Timestamp.valueOf("2014-01-01 23:00:01");
spark.createDataset(ImmutableList.of(ts), Encoders.TIMESTAMP()).write().parquet(parquetPath);
```
Using Spark's FileFormat would also make this test easier. You'd be able to pass in a value in micros and validate that you get the same value back, unmodified. You'd also not need to locate the Parquet file using find.
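A sketch of the round-trip check described here; writeInt96File and readTimestampMicros are hypothetical helpers standing in for a write through Spark's WriteSupport and a read through Iceberg's Parquet reader.

```java
import java.io.File;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;

@Test
public void testInt96TimestampRoundTrip() throws IOException {
  long micros = 1388617201000000L;  // illustrative microsecond value

  // Hypothetical helpers: write the value as an INT96 timestamp, read it back.
  File parquetFile = writeInt96File(micros);
  long readBack = readTimestampMicros(parquetFile);

  Assert.assertEquals("Micros should round-trip unmodified", micros, readBack);
}
```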
Mostly looks good, but I'd like to fix up the test to avoid creating a SparkSession for just one case. Thanks @gustavoatt!
gustavoatt left a comment:
Simplified the test by removing the SparkSession while still using Spark's ParquetWriteSupport. PTAL @rdblue
```java
/**
 * @return Timestamp type (with timezone) represented as INT96. This is only added for compatibility reasons
 *         and can only be written using Spark's ParquetWriteSupport. Writing this type should be avoided.
 */
```
I don't think we should change the type system to support this. INT96 may be something that we can read, but Iceberg cannot write it, per the spec.
Was this needed to build the tests?
Agreed. I found a way to get the tests running without adding a new type: I created an implementation of ParquetWriter.Builder that uses Spark's ParquetWriteSupport together with Iceberg's ParquetWriteAdapter, which avoids creating a SparkSession.
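A minimal sketch of what such a builder can look like; the class name and the conf-key wiring are assumed here, not copied from the PR. Spark's ParquetWriteSupport discovers the row schema and the timestamp settings through the Hadoop Configuration, which is why no SparkSession is needed.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;

// Plugs Spark's ParquetWriteSupport into a plain ParquetWriter so a test can
// write INT96 files without starting a SparkSession.
class NativeSparkWriterBuilder
    extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
  private final Map<String, String> config = new HashMap<>();

  NativeSparkWriterBuilder(Path path) {
    super(path);
  }

  NativeSparkWriterBuilder set(String property, String value) {
    config.put(property, value);
    return self();
  }

  @Override
  protected NativeSparkWriterBuilder self() {
    return this;
  }

  @Override
  protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
    // Copy the collected properties (schema JSON, int96 settings, ...) into the
    // Hadoop conf, which is where Spark's write support looks for them.
    config.forEach(configuration::set);
    return new ParquetWriteSupport();
  }
}
```

The `.set(...)` and `.schema(schema)` calls shown in the diff context below would then just feed this map, with `.schema(...)` presumably storing the Spark schema JSON under the conf key Spark's write support expects.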
```java
final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
final StructType sparkSchema = SparkSchemaUtil.convert(schema);
final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
```
Nit: we don't use final for local variables.
Done. Removed these final modifiers.
```java
public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
  final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
  final StructType sparkSchema = SparkSchemaUtil.convert(schema);
  final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
```
Why not use temp.newFile?
I initially tried that way but the writer fails because the file already exists.
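For what it's worth, Parquet's writer builder also has an overwrite mode that tolerates a pre-existing file, which could be an alternative to building the path by hand (not what the PR ended up doing):

```java
import org.apache.parquet.hadoop.ParquetFileWriter;

// writerBuilder is the ParquetWriter.Builder from the test (name illustrative).
// OVERWRITE replaces the file temp.newFile() pre-creates instead of failing in
// the default CREATE mode.
writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
```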
| .set("spark.sql.parquet.int96AsTimestamp", "true") | ||
| .set("spark.sql.parquet.writeLegacyFormat", "false") | ||
| .set("spark.sql.parquet.outputTimestampType", "INT96") | ||
| .schema(schema) |
I'd prefer to pass in a normal timestamp type and set a property, if needed, to enable INT96 support.
I'm not sure I fully understand this comment, but I did change my approach here: while still writing InternalRow, I removed most of these properties and left only the ones needed to make sure Spark writes these timestamps as int96.
gustavoatt left a comment:
Thanks for the review @rdblue! I was able to keep Iceberg types unchanged and only added read support for int96 timestamps so this PR should be ready for another look 👀
| .set("spark.sql.parquet.int96AsTimestamp", "true") | ||
| .set("spark.sql.parquet.writeLegacyFormat", "false") | ||
| .set("spark.sql.parquet.outputTimestampType", "INT96") | ||
| .schema(schema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I fully understand this comment.
But I did change my approach here, and while still writing InternalRow I removed most of these properties and left only the relevant ones to make sure that Spark writes these as int96.
```java
@Override
public LocalDateTime read(LocalDateTime reuse) {
  final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
```
Note for reviewers (and future me): toByteBuffer returns a duplicate of the internal buffer, so it is safe for callers to advance the buffer's position with methods like getLong.
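For future readers, a sketch of how the 12 INT96 bytes decode (helper name hypothetical): little-endian, the first 8 bytes hold nanoseconds within the day and the last 4 bytes hold the Julian day number.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
import org.apache.parquet.io.api.Binary;

static long int96ToMicros(Binary int96) {
  ByteBuffer buf = int96.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
  long nanosOfDay = buf.getLong();          // advances position; safe on the duplicate
  int julianDay = buf.getInt();
  long epochDays = julianDay - 2_440_588L;  // 2440588 = Julian day of 1970-01-01
  return TimeUnit.DAYS.toMicros(epochDays) + TimeUnit.NANOSECONDS.toMicros(nanosOfDay);
}
```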
Nice work, @gustavoatt! Thank you for updating this so that the test is self-contained. I'll merge this when tests are passing.
Merged. Thanks for fixing this, @gustavoatt!
Awesome possum, thanks for resolving this.
Thanks for merging and for the review @rdblue!
Summary
Add read support for Parquet INT96 timestamps (fixes #1138). This is needed so that Parquet files written by Spark with INT96 timestamps can be read by Iceberg without rewriting them, which is especially useful for migrations.
apache/parquet-format#49 has more information about how Parquet int96 timestamps are stored. Note that I only implemented read support, since this representation has many issues (as visible in the conversation on the parquet-format PR).
Testing