Merged
@@ -28,16 +28,19 @@
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -116,44 +119,11 @@ public ParquetValueWriter<?> map(GroupType map,
@Override
public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
switch (primitive.getOriginalType()) {
case ENUM:
case JSON:
case UTF8:
return ParquetValueWriters.strings(desc);
case INT_8:
case INT_16:
case INT_32:
return ParquetValueWriters.ints(desc);
case INT_64:
return ParquetValueWriters.longs(desc);
case DATE:
return new DateWriter(desc);
case TIME_MICROS:
return new TimeWriter(desc);
case TIMESTAMP_MICROS:
return new TimestamptzWriter(desc);
case DECIMAL:
DecimalMetadata decimal = primitive.getDecimalMetadata();
switch (primitive.getPrimitiveTypeName()) {
case INT32:
return ParquetValueWriters.decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale());
case INT64:
return ParquetValueWriters.decimalAsLong(desc, decimal.getPrecision(), decimal.getScale());
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return ParquetValueWriters.decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale());
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
case BSON:
return ParquetValueWriters.byteBuffers(desc);
default:
throw new UnsupportedOperationException(
"Unsupported logical type: " + primitive.getOriginalType());
LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
if (logicalType != null) {
Optional<PrimitiveWriter<?>> writer = logicalType.accept(new LogicalTypeWriterVisitor(desc));
if (writer.isPresent()) {
return writer.get();
}
}

Contributor: When writer.isPresent() == false, we fall back to the previous logic, right?

Contributor Author: Correct.

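A minimal runnable sketch of that fallback contract, assuming Parquet 1.11's LogicalTypeAnnotation API (the demo class and printed strings are illustrative, not part of this change): a visitor that overrides only the STRING case yields Optional.empty() for every other annotation, which is exactly the writer.isPresent() == false path that sends the caller on to the physical-type switch.

import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;

public class VisitorFallbackDemo {
  public static void main(String[] args) {
    // Only STRING is overridden; all other visit(...) methods keep their defaults.
    LogicalTypeAnnotationVisitor<String> onlyStrings = new LogicalTypeAnnotationVisitor<String>() {
      @Override
      public Optional<String> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
        return Optional.of("strings writer");
      }
    };

    // Handled annotation: the override wins.
    System.out.println(LogicalTypeAnnotation.stringType().accept(onlyStrings)); // Optional[strings writer]

    // Unhandled annotation: the default returns Optional.empty(), so the
    // caller falls back to the switch on the physical primitive type.
    System.out.println(LogicalTypeAnnotation.jsonType().accept(onlyStrings)); // Optional.empty
  }
}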
@@ -178,6 +148,83 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
}
}

private static class LogicalTypeWriterVisitor implements LogicalTypeAnnotationVisitor<PrimitiveWriter<?>> {
private final ColumnDescriptor desc;

private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
this.desc = desc;
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
return Optional.of(ParquetValueWriters.decimalAsInteger(
desc, decimalType.getPrecision(), decimalType.getScale()));
case INT64:
return Optional.of(ParquetValueWriters.decimalAsLong(
desc, decimalType.getPrecision(), decimalType.getScale()));
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return Optional.of(ParquetValueWriters.decimalAsFixed(
desc, decimalType.getPrecision(), decimalType.getScale()));
}
return Optional.empty();
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
return Optional.of(new DateWriter(desc));
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
return Optional.of(new TimeWriter(desc));
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS is supported", timestampType.getUnit());
if (timestampType.isAdjustedToUTC()) {
return Optional.of(new TimestamptzWriter(desc));
} else {
return Optional.of(new TimestampWriter(desc));
}
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() < 64,
"Cannot read uint64: not a supported Java type");
if (intType.getBitWidth() < 64) {
return Optional.of(ParquetValueWriters.ints(desc));
} else {
return Optional.of(ParquetValueWriters.longs(desc));
}
}

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

Contributor: In Parquet's LogicalTypeAnnotation.java there is LogicalTypeAnnotation.IntLogicalTypeAnnotation. Do we need it here too?

Contributor Author: Methods that aren't implemented here use the superclass implementation, which returns Optional.empty(). When that happens, we fall back to the old logic that uses just the primitive type.
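A small hypothetical check of that reply (not part of this change): a visitor with no overrides at all inherits Optional.empty() from every default visit(...) method, including the INT case asked about above.

import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;

public class DefaultVisitDemo {
  public static void main(String[] args) {
    // No visit(...) overrides at all: every annotation takes the default path.
    LogicalTypeAnnotationVisitor<String> noOverrides = new LogicalTypeAnnotationVisitor<String>() { };

    Optional<String> result = LogicalTypeAnnotation.intType(64, true).accept(noOverrides);
    System.out.println(result.isPresent()); // false -> caller uses the primitive type instead
  }
}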

@Override
public Optional<PrimitiveWriter<?>> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
return Optional.of(ParquetValueWriters.byteBuffers(desc));
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

data/src/test/java/org/apache/iceberg/data/DataTest.java (2 additions & 1 deletion)
@@ -48,7 +48,8 @@ public abstract class DataTest {
optional(105, "f", Types.FloatType.get()),
required(106, "d", Types.DoubleType.get()),
optional(107, "date", Types.DateType.get()),
required(108, "ts", Types.TimestampType.withZone()),
required(108, "ts_tz", Types.TimestampType.withZone()),
required(109, "ts", Types.TimestampType.withoutZone()),
required(110, "s", Types.StringType.get()),
required(112, "fixed", Types.FixedType.ofLength(7)),
optional(113, "bytes", Types.BinaryType.get()),
@@ -25,12 +25,13 @@
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type.Repetition;

Expand Down Expand Up @@ -132,39 +133,16 @@ public Type map(GroupType map, Type keyType, Type valueType) {

@Override
public Type primitive(PrimitiveType primitive) {
OriginalType annotation = primitive.getOriginalType();
if (annotation != null) {
switch (annotation) {
case INT_8:
case UINT_8:
case INT_16:
case UINT_16:
case INT_32:
return Types.IntegerType.get();
case INT_64:
return Types.LongType.get();
case DATE:
return Types.DateType.get();
case TIME_MILLIS:
case TIME_MICROS:
return Types.TimeType.get();
case TIMESTAMP_MILLIS:
case TIMESTAMP_MICROS:
return Types.TimestampType.withZone();
case JSON:
case BSON:
case ENUM:
case UTF8:
return Types.StringType.get();
case DECIMAL:
DecimalMetadata decimal = primitive.getDecimalMetadata();
return Types.DecimalType.of(
decimal.getPrecision(), decimal.getScale());
default:
throw new UnsupportedOperationException("Unsupported logical type: " + annotation);
// first, use the logical type annotation, if present
LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
if (logicalType != null) {
Optional<Type> converted = logicalType.accept(ParquetLogicalTypeVisitor.get());
if (converted.isPresent()) {
return converted.get();
}
}

// last, use the primitive type
switch (primitive.getPrimitiveTypeName()) {
case BOOLEAN:
return Types.BooleanType.get();
@@ -186,6 +164,67 @@ public Type primitive(PrimitiveType primitive) {
"Cannot convert unknown primitive type: " + primitive);
}

private static class ParquetLogicalTypeVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Type> {
private static final ParquetLogicalTypeVisitor INSTANCE = new ParquetLogicalTypeVisitor();

private static ParquetLogicalTypeVisitor get() {
return INSTANCE;
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
return Optional.of(Types.StringType.get());
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
return Optional.of(Types.StringType.get());
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
return Optional.of(Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale()));
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
return Optional.of(Types.DateType.get());
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
return Optional.of(Types.TimeType.get());
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
return Optional.of(timestampType.isAdjustedToUTC() ? TimestampType.withZone() : TimestampType.withoutZone());
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() < 64,
"Cannot use uint64: not a supported Java type");
if (intType.getBitWidth() < 32) {
return Optional.of(Types.IntegerType.get());
} else if (intType.getBitWidth() == 32 && intType.isSigned()) {
return Optional.of(Types.IntegerType.get());
} else {
return Optional.of(Types.LongType.get());
}
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonType) {
return Optional.of(Types.StringType.get());
}

@Override
public Optional<Type> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
return Optional.of(Types.BinaryType.get());
}
}
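As a spot-check of the two-step rule, a MICROS timestamp annotation resolves entirely in step one. A hypothetical snippet (it assumes access to the private ParquetLogicalTypeVisitor, so in practice it would sit in this class or in a test beside it):

// timestamp(isAdjustedToUTC = true, unit = MICROS) -> Iceberg timestamptz
Optional<Type> converted = LogicalTypeAnnotation
    .timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)
    .accept(ParquetLogicalTypeVisitor.get());
// converted.get() equals Types.TimestampType.withZone(), so primitive(...)
// returns before reaching the physical-type switch.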

private void addAlias(String name, int fieldId) {
String fullName = name;
if (!fieldNames.isEmpty()) {
@@ -45,24 +45,13 @@

class ParquetWriter<T> implements FileAppender<T>, Closeable {

// We have one for Parquet 1.10 and one for 1.11. The signature changed, but we still want
// to be compatible with both of them
private static DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
.builder(PageWriteStore.class)

// Parquet 1.11
.hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
CodecFactory.BytesCompressor.class,
MessageType.class,
ByteBufferAllocator.class,
int.class)

// Parquet 1.10
.hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
CodecFactory.BytesCompressor.class,
MessageType.class,
ByteBufferAllocator.class)

.build();
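For context on what is being trimmed: a minimal sketch of the DynConstructors fallback idiom the removed comments describe, assuming Iceberg's org.apache.iceberg.common.DynConstructors (the target class and signatures here are stand-ins, not Parquet's). Each hiddenImpl(...) registers a candidate constructor, and the first one that resolves at runtime is the one newInstance(...) uses.

import org.apache.iceberg.common.DynConstructors;

public class DynCtorSketch {
  public static void main(String[] args) {
    DynConstructors.Ctor<CharSequence> ctor = DynConstructors
        .builder(CharSequence.class)
        // candidate signature, e.g. from a newer release: StringBuilder(int)
        .hiddenImpl("java.lang.StringBuilder", int.class)
        // fallback signature, e.g. from an older release: StringBuilder()
        .hiddenImpl("java.lang.StringBuilder")
        .build();

    CharSequence built = ctor.newInstance(16); // binds to StringBuilder(int)
    System.out.println(built.length()); // 0
  }
}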

private static final DynMethods.UnboundMethod flushToWriter = DynMethods
Expand All @@ -86,7 +75,6 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private long recordCount = 0;
private long nextCheckRecordCount = 10;

// Copied from Parquet 1.11.0 to keep support for 1.10.0
private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
