Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-968 Add Hive/Presto support in ProtoParquet #411

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.IncompatibleSchemaModificationException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.util.HashMap;
Expand Down Expand Up @@ -129,10 +131,14 @@ public void add(Object value) {
};
}

return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
OriginalType originalType = parquetType.getOriginalType() == null ? OriginalType.UTF8 : parquetType.getOriginalType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there is condition ? When will be the original type null ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess for the data generated in previous version of parquet-protobuf, it is not having the "OriginalType" annotation for repeated fields, thus this conditional test to be backward compatible.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the reason for this is that if originalType is null, the swicth will throw an exception.
In costimuraru#2 I replaced the switch with an if.

switch (originalType) {
case LIST: return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
case MAP: return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
default: return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
}
}


private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {

JavaType javaType = fieldDescriptor.getJavaType();
Expand Down Expand Up @@ -345,4 +351,121 @@ public void addBinary(Binary binary) {
}

}

/**
* This class unwraps the additional LIST wrapper and makes it possible to read the underlying data and then convert
* it to protobuf.
* <p>
* Consider the following protobuf schema:
* message SimpleList {
* repeated int64 first_array = 1;
* }
* <p>
* A LIST wrapper is created in parquet for the above mentioned protobuf schema:
* message SimpleList {
* required group first_array (LIST) = 1 {
* repeated int32 element;
* }
* }
* <p>
* The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains
* one only one field: either a primitive field (like in the example above, where we have an array of ints) or
* another group (array of messages).
*/
final class ListConverter extends GroupConverter {
private final Converter converter;
private final boolean listOfMessage;

public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
OriginalType originalType = parquetType.getOriginalType();
if (originalType != OriginalType.LIST) {
throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
}

listOfMessage = fieldDescriptor.getJavaType() == JavaType.MESSAGE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is unused.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in costimuraru#2


Type parquetSchema;
if (parquetType.asGroupType().containsField("list")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIPS: We could extract local variable with explaining name.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in costimuraru#2

parquetSchema = parquetType.asGroupType().getType("list");
if (parquetSchema.asGroupType().containsField("element")) {
parquetSchema = parquetSchema.asGroupType().getType("element");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc for class ListConverter is not correct

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in costimuraru#2

}
} else {
throw new ParquetDecodingException("Expected list but got: " + parquetType);
}

converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema);
}

@Override
public Converter getConverter(int fieldIndex) {
if (fieldIndex > 0) {
throw new ParquetDecodingException("Unexpected multiple fields in the LIST wrapper");
}

return new GroupConverter() {
@Override
public Converter getConverter(int fieldIndex) {
return converter;
}

@Override
public void start() {

}

@Override
public void end() {

}
};
}

@Override
public void start() {

}

@Override
public void end() {

}
}


final class MapConverter extends GroupConverter {
private final Converter converter;

public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
OriginalType originalType = parquetType.getOriginalType();
if (originalType != OriginalType.MAP) {
throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead.");
}

Type parquetSchema;
if (parquetType.asGroupType().containsField("key_value")){
parquetSchema = parquetType.asGroupType().getType("key_value");
} else {
throw new ParquetDecodingException("Expected map but got: " + parquetType);
}

converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema);
}

@Override
public Converter getConverter(int fieldIndex) {
if (fieldIndex > 0) {
throw new ParquetDecodingException("Unexpected multiple fields in the MAP wrapper");
}
return converter;
}

@Override
public void start() {
}

@Override
public void end() {
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,30 +18,27 @@
*/
package org.apache.parquet.proto;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import java.util.List;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.parquet.schema.Types.Builder;
import org.apache.parquet.schema.Types.GroupBuilder;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.parquet.schema.OriginalType.ENUM;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

/**
* <p/>
* Converts a Protocol Buffer Descriptor into a Parquet schema.
Expand All @@ -63,8 +60,8 @@ public MessageType convert(Class<? extends Message> protobufClass) {
}

/* Iterates over list of fields. **/
private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<Descriptors.FieldDescriptor> fieldDescriptors) {
for (Descriptors.FieldDescriptor fieldDescriptor : fieldDescriptors) {
private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<FieldDescriptor> fieldDescriptors) {
for (FieldDescriptor fieldDescriptor : fieldDescriptors) {
groupBuilder =
addField(fieldDescriptor, groupBuilder)
.id(fieldDescriptor.getNumber())
Expand All @@ -73,7 +70,7 @@ private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<Des
return groupBuilder;
}

private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
private Type.Repetition getRepetition(FieldDescriptor descriptor) {
if (descriptor.isRequired()) {
return Type.Repetition.REQUIRED;
} else if (descriptor.isRepeated()) {
Expand All @@ -83,26 +80,106 @@ private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
}
}

private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(Descriptors.FieldDescriptor descriptor, GroupBuilder<T> builder) {
Type.Repetition repetition = getRepetition(descriptor);
JavaType javaType = descriptor.getJavaType();
private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
if (descriptor.getJavaType() == JavaType.MESSAGE) {
return addMessageField(descriptor, builder);
}

ParquetType parquetType = getParquetType(descriptor);
if (descriptor.isRepeated()) {
return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
}

return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType);
}

private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(FieldDescriptor descriptor,
PrimitiveTypeName primitiveType,
OriginalType originalType,
final GroupBuilder<T> builder) {
return builder
.group(Type.Repetition.REQUIRED).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
.named("element")
.named("list");
}

private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder<T> builder) {
GroupBuilder<GroupBuilder<GroupBuilder<GroupBuilder<T>>>> result =
builder
.group(Type.Repetition.REQUIRED).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.group(Type.Repetition.OPTIONAL);

convertFields(result, descriptor.getMessageType().getFields());

return result.named("element").named("list");
}

private <T> GroupBuilder<GroupBuilder<T>> addMessageField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
if (descriptor.isMapField()) {
return addMapField(descriptor, builder);
} else if (descriptor.isRepeated()) {
return addRepeatedMessage(descriptor, builder);
}

// Plain message
GroupBuilder<GroupBuilder<T>> group = builder.group(getRepetition(descriptor));
convertFields(group, descriptor.getMessageType().getFields());
return group;
}

private <T> GroupBuilder<GroupBuilder<T>> addMapField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
List<FieldDescriptor> fields = descriptor.getMessageType().getFields();
if (fields.size() != 2) {
throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields);
}

ParquetType mapKeyParquetType = getParquetType(fields.get(0));

GroupBuilder<GroupBuilder<GroupBuilder<T>>> group = builder
.group(Type.Repetition.OPTIONAL).as(OriginalType.MAP) // only optional maps are allowed in Proto3
.group(Type.Repetition.REPEATED) // key_value wrapper
.primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key");

return addField(fields.get(1), group).named("value")
.named("key_value");
}

private ParquetType getParquetType(FieldDescriptor fieldDescriptor) {

JavaType javaType = fieldDescriptor.getJavaType();
switch (javaType) {
case BOOLEAN: return builder.primitive(BOOLEAN, repetition);
case INT: return builder.primitive(INT32, repetition);
case LONG: return builder.primitive(INT64, repetition);
case FLOAT: return builder.primitive(FLOAT, repetition);
case DOUBLE: return builder.primitive(DOUBLE, repetition);
case BYTE_STRING: return builder.primitive(BINARY, repetition);
case STRING: return builder.primitive(BINARY, repetition).as(UTF8);
case MESSAGE: {
GroupBuilder<GroupBuilder<T>> group = builder.group(repetition);
convertFields(group, descriptor.getMessageType().getFields());
return group;
}
case ENUM: return builder.primitive(BINARY, repetition).as(ENUM);
case INT: return ParquetType.of(INT32);
case LONG: return ParquetType.of(INT64);
case DOUBLE: return ParquetType.of(DOUBLE);
case BOOLEAN: return ParquetType.of(BOOLEAN);
case FLOAT: return ParquetType.of(FLOAT);
case STRING: return ParquetType.of(BINARY, UTF8);
case ENUM: return ParquetType.of(BINARY, ENUM);
case BYTE_STRING: return ParquetType.of(BINARY);
default:
throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType);
}
}

private static class ParquetType {
PrimitiveTypeName primitiveType;
OriginalType originalType;

private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) {
this.primitiveType = primitiveType;
this.originalType = originalType;
}

public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) {
return new ParquetType(primitiveType, originalType);
}

public static ParquetType of(PrimitiveTypeName primitiveType) {
return of(primitiveType, null);
}
}

}
Loading