Skip to content
Merged

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.parquet.arrow.schema;

import static java.util.Arrays.asList;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
import static org.apache.parquet.schema.OriginalType.DATE;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.OriginalType.INTERVAL;
Expand Down Expand Up @@ -64,10 +67,9 @@
import org.apache.parquet.example.Paper;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Test;

import junit.framework.Assert;

/**
* @see SchemaConverter
*/
Expand Down Expand Up @@ -169,7 +171,7 @@ private static Field field(String name, ArrowType type, Field... children) {
.addField(Types.optional(INT64).as(DECIMAL).precision(15).scale(5).named("k1"))
.addField(Types.optional(BINARY).as(DECIMAL).precision(25).scale(5).named("k2"))
.addField(Types.optional(INT32).as(DATE).named("l"))
.addField(Types.optional(INT32).as(TIME_MILLIS).named("m"))
.addField(Types.optional(INT32).as(timeType(false, MILLIS)).named("m"))
.addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("n"))
.addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o"))
.addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o1"))
Expand Down Expand Up @@ -365,15 +367,17 @@ public void testArrowTimeMillisecondToParquet() {
MessageType expected = converter.fromArrow(new Schema(asList(
field("a", new ArrowType.Time(TimeUnit.MILLISECOND, 32))
))).getParquetSchema();
Assert.assertEquals(expected, Types.buildMessage().addField(Types.optional(INT32).as(TIME_MILLIS).named("a")).named("root"));
Assert.assertEquals(expected,
Types.buildMessage().addField(Types.optional(INT32).as(timeType(false, MILLIS)).named("a")).named("root"));
}

@Test
public void testArrowTimeMicrosecondToParquet() {
MessageType expected = converter.fromArrow(new Schema(asList(
field("a", new ArrowType.Time(TimeUnit.MICROSECOND, 64))
))).getParquetSchema();
Assert.assertEquals(expected, Types.buildMessage().addField(Types.optional(INT64).as(TIME_MICROS).named("a")).named("root"));
Assert.assertEquals(expected,
Types.buildMessage().addField(Types.optional(INT64).as(timeType(false, MICROS)).named("a")).named("root"));
}

@Test(expected = UnsupportedOperationException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
Expand All @@ -36,11 +35,21 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.schema.OriginalType.*;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

Expand Down Expand Up @@ -147,11 +156,11 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
} else if (type.equals(Schema.Type.BYTES)) {
builder = Types.primitive(BINARY, repetition);
} else if (type.equals(Schema.Type.STRING)) {
builder = Types.primitive(BINARY, repetition).as(UTF8);
builder = Types.primitive(BINARY, repetition).as(stringType());
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
} else if (type.equals(Schema.Type.ENUM)) {
builder = Types.primitive(BINARY, repetition).as(ENUM);
builder = Types.primitive(BINARY, repetition).as(enumType());
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
Expand All @@ -178,12 +187,10 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
LogicalType logicalType = schema.getLogicalType();
if (logicalType != null) {
if (logicalType instanceof LogicalTypes.Decimal) {
builder = builder.as(DECIMAL)
.precision(((LogicalTypes.Decimal) logicalType).getPrecision())
.scale(((LogicalTypes.Decimal) logicalType).getScale());

LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
builder = builder.as(decimalType(decimal.getScale(), decimal.getPrecision()));
} else {
OriginalType annotation = convertLogicalType(logicalType);
LogicalTypeAnnotation annotation = convertLogicalType(logicalType);
if (annotation != null) {
builder.as(annotation);
}
Expand Down Expand Up @@ -267,7 +274,7 @@ private Schema convertField(final Type parquetType) {
final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
final PrimitiveTypeName parquetPrimitiveTypeName =
asPrimitive.getPrimitiveTypeName();
final OriginalType annotation = parquetType.getOriginalType();
final LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
Schema schema = parquetPrimitiveTypeName.convert(
new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
@Override
Expand Down Expand Up @@ -301,17 +308,17 @@ public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
}
@Override
public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) {
if (annotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation ||
annotation instanceof LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
return Schema.create(Schema.Type.STRING);
} else {
return Schema.create(Schema.Type.BYTES);
}
}
});

LogicalType logicalType = convertOriginalType(
annotation, asPrimitive.getDecimalMetadata());
if (logicalType != null && (annotation != DECIMAL ||
LogicalType logicalType = convertLogicalType(annotation);
if (logicalType != null && (!(annotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) ||
parquetPrimitiveTypeName == BINARY ||
parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
schema = logicalType.addToSchema(schema);
Expand All @@ -321,10 +328,11 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {

} else {
GroupType parquetGroupType = parquetType.asGroupType();
OriginalType originalType = parquetGroupType.getOriginalType();
if (originalType != null) {
switch(originalType) {
case LIST:
LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
if (logicalTypeAnnotation != null) {
return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Schema>() {
@Override
public Optional<Schema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
if (parquetGroupType.getFieldCount()!= 1) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
Expand All @@ -334,17 +342,29 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
}
if (isElementType(repeatedType, parquetGroupType.getName())) {
// repeated element types are always required
return Schema.createArray(convertField(repeatedType));
return of(Schema.createArray(convertField(repeatedType)));
} else {
Type elementType = repeatedType.asGroupType().getType(0);
if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
return Schema.createArray(optional(convertField(elementType)));
return of(Schema.createArray(optional(convertField(elementType))));
} else {
return Schema.createArray(convertField(elementType));
return of(Schema.createArray(convertField(elementType)));
}
}
case MAP_KEY_VALUE: // for backward-compatibility
case MAP:
}

@Override
// for backward-compatibility
public Optional<Schema> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
return visitMapOrMapKeyValue();
}

@Override
public Optional<Schema> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
return visitMapOrMapKeyValue();
}

private Optional<Schema> visitMapOrMapKeyValue() {
if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
}
Expand All @@ -356,69 +376,89 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
Type keyType = mapKeyValType.getType(0);
if (!keyType.isPrimitive() ||
!keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveTypeName.BINARY) ||
!keyType.getOriginalType().equals(OriginalType.UTF8)) {
!keyType.getLogicalTypeAnnotation().equals(stringType())) {
throw new IllegalArgumentException("Map key type must be binary (UTF8): "
+ keyType);
}
Type valueType = mapKeyValType.getType(1);
if (valueType.isRepetition(Type.Repetition.OPTIONAL)) {
return Schema.createMap(optional(convertField(valueType)));
return of(Schema.createMap(optional(convertField(valueType))));
} else {
return Schema.createMap(convertField(valueType));
return of(Schema.createMap(convertField(valueType)));
}
case ENUM:
return Schema.create(Schema.Type.STRING);
case UTF8:
default:
throw new UnsupportedOperationException("Cannot convert Parquet type " +
parquetType);
}

}
@Override
public Optional<Schema> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
return of(Schema.create(Schema.Type.STRING));
}
}).orElseThrow(() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
} else {
// if no original type then it's a record
return convertFields(parquetGroupType.getName(), parquetGroupType.getFields());
}
}
}

