diff --git a/.travis.yml b/.travis.yml index 72d70300be..f5157b2ae4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,20 +2,6 @@ language: java before_install: - sudo apt-get update -qq - sudo apt-get install build-essential - - mkdir protobuf_install - - pushd protobuf_install - - wget https://github.com/google/protobuf/archive/v3.2.0.tar.gz -O protobuf-3.2.0.tar.gz - - tar xzf protobuf-3.2.0.tar.gz - - cd protobuf-3.2.0 - - sudo apt-get install autoconf automake libtool curl make g++ unzip - - ./autogen.sh - - ./configure - - make - - make check - - sudo make install - - sudo ldconfig - - protoc --version - - popd - pwd - sudo apt-get install -qq libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev - wget -nv http://archive.apache.org/dist/thrift/0.7.0/thrift-0.7.0.tar.gz diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java index 68dba979b8..4e422534ef 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -394,4 +394,36 @@ List mergeFields(GroupType toMerge, boolean strict) { } return newFields; } + + /** + * Check the projection schema is contained by the origin, and decorate projection schema with the origin field id if original schema has field id. + * This is needed because people usually do not put the field id when they describe a projection schema. + * + * @param projection Projection schema (or subschema of the projection) + * @param builder Builder for constructing a corresponding schema with field id (if origin contains) regarding to projection + * @param Type, which could be {@link MessageType} or {@link org.apache.parquet.schema.Types.GroupBuilder} + * @return builder or the final {@link MessageType}. + * @throws InvalidRecordException if check fails. + */ + protected T checkSubTypeAndDecorateWithFieldId(GroupType projection, Types.GroupBuilder builder) { + if (this.getRepetition() != projection.getRepetition()) { + throw new InvalidRecordException(projection + " found: expected " + this); + } + for (Type field : projection.getFields()) { + Type fieldInFile = this.getType(field.getName()); + if (fieldInFile == null) { + throw new InvalidRecordException(projection + " found: expected " + this); + } + if (fieldInFile.isPrimitive()) { + fieldInFile.checkContains(field); + builder.addField(fieldInFile.getId() == null ? 
field : field.withId(fieldInFile.getId().intValue())); + } else { + fieldInFile.asGroupType().checkSubTypeAndDecorateWithFieldId(field.asGroupType(), builder.group(field.getRepetition())); + } + } + if (this.getId() != null) { + builder.id(this.getId().intValue()); + } + return builder.as(this.getOriginalType()).named(this.getName()); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java index 1e26ed2425..9b1ab47193 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,14 +41,14 @@ public MessageType(String name, Type... 
fields) { super(Repetition.REPEATED, name, fields); } - /** - * - * @param name the name of the type - * @param fields the fields contained by this message - */ - public MessageType(String name, List fields) { - super(Repetition.REPEATED, name, fields); - } + /** + * + * @param name the name of the type + * @param fields the fields contained by this message + */ + public MessageType(String name, List fields) { + super(Repetition.REPEATED, name, fields); + } /** * {@inheritDoc} @@ -145,4 +145,15 @@ public MessageType union(MessageType toMerge, boolean strict) { return new MessageType(this.getName(), mergeFields(toMerge, strict)); } + /** + * Check projection schema is subschema of this and append field id to projection if this contains field id. + * This is because we often just don't put field id for projection schema, but that is necessary when trying to read fields by id. + * + * @param projection + * @return Projection schema with field id. + * @throws InvalidRecordException if check fails. + */ + public MessageType checkSubTypeAndDecorateWithId(MessageType projection) { + return checkSubTypeAndDecorateWithFieldId(projection, Types.buildMessage()); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java index 99222f96c3..a442342fea 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,6 +49,15 @@ public int intValue() { return id; } + /** + * This method is solely used by the jackson json inspector to serialize an {@link ID} object. + * + * @return Int value of this id. + */ + public int getValue() { + return id; + } + @Override public boolean equals(Object obj) { return (obj instanceof ID) && ((ID)obj).id == id; diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java index 4add1740ce..52d54e4a2d 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java @@ -18,16 +18,17 @@ */ package org.apache.parquet.schema; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; import static org.apache.parquet.schema.OriginalType.LIST; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; import static org.apache.parquet.schema.Type.Repetition.REPEATED; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import org.apache.parquet.io.InvalidRecordException; import org.junit.Test; import org.apache.parquet.example.Paper; @@ -200,4 +201,66 @@ public void 
testIDs() throws Exception { assertEquals(schema, schema2); assertEquals(schema.toString(), schema2.toString()); } + + @Test + public void testDecorateProjectionSchema() { + MessageType schema = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId").withId(1), + new GroupType(OPTIONAL, "Links", + new PrimitiveType(REPEATED, INT64, "Backward").withId(1), + new PrimitiveType(REPEATED, INT64, "Forward").withId(2) + ).withId(2), + new GroupType(REPEATED, "Name", + new GroupType(REPEATED, "Language", + new PrimitiveType(REQUIRED, BINARY, "Code").withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Country").withId(2)) + .withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Url").withId(2)) + .withId(3)); + + MessageType projection = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId"), + new GroupType(REPEATED, "Name", + new GroupType(REPEATED, "Language", + new PrimitiveType(OPTIONAL, BINARY, "Country")))); + + MessageType projectionWithId = schema.checkSubTypeAndDecorateWithId(projection); + projection.checkContains(projectionWithId); + projectionWithId.checkContains(projection); + assertFieldIdNotNull(projectionWithId); + } + + @Test(expected = InvalidRecordException.class) + public void testCheckProjectionSchema() { + MessageType schema = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId").withId(1), + new GroupType(OPTIONAL, "Links", + new PrimitiveType(REPEATED, INT64, "Backward").withId(1), + new PrimitiveType(REPEATED, INT64, "Forward").withId(2) + ).withId(2), + new GroupType(REPEATED, "Name", + new GroupType(REPEATED, "Language", + new PrimitiveType(REQUIRED, BINARY, "Code").withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Country").withId(2)) + .withId(1), + new PrimitiveType(OPTIONAL, BINARY, "Url").withId(2)) + .withId(3)); + + MessageType projection = new MessageType("Document", + new PrimitiveType(REQUIRED, INT64, "DocId"), + new GroupType(REPEATED, "Name", + new GroupType(REQUIRED, "Language", // 
repetition wrong + new PrimitiveType(OPTIONAL, BINARY, "Country")))); + + schema.checkSubTypeAndDecorateWithId(projection); + } + + private void assertFieldIdNotNull(GroupType projectionWithId) { + for (Type field : projectionWithId.getFields()) { + assertTrue(field.getId() != null); + if (!field.isPrimitive()) { + assertFieldIdNotNull(field.asGroupType()); + } + } + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index bf22b617b3..5d9a0faca6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.format.PageEncodingStats; @@ -62,14 +63,9 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.*; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type.Repetition; -import org.apache.parquet.schema.TypeVisitor; -import org.apache.parquet.schema.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +76,7 @@ public class ParquetMetadataConverter { public static final MetadataFilter NO_FILTER = new NoFilter(); public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k + public static final String PARQUET_SCHEMA_FIELD_WITH_ID = "parquet.schema.field.with.id"; private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class); @@ -115,7 +112,7 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque } FileMetaData fileMetaData = new FileMetaData( currentVersion, - toParquetSchema(parquetMetadata.getFileMetaData().getSchema()), + toParquetSchema(parquetMetadata.getFileMetaData()), numRows, rowGroups); @@ -129,17 +126,18 
@@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque } // Visible for testing - List toParquetSchema(MessageType schema) { + List toParquetSchema(org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData) { List result = new ArrayList(); - addToList(result, schema); + String withId = fileMetaData.getKeyValueMetaData().get(PARQUET_SCHEMA_FIELD_WITH_ID); + addToList(result, fileMetaData.getSchema(), !StringUtils.isBlank(withId) && Boolean.valueOf(withId)); return result; } - private void addToList(final List result, org.apache.parquet.schema.Type field) { + private void addToList(final List result, final org.apache.parquet.schema.Type field, final boolean withId) { field.accept(new TypeVisitor() { @Override public void visit(PrimitiveType primitiveType) { - SchemaElement element = new SchemaElement(primitiveType.getName()); + SchemaElement element = schemaElementfromField(primitiveType, withId); element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition())); element.setType(getType(primitiveType.getPrimitiveTypeName())); if (primitiveType.getOriginalType() != null) { @@ -163,7 +161,7 @@ public void visit(MessageType messageType) { @Override public void visit(GroupType groupType) { - SchemaElement element = new SchemaElement(groupType.getName()); + SchemaElement element = schemaElementfromField(groupType, withId); element.setRepetition_type(toParquetRepetition(groupType.getRepetition())); if (groupType.getOriginalType() != null) { element.setConverted_type(getConvertedType(groupType.getOriginalType())); @@ -176,12 +174,30 @@ private void visitChildren(final List result, element.setNum_children(groupType.getFieldCount()); result.add(element); for (org.apache.parquet.schema.Type field : groupType.getFields()) { - addToList(result, field); + addToList(result, field, withId); } } }); } + /** + * Build a {@link SchemaElement} from {@link org.apache.parquet.schema.Type} with the field's name, and keep the field + * id if the 
field has one. + * + * @param field a field of the parquet schema + * @return SchemaElement + */ + private static SchemaElement schemaElementfromField(org.apache.parquet.schema.Type field, boolean withId) { + SchemaElement element = new SchemaElement(field.getName()); + if (withId) { + if (field.getId() == null) { + throw new InvalidSchemaException("Field id is required, but not found in " + field.toString()); + } + element.setField_id(field.getId().intValue()); + } + return element; + } + private void addRowGroup(ParquetMetadata parquetMetadata, List rowGroups, BlockMetaData block) { //rowGroup.total_byte_size = ; List columns = block.getColumns(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java index 6d8c1fdb67..7dbe842a4c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,6 +25,7 @@ import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Types; /** * Abstraction used by the {@link org.apache.parquet.hadoop.ParquetInputFormat} to materialize records @@ -55,8 +56,7 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, String p } public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) { - fileMessageType.checkContains(projectedMessageType); - return projectedMessageType; + return fileMessageType.checkSubTypeAndDecorateWithId(projectedMessageType); } /** diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 35c35c19c6..a5e3f8a89c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,6 +44,7 @@ import java.util.Set; import java.util.TreeSet; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.Version; @@ -61,6 +62,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.InvalidSchemaException; import org.junit.Assert; import org.junit.Test; @@ -100,23 +102,55 @@ public void testPageHeader() throws IOException { @Test public void testSchemaConverter() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - List parquetSchema = parquetMetadataConverter.toParquetSchema(Paper.schema); + List parquetSchema = parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(Paper.schema, false)); MessageType schema = parquetMetadataConverter.fromParquetSchema(parquetSchema); assertEquals(Paper.schema, schema); } + @Test + public void testSchemaWithFieldId() { + final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType schema = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8) + .id(1) + .named("stringField") + .optional(PrimitiveTypeName.INT32) + .as(OriginalType.INT_32) + .id(2) + .named("intField") + .named("Message"); + List parquetSchema = parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(schema, true)); + assertEquals(schema, parquetMetadataConverter.fromParquetSchema(parquetSchema)); + } + + @Test(expected = InvalidSchemaException.class) + public void testSchemaExpectingFieldId() { + 
final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType messageType = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8) + .named("stringField") + .optional(PrimitiveTypeName.INT32) + .as(OriginalType.INT_32) + .named("intField") + .named("Message"); + parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(messageType, true)); + } + @Test public void testSchemaConverterDecimal() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType messageType = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.DECIMAL).precision(9).scale(2) + .named("aBinaryDecimal") + .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4) + .as(OriginalType.DECIMAL).precision(9).scale(2) + .named("aFixedDecimal") + .named("Message"); List schemaElements = parquetMetadataConverter.toParquetSchema( - Types.buildMessage() - .required(PrimitiveTypeName.BINARY) - .as(OriginalType.DECIMAL).precision(9).scale(2) - .named("aBinaryDecimal") - .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4) - .as(OriginalType.DECIMAL).precision(9).scale(2) - .named("aFixedDecimal") - .named("Message") + mockHadoopFileMetaData(messageType, false) ); List expected = Lists.newArrayList( new SchemaElement("Message").setNum_children(2), @@ -164,6 +198,12 @@ public void testEnumEquivalence() { } } + private org.apache.parquet.hadoop.metadata.FileMetaData mockHadoopFileMetaData(MessageType messageType, boolean withId) { + return new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, + withId ? ImmutableMap.of(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, "true") : Collections.emptyMap(), + null); + } + private FileMetaData metadata(long... 
sizes) { List schema = emptyList(); List rowGroups = new ArrayList(); @@ -330,7 +370,7 @@ private ColumnChunkMetaData createColumnChunkMetaData() { 0, 0, 0, 0, 0); return md; } - + @Test public void testEncodingsCache() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml index 978a964c4d..29f81ab4da 100644 --- a/parquet-protobuf/pom.xml +++ b/parquet-protobuf/pom.xml @@ -82,6 +82,7 @@ ${slf4j.version} test + @@ -92,6 +93,12 @@ + + + kr.motd.maven + os-maven-plugin + + org.apache.maven.plugins @@ -146,32 +153,20 @@ - maven-antrun-plugin + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/test/resources + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + - generate-sources - generate-test-sources - - - - - - - - - - - - src/main/java - target/generated-sources/java - - run + test-compile - diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index b5649a05b6..cbd926d953 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,11 +18,14 @@ */ package org.apache.parquet.proto; +import com.google.common.base.Function; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; @@ -30,7 +33,11 @@ import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.IncompatibleSchemaModificationException; +import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.Type; +import org.omg.CORBA.DynAnyPackage.Invalid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -47,49 +54,33 @@ * @author Lukas Nalezenec */ class ProtoMessageConverter extends GroupConverter { + private static final Logger LOG = LoggerFactory.getLogger(ProtoMessageConverter.class); private final Converter[] converters; private final ParentValueContainer parent; - private final Message.Builder myBuilder; + private final Message.Builder messageBuilder; + private final boolean readFieldById; // used in record converter - ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { - this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); + ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, boolean readFieldById) { + 
this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, readFieldById); } - // For usage in message arrays - ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { - - int schemaSize = parquetSchema.getFieldCount(); - converters = new Converter[schemaSize]; - - this.parent = pvc; - int parquetFieldIndex = 1; - + ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, boolean readFieldById) { if (pvc == null) { throw new IllegalStateException("Missing parent value container"); } - - myBuilder = builder; - - Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType(); - - for (Type parquetField : parquetSchema.getFields()) { - Descriptors.FieldDescriptor protoField = protoDescriptor.findFieldByName(parquetField.getName()); - - if (protoField == null) { - String description = "Scheme mismatch \n\"" + parquetField + "\"" + - "\n proto descriptor:\n" + protoDescriptor.toProto(); - throw new IncompatibleSchemaModificationException("Cant find \"" + parquetField.getName() + "\" " + description); - } - - converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, protoField, parquetField); - - parquetFieldIndex++; - } + this.parent = pvc; + this.readFieldById = readFieldById; + this.messageBuilder = builder; + this.converters = Lists.transform(parquetSchema.getFields(), convertField()) + .toArray(new Converter[parquetSchema.getFieldCount()]); } + private Function convertField() { + return readFieldById ? 
new ConvertFieldById() : new ConvertFieldByName(); + } @Override public Converter getConverter(int fieldIndex) { @@ -103,8 +94,8 @@ public void start() { @Override public void end() { - parent.add(myBuilder.build()); - myBuilder.clear(); + parent.add(messageBuilder.build()); + messageBuilder.clear(); } private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { @@ -148,7 +139,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p case LONG: return new ProtoLongConverter(pvc); case MESSAGE: { Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); - return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType()); + return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType(), readFieldById); } } @@ -157,7 +148,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p } public Message.Builder getBuilder() { - return myBuilder; + return messageBuilder; } static abstract class ParentValueContainer { @@ -345,4 +336,46 @@ public void addBinary(Binary binary) { } } + + /** + * Map parquet and protobuf schema by field names, error if no field found, this is the old behavior. 
+ */ + private final class ConvertFieldByName implements Function { + + @Override + public Converter apply(Type parquetField) { + Descriptors.FieldDescriptor protoField = messageBuilder.getDescriptorForType().findFieldByName(parquetField.getName()); + if (protoField == null) { + String description = "Scheme mismatch \n\"" + parquetField + "\"" + + "\n proto descriptor:\n" + messageBuilder.getDescriptorForType().toProto(); + throw new IncompatibleSchemaModificationException("Cant find \"" + parquetField.getName() + "\" " + description); + } + + return newMessageConverter(messageBuilder, protoField, parquetField); + } + } + + /** + * Map parquet and protobuf schema by field id, if no field found, treat it as unknown field (for the moment, just + * ignore the field). + */ + private final class ConvertFieldById implements Function { + + @Override + public Converter apply(Type parquetField) { + final Type.ID parquetFieldId = parquetField.getId(); + if (parquetFieldId == null) { + throw new InvalidSchemaException("Schema should have field id, but field <" + parquetField.toString() + + "> has no id. Need to disable the flag " + ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID + + " in configuration to read it."); + } + Descriptors.FieldDescriptor protoField = messageBuilder.getDescriptorForType().findFieldByNumber(parquetFieldId.intValue()); + if (protoField == null) { + LOG.warn("Cannot find corresponding field in protobuf schema for: " + parquetField.toString()); + return parquetField.isPrimitive() ? 
UnknownFieldIgnorePrimitiveConverter.INSTANCE + : new UnknownFieldIgnoreGroupConverter(parquetField.asGroupType()); + } + return newMessageConverter(messageBuilder, protoField, parquetField); + } + } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 3a21d84486..b5c7a2fd5b 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.io.api.RecordMaterializer; @@ -90,8 +91,11 @@ public RecordMaterializer prepareForRead(Configuration configuration, Map protobufClass = Protobufs.getProtobufClass(headerProtoClass); - return new ProtoRecordMaterializer(requestedSchema, protobufClass); + // If the parquet.schema.field.with.id is set on in the footer, map the schema with field id, + // but users have the possibility to fallback to the old behavior of mapping field by name, by 
disabling explicitly the flag in the configuration. + final boolean readFieldById = Boolean.valueOf(keyValueMetaData.get(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID)) + && configuration.getBoolean(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, true); + return new ProtoRecordMaterializer(requestedSchema, protobufClass, readFieldById); } - } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index 99153a98cc..fb0fd2adf0 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,8 +21,11 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.schema.MessageType; +import java.util.Map; + /** * Converts data content of root message from Protocol Buffer message to parquet message. * It delegates conversion of inner fields to {@link ProtoMessageConverter} class using inheritance. 
@@ -46,13 +49,13 @@ public void add(Object a) { } - public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { - super(new SkipParentValueContainer(), protoclass, parquetSchema); + public ProtoRecordConverter(Class protoclass, MessageType parquetSchema, boolean readFieldById) { + super(new SkipParentValueContainer(), protoclass, parquetSchema, readFieldById); reusedBuilder = getBuilder(); } - public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) { - super(new SkipParentValueContainer(), builder, parquetSchema); + public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema, boolean readFieldById) { + super(new SkipParentValueContainer(), builder, parquetSchema, readFieldById); reusedBuilder = getBuilder(); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index 039a571137..fe5069d215 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,19 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; +import java.util.Map; + class ProtoRecordMaterializer extends RecordMaterializer { private final ProtoRecordConverter root; - public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass) { - this.root = new ProtoRecordConverter(protobufClass, requestedSchema); + public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass, boolean readFieldById) { + this.root = new ProtoRecordConverter(protobufClass, requestedSchema, readFieldById); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index c0ed351046..9e6ae1e427 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import com.google.protobuf.TextFormat; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.InvalidRecordException; @@ -118,6 +119,10 @@ public WriteContext init(Configuration configuration) { Map extraMetaData = new HashMap(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); + // By default, we will persist field id in the parquet schema metadata. + // This can be turned off by explicitly set the flag to false in the configuration. + extraMetaData.put(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, + String.valueOf(configuration.getBoolean(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, true))); return new WriteContext(rootSchema, extraMetaData); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnoreGroupConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnoreGroupConverter.java new file mode 100644 index 0000000000..77d1807993 --- /dev/null +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnoreGroupConverter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.Type; + +/** + * Ignore unknown fields of group type. + */ +public class UnknownFieldIgnoreGroupConverter extends GroupConverter { + + private final Converter[] fields; + + public UnknownFieldIgnoreGroupConverter(GroupType messageField) { + fields = new Converter[messageField.getFieldCount()]; + int fieldIndex = 0; + for (Type fieldType : messageField.getFields()) { + fields[fieldIndex++] = fieldType.isPrimitive() ? 
UnknownFieldIgnorePrimitiveConverter.INSTANCE + : new UnknownFieldIgnoreGroupConverter(fieldType.asGroupType()); + } + } + + @Override + public Converter getConverter(int fieldIndex) { + return fields[fieldIndex]; + } + + @Override + public void start() { + + } + + @Override + public void end() { + + } +} diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnorePrimitiveConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnorePrimitiveConverter.java new file mode 100644 index 0000000000..f58a177504 --- /dev/null +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/UnknownFieldIgnorePrimitiveConverter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; + +/** + * Ignore primitive unknown field. It does nothing when called, so the field value is ignored. 
+ */ +public class UnknownFieldIgnorePrimitiveConverter extends PrimitiveConverter { + + public static final UnknownFieldIgnorePrimitiveConverter INSTANCE = new UnknownFieldIgnorePrimitiveConverter(); + + private UnknownFieldIgnorePrimitiveConverter() { + // Use singleton instance. + } + + @Override + public void addBinary(Binary value) { + + } + + @Override + public void addBoolean(boolean value) { + + } + + @Override + public void addDouble(double value) { + + } + + @Override + public void addFloat(float value) { + + } + + @Override + public void addInt(int value) { + + } + + @Override + public void addLong(long value) { + + } +} diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 6c01d7b8db..fccf4a79f9 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.parquet.proto; import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.proto.test.TestProto3; import org.apache.parquet.proto.test.TestProtobuf; @@ -193,6 +194,43 @@ public void testProto3CustomProtoClass() throws Exception { assertEquals("writtenString", stringValue); } + /** + * Write data with {@link org.apache.parquet.proto.test.TestProto3.SchemaConverterAllDatatypes}, and read back with + * {@link org.apache.parquet.proto.test.TestProto3.SchemaConverterAllDatatypesEvolved}, to emulate the evolution of + * protobuf schema during the time. + * + * @throws Exception if test error. 
+ */ + @Test + public void testReadProto3WithEvolvedSchema() throws Exception { + TestProto3.SchemaConverterSimpleMessage innerMessage = TestProto3.SchemaConverterSimpleMessage.newBuilder() + .setSomeId(100) + .setName("some name") + .build(); + TestProto3.SchemaConverterAllDatatypes data = TestProto3.SchemaConverterAllDatatypes.newBuilder() + .setOptionalDouble(1.0) + .setOptionalBool(true) + .setOptionalString("optional string") + .setOptionalMessage(innerMessage) + .addRepeatedMessage(innerMessage).addRepeatedMessage(TestProto3.SchemaConverterSimpleMessage.getDefaultInstance()) + .setOptionalEnum(TestProto3.SchemaConverterAllDatatypes.TestEnum.FIRST) + .setSomeInt32(8) + .setSomeString("some string") + .build(); + Path outputPath = new WriteUsingMR().write(new TestProto3.SchemaConverterAllDatatypes[]{data}); + Configuration readConf = new Configuration(); + ProtoReadSupport.setProtobufClass(readConf, TestProto3.SchemaConverterAllDatatypesEvolved.class.getName()); + ReadUsingMR readUsingMR = new ReadUsingMR(readConf); + List result = readUsingMR.read(outputPath); + assertEquals(result.size(), 1); + TestProto3.SchemaConverterAllDatatypesEvolved readBack = (TestProto3.SchemaConverterAllDatatypesEvolved) result.get(0); + assertEquals(readBack.getOptionalDouble(), data.getOptionalDouble(), 0.0); + assertEquals(readBack.getOptionalBool(), data.getOptionalBool()); + assertEquals(readBack.getOptionalString(), data.getOptionalString()); + assertEquals(readBack.getSomeInt32(), data.getSomeInt32()); + assertEquals(readBack.getOptionalMessage().getName(), data.getOptionalMessage().getName()); + }
*/ diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index 6f5ff53b69..cb4de1da80 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -18,12 +18,21 @@ */ package org.apache.parquet.proto; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; import com.google.protobuf.Message; -import org.junit.Test; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.proto.test.TestProto3; import org.apache.parquet.proto.test.TestProtobuf; +import org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes; +import org.apache.parquet.proto.utils.WriteUsingMR; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Type; +import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -72,7 +81,7 @@ public void testConvertAllDatatypes() throws Exception { " optional binary optionalEnum (ENUM) = 18;" + "}"; - testConversion(TestProtobuf.SchemaConverterAllDatatypes.class, expectedSchema); + testConversion(SchemaConverterAllDatatypes.class, expectedSchema); } /** @@ -99,6 +108,11 @@ public void testProto3ConvertAllDatatypes() throws Exception { " optional binary optionalBytes = 15;\n" + " optional group optionalMessage = 16 {\n" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + + " }\n" + + " repeated group repeatedMessage = 17 {\n" + + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + " optional binary optionalEnum (ENUM) = 18;" + " optional int32 someInt32 = 
19;" + @@ -107,6 +121,7 @@ public void testProto3ConvertAllDatatypes() throws Exception { " optional int64 key = 1;\n" + " optional group value = 2 {\n" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + " }\n" + "}"; @@ -143,12 +158,56 @@ public void testProto3ConvertRepetition() throws Exception { " repeated int32 repeatedPrimitive = 3;\n" + " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + " repeated group repeatedMessage = 9 {" + " optional int32 someId = 3;\n" + + " optional binary name (UTF8) = 5;\n" + " }\n" + "}"; testConversion(TestProto3.SchemaConverterRepetition.class, expectedSchema); } + + /** + * Test the persistent metadata contains field id. + * + * @throws Exception if test fails. + */ + @Test + public void testConvertProtoSchema2ParquetMetadata() throws Exception { + ProtoSchemaConverter schemaConverter = new ProtoSchemaConverter(); + MessageType parquetSchema = schemaConverter.convert(SchemaConverterAllDatatypes.class); + // Test if the converted schema respects the field id, index and name. + for (Descriptors.FieldDescriptor field : SchemaConverterAllDatatypes.getDescriptor().getFields()) { + checkFieldConversion(parquetSchema, field); + } + + final WriteUsingMR writeParquet = new WriteUsingMR(); + Path parquetPath = writeParquet.write(SchemaConverterAllDatatypes.getDefaultInstance()); + FileStatus[] files = parquetPath.getFileSystem(writeParquet.getConfiguration()).listStatus(parquetPath); + int parquetFileCount = 0; + for (FileStatus file : files) { + if (file.getPath().getName().endsWith("parquet")) { + parquetFileCount ++; + ParquetFileReader parquetReader = ParquetFileReader.open(writeParquet.getConfiguration(), file.getPath()); + MessageType outputSchema = parquetReader.getFooter().getFileMetaData().getSchema(); + // Test if the output schema is same as the original one converted from protobuf descriptor, thus id is preserved. 
+ assertEquals(outputSchema, parquetSchema); + } + } + // There will be only 1 file. + assertEquals(parquetFileCount, 1); + } + + private static void checkFieldConversion(GroupType parquetSchema, Descriptors.FieldDescriptor field) { + Type schemaField = parquetSchema.getType(field.getIndex()); + assertEquals(schemaField.getName(), field.getName()); + assertEquals(schemaField.getId().getValue(), field.getNumber()); + if (field.getJavaType() == JavaType.MESSAGE) { + for (Descriptors.FieldDescriptor subField : field.getMessageType().getFields()) { + checkFieldConversion((GroupType) schemaField, subField); + } + } + } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java index 8905968b01..51aef4aac4 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,9 +41,17 @@ public class ReadUsingMR { private static List outputMessages; - Configuration conf = new Configuration(); + final Configuration conf; private String projection; + public ReadUsingMR() { + this(new Configuration()); + } + + public ReadUsingMR(Configuration conf) { + this.conf = conf; + } + public void setRequestedProjection(String projection) { this.projection = projection; } diff --git a/parquet-protobuf/src/test/resources/TestProto3.proto b/parquet-protobuf/src/test/resources/TestProto3.proto index 1896445306..d87312b21c 100644 --- a/parquet-protobuf/src/test/resources/TestProto3.proto +++ b/parquet-protobuf/src/test/resources/TestProto3.proto @@ -50,6 +50,11 @@ message Links { message SchemaConverterSimpleMessage { int32 someId = 3; + string name = 5; +} + +message SchemaConverterSimpleMessageEvolved { + string name = 5; } message SchemaConverterAllDatatypes { @@ -69,6 +74,7 @@ message SchemaConverterAllDatatypes { string optionalString = 14; bytes optionalBytes = 15; SchemaConverterSimpleMessage optionalMessage = 16; + repeated SchemaConverterSimpleMessage repeatedMessage = 17; enum TestEnum { FIRST = 0; SECOND = 1; @@ -81,6 +87,29 @@ message SchemaConverterAllDatatypes { map optionalMap = 21; } +// This message is a version of SchemaConverterAllDatatypes with some fields removed. 
+message SchemaConverterAllDatatypesEvolved { + double optionalDouble = 1; + float optionalFloat = 2; + int32 optionalInt32 = 3; + int64 optionalInt64 = 4; + uint32 optionalUInt32 = 5; + uint64 optionalUInt64 = 6; + sint32 optionalSInt32 = 7; + sint64 optionalSInt64 = 8; + fixed32 optionalFixed32 = 9; + fixed64 optionalFixed64 = 10; + sfixed32 optionalSFixed32 = 11; + sfixed64 optionalSFixed64 = 12; + bool optionalBool = 13; + string optionalString = 14; + bytes optionalBytes = 15; + SchemaConverterSimpleMessageEvolved optionalMessage = 16; + oneof oneof { + int32 someInt32 = 19; + } +} + message SchemaConverterRepetition { int32 optionalPrimitive = 1; repeated int32 repeatedPrimitive = 3;