From ca3e955ed9a5f67314e43c6d1c73332a6633da76 Mon Sep 17 00:00:00 2001
From: Gustavo Torres
Date: Tue, 7 Jul 2020 17:05:32 -0700
Subject: [PATCH 1/7] Read support for int96 as timestamp

---
 .../data/parquet/BaseParquetReaders.java      | 26 +++++++++++++++++
 .../iceberg/parquet/ColumnIterator.java       |  7 +++++
 .../apache/iceberg/parquet/PageIterator.java  |  6 ++++
 .../spark/data/SparkParquetReaders.java       | 29 +++++++++++++++++++
 4 files changed, 68 insertions(+)

diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 651a6884cf57..e4920222bb47 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.data.parquet;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -28,6 +30,7 @@
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
@@ -299,6 +302,10 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy
       case INT64:
       case DOUBLE:
         return new ParquetValueReaders.UnboxedReader<>(desc);
+      case INT96:
+        // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
+        // compatibility we try to read INT96 as timestamps.
+        return new TimestampInt96Reader(desc);
       default:
         throw new UnsupportedOperationException("Unsupported type: " + primitive);
     }
@@ -345,6 +352,25 @@ public LocalDateTime read(LocalDateTime reuse) {
     }
   }
 
+  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
+    private TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public LocalDateTime read(LocalDateTime reuse) {
+      final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      final long timeOfDayNanos = byteBuffer.getLong();
+      final int julianDay = byteBuffer.getInt();
+
+      return Instant
+          .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
+          .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime();
+    }
+  }
+
   private static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
     private TimestamptzReader(ColumnDescriptor desc) {
       super(desc);

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java
index 183bb69ad809..99a7453cf0b2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java
@@ -47,6 +47,13 @@ public Long next() {
           return nextLong();
         }
       };
+    case INT96:
+      return (ColumnIterator<T>) new ColumnIterator<Binary>(desc, writerVersion) {
+        @Override
+        public Binary next() {
+          return nextBinary();
+        }
+      };
     case FLOAT:
       return (ColumnIterator<T>) new ColumnIterator<Float>(desc, writerVersion) {
         @Override

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java
index e34db05e47d0..f482ae045f35 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java
@@ -59,6 +59,12 @@ public Long next() {
           return nextLong();
         }
       };
+    case INT96:
+      return (PageIterator<T>) new PageIterator<Binary>(desc, writerVersion) {
+        public Binary next() {
+          return nextBinary();
+        }
+      };
     case FLOAT:
       return (PageIterator<T>) new PageIterator<Float>(desc, writerVersion) {
         @Override

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 51ddc9432bc3..98d20ef036df 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -22,9 +22,11 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
@@ -277,6 +279,10 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy
       case INT64:
       case DOUBLE:
         return new UnboxedReader<>(desc);
+      case INT96:
+        // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
+        // compatibility we try to read INT96 as timestamps.
+        return new TimestampInt96Reader(desc);
       default:
         throw new UnsupportedOperationException("Unsupported type: " + primitive);
     }
@@ -350,6 +356,29 @@ public long readLong() {
     }
   }
 
+  private static class TimestampInt96Reader extends UnboxedReader<Long> {
+    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
+    TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Long read(Long ignored) {
+      return readLong();
+    }
+
+    @Override
+    public long readLong() {
+      final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      final long timeOfDayNanos = byteBuffer.getLong();
+      final int julianDay = byteBuffer.getInt();
+
+      return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) +
+          TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+    }
+  }
+
   private static class StringReader extends PrimitiveReader<UTF8String> {
     StringReader(ColumnDescriptor desc) {
       super(desc);
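The 12-byte INT96 layout that both new readers decode is: 8 little-endian bytes of nanoseconds within the day, followed by 4 bytes holding the Julian day number, where 2,440,588 is the Julian day of the Unix epoch (1970-01-01). A minimal standalone sketch of that decoding, with an illustrative class and method name that are not part of the patch (the buffer would come from Binary.toByteBuffer(), as in the readers above):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.time.Instant;
    import java.util.concurrent.TimeUnit;

    // Illustrative helper mirroring the readers above; not part of the patch.
    public class Int96Decode {
      private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

      static Instant toInstant(ByteBuffer int96Buffer) {
        ByteBuffer buf = int96Buffer.order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = buf.getLong(); // first 8 bytes: nanos within the day
        int julianDay = buf.getInt();        // last 4 bytes: Julian day number
        return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
            .plusNanos(timeOfDayNanos);
      }
    }

The generic-record reader converts this instant to a LocalDateTime at UTC, while the Spark reader keeps the value as epoch microseconds in a long, matching Spark's internal timestamp representation.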
From cea839f58b8b0e3d73f415b2770913e1d0955f31 Mon Sep 17 00:00:00 2001
From: Gustavo Torres
Date: Wed, 8 Jul 2020 12:07:11 -0700
Subject: [PATCH 2/7] Parquet int96 timestamp Spark read tests

---
 .../spark/data/TestSparkParquetReader.java    | 56 +++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index 5e22ccef24d9..ed4750e5463d 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -21,21 +21,32 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Instant;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.Test;
 
 import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+import static org.apache.iceberg.types.Types.NestedField.optional;
 
 public class TestSparkParquetReader extends AvroDataTest {
   @Override
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final SparkSession spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.parquet.int96AsTimestamp", "false")
+            .getOrCreate();
+
+    final String parquetPath = temp.getRoot().getAbsolutePath() + "/parquet_int96";
+    final java.sql.Timestamp ts = java.sql.Timestamp.valueOf("2014-01-01 23:00:01");
+    spark.createDataset(ImmutableList.of(ts), Encoders.TIMESTAMP()).write().parquet(parquetPath);
+    spark.stop();
+
+    // Get the single parquet file produced by spark.
+    List<Path> parquetOutputs =
+        java.nio.file.Files.find(
+            java.nio.file.Paths.get(parquetPath),
+            1,
+            (path, basicFileAttributes) -> path.toString().endsWith(".parquet"))
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, parquetOutputs.size());
+
+    List<InternalRow> rows =
+        rowsFromFile(
+            Files.localInput(parquetOutputs.get(0).toFile()),
+            new Schema(optional(1, "timestamp", Types.TimestampType.withoutZone())));
+    Assert.assertEquals(1, rows.size());
+    Assert.assertEquals(1, rows.get(0).numFields());
+
+    // Spark stores timestamps as epoch microseconds in long columns.
+    Assert.assertEquals(
+        ts.toInstant(),
+        Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(rows.get(0).getLong(0))));
+  }
 }
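The test's final assertion relies on the conversion arithmetic from PATCH 1. As a sanity check of that arithmetic (a throwaway main method, not part of the series): 2014-01-01 is Julian day 2,456,659, which is 16,071 days after the Unix epoch, so a zero time-of-day should decode to midnight UTC on that date:

    import java.time.Instant;
    import java.util.concurrent.TimeUnit;

    // Hypothetical check of the Spark reader's micros conversion; not part of the patch.
    public class Int96MicrosCheck {
      private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

      public static void main(String[] args) {
        int julianDay = 2_456_659;  // Julian day number of 2014-01-01
        long timeOfDayNanos = 0L;
        long micros = TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
            + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
        // Prints 2014-01-01T00:00:00Z
        System.out.println(Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(micros)));
      }
    }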
From 944c325ec3c92530ddebae31e8f12da36ccbf9ec Mon Sep 17 00:00:00 2001
From: Gustavo Torres
Date: Thu, 23 Jul 2020 14:09:21 -0700
Subject: [PATCH 3/7] Add int96 timestamp type for parquet-read support

---
 .../java/org/apache/iceberg/types/Types.java  | 21 ++++++++++++++++---
 .../iceberg/parquet/ParquetValueReaders.java  |  1 +
 .../iceberg/parquet/TypeToMessageType.java    |  6 +++++-
 3 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index a6f4d36b0e6e..4d3bec18d2ec 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -208,8 +208,9 @@ public String toString() {
   }
 
   public static class TimestampType extends PrimitiveType {
-    private static final TimestampType INSTANCE_WITH_ZONE = new TimestampType(true);
-    private static final TimestampType INSTANCE_WITHOUT_ZONE = new TimestampType(false);
+    private static final TimestampType INSTANCE_WITH_ZONE = new TimestampType(true, false);
+    private static final TimestampType INSTANCE_WITHOUT_ZONE = new TimestampType(false, false);
+    private static final TimestampType INSTANCE_SPARK_INT96 = new TimestampType(true, true);
 
     public static TimestampType withZone() {
       return INSTANCE_WITH_ZONE;
@@ -219,16 +220,30 @@ public static TimestampType withoutZone() {
       return INSTANCE_WITHOUT_ZONE;
     }
 
+    /**
+     * @return Timestamp type (with timezone) represented as INT96. This is only added for compatibility
+     * reasons and can only be written using Spark's ParquetWriteSupport. Writing this type should be avoided.
+     */
+    public static TimestampType asSparkInt96() {
+      return INSTANCE_SPARK_INT96;
+    }
+
     private final boolean adjustToUTC;
+    private final boolean asSparkInt96;
 
-    private TimestampType(boolean adjustToUTC) {
+    private TimestampType(boolean adjustToUTC, boolean asSparkInt96) {
       this.adjustToUTC = adjustToUTC;
+      this.asSparkInt96 = asSparkInt96;
     }
 
     public boolean shouldAdjustToUTC() {
       return adjustToUTC;
     }
 
+    public boolean shouldRepresentAsInt96() {
+      return asSparkInt96;
+    }
+
     @Override
     public TypeID typeId() {
       return TypeID.TIMESTAMP;

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 55bf925e0581..9b675add550b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -681,6 +681,7 @@ private Setter newSetter(ParquetValueReader reader, Type type) {
         return (record, pos, ignored) -> setFloat(record, pos, unboxed.readFloat());
       case DOUBLE:
         return (record, pos, ignored) -> setDouble(record, pos, unboxed.readDouble());
+      case INT96:
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
         return (record, pos, ignored) -> set(record, pos, unboxed.readBinary());

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
index 9f55670eda90..59fa08279235 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
@@ -45,6 +45,7 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 
 public class TypeToMessageType {
   public static final int DECIMAL_INT32_MAX_DIGITS = 9;
@@ -136,7 +137,10 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i
       case TIME:
         return Types.primitive(INT64, repetition).as(TIME_MICROS).id(id).named(name);
       case TIMESTAMP:
-        if (((TimestampType) primitive).shouldAdjustToUTC()) {
+        final TimestampType timestampType = (TimestampType)primitive;
+        if (timestampType.shouldRepresentAsInt96()) {
+          return Types.primitive(INT96, repetition).id(id).named(name);
+        } else if (timestampType.shouldAdjustToUTC()) {
           return Types.primitive(INT64, repetition).as(TIMESTAMPTZ_MICROS).id(id).named(name);
         } else {
           return Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name);
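With the type in place, a schema using it converts to a bare INT96 primitive with no logical-type annotation. A hypothetical demonstration (the wrapper class is illustrative and assumes ParquetSchemaUtil.convert(schema, name) as the conversion entry point; the type itself is removed again in PATCH 6):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.parquet.ParquetSchemaUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.parquet.schema.MessageType;

    public class Int96SchemaDemo {
      public static void main(String[] args) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "ts", Types.TimestampType.asSparkInt96()));
        // With this patch applied, "ts" maps to an INT96 primitive with no logical type.
        MessageType parquetSchema = ParquetSchemaUtil.convert(schema, "table");
        System.out.println(parquetSchema);
      }
    }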
From 0c94f883d73fa552eb33ebe620945e00710185f5 Mon Sep 17 00:00:00 2001
From: Gustavo Torres
Date: Thu, 23 Jul 2020 14:10:12 -0700
Subject: [PATCH 4/7] Use Spark's ParquetWriteSupport to test int96 timestamp
 read support

---
 .../spark/data/TestSparkParquetReader.java    | 62 ++++++++-----------
 1 file changed, 26 insertions(+), 36 deletions(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index ed4750e5463d..ebe73dd26d0c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -22,11 +22,9 @@
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.time.Instant;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
@@ -34,19 +32,19 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
 import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
-import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestSparkParquetReader extends AvroDataTest {
   @Override
@@ -89,36 +89,28 @@
   @Test
   public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
-    final SparkSession spark =
-        SparkSession.builder()
-            .master("local[2]")
-            .config("spark.sql.parquet.int96AsTimestamp", "false")
-            .getOrCreate();
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
 
-    final String parquetPath = temp.getRoot().getAbsolutePath() + "/parquet_int96";
-    final java.sql.Timestamp ts = java.sql.Timestamp.valueOf("2014-01-01 23:00:01");
-    spark.createDataset(ImmutableList.of(ts), Encoders.TIMESTAMP()).write().parquet(parquetPath);
-    spark.stop();
-
-    // Get the single parquet file produced by spark.
-    List<Path> parquetOutputs =
-        java.nio.file.Files.find(
-            java.nio.file.Paths.get(parquetPath),
-            1,
-            (path, basicFileAttributes) -> path.toString().endsWith(".parquet"))
-        .collect(Collectors.toList());
-    Assert.assertEquals(1, parquetOutputs.size());
-
-    List<InternalRow> rows =
-        rowsFromFile(
-            Files.localInput(parquetOutputs.get(0).toFile()),
-            new Schema(optional(1, "timestamp", Types.TimestampType.withoutZone())));
-    Assert.assertEquals(1, rows.size());
-    Assert.assertEquals(1, rows.get(0).numFields());
+    try (FileAppender<InternalRow> writer =
+        Parquet.write(Files.localOutput(parquetFile.toString()))
+            .writeSupport(
+                new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport())
+            .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+            .set("org.apache.spark.legacyDateTime", "false")
+            .set("spark.sql.parquet.int96AsTimestamp", "true")
+            .set("spark.sql.parquet.writeLegacyFormat", "false")
+            .set("spark.sql.parquet.outputTimestampType", "INT96")
+            .schema(schema)
+            .build()) {
+      writer.addAll(rows);
+    }
 
-    // Spark stores timestamps as epoch microseconds in long columns.
-    Assert.assertEquals(
-        ts.toInstant(),
-        Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(rows.get(0).getLong(0))));
+    final List<InternalRow> readRows =
+        rowsFromFile(Files.localInput(parquetFile.toString()), schema);
+    Assert.assertEquals(rows.size(), readRows.size());
+    Assert.assertThat(readRows, CoreMatchers.is(rows));
   }
 }
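The "org.apache.spark.sql.parquet.row.attributes" property carries the Spark schema as JSON so that ParquetWriteSupport knows the row layout it is writing. A small sketch of what the test hands it (the wrapper class is illustrative; StructType.json() is standard Spark API):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class RowAttributesDemo {
      public static void main(String[] args) {
        StructType sparkSchema = new StructType(new StructField[] {
            new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
        });
        // JSON form of the schema, as passed to ParquetWriteSupport.
        System.out.println(sparkSchema.json());
      }
    }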
From b35027be3f3056b23a546f02ea9e4a1fcc91392c Mon Sep 17 00:00:00 2001
From: Gustavo Torres
Date: Thu, 23 Jul 2020 16:20:32 -0700
Subject: [PATCH 5/7] Fix style checks

---
 .../main/java/org/apache/iceberg/parquet/TypeToMessageType.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
index 59fa08279235..8392741071ea 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
@@ -137,7 +137,7 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i
       case TIME:
         return Types.primitive(INT64, repetition).as(TIME_MICROS).id(id).named(name);
       case TIMESTAMP:
-        final TimestampType timestampType = (TimestampType)primitive;
+        final TimestampType timestampType = (TimestampType) primitive;
         if (timestampType.shouldRepresentAsInt96()) {
           return Types.primitive(INT96, repetition).id(id).named(name);
         } else if (timestampType.shouldAdjustToUTC()) {
From 7e48187c68417a6c947480022dc8a6e6945ed5ca Mon Sep 17 00:00:00 2001
From: Gustavo Torres
Date: Fri, 24 Jul 2020 12:39:21 -0700
Subject: [PATCH 6/7] Rewrite Spark int96 test without creating Int96 timestamp
 type

---
 .../java/org/apache/iceberg/types/Types.java  | 21 +----
 .../iceberg/parquet/TypeToMessageType.java    |  6 +-
 .../spark/data/TestSparkParquetReader.java    | 82 ++++++++++++++-----
 3 files changed, 67 insertions(+), 42 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index 4d3bec18d2ec..a6f4d36b0e6e 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -208,9 +208,8 @@ public String toString() {
   }
 
   public static class TimestampType extends PrimitiveType {
-    private static final TimestampType INSTANCE_WITH_ZONE = new TimestampType(true, false);
-    private static final TimestampType INSTANCE_WITHOUT_ZONE = new TimestampType(false, false);
-    private static final TimestampType INSTANCE_SPARK_INT96 = new TimestampType(true, true);
+    private static final TimestampType INSTANCE_WITH_ZONE = new TimestampType(true);
+    private static final TimestampType INSTANCE_WITHOUT_ZONE = new TimestampType(false);
 
     public static TimestampType withZone() {
       return INSTANCE_WITH_ZONE;
@@ -220,30 +219,16 @@ public static TimestampType withoutZone() {
       return INSTANCE_WITHOUT_ZONE;
     }
 
-    /**
-     * @return Timestamp type (with timezone) represented as INT96. This is only added for compatibility
-     * reasons and can only be written using Spark's ParquetWriteSupport. Writing this type should be avoided.
-     */
-    public static TimestampType asSparkInt96() {
-      return INSTANCE_SPARK_INT96;
-    }
-
     private final boolean adjustToUTC;
-    private final boolean asSparkInt96;
 
-    private TimestampType(boolean adjustToUTC, boolean asSparkInt96) {
+    private TimestampType(boolean adjustToUTC) {
       this.adjustToUTC = adjustToUTC;
-      this.asSparkInt96 = asSparkInt96;
     }
 
     public boolean shouldAdjustToUTC() {
       return adjustToUTC;
     }
 
-    public boolean shouldRepresentAsInt96() {
-      return asSparkInt96;
-    }
-
     @Override
     public TypeID typeId() {
       return TypeID.TIMESTAMP;

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
index 8392741071ea..9f55670eda90 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java
@@ -45,7 +45,6 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 
 public class TypeToMessageType {
   public static final int DECIMAL_INT32_MAX_DIGITS = 9;
@@ -136,10 +136,7 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i
       case TIME:
         return Types.primitive(INT64, repetition).as(TIME_MICROS).id(id).named(name);
       case TIMESTAMP:
-        final TimestampType timestampType = (TimestampType) primitive;
-        if (timestampType.shouldRepresentAsInt96()) {
-          return Types.primitive(INT96, repetition).id(id).named(name);
-        } else if (timestampType.shouldAdjustToUTC()) {
+        if (((TimestampType) primitive).shouldAdjustToUTC()) {
           return Types.primitive(INT64, repetition).as(TIMESTAMPTZ_MICROS).id(id).named(name);
         } else {
           return Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name);

diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index ebe73dd26d0c..d16591ed0902 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -21,22 +21,31 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetWriteAdapter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -89,28 +98,63 @@
   @Test
   public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
-    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
-    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
-    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
-    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
+    String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    HadoopOutputFile outputFile =
+        HadoopOutputFile.fromPath(
+            new org.apache.hadoop.fs.Path(outputFilePath), new Configuration());
+    Schema schema = new Schema(required(1, "ts", Types.TimestampType.withZone()));
+    StructType sparkSchema =
+        new StructType(
+            new StructField[] {
+                new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
+            });
+    List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
 
     try (FileAppender<InternalRow> writer =
-        Parquet.write(Files.localOutput(parquetFile.toString()))
-            .writeSupport(
-                new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport())
-            .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
-            .set("org.apache.spark.legacyDateTime", "false")
-            .set("spark.sql.parquet.int96AsTimestamp", "true")
-            .set("spark.sql.parquet.writeLegacyFormat", "false")
-            .set("spark.sql.parquet.outputTimestampType", "INT96")
-            .schema(schema)
-            .build()) {
+        new ParquetWriteAdapter<>(
+            new NativeSparkWriterBuilder(outputFile)
+                .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+                .set("spark.sql.parquet.writeLegacyFormat", "false")
+                .set("spark.sql.parquet.outputTimestampType", "INT96")
+                .build(),
+            MetricsConfig.getDefault())) {
       writer.addAll(rows);
     }
 
-    final List<InternalRow> readRows =
-        rowsFromFile(Files.localInput(parquetFile.toString()), schema);
+    List<InternalRow> readRows = rowsFromFile(Files.localInput(outputFilePath), schema);
     Assert.assertEquals(rows.size(), readRows.size());
     Assert.assertThat(readRows, CoreMatchers.is(rows));
   }
+
+  /**
+   * Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's native
+   * ParquetWriteSupport.
+   */
+  private static class NativeSparkWriterBuilder
+      extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
+    private final Map<String, String> config = Maps.newHashMap();
+
+    public NativeSparkWriterBuilder(org.apache.parquet.io.OutputFile path) {
+      super(path);
+    }
+
+    public NativeSparkWriterBuilder set(String property, String value) {
+      this.config.put(property, value);
+      return self();
+    }
+
+    @Override
+    protected NativeSparkWriterBuilder self() {
+      return this;
+    }
+
+    @Override
+    protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        configuration.set(entry.getKey(), entry.getValue());
+      }
+
+      return new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport();
+    }
+  }
 }

From 68dc4c5e091ea610011f3264f611615f73c1b05d Mon Sep 17 00:00:00 2001
From: Gustavo Torres
Date: Fri, 24 Jul 2020 13:13:06 -0700
Subject: [PATCH 7/7] Test checkstyle fixes

---
 .../apache/iceberg/spark/data/TestSparkParquetReader.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index d16591ed0902..642a8de9322e 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -24,7 +24,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Files;
@@ -106,7 +105,7 @@ public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOExceptio
     StructType sparkSchema =
         new StructType(
             new StructField[] {
-                new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
+              new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
             });
     List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
 
@@ -134,7 +133,7 @@ private static class NativeSparkWriterBuilder
       extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
     private final Map<String, String> config = Maps.newHashMap();
 
-    public NativeSparkWriterBuilder(org.apache.parquet.io.OutputFile path) {
+    NativeSparkWriterBuilder(org.apache.parquet.io.OutputFile path) {
       super(path);
     }
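Taken together, the series depends on a fixed 12-byte encoding. A hypothetical round-trip check (not part of the series) that encodes an instant the way Spark's INT96 writer lays out the bytes and decodes it with the readers' arithmetic:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.time.Instant;
    import java.util.concurrent.TimeUnit;

    public class Int96RoundTrip {
      private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

      public static void main(String[] args) {
        Instant original = Instant.parse("2014-01-01T23:00:01Z");

        // Encode: split into days since the Unix epoch and nanos within the day.
        long epochDay = TimeUnit.SECONDS.toDays(original.getEpochSecond());
        long nanosOfDay =
            TimeUnit.SECONDS.toNanos(original.getEpochSecond() - TimeUnit.DAYS.toSeconds(epochDay))
                + original.getNano();
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(nanosOfDay);
        buf.putInt((int) (epochDay + UNIX_EPOCH_JULIAN));
        buf.flip();

        // Decode with the same arithmetic the readers use.
        long timeOfDayNanos = buf.getLong();
        int julianDay = buf.getInt();
        Instant decoded =
            Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
                .plusNanos(timeOfDayNanos);

        System.out.println(original.equals(decoded));  // true
      }
    }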