diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml index 979b4369d3..350aaed382 100644 --- a/parquet-protobuf/pom.xml +++ b/parquet-protobuf/pom.xml @@ -78,6 +78,12 @@ ${hadoop.version} provided + + org.apache.parquet + parquet-tools + ${project.version} + test + org.slf4j slf4j-simple @@ -159,7 +165,7 @@ - + diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetOutputFormat.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetOutputFormat.java index 75e3ad855e..768e403cd9 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetOutputFormat.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetOutputFormat.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,8 +46,8 @@ public static void setProtobufClass(Job job, Class protoClass ProtoWriteSupport.setSchema(ContextUtil.getConfiguration(job), protoClass); } - public ProtoParquetOutputFormat(Class msg) { - super(new ProtoWriteSupport(msg)); + public ProtoParquetOutputFormat(Class msg, final boolean includeDefaults) { + super(new ProtoWriteSupport(msg, includeDefaults)); } public ProtoParquetOutputFormat() { diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java index 1af8a9ad25..2ab2071ab1 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -42,9 +42,10 @@ public class ProtoParquetWriter extends ParquetWrite * @throws IOException */ public ProtoParquetWriter(Path file, Class protoMessage, + final boolean includeDefaults, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException { - super(file, new ProtoWriteSupport(protoMessage), + super(file, new ProtoWriteSupport(protoMessage, includeDefaults), compressionCodecName, blockSize, pageSize); } @@ -60,9 +61,10 @@ public ProtoParquetWriter(Path file, Class protoMessage, * @throws IOException */ public ProtoParquetWriter(Path file, Class protoMessage, + final boolean includeDefaults, CompressionCodecName compressionCodecName, int blockSize, int pageSize, boolean enableDictionary, boolean validating) throws IOException { - super(file, new ProtoWriteSupport(protoMessage), + super(file, new ProtoWriteSupport(protoMessage, includeDefaults), compressionCodecName, blockSize, pageSize, enableDictionary, validating); } @@ -74,7 +76,7 @@ public ProtoParquetWriter(Path file, Class protoMessage, * @throws IOException */ public ProtoParquetWriter(Path file, Class protoMessage) throws IOException { - this(file, protoMessage, CompressionCodecName.UNCOMPRESSED, + this(file, protoMessage, true, CompressionCodecName.UNCOMPRESSED, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index bb75e71748..d6afac709c 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,11 @@ import com.google.protobuf.*; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; +import com.google.protobuf.Descriptors.OneofDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.TextFormat; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.BadConfigurationException; @@ -48,15 +53,18 @@ public class ProtoWriteSupport extends WriteSupport< private static final Logger LOG = LoggerFactory.getLogger(ProtoWriteSupport.class); public static final String PB_CLASS_WRITE = "parquet.proto.writeClass"; + private final boolean includeDefaultValues; private RecordConsumer recordConsumer; private Class protoMessage; private MessageWriter messageWriter; public ProtoWriteSupport() { + this(null, true); } - public ProtoWriteSupport(Class protobufClass) { + public ProtoWriteSupport(Class protobufClass, final boolean includeDefaultValues) { this.protoMessage = protobufClass; + this.includeDefaultValues = includeDefaultValues; } @Override @@ -136,11 +144,17 @@ void writeRawValue(Object value) { } + boolean shouldWrite(Object value) { + return true; + } + /** Used for writing nonrepeated (optional, required) fields*/ void writeField(Object value) { - recordConsumer.startField(fieldName, index); - writeRawValue(value); - recordConsumer.endField(fieldName, index); + if (shouldWrite(value)) { + recordConsumer.startField(fieldName, index); + writeRawValue(value); + recordConsumer.endField(fieldName, index); + } } } @@ -249,6 +263,49 @@ final void writeField(Object value) { } private void writeAllFields(MessageOrBuilder pb) { + if (includeDefaultValues) { + writeAllFieldsIncludingDefaults(pb); + } else { + writeAllDifferentFields(pb); + } + } + + private void writeAllFieldsIncludingDefaults(final MessageOrBuilder pb) { + for (FieldDescriptor fieldDescriptor : pb.getDescriptorForType().getFields()) { + if (fieldDescriptor.getContainingOneof() != null) { + continue; + } + if (fieldDescriptor.isExtension()) { + // Field index of an extension field might overlap with a base field. + throw new UnsupportedOperationException("Cannot convert Protobuf message with extension field(s)"); + } + + // We only write message fields if they have a value in order to keep the transformation reversible + if (fieldDescriptor.getJavaType() != JavaType.MESSAGE + || fieldDescriptor.isRepeated() + || pb.hasField(fieldDescriptor) + ) { + setField(pb, fieldDescriptor); + } + } + + // We only write a one of field if it has been set in order to keep the transformation reversible + // (i.e. we don't automatically set default values for fields inside one ofs) + for (OneofDescriptor oneofDescriptor : pb.getDescriptorForType().getOneofs()) { + if (pb.hasOneof(oneofDescriptor)) { + final FieldDescriptor fieldDescriptor = pb.getOneofFieldDescriptor(oneofDescriptor); + setField(pb, fieldDescriptor); + } + } + } + + private void setField(final MessageOrBuilder pb, final FieldDescriptor fieldDescriptor) { + int fieldIndex = fieldDescriptor.getIndex(); + final Object fieldValue = pb.getField(fieldDescriptor); + fieldWriters[fieldIndex].writeField(fieldValue); + } + + private void writeAllDifferentFields(final MessageOrBuilder pb) { //returns changed fields with values. Map is ordered by id. Map changedPbFields = pb.getAllFields(); @@ -279,8 +336,18 @@ final void writeRawValue(Object value) { throw new UnsupportedOperationException("Array has no raw value"); } + @Override + boolean shouldWrite(Object value) { + // Empty fields should be omitted + return !((List) value).isEmpty(); + } + @Override final void writeField(Object value) { + if (!shouldWrite(value)) { + return; + } + recordConsumer.startField(fieldName, index); recordConsumer.startGroup(); List list = (List) value; @@ -366,6 +433,12 @@ public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) { this.valueWriter = valueWriter; } + @Override + boolean shouldWrite(Object value) { + // Empty fields should be omitted + return !((Collection) value).isEmpty(); + } + @Override final void writeRawValue(Object value) { recordConsumer.startGroup(); diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 6c01d7b8db..d94a40b434 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -21,9 +21,7 @@ import com.google.protobuf.Message; import org.apache.hadoop.fs.Path; import org.apache.parquet.proto.test.TestProto3; -import org.apache.parquet.proto.test.TestProtobuf; -import org.apache.parquet.proto.test.TestProtobuf.FirstCustomClassMessage; -import org.apache.parquet.proto.test.TestProtobuf.SecondCustomClassMessage; +import org.apache.parquet.proto.test.TestProto2; import org.apache.parquet.proto.utils.ReadUsingMR; import org.apache.parquet.proto.utils.WriteUsingMR; import org.junit.Test; @@ -40,10 +38,10 @@ public class ProtoInputOutputFormatTest { * second job and compares input and output. */ @Test - public void testInputOutput() throws Exception { - TestProtobuf.IOFormatMessage input; + public void testProto2InputOutput() throws Exception { + TestProto2.IOFormatMessage input; { - TestProtobuf.IOFormatMessage.Builder msg = TestProtobuf.IOFormatMessage.newBuilder(); + TestProto2.IOFormatMessage.Builder msg = TestProto2.IOFormatMessage.newBuilder(); msg.setOptionalDouble(666); msg.addRepeatedString("Msg1"); msg.addRepeatedString("Msg2"); @@ -54,7 +52,7 @@ public void testInputOutput() throws Exception { List result = runMRJobs(input); assertEquals(1, result.size()); - TestProtobuf.IOFormatMessage output = (TestProtobuf.IOFormatMessage) result.get(0); + TestProto2.IOFormatMessage output = (TestProto2.IOFormatMessage) result.get(0); assertEquals(666, output.getOptionalDouble(), 0.00001); assertEquals(323, output.getMsg().getSomeId()); @@ -95,9 +93,9 @@ public void testProto3InputOutput() throws Exception { * Only requested data should be read. * */ @Test - public void testProjection() throws Exception { + public void testProto2Projection() throws Exception { - TestProtobuf.Document.Builder writtenDocument = TestProtobuf.Document.newBuilder(); + TestProto2.Document.Builder writtenDocument = TestProto2.Document.newBuilder(); writtenDocument.setDocId(12345); writtenDocument.addNameBuilder().setUrl("http://goout.cz/"); @@ -109,7 +107,7 @@ public void testProjection() throws Exception { String projection = "message Document {required int64 DocId; }"; reader.setRequestedProjection(projection); List output = reader.read(outputPath); - TestProtobuf.Document readDocument = (TestProtobuf.Document) output.get(0); + TestProto2.Document readDocument = (TestProto2.Document) output.get(0); //test that only requested fields were deserialized @@ -146,26 +144,26 @@ public void testProto3Projection() throws Exception { * It should replace class specified in header. * */ @Test - public void testCustomProtoClass() throws Exception { - FirstCustomClassMessage.Builder inputMessage; - inputMessage = FirstCustomClassMessage.newBuilder(); + public void testProto2CustomProtoClass() throws Exception { + TestProto2.FirstCustomClassMessage.Builder inputMessage; + inputMessage = TestProto2.FirstCustomClassMessage.newBuilder(); inputMessage.setString("writtenString"); Path outputPath = new WriteUsingMR().write(new Message[]{inputMessage.build()}); ReadUsingMR readUsingMR = new ReadUsingMR(); - String customClass = SecondCustomClassMessage.class.getName(); + String customClass = TestProto2.SecondCustomClassMessage.class.getName(); ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); List result = readUsingMR.read(outputPath); assertEquals(1, result.size()); Message msg = result.get(0); assertFalse("Class from header returned.", - msg instanceof FirstCustomClassMessage); + msg instanceof TestProto2.FirstCustomClassMessage); assertTrue("Custom class was not used", - msg instanceof SecondCustomClassMessage); + msg instanceof TestProto2.SecondCustomClassMessage); String stringValue; - stringValue = ((SecondCustomClassMessage) msg).getString(); + stringValue = ((TestProto2.SecondCustomClassMessage) msg).getString(); assertEquals("writtenString", stringValue); } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java index e042f96821..a78621f32a 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java @@ -19,30 +19,33 @@ package org.apache.parquet.proto; import com.google.protobuf.ByteString; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.tools.read.SimpleRecord; +import org.apache.parquet.tools.read.SimpleRecord.NameValue; import org.junit.Test; import org.apache.parquet.proto.test.TestProto3; -import org.apache.parquet.proto.test.TestProtobuf; +import org.apache.parquet.proto.test.TestProto2; import java.util.List; +import static org.apache.parquet.proto.TestUtils.writeAndReadParquet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.apache.parquet.proto.TestUtils.testData; -import static org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes; public class ProtoRecordConverterTest { @Test - public void testAllTypes() throws Exception { - SchemaConverterAllDatatypes.Builder data; - data = SchemaConverterAllDatatypes.newBuilder(); + public void testProto2AllTypes() throws Exception { + TestProto2.SchemaConverterAllDatatypes.Builder data; + data = TestProto2.SchemaConverterAllDatatypes.newBuilder(); data.setOptionalBool(true); data.setOptionalBytes(ByteString.copyFrom("someText", "UTF-8")); data.setOptionalDouble(0.577); data.setOptionalFloat(3.1415f); - data.setOptionalEnum(SchemaConverterAllDatatypes.TestEnum.FIRST); + data.setOptionalEnum(TestProto2.SchemaConverterAllDatatypes.TestEnum.FIRST); data.setOptionalFixed32(1000 * 1000 * 1); data.setOptionalFixed64(1000 * 1000 * 1000 * 2); data.setOptionalInt32(1000 * 1000 * 3); @@ -57,21 +60,21 @@ public void testAllTypes() throws Exception { data.getOptionalMessageBuilder().setSomeId(1984); data.getPbGroupBuilder().setGroupInt(1492); - SchemaConverterAllDatatypes dataBuilt = data.build(); + TestProto2.SchemaConverterAllDatatypes dataBuilt = data.build(); data.clear(); - List result; - result = testData(dataBuilt); + List result; + result = testData(false, dataBuilt); //data are fully checked in testData function. Lets do one more check. - SchemaConverterAllDatatypes o = result.get(0); + TestProto2.SchemaConverterAllDatatypes o = result.get(0); assertEquals("Good Will Hunting", o.getOptionalString()); assertEquals(true, o.getOptionalBool()); assertEquals(ByteString.copyFrom("someText", "UTF-8"), o.getOptionalBytes()); assertEquals(0.577, o.getOptionalDouble(), 0.00001); assertEquals(3.1415f, o.getOptionalFloat(), 0.00001); - assertEquals(SchemaConverterAllDatatypes.TestEnum.FIRST, o.getOptionalEnum()); + assertEquals(TestProto2.SchemaConverterAllDatatypes.TestEnum.FIRST, o.getOptionalEnum()); assertEquals(1000 * 1000 * 1, o.getOptionalFixed32()); assertEquals(1000 * 1000 * 1000 * 2, o.getOptionalFixed64()); assertEquals(1000 * 1000 * 3, o.getOptionalInt32()); @@ -108,12 +111,13 @@ public void testProto3AllTypes() throws Exception { data.setOptionalUInt32(1000 * 1000 * 8); data.setOptionalUInt64(1000L * 1000 * 1000 * 9); data.getOptionalMessageBuilder().setSomeId(1984); + data.setEmptyString(""); TestProto3.SchemaConverterAllDatatypes dataBuilt = data.build(); data.clear(); List result; - result = testData(dataBuilt); + result = testData(true, dataBuilt); //data are fully checked in testData function. Lets do one more check. TestProto3.SchemaConverterAllDatatypes o = result.get(0); @@ -138,18 +142,18 @@ public void testProto3AllTypes() throws Exception { } @Test - public void testAllTypesMultiple() throws Exception { + public void testProto2AllTypesMultiple() throws Exception { int count = 100; - SchemaConverterAllDatatypes[] input = new SchemaConverterAllDatatypes[count]; + TestProto2.SchemaConverterAllDatatypes[] input = new TestProto2.SchemaConverterAllDatatypes[count]; for (int i = 0; i < count; i++) { - SchemaConverterAllDatatypes.Builder d = SchemaConverterAllDatatypes.newBuilder(); + TestProto2.SchemaConverterAllDatatypes.Builder d = TestProto2.SchemaConverterAllDatatypes.newBuilder(); if (i % 2 != 0) d.setOptionalBool(true); if (i % 3 != 0) d.setOptionalBytes(ByteString.copyFrom("someText " + i, "UTF-8")); if (i % 4 != 0) d.setOptionalDouble(0.577 * i); if (i % 5 != 0) d.setOptionalFloat(3.1415f * i); - if (i % 6 != 0) d.setOptionalEnum(SchemaConverterAllDatatypes.TestEnum.FIRST); + if (i % 6 != 0) d.setOptionalEnum(TestProto2.SchemaConverterAllDatatypes.TestEnum.FIRST); if (i % 7 != 0) d.setOptionalFixed32(1000 * i * 1); if (i % 8 != 0) d.setOptionalFixed64(1000 * i * 1000 * 2); if (i % 9 != 0) d.setOptionalInt32(1000 * i * 3); @@ -166,8 +170,8 @@ public void testAllTypesMultiple() throws Exception { input[i] = d.build(); } - List result; - result = testData(input); + List result; + result = testData(false, input); //data are fully checked in testData function. Lets do one more check. assertEquals("Good Will Hunting 0", result.get(0).getOptionalString()); @@ -203,7 +207,7 @@ public void testProto3AllTypesMultiple() throws Exception { } List result; - result = testData(input); + result = testData(true, input); //data are fully checked in testData function. Lets do one more check. assertEquals("Good Will Hunting 0", result.get(0).getOptionalString()); @@ -211,12 +215,12 @@ public void testProto3AllTypesMultiple() throws Exception { } @Test - public void testDefaults() throws Exception { - SchemaConverterAllDatatypes.Builder data; - data = SchemaConverterAllDatatypes.newBuilder(); + public void testProto2Defaults() throws Exception { + TestProto2.SchemaConverterAllDatatypes.Builder data; + data = TestProto2.SchemaConverterAllDatatypes.newBuilder(); - List result = testData(data.build()); - SchemaConverterAllDatatypes message = result.get(0); + List result = testData(false, data.build()); + TestProto2.SchemaConverterAllDatatypes message = result.get(0); assertEquals("", message.getOptionalString()); assertEquals(false, message.getOptionalBool()); assertEquals(0, message.getOptionalFixed32()); @@ -224,30 +228,43 @@ public void testDefaults() throws Exception { @Test public void testProto3Defaults() throws Exception { - TestProto3.SchemaConverterAllDatatypes.Builder data; - data = TestProto3.SchemaConverterAllDatatypes.newBuilder(); + TestProto3.SchemaConverterAllDatatypes data = TestProto3.SchemaConverterAllDatatypes.newBuilder() + .setOptionalEnum(TestProto3.SchemaConverterAllDatatypes.TestEnum.FIRST) + .setOptionalFixed32(12).build(); + + List res = writeAndReadParquet(true, TestProto3.SchemaConverterAllDatatypes.class, data); + SimpleRecord msg = res.get(0); + final byte[] optionalEnumBinary2 = (byte[]) getValue(msg, "optionalEnum"); + final String optionalEnumValue2 = Binary.fromConstantByteArray(optionalEnumBinary2).toStringUsingUTF8(); + assertEquals(12, getValue(msg, "optionalFixed32")); + assertEquals(TestProto3.SchemaConverterAllDatatypes.TestEnum.FIRST.toString(), optionalEnumValue2); + assertEquals("", getValue(msg, "optionalString")); + assertEquals(false, getValue(msg, "optionalBool")); + } - List result = testData(data.build()); - TestProto3.SchemaConverterAllDatatypes message = result.get(0); - assertEquals("", message.getOptionalString()); - assertEquals(false, message.getOptionalBool()); - assertEquals(0, message.getOptionalFixed32()); + private static Object getValue(final SimpleRecord msg, final String name) { + for (NameValue value : msg.getValues()) { + if (value.getName().equals(name)) { + return value.getValue(); + } + } + return null; } @Test - public void testRepeatedMessages() throws Exception { - TestProtobuf.TopMessage.Builder top = TestProtobuf.TopMessage.newBuilder(); + public void testProto2RepeatedMessages() throws Exception { + TestProto2.TopMessage.Builder top = TestProto2.TopMessage.newBuilder(); top.addInnerBuilder().setOne("First inner"); top.addInnerBuilder().setTwo("Second inner"); top.addInnerBuilder().setThree("Third inner"); - TestProtobuf.TopMessage result = testData(top.build()).get(0); + TestProto2.TopMessage result = testData(false, top.build()).get(0); assertEquals(3, result.getInnerCount()); - TestProtobuf.InnerMessage first = result.getInner(0); - TestProtobuf.InnerMessage second = result.getInner(1); - TestProtobuf.InnerMessage third = result.getInner(2); + TestProto2.InnerMessage first = result.getInner(0); + TestProto2.InnerMessage second = result.getInner(1); + TestProto2.InnerMessage third = result.getInner(2); assertEquals("First inner", first.getOne()); assertFalse(first.hasTwo()); @@ -269,7 +286,7 @@ public void testProto3RepeatedMessages() throws Exception { top.addInnerBuilder().setTwo("Second inner"); top.addInnerBuilder().setThree("Third inner"); - TestProto3.TopMessage result = testData(top.build()).get(0); + TestProto3.TopMessage result = testData(true, top.build()).get(0); assertEquals(3, result.getInnerCount()); @@ -291,14 +308,14 @@ public void testProto3RepeatedMessages() throws Exception { } @Test - public void testRepeatedInt() throws Exception { - TestProtobuf.RepeatedIntMessage.Builder top = TestProtobuf.RepeatedIntMessage.newBuilder(); + public void testProto2RepeatedInt() throws Exception { + TestProto2.RepeatedIntMessage.Builder top = TestProto2.RepeatedIntMessage.newBuilder(); top.addRepeatedInt(1); top.addRepeatedInt(2); top.addRepeatedInt(3); - TestProtobuf.RepeatedIntMessage result = testData(top.build()).get(0); + TestProto2.RepeatedIntMessage result = testData(false, top.build()).get(0); assertEquals(3, result.getRepeatedIntCount()); @@ -315,7 +332,7 @@ public void testProto3RepeatedInt() throws Exception { top.addRepeatedInt(2); top.addRepeatedInt(3); - TestProto3.RepeatedIntMessage result = testData(top.build()).get(0); + TestProto3.RepeatedIntMessage result = testData(true, top.build()).get(0); assertEquals(3, result.getRepeatedIntCount()); @@ -325,12 +342,12 @@ public void testProto3RepeatedInt() throws Exception { } @Test - public void testLargeProtobufferFieldId() throws Exception { - TestProtobuf.HighIndexMessage.Builder builder = TestProtobuf.HighIndexMessage.newBuilder(); + public void testProto2LargeProtobufferFieldId() throws Exception { + TestProto2.HighIndexMessage.Builder builder = TestProto2.HighIndexMessage.newBuilder(); builder.addRepeatedInt(1); builder.addRepeatedInt(2); - testData(builder.build()); + testData(false, builder.build()); } @Test @@ -339,6 +356,6 @@ public void testProto3LargeProtobufferFieldId() throws Exception { builder.addRepeatedInt(1); builder.addRepeatedInt(2); - testData(builder.build()); + testData(true, builder.build()); } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index d7ec169ce7..b8045c8224 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -21,7 +21,7 @@ import com.google.protobuf.Message; import org.junit.Test; import org.apache.parquet.proto.test.TestProto3; -import org.apache.parquet.proto.test.TestProtobuf; +import org.apache.parquet.proto.test.TestProto2; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; @@ -45,9 +45,9 @@ private void testConversion(Class pbClass, String parquetSche * Tests that all protocol buffer datatypes are converted to correct parquet datatypes. */ @Test - public void testConvertAllDatatypes() throws Exception { + public void testProto2ConvertAllDatatypes() throws Exception { String expectedSchema = - "message TestProtobuf.SchemaConverterAllDatatypes {\n" + + "message TestProto2.SchemaConverterAllDatatypes {\n" + " optional double optionalDouble = 1;\n" + " optional float optionalFloat = 2;\n" + " optional int32 optionalInt32 = 3;\n" + @@ -72,7 +72,7 @@ public void testConvertAllDatatypes() throws Exception { " optional binary optionalEnum (ENUM) = 18;" + "}"; - testConversion(TestProtobuf.SchemaConverterAllDatatypes.class, expectedSchema); + testConversion(TestProto2.SchemaConverterAllDatatypes.class, expectedSchema); } /** @@ -111,15 +111,23 @@ public void testProto3ConvertAllDatatypes() throws Exception { " }\n" + " }\n" + " }\n" + + " optional binary emptyString (UTF8) = 22;\n" + + " required group emptyMessageArray (LIST) = 23 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional int32 someId = 3;\n" + + " }\n" + + " }\n" + + " }\n" + "}"; testConversion(TestProto3.SchemaConverterAllDatatypes.class, expectedSchema); } @Test - public void testConvertRepetition() throws Exception { + public void testProto2ConvertRepetition() throws Exception { String expectedSchema = - "message TestProtobuf.SchemaConverterRepetition {\n" + + "message TestProto2.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + " required int32 requiredPrimitive = 2;\n" + " required group repeatedPrimitive (LIST) = 3 {\n" + @@ -142,7 +150,7 @@ public void testConvertRepetition() throws Exception { " }" + "}"; - testConversion(TestProtobuf.SchemaConverterRepetition.class, expectedSchema); + testConversion(TestProto2.SchemaConverterRepetition.class, expectedSchema); } @Test diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java index de27ebf3f4..ef8eedd501 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java @@ -26,23 +26,23 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.proto.test.TestProto3; -import org.apache.parquet.proto.test.TestProtobuf; +import org.apache.parquet.proto.test.TestProto2; public class ProtoWriteSupportTest { private ProtoWriteSupport createReadConsumerInstance(Class cls, RecordConsumer readConsumerMock) { - ProtoWriteSupport support = new ProtoWriteSupport(cls); + ProtoWriteSupport support = new ProtoWriteSupport(cls, false); support.init(new Configuration()); support.prepareForWrite(readConsumerMock); return support; } @Test - public void testSimplestMessage() throws Exception { + public void testProto2SimplestMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.InnerMessage.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto2.InnerMessage.class, readConsumerMock); - TestProtobuf.InnerMessage.Builder msg = TestProtobuf.InnerMessage.newBuilder(); + TestProto2.InnerMessage.Builder msg = TestProto2.InnerMessage.newBuilder(); msg.setOne("oneValue"); instance.write(msg.build()); @@ -80,11 +80,11 @@ public void testProto3SimplestMessage() throws Exception { } @Test - public void testRepeatedIntMessage() throws Exception { + public void testProto2RepeatedIntMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto2.RepeatedIntMessage.class, readConsumerMock); - TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + TestProto2.RepeatedIntMessage.Builder msg = TestProto2.RepeatedIntMessage.newBuilder(); msg.addRepeatedInt(1323); msg.addRepeatedInt(54469); @@ -154,11 +154,11 @@ public void testProto3RepeatedIntMessage() throws Exception { } @Test - public void testRepeatedInnerMessageMessage_message() throws Exception { + public void testProto2RepeatedInnerMessageMessage_message() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto2.TopMessage.class, readConsumerMock); - TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + TestProto2.TopMessage.Builder msg = TestProto2.TopMessage.newBuilder(); msg.addInnerBuilder().setOne("one").setTwo("two"); instance.write(msg.build()); @@ -228,11 +228,11 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { } @Test - public void testRepeatedInnerMessageMessage_scalar() throws Exception { + public void testProto2RepeatedInnerMessageMessage_scalar() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto2.TopMessage.class, readConsumerMock); - TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + TestProto2.TopMessage.Builder msg = TestProto2.TopMessage.newBuilder(); msg.addInnerBuilder().setOne("one"); msg.addInnerBuilder().setTwo("two"); @@ -322,11 +322,11 @@ public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { } @Test - public void testOptionalInnerMessage() throws Exception { + public void testProto2OptionalInnerMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MessageA.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto2.MessageA.class, readConsumerMock); - TestProtobuf.MessageA.Builder msg = TestProtobuf.MessageA.newBuilder(); + TestProto2.MessageA.Builder msg = TestProto2.MessageA.newBuilder(); msg.getInnerBuilder().setOne("one"); instance.write(msg.build()); @@ -374,15 +374,15 @@ public void testProto3OptionalInnerMessage() throws Exception { } @Test(expected = UnsupportedOperationException.class) - public void testMessageWithExtensions() throws Exception { + public void testProto2MessageWithExtensions() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.Vehicle.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto2.Vehicle.class, readConsumerMock); - TestProtobuf.Vehicle.Builder msg = TestProtobuf.Vehicle.newBuilder(); + TestProto2.Vehicle.Builder msg = TestProto2.Vehicle.newBuilder(); msg.setHorsePower(300); // Currently there's no support for extension fields. This test tests that the extension field // will cause an exception. - msg.setExtension(TestProtobuf.Airplane.wingSpan, 50); + msg.setExtension(TestProto2.Airplane.wingSpan, 50); instance.write(msg.build()); } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java index e3e25f6c15..38b8846c6d 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.tools.read.SimpleReadSupport; +import org.apache.parquet.tools.read.SimpleRecord; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -40,14 +44,21 @@ public static Path someTemporaryFilePath() throws IOException { return new Path(tmp.getPath()); } - public static List writeAndRead(T... records) throws IOException { + public static List writeAndRead(final boolean includeDefaults, T... records) throws IOException { Class cls = inferRecordsClass(records); - Path file = writeMessages(cls, records); + Path file = writeMessages(cls, includeDefaults, records); return readMessages(file); } + + public static List writeAndReadParquet(final boolean includeDefaults, Class cls, final T... records) throws IOException { + final Path file = writeMessages(cls, includeDefaults, records); + + return readParquet(file); + } + public static Class inferRecordsClass(MessageOrBuilder[] records) { Class cls = null; @@ -73,13 +84,13 @@ public static Class inferRecordsClass(MessageOrBuilder[] reco /** * Writes messages to file, reads messages from file and checks if everything is OK. */ - public static List testData(T... messages) throws IOException { + public static List testData(boolean includeDefaults, T... messages) throws IOException { checkSameBuilderInstance(messages); List input = cloneList(messages); - List output = (List) writeAndRead(messages); + List output = (List) writeAndRead(includeDefaults, messages); List outputAsMessages = asMessages(output); assertEquals("The protocol buffers are not same:\n", asMessages(input), outputAsMessages); @@ -163,18 +174,27 @@ public static List readMessages(Path file) throw return result; } + private static List readParquet(final Path file) throws IOException { + final List records = new ArrayList<>(); + + try (final ParquetReader reader = ParquetReader + .builder(new SimpleReadSupport(), file).build()) { + for (SimpleRecord value = reader.read(); value != null; value = reader.read()) { + records.add(value); + } + } + + return records; + } + /** * Writes messages to temporary file and returns its path. */ - public static Path writeMessages(MessageOrBuilder... records) throws IOException { - return writeMessages(inferRecordsClass(records), records); - } - - public static Path writeMessages(Class cls, MessageOrBuilder... records) throws IOException { + public static Path writeMessages(final Class cls, final boolean includeDefaults, final MessageOrBuilder... records) throws IOException { Path file = someTemporaryFilePath(); - ProtoParquetWriter writer = - new ProtoParquetWriter(file, cls); + ParquetWriter writer = + new ParquetWriter(file, new ProtoWriteSupport(cls, includeDefaults)); for (MessageOrBuilder record : records) { writer.write(record); @@ -184,5 +204,4 @@ public static Path writeMessages(Class cls, MessageOrBuilder. return file; } - } diff --git a/parquet-protobuf/src/test/resources/TestProtobuf.proto b/parquet-protobuf/src/test/resources/TestProto2.proto similarity index 99% rename from parquet-protobuf/src/test/resources/TestProtobuf.proto rename to parquet-protobuf/src/test/resources/TestProto2.proto index d7cdf03a91..b6975e94d3 100644 --- a/parquet-protobuf/src/test/resources/TestProtobuf.proto +++ b/parquet-protobuf/src/test/resources/TestProto2.proto @@ -1,3 +1,4 @@ +syntax = "proto2"; // // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file @@ -17,7 +18,7 @@ // under the License. // -package TestProtobuf; +package TestProto2; option java_package = "org.apache.parquet.proto.test"; diff --git a/parquet-protobuf/src/test/resources/TestProto3.proto b/parquet-protobuf/src/test/resources/TestProto3.proto index 1896445306..7f7ef89dad 100644 --- a/parquet-protobuf/src/test/resources/TestProto3.proto +++ b/parquet-protobuf/src/test/resources/TestProto3.proto @@ -79,6 +79,8 @@ message SchemaConverterAllDatatypes { string someString = 20; } map optionalMap = 21; + string emptyString = 22; + repeated SchemaConverterSimpleMessage emptyMessageArray = 23; } message SchemaConverterRepetition {