From 16eafcb6d124af1a1deed68e84723e8ad1d91261 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Hanotte?= Date: Fri, 13 Apr 2018 15:25:34 +0200 Subject: [PATCH] PARQUET-968 add proto flag to enable writing using specs-compliant schemas (#2) * PARQUET-968 Add flag to write using specs compliant schemas For users that require backward compatibility with parquet 1.9.0 and older, the flag "parquet.proto.writeSpecsCompliant" is introduced to allow writing collection using the old style (using repeated and not using the LIST and MAP wrappers that are recommended by the parquet specs). * PARQUET-968 Add InputOutputFormat tests to validate read/write --- .../parquet/proto/ProtoMessageConverter.java | 43 +- .../parquet/proto/ProtoSchemaConverter.java | 30 +- .../parquet/proto/ProtoWriteSupport.java | 76 ++- .../proto/ProtoInputOutputFormatTest.java | 120 +++++ .../proto/ProtoSchemaConverterTest.java | 185 ++++++- .../parquet/proto/ProtoWriteSupportTest.java | 501 +++++++++++++++++- .../parquet/proto/utils/WriteUsingMR.java | 10 +- .../src/test/resources/TestProto3.proto | 8 + .../src/test/resources/TestProtobuf.proto | 8 + 9 files changed, 920 insertions(+), 61 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index bb9930b6fe..d5f43e6b35 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -131,12 +131,13 @@ public void add(Object value) { }; } - OriginalType originalType = parquetType.getOriginalType() == null ? 
OriginalType.UTF8 : parquetType.getOriginalType(); - switch (originalType) { - case LIST: return new ListConverter(parentBuilder, fieldDescriptor, parquetType); - case MAP: return new MapConverter(parentBuilder, fieldDescriptor, parquetType); - default: return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); + if (OriginalType.LIST == parquetType.getOriginalType()) { + return new ListConverter(parentBuilder, fieldDescriptor, parquetType); } + if (OriginalType.MAP == parquetType.getOriginalType()) { + return new MapConverter(parentBuilder, fieldDescriptor, parquetType); + } + return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); } private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { @@ -363,38 +364,38 @@ public void addBinary(Binary binary) { *

* A LIST wrapper is created in parquet for the above mentioned protobuf schema: * message SimpleList { - * required group first_array (LIST) = 1 { - * repeated int32 element; + * optional group first_array (LIST) = 1 { + * repeated group list { + * optional int32 element; + * } * } * } *

* The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains - * one only one field: either a primitive field (like in the example above, where we have an array of ints) or - * another group (array of messages). + * a repeated group named 'list', itself containing only one field called 'element' of the type of the repeated + * object (can be a primitive as in this example or a group in case of a repeated message in protobuf). */ final class ListConverter extends GroupConverter { private final Converter converter; - private final boolean listOfMessage; public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { OriginalType originalType = parquetType.getOriginalType(); - if (originalType != OriginalType.LIST) { + if (originalType != OriginalType.LIST || parquetType.isPrimitive()) { throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead."); } - listOfMessage = fieldDescriptor.getJavaType() == JavaType.MESSAGE; + GroupType rootWrapperType = parquetType.asGroupType(); + if (!rootWrapperType.containsField("list") || rootWrapperType.getType("list").isPrimitive()) { + throw new ParquetDecodingException("Expected repeated 'list' group inside LIST wrapperr but got: " + rootWrapperType); + } - Type parquetSchema; - if (parquetType.asGroupType().containsField("list")) { - parquetSchema = parquetType.asGroupType().getType("list"); - if (parquetSchema.asGroupType().containsField("element")) { - parquetSchema = parquetSchema.asGroupType().getType("element"); - } - } else { - throw new ParquetDecodingException("Expected list but got: " + parquetType); + GroupType listType = rootWrapperType.getType("list").asGroupType(); + if (!listType.containsField("element")) { + throw new ParquetDecodingException("Expected 'element' inside repeated list group but got: " + listType); } - converter = newMessageConverter(parentBuilder, fieldDescriptor, 
parquetSchema); + Type elementType = listType.getType("element"); + converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java index eae54ebefa..9c1bf27c93 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java @@ -48,6 +48,22 @@ public class ProtoSchemaConverter { private static final Logger LOG = LoggerFactory.getLogger(ProtoSchemaConverter.class); + private final boolean parquetSpecsCompliant; + + public ProtoSchemaConverter() { + this(false); + } + + /** + * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes. + * @param parquetSpecsCompliant If set to false, the parquet schema generated will be using the old + * schema style (prior to PARQUET-968) to provide backward-compatibility + * but which does not use LIST and MAP wrappers around collections as required + * by the parquet specifications. If set to true, specs compliant schemas are used. 
+ */ + public ProtoSchemaConverter(boolean parquetSpecsCompliant) { + this.parquetSpecsCompliant = parquetSpecsCompliant; + } public MessageType convert(Class protobufClass) { LOG.debug("Converting protocol buffer class \"" + protobufClass + "\" to parquet schema."); @@ -86,7 +102,8 @@ private Builder>, GroupBuilder> addF } ParquetType parquetType = getParquetType(descriptor); - if (descriptor.isRepeated()) { + if (descriptor.isRepeated() && parquetSpecsCompliant) { + // the old schema style did not include the LIST wrapper around repeated fields return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder); } @@ -98,7 +115,7 @@ private Builder>, GroupBuilder> addR OriginalType originalType, final GroupBuilder builder) { return builder - .group(Type.Repetition.REQUIRED).as(OriginalType.LIST) + .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST) .group(Type.Repetition.REPEATED) .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType) .named("element") @@ -108,7 +125,7 @@ private Builder>, GroupBuilder> addR private GroupBuilder> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder builder) { GroupBuilder>>> result = builder - .group(Type.Repetition.REQUIRED).as(OriginalType.LIST) + .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST) .group(Type.Repetition.REPEATED) .group(Type.Repetition.OPTIONAL); @@ -118,9 +135,12 @@ private GroupBuilder> addRepeatedMessage(FieldDescriptor des } private GroupBuilder> addMessageField(FieldDescriptor descriptor, final GroupBuilder builder) { - if (descriptor.isMapField()) { + if (descriptor.isMapField() && parquetSpecsCompliant) { + // the old schema style did not include the MAP wrapper around map groups return addMapField(descriptor, builder); - } else if (descriptor.isRepeated()) { + } + if (descriptor.isRepeated() && parquetSpecsCompliant) { + // the old schema style did not include the LIST wrapper around repeated messages return addRepeatedMessage(descriptor, 
builder); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index bb75e71748..4d70632061 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -47,7 +47,13 @@ public class ProtoWriteSupport extends WriteSupport< private static final Logger LOG = LoggerFactory.getLogger(ProtoWriteSupport.class); public static final String PB_CLASS_WRITE = "parquet.proto.writeClass"; + // PARQUET-968 introduces changes to allow writing specs compliant schemas with parquet-protobuf. + // In the past, collections were not written using the LIST and MAP wrappers and thus were not compliant + // with the parquet specs. This flag, if set to true, allows writing using spec compliant schemas + // but is set to false by default to keep backward compatibility + public static final String PB_SPECS_COMPLIANT_WRITE = "parquet.proto.writeSpecsCompliant"; + private boolean writeSpecsCompliant = false; private RecordConsumer recordConsumer; private Class protoMessage; private MessageWriter messageWriter; @@ -68,6 +74,16 @@ public static void setSchema(Configuration configuration, Class extraMetaData = new HashMap(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); + extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant)); return new WriteContext(rootSchema, extraMetaData); } @@ -158,8 +176,12 @@ class MessageWriter extends FieldWriter { Type type = schema.getType(name); FieldWriter writer = createWriter(fieldDescriptor, type); - if(fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) { - writer = new ArrayWriter(writer); + if(writeSpecsCompliant && fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) { 
+ writer = new ArrayWriter(writer); + } + else if (!writeSpecsCompliant && fieldDescriptor.isRepeated()) { + // the old schema style wrote collections (lists and maps) as plain repeated fields, without the LIST and MAP wrappers + writer = new RepeatedWriter(writer); } writer.setFieldName(name); @@ -187,7 +209,7 @@ private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) { } private FieldWriter createMessageWriter(FieldDescriptor fieldDescriptor, Type type) { - if (fieldDescriptor.isMapField()) { + if (fieldDescriptor.isMapField() && writeSpecsCompliant) { return createMapWriter(fieldDescriptor, type); } @@ -235,16 +257,16 @@ void writeTopLevelMessage(Object value) { /** Writes message as part of repeated field. It cannot start field*/ @Override final void writeRawValue(Object value) { + recordConsumer.startGroup(); writeAllFields((MessageOrBuilder) value); + recordConsumer.endGroup(); } /** Used for writing nonrepeated (optional, required) fields*/ @Override final void writeField(Object value) { recordConsumer.startField(fieldName, index); - recordConsumer.startGroup(); - writeAllFields((MessageOrBuilder) value); - recordConsumer.endGroup(); + writeRawValue(value); recordConsumer.endField(fieldName, index); } @@ -288,21 +310,11 @@ final void writeField(Object value) { recordConsumer.startField("list", 0); // This is the wrapper group for the array field for (Object listEntry: list) { recordConsumer.startGroup(); - recordConsumer.startField("element", 0); // This is the mandatory inner field - if (!isPrimitive(listEntry)) { - recordConsumer.startGroup(); - } - fieldWriter.writeRawValue(listEntry); - if (!isPrimitive(listEntry)) { - recordConsumer.endGroup(); - } - recordConsumer.endField("element", 0); - recordConsumer.endGroup(); } recordConsumer.endField("list", 0); @@ -312,8 +324,33 @@ final void writeField(Object value) { } } - private boolean isPrimitive(Object listEntry) { - return !(listEntry instanceof Message); + /** + * The RepeatedWriter is used to write 
collections (lists and maps) using the old style (without LIST and MAP + * wrappers). + */ + class RepeatedWriter extends FieldWriter { + final FieldWriter fieldWriter; + + RepeatedWriter(FieldWriter fieldWriter) { + this.fieldWriter = fieldWriter; + } + + @Override + final void writeRawValue(Object value) { + throw new UnsupportedOperationException("Array has no raw value"); + } + + @Override + final void writeField(Object value) { + recordConsumer.startField(fieldName, index); + List list = (List) value; + + for (Object listEntry: list) { + fieldWriter.writeRawValue(listEntry); + } + + recordConsumer.endField(fieldName, index); + } } /** validates mapping between protobuffer fields and parquet fields.*/ @@ -440,5 +477,4 @@ private String serializeDescriptor(Class protoClass) { DescriptorProtos.DescriptorProto asProto = descriptor.toProto(); return TextFormat.printToString(asProto); } - } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 6c01d7b8db..5544dc6887 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.parquet.proto; import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.proto.test.TestProto3; import org.apache.parquet.proto.test.TestProtobuf; @@ -193,6 +194,125 @@ public void testProto3CustomProtoClass() throws Exception { assertEquals("writtenString", stringValue); } + @Test + public void testRepeatedIntMessageClass() throws Exception { + TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build(); + TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder() + 
.addRepeatedInt(1).addRepeatedInt(2) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedIntMessageClassSchemaCompliant() throws Exception { + TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build(); + TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder() + .addRepeatedInt(1).addRepeatedInt(2) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testMapIntMessageClass() throws Exception { + TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build(); + TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder() + .putMapInt(1, 123).putMapInt(2, 234) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.MapIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, 
result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testMapIntMessageClassSchemaCompliant() throws Exception { + TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build(); + TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder() + .putMapInt(1, 123).putMapInt(2, 234) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.MapIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedInnerMessageClass() throws Exception { + TestProtobuf.RepeatedInnerMessage msgEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder().build(); + TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder() + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build()) + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build()) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedInnerMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedInnerMessageClassSchemaCompliant() throws Exception { + TestProtobuf.RepeatedInnerMessage msgEmpty 
= TestProtobuf.RepeatedInnerMessage.newBuilder().build(); + TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder() + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build()) + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build()) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedInnerMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + /** * Runs job that writes input to file and then job reading data back. */ diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index d7ec169ce7..4ca82ac740 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -32,14 +32,17 @@ public class ProtoSchemaConverterTest { /** * Converts given pbClass to parquet schema and compares it with expected parquet schema. 
*/ - private void testConversion(Class pbClass, String parquetSchemaString) throws + private void testConversion(Class pbClass, String parquetSchemaString, boolean parquetSpecsCompliant) throws Exception { - ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(); + ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(parquetSpecsCompliant); MessageType schema = protoSchemaConverter.convert(pbClass); MessageType expectedMT = MessageTypeParser.parseMessageType(parquetSchemaString); assertEquals(expectedMT.toString(), schema.toString()); } + private void testConversion(Class pbClass, String parquetSchemaString) throws Exception { + testConversion(pbClass, parquetSchemaString, true); + } /** * Tests that all protocol buffer datatypes are converted to correct parquet datatypes. @@ -122,7 +125,7 @@ public void testConvertRepetition() throws Exception { "message TestProtobuf.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + " required int32 requiredPrimitive = 2;\n" + - " required group repeatedPrimitive (LIST) = 3 {\n" + + " optional group repeatedPrimitive (LIST) = 3 {\n" + " repeated group list {\n" + " required int32 element;\n" + " }\n" + @@ -133,7 +136,7 @@ public void testConvertRepetition() throws Exception { " required group requiredMessage = 8 {\n" + " optional int32 someId= 3;\n" + " }\n" + - " required group repeatedMessage (LIST) = 9 {\n" + + " optional group repeatedMessage (LIST) = 9 {\n" + " repeated group list {\n" + " optional group element {\n" + " optional int32 someId = 3;\n" + @@ -150,7 +153,7 @@ public void testProto3ConvertRepetition() throws Exception { String expectedSchema = "message TestProto3.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + - " required group repeatedPrimitive (LIST) = 3 {\n" + + " optional group repeatedPrimitive (LIST) = 3 {\n" + " repeated group list {\n" + " required int32 element;\n" + " }\n" + @@ -158,7 +161,7 @@ public void 
testProto3ConvertRepetition() throws Exception { " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + " }\n" + - " required group repeatedMessage (LIST) = 9 {\n" + + " optional group repeatedMessage (LIST) = 9 {\n" + " repeated group list {\n" + " optional group element {\n" + " optional int32 someId = 3;\n" + @@ -169,4 +172,174 @@ public void testProto3ConvertRepetition() throws Exception { testConversion(TestProto3.SchemaConverterRepetition.class, expectedSchema); } + + @Test + public void testConvertRepeatedIntMessage() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedIntMessage {\n" + + " optional group repeatedInt (LIST) = 1 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema); + } + + @Test + public void testConvertRepeatedIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedIntMessage {\n" + + " repeated int32 repeatedInt = 1;\n" + + "}"; + + testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertRepeatedIntMessage() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedIntMessage {\n" + + " optional group repeatedInt (LIST) = 1 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertRepeatedIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedIntMessage {\n" + + " repeated int32 repeatedInt = 1;\n" + + "}"; + + testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema, false); + } + + @Test + public void testConvertRepeatedInnerMessage() throws Exception { + String expectedSchema = + "message 
TestProtobuf.RepeatedInnerMessage {\n" + + " optional group repeatedInnerMessage (LIST) = 1 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema); + } + + @Test + public void testConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedInnerMessage {\n" + + " repeated group repeatedInnerMessage = 1 {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertRepeatedInnerMessage() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedInnerMessage {\n" + + " optional group repeatedInnerMessage (LIST) = 1 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedInnerMessage {\n" + + " repeated group repeatedInnerMessage = 1 {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema, false); + } + + @Test + public void testConvertMapIntMessage() throws Exception { + String expectedSchema = + "message TestProtobuf.MapIntMessage {\n" + + " optional 
group mapInt (MAP) = 1 {\n" + + " repeated group key_value {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.MapIntMessage.class, expectedSchema); + } + + @Test + public void testConvertMapIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.MapIntMessage {\n" + + " repeated group mapInt = 1 {\n" + + " optional int32 key = 1;\n" + + " optional int32 value = 2;\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.MapIntMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertMapIntMessage() throws Exception { + String expectedSchema = + "message TestProto3.MapIntMessage {\n" + + " optional group mapInt (MAP) = 1 {\n" + + " repeated group key_value {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.MapIntMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertMapIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.MapIntMessage {\n" + + " repeated group mapInt = 1 {\n" + + " optional int32 key = 1;\n" + + " optional int32 value = 2;\n" + + " }\n" + + "}"; + + testConversion(TestProto3.MapIntMessage.class, expectedSchema, false); + } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java index de27ebf3f4..f71229c222 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java @@ -31,8 +31,12 @@ public class ProtoWriteSupportTest { private ProtoWriteSupport createReadConsumerInstance(Class cls, RecordConsumer readConsumerMock) { + return createReadConsumerInstance(cls, readConsumerMock, new Configuration()); + } + + private 
ProtoWriteSupport createReadConsumerInstance(Class cls, RecordConsumer readConsumerMock, Configuration conf) { ProtoWriteSupport support = new ProtoWriteSupport(cls); - support.init(new Configuration()); + support.init(conf); support.prepareForWrite(readConsumerMock); return support; } @@ -80,9 +84,11 @@ public void testProto3SimplestMessage() throws Exception { } @Test - public void testRepeatedIntMessage() throws Exception { + public void testRepeatedIntMessageSpecsCompliant() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); msg.addRepeatedInt(1323); @@ -117,9 +123,67 @@ public void testRepeatedIntMessage() throws Exception { } @Test - public void testProto3RepeatedIntMessage() throws Exception { + public void testRepeatedIntMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + msg.addRepeatedInt(1323); + msg.addRepeatedInt(54469); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).addInteger(1323); + 
inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("repeatedInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testRepeatedIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testRepeatedIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock, conf); TestProto3.RepeatedIntMessage.Builder msg = 
TestProto3.RepeatedIntMessage.newBuilder(); msg.addRepeatedInt(1323); @@ -153,6 +217,290 @@ public void testProto3RepeatedIntMessage() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testProto3RepeatedIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock); + + TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder(); + msg.addRepeatedInt(1323); + msg.addRepeatedInt(54469); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).addInteger(1323); + inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("repeatedInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + 
ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock); + + TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key_value", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); +
inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("key_value", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + 
Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + 
inOrder.verify(readConsumerMock).startField("mapInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key_value", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("key_value", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + 
inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder 
inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testRepeatedInnerMessageMessage_message() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); @@ -165,6 +513,37 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testRepeatedInnerMessageSpecsCompliantMessage_message() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf); + + TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one").setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); 
inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); @@ -192,7 +571,7 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { @Test public void testProto3RepeatedInnerMessageMessage_message() throws Exception { - RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock); TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder(); @@ -202,6 +581,37 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedInnerMessageSpecsCompliantMessage_message() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf); + + TestProto3.TopMessage.Builder msg =
TestProto3.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one").setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); @@ -227,10 +637,13 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test - public void testRepeatedInnerMessageMessage_scalar() throws Exception { + public void testRepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); - ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf); TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); msg.addInnerBuilder().setOne("one"); @@ -274,6 +687,41 @@ public void testRepeatedInnerMessageMessage_scalar() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testRepeatedInnerMessageMessage_scalar() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock); + + TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one"); + msg.addInnerBuilder().setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + //first inner message + 
inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).endGroup(); + + //second inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); @@ -287,6 +735,43 @@ public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + + //first inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).endGroup(); + + //second inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + 
Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf); + + TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one"); + msg.addInnerBuilder().setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java index d18076a642..55f9237ec5 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java @@ -46,10 +46,18 @@ public class WriteUsingMR { private static final Logger LOG = LoggerFactory.getLogger(WriteUsingMR.class); - Configuration conf = new Configuration(); + private final Configuration conf; private static List inputMessages; Path outputPath; + public WriteUsingMR() { + this(new Configuration()); + } + + public WriteUsingMR(Configuration conf) { + this.conf = conf; + } + public Configuration getConfiguration() { return conf; } diff --git a/parquet-protobuf/src/test/resources/TestProto3.proto b/parquet-protobuf/src/test/resources/TestProto3.proto index 1896445306..e49eef5727 100644 --- a/parquet-protobuf/src/test/resources/TestProto3.proto +++ b/parquet-protobuf/src/test/resources/TestProto3.proto @@ -124,6 +124,14 @@ message
RepeatedIntMessage { repeated int32 repeatedInt = 1; } +message RepeatedInnerMessage { + repeated InnerMessage repeatedInnerMessage = 1; +} + +message MapIntMessage { + map<int32, int32> mapInt = 1; +} + message HighIndexMessage { repeated int32 repeatedInt = 50000; } diff --git a/parquet-protobuf/src/test/resources/TestProtobuf.proto b/parquet-protobuf/src/test/resources/TestProtobuf.proto index d7cdf03a91..d4ab4c7dcd 100644 --- a/parquet-protobuf/src/test/resources/TestProtobuf.proto +++ b/parquet-protobuf/src/test/resources/TestProtobuf.proto @@ -122,6 +122,14 @@ message RepeatedIntMessage { repeated int32 repeatedInt = 1; } +message RepeatedInnerMessage { + repeated InnerMessage repeatedInnerMessage = 1; +} + +message MapIntMessage { + map<int32, int32> mapInt = 1; +} + message HighIndexMessage { repeated int32 repeatedInt = 50000; }