From 9f7da64cca9393d21b1c9d39961180bbcc2b26fe Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Fri, 7 Apr 2017 18:10:45 +0200 Subject: [PATCH 1/6] Parquet-protobuf build independent of environment Use protobuf-maven-plugin to manage the protoc for compiling proto. This way the build is independent of the environment and no protoc is needed to be installed. --- parquet-protobuf/pom.xml | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml index 978a964c4d..29f81ab4da 100644 --- a/parquet-protobuf/pom.xml +++ b/parquet-protobuf/pom.xml @@ -82,6 +82,7 @@ ${slf4j.version} test + @@ -92,6 +93,12 @@ + + + kr.motd.maven + os-maven-plugin + + org.apache.maven.plugins @@ -146,32 +153,20 @@ - maven-antrun-plugin + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/test/resources + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + - generate-sources - generate-test-sources - - - - - - - - - - - - src/main/java - target/generated-sources/java - - run + test-compile - From e5b90ea0154676fbf8e9a704088d01ba98996323 Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Wed, 12 Apr 2017 14:18:51 +0200 Subject: [PATCH 2/6] Keep track of field id in parqet file metadata Field id is the key element for serialization framework such as protobuf. Persist field id in the file metadata if the schema supports it. --- .../java/org/apache/parquet/schema/Type.java | 15 ++++++++--- .../converter/ParquetMetadataConverter.java | 27 ++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java index 99222f96c3..a442342fea 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,6 +49,15 @@ public int intValue() { return id; } + /** + * This method is solely used by the jackson json inspector to serialize an {@link ID} object. + * + * @return Int value of this id. + */ + public int getValue() { + return id; + } + @Override public boolean equals(Object obj) { return (obj instanceof ID) && ((ID)obj).id == id; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index bf22b617b3..218b714dfe 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -139,7 +139,7 @@ private void addToList(final List result, org.apache.parquet.sche field.accept(new TypeVisitor() { @Override public void visit(PrimitiveType primitiveType) { - SchemaElement element = new SchemaElement(primitiveType.getName()); + SchemaElement element = schemaElementfromType(primitiveType); element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition())); element.setType(getType(primitiveType.getPrimitiveTypeName())); if (primitiveType.getOriginalType() != null) { @@ -157,13 +157,13 @@ public void visit(PrimitiveType primitiveType) { @Override public void visit(MessageType messageType) { - SchemaElement element = new SchemaElement(messageType.getName()); + SchemaElement element = schemaElementfromType(messageType); visitChildren(result, messageType.asGroupType(), element); } @Override public void visit(GroupType groupType) { - SchemaElement element = new SchemaElement(groupType.getName()); + SchemaElement element = schemaElementfromType(groupType); element.setRepetition_type(toParquetRepetition(groupType.getRepetition())); if (groupType.getOriginalType() != null) { element.setConverted_type(getConvertedType(groupType.getOriginalType())); @@ -182,6 +182,21 @@ private void visitChildren(final List result, }); } + /** + * Build a {@link SchemaElement} from {@link org.apache.parquet.schema.Type} with the field's name, and keep the field + * id if the field has one. + * + * @param field a field of the parquet schema + * @return SchemaElement + */ + private static SchemaElement schemaElementfromType(org.apache.parquet.schema.Type field) { + SchemaElement element = new SchemaElement(field.getName()); + if (field.getId() != null) { + element.setField_id(field.getId().intValue()); + } + return element; + } + private void addRowGroup(ParquetMetadata parquetMetadata, List rowGroups, BlockMetaData block) { //rowGroup.total_byte_size = ; List columns = block.getColumns(); From c9f4278091aadc268cb8e01f165e666c654ed88e Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Fri, 14 Apr 2017 14:50:26 +0200 Subject: [PATCH 3/6] Handle field id for parquet protobuf When read back a parquet file with protobuf schema, try to match the parquet schema to protobuf with respect to field id. If the parquet file is written with a previsous parquet-protobuf ( prior to 1.9.1), field id is not persisted in parquet file metadata. When reading these legacy file, schema converter falls back to matching fields by name. --- .../parquet/proto/ProtoMessageConverter.java | 11 ++-- .../proto/ProtoSchemaConverterTest.java | 55 ++++++++++++++++++- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index b5649a05b6..b22a3556e3 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -76,7 +76,10 @@ class ProtoMessageConverter extends GroupConverter { Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType(); for (Type parquetField : parquetSchema.getFields()) { - Descriptors.FieldDescriptor protoField = protoDescriptor.findFieldByName(parquetField.getName()); + // Find field by id, fall back to find field by name if no id found (legacy schema). + Descriptors.FieldDescriptor protoField = parquetField.getId() == null ? + protoDescriptor.findFieldByName(parquetField.getName()) : + protoDescriptor.findFieldByNumber(parquetField.getId().intValue()); if (protoField == null) { String description = "Scheme mismatch \n\"" + parquetField + "\"" + diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index 6f5ff53b69..6408244b7e 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -18,12 +18,21 @@ */ package org.apache.parquet.proto; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; import com.google.protobuf.Message; -import org.junit.Test; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.proto.test.TestProto3; import org.apache.parquet.proto.test.TestProtobuf; +import org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes; +import org.apache.parquet.proto.utils.WriteUsingMR; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Type; +import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -72,7 +81,7 @@ public void testConvertAllDatatypes() throws Exception { " optional binary optionalEnum (ENUM) = 18;" + "}"; - testConversion(TestProtobuf.SchemaConverterAllDatatypes.class, expectedSchema); + testConversion(SchemaConverterAllDatatypes.class, expectedSchema); } /** @@ -151,4 +160,46 @@ public void testProto3ConvertRepetition() throws Exception { testConversion(TestProto3.SchemaConverterRepetition.class, expectedSchema); } + + /** + * Test the persistent metadata contains field id. + * + * @throws Exception if test fails. + */ + @Test + public void testConvertProtoSchema2ParquetMetadata() throws Exception { + ProtoSchemaConverter schemaConverter = new ProtoSchemaConverter(); + MessageType parquetSchema = schemaConverter.convert(SchemaConverterAllDatatypes.class); + // Test if the converted schema respects the field id, index and name. + for (Descriptors.FieldDescriptor field : SchemaConverterAllDatatypes.getDescriptor().getFields()) { + checkFieldConversion(parquetSchema, field); + } + + final WriteUsingMR writeParquet = new WriteUsingMR(); + Path parquetPath = writeParquet.write(SchemaConverterAllDatatypes.getDefaultInstance()); + FileStatus[] files = parquetPath.getFileSystem(writeParquet.getConfiguration()).listStatus(parquetPath); + int parquetFileCount = 0; + for (FileStatus file : files) { + if (file.getPath().getName().endsWith("parquet")) { + parquetFileCount ++; + ParquetFileReader parquetReader = ParquetFileReader.open(writeParquet.getConfiguration(), file.getPath()); + MessageType outputSchema = parquetReader.getFooter().getFileMetaData().getSchema(); + // Test if the output schema is same as the original one converted from protobuf descriptor, thus id is preserved. + assertEquals(outputSchema, parquetSchema); + } + } + // There will be only 1 file. + assertEquals(parquetFileCount, 1); + } + + private static void checkFieldConversion(GroupType parquetSchema, Descriptors.FieldDescriptor field) { + Type schemaField = parquetSchema.getType(field.getIndex()); + assertEquals(schemaField.getName(), field.getName()); + assertEquals(schemaField.getId().getValue(), field.getNumber()); + if (field.getJavaType() == JavaType.MESSAGE) { + for (Descriptors.FieldDescriptor subField : field.getMessageType().getFields()) { + checkFieldConversion((GroupType) schemaField, subField); + } + } + } } From 32b9e394e7f9a9e37c59ff6178e79e511a84e5e4 Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Wed, 24 May 2017 17:39:59 +0200 Subject: [PATCH 4/6] Configurable behavior on the field id persistence Use parameter "parquet.schema.field.with.id" to enable schema field id persistence. When this flag is on, all schema fields should contain id (this is generally different from the field index which is the field position), otherwise it will be considered as an error. This flag is set as extra metadata in the footer, and its default value is false. For parquet-protobuf, this flag is systematically set to true, except that it's set to false explicitly in the configuration, which makes field id persistence as the default behavior for parquet-protobuf. --- .../converter/ParquetMetadataConverter.java | 33 +++++----- .../TestParquetMetadataConverter.java | 66 +++++++++++++++---- .../parquet/proto/ProtoMessageConverter.java | 18 ++--- .../parquet/proto/ProtoReadSupport.java | 14 ++-- .../parquet/proto/ProtoRecordConverter.java | 17 +++-- .../proto/ProtoRecordMaterializer.java | 13 ++-- .../parquet/proto/ProtoWriteSupport.java | 11 +++- 7 files changed, 115 insertions(+), 57 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 218b714dfe..5d9a0faca6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.format.PageEncodingStats; @@ -62,14 +63,9 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.*; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type.Repetition; -import org.apache.parquet.schema.TypeVisitor; -import org.apache.parquet.schema.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +76,7 @@ public class ParquetMetadataConverter { public static final MetadataFilter NO_FILTER = new NoFilter(); public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k + public static final String PARQUET_SCHEMA_FIELD_WITH_ID = "parquet.schema.field.with.id"; private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class); @@ -115,7 +112,7 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque } FileMetaData fileMetaData = new FileMetaData( currentVersion, - toParquetSchema(parquetMetadata.getFileMetaData().getSchema()), + toParquetSchema(parquetMetadata.getFileMetaData()), numRows, rowGroups); @@ -129,17 +126,18 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque } // Visible for testing - List toParquetSchema(MessageType schema) { + List toParquetSchema(org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData) { List result = new ArrayList(); - addToList(result, schema); + String withId = fileMetaData.getKeyValueMetaData().get(PARQUET_SCHEMA_FIELD_WITH_ID); + addToList(result, fileMetaData.getSchema(), !StringUtils.isBlank(withId) && Boolean.valueOf(withId)); return result; } - private void addToList(final List result, org.apache.parquet.schema.Type field) { + private void addToList(final List result, final org.apache.parquet.schema.Type field, final boolean withId) { field.accept(new TypeVisitor() { @Override public void visit(PrimitiveType primitiveType) { - SchemaElement element = schemaElementfromType(primitiveType); + SchemaElement element = schemaElementfromField(primitiveType, withId); element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition())); element.setType(getType(primitiveType.getPrimitiveTypeName())); if (primitiveType.getOriginalType() != null) { @@ -157,13 +155,13 @@ public void visit(PrimitiveType primitiveType) { @Override public void visit(MessageType messageType) { - SchemaElement element = schemaElementfromType(messageType); + SchemaElement element = new SchemaElement(messageType.getName()); visitChildren(result, messageType.asGroupType(), element); } @Override public void visit(GroupType groupType) { - SchemaElement element = schemaElementfromType(groupType); + SchemaElement element = schemaElementfromField(groupType, withId); element.setRepetition_type(toParquetRepetition(groupType.getRepetition())); if (groupType.getOriginalType() != null) { element.setConverted_type(getConvertedType(groupType.getOriginalType())); @@ -176,7 +174,7 @@ private void visitChildren(final List result, element.setNum_children(groupType.getFieldCount()); result.add(element); for (org.apache.parquet.schema.Type field : groupType.getFields()) { - addToList(result, field); + addToList(result, field, withId); } } }); @@ -189,9 +187,12 @@ private void visitChildren(final List result, * @param field a field of the parquet schema * @return SchemaElement */ - private static SchemaElement schemaElementfromType(org.apache.parquet.schema.Type field) { + private static SchemaElement schemaElementfromField(org.apache.parquet.schema.Type field, boolean withId) { SchemaElement element = new SchemaElement(field.getName()); - if (field.getId() != null) { + if (withId) { + if (field.getId() == null) { + throw new InvalidSchemaException("Field id is required, but not found in " + field.toString()); + } element.setField_id(field.getId().intValue()); } return element; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 35c35c19c6..a5e3f8a89c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,6 +44,7 @@ import java.util.Set; import java.util.TreeSet; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.Version; @@ -61,6 +62,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.InvalidSchemaException; import org.junit.Assert; import org.junit.Test; @@ -100,23 +102,55 @@ public void testPageHeader() throws IOException { @Test public void testSchemaConverter() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - List parquetSchema = parquetMetadataConverter.toParquetSchema(Paper.schema); + List parquetSchema = parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(Paper.schema, false)); MessageType schema = parquetMetadataConverter.fromParquetSchema(parquetSchema); assertEquals(Paper.schema, schema); } + @Test + public void testSchemaWithFieldId() { + final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType schema = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8) + .id(1) + .named("stringField") + .optional(PrimitiveTypeName.INT32) + .as(OriginalType.INT_32) + .id(2) + .named("intField") + .named("Message"); + List parquetSchema = parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(schema, true)); + assertEquals(schema, parquetMetadataConverter.fromParquetSchema(parquetSchema)); + } + + @Test(expected = InvalidSchemaException.class) + public void testSchemaExpectingFieldId() { + final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType messageType = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8) + .named("stringField") + .optional(PrimitiveTypeName.INT32) + .as(OriginalType.INT_32) + .named("intField") + .named("Message"); + parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(messageType, true)); + } + @Test public void testSchemaConverterDecimal() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType messageType = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.DECIMAL).precision(9).scale(2) + .named("aBinaryDecimal") + .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4) + .as(OriginalType.DECIMAL).precision(9).scale(2) + .named("aFixedDecimal") + .named("Message"); List schemaElements = parquetMetadataConverter.toParquetSchema( - Types.buildMessage() - .required(PrimitiveTypeName.BINARY) - .as(OriginalType.DECIMAL).precision(9).scale(2) - .named("aBinaryDecimal") - .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4) - .as(OriginalType.DECIMAL).precision(9).scale(2) - .named("aFixedDecimal") - .named("Message") + mockHadoopFileMetaData(messageType, false) ); List expected = Lists.newArrayList( new SchemaElement("Message").setNum_children(2), @@ -164,6 +198,12 @@ public void testEnumEquivalence() { } } + private org.apache.parquet.hadoop.metadata.FileMetaData mockHadoopFileMetaData(MessageType messageType, boolean withId) { + return new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, + withId ? ImmutableMap.of(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, "true") : Collections.emptyMap(), + null); + } + private FileMetaData metadata(long... sizes) { List schema = emptyList(); List rowGroups = new ArrayList(); @@ -330,7 +370,7 @@ private ColumnChunkMetaData createColumnChunkMetaData() { 0, 0, 0, 0, 0); return md; } - + @Test public void testEncodingsCache() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index b22a3556e3..d1061ae83f 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -22,7 +22,9 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; @@ -51,15 +53,15 @@ class ProtoMessageConverter extends GroupConverter { private final Converter[] converters; private final ParentValueContainer parent; private final Message.Builder myBuilder; + private final boolean readFieldById; // used in record converter - ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { - this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); + ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, boolean readFieldById) { + this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, readFieldById); } - // For usage in message arrays - ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { + ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, boolean readFieldById) { int schemaSize = parquetSchema.getFieldCount(); converters = new Converter[schemaSize]; @@ -72,12 +74,12 @@ class ProtoMessageConverter extends GroupConverter { } myBuilder = builder; + this.readFieldById = readFieldById; Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType(); - for (Type parquetField : parquetSchema.getFields()) { - // Find field by id, fall back to find field by name if no id found (legacy schema). - Descriptors.FieldDescriptor protoField = parquetField.getId() == null ? + // Find field by id, fall back to find field by name if either flag is set to false explicitly or no id found (legacy schema). + Descriptors.FieldDescriptor protoField = !readFieldById || (parquetField.getId() == null) ? protoDescriptor.findFieldByName(parquetField.getName()) : protoDescriptor.findFieldByNumber(parquetField.getId().intValue()); @@ -151,7 +153,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p case LONG: return new ProtoLongConverter(pvc); case MESSAGE: { Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); - return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType()); + return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType(), readFieldById); } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 3a21d84486..b5c7a2fd5b 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.io.api.RecordMaterializer; @@ -90,8 +91,11 @@ public RecordMaterializer prepareForRead(Configuration configuration, Map protobufClass = Protobufs.getProtobufClass(headerProtoClass); - return new ProtoRecordMaterializer(requestedSchema, protobufClass); + // If the parquet.schema.field.with.id is set on in the footer, map the schema with field id, + // but users have the possibility to fallback to the old behavior of mapping field by name, by disabling explicitly the flag in the configuration. + final boolean readFieldById = Boolean.valueOf(keyValueMetaData.get(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID)) + && configuration.getBoolean(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, true); + return new ProtoRecordMaterializer(requestedSchema, protobufClass, readFieldById); } - } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index 99153a98cc..fb0fd2adf0 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,8 +21,11 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.schema.MessageType; +import java.util.Map; + /** * Converts data content of root message from Protocol Buffer message to parquet message. * It delegates conversion of inner fields to {@link ProtoMessageConverter} class using inheritance. @@ -46,13 +49,13 @@ public void add(Object a) { } - public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { - super(new SkipParentValueContainer(), protoclass, parquetSchema); + public ProtoRecordConverter(Class protoclass, MessageType parquetSchema, boolean readFieldById) { + super(new SkipParentValueContainer(), protoclass, parquetSchema, readFieldById); reusedBuilder = getBuilder(); } - public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) { - super(new SkipParentValueContainer(), builder, parquetSchema); + public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema, boolean readFieldById) { + super(new SkipParentValueContainer(), builder, parquetSchema, readFieldById); reusedBuilder = getBuilder(); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index 039a571137..fe5069d215 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,19 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; +import java.util.Map; + class ProtoRecordMaterializer extends RecordMaterializer { private final ProtoRecordConverter root; - public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass) { - this.root = new ProtoRecordConverter(protobufClass, requestedSchema); + public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass, boolean readFieldById) { + this.root = new ProtoRecordConverter(protobufClass, requestedSchema, readFieldById); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index c0ed351046..9e6ae1e427 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import com.google.protobuf.TextFormat; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.InvalidRecordException; @@ -118,6 +119,10 @@ public WriteContext init(Configuration configuration) { Map extraMetaData = new HashMap(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); + // By default, we will persist field id in the parquet schema metadata. + // This can be turned off by explicitly set the flag to false in the configuration. + extraMetaData.put(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, + String.valueOf(configuration.getBoolean(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, true))); return new WriteContext(rootSchema, extraMetaData); } From f65bd0a4c42311e34fc5d9f10f0ce3ff34f28a3c Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Mon, 29 May 2017 17:49:37 +0200 Subject: [PATCH 5/6] Remove protoc install from travis Protoc is now managed by maven plugin. --- .travis.yml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/.travis.yml b/.travis.yml index 72d70300be..f5157b2ae4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,20 +2,6 @@ language: java before_install: - sudo apt-get update -qq - sudo apt-get install build-essential - - mkdir protobuf_install - - pushd protobuf_install - - wget https://github.com/google/protobuf/archive/v3.2.0.tar.gz -O protobuf-3.2.0.tar.gz - - tar xzf protobuf-3.2.0.tar.gz - - cd protobuf-3.2.0 - - sudo apt-get install autoconf automake libtool curl make g++ unzip - - ./autogen.sh - - ./configure - - make - - make check - - sudo make install - - sudo ldconfig - - protoc --version - - popd - pwd - sudo apt-get install -qq libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev - wget -nv http://archive.apache.org/dist/thrift/0.7.0/thrift-0.7.0.tar.gz From 76887c0196bb67bf75be9f891b04842bc1fa3df5 Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Wed, 31 May 2017 19:13:58 +0200 Subject: [PATCH 6/6] Protobuf schema evolution In this commit, we seperate the new behavior of reading fields by id and the old behavoir of parquet-protobuf. In new behavior: For schema containing field id (flag "parquet.schema.field.with.id" on), parquet-protobuf now supports reading parquet files with new protobuf schema after removing fields. When reading a field already removed from the schema, it will be treated as unknown field (for the moment the implementation just safely ignores them). Also, there is a more strict check on the field id presence. If the footer has the flag "parquet.schema.field.with.id" on, protobuf reader expects all fields in the schema contain id, or it will raise an error. This new behavior is the default behavior, but can be explicitly disabled by setting flag off in the configuration. This will fall back to the old behavior: mapping fields by name, and error will be raised if unknown fields are found. --- .../org/apache/parquet/schema/GroupType.java | 38 +++++++- .../apache/parquet/schema/MessageType.java | 33 ++++--- .../parquet/schema/TestMessageType.java | 69 +++++++++++++- .../parquet/hadoop/api/ReadSupport.java | 10 +- .../parquet/proto/ProtoMessageConverter.java | 92 ++++++++++++------- .../UnknownFieldIgnoreGroupConverter.java | 56 +++++++++++ .../UnknownFieldIgnorePrimitiveConverter.java | 64 +++++++++++++ .../proto/ProtoInputOutputFormatTest.java | 38 ++++++++ .../proto/ProtoSchemaConverterTest.java | 8 ++ .../parquet/proto/utils/ReadUsingMR.java | 16 +++- .../src/test/resources/TestProto3.proto | 29 ++++++ 11 files changed, 395 insertions(+), 58 deletions(-) create mode 100644 parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnoreGroupConverter.java create mode 100644 parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnorePrimitiveConverter.java diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java index 68dba979b8..4e422534ef 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -394,4 +394,36 @@ List mergeFields(GroupType toMerge, boolean strict) { } return newFields; } + + /** + * Check the projection schema is contained by the origin, and decorate projection schema with the origin field id if original schema has field id. + * This is needed because people usually do not put the field id when they describe a projection schema. + * + * @param projection Projection schema (or subschema of the projection) + * @param builder Builder for constructing a corresponding schema with field id (if origin contains) regarding to projection + * @param Type, which could be {@link MessageType} or {@link org.apache.parquet.schema.Types.GroupBuilder} + * @return builder or the final {@link MessageType}. + * @throws InvalidRecordException if check fails. + */ + protected T checkSubTypeAndDecorateWithFieldId(GroupType projection, Types.GroupBuilder builder) { + if (this.getRepetition() != projection.getRepetition()) { + throw new InvalidRecordException(projection + " found: expected " + this); + } + for (Type field : projection.getFields()) { + Type fieldInFile = this.getType(field.getName()); + if (fieldInFile == null) { + throw new InvalidRecordException(projection + " found: expected " + this); + } + if (fieldInFile.isPrimitive()) { + fieldInFile.checkContains(field); + builder.addField(fieldInFile.getId() == null ? field : field.withId(fieldInFile.getId().intValue())); + } else { + fieldInFile.asGroupType().checkSubTypeAndDecorateWithFieldId(field.asGroupType(), builder.group(field.getRepetition())); + } + } + if (this.getId() != null) { + builder.id(this.getId().intValue()); + } + return builder.as(this.getOriginalType()).named(this.getName()); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java index 1e26ed2425..9b1ab47193 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,14 +41,14 @@ public MessageType(String name, Type... fields) { super(Repetition.REPEATED, name, fields); } - /** - * - * @param name the name of the type - * @param fields the fields contained by this message - */ - public MessageType(String name, List fields) { - super(Repetition.REPEATED, name, fields); - } + /** + * + * @param name the name of the type + * @param fields the fields contained by this message + */ + public MessageType(String name, List fields) { + super(Repetition.REPEATED, name, fields); + } /** * {@inheritDoc} @@ -145,4 +145,15 @@ public MessageType union(MessageType toMerge, boolean strict) { return new MessageType(this.getName(), mergeFields(toMerge, strict)); } + /** + * Check projection schema is subschema of this and append field id to projection if this contains field id. + * This is because we often just don't put field id for projection schema, but that is necessary when trying to read fields by id. + * + * @param projection + * @return Projection schema with field id. + * @throws InvalidRecordException if check fails. + */ + public MessageType checkSubTypeAndDecorateWithId(MessageType projection) { + return checkSubTypeAndDecorateWithFieldId(projection, Types.buildMessage()); + } } diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java index 4add1740ce..52d54e4a2d 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java @@ -18,16 +18,17 @@ */ package org.apache.parquet.schema; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; import static org.apache.parquet.schema.OriginalType.LIST; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; import static org.apache.parquet.schema.Type.Repetition.REPEATED; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import org.apache.parquet.io.InvalidRecordException; import org.junit.Test; import org.apache.parquet.example.Paper; @@ -200,4 +201,66 @@ public void testIDs() throws Exception { assertEquals(schema, schema2); assertEquals(schema.toString(), schema2.toString()); } + + @Test + public void testDecorateProjectionSchema() { + MessageType schema = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId").withId(1), + new GroupType(OPTIONAL, "Links", + new PrimitiveType(REPEATED, INT64, "Backward").withId(1), + new PrimitiveType(REPEATED, INT64, "Forward").withId(2) + ).withId(2), + new GroupType(REPEATED, "Name", + new GroupType(REPEATED, "Language", + new PrimitiveType(REQUIRED, BINARY, "Code").withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Country").withId(2)) + .withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Url").withId(2)) + .withId(3)); + + MessageType projection = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId"), + new GroupType(REPEATED, "Name", + new GroupType(REPEATED, "Language", + new PrimitiveType(OPTIONAL, BINARY, "Country")))); + + MessageType projectionWithId = schema.checkSubTypeAndDecorateWithId(projection); + projection.checkContains(projectionWithId); + projectionWithId.checkContains(projection); + assertFieldIdNotNull(projectionWithId); + } + + @Test(expected = InvalidRecordException.class) + public void testCheckProjectionSchema() { + MessageType schema = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId").withId(1), + new GroupType(OPTIONAL, "Links", + new PrimitiveType(REPEATED, INT64, "Backward").withId(1), + new PrimitiveType(REPEATED, INT64, "Forward").withId(2) + ).withId(2), + new GroupType(REPEATED, "Name", + new GroupType(REPEATED, "Language", + new PrimitiveType(REQUIRED, BINARY, "Code").withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Country").withId(2)) + .withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Url").withId(2)) + .withId(3)); + + MessageType projection = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId"), + new GroupType(REPEATED, "Name", + new GroupType(REQUIRED, "Language", // repetition wrong + new PrimitiveType(OPTIONAL, BINARY, "Country")))); + + schema.checkSubTypeAndDecorateWithId(projection); + } + + private void assertFieldIdNotNull(GroupType projectionWithId) { + for (Type field : projectionWithId.getFields()) { + assertTrue(field.getId() != null); + if (!field.isPrimitive()) { + assertFieldIdNotNull(field.asGroupType()); + } + } + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java index 6d8c1fdb67..7dbe842a4c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,6 +25,7 @@ import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Types; /** * Abstraction used by the {@link org.apache.parquet.hadoop.ParquetInputFormat} to materialize records @@ -55,8 +56,7 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, String p } public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) { - fileMessageType.checkContains(projectedMessageType); - return projectedMessageType; + return fileMessageType.checkSubTypeAndDecorateWithId(projectedMessageType); } /** diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index d1061ae83f..cbd926d953 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -18,11 +18,12 @@ */ package org.apache.parquet.proto; +import com.google.common.base.Function; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; -import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.Dictionary; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.io.InvalidRecordException; @@ -32,7 +33,11 @@ import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.IncompatibleSchemaModificationException; +import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.Type; +import org.omg.CORBA.DynAnyPackage.Invalid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -49,10 +54,11 @@ * @author Lukas Nalezenec */ class ProtoMessageConverter extends GroupConverter { + private static final Logger LOG = LoggerFactory.getLogger(ProtoMessageConverter.class); private final Converter[] converters; private final ParentValueContainer parent; - private final Message.Builder myBuilder; + private final Message.Builder messageBuilder; private final boolean readFieldById; // used in record converter @@ -62,39 +68,19 @@ class ProtoMessageConverter extends GroupConverter { // For usage in message arrays ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, boolean readFieldById) { - - int schemaSize = parquetSchema.getFieldCount(); - converters = new Converter[schemaSize]; - - this.parent = pvc; - int parquetFieldIndex = 1; - if (pvc == null) { throw new IllegalStateException("Missing parent value container"); } - - myBuilder = builder; + this.parent = pvc; this.readFieldById = readFieldById; - - Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType(); - for (Type parquetField : parquetSchema.getFields()) { - // Find field by id, fall back to find field by name if either flag is set to false explicitly or no id found (legacy schema). - Descriptors.FieldDescriptor protoField = !readFieldById || (parquetField.getId() == null) ? - protoDescriptor.findFieldByName(parquetField.getName()) : - protoDescriptor.findFieldByNumber(parquetField.getId().intValue()); - - if (protoField == null) { - String description = "Scheme mismatch \n\"" + parquetField + "\"" + - "\n proto descriptor:\n" + protoDescriptor.toProto(); - throw new IncompatibleSchemaModificationException("Cant find \"" + parquetField.getName() + "\" " + description); - } - - converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, protoField, parquetField); - - parquetFieldIndex++; - } + this.messageBuilder = builder; + this.converters = Lists.transform(parquetSchema.getFields(), convertField()) + .toArray(new Converter[parquetSchema.getFieldCount()]); } + private Function convertField() { + return readFieldById ? new ConvertFieldById() : new ConvertFieldByName(); + } @Override public Converter getConverter(int fieldIndex) { @@ -108,8 +94,8 @@ public void start() { @Override public void end() { - parent.add(myBuilder.build()); - myBuilder.clear(); + parent.add(messageBuilder.build()); + messageBuilder.clear(); } private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { @@ -162,7 +148,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p } public Message.Builder getBuilder() { - return myBuilder; + return messageBuilder; } static abstract class ParentValueContainer { @@ -350,4 +336,46 @@ public void addBinary(Binary binary) { } } + + /** + * Map parquet and protobuf schema by field names, error if no field found, this is the old behavior. + */ + private final class ConvertFieldByName implements Function { + + @Override + public Converter apply(Type parquetField) { + Descriptors.FieldDescriptor protoField = messageBuilder.getDescriptorForType().findFieldByName(parquetField.getName()); + if (protoField == null) { + String description = "Scheme mismatch \n\"" + parquetField + "\"" + + "\n proto descriptor:\n" + messageBuilder.getDescriptorForType().toProto(); + throw new IncompatibleSchemaModificationException("Cant find \"" + parquetField.getName() + "\" " + description); + } + + return newMessageConverter(messageBuilder, protoField, parquetField); + } + } + + /** + * Map parquet and protobuf schema by field id, if no field found, treat it as unknown field (for the moment, just + * ignore the field). + */ + private final class ConvertFieldById implements Function { + + @Override + public Converter apply(Type parquetField) { + final Type.ID parquetFieldId = parquetField.getId(); + if (parquetFieldId == null) { + throw new InvalidSchemaException("Schema should have field id, but field <" + parquetField.toString() + + "> has no id. Need to disable the flag " + ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID + + " in configuration to read it."); + } + Descriptors.FieldDescriptor protoField = messageBuilder.getDescriptorForType().findFieldByNumber(parquetFieldId.intValue()); + if (protoField == null) { + LOG.warn("Cannot find corresponding field in protobuf schema for: " + parquetField.toString()); + return parquetField.isPrimitive() ? UnknownFieldIgnorePrimitiveConverter.INSTANCE + : new UnknownFieldIgnoreGroupConverter(parquetField.asGroupType()); + } + return newMessageConverter(messageBuilder, protoField, parquetField); + } + } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnoreGroupConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnoreGroupConverter.java new file mode 100644 index 0000000000..77d1807993 --- /dev/null +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnoreGroupConverter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.Type; + +/** + * Ignore unknown fields of group type.. + */ +public class UnknownFieldIgnoreGroupConverter extends GroupConverter { + + private final Converter[] fields; + + public UnknownFieldIgnoreGroupConverter(GroupType messageField) { + fields = new Converter[messageField.getFieldCount()]; + int fieldIndex = 0; + for (Type fieldType : messageField.getFields()) { + fields[fieldIndex++] = fieldType.isPrimitive() ? UnknownFieldIgnorePrimitiveConverter.INSTANCE + : new UnknownFieldIgnoreGroupConverter(fieldType.asGroupType()); + } + } + + @Override + public Converter getConverter(int fieldIndex) { + return fields[fieldIndex]; + } + + @Override + public void start() { + + } + + @Override + public void end() { + + } +} diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnorePrimitiveConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnorePrimitiveConverter.java new file mode 100644 index 0000000000..f58a177504 --- /dev/null +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnorePrimitiveConverter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; + +/** + * Ignore primitive unknown field. It does nothing when called, so the field value is ignored. + */ +public class UnknownFieldIgnorePrimitiveConverter extends PrimitiveConverter { + + public static final UnknownFieldIgnorePrimitiveConverter INSTANCE = new UnknownFieldIgnorePrimitiveConverter(); + + private UnknownFieldIgnorePrimitiveConverter() { + // Use singleton instance. + } + + @Override + public void addBinary(Binary value) { + + } + + @Override + public void addBoolean(boolean value) { + + } + + @Override + public void addDouble(double value) { + + } + + @Override + public void addFloat(float value) { + + } + + @Override + public void addInt(int value) { + + } + + @Override + public void addLong(long value) { + + } +} diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 6c01d7b8db..fccf4a79f9 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.parquet.proto; import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.proto.test.TestProto3; import org.apache.parquet.proto.test.TestProtobuf; @@ -193,6 +194,43 @@ public void testProto3CustomProtoClass() throws Exception { assertEquals("writtenString", stringValue); } + /** + * Write data with {@link org.apache.parquet.proto.test.TestProto3.SchemaConverterAllDatatypes}, and read back with + * {@link org.apache.parquet.proto.test.TestProto3.SchemaConverterAllDatatypesEvolved}, to emulate the evolution of + * protobuf schema during the time. + * + * @throws Exception if test error. + */ + @Test + public void testReadProto3WithEvolvedSchema() throws Exception { + TestProto3.SchemaConverterSimpleMessage innerMessage = TestProto3.SchemaConverterSimpleMessage.newBuilder() + .setSomeId(100) + .setName("some name") + .build(); + TestProto3.SchemaConverterAllDatatypes data = TestProto3.SchemaConverterAllDatatypes.newBuilder() + .setOptionalDouble(1.0) + .setOptionalBool(true) + .setOptionalString("optional string") + .setOptionalMessage(innerMessage) + .addRepeatedMessage(innerMessage).addRepeatedMessage(TestProto3.SchemaConverterSimpleMessage.getDefaultInstance()) + .setOptionalEnum(TestProto3.SchemaConverterAllDatatypes.TestEnum.FIRST) + .setSomeInt32(8) + .setSomeString("some string") + .build(); + Path outputPath = new WriteUsingMR().write(new TestProto3.SchemaConverterAllDatatypes[]{data}); + Configuration readConf = new Configuration(); + ProtoReadSupport.setProtobufClass(readConf, TestProto3.SchemaConverterAllDatatypesEvolved.class.getName()); + ReadUsingMR readUsingMR = new ReadUsingMR(readConf); + List result = readUsingMR.read(outputPath); + assertEquals(result.size(), 1); + TestProto3.SchemaConverterAllDatatypesEvolved readBack = (TestProto3.SchemaConverterAllDatatypesEvolved) result.get(0); + assertEquals(readBack.getOptionalDouble(), data.getOptionalDouble(), 0.0); + assertEquals(readBack.getOptionalBool(), data.getOptionalBool()); + assertEquals(readBack.getOptionalString(), data.getOptionalString()); + assertEquals(readBack.getSomeInt32(), data.getSomeInt32()); + assertEquals(readBack.getOptionalMessage().getName(), readBack.getOptionalMessage().getName()); + } + /** * Runs job that writes input to file and then job reading data back. */ diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index 6408244b7e..cb4de1da80 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -108,6 +108,11 @@ public void testProto3ConvertAllDatatypes() throws Exception { " optional binary optionalBytes = 15;\n" + " optional group optionalMessage = 16 {\n" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + + " }\n" + + " repeated group repeatedMessage = 17 {\n" + + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + " optional binary optionalEnum (ENUM) = 18;" + " optional int32 someInt32 = 19;" + @@ -116,6 +121,7 @@ public void testProto3ConvertAllDatatypes() throws Exception { " optional int64 key = 1;\n" + " optional group value = 2 {\n" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + " }\n" + "}"; @@ -152,9 +158,11 @@ public void testProto3ConvertRepetition() throws Exception { " repeated int32 repeatedPrimitive = 3;\n" + " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + " repeated group repeatedMessage = 9 {" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + "}"; diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java index 8905968b01..51aef4aac4 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,9 +41,17 @@ public class ReadUsingMR { private static List outputMessages; - Configuration conf = new Configuration(); + final Configuration conf; private String projection; + public ReadUsingMR() { + this(new Configuration()); + } + + public ReadUsingMR(Configuration conf) { + this.conf = conf; + } + public void setRequestedProjection(String projection) { this.projection = projection; } diff --git a/parquet-protobuf/src/test/resources/TestProto3.proto b/parquet-protobuf/src/test/resources/TestProto3.proto index 1896445306..d87312b21c 100644 --- a/parquet-protobuf/src/test/resources/TestProto3.proto +++ b/parquet-protobuf/src/test/resources/TestProto3.proto @@ -50,6 +50,11 @@ message Links { message SchemaConverterSimpleMessage { int32 someId = 3; + string name = 5; +} + +message SchemaConverterSimpleMessageEvolved { + string name = 5; } message SchemaConverterAllDatatypes { @@ -69,6 +74,7 @@ message SchemaConverterAllDatatypes { string optionalString = 14; bytes optionalBytes = 15; SchemaConverterSimpleMessage optionalMessage = 16; + repeated SchemaConverterSimpleMessage repeatedMessage = 17; enum TestEnum { FIRST = 0; SECOND = 1; @@ -81,6 +87,29 @@ message SchemaConverterAllDatatypes { map optionalMap = 21; } +// This message is a version of SchemaConverterAllDatatypes with some fields removed. +message SchemaConverterAllDatatypesEvolved { + double optionalDouble = 1; + float optionalFloat = 2; + int32 optionalInt32 = 3; + int64 optionalInt64 = 4; + uint32 optionalUInt32 = 5; + uint64 optionalUInt64 = 6; + sint32 optionalSInt32 = 7; + sint64 optionalSInt64 = 8; + fixed32 optionalFixed32 = 9; + fixed64 optionalFixed64 = 10; + sfixed32 optionalSFixed32 = 11; + sfixed64 optionalSFixed64 = 12; + bool optionalBool = 13; + string optionalString = 14; + bytes optionalBytes = 15; + SchemaConverterSimpleMessageEvolved optionalMessage = 16; + oneof oneof { + int32 someInt32 = 19; + } +} + message SchemaConverterRepetition { int32 optionalPrimitive = 1; repeated int32 repeatedPrimitive = 3;