Skip to content

Commit

Permalink
PARQUET-968 Add Hive/Presto support in ProtoParquet
Browse files Browse the repository at this point in the history
This PR adds Hive (https://github.com/apache/hive) and Presto (https://github.com/prestodb/presto) support for parquet messages written with ProtoParquetWriter. Hive and other tools, such as Presto (used by AWS Athena), rely on specific LIST/MAP wrappers (as defined in the parquet spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). These wrappers are currently missing from the ProtoParquet schema. AvroParquet works just fine, because it adds these wrappers when it deals with arrays and maps. This PR brings these wrappers in parquet-proto, providing the same functionality that already exists in parquet-avro.

This is backward compatible. Messages written without the extra LIST/MAP wrappers are still being read successfully using the updated ProtoParquetReader.

Regarding the change.
Given the following protobuf schema:

```
message ListOfPrimitives {
    repeated int64 my_repeated_id = 1;
}
```

Old parquet schema was:
```
message ListOfPrimitives {
  repeated int64 my_repeated_id = 1;
}
```

New parquet schema is:
```
message ListOfPrimitives {
  required group my_repeated_id (LIST) = 1 {
    repeated group list {
      required int64 element;
    }
  }
}
```
---

For list of messages, the changes look like this:

Protobuf schema:
```
message ListOfMessages {
    string top_field = 1;
    repeated MyInnerMessage first_array = 2;
}

message MyInnerMessage {
    int32 inner_field = 1;
}
```

Old parquet schema was:
```
message TestProto3.ListOfMessages {
  optional binary top_field (UTF8) = 1;
  repeated group first_array = 2 {
    optional int32 inner_field = 1;
  }
}
```

The expected parquet schema, compatible with Hive (and similar to parquet-avro) is the following (notice the LIST wrapper):

```
message TestProto3.ListOfMessages {
  optional binary top_field (UTF8) = 1;
  required group first_array (LIST) = 2 {
    repeated group list {
      optional group element {
        optional int32 inner_field = 1;
      }
    }
  }
}
```

---

Similar for maps. Protobuf schema:
```
message TopMessage {
    map<int64, MyInnerMessage> myMap = 1;
}

message MyInnerMessage {
    int32 inner_field = 1;
}
```

Old parquet schema:
```
message TestProto3.TopMessage {
  repeated group myMap = 1 {
    optional int64 key = 1;
    optional group value = 2 {
      optional int32 inner_field = 1;
    }
  }
}
```

New parquet schema (notice the `MAP` wrapper):
```
message TestProto3.TopMessage {
  required group myMap (MAP) = 1 {
    repeated group key_value {
      required int64 key;
      optional group value {
        optional int32 inner_field = 1;
      }
    }
  }
}
```

Jira: https://issues.apache.org/jira/browse/PARQUET-968

Author: Constantin Muraru <[email protected]>
Author: Benoît Hanotte <[email protected]>

Closes #411 from costimuraru/PARQUET-968 and squashes the following commits:

16eafcb [Benoît Hanotte] PARQUET-968 add proto flag to enable writing using specs-compliant schemas (#2)
a8bd704 [Constantin Muraru] Pick up commit from @andredasilvapinto
5cf9248 [Constantin Muraru] PARQUET-968 Add Hive support in ProtoParquet
  • Loading branch information
Constantin Muraru authored and julienledem committed Apr 26, 2018
1 parent af977ad commit f849384
Show file tree
Hide file tree
Showing 9 changed files with 1,331 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.IncompatibleSchemaModificationException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.util.HashMap;
Expand Down Expand Up @@ -126,10 +128,15 @@ public void add(Object value) {
};
}

if (OriginalType.LIST == parquetType.getOriginalType()) {
return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
}
if (OriginalType.MAP == parquetType.getOriginalType()) {
return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
}
return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
}


private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {

JavaType javaType = fieldDescriptor.getJavaType();
Expand Down Expand Up @@ -342,4 +349,121 @@ public void addBinary(Binary binary) {
}

}

