Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;

import javax.annotation.Nullable;
Expand All @@ -34,7 +34,6 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

public final class ParquetTypeUtils
Expand Down Expand Up @@ -80,7 +79,7 @@ public static ColumnIO getArrayElementColumn(ColumnIO columnIO)
* }
*/
if (columnIO instanceof GroupColumnIO &&
columnIO.getType().getOriginalType() == null &&
columnIO.getType().getLogicalTypeAnnotation() == null &&
((GroupColumnIO) columnIO).getChildrenCount() == 1 &&
!columnIO.getName().equals("array") &&
!columnIO.getName().equals(columnIO.getParent().getName() + "_tuple")) {
Expand Down Expand Up @@ -221,11 +220,11 @@ public static ColumnIO lookupColumnById(GroupColumnIO groupColumnIO, int columnI

public static Optional<DecimalType> createDecimalType(RichColumnDescriptor descriptor)
{
if (descriptor.getPrimitiveType().getOriginalType() != DECIMAL) {
if (!(descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation)) {
return Optional.empty();
}
DecimalMetadata decimalMetadata = descriptor.getPrimitiveType().getDecimalMetadata();
return Optional.of(DecimalType.createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale()));
DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation();
return Optional.of(DecimalType.createDecimalType(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.joda.time.DateTimeZone;

Expand Down Expand Up @@ -105,13 +105,16 @@ public static PrimitiveColumnReader createReader(RichColumnDescriptor descriptor
case INT32:
return createDecimalColumnReader(descriptor).orElse(new IntColumnReader(descriptor));
case INT64:
if (descriptor.getPrimitiveType().getOriginalType() == OriginalType.TIME_MICROS) {
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimeLogicalTypeAnnotation &&
((TimeLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return new TimeMicrosColumnReader(descriptor);
}
if (descriptor.getPrimitiveType().getOriginalType() == OriginalType.TIMESTAMP_MICROS) {
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimestampLogicalTypeAnnotation &&
((TimestampLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return new TimestampMicrosColumnReader(descriptor);
}
if (descriptor.getPrimitiveType().getOriginalType() == OriginalType.TIMESTAMP_MILLIS) {
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimestampLogicalTypeAnnotation &&
((TimestampLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return new Int64TimestampMillisColumnReader(descriptor);
}
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimestampLogicalTypeAnnotation &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types;
Expand Down Expand Up @@ -143,7 +142,7 @@ private org.apache.parquet.schema.Type getPrimitiveType(Type type, String name,
.named(name);
}
if (DATE.equals(type)) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition).as(OriginalType.DATE).named(name);
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition).as(LogicalTypeAnnotation.dateType()).named(name);
}
if (BIGINT.equals(type)) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).named(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@

import com.google.common.collect.Lists;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

import java.util.LinkedList;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.parquet.schema.OriginalType.LIST;
import static org.apache.parquet.schema.OriginalType.MAP;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

// Code from iceberg
Expand All @@ -44,8 +42,8 @@ else if (type.isPrimitive()) {
else {
// if not a primitive, the typeId must be a group
GroupType group = type.asGroupType();
OriginalType annotation = group.getOriginalType();
if (annotation == LIST) {
LogicalTypeAnnotation annotation = group.getLogicalTypeAnnotation();
if (LogicalTypeAnnotation.listType().equals(annotation)) {
checkArgument(!group.isRepetition(REPEATED),
"Invalid list: top-level group is repeated: " + group);
checkArgument(group.getFieldCount() == 1,
Expand All @@ -70,7 +68,7 @@ else if (type.isPrimitive()) {
visitor.fieldNames.pop();
}
}
else if (annotation == MAP) {
else if (LogicalTypeAnnotation.mapType().equals(annotation)) {
checkArgument(!group.isRepetition(REPEATED),
"Invalid map: top-level group is repeated: " + group);
checkArgument(group.getFieldCount() == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.joda.time.DateTimeZone;

Expand Down Expand Up @@ -215,7 +215,7 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty
return new DateValueWriter(valuesWriter, parquetType);
}
if (TIME_MICROS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIME_MICROS);
verifyParquetType(type, parquetType, TimeLogicalTypeAnnotation.class, isTime(LogicalTypeAnnotation.TimeUnit.MICROS));
return new TimeMicrosValueWriter(valuesWriter, parquetType);
}
if (type instanceof TimestampType) {
Expand All @@ -224,28 +224,23 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty
return new Int96TimestampValueWriter(valuesWriter, type, parquetType, parquetTimeZone.get());
}
if (TIMESTAMP_MILLIS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MILLIS);
verifyParquetType(type, parquetType, TimestampLogicalTypeAnnotation.class, isTimestamp(LogicalTypeAnnotation.TimeUnit.MILLIS));
return new TimestampMillisValueWriter(valuesWriter, type, parquetType);
}
if (TIMESTAMP_MICROS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MICROS);
verifyParquetType(type, parquetType, TimestampLogicalTypeAnnotation.class, isTimestamp(LogicalTypeAnnotation.TimeUnit.MICROS));
return new BigintValueWriter(valuesWriter, type, parquetType);
}
if (TIMESTAMP_NANOS.equals(type)) {
verifyParquetType(type, parquetType, (OriginalType) null); // no OriginalType for timestamp NANOS
verifyParquetType(type, parquetType, TimestampLogicalTypeAnnotation.class, isTimestamp(LogicalTypeAnnotation.TimeUnit.NANOS));
return new TimestampNanosValueWriter(valuesWriter, type, parquetType);
}
}

if (TIMESTAMP_TZ_MILLIS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MILLIS);
return new TimestampTzMillisValueWriter(valuesWriter, parquetType);
}
if (TIMESTAMP_TZ_MICROS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MICROS);
return new TimestampTzMicrosValueWriter(valuesWriter, parquetType);
}
if (DOUBLE.equals(type)) {
Expand All @@ -264,11 +259,6 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty
throw new TrinoException(NOT_SUPPORTED, format("Unsupported type for Parquet writer: %s", type));
}

private static void verifyParquetType(Type type, PrimitiveType parquetType, OriginalType originalType)
{
checkArgument(parquetType.getOriginalType() == originalType, "Wrong Parquet type '%s' for Trino type '%s'", parquetType, type);
}

private static <T> void verifyParquetType(Type type, PrimitiveType parquetType, Class<T> annotationType, Predicate<T> predicate)
{
checkArgument(
Expand All @@ -277,6 +267,14 @@ private static <T> void verifyParquetType(Type type, PrimitiveType parquetType,
"Wrong Parquet type '%s' for Trino type '%s'", parquetType, type);
}

private static Predicate<TimeLogicalTypeAnnotation> isTime(LogicalTypeAnnotation.TimeUnit precision)
{
requireNonNull(precision, "precision is null");
return annotation -> annotation.getUnit() == precision &&
// isAdjustedToUTC=false indicates Local semantics (timestamps not normalized to UTC)
!annotation.isAdjustedToUTC();
}

private static Predicate<TimestampLogicalTypeAnnotation> isTimestamp(LogicalTypeAnnotation.TimeUnit precision)
{
requireNonNull(precision, "precision is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -51,9 +52,6 @@
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.OriginalType.UINT_8;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
Expand Down Expand Up @@ -265,7 +263,7 @@ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Lo
@Test
public void testBuildBinaryDecimal()
{
PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
PrimitiveType type = Types.required(BINARY).as(LogicalTypeAnnotation.decimalType(2, 12)).named("test_binary_decimal");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
//assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
Expand Down Expand Up @@ -409,7 +407,7 @@ public void testBuildBinaryDecimal()
@Test
public void testBuildBinaryUtf8()
{
PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
PrimitiveType type = Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
//assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
Expand Down Expand Up @@ -554,7 +552,7 @@ public void testBuildBinaryUtf8()
public void testStaticBuildBinary()
{
ColumnIndex columnIndex = ColumnIndexBuilder.build(
Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8"),
BoundaryOrder.ASCENDING,
asList(true, true, false, false, true, false, true, false),
asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L),
Expand Down Expand Up @@ -603,7 +601,7 @@ public void testStaticBuildBinary()
public void testFilterWithoutNullCounts()
{
ColumnIndex columnIndex = ColumnIndexBuilder.build(
Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8"),
BoundaryOrder.ASCENDING,
asList(true, true, false, false, true, false, true, false),
null,
Expand Down Expand Up @@ -1137,7 +1135,7 @@ public void testStaticBuildInt32()
@Test
public void testBuildUInt8()
{
PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8");
PrimitiveType type = Types.required(INT32).as(LogicalTypeAnnotation.intType(8, false)).named("test_uint8");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
//assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
assertNull(builder.build());
Expand Down Expand Up @@ -1352,7 +1350,7 @@ public void testNoOpBuilder()
{
ColumnIndexBuilder builder = ColumnIndexBuilder.getNoOpBuilder();
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(Types.required(BINARY).as(UTF8).named("test_binary_utf8"), stringBinary("Jeltz"),
builder.add(sb.stats(Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8"), stringBinary("Jeltz"),
stringBinary("Slartibartfast"), null, null));
builder.add(sb.stats(Types.required(BOOLEAN).named("test_boolean"), true, true, null, null));
builder.add(sb.stats(Types.required(DOUBLE).named("test_double"), null, null, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -403,7 +404,7 @@ private void testReadStatsBinaryUtf8OldWriter(Optional<String> fileCreatedBy, St
if (max != null) {
statistics.setMax(max.getBytes(UTF_8));
}
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.of(statistics), new PrimitiveType(OPTIONAL, BINARY, "Test column", OriginalType.UTF8)))
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.of(statistics), Types.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("Test column")))
.isInstanceOfSatisfying(BinaryStatistics.class, columnStatistics -> {
assertFalse(columnStatistics.isEmpty());

Expand Down Expand Up @@ -437,7 +438,7 @@ private void testReadStatsBinaryUtf8OldWriter(Optional<String> fileCreatedBy, St
@Test(dataProvider = "allCreatedBy")
public void testReadStatsBinaryUtf8(Optional<String> fileCreatedBy)
{
PrimitiveType varchar = new PrimitiveType(OPTIONAL, BINARY, "Test column", OriginalType.UTF8);
PrimitiveType varchar = Types.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("Test column");
Statistics statistics;

// Stats written by Parquet after https://issues.apache.org/jira/browse/PARQUET-1025
Expand Down Expand Up @@ -477,7 +478,7 @@ public void testReadNullStats(Optional<String> fileCreatedBy)
columnStatistics -> assertTrue(columnStatistics.isEmpty()));

// varchar
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.empty(), new PrimitiveType(OPTIONAL, BINARY, "Test column", OriginalType.UTF8)))
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.empty(), Types.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("Test column")))
.isInstanceOfSatisfying(
BinaryStatistics.class,
columnStatistics -> assertTrue(columnStatistics.isEmpty()));
Expand Down
Loading