diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index bb9930b6fe..d5f43e6b35 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -131,12 +131,13 @@ public void add(Object value) { }; } - OriginalType originalType = parquetType.getOriginalType() == null ? OriginalType.UTF8 : parquetType.getOriginalType(); - switch (originalType) { - case LIST: return new ListConverter(parentBuilder, fieldDescriptor, parquetType); - case MAP: return new MapConverter(parentBuilder, fieldDescriptor, parquetType); - default: return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); + if (OriginalType.LIST == parquetType.getOriginalType()) { + return new ListConverter(parentBuilder, fieldDescriptor, parquetType); } + if (OriginalType.MAP == parquetType.getOriginalType()) { + return new MapConverter(parentBuilder, fieldDescriptor, parquetType); + } + return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); } private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { @@ -363,38 +364,38 @@ public void addBinary(Binary binary) { *

* A LIST wrapper is created in parquet for the above mentioned protobuf schema: * message SimpleList { - * required group first_array (LIST) = 1 { - * repeated int32 element; + * optional group first_array (LIST) = 1 { + * repeated group list { + * optional int32 element; + * } * } * } *

* The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains - * one only one field: either a primitive field (like in the example above, where we have an array of ints) or - * another group (array of messages). + * a repeated group named 'list', itself containing only one field called 'element' of the type of the repeated + * object (can be a primitive as in this example or a group in case of a repeated message in protobuf). */ final class ListConverter extends GroupConverter { private final Converter converter; - private final boolean listOfMessage; public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { OriginalType originalType = parquetType.getOriginalType(); - if (originalType != OriginalType.LIST) { + if (originalType != OriginalType.LIST || parquetType.isPrimitive()) { throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead."); } - listOfMessage = fieldDescriptor.getJavaType() == JavaType.MESSAGE; + GroupType rootWrapperType = parquetType.asGroupType(); + if (!rootWrapperType.containsField("list") || rootWrapperType.getType("list").isPrimitive()) { + throw new ParquetDecodingException("Expected repeated 'list' group inside LIST wrapperr but got: " + rootWrapperType); + } - Type parquetSchema; - if (parquetType.asGroupType().containsField("list")) { - parquetSchema = parquetType.asGroupType().getType("list"); - if (parquetSchema.asGroupType().containsField("element")) { - parquetSchema = parquetSchema.asGroupType().getType("element"); - } - } else { - throw new ParquetDecodingException("Expected list but got: " + parquetType); + GroupType listType = rootWrapperType.getType("list").asGroupType(); + if (!listType.containsField("element")) { + throw new ParquetDecodingException("Expected 'element' inside repeated list group but got: " + listType); } - converter = newMessageConverter(parentBuilder, fieldDescriptor, 
parquetSchema); + Type elementType = listType.getType("element"); + converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java index eae54ebefa..9c1bf27c93 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java @@ -48,6 +48,22 @@ public class ProtoSchemaConverter { private static final Logger LOG = LoggerFactory.getLogger(ProtoSchemaConverter.class); + private final boolean parquetSpecsCompliant; + + public ProtoSchemaConverter() { + this(false); + } + + /** + * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes. + * @param parquetSpecsCompliant If set to false, the generated parquet schema uses the old + * schema style (prior to PARQUET-968), which provides backward compatibility + * but does not use LIST and MAP wrappers around collections as required + * by the parquet specifications. If set to true, spec-compliant schemas are used. 
+ */ + public ProtoSchemaConverter(boolean parquetSpecsCompliant) { + this.parquetSpecsCompliant = parquetSpecsCompliant; + } public MessageType convert(Class protobufClass) { LOG.debug("Converting protocol buffer class \"" + protobufClass + "\" to parquet schema."); @@ -86,7 +102,8 @@ private Builder>, GroupBuilder> addF } ParquetType parquetType = getParquetType(descriptor); - if (descriptor.isRepeated()) { + if (descriptor.isRepeated() && parquetSpecsCompliant) { + // the old schema style did not include the LIST wrapper around repeated fields return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder); } @@ -98,7 +115,7 @@ private Builder>, GroupBuilder> addR OriginalType originalType, final GroupBuilder builder) { return builder - .group(Type.Repetition.REQUIRED).as(OriginalType.LIST) + .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST) .group(Type.Repetition.REPEATED) .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType) .named("element") @@ -108,7 +125,7 @@ private Builder>, GroupBuilder> addR private GroupBuilder> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder builder) { GroupBuilder>>> result = builder - .group(Type.Repetition.REQUIRED).as(OriginalType.LIST) + .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST) .group(Type.Repetition.REPEATED) .group(Type.Repetition.OPTIONAL); @@ -118,9 +135,12 @@ private GroupBuilder> addRepeatedMessage(FieldDescriptor des } private GroupBuilder> addMessageField(FieldDescriptor descriptor, final GroupBuilder builder) { - if (descriptor.isMapField()) { + if (descriptor.isMapField() && parquetSpecsCompliant) { + // the old schema style did not include the MAP wrapper around map groups return addMapField(descriptor, builder); - } else if (descriptor.isRepeated()) { + } + if (descriptor.isRepeated() && parquetSpecsCompliant) { + // the old schema style did not include the LIST wrapper around repeated messages return addRepeatedMessage(descriptor, 
builder); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index bb75e71748..4d70632061 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -47,7 +47,13 @@ public class ProtoWriteSupport extends WriteSupport< private static final Logger LOG = LoggerFactory.getLogger(ProtoWriteSupport.class); public static final String PB_CLASS_WRITE = "parquet.proto.writeClass"; + // PARQUET-968 introduces changes to allow writing spec-compliant schemas with parquet-protobuf. + // In the past, collections were not written using the LIST and MAP wrappers and thus were not compliant + // with the parquet specs. This flag, if set to true, allows writing with spec-compliant schemas, + // but it is set to false by default to keep backward compatibility. + public static final String PB_SPECS_COMPLIANT_WRITE = "parquet.proto.writeSpecsCompliant"; + private boolean writeSpecsCompliant = false; private RecordConsumer recordConsumer; private Class protoMessage; private MessageWriter messageWriter; @@ -68,6 +74,16 @@ public static void setSchema(Configuration configuration, Class extraMetaData = new HashMap(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); + extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant)); return new WriteContext(rootSchema, extraMetaData); } @@ -158,8 +176,12 @@ class MessageWriter extends FieldWriter { Type type = schema.getType(name); FieldWriter writer = createWriter(fieldDescriptor, type); - if(fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) { - writer = new ArrayWriter(writer); + if(writeSpecsCompliant && fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) { 
+ writer = new ArrayWriter(writer); + } + else if (!writeSpecsCompliant && fieldDescriptor.isRepeated()) { + // the old schema style wrote lists and maps as plain repeated fields instead of wrapping them in a LIST or MAP group + writer = new RepeatedWriter(writer); } writer.setFieldName(name); @@ -187,7 +209,7 @@ private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) { } private FieldWriter createMessageWriter(FieldDescriptor fieldDescriptor, Type type) { - if (fieldDescriptor.isMapField()) { + if (fieldDescriptor.isMapField() && writeSpecsCompliant) { return createMapWriter(fieldDescriptor, type); } @@ -235,16 +257,16 @@ void writeTopLevelMessage(Object value) { /** Writes message as part of repeated field. It cannot start field*/ @Override final void writeRawValue(Object value) { + recordConsumer.startGroup(); writeAllFields((MessageOrBuilder) value); + recordConsumer.endGroup(); } /** Used for writing nonrepeated (optional, required) fields*/ @Override final void writeField(Object value) { recordConsumer.startField(fieldName, index); - recordConsumer.startGroup(); - writeAllFields((MessageOrBuilder) value); - recordConsumer.endGroup(); + writeRawValue(value); recordConsumer.endField(fieldName, index); } @@ -288,21 +310,11 @@ final void writeField(Object value) { recordConsumer.startField("list", 0); // This is the wrapper group for the array field for (Object listEntry: list) { recordConsumer.startGroup(); - recordConsumer.startField("element", 0); // This is the mandatory inner field - if (!isPrimitive(listEntry)) { - recordConsumer.startGroup(); - } - fieldWriter.writeRawValue(listEntry); - if (!isPrimitive(listEntry)) { - recordConsumer.endGroup(); - } - recordConsumer.endField("element", 0); - recordConsumer.endGroup(); } recordConsumer.endField("list", 0); @@ -312,8 +324,33 @@ final void writeField(Object value) { } } - private boolean isPrimitive(Object listEntry) { - return !(listEntry instanceof Message); + /** + * The RepeatedWriter is used to write 
collections (lists and maps) using the old style (without LIST and MAP + * wrappers). + */ + class RepeatedWriter extends FieldWriter { + final FieldWriter fieldWriter; + + RepeatedWriter(FieldWriter fieldWriter) { + this.fieldWriter = fieldWriter; + } + + @Override + final void writeRawValue(Object value) { + throw new UnsupportedOperationException("Array has no raw value"); + } + + @Override + final void writeField(Object value) { + recordConsumer.startField(fieldName, index); + List list = (List) value; + + for (Object listEntry: list) { + fieldWriter.writeRawValue(listEntry); + } + + recordConsumer.endField(fieldName, index); + } } /** validates mapping between protobuffer fields and parquet fields.*/ @@ -440,5 +477,4 @@ private String serializeDescriptor(Class protoClass) { DescriptorProtos.DescriptorProto asProto = descriptor.toProto(); return TextFormat.printToString(asProto); } - } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 6c01d7b8db..5544dc6887 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.parquet.proto; import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.proto.test.TestProto3; import org.apache.parquet.proto.test.TestProtobuf; @@ -193,6 +194,125 @@ public void testProto3CustomProtoClass() throws Exception { assertEquals("writtenString", stringValue); } + @Test + public void testRepeatedIntMessageClass() throws Exception { + TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build(); + TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder() + 
.addRepeatedInt(1).addRepeatedInt(2) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedIntMessageClassSchemaCompliant() throws Exception { + TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build(); + TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder() + .addRepeatedInt(1).addRepeatedInt(2) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testMapIntMessageClass() throws Exception { + TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build(); + TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder() + .putMapInt(1, 123).putMapInt(2, 234) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.MapIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, 
result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testMapIntMessageClassSchemaCompliant() throws Exception { + TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build(); + TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder() + .putMapInt(1, 123).putMapInt(2, 234) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.MapIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedInnerMessageClass() throws Exception { + TestProtobuf.RepeatedInnerMessage msgEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder().build(); + TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder() + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build()) + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build()) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedInnerMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedInnerMessageClassSchemaCompliant() throws Exception { + TestProtobuf.RepeatedInnerMessage msgEmpty 
= TestProtobuf.RepeatedInnerMessage.newBuilder().build(); + TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder() + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build()) + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build()) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedInnerMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + /** * Runs job that writes input to file and then job reading data back. */ diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index d7ec169ce7..4ca82ac740 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -32,14 +32,17 @@ public class ProtoSchemaConverterTest { /** * Converts given pbClass to parquet schema and compares it with expected parquet schema. 
*/ - private void testConversion(Class pbClass, String parquetSchemaString) throws + private void testConversion(Class pbClass, String parquetSchemaString, boolean parquetSpecsCompliant) throws Exception { - ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(); + ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(parquetSpecsCompliant); MessageType schema = protoSchemaConverter.convert(pbClass); MessageType expectedMT = MessageTypeParser.parseMessageType(parquetSchemaString); assertEquals(expectedMT.toString(), schema.toString()); } + private void testConversion(Class pbClass, String parquetSchemaString) throws Exception { + testConversion(pbClass, parquetSchemaString, true); + } /** * Tests that all protocol buffer datatypes are converted to correct parquet datatypes. @@ -122,7 +125,7 @@ public void testConvertRepetition() throws Exception { "message TestProtobuf.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + " required int32 requiredPrimitive = 2;\n" + - " required group repeatedPrimitive (LIST) = 3 {\n" + + " optional group repeatedPrimitive (LIST) = 3 {\n" + " repeated group list {\n" + " required int32 element;\n" + " }\n" + @@ -133,7 +136,7 @@ public void testConvertRepetition() throws Exception { " required group requiredMessage = 8 {\n" + " optional int32 someId= 3;\n" + " }\n" + - " required group repeatedMessage (LIST) = 9 {\n" + + " optional group repeatedMessage (LIST) = 9 {\n" + " repeated group list {\n" + " optional group element {\n" + " optional int32 someId = 3;\n" + @@ -150,7 +153,7 @@ public void testProto3ConvertRepetition() throws Exception { String expectedSchema = "message TestProto3.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + - " required group repeatedPrimitive (LIST) = 3 {\n" + + " optional group repeatedPrimitive (LIST) = 3 {\n" + " repeated group list {\n" + " required int32 element;\n" + " }\n" + @@ -158,7 +161,7 @@ public void 
testProto3ConvertRepetition() throws Exception { " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + " }\n" + - " required group repeatedMessage (LIST) = 9 {\n" + + " optional group repeatedMessage (LIST) = 9 {\n" + " repeated group list {\n" + " optional group element {\n" + " optional int32 someId = 3;\n" + @@ -169,4 +172,174 @@ public void testProto3ConvertRepetition() throws Exception { testConversion(TestProto3.SchemaConverterRepetition.class, expectedSchema); } + + @Test + public void testConvertRepeatedIntMessage() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedIntMessage {\n" + + " optional group repeatedInt (LIST) = 1 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema); + } + + @Test + public void testConvertRepeatedIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedIntMessage {\n" + + " repeated int32 repeatedInt = 1;\n" + + "}"; + + testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertRepeatedIntMessage() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedIntMessage {\n" + + " optional group repeatedInt (LIST) = 1 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertRepeatedIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedIntMessage {\n" + + " repeated int32 repeatedInt = 1;\n" + + "}"; + + testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema, false); + } + + @Test + public void testConvertRepeatedInnerMessage() throws Exception { + String expectedSchema = + "message 
TestProtobuf.RepeatedInnerMessage {\n" + + " optional group repeatedInnerMessage (LIST) = 1 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema); + } + + @Test + public void testConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedInnerMessage {\n" + + " repeated group repeatedInnerMessage = 1 {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertRepeatedInnerMessage() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedInnerMessage {\n" + + " optional group repeatedInnerMessage (LIST) = 1 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedInnerMessage {\n" + + " repeated group repeatedInnerMessage = 1 {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema, false); + } + + @Test + public void testConvertMapIntMessage() throws Exception { + String expectedSchema = + "message TestProtobuf.MapIntMessage {\n" + + " optional 
group mapInt (MAP) = 1 {\n" + + " repeated group key_value {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.MapIntMessage.class, expectedSchema); + } + + @Test + public void testConvertMapIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.MapIntMessage {\n" + + " repeated group mapInt = 1 {\n" + + " optional int32 key = 1;\n" + + " optional int32 value = 2;\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.MapIntMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertMapIntMessage() throws Exception { + String expectedSchema = + "message TestProto3.MapIntMessage {\n" + + " optional group mapInt (MAP) = 1 {\n" + + " repeated group key_value {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.MapIntMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertMapIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.MapIntMessage {\n" + + " repeated group mapInt = 1 {\n" + + " optional int32 key = 1;\n" + + " optional int32 value = 2;\n" + + " }\n" + + "}"; + + testConversion(TestProto3.MapIntMessage.class, expectedSchema, false); + } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java index de27ebf3f4..f71229c222 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java @@ -31,8 +31,12 @@ public class ProtoWriteSupportTest { private ProtoWriteSupport createReadConsumerInstance(Class cls, RecordConsumer readConsumerMock) { + return createReadConsumerInstance(cls, readConsumerMock, new Configuration()); + } + + private 
ProtoWriteSupport createReadConsumerInstance(Class cls, RecordConsumer readConsumerMock, Configuration conf) { ProtoWriteSupport support = new ProtoWriteSupport(cls); - support.init(new Configuration()); + support.init(conf); support.prepareForWrite(readConsumerMock); return support; } @@ -80,9 +84,11 @@ public void testProto3SimplestMessage() throws Exception { } @Test - public void testRepeatedIntMessage() throws Exception { + public void testRepeatedIntMessageSpecsCompliant() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); msg.addRepeatedInt(1323); @@ -117,9 +123,67 @@ public void testRepeatedIntMessage() throws Exception { } @Test - public void testProto3RepeatedIntMessage() throws Exception { + public void testRepeatedIntMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + msg.addRepeatedInt(1323); + msg.addRepeatedInt(54469); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).addInteger(1323); + 
inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("repeatedInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testRepeatedIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testRepeatedIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock, conf); TestProto3.RepeatedIntMessage.Builder msg = 
TestProto3.RepeatedIntMessage.newBuilder(); msg.addRepeatedInt(1323); @@ -153,6 +217,290 @@ public void testProto3RepeatedIntMessage() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testProto3RepeatedIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock); + + TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder(); + msg.addRepeatedInt(1323); + msg.addRepeatedInt(54469); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).addInteger(1323); + inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("repeatedInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + 
ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock); + + TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key_value", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + 
inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("key_value", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + 
Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + 
inOrder.verify(readConsumerMock).startField("mapInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key_value", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("key_value", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + 
inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder 
inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testRepeatedInnerMessageMessage_message() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); @@ -165,6 +513,37 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testRepeatedInnerMessageSpecsCompliantMessage_message() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf); + + TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one").setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); 
inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); @@ -192,7 +571,7 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { @Test public void testProto3RepeatedInnerMessageMessage_message() throws Exception { - RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock); TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder(); @@ -202,6 +581,37 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedInnerMessageSpecsCompliantMessage_message() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf); + + TestProto3.TopMessage.Builder msg = 
TestProto3.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one").setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); @@ -227,10 +637,13 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test - public void testRepeatedInnerMessageMessage_scalar() throws Exception { + public void testRepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf); TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); msg.addInnerBuilder().setOne("one"); @@ -274,6 +687,41 @@ public void testRepeatedInnerMessageMessage_scalar() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testRepeatedInnerMessageMessage_scalar() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock); + + TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one"); + msg.addInnerBuilder().setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + //first inner message + 
inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).endGroup(); + + //second inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); @@ -287,6 +735,43 @@ public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + //first inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).endGroup(); + + //second inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + 
Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf); + + TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one"); + msg.addInnerBuilder().setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java index d18076a642..55f9237ec5 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java @@ -46,10 +46,18 @@ public class WriteUsingMR { private static final Logger LOG = LoggerFactory.getLogger(WriteUsingMR.class); - Configuration conf = new Configuration(); + private final Configuration conf; private static List inputMessages; Path outputPath; + public WriteUsingMR() { + this(new Configuration()); + } + + public WriteUsingMR(Configuration conf) { + this.conf = conf; + } + public Configuration getConfiguration() { return conf; } diff --git a/parquet-protobuf/src/test/resources/TestProto3.proto b/parquet-protobuf/src/test/resources/TestProto3.proto index 1896445306..e49eef5727 100644 --- a/parquet-protobuf/src/test/resources/TestProto3.proto +++ b/parquet-protobuf/src/test/resources/TestProto3.proto @@ -124,6 +124,14 @@ message 
RepeatedIntMessage { repeated int32 repeatedInt = 1; } +message RepeatedInnerMessage { + repeated InnerMessage repeatedInnerMessage = 1; +} + +message MapIntMessage { + map<int32, int32> mapInt = 1; +} + message HighIndexMessage { repeated int32 repeatedInt = 50000; } diff --git a/parquet-protobuf/src/test/resources/TestProtobuf.proto b/parquet-protobuf/src/test/resources/TestProtobuf.proto index d7cdf03a91..d4ab4c7dcd 100644 --- a/parquet-protobuf/src/test/resources/TestProtobuf.proto +++ b/parquet-protobuf/src/test/resources/TestProtobuf.proto @@ -122,6 +122,14 @@ message RepeatedIntMessage { repeated int32 repeatedInt = 1; } +message RepeatedInnerMessage { + repeated InnerMessage repeatedInnerMessage = 1; +} + +message MapIntMessage { + map<int32, int32> mapInt = 1; +} + message HighIndexMessage { repeated int32 repeatedInt = 50000; }