diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index a675a557580d..87854a7738cb 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -28,16 +28,19 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Optional; import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter; import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; +import org.apache.parquet.Preconditions; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -116,44 +119,11 @@ public ParquetValueWriter map(GroupType map, @Override public ParquetValueWriter primitive(PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); - - if (primitive.getOriginalType() != null) { - switch (primitive.getOriginalType()) { - case ENUM: - case JSON: - case UTF8: - return ParquetValueWriters.strings(desc); - case INT_8: - case INT_16: - case INT_32: - return ParquetValueWriters.ints(desc); - case INT_64: - return ParquetValueWriters.longs(desc); - case DATE: - return new DateWriter(desc); - case TIME_MICROS: - return new TimeWriter(desc); - case TIMESTAMP_MICROS: - return new TimestamptzWriter(desc); - case DECIMAL: - DecimalMetadata decimal = primitive.getDecimalMetadata(); - switch (primitive.getPrimitiveTypeName()) { - case INT32: - return ParquetValueWriters.decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale()); - case INT64: - return ParquetValueWriters.decimalAsLong(desc, decimal.getPrecision(), decimal.getScale()); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return ParquetValueWriters.decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale()); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - case BSON: - return ParquetValueWriters.byteBuffers(desc); - default: - throw new UnsupportedOperationException( - "Unsupported logical type: " + primitive.getOriginalType()); + LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); + if (logicalType != null) { + Optional> writer = logicalType.accept(new LogicalTypeWriterVisitor(desc)); + if (writer.isPresent()) { + return writer.get(); } } @@ -178,6 +148,83 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { } } + private static class LogicalTypeWriterVisitor implements LogicalTypeAnnotationVisitor> { + private final ColumnDescriptor desc; + + private LogicalTypeWriterVisitor(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return Optional.of(ParquetValueWriters.decimalAsInteger( + desc, decimalType.getPrecision(), decimalType.getScale())); + case INT64: + return Optional.of(ParquetValueWriters.decimalAsLong( + desc, decimalType.getPrecision(), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of(ParquetValueWriters.decimalAsFixed( + desc, decimalType.getPrecision(), decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { + return Optional.of(new DateWriter(desc)); + } + + @Override + public Optional> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { + return Optional.of(new TimeWriter(desc)); + } + + @Override + public Optional> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { + Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS is supported", timestampType.getUnit()); + if (timestampType.isAdjustedToUTC()) { + return Optional.of(new TimestamptzWriter(desc)); + } else { + return Optional.of(new TimestampWriter(desc)); + } + } + + @Override + public Optional> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + if (intType.getBitWidth() < 64) { + return Optional.of(ParquetValueWriters.ints(desc)); + } else { + return Optional.of(ParquetValueWriters.longs(desc)); + } + } + + @Override + public Optional> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(ParquetValueWriters.byteBuffers(desc)); + } + } + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java b/data/src/test/java/org/apache/iceberg/data/DataTest.java index abad3c542cb5..5566a75309d4 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTest.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java @@ -48,7 +48,8 @@ public abstract class DataTest { optional(105, "f", Types.FloatType.get()), required(106, "d", Types.DoubleType.get()), optional(107, "date", Types.DateType.get()), - required(108, "ts", Types.TimestampType.withZone()), + required(108, "ts_tz", Types.TimestampType.withZone()), + required(109, "ts", Types.TimestampType.withoutZone()), required(110, "s", Types.StringType.get()), required(112, "fixed", Types.FixedType.ofLength(7)), optional(113, "bytes", Types.BinaryType.get()), diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java index ffdbc17218d7..6be5e1578595 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java @@ -25,12 +25,13 @@ import com.google.common.collect.Maps; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.parquet.schema.DecimalMetadata; +import org.apache.iceberg.types.Types.TimestampType; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type.Repetition; @@ -132,39 +133,16 @@ public Type map(GroupType map, Type keyType, Type valueType) { @Override public Type primitive(PrimitiveType primitive) { - OriginalType annotation = primitive.getOriginalType(); - if (annotation != null) { - switch (annotation) { - case INT_8: - case UINT_8: - case INT_16: - case UINT_16: - case INT_32: - return Types.IntegerType.get(); - case INT_64: - return Types.LongType.get(); - case DATE: - return Types.DateType.get(); - case TIME_MILLIS: - case TIME_MICROS: - return Types.TimeType.get(); - case TIMESTAMP_MILLIS: - case TIMESTAMP_MICROS: - return Types.TimestampType.withZone(); - case JSON: - case BSON: - case ENUM: - case UTF8: - return Types.StringType.get(); - case DECIMAL: - DecimalMetadata decimal = primitive.getDecimalMetadata(); - return Types.DecimalType.of( - decimal.getPrecision(), decimal.getScale()); - default: - throw new UnsupportedOperationException("Unsupported logical type: " + annotation); + // first, use the logical type annotation, if present + LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); + if (logicalType != null) { + Optional converted = logicalType.accept(ParquetLogicalTypeVisitor.get()); + if (converted.isPresent()) { + return converted.get(); } } + // last, use the primitive type switch (primitive.getPrimitiveTypeName()) { case BOOLEAN: return Types.BooleanType.get(); @@ -186,6 +164,67 @@ public Type primitive(PrimitiveType primitive) { "Cannot convert unknown primitive type: " + primitive); } + private static class ParquetLogicalTypeVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor { + private static final ParquetLogicalTypeVisitor INSTANCE = new ParquetLogicalTypeVisitor(); + + private static ParquetLogicalTypeVisitor get() { + return INSTANCE; + } + + @Override + public Optional visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(Types.StringType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(Types.StringType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + return Optional.of(Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale())); + } + + @Override + public Optional visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { + return Optional.of(Types.DateType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { + return Optional.of(Types.TimeType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { + return Optional.of(timestampType.isAdjustedToUTC() ? TimestampType.withZone() : TimestampType.withoutZone()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() < 64, + "Cannot use uint64: not a supported Java type"); + if (intType.getBitWidth() < 32) { + return Optional.of(Types.IntegerType.get()); + } else if (intType.getBitWidth() == 32 && intType.isSigned()) { + return Optional.of(Types.IntegerType.get()); + } else { + return Optional.of(Types.LongType.get()); + } + } + + @Override + public Optional visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonType) { + return Optional.of(Types.StringType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(Types.BinaryType.get()); + } + } + private void addAlias(String name, int fieldId) { String fullName = name; if (!fieldNames.isEmpty()) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index 18d6ad6ac62f..27487ac52c70 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -45,24 +45,13 @@ class ParquetWriter implements FileAppender, Closeable { - // We have one for Parquet 1.10 and one for 1.11. The signature changed, but we still want - // to be compatible with both of them private static DynConstructors.Ctor pageStoreCtorParquet = DynConstructors .builder(PageWriteStore.class) - - // Parquet 1.11 .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore", CodecFactory.BytesCompressor.class, MessageType.class, ByteBufferAllocator.class, int.class) - - // Parquet 1.10 - .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore", - CodecFactory.BytesCompressor.class, - MessageType.class, - ByteBufferAllocator.class) - .build(); private static final DynMethods.UnboundMethod flushToWriter = DynMethods @@ -86,7 +75,6 @@ class ParquetWriter implements FileAppender, Closeable { private long recordCount = 0; private long nextCheckRecordCount = 10; - // Copied from Parquet 1.11.0 to keep support for 1.10.0 private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length"; private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index e4ffb75a19d0..9f55670eda90 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -30,16 +30,14 @@ import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampType; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; -import static org.apache.parquet.schema.OriginalType.DATE; -import static org.apache.parquet.schema.OriginalType.DECIMAL; -import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; -import static org.apache.parquet.schema.OriginalType.TIME_MICROS; -import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; @@ -51,6 +49,14 @@ public class TypeToMessageType { public static final int DECIMAL_INT32_MAX_DIGITS = 9; public static final int DECIMAL_INT64_MAX_DIGITS = 18; + private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType(); + private static final LogicalTypeAnnotation DATE = LogicalTypeAnnotation.dateType(); + private static final LogicalTypeAnnotation TIME_MICROS = LogicalTypeAnnotation + .timeType(false /* not adjusted to UTC */, TimeUnit.MICROS); + private static final LogicalTypeAnnotation TIMESTAMP_MICROS = LogicalTypeAnnotation + .timestampType(false /* not adjusted to UTC */, TimeUnit.MICROS); + private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS = LogicalTypeAnnotation + .timestampType(true /* adjusted to UTC */, TimeUnit.MICROS); public MessageType convert(Schema schema, String name) { Types.MessageTypeBuilder builder = Types.buildMessage(); @@ -130,9 +136,13 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i case TIME: return Types.primitive(INT64, repetition).as(TIME_MICROS).id(id).named(name); case TIMESTAMP: - return Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name); + if (((TimestampType) primitive).shouldAdjustToUTC()) { + return Types.primitive(INT64, repetition).as(TIMESTAMPTZ_MICROS).id(id).named(name); + } else { + return Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name); + } case STRING: - return Types.primitive(BINARY, repetition).as(UTF8).id(id).named(name); + return Types.primitive(BINARY, repetition).as(STRING).id(id).named(name); case BINARY: return Types.primitive(BINARY, repetition).id(id).named(name); case FIXED: @@ -148,18 +158,14 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i if (decimal.precision() <= DECIMAL_INT32_MAX_DIGITS) { // store as an int return Types.primitive(INT32, repetition) - .as(DECIMAL) - .precision(decimal.precision()) - .scale(decimal.scale()) + .as(decimalAnnotation(decimal.precision(), decimal.scale())) .id(id) .named(name); } else if (decimal.precision() <= DECIMAL_INT64_MAX_DIGITS) { // store as a long return Types.primitive(INT64, repetition) - .as(DECIMAL) - .precision(decimal.precision()) - .scale(decimal.scale()) + .as(decimalAnnotation(decimal.precision(), decimal.scale())) .id(id) .named(name); @@ -167,9 +173,7 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i // store as a fixed-length array int minLength = TypeUtil.decimalRequiredBytes(decimal.precision()); return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(minLength) - .as(DECIMAL) - .precision(decimal.precision()) - .scale(decimal.scale()) + .as(decimalAnnotation(decimal.precision(), decimal.scale())) .id(id) .named(name); } @@ -181,4 +185,8 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i throw new UnsupportedOperationException("Unsupported type for Parquet: " + primitive); } } + + private static LogicalTypeAnnotation decimalAnnotation(int precision, int scale) { + return LogicalTypeAnnotation.decimalType(scale, precision); + } } diff --git a/versions.props b/versions.props index 753653a54319..b782d0509ee8 100644 --- a/versions.props +++ b/versions.props @@ -4,7 +4,7 @@ org.apache.avro:avro = 1.9.2 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-metastore = 2.3.6 org.apache.orc:* = 1.6.3 -org.apache.parquet:parquet-avro = 1.11.0 +org.apache.parquet:* = 1.11.0 org.apache.spark:spark-hive_2.11 = 2.4.5 org.apache.spark:spark-avro_2.11 = 2.4.5 org.apache.pig:pig = 0.14.0