Skip to content

Commit

Permalink
Configurable behavior on the field id persistence
Browse files Browse the repository at this point in the history
Use parameter "parquet.schema.field.with.id" to enable schema field
id persistence. When this flag is on, all schema fields should
contain id (this is generally different from the field index which
is the field position), otherwise it will be considered as an
error. This flag is set as extra metadata in the footer, and its
default value is false.

For parquet-protobuf, this flag is systematically set to true
unless it is explicitly set to false in the configuration,
which makes field id persistence the default behavior for
parquet-protobuf.

Change-Id: I3a073a8a42a9ec21c782ab85d8e27cd74595f45d
  • Loading branch information
qinghui-xu committed May 29, 2017
1 parent 014d30b commit ceed4b5
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.format.PageEncodingStats;
Expand All @@ -62,14 +63,9 @@
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.*;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.TypeVisitor;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -80,6 +76,7 @@ public class ParquetMetadataConverter {
public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
public static final String PARQUET_SCHEMA_FIELD_WITH_ID = "parquet.schema.field.with.id";

private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);

Expand Down Expand Up @@ -115,7 +112,7 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque
}
FileMetaData fileMetaData = new FileMetaData(
currentVersion,
toParquetSchema(parquetMetadata.getFileMetaData().getSchema()),
toParquetSchema(parquetMetadata.getFileMetaData()),
numRows,
rowGroups);

Expand All @@ -129,17 +126,18 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque
}