private OriginalType convertLogicalType(LogicalType logicalType) {
private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
if (logicalType == null) {
return null;
} else if (logicalType instanceof LogicalTypes.Decimal) {
return OriginalType.DECIMAL;
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return decimalType(decimal.getScale(), decimal.getPrecision());
} else if (logicalType instanceof LogicalTypes.Date) {
return OriginalType.DATE;
return dateType();
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
return OriginalType.TIME_MILLIS;
return timeType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.TimeMicros) {
return OriginalType.TIME_MICROS;
return timeType(false, MICROS);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return OriginalType.TIMESTAMP_MILLIS;
return timestampType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return OriginalType.TIMESTAMP_MICROS;
return timestampType(false, MICROS);
}
return null;
}

private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) {
private LogicalType convertLogicalType(LogicalTypeAnnotation annotation) {
if (annotation == null) {
return null;
}
switch (annotation) {
case DECIMAL:
return LogicalTypes.decimal(meta.getPrecision(), meta.getScale());
case DATE:
return LogicalTypes.date();
case TIME_MILLIS:
return LogicalTypes.timeMillis();
case TIME_MICROS:
return LogicalTypes.timeMicros();
case TIMESTAMP_MILLIS:
return LogicalTypes.timestampMillis();
case TIMESTAMP_MICROS:
return LogicalTypes.timestampMicros();
}
return null;
return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<LogicalType>() {
@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
return of(LogicalTypes.decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
}

@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return of(LogicalTypes.date());
}

@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
LogicalTypeAnnotation.TimeUnit unit = timeLogicalType.getUnit();
switch (unit) {
case MILLIS:
return of(LogicalTypes.timeMillis());
case MICROS:
return of(LogicalTypes.timeMicros());
}
return empty();
}

@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
switch (unit) {
case MILLIS:
return of(LogicalTypes.timestampMillis());
case MICROS:
return of(LogicalTypes.timestampMicros());
}
return empty();
}
}).orElse(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -616,7 +616,7 @@ public void testTimeMillisType() throws Exception {

testRoundTripConversion(expected,
"message myrecord {\n" +
" required int32 time (TIME_MILLIS);\n" +
" required int32 time (TIME(MILLIS,false));\n" +
"}\n");

for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
Expand Down Expand Up @@ -646,7 +646,7 @@ public void testTimeMicrosType() throws Exception {

testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 time (TIME_MICROS);\n" +
" required int64 time (TIME(MICROS,false));\n" +
"}\n");

for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
Expand Down Expand Up @@ -676,7 +676,7 @@ public void testTimestampMillisType() throws Exception {

testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 timestamp (TIMESTAMP_MILLIS);\n" +
" required int64 timestamp (TIMESTAMP(MILLIS,false));\n" +
"}\n");

for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
Expand Down Expand Up @@ -706,7 +706,7 @@ public void testTimestampMicrosType() throws Exception {

testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 timestamp (TIMESTAMP_MICROS);\n" +
" required int64 timestamp (TIMESTAMP(MICROS,false));\n" +
"}\n");

for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
Expand Down
Loading