PARQUET-968 Add Hive/Presto support in ProtoParquet #411

Closed
wants to merge 3 commits
Changes from all commits
@@ -24,12 +24,14 @@
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.IncompatibleSchemaModificationException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.util.HashMap;
@@ -129,10 +131,15 @@ public void add(Object value) {
};
}

if (OriginalType.LIST == parquetType.getOriginalType()) {
return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
}
if (OriginalType.MAP == parquetType.getOriginalType()) {
return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
}
return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
}


private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {

JavaType javaType = fieldDescriptor.getJavaType();
@@ -345,4 +352,121 @@ public void addBinary(Binary binary) {
}

}

/**
* This class unwraps the additional LIST wrapper and makes it possible to read the underlying data and then convert
* it to protobuf.
* <p>
* Consider the following protobuf schema:
* message SimpleList {
*   repeated int64 first_array = 1;
* }
* <p>
* A LIST wrapper is created in parquet for the protobuf schema above:
* message SimpleList {
*   optional group first_array (LIST) = 1 {
*     repeated group list {
*       optional int64 element;
*     }
*   }
* }
* <p>
* The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains
* a repeated group named 'list', itself containing only one field called 'element' of the type of the repeated
* object (can be a primitive as in this example or a group in case of a repeated message in protobuf).
*/
final class ListConverter extends GroupConverter {
private final Converter converter;

public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
OriginalType originalType = parquetType.getOriginalType();
if (originalType != OriginalType.LIST || parquetType.isPrimitive()) {
throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
}

GroupType rootWrapperType = parquetType.asGroupType();
if (!rootWrapperType.containsField("list") || rootWrapperType.getType("list").isPrimitive()) {
throw new ParquetDecodingException("Expected repeated 'list' group inside LIST wrapperr but got: " + rootWrapperType);
}

GroupType listType = rootWrapperType.getType("list").asGroupType();
if (!listType.containsField("element")) {
throw new ParquetDecodingException("Expected 'element' inside repeated list group but got: " + listType);
}

Type elementType = listType.getType("element");
converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType);
}

@Override
public Converter getConverter(int fieldIndex) {
if (fieldIndex > 0) {
throw new ParquetDecodingException("Unexpected multiple fields in the LIST wrapper");
}

return new GroupConverter() {
@Override
public Converter getConverter(int fieldIndex) {
return converter;
}

@Override
public void start() {

}

@Override
public void end() {

}
};
}

@Override
public void start() {

}

@Override
public void end() {

}
}


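/**
 * This class unwraps the additional MAP wrapper and makes it possible to read the underlying
 * key/value entries and then convert them to a protobuf map field.
 * <p>
 * Specs compliant writers (and 3rd party tools such as Hive) store a map as an optional group
 * annotated with MAP that contains a single repeated group named 'key_value' holding the 'key'
 * and 'value' fields of each map entry.
 */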
final class MapConverter extends GroupConverter {
private final Converter converter;

public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
OriginalType originalType = parquetType.getOriginalType();
if (originalType != OriginalType.MAP) {
throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead.");
}

Type parquetSchema;
if (parquetType.asGroupType().containsField("key_value")){
parquetSchema = parquetType.asGroupType().getType("key_value");
} else {
throw new ParquetDecodingException("Expected map but got: " + parquetType);
}

converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema);
}

@Override
public Converter getConverter(int fieldIndex) {
if (fieldIndex > 0) {
throw new ParquetDecodingException("Unexpected multiple fields in the MAP wrapper");
}
return converter;
}

@Override
public void start() {
}

@Override
public void end() {
}
}
}
@@ -1,4 +1,4 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,30 +18,27 @@
*/
package org.apache.parquet.proto;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import java.util.List;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.parquet.schema.Types.Builder;
import org.apache.parquet.schema.Types.GroupBuilder;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

/**
* <p/>
* Converts a Protocol Buffer Descriptor into a Parquet schema.
@@ -51,6 +48,22 @@
public class ProtoSchemaConverter {

private static final Logger LOG = LoggerFactory.getLogger(ProtoSchemaConverter.class);
private final boolean parquetSpecsCompliant;

public ProtoSchemaConverter() {
this(false);
}

/**
* Instantiate a schema converter to get the parquet schema corresponding to protobuf classes.
* @param parquetSpecsCompliant If set to false, the generated parquet schema uses the old
* schema style (prior to PARQUET-968) to provide backward compatibility, but it does not wrap
* collections in the LIST and MAP groups required by the parquet specification. If set to true,
* specs compliant schemas are generated.
*/
public ProtoSchemaConverter(boolean parquetSpecsCompliant) {
this.parquetSpecsCompliant = parquetSpecsCompliant;
}

public MessageType convert(Class<? extends Message> protobufClass) {
LOG.debug("Converting protocol buffer class \"" + protobufClass + "\" to parquet schema.");
@@ -63,8 +76,8 @@ public MessageType convert(Class<? extends Message> protobufClass) {
}

/** Iterates over the list of fields. */
private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<Descriptors.FieldDescriptor> fieldDescriptors) {
for (Descriptors.FieldDescriptor fieldDescriptor : fieldDescriptors) {
private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<FieldDescriptor> fieldDescriptors) {
for (FieldDescriptor fieldDescriptor : fieldDescriptors) {
groupBuilder =
addField(fieldDescriptor, groupBuilder)
.id(fieldDescriptor.getNumber())
@@ -73,7 +86,7 @@ private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<Des
return groupBuilder;
}

private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
private Type.Repetition getRepetition(FieldDescriptor descriptor) {
if (descriptor.isRequired()) {
return Type.Repetition.REQUIRED;
} else if (descriptor.isRepeated()) {
@@ -83,26 +96,110 @@ private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
}
}

private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(Descriptors.FieldDescriptor descriptor, GroupBuilder<T> builder) {
Type.Repetition repetition = getRepetition(descriptor);
JavaType javaType = descriptor.getJavaType();
private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
if (descriptor.getJavaType() == JavaType.MESSAGE) {
return addMessageField(descriptor, builder);
}

ParquetType parquetType = getParquetType(descriptor);
if (descriptor.isRepeated() && parquetSpecsCompliant) {
// the old schema style did not include the LIST wrapper around repeated fields
return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
}

return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType);
}

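/**
 * Wraps a repeated primitive field in the specs compliant LIST structure: an optional group
 * annotated with LIST, containing a repeated group 'list' whose single required field is
 * named 'element'.
 */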
private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(FieldDescriptor descriptor,
PrimitiveTypeName primitiveType,
OriginalType originalType,
final GroupBuilder<T> builder) {
return builder
.group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
.named("element")
.named("list");
}

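/**
 * Wraps a repeated message field in the same LIST structure, nesting the message fields
 * inside an optional 'element' group.
 */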
private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder<T> builder) {
GroupBuilder<GroupBuilder<GroupBuilder<GroupBuilder<T>>>> result =
builder
.group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.group(Type.Repetition.OPTIONAL);

convertFields(result, descriptor.getMessageType().getFields());

return result.named("element").named("list");
}

private <T> GroupBuilder<GroupBuilder<T>> addMessageField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
if (descriptor.isMapField() && parquetSpecsCompliant) {
// the old schema style did not include the MAP wrapper around map groups
return addMapField(descriptor, builder);
}
if (descriptor.isRepeated() && parquetSpecsCompliant) {
// the old schema style did not include the LIST wrapper around repeated messages
return addRepeatedMessage(descriptor, builder);
}

// Plain message
GroupBuilder<GroupBuilder<T>> group = builder.group(getRepetition(descriptor));
convertFields(group, descriptor.getMessageType().getFields());
return group;
}

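/**
 * Builds the specs compliant MAP structure for a protobuf map field: an optional group
 * annotated with MAP, containing a repeated 'key_value' group with a required 'key' and a 'value'.
 */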
private <T> GroupBuilder<GroupBuilder<T>> addMapField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
List<FieldDescriptor> fields = descriptor.getMessageType().getFields();
if (fields.size() != 2) {
throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields);
}

ParquetType mapKeyParquetType = getParquetType(fields.get(0));

GroupBuilder<GroupBuilder<GroupBuilder<T>>> group = builder
.group(Type.Repetition.OPTIONAL).as(OriginalType.MAP) // only optional maps are allowed in Proto3
.group(Type.Repetition.REPEATED) // key_value wrapper
.primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key");

return addField(fields.get(1), group).named("value")
.named("key_value");
}

private ParquetType getParquetType(FieldDescriptor fieldDescriptor) {

JavaType javaType = fieldDescriptor.getJavaType();
switch (javaType) {
case BOOLEAN: return builder.primitive(BOOLEAN, repetition);
case INT: return builder.primitive(INT32, repetition);
case LONG: return builder.primitive(INT64, repetition);
case FLOAT: return builder.primitive(FLOAT, repetition);
case DOUBLE: return builder.primitive(DOUBLE, repetition);
case BYTE_STRING: return builder.primitive(BINARY, repetition);
case STRING: return builder.primitive(BINARY, repetition).as(UTF8);
case MESSAGE: {
GroupBuilder<GroupBuilder<T>> group = builder.group(repetition);
convertFields(group, descriptor.getMessageType().getFields());
return group;
}
case ENUM: return builder.primitive(BINARY, repetition).as(ENUM);
case INT: return ParquetType.of(INT32);
case LONG: return ParquetType.of(INT64);
case DOUBLE: return ParquetType.of(DOUBLE);
case BOOLEAN: return ParquetType.of(BOOLEAN);
case FLOAT: return ParquetType.of(FLOAT);
case STRING: return ParquetType.of(BINARY, UTF8);
case ENUM: return ParquetType.of(BINARY, ENUM);
case BYTE_STRING: return ParquetType.of(BINARY);
default:
throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType);
}
}

private static class ParquetType {
PrimitiveTypeName primitiveType;
OriginalType originalType;

private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) {
this.primitiveType = primitiveType;
this.originalType = originalType;
}

public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) {
return new ParquetType(primitiveType, originalType);
}

public static ParquetType of(PrimitiveTypeName primitiveType) {
return of(primitiveType, null);
}
}

}
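
For illustration only (not part of this diff): a minimal sketch of how the new flag could be used, assuming a generated protobuf class SimpleListOuterClass.SimpleList corresponding to the SimpleList message shown in the ListConverter javadoc above.

import org.apache.parquet.proto.ProtoSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class SchemaConverterExample {
  public static void main(String[] args) {
    // SimpleListOuterClass.SimpleList is a hypothetical class generated by protoc for the
    // SimpleList message; substitute any generated protobuf class available on the classpath.

    // Default constructor keeps the old, backward-compatible schema style (no LIST/MAP wrappers).
    MessageType legacy = new ProtoSchemaConverter().convert(SimpleListOuterClass.SimpleList.class);

    // parquetSpecsCompliant = true wraps repeated fields in LIST groups and maps in MAP/key_value
    // groups, which is the layout Hive and Presto expect when reading parquet files.
    MessageType compliant = new ProtoSchemaConverter(true).convert(SimpleListOuterClass.SimpleList.class);

    System.out.println(legacy);
    System.out.println(compliant);
  }
}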