diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 4b7d3a19d50a..710c771036d4 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -356,7 +356,7 @@ public LocalDateTime read(LocalDateTime reuse) {
     }
   }
 
-  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
     private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
 
     private TimestampInt96Reader(ColumnDescriptor desc) {
@@ -364,14 +364,14 @@ private TimestampInt96Reader(ColumnDescriptor desc) {
     }
 
     @Override
-    public LocalDateTime read(LocalDateTime reuse) {
+    public OffsetDateTime read(OffsetDateTime reuse) {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
       final long timeOfDayNanos = byteBuffer.getLong();
       final int julianDay = byteBuffer.getInt();
 
       return Instant
           .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
-          .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime();
+          .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC);
     }
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
index 43c2cd2b70d2..069e505ff00d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
@@ -156,6 +156,8 @@ public Type primitive(PrimitiveType primitive) {
         return Types.DoubleType.get();
       case FIXED_LEN_BYTE_ARRAY:
         return Types.FixedType.ofLength(primitive.getTypeLength());
+      case INT96:
+        return Types.TimestampType.withZone();
       case BINARY:
         return Types.BinaryType.get();
     }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index 642a8de9322e..03d234c1eca5 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -26,14 +26,23 @@
 import java.util.Map;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetWriteAdapter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.TypeUtil;
@@ -95,6 +104,29 @@ protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
     }
   }
 
+  protected Table tableFromInputFile(InputFile inputFile, Schema schema) throws IOException {
+    HadoopTables tables = new HadoopTables();
+    Table table =
+        tables.create(
+            schema,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(),
+            temp.newFolder().getCanonicalPath());
+
+    table
+        .newAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withFormat(FileFormat.PARQUET)
+                .withInputFile(inputFile)
+                .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault()))
+                .withFileSizeInBytes(inputFile.getLength())
+                .build())
+        .commit();
+
+    return table;
+  }
+
   @Test
   public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
     String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
@@ -120,9 +152,21 @@ public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
       writer.addAll(rows);
     }
 
-    List<InternalRow> readRows = rowsFromFile(Files.localInput(outputFilePath), schema);
+    InputFile parquetInputFile = Files.localInput(outputFilePath);
+    List<InternalRow> readRows = rowsFromFile(parquetInputFile, schema);
 
     Assert.assertEquals(rows.size(), readRows.size());
     Assert.assertThat(readRows, CoreMatchers.is(rows));
+
+    // Now we try to import that file as an Iceberg table to make sure Iceberg can read
+    // Int96 end to end.
+    Table int96Table = tableFromInputFile(parquetInputFile, schema);
+    List<Record> tableRecords = Lists.newArrayList(IcebergGenerics.read(int96Table).build());
+
+    Assert.assertEquals(rows.size(), tableRecords.size());
+
+    for (int i = 0; i < tableRecords.size(); i++) {
+      GenericsHelpers.assertEqualsUnsafe(schema.asStruct(), tableRecords.get(i), rows.get(i));
+    }
   }
 
   /**
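
For context, a minimal standalone sketch (not part of the patch) of the INT96 decoding that the modified TimestampInt96Reader performs: an INT96 timestamp is 12 little-endian bytes, 8 bytes of nanoseconds within the day followed by 4 bytes of Julian day number. The class name and main method below are illustrative only; the arithmetic mirrors the read() method in the diff above.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of TimestampInt96Reader's decoding, not Iceberg code.
public class Int96DecodeSketch {
  // Julian day number of the Unix epoch (1970-01-01).
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  static OffsetDateTime decode(ByteBuffer int96) {
    ByteBuffer buf = int96.order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong(); // first 8 bytes: nanos within the day
    int julianDay = buf.getInt();        // last 4 bytes: Julian day number
    return Instant
        .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
        .plusNanos(timeOfDayNanos)
        .atOffset(ZoneOffset.UTC);       // exposed as a UTC timestamp
  }

  public static void main(String[] args) {
    // Julian day 2,440,588 is the epoch itself; add one hour of nanos.
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(TimeUnit.HOURS.toNanos(1)).putInt(2_440_588);
    buf.flip();
    System.out.println(decode(buf)); // prints 1970-01-01T01:00Z
  }
}

Returning OffsetDateTime rather than LocalDateTime lines up with the MessageTypeToType change, which maps INT96 to Types.TimestampType.withZone(): Iceberg's generic readers represent timestamptz values as OffsetDateTime.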