From 5cf92487d5e64764b75284867471acb4e3cddb4d Mon Sep 17 00:00:00 2001 From: Constantin Muraru Date: Sat, 29 Apr 2017 22:33:43 +0300 Subject: [PATCH] PARQUET-968 Add Hive support in ProtoParquet Fix bug with map writer Implement review Implement review PARQUET-968 Implement feedback Update the proto to parquet schema converter for MAP fields so that it follows the scec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists This came as feedback from the Amazon Athena team. --- .../parquet/proto/ProtoMessageConverter.java | 131 ++++++++++++++++- .../parquet/proto/ProtoSchemaConverter.java | 139 ++++++++++++++---- .../parquet/proto/ProtoWriteSupport.java | 95 +++++++++++- .../proto/ProtoSchemaConverterTest.java | 38 +++-- .../parquet/proto/ProtoWriteSupportTest.java | 48 ++++++ 5 files changed, 401 insertions(+), 50 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index b5649a05b6..953994f1c1 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -24,12 +24,14 @@ import com.twitter.elephantbird.util.Protobufs; import org.apache.parquet.column.Dictionary; import org.apache.parquet.io.InvalidRecordException; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.IncompatibleSchemaModificationException; +import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.Type; import java.util.HashMap; @@ -129,10 +131,14 @@ public void add(Object value) { }; } - return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); + OriginalType originalType = parquetType.getOriginalType() == null ? OriginalType.UTF8 : parquetType.getOriginalType(); + switch (originalType) { + case LIST: return new ListConverter(parentBuilder, fieldDescriptor, parquetType); + case MAP: return new MapConverter(parentBuilder, fieldDescriptor, parquetType); + default: return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); + } } - private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { JavaType javaType = fieldDescriptor.getJavaType(); @@ -345,4 +351,125 @@ public void addBinary(Binary binary) { } } + + /** + * This class unwraps the additional LIST wrapper and makes it possible to read the underlying data and then convert + * it to protobuf. + *

+ * Consider the following protobuf schema: + * message SimpleList { + * repeated int64 first_array = 1; + * } + *

+ * A LIST wrapper is created in parquet for the above mentioned protobuf schema: + * message SimpleList { + * required group first_array (LIST) = 1 { + * repeated int32 element; + * } + * } + *

+ * The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains + * one only one field: either a primitive field (like in the example above, where we have an array of ints) or + * another group (array of messages). + */ + final class ListConverter extends GroupConverter { + private final Converter converter; + private final boolean listOfMessage; + + public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + OriginalType originalType = parquetType.getOriginalType(); + if (originalType != OriginalType.LIST) { + throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead."); + } + + listOfMessage = fieldDescriptor.getJavaType() == JavaType.MESSAGE; + + Type parquetSchema; + if (parquetType.asGroupType().containsField("list")) { + parquetSchema = parquetType.asGroupType().getType("list"); + if (parquetSchema.asGroupType().containsField("element")) { + parquetSchema.asGroupType().getType("element"); + } + } else { + throw new ParquetDecodingException("Expected list but got: " + parquetType); + } + + converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema); + } + + @Override + public Converter getConverter(int fieldIndex) { + if (fieldIndex > 0) { + throw new ParquetDecodingException("Unexpected multiple fields in the LIST wrapper"); + } + + if (listOfMessage) { + return converter; + } + + return new GroupConverter() { + @Override + public Converter getConverter(int fieldIndex) { + return converter; + } + + @Override + public void start() { + + } + + @Override + public void end() { + + } + }; + } + + @Override + public void start() { + + } + + @Override + public void end() { + + } + } + + + final class MapConverter extends GroupConverter { + private final Converter converter; + + public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + OriginalType originalType = parquetType.getOriginalType(); + if (originalType != OriginalType.MAP) { + throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead."); + } + + Type parquetSchema; + if (parquetType.asGroupType().containsField("key_value")){ + parquetSchema = parquetType.asGroupType().getType("key_value"); + } else { + throw new ParquetDecodingException("Expected map but got: " + parquetType); + } + + converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema); + } + + @Override + public Converter getConverter(int fieldIndex) { + if (fieldIndex > 0) { + throw new ParquetDecodingException("Unexpected multiple fields in the MAP wrapper"); + } + return converter; + } + + @Override + public void start() { + } + + @Override + public void end() { + } + } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java index 2c4a1caeec..f3dd11db38 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java @@ -18,30 +18,26 @@ */ package org.apache.parquet.proto; -import static org.apache.parquet.schema.OriginalType.ENUM; -import static org.apache.parquet.schema.OriginalType.UTF8; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; - -import java.util.List; - +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; +import com.google.protobuf.Message; +import com.twitter.elephantbird.util.Protobufs; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.apache.parquet.schema.Types.Builder; import org.apache.parquet.schema.Types.GroupBuilder; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; -import com.google.protobuf.Message; -import com.twitter.elephantbird.util.Protobufs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + +import static org.apache.parquet.schema.OriginalType.ENUM; +import static org.apache.parquet.schema.OriginalType.UTF8; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; + /** *

* Converts a Protocol Buffer Descriptor into a Parquet schema. @@ -83,26 +79,105 @@ private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) { } } - private Builder>, GroupBuilder> addField(Descriptors.FieldDescriptor descriptor, GroupBuilder builder) { - Type.Repetition repetition = getRepetition(descriptor); - JavaType javaType = descriptor.getJavaType(); + private Builder>, GroupBuilder> addField(Descriptors.FieldDescriptor descriptor, final GroupBuilder builder) { + if (descriptor.getJavaType() == JavaType.MESSAGE) { + return addMessageField(descriptor, builder); + } + + ParquetType parquetType = getParquetType(descriptor); + if (descriptor.isRepeated()) { + return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder); + } + + return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType); + } + + private Builder>, GroupBuilder> addRepeatedPrimitive(Descriptors.FieldDescriptor descriptor, + PrimitiveTypeName primitiveType, + OriginalType originalType, + final GroupBuilder builder) { + return builder + .group(Type.Repetition.REQUIRED).as(OriginalType.LIST) + .group(Type.Repetition.REPEATED) + .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType) + .named("element") + .named("list"); + } + + private GroupBuilder> addRepeatedMessage(Descriptors.FieldDescriptor descriptor, GroupBuilder builder) { + GroupBuilder>> result = + builder + .group(Type.Repetition.REQUIRED).as(OriginalType.LIST) + .group(Type.Repetition.REPEATED); + + convertFields(result, descriptor.getMessageType().getFields()); + + return result.named("list"); + } + + private GroupBuilder> addMessageField(Descriptors.FieldDescriptor descriptor, final GroupBuilder builder) { + if (descriptor.isMapField()) { + return addMapField(descriptor, builder); + } else if (descriptor.isRepeated()) { + return addRepeatedMessage(descriptor, builder); + } + + // Plain message + GroupBuilder> group = builder.group(getRepetition(descriptor)); + convertFields(group, descriptor.getMessageType().getFields()); + return group; + } + + private GroupBuilder> addMapField(Descriptors.FieldDescriptor descriptor, final GroupBuilder builder) { + List fields = descriptor.getMessageType().getFields(); + if (fields.size() != 2) { + throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields); + } + + ParquetType mapKeyParquetType = getParquetType(fields.get(0)); + + GroupBuilder>> group = builder + .group(Type.Repetition.REQUIRED).as(OriginalType.MAP) + .group(Type.Repetition.REPEATED) // key_value wrapper + .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key"); + + return addField(fields.get(1), group).named("value") + .named("key_value"); + } + + private ParquetType getParquetType(Descriptors.FieldDescriptor fieldDescriptor) { + + JavaType javaType = fieldDescriptor.getJavaType(); switch (javaType) { - case BOOLEAN: return builder.primitive(BOOLEAN, repetition); - case INT: return builder.primitive(INT32, repetition); - case LONG: return builder.primitive(INT64, repetition); - case FLOAT: return builder.primitive(FLOAT, repetition); - case DOUBLE: return builder.primitive(DOUBLE, repetition); - case BYTE_STRING: return builder.primitive(BINARY, repetition); - case STRING: return builder.primitive(BINARY, repetition).as(UTF8); - case MESSAGE: { - GroupBuilder> group = builder.group(repetition); - convertFields(group, descriptor.getMessageType().getFields()); - return group; - } - case ENUM: return builder.primitive(BINARY, repetition).as(ENUM); + case INT: return ParquetType.of(INT32); + case LONG: return ParquetType.of(INT64); + case DOUBLE: return ParquetType.of(DOUBLE); + case BOOLEAN: return ParquetType.of(BOOLEAN); + case FLOAT: return ParquetType.of(FLOAT); + case STRING: return ParquetType.of(BINARY, UTF8); + case ENUM: return ParquetType.of(BINARY, ENUM); + case BYTE_STRING: return ParquetType.of(BINARY); default: throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType); } } + private static class ParquetType { + PrimitiveTypeName primitiveType; + OriginalType originalType; + + private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) { + this.primitiveType = primitiveType; + this.originalType = originalType; + } + + public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) { + return new ParquetType(primitiveType, originalType); + } + + public static ParquetType of(PrimitiveTypeName primitiveType) { + return of(primitiveType, null); + } + } + } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index c0ed351046..8e2b4aeb44 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -21,6 +21,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; +import com.google.protobuf.MapEntry; import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.TextFormat; @@ -34,11 +35,13 @@ import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.IncompatibleSchemaModificationException; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Array; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -162,7 +165,7 @@ class MessageWriter extends FieldWriter { Type type = schema.getType(name); FieldWriter writer = createWriter(fieldDescriptor, type); - if(fieldDescriptor.isRepeated()) { + if(fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) { writer = new ArrayWriter(writer); } @@ -177,7 +180,7 @@ private FieldWriter createWriter(Descriptors.FieldDescriptor fieldDescriptor, Ty switch (fieldDescriptor.getJavaType()) { case STRING: return new StringWriter() ; - case MESSAGE: return new MessageWriter(fieldDescriptor.getMessageType(), type.asGroupType()); + case MESSAGE: return createMessageWriter(fieldDescriptor, type); case INT: return new IntWriter(); case LONG: return new LongWriter(); case FLOAT: return new FloatWriter(); @@ -190,6 +193,47 @@ private FieldWriter createWriter(Descriptors.FieldDescriptor fieldDescriptor, Ty return unknownType(fieldDescriptor);//should not be executed, always throws exception. } + private FieldWriter createMessageWriter(Descriptors.FieldDescriptor fieldDescriptor, Type type) { + if (fieldDescriptor.isMapField()) { + return createMapWriter(fieldDescriptor, type); + } + + return new MessageWriter(fieldDescriptor.getMessageType(), getGroupType(type)); + } + + private GroupType getGroupType(Type type) { + if (type.getOriginalType() == OriginalType.LIST) { + return type.asGroupType().getType("list").asGroupType(); + } + + if (type.getOriginalType() == OriginalType.MAP) { + return type.asGroupType().getType("key_value").asGroupType().getType("value").asGroupType(); + } + + return type.asGroupType(); + } + + private MapWriter createMapWriter(Descriptors.FieldDescriptor fieldDescriptor, Type type) { + List fields = fieldDescriptor.getMessageType().getFields(); + if (fields.size() != 2) { + throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields); + } + + // KeyFieldWriter + Descriptors.FieldDescriptor keyProtoField = fields.get(0); + FieldWriter keyWriter = createWriter(keyProtoField, type); + keyWriter.setFieldName(keyProtoField.getName()); + keyWriter.setIndex(0); + + // ValueFieldWriter + Descriptors.FieldDescriptor valueProtoField = fields.get(1); + FieldWriter valueWriter = createWriter(valueProtoField, type); + valueWriter.setFieldName(valueProtoField.getName()); + valueWriter.setIndex(1); + + return new MapWriter(keyWriter, valueWriter); + } + /** Writes top level message. It cannot call startGroup() */ void writeTopLevelMessage(Object value) { writeAllFields((MessageOrBuilder) value); @@ -198,9 +242,7 @@ void writeTopLevelMessage(Object value) { /** Writes message as part of repeated field. It cannot start field*/ @Override final void writeRawValue(Object value) { - recordConsumer.startGroup(); writeAllFields((MessageOrBuilder) value); - recordConsumer.endGroup(); } /** Used for writing nonrepeated (optional, required) fields*/ @@ -247,16 +289,32 @@ final void writeRawValue(Object value) { @Override final void writeField(Object value) { recordConsumer.startField(fieldName, index); + recordConsumer.startGroup(); List list = (List) value; + recordConsumer.startField("list", 0); // This is the wrapper group for the array field for (Object listEntry: list) { + recordConsumer.startGroup(); + if (isPrimitive(listEntry)) { + recordConsumer.startField("element", 0); + } fieldWriter.writeRawValue(listEntry); + if (isPrimitive(listEntry)) { + recordConsumer.endField("element", 0); + } + recordConsumer.endGroup(); } + recordConsumer.endField("list", 0); + recordConsumer.endGroup(); recordConsumer.endField(fieldName, index); } } + private boolean isPrimitive(Object listEntry) { + return !(listEntry instanceof Message); + } + /** validates mapping between protobuffer fields and parquet fields.*/ private void validatedMapping(Descriptors.Descriptor descriptor, GroupType parquetSchema) { List allFields = descriptor.getFields(); @@ -296,6 +354,35 @@ final void writeRawValue(Object value) { } } + class MapWriter extends FieldWriter { + + private final FieldWriter keyWriter; + private final FieldWriter valueWriter; + + public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) { + super(); + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; + } + + @Override + final void writeRawValue(Object value) { + recordConsumer.startGroup(); + + recordConsumer.startField("key_value", 0); // This is the wrapper group for the map field + for(MapEntry entry : (Collection>) value) { + recordConsumer.startGroup(); + keyWriter.writeField(entry.getKey()); + valueWriter.writeField(entry.getValue()); + recordConsumer.endGroup(); + } + + recordConsumer.endField("key_value", 0); + + recordConsumer.endGroup(); + } + } + class FloatWriter extends FieldWriter { @Override final void writeRawValue(Object value) { diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index 6f5ff53b69..70bc1f79d9 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -103,10 +103,12 @@ public void testProto3ConvertAllDatatypes() throws Exception { " optional binary optionalEnum (ENUM) = 18;" + " optional int32 someInt32 = 19;" + " optional binary someString (UTF8) = 20;" + - " repeated group optionalMap = 21 {\n" + - " optional int64 key = 1;\n" + - " optional group value = 2 {\n" + - " optional int32 someId = 3;\n" + + " required group optionalMap (MAP) = 21 {\n" + + " repeated group key_value {\n" + + " required int64 key;\n" + + " optional group value {\n" + + " optional int32 someId = 3;\n" + + " }\n" + " }\n" + " }\n" + "}"; @@ -120,16 +122,22 @@ public void testConvertRepetition() throws Exception { "message TestProtobuf.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + " required int32 requiredPrimitive = 2;\n" + - " repeated int32 repeatedPrimitive = 3;\n" + + " required group repeatedPrimitive (LIST) = 3 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + " }\n" + - " required group requiredMessage = 8 {" + + " required group requiredMessage = 8 {\n" + " optional int32 someId= 3;\n" + " }\n" + - " repeated group repeatedMessage = 9 {" + - " optional int32 someId = 3;\n" + - " }\n" + + " required group repeatedMessage (LIST) = 9 {\n" + + " repeated group list {\n" + + " optional int32 someId = 3;\n" + + " }\n" + + " }" + "}"; testConversion(TestProtobuf.SchemaConverterRepetition.class, expectedSchema); @@ -140,12 +148,18 @@ public void testProto3ConvertRepetition() throws Exception { String expectedSchema = "message TestProto3.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + - " repeated int32 repeatedPrimitive = 3;\n" + + " required group repeatedPrimitive (LIST) = 3 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + " }\n" + - " repeated group repeatedMessage = 9 {" + - " optional int32 someId = 3;\n" + + " required group repeatedMessage (LIST) = 9 {\n" + + " repeated group list {\n" + + " optional int32 someId = 3;\n" + + " }\n" + " }\n" + "}"; diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java index b937618c3b..e00facfc06 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java @@ -94,8 +94,23 @@ public void testRepeatedIntMessage() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); inOrder.verify(readConsumerMock).addInteger(1323); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); inOrder.verify(readConsumerMock).endField("repeatedInt", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock); @@ -116,8 +131,23 @@ public void testProto3RepeatedIntMessage() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); inOrder.verify(readConsumerMock).addInteger(1323); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); inOrder.verify(readConsumerMock).endField("repeatedInt", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock); @@ -138,6 +168,8 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); inOrder.verify(readConsumerMock).endField("one", 0); @@ -145,6 +177,8 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); inOrder.verify(readConsumerMock).endField("two", 1); inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); inOrder.verify(readConsumerMock).endField("inner", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock); @@ -165,6 +199,8 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); inOrder.verify(readConsumerMock).endField("one", 0); @@ -172,6 +208,8 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); inOrder.verify(readConsumerMock).endField("two", 1); inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); inOrder.verify(readConsumerMock).endField("inner", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock); @@ -192,6 +230,9 @@ public void testRepeatedInnerMessageMessage_scalar() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + //first inner message inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); @@ -206,6 +247,8 @@ public void testRepeatedInnerMessageMessage_scalar() throws Exception { inOrder.verify(readConsumerMock).endField("two", 1); inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); inOrder.verify(readConsumerMock).endField("inner", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock); @@ -226,6 +269,9 @@ public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + //first inner message inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); @@ -240,6 +286,8 @@ public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { inOrder.verify(readConsumerMock).endField("two", 1); inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); inOrder.verify(readConsumerMock).endField("inner", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock);