diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 218b714dfe..5d9a0faca6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.format.PageEncodingStats; @@ -62,14 +63,9 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.*; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type.Repetition; -import org.apache.parquet.schema.TypeVisitor; -import org.apache.parquet.schema.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +76,7 @@ public class ParquetMetadataConverter { public static final MetadataFilter NO_FILTER = new NoFilter(); public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k + public static final String PARQUET_SCHEMA_FIELD_WITH_ID = "parquet.schema.field.with.id"; private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class); @@ -115,7 +112,7 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque } FileMetaData fileMetaData = new FileMetaData( currentVersion, - toParquetSchema(parquetMetadata.getFileMetaData().getSchema()), + toParquetSchema(parquetMetadata.getFileMetaData()), numRows, rowGroups); @@ -129,17 +126,18 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque } // Visible for testing - List toParquetSchema(MessageType schema) { + List toParquetSchema(org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData) { List result = new ArrayList(); - addToList(result, schema); + String withId = fileMetaData.getKeyValueMetaData().get(PARQUET_SCHEMA_FIELD_WITH_ID); + addToList(result, fileMetaData.getSchema(), !StringUtils.isBlank(withId) && Boolean.valueOf(withId)); return result; } - private void addToList(final List result, org.apache.parquet.schema.Type field) { + private void addToList(final List result, final org.apache.parquet.schema.Type field, final boolean withId) { field.accept(new TypeVisitor() { @Override public void visit(PrimitiveType primitiveType) { - SchemaElement element = schemaElementfromType(primitiveType); + SchemaElement element = schemaElementfromField(primitiveType, withId); element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition())); element.setType(getType(primitiveType.getPrimitiveTypeName())); if (primitiveType.getOriginalType() != null) { @@ -157,13 +155,13 @@ public void visit(PrimitiveType primitiveType) { @Override public void visit(MessageType messageType) { - SchemaElement element = schemaElementfromType(messageType); + SchemaElement element = new SchemaElement(messageType.getName()); visitChildren(result, messageType.asGroupType(), element); } @Override public void visit(GroupType groupType) { - SchemaElement element = schemaElementfromType(groupType); + SchemaElement element = schemaElementfromField(groupType, withId); element.setRepetition_type(toParquetRepetition(groupType.getRepetition())); if (groupType.getOriginalType() != null) { element.setConverted_type(getConvertedType(groupType.getOriginalType())); @@ -176,7 +174,7 @@ private void visitChildren(final List result, element.setNum_children(groupType.getFieldCount()); result.add(element); for (org.apache.parquet.schema.Type field : groupType.getFields()) { - addToList(result, field); + addToList(result, field, withId); } } }); @@ -189,9 +187,12 @@ private void visitChildren(final List result, * @param field a field of the parquet schema * @return SchemaElement */ - private static SchemaElement schemaElementfromType(org.apache.parquet.schema.Type field) { + private static SchemaElement schemaElementfromField(org.apache.parquet.schema.Type field, boolean withId) { SchemaElement element = new SchemaElement(field.getName()); - if (field.getId() != null) { + if (withId) { + if (field.getId() == null) { + throw new InvalidSchemaException("Field id is required, but not found in " + field.toString()); + } element.setField_id(field.getId().intValue()); } return element; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 35c35c19c6..a5e3f8a89c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,6 +44,7 @@ import java.util.Set; import java.util.TreeSet; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.Version; @@ -61,6 +62,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.InvalidSchemaException; import org.junit.Assert; import org.junit.Test; @@ -100,23 +102,55 @@ public void testPageHeader() throws IOException { @Test public void testSchemaConverter() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - List parquetSchema = parquetMetadataConverter.toParquetSchema(Paper.schema); + List parquetSchema = parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(Paper.schema, false)); MessageType schema = parquetMetadataConverter.fromParquetSchema(parquetSchema); assertEquals(Paper.schema, schema); } + @Test + public void testSchemaWithFieldId() { + final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType schema = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8) + .id(1) + .named("stringField") + .optional(PrimitiveTypeName.INT32) + .as(OriginalType.INT_32) + .id(2) + .named("intField") + .named("Message"); + List parquetSchema = parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(schema, true)); + assertEquals(schema, parquetMetadataConverter.fromParquetSchema(parquetSchema)); + } + + @Test(expected = InvalidSchemaException.class) + public void testSchemaExpectingFieldId() { + final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType messageType = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8) + .named("stringField") + .optional(PrimitiveTypeName.INT32) + .as(OriginalType.INT_32) + .named("intField") + .named("Message"); + parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(messageType, true)); + } + @Test public void testSchemaConverterDecimal() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + final MessageType messageType = Types.buildMessage() + .required(PrimitiveTypeName.BINARY) + .as(OriginalType.DECIMAL).precision(9).scale(2) + .named("aBinaryDecimal") + .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4) + .as(OriginalType.DECIMAL).precision(9).scale(2) + .named("aFixedDecimal") + .named("Message"); List schemaElements = parquetMetadataConverter.toParquetSchema( - Types.buildMessage() - .required(PrimitiveTypeName.BINARY) - .as(OriginalType.DECIMAL).precision(9).scale(2) - .named("aBinaryDecimal") - .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4) - .as(OriginalType.DECIMAL).precision(9).scale(2) - .named("aFixedDecimal") - .named("Message") + mockHadoopFileMetaData(messageType, false) ); List expected = Lists.newArrayList( new SchemaElement("Message").setNum_children(2), @@ -164,6 +198,12 @@ public void testEnumEquivalence() { } } + private org.apache.parquet.hadoop.metadata.FileMetaData mockHadoopFileMetaData(MessageType messageType, boolean withId) { + return new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, + withId ? ImmutableMap.of(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, "true") : Collections.emptyMap(), + null); + } + private FileMetaData metadata(long... sizes) { List schema = emptyList(); List rowGroups = new ArrayList(); @@ -330,7 +370,7 @@ private ColumnChunkMetaData createColumnChunkMetaData() { 0, 0, 0, 0, 0); return md; } - + @Test public void testEncodingsCache() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index b22a3556e3..a56d4c89af 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -22,7 +22,9 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; @@ -51,15 +53,15 @@ class ProtoMessageConverter extends GroupConverter { private final Converter[] converters; private final ParentValueContainer parent; private final Message.Builder myBuilder; + private final Configuration configuration; // used in record converter - ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { - this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); + ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, Configuration configuration) { + this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, configuration); } - // For usage in message arrays - ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { + ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Configuration configuration) { int schemaSize = parquetSchema.getFieldCount(); converters = new Converter[schemaSize]; @@ -72,12 +74,13 @@ class ProtoMessageConverter extends GroupConverter { } myBuilder = builder; + this.configuration = configuration; Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType(); - + final boolean readFieldById = configuration.getBoolean(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, true); for (Type parquetField : parquetSchema.getFields()) { - // Find field by id, fall back to find field by name if no id found (legacy schema). - Descriptors.FieldDescriptor protoField = parquetField.getId() == null ? + // Find field by id, fall back to find field by name if either flag is set to false explicitly or no id found (legacy schema). + Descriptors.FieldDescriptor protoField = !readFieldById || (parquetField.getId() == null) ? protoDescriptor.findFieldByName(parquetField.getName()) : protoDescriptor.findFieldByNumber(parquetField.getId().intValue()); @@ -151,7 +154,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p case LONG: return new ProtoLongConverter(pvc); case MESSAGE: { Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); - return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType()); + return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType(), configuration); } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 3a21d84486..faf7b19fc9 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -90,7 +90,7 @@ public RecordMaterializer prepareForRead(Configuration configuration, Map protobufClass = Protobufs.getProtobufClass(headerProtoClass); - return new ProtoRecordMaterializer(requestedSchema, protobufClass); + return new ProtoRecordMaterializer(requestedSchema, protobufClass, configuration); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index 99153a98cc..02d6d0af34 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.schema.MessageType; /** @@ -46,13 +47,13 @@ public void add(Object a) { } - public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { - super(new SkipParentValueContainer(), protoclass, parquetSchema); + public ProtoRecordConverter(Class protoclass, MessageType parquetSchema, Configuration configuration) { + super(new SkipParentValueContainer(), protoclass, parquetSchema, configuration); reusedBuilder = getBuilder(); } - public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) { - super(new SkipParentValueContainer(), builder, parquetSchema); + public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema, Configuration configuration) { + super(new SkipParentValueContainer(), builder, parquetSchema, configuration); reusedBuilder = getBuilder(); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index 039a571137..ea6b051b7d 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -28,8 +29,8 @@ class ProtoRecordMaterializer extends RecordMaterial private final ProtoRecordConverter root; - public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass) { - this.root = new ProtoRecordConverter(protobufClass, requestedSchema); + public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass, Configuration configuration) { + this.root = new ProtoRecordConverter(protobufClass, requestedSchema, configuration); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index c0ed351046..9e6ae1e427 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import com.google.protobuf.TextFormat; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.InvalidRecordException; @@ -118,6 +119,10 @@ public WriteContext init(Configuration configuration) { Map extraMetaData = new HashMap(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); + // By default, we will persist field id in the parquet schema metadata. + // This can be turned off by explicitly set the flag to false in the configuration. + extraMetaData.put(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, + String.valueOf(configuration.getBoolean(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, true))); return new WriteContext(rootSchema, extraMetaData); }