diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 651a6884cf57..e4920222bb47 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.data.parquet;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -28,6 +30,7 @@
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
@@ -299,6 +302,10 @@ public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveTy
         case INT64:
         case DOUBLE:
           return new ParquetValueReaders.UnboxedReader<>(desc);
+        case INT96:
+          // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
+          // compatibility we try to read INT96 as timestamps.
+          return new TimestampInt96Reader(desc);
         default:
           throw new UnsupportedOperationException("Unsupported type: " + primitive);
       }
@@ -345,6 +352,25 @@ public LocalDateTime read(LocalDateTime reuse) {
     }
   }
 
+  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
+    private TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public LocalDateTime read(LocalDateTime reuse) {
+      final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      final long timeOfDayNanos = byteBuffer.getLong();
+      final int julianDay = byteBuffer.getInt();
+
+      return Instant
+          .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
+          .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime();
+    }
+  }
+
   private static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
     private TimestamptzReader(ColumnDescriptor desc) {
       super(desc);
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java
index 183bb69ad809..99a7453cf0b2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java
@@ -47,6 +47,13 @@ public Long next() {
             return nextLong();
           }
         };
+      case INT96:
+        return (ColumnIterator<T>) new ColumnIterator<Binary>(desc, writerVersion) {
+          @Override
+          public Binary next() {
+            return nextBinary();
+          }
+        };
       case FLOAT:
         return (ColumnIterator<T>) new ColumnIterator<Float>(desc, writerVersion) {
           @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java
index e34db05e47d0..f482ae045f35 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java
@@ -59,6 +59,12 @@ public Long next() {
             return nextLong();
           }
         };
+      case INT96:
+        return (PageIterator<T>) new PageIterator<Binary>(desc, writerVersion) {
+          public Binary next() {
+            return nextBinary();
+          }
+        };
       case FLOAT:
         return (PageIterator<T>) new PageIterator<Float>(desc, writerVersion) {
           @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 55bf925e0581..9b675add550b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -681,6 +681,7 @@ private Setter<I> newSetter(ParquetValueReader<?> reader, Type type) {
             return (record, pos, ignored) -> setFloat(record, pos, unboxed.readFloat());
           case DOUBLE:
             return (record, pos, ignored) -> setDouble(record, pos, unboxed.readDouble());
+          case INT96:
           case FIXED_LEN_BYTE_ARRAY:
           case BINARY:
             return (record, pos, ignored) -> set(record, pos, unboxed.readBinary());
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 51ddc9432bc3..98d20ef036df 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -22,9 +22,11 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
@@ -277,6 +279,10 @@ public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveTy
         case INT64:
         case DOUBLE:
           return new UnboxedReader<>(desc);
+        case INT96:
+          // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
+          // compatibility we try to read INT96 as timestamps.
+          return new TimestampInt96Reader(desc);
         default:
           throw new UnsupportedOperationException("Unsupported type: " + primitive);
       }
@@ -350,6 +356,29 @@ public long readLong() {
     }
   }
 
+  private static class TimestampInt96Reader extends UnboxedReader<Long> {
+    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
+    TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Long read(Long ignored) {
+      return readLong();
+    }
+
+    @Override
+    public long readLong() {
+      final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      final long timeOfDayNanos = byteBuffer.getLong();
+      final int julianDay = byteBuffer.getInt();
+
+      return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) +
+          TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+    }
+  }
+
   private static class StringReader extends PrimitiveReader<UTF8String> {
     StringReader(ColumnDescriptor desc) {
       super(desc);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index 5e22ccef24d9..642a8de9322e 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -23,19 +23,36 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetWriteAdapter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.Test;
 
 import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestSparkParquetReader extends AvroDataTest {
   @Override
@@ -67,4 +84,76 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    HadoopOutputFile outputFile =
+        HadoopOutputFile.fromPath(
+            new org.apache.hadoop.fs.Path(outputFilePath), new Configuration());
+    Schema schema = new Schema(required(1, "ts", Types.TimestampType.withZone()));
+    StructType sparkSchema =
+        new StructType(
+            new StructField[] {
+                new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
+            });
+    List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
+
+    try (FileAppender<InternalRow> writer =
+        new ParquetWriteAdapter<>(
+            new NativeSparkWriterBuilder(outputFile)
+                .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+                .set("spark.sql.parquet.writeLegacyFormat", "false")
+                .set("spark.sql.parquet.outputTimestampType", "INT96")
+                .build(),
+            MetricsConfig.getDefault())) {
+      writer.addAll(rows);
+    }
+
+    List<InternalRow> readRows = rowsFromFile(Files.localInput(outputFilePath), schema);
+    Assert.assertEquals(rows.size(), readRows.size());
+    Assert.assertThat(readRows, CoreMatchers.is(rows));
+  }
+
+  /**
+   * Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's native
+   * ParquetWriteSupport.
+   */
+  private static class NativeSparkWriterBuilder
+      extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
+    private final Map<String, String> config = Maps.newHashMap();
+
+    NativeSparkWriterBuilder(org.apache.parquet.io.OutputFile path) {
+      super(path);
+    }
+
+    public NativeSparkWriterBuilder set(String property, String value) {
+      this.config.put(property, value);
+      return self();
+    }
+
+    @Override
+    protected NativeSparkWriterBuilder self() {
+      return this;
+    }
+
+    @Override
+    protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        configuration.set(entry.getKey(), entry.getValue());
+      }
+
+      return new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport();
+    }
+  }
 }
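
For reference, the 12-byte INT96 value that both new `TimestampInt96Reader` implementations decode is laid out as an 8-byte little-endian nanoseconds-of-day field followed by a 4-byte little-endian Julian day number, where 2,440,588 is the Julian day of 1970-01-01. The standalone sketch below repeats the Spark reader's conversion to microseconds since the Unix epoch; the `Int96Decode` class and `toEpochMicros` helper are illustrative names only and are not part of this patch.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

// Illustrative sketch (not part of the patch): decode a 12-byte Parquet INT96
// timestamp into microseconds since the Unix epoch, mirroring the arithmetic
// used by the TimestampInt96Reader added in SparkParquetReaders above.
public class Int96Decode {
  // Julian day number of 1970-01-01, same constant as in the readers.
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  static long toEpochMicros(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong(); // first 8 bytes: nanoseconds within the day
    int julianDay = buf.getInt();        // last 4 bytes: Julian day number
    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
  }

  public static void main(String[] args) {
    // A zeroed time-of-day on Julian day 2,440,588 is exactly the Unix epoch,
    // so this prints 0.
    byte[] epoch = ByteBuffer.allocate(12)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putLong(0L)
        .putInt((int) UNIX_EPOCH_JULIAN)
        .array();
    System.out.println(toEpochMicros(epoch));
  }
}
```

The generic reader in `BaseParquetReaders` applies the same byte layout but converts the result to a `LocalDateTime` via `Instant` instead of returning epoch microseconds.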