/**
* This class unwraps the additional LIST wrapper and makes it possible to read the underlying data and then convert
* it to protobuf.
* <p>
* Consider the following protobuf schema:
* message SimpleList {
* repeated int64 first_array = 1;
* }
* <p>
* A LIST wrapper is created in parquet for the above mentioned protobuf schema:
* message SimpleList {
* optional group first_array (LIST) = 1 {
* repeated group list {
* optional int32 element;
* }
* }
* }
* <p>
* The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains
* a repeated group named 'list', itself containing only one field called 'element' of the type of the repeated
* object (can be a primitive as in this example or a group in case of a repeated message in protobuf).
*/
final class ListConverter extends GroupConverter {
private final Converter converter;

public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
OriginalType originalType = parquetType.getOriginalType();
if (originalType != OriginalType.LIST || parquetType.isPrimitive()) {
throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
}

GroupType rootWrapperType = parquetType.asGroupType();
if (!rootWrapperType.containsField("list") || rootWrapperType.getType("list").isPrimitive()) {
throw new ParquetDecodingException("Expected repeated 'list' group inside LIST wrapperr but got: " + rootWrapperType);
}

GroupType listType = rootWrapperType.getType("list").asGroupType();
if (!listType.containsField("element")) {
throw new ParquetDecodingException("Expected 'element' inside repeated list group but got: " + listType);
}

Type elementType = listType.getType("element");
converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType);
}

@Override
public Converter getConverter(int fieldIndex) {
if (fieldIndex > 0) {
throw new ParquetDecodingException("Unexpected multiple fields in the LIST wrapper");
}

return new GroupConverter() {
@Override
public Converter getConverter(int fieldIndex) {
return converter;
}

@Override
public void start() {

}

@Override
public void end() {

}
};
}

@Override
public void start() {

}

@Override
public void end() {

}
}


final class MapConverter extends GroupConverter {
private final Converter converter;

public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
OriginalType originalType = parquetType.getOriginalType();
if (originalType != OriginalType.MAP) {
throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead.");
}

Type parquetSchema;
if (parquetType.asGroupType().containsField("key_value")){
parquetSchema = parquetType.asGroupType().getType("key_value");
} else {
throw new ParquetDecodingException("Expected map but got: " + parquetType);
}

converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema);
}

@Override
public Converter getConverter(int fieldIndex) {
if (fieldIndex > 0) {
throw new ParquetDecodingException("Unexpected multiple fields in the MAP wrapper");
}
return converter;
}

@Override
public void start() {
}

@Override
public void end() {
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,36 +18,49 @@
*/
package org.apache.parquet.proto;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import java.util.List;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.parquet.schema.Types.Builder;
import org.apache.parquet.schema.Types.GroupBuilder;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

/**
* Converts a Protocol Buffer Descriptor into a Parquet schema.
*/
public class ProtoSchemaConverter {

private static final Logger LOG = LoggerFactory.getLogger(ProtoSchemaConverter.class);
private final boolean parquetSpecsCompliant;

public ProtoSchemaConverter() {
this(false);
}

/**
* Instanciate a schema converter to get the parquet schema corresponding to protobuf classes.
* @param parquetSpecsCompliant If set to false, the parquet schema generated will be using the old
* schema style (prior to PARQUET-968) to provide backward-compatibility
* but which does not use LIST and MAP wrappers around collections as required
* by the parquet specifications. If set to true, specs compliant schemas are used.
*/
public ProtoSchemaConverter(boolean parquetSpecsCompliant) {
this.parquetSpecsCompliant = parquetSpecsCompliant;
}

public MessageType convert(Class<? extends Message> protobufClass) {
LOG.debug("Converting protocol buffer class \"" + protobufClass + "\" to parquet schema.");
Expand All @@ -60,8 +73,8 @@ public MessageType convert(Class<? extends Message> protobufClass) {
}

/* Iterates over list of fields. **/
private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<Descriptors.FieldDescriptor> fieldDescriptors) {
for (Descriptors.FieldDescriptor fieldDescriptor : fieldDescriptors) {
private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<FieldDescriptor> fieldDescriptors) {
for (FieldDescriptor fieldDescriptor : fieldDescriptors) {
groupBuilder =
addField(fieldDescriptor, groupBuilder)
.id(fieldDescriptor.getNumber())
Expand All @@ -70,7 +83,7 @@ private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<Des
return groupBuilder;
}

private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
private Type.Repetition getRepetition(FieldDescriptor descriptor) {
if (descriptor.isRequired()) {
return Type.Repetition.REQUIRED;
} else if (descriptor.isRepeated()) {
Expand All @@ -80,26 +93,110 @@ private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
}
}

private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(Descriptors.FieldDescriptor descriptor, GroupBuilder<T> builder) {
Type.Repetition repetition = getRepetition(descriptor);
JavaType javaType = descriptor.getJavaType();
private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
if (descriptor.getJavaType() == JavaType.MESSAGE) {
return addMessageField(descriptor, builder);
}

ParquetType parquetType = getParquetType(descriptor);
if (descriptor.isRepeated() && parquetSpecsCompliant) {
// the old schema style did not include the LIST wrapper around repeated fields
return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
}

return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType);
}

private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(FieldDescriptor descriptor,
PrimitiveTypeName primitiveType,
OriginalType originalType,
final GroupBuilder<T> builder) {
return builder
.group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
.named("element")
.named("list");
}

private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder<T> builder) {
GroupBuilder<GroupBuilder<GroupBuilder<GroupBuilder<T>>>> result =
builder
.group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.group(Type.Repetition.OPTIONAL);

convertFields(result, descriptor.getMessageType().getFields());

return result.named("element").named("list");
}

private <T> GroupBuilder<GroupBuilder<T>> addMessageField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
if (descriptor.isMapField() && parquetSpecsCompliant) {
// the old schema style did not include the MAP wrapper around map groups
return addMapField(descriptor, builder);
}
if (descriptor.isRepeated() && parquetSpecsCompliant) {
// the old schema style did not include the LIST wrapper around repeated messages
return addRepeatedMessage(descriptor, builder);
}

// Plain message
GroupBuilder<GroupBuilder<T>> group = builder.group(getRepetition(descriptor));
convertFields(group, descriptor.getMessageType().getFields());
return group;
}

private <T> GroupBuilder<GroupBuilder<T>> addMapField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
List<FieldDescriptor> fields = descriptor.getMessageType().getFields();
if (fields.size() != 2) {
throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields);
}

ParquetType mapKeyParquetType = getParquetType(fields.get(0));

GroupBuilder<GroupBuilder<GroupBuilder<T>>> group = builder
.group(Type.Repetition.OPTIONAL).as(OriginalType.MAP) // only optional maps are allowed in Proto3
.group(Type.Repetition.REPEATED) // key_value wrapper
.primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key");

return addField(fields.get(1), group).named("value")
.named("key_value");
}

private ParquetType getParquetType(FieldDescriptor fieldDescriptor) {

JavaType javaType = fieldDescriptor.getJavaType();
switch (javaType) {
case BOOLEAN: return builder.primitive(BOOLEAN, repetition);
case INT: return builder.primitive(INT32, repetition);
case LONG: return builder.primitive(INT64, repetition);
case FLOAT: return builder.primitive(FLOAT, repetition);
case DOUBLE: return builder.primitive(DOUBLE, repetition);
case BYTE_STRING: return builder.primitive(BINARY, repetition);
case STRING: return builder.primitive(BINARY, repetition).as(UTF8);
case MESSAGE: {
GroupBuilder<GroupBuilder<T>> group = builder.group(repetition);
convertFields(group, descriptor.getMessageType().getFields());
return group;
}
case ENUM: return builder.primitive(BINARY, repetition).as(ENUM);
case INT: return ParquetType.of(INT32);
case LONG: return ParquetType.of(INT64);
case DOUBLE: return ParquetType.of(DOUBLE);
case BOOLEAN: return ParquetType.of(BOOLEAN);
case FLOAT: return ParquetType.of(FLOAT);
case STRING: return ParquetType.of(BINARY, UTF8);
case ENUM: return ParquetType.of(BINARY, ENUM);
case BYTE_STRING: return ParquetType.of(BINARY);
default:
throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType);
}
}

private static class ParquetType {
PrimitiveTypeName primitiveType;
OriginalType originalType;

private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) {
this.primitiveType = primitiveType;
this.originalType = originalType;
}

public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) {
return new ParquetType(primitiveType, originalType);
}

public static ParquetType of(PrimitiveTypeName primitiveType) {
return of(primitiveType, null);
}
}

}
Loading

0 comments on commit f849384

Please sign in to comment.