PARQUET-968 Add Hive support in ProtoParquet
Fix bug with map writer

Implement review

Implement review

PARQUET-968 Implement feedback

Update the proto-to-parquet schema converter for MAP fields so that it follows the spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
This came as feedback from the Amazon Athena team.
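For context, here is a minimal editor's sketch (not part of this commit) of the parquet-format MAP layout the converter now targets. It uses the same org.apache.parquet.schema.Types builder API that the schema converter below uses; the message name, field name, and key/value types (ExampleMessage, counts, string to int64) are made up for illustration.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

public class MapLayoutSketch {
  public static void main(String[] args) {
    // Layout for a hypothetical proto field `map<string, int64> counts = 1;`:
    // a required group annotated MAP, containing a repeated "key_value" group with key and value.
    MessageType schema = Types.buildMessage()
        .requiredGroup().as(OriginalType.MAP)
          .repeatedGroup()
            .required(BINARY).as(UTF8).named("key")
            .required(INT64).named("value")
          .named("key_value")
        .named("counts")
        .named("ExampleMessage");
    System.out.println(schema);
  }
}

Readers such as Hive and Athena expect this key_value wrapper; it is what the MapConverter and addMapField changes below produce and consume.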
Constantin Muraru committed Feb 14, 2018
1 parent 6a4bbe9 commit 5cf9248
Showing 5 changed files with 401 additions and 50 deletions.
@@ -24,12 +24,14 @@
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.IncompatibleSchemaModificationException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.util.HashMap;
@@ -129,10 +131,14 @@ public void add(Object value) {
};
}

    return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
    // LIST/MAP-annotated groups need an extra unwrapping step; anything else (including the
    // UTF8 placeholder used when no original type is set) falls through to the scalar converter.
    OriginalType originalType = parquetType.getOriginalType() == null ? OriginalType.UTF8 : parquetType.getOriginalType();
    switch (originalType) {
      case LIST: return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
      case MAP: return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
      default: return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
    }
  }


  private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {

    JavaType javaType = fieldDescriptor.getJavaType();
@@ -345,4 +351,125 @@ public void addBinary(Binary binary) {
}

}

  /**
   * This class unwraps the additional LIST wrapper and makes it possible to read the underlying data and then convert
   * it to protobuf.
   * <p>
   * Consider the following protobuf schema:
   * message SimpleList {
   *   repeated int64 first_array = 1;
   * }
   * <p>
   * A LIST wrapper is created in parquet for the above-mentioned protobuf schema:
   * message SimpleList {
   *   required group first_array (LIST) = 1 {
   *     repeated group list {
   *       required int64 element;
   *     }
   *   }
   * }
   * <p>
   * The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains
   * only one field: either a primitive field (like in the example above, where we have an array of longs) or
   * another group (array of messages).
   */
  final class ListConverter extends GroupConverter {
    private final Converter converter;
    private final boolean listOfMessage;

    public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
      OriginalType originalType = parquetType.getOriginalType();
      if (originalType != OriginalType.LIST) {
        throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
      }

      listOfMessage = fieldDescriptor.getJavaType() == JavaType.MESSAGE;

      Type parquetSchema;
      if (parquetType.asGroupType().containsField("list")) {
        parquetSchema = parquetType.asGroupType().getType("list");
        if (parquetSchema.asGroupType().containsField("element")) {
          // primitive lists carry the values in the "element" field of the repeated "list" group
          parquetSchema = parquetSchema.asGroupType().getType("element");
        }
      } else {
        throw new ParquetDecodingException("Expected list but got: " + parquetType);
      }

      converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema);
    }

    @Override
    public Converter getConverter(int fieldIndex) {
      if (fieldIndex > 0) {
        throw new ParquetDecodingException("Unexpected multiple fields in the LIST wrapper");
      }

      if (listOfMessage) {
        return converter;
      }

      return new GroupConverter() {
        @Override
        public Converter getConverter(int fieldIndex) {
          return converter;
        }

        @Override
        public void start() {

        }

        @Override
        public void end() {

        }
      };
    }

    @Override
    public void start() {

    }

    @Override
    public void end() {

    }
  }
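For comparison, a minimal editor's sketch (not part of the diff) of the LIST layout that ListConverter expects for the javadoc example `repeated int64 first_array = 1;`. It mirrors the three-level structure that ProtoSchemaConverter#addRepeatedPrimitive (later in this commit) builds, and the "list"/"element" names are exactly the ones the constructor above looks up.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

public class ListLayoutSketch {
  public static void main(String[] args) {
    MessageType simpleList = Types.buildMessage()
        .requiredGroup().as(OriginalType.LIST)   // first_array (LIST)
          .repeatedGroup()                       // repeated group "list"
            .required(INT64).named("element")    // required int64 element
          .named("list")
        .named("first_array")
        .named("SimpleList");
    System.out.println(simpleList);
  }
}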


  final class MapConverter extends GroupConverter {
    private final Converter converter;

    public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
      OriginalType originalType = parquetType.getOriginalType();
      if (originalType != OriginalType.MAP) {
        throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead.");
      }

      Type parquetSchema;
      if (parquetType.asGroupType().containsField("key_value")) {
        parquetSchema = parquetType.asGroupType().getType("key_value");
      } else {
        throw new ParquetDecodingException("Expected map but got: " + parquetType);
      }

      converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema);
    }

    @Override
    public Converter getConverter(int fieldIndex) {
      if (fieldIndex > 0) {
        throw new ParquetDecodingException("Unexpected multiple fields in the MAP wrapper");
      }
      return converter;
    }

    @Override
    public void start() {
    }

    @Override
    public void end() {
    }
  }
}
@@ -18,30 +18,26 @@
*/
package org.apache.parquet.proto;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import java.util.List;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.parquet.schema.Types.Builder;
import org.apache.parquet.schema.Types.GroupBuilder;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

/**
* <p/>
* Converts a Protocol Buffer Descriptor into a Parquet schema.
@@ -83,26 +79,105 @@ private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
}
}

  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(Descriptors.FieldDescriptor descriptor, GroupBuilder<T> builder) {
    Type.Repetition repetition = getRepetition(descriptor);
    JavaType javaType = descriptor.getJavaType();
  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(Descriptors.FieldDescriptor descriptor, final GroupBuilder<T> builder) {
    if (descriptor.getJavaType() == JavaType.MESSAGE) {
      return addMessageField(descriptor, builder);
    }

    ParquetType parquetType = getParquetType(descriptor);
    if (descriptor.isRepeated()) {
      return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
    }

    return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType);
  }

  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(Descriptors.FieldDescriptor descriptor,
                                                                                                   PrimitiveTypeName primitiveType,
                                                                                                   OriginalType originalType,
                                                                                                   final GroupBuilder<T> builder) {
    return builder
        .group(Type.Repetition.REQUIRED).as(OriginalType.LIST)
          .group(Type.Repetition.REPEATED)
            .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
          .named("element")
        .named("list");
  }

  private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(Descriptors.FieldDescriptor descriptor, GroupBuilder<T> builder) {
    GroupBuilder<GroupBuilder<GroupBuilder<T>>> result =
        builder
            .group(Type.Repetition.REQUIRED).as(OriginalType.LIST)
            .group(Type.Repetition.REPEATED);

    convertFields(result, descriptor.getMessageType().getFields());

    return result.named("list");
  }

  private <T> GroupBuilder<GroupBuilder<T>> addMessageField(Descriptors.FieldDescriptor descriptor, final GroupBuilder<T> builder) {
    if (descriptor.isMapField()) {
      return addMapField(descriptor, builder);
    } else if (descriptor.isRepeated()) {
      return addRepeatedMessage(descriptor, builder);
    }

    // Plain message
    GroupBuilder<GroupBuilder<T>> group = builder.group(getRepetition(descriptor));
    convertFields(group, descriptor.getMessageType().getFields());
    return group;
  }

  private <T> GroupBuilder<GroupBuilder<T>> addMapField(Descriptors.FieldDescriptor descriptor, final GroupBuilder<T> builder) {
    List<Descriptors.FieldDescriptor> fields = descriptor.getMessageType().getFields();
    if (fields.size() != 2) {
      throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields);
    }

    ParquetType mapKeyParquetType = getParquetType(fields.get(0));

    GroupBuilder<GroupBuilder<GroupBuilder<T>>> group = builder
        .group(Type.Repetition.REQUIRED).as(OriginalType.MAP)
        .group(Type.Repetition.REPEATED) // key_value wrapper
        .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key");

    return addField(fields.get(1), group).named("value")
        .named("key_value");
  }

  private ParquetType getParquetType(Descriptors.FieldDescriptor fieldDescriptor) {

    JavaType javaType = fieldDescriptor.getJavaType();
    switch (javaType) {
      case BOOLEAN: return builder.primitive(BOOLEAN, repetition);
      case INT: return builder.primitive(INT32, repetition);
      case LONG: return builder.primitive(INT64, repetition);
      case FLOAT: return builder.primitive(FLOAT, repetition);
      case DOUBLE: return builder.primitive(DOUBLE, repetition);
      case BYTE_STRING: return builder.primitive(BINARY, repetition);
      case STRING: return builder.primitive(BINARY, repetition).as(UTF8);
      case MESSAGE: {
        GroupBuilder<GroupBuilder<T>> group = builder.group(repetition);
        convertFields(group, descriptor.getMessageType().getFields());
        return group;
      }
      case ENUM: return builder.primitive(BINARY, repetition).as(ENUM);
      case INT: return ParquetType.of(INT32);
      case LONG: return ParquetType.of(INT64);
      case DOUBLE: return ParquetType.of(DOUBLE);
      case BOOLEAN: return ParquetType.of(BOOLEAN);
      case FLOAT: return ParquetType.of(FLOAT);
      case STRING: return ParquetType.of(BINARY, UTF8);
      case ENUM: return ParquetType.of(BINARY, ENUM);
      case BYTE_STRING: return ParquetType.of(BINARY);
      default:
        throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType);
    }
  }

  private static class ParquetType {
    PrimitiveTypeName primitiveType;
    OriginalType originalType;

    private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) {
      this.primitiveType = primitiveType;
      this.originalType = originalType;
    }

    public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) {
      return new ParquetType(primitiveType, originalType);
    }

    public static ParquetType of(PrimitiveTypeName primitiveType) {
      return of(primitiveType, null);
    }
  }

}
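Finally, a short editor's sketch of how the converted schema can be inspected after this change. It assumes a generated protobuf class ExampleProtos.SimpleList containing `repeated int64 first_array = 1;` (hypothetical, named after the javadoc example) and parquet-protobuf on the classpath; ProtoSchemaConverter#convert is the conversion entry point used elsewhere in parquet-protobuf.

import org.apache.parquet.proto.ProtoSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class SchemaPreview {
  public static void main(String[] args) {
    // ExampleProtos.SimpleList is a hypothetical generated class; any protobuf Message class works.
    MessageType schema = new ProtoSchemaConverter().convert(ExampleProtos.SimpleList.class);
    // With this commit, repeated fields come out wrapped in the LIST layout
    // (group (LIST) -> repeated group "list" -> "element") and proto map fields in the
    // MAP layout (group (MAP) -> repeated group "key_value" -> key/value),
    // which Hive and Athena can read.
    System.out.println(schema);
  }
}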