// Visible for testing
List<SchemaElement> toParquetSchema(MessageType schema) {
List<SchemaElement> toParquetSchema(org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData) {
List<SchemaElement> result = new ArrayList<SchemaElement>();
addToList(result, schema);
String withId = fileMetaData.getKeyValueMetaData().get(PARQUET_SCHEMA_FIELD_WITH_ID);
addToList(result, fileMetaData.getSchema(), !StringUtils.isBlank(withId) && Boolean.valueOf(withId));
return result;
}

private void addToList(final List<SchemaElement> result, org.apache.parquet.schema.Type field) {
private void addToList(final List<SchemaElement> result, final org.apache.parquet.schema.Type field, final boolean withId) {
field.accept(new TypeVisitor() {
@Override
public void visit(PrimitiveType primitiveType) {
SchemaElement element = schemaElementfromType(primitiveType);
SchemaElement element = schemaElementfromField(primitiveType, withId);
element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition()));
element.setType(getType(primitiveType.getPrimitiveTypeName()));
if (primitiveType.getOriginalType() != null) {
Expand All @@ -157,13 +155,13 @@ public void visit(PrimitiveType primitiveType) {

@Override
public void visit(MessageType messageType) {
SchemaElement element = schemaElementfromType(messageType);
SchemaElement element = new SchemaElement(messageType.getName());
visitChildren(result, messageType.asGroupType(), element);
}

@Override
public void visit(GroupType groupType) {
SchemaElement element = schemaElementfromType(groupType);
SchemaElement element = schemaElementfromField(groupType, withId);
element.setRepetition_type(toParquetRepetition(groupType.getRepetition()));
if (groupType.getOriginalType() != null) {
element.setConverted_type(getConvertedType(groupType.getOriginalType()));
Expand All @@ -176,7 +174,7 @@ private void visitChildren(final List<SchemaElement> result,
element.setNum_children(groupType.getFieldCount());
result.add(element);
for (org.apache.parquet.schema.Type field : groupType.getFields()) {
addToList(result, field);
addToList(result, field, withId);
}
}
});
Expand All @@ -189,9 +187,12 @@ private void visitChildren(final List<SchemaElement> result,
* @param field a field of the parquet schema
* @return SchemaElement
*/
private static SchemaElement schemaElementfromType(org.apache.parquet.schema.Type field) {
private static SchemaElement schemaElementfromField(org.apache.parquet.schema.Type field, boolean withId) {
SchemaElement element = new SchemaElement(field.getName());
if (field.getId() != null) {
if (withId) {
if (field.getId() == null) {
throw new InvalidSchemaException("Field id is required, but not found in " + field.toString());
}
element.setField_id(field.getId().intValue());
}
return element;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -44,6 +44,7 @@
import java.util.Set;
import java.util.TreeSet;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.Version;
Expand All @@ -61,6 +62,7 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.InvalidSchemaException;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -100,23 +102,55 @@ public void testPageHeader() throws IOException {
@Test
public void testSchemaConverter() {
  // Round-trip Paper.schema (which has no field ids) through the thrift
  // schema representation and back; the result must be unchanged.
  ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
  List<SchemaElement> parquetSchema =
      parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(Paper.schema, false));
  MessageType schema = parquetMetadataConverter.fromParquetSchema(parquetSchema);
  assertEquals(Paper.schema, schema);
}

@Test
public void testSchemaWithFieldId() {
final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
final MessageType schema = Types.buildMessage()
.required(PrimitiveTypeName.BINARY)
.as(OriginalType.UTF8)
.id(1)
.named("stringField")
.optional(PrimitiveTypeName.INT32)
.as(OriginalType.INT_32)
.id(2)
.named("intField")
.named("Message");
List<SchemaElement> parquetSchema = parquetMetadataConverter.toParquetSchema(mockHadoopFileMetaData(schema, true));
assertEquals(schema, parquetMetadataConverter.fromParquetSchema(parquetSchema));
}

@Test(expected = InvalidSchemaException.class)
public void testSchemaExpectingFieldId() {
  // When the field-id flag is on, converting a schema whose fields lack ids
  // must be rejected with an InvalidSchemaException.
  final ParquetMetadataConverter converter = new ParquetMetadataConverter();
  final MessageType idLessSchema = Types.buildMessage()
      .required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("stringField")
      .optional(PrimitiveTypeName.INT32).as(OriginalType.INT_32).named("intField")
      .named("Message");
  converter.toParquetSchema(mockHadoopFileMetaData(idLessSchema, true));
}

@Test
public void testSchemaConverterDecimal() {
ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
final MessageType messageType = Types.buildMessage()
.required(PrimitiveTypeName.BINARY)
.as(OriginalType.DECIMAL).precision(9).scale(2)
.named("aBinaryDecimal")
.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4)
.as(OriginalType.DECIMAL).precision(9).scale(2)
.named("aFixedDecimal")
.named("Message");
List<SchemaElement> schemaElements = parquetMetadataConverter.toParquetSchema(
Types.buildMessage()
.required(PrimitiveTypeName.BINARY)
.as(OriginalType.DECIMAL).precision(9).scale(2)
.named("aBinaryDecimal")
.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4)
.as(OriginalType.DECIMAL).precision(9).scale(2)
.named("aFixedDecimal")
.named("Message")
mockHadoopFileMetaData(messageType, false)
);
List<SchemaElement> expected = Lists.newArrayList(
new SchemaElement("Message").setNum_children(2),
Expand Down Expand Up @@ -164,6 +198,12 @@ public void testEnumEquivalence() {
}
}

/**
 * Builds a hadoop-side FileMetaData around the given schema. When
 * {@code withId} is set, the footer key/value metadata carries the
 * field-id persistence flag ("true"); otherwise the metadata is empty.
 */
private org.apache.parquet.hadoop.metadata.FileMetaData mockHadoopFileMetaData(MessageType messageType, boolean withId) {
  final java.util.Map<String, String> extraMetaData;
  if (withId) {
    extraMetaData = ImmutableMap.of(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, "true");
  } else {
    extraMetaData = Collections.<String, String>emptyMap();
  }
  return new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, extraMetaData, null);
}

private FileMetaData metadata(long... sizes) {
List<SchemaElement> schema = emptyList();
List<RowGroup> rowGroups = new ArrayList<RowGroup>();
Expand Down Expand Up @@ -330,7 +370,7 @@ private ColumnChunkMetaData createColumnChunkMetaData() {
0, 0, 0, 0, 0);
return md;
}

@Test
public void testEncodingsCache() {
ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
Expand Down Expand Up @@ -51,15 +53,15 @@ class ProtoMessageConverter extends GroupConverter {
private final Converter[] converters;
private final ParentValueContainer parent;
private final Message.Builder myBuilder;
private final Configuration configuration;

// used in record converter
/**
 * Backward-compatible overload: uses a fresh {@link Configuration}, so the
 * field-id read flag falls back to its default value.
 */
ProtoMessageConverter(ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema) {
  this(pvc, protoClass, parquetSchema, new Configuration());
}

// Builds the converter for the given proto class; the configuration supplies
// the parquet.schema.field.with.id flag used to match fields by id.
ProtoMessageConverter(ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema, Configuration configuration) {
  this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, configuration);
}


// For usage in message arrays
ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) {
ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Configuration configuration) {

int schemaSize = parquetSchema.getFieldCount();
converters = new Converter[schemaSize];
Expand All @@ -72,12 +74,13 @@ class ProtoMessageConverter extends GroupConverter {
}

myBuilder = builder;
this.configuration = configuration;

Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType();

final boolean readFieldById = configuration.getBoolean(ParquetMetadataConverter.PARQUET_SCHEMA_FIELD_WITH_ID, true);
for (Type parquetField : parquetSchema.getFields()) {
// Find field by id, fall back to find field by name if no id found (legacy schema).
Descriptors.FieldDescriptor protoField = parquetField.getId() == null ?
// Find field by id, fall back to find field by name if either flag is set to false explicitly or no id found (legacy schema).
Descriptors.FieldDescriptor protoField = !readFieldById || (parquetField.getId() == null) ?
protoDescriptor.findFieldByName(parquetField.getName()) :
protoDescriptor.findFieldByNumber(parquetField.getId().intValue());

Expand Down Expand Up @@ -151,7 +154,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p
case LONG: return new ProtoLongConverter(pvc);
case MESSAGE: {
Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor);
return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType());
return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType(), configuration);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -90,7 +90,7 @@ public RecordMaterializer<T> prepareForRead(Configuration configuration, Map<Str

MessageType requestedSchema = readContext.getRequestedSchema();
Class<? extends Message> protobufClass = Protobufs.getProtobufClass(headerProtoClass);
return new ProtoRecordMaterializer(requestedSchema, protobufClass);
return new ProtoRecordMaterializer(requestedSchema, protobufClass, configuration);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -21,6 +21,7 @@

import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

/**
Expand All @@ -46,13 +47,13 @@ public void add(Object a) {
}


/**
 * Backward-compatible overload: uses a fresh {@link Configuration}, so the
 * field-id read flag falls back to its default value.
 */
public ProtoRecordConverter(Class<? extends Message> protoclass, MessageType parquetSchema) {
  this(protoclass, parquetSchema, new Configuration());
}

public ProtoRecordConverter(Class<? extends Message> protoclass, MessageType parquetSchema, Configuration configuration) {
  super(new SkipParentValueContainer(), protoclass, parquetSchema, configuration);
  reusedBuilder = getBuilder();
}

/**
 * Backward-compatible overload: uses a fresh {@link Configuration}, so the
 * field-id read flag falls back to its default value.
 */
public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) {
  this(builder, parquetSchema, new Configuration());
}

public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema, Configuration configuration) {
  super(new SkipParentValueContainer(), builder, parquetSchema, configuration);
  reusedBuilder = getBuilder();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -20,6 +20,7 @@

import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
Expand All @@ -28,8 +29,8 @@ class ProtoRecordMaterializer<T extends MessageOrBuilder> extends RecordMaterial

private final ProtoRecordConverter<T> root;

/**
 * Backward-compatible overload: uses a fresh {@link Configuration}, so the
 * field-id read flag falls back to its default value.
 */
public ProtoRecordMaterializer(MessageType requestedSchema, Class<? extends Message> protobufClass) {
  this(requestedSchema, protobufClass, new Configuration());
}

public ProtoRecordMaterializer(MessageType requestedSchema, Class<? extends Message> protobufClass, Configuration configuration) {
  this.root = new ProtoRecordConverter<T>(protobufClass, requestedSchema, configuration);
}

@Override
Expand Down
Loading

0 comments on commit ceed4b5

Please sign in to comment.