From 8b545c2fdcc09614210464931bf732a04f8a2946 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 25 Apr 2020 03:24:30 +0800 Subject: [PATCH] Support timestamp without zone in GenericParquetWriter --- .../apache/iceberg/data/parquet/GenericParquetWriter.java | 5 ++++- data/src/test/java/org/apache/iceberg/data/DataTest.java | 5 +++++ .../org/apache/iceberg/parquet/TypeToMessageType.java | 8 ++++++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index d20db91af8c4..196ff9220832 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -38,6 +38,7 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -133,7 +134,9 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { case TIME_MICROS: return new TimeWriter(desc); case TIMESTAMP_MICROS: - return new TimestamptzWriter(desc); + boolean withZone = ((TimestampLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation()) + .isAdjustedToUTC(); + return withZone ? new TimestamptzWriter(desc) : new TimestampWriter(desc); case DECIMAL: DecimalMetadata decimal = primitive.getDecimalMetadata(); switch (primitive.getPrimitiveTypeName()) { diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java b/data/src/test/java/org/apache/iceberg/data/DataTest.java index abad3c542cb5..66b2d96ea9b5 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTest.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java @@ -61,6 +61,11 @@ public abstract class DataTest { @Rule public TemporaryFolder temp = new TemporaryFolder(); + @Test + public void testTimestampWithoutZone() throws IOException { + writeAndValidate(new Schema(required(0, "timestamp type without zone", Types.TimestampType.withoutZone()))); + } + @Test public void testSimpleStruct() throws IOException { writeAndValidate(new Schema(SUPPORTED_PRIMITIVES.fields())); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index e4ffb75a19d0..6b1e3e9f4968 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -30,14 +30,15 @@ import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampType; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import static org.apache.parquet.schema.OriginalType.DATE; import static org.apache.parquet.schema.OriginalType.DECIMAL; -import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; import static org.apache.parquet.schema.OriginalType.TIME_MICROS; import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; @@ -130,7 +131,10 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i case TIME: return Types.primitive(INT64, repetition).as(TIME_MICROS).id(id).named(name); case TIMESTAMP: - return Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name); + boolean shouldAdjustToUTC = ((TimestampType) primitive).shouldAdjustToUTC(); + LogicalTypeAnnotation.TimeUnit timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; + return Types.primitive(INT64, repetition).as(LogicalTypeAnnotation.timestampType(shouldAdjustToUTC, timeUnit)) + .id(id).named(name); case STRING: return Types.primitive(BINARY, repetition).as(UTF8).id(id).named(name); case BINARY: