Skip to content

Commit

Permalink
Write default Protobuf values in Parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
andredasilvapinto authored and André Pinto committed Feb 14, 2018
1 parent a8bd704 commit 9a4c016
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 126 deletions.
8 changes: 7 additions & 1 deletion parquet-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-tools</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand Down Expand Up @@ -159,7 +165,7 @@
<mkdir dir="${project.build.directory}/generated-test-sources/java" />
<exec failonerror="true" executable="${protoc.path}">
<arg value="--java_out=${project.build.directory}/generated-test-sources/java" />
<arg value="src/test/resources/TestProtobuf.proto" />
<arg value="src/test/resources/TestProto2.proto" />
<arg value="src/test/resources/TestProto3.proto" />
<arg value="-I." />
</exec>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -46,8 +46,8 @@ public static void setProtobufClass(Job job, Class<? extends Message> protoClass
ProtoWriteSupport.setSchema(ContextUtil.getConfiguration(job), protoClass);
}

public ProtoParquetOutputFormat(Class<? extends Message> msg) {
super(new ProtoWriteSupport(msg));
public ProtoParquetOutputFormat(Class<? extends Message> msg, final boolean includeDefaults) {
super(new ProtoWriteSupport(msg, includeDefaults));
}

public ProtoParquetOutputFormat() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -42,9 +42,10 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
* @throws IOException
*/
public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage,
final boolean includeDefaults,
CompressionCodecName compressionCodecName, int blockSize,
int pageSize) throws IOException {
super(file, new ProtoWriteSupport(protoMessage),
super(file, new ProtoWriteSupport(protoMessage, includeDefaults),
compressionCodecName, blockSize, pageSize);
}

Expand All @@ -60,9 +61,10 @@ public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage,
* @throws IOException
*/
public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage,
final boolean includeDefaults,
CompressionCodecName compressionCodecName, int blockSize,
int pageSize, boolean enableDictionary, boolean validating) throws IOException {
super(file, new ProtoWriteSupport(protoMessage),
super(file, new ProtoWriteSupport(protoMessage, includeDefaults),
compressionCodecName, blockSize, pageSize, enableDictionary, validating);
}

Expand All @@ -74,7 +76,7 @@ public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage,
* @throws IOException
*/
public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage) throws IOException {
this(file, protoMessage, CompressionCodecName.UNCOMPRESSED,
this(file, protoMessage, true, CompressionCodecName.UNCOMPRESSED,
DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -21,6 +21,11 @@
import com.google.protobuf.*;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Descriptors.OneofDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.BadConfigurationException;
Expand Down Expand Up @@ -48,15 +53,18 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
private static final Logger LOG = LoggerFactory.getLogger(ProtoWriteSupport.class);
public static final String PB_CLASS_WRITE = "parquet.proto.writeClass";

private final boolean includeDefaultValues;
private RecordConsumer recordConsumer;
private Class<? extends Message> protoMessage;
private MessageWriter messageWriter;

public ProtoWriteSupport() {
this(null, true);
}

public ProtoWriteSupport(Class<? extends Message> protobufClass) {
public ProtoWriteSupport(Class<? extends Message> protobufClass, final boolean includeDefaultValues) {
this.protoMessage = protobufClass;
this.includeDefaultValues = includeDefaultValues;
}

@Override
Expand Down Expand Up @@ -136,11 +144,17 @@ void writeRawValue(Object value) {

}

boolean shouldWrite(Object value) {
return true;
}

/** Used for writing nonrepeated (optional, required) fields*/
void writeField(Object value) {
recordConsumer.startField(fieldName, index);
writeRawValue(value);
recordConsumer.endField(fieldName, index);
if (shouldWrite(value)) {
recordConsumer.startField(fieldName, index);
writeRawValue(value);
recordConsumer.endField(fieldName, index);
}
}
}

Expand Down Expand Up @@ -249,6 +263,49 @@ final void writeField(Object value) {
}

private void writeAllFields(MessageOrBuilder pb) {
if (includeDefaultValues) {
writeAllFieldsIncludingDefaults(pb);
} else {
writeAllDifferentFields(pb);
}
}

private void writeAllFieldsIncludingDefaults(final MessageOrBuilder pb) {
for (FieldDescriptor fieldDescriptor : pb.getDescriptorForType().getFields()) {
if (fieldDescriptor.getContainingOneof() != null) {
continue;
}
if (fieldDescriptor.isExtension()) {
// Field index of an extension field might overlap with a base field.
throw new UnsupportedOperationException("Cannot convert Protobuf message with extension field(s)");
}

// We only write message fields if they have a value in order to keep the transformation reversible
if (fieldDescriptor.getJavaType() != JavaType.MESSAGE
|| fieldDescriptor.isRepeated()
|| pb.hasField(fieldDescriptor)
) {
setField(pb, fieldDescriptor);
}
}

// We only write a one of field if it has been set in order to keep the transformation reversible
// (i.e. we don't automatically set default values for fields inside one ofs)
for (OneofDescriptor oneofDescriptor : pb.getDescriptorForType().getOneofs()) {
if (pb.hasOneof(oneofDescriptor)) {
final FieldDescriptor fieldDescriptor = pb.getOneofFieldDescriptor(oneofDescriptor);
setField(pb, fieldDescriptor);
}
}
}

private void setField(final MessageOrBuilder pb, final FieldDescriptor fieldDescriptor) {
int fieldIndex = fieldDescriptor.getIndex();
final Object fieldValue = pb.getField(fieldDescriptor);
fieldWriters[fieldIndex].writeField(fieldValue);
}

private void writeAllDifferentFields(final MessageOrBuilder pb) {
//returns changed fields with values. Map is ordered by id.
Map<FieldDescriptor, Object> changedPbFields = pb.getAllFields();

Expand Down Expand Up @@ -279,8 +336,18 @@ final void writeRawValue(Object value) {
throw new UnsupportedOperationException("Array has no raw value");
}

@Override
boolean shouldWrite(Object value) {
// Empty fields should be omitted
return !((List<?>) value).isEmpty();
}

@Override
final void writeField(Object value) {
if (!shouldWrite(value)) {
return;
}

recordConsumer.startField(fieldName, index);
recordConsumer.startGroup();
List<?> list = (List<?>) value;
Expand Down Expand Up @@ -366,6 +433,12 @@ public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {
this.valueWriter = valueWriter;
}

@Override
boolean shouldWrite(Object value) {
// Empty fields should be omitted
return !((Collection<Message>) value).isEmpty();
}

@Override
final void writeRawValue(Object value) {
recordConsumer.startGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import com.google.protobuf.Message;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.proto.test.TestProto3;
import org.apache.parquet.proto.test.TestProtobuf;
import org.apache.parquet.proto.test.TestProtobuf.FirstCustomClassMessage;
import org.apache.parquet.proto.test.TestProtobuf.SecondCustomClassMessage;
import org.apache.parquet.proto.test.TestProto2;
import org.apache.parquet.proto.utils.ReadUsingMR;
import org.apache.parquet.proto.utils.WriteUsingMR;
import org.junit.Test;
Expand All @@ -40,10 +38,10 @@ public class ProtoInputOutputFormatTest {
* second job and compares input and output.
*/
@Test
public void testInputOutput() throws Exception {
TestProtobuf.IOFormatMessage input;
public void testProto2InputOutput() throws Exception {
TestProto2.IOFormatMessage input;
{
TestProtobuf.IOFormatMessage.Builder msg = TestProtobuf.IOFormatMessage.newBuilder();
TestProto2.IOFormatMessage.Builder msg = TestProto2.IOFormatMessage.newBuilder();
msg.setOptionalDouble(666);
msg.addRepeatedString("Msg1");
msg.addRepeatedString("Msg2");
Expand All @@ -54,7 +52,7 @@ public void testInputOutput() throws Exception {
List<Message> result = runMRJobs(input);

assertEquals(1, result.size());
TestProtobuf.IOFormatMessage output = (TestProtobuf.IOFormatMessage) result.get(0);
TestProto2.IOFormatMessage output = (TestProto2.IOFormatMessage) result.get(0);

assertEquals(666, output.getOptionalDouble(), 0.00001);
assertEquals(323, output.getMsg().getSomeId());
Expand Down Expand Up @@ -95,9 +93,9 @@ public void testProto3InputOutput() throws Exception {
* Only requested data should be read.
* */
@Test
public void testProjection() throws Exception {
public void testProto2Projection() throws Exception {

TestProtobuf.Document.Builder writtenDocument = TestProtobuf.Document.newBuilder();
TestProto2.Document.Builder writtenDocument = TestProto2.Document.newBuilder();
writtenDocument.setDocId(12345);
writtenDocument.addNameBuilder().setUrl("http://goout.cz/");

Expand All @@ -109,7 +107,7 @@ public void testProjection() throws Exception {
String projection = "message Document {required int64 DocId; }";
reader.setRequestedProjection(projection);
List<Message> output = reader.read(outputPath);
TestProtobuf.Document readDocument = (TestProtobuf.Document) output.get(0);
TestProto2.Document readDocument = (TestProto2.Document) output.get(0);


//test that only requested fields were deserialized
Expand Down Expand Up @@ -146,26 +144,26 @@ public void testProto3Projection() throws Exception {
* It should replace class specified in header.
* */
@Test
public void testCustomProtoClass() throws Exception {
FirstCustomClassMessage.Builder inputMessage;
inputMessage = FirstCustomClassMessage.newBuilder();
public void testProto2CustomProtoClass() throws Exception {
TestProto2.FirstCustomClassMessage.Builder inputMessage;
inputMessage = TestProto2.FirstCustomClassMessage.newBuilder();
inputMessage.setString("writtenString");

Path outputPath = new WriteUsingMR().write(new Message[]{inputMessage.build()});
ReadUsingMR readUsingMR = new ReadUsingMR();
String customClass = SecondCustomClassMessage.class.getName();
String customClass = TestProto2.SecondCustomClassMessage.class.getName();
ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass);
List<Message> result = readUsingMR.read(outputPath);

assertEquals(1, result.size());
Message msg = result.get(0);
assertFalse("Class from header returned.",
msg instanceof FirstCustomClassMessage);
msg instanceof TestProto2.FirstCustomClassMessage);
assertTrue("Custom class was not used",
msg instanceof SecondCustomClassMessage);
msg instanceof TestProto2.SecondCustomClassMessage);

String stringValue;
stringValue = ((SecondCustomClassMessage) msg).getString();
stringValue = ((TestProto2.SecondCustomClassMessage) msg).getString();
assertEquals("writtenString", stringValue);
}

Expand Down
Loading

0 comments on commit 9a4c016

Please sign in to comment.