Skip to content

Commit

Permalink
PARQUET-2: Adding Type Persuasion for Primitive Types
Browse files Browse the repository at this point in the history
Original from the old repo: Parquet/parquet-mr#410
JIRA: https://issues.apache.org/jira/browse/PARQUET-2

These changes allow primitive types to be requested as different types than what is stored in the file format using a flag to turn off strict type checking (default is on). Types are cast to the requested type where possible and will suffer precision loss for casting where necessary (e.g. requesting a double as an int).

No performance penalty is imposed when using the type defined in the file. A flag exists to turn off strict type checking (parquet.strict.typing), which is on by default.

A 6x6 test case is provided to test conversion between the primitive types.

Author: Daniel Weeks <[email protected]>

Closes #3 from dcw-netflix/type-persuasion and squashes the following commits:

97f4e9a [Daniel Weeks] Added documentation as suggested by code review
1c3c0c7 [Daniel Weeks] Fixed test with strict checking off
f3cb495 [Daniel Weeks] Added type persuasion for primitive types with a flag to control strict type checking for conflicting schemas, which is strict by default.
  • Loading branch information
Daniel Weeks authored and julienledem committed Jun 24, 2014
1 parent 859b6b4 commit 9ad5485
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 23 deletions.
21 changes: 19 additions & 2 deletions parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ public class ColumnIOCreatorVisitor implements TypeVisitor {
private final MessageType requestedSchema;
private int currentRequestedIndex;
private Type currentRequestedType;
private boolean strictTypeChecking;

/**
 * Creates a visitor with strict type checking enabled: requested primitive
 * types must match the primitive types stored in the file schema.
 *
 * @param validating whether record validation is enabled
 * @param requestedSchema the schema the caller wants to read/write
 */
public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
  this(validating, requestedSchema, true);
}

/**
 * Creates a visitor with configurable type checking.
 *
 * @param validating whether record validation is enabled
 * @param requestedSchema the schema the caller wants to read/write
 * @param strictTypeChecking when true, requested primitive types must match the file's primitive types
 */
public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
  this.strictTypeChecking = strictTypeChecking;
  this.requestedSchema = requestedSchema;
  this.validating = validating;
}

@Override
Expand Down Expand Up @@ -86,7 +92,8 @@ private void visitChildren(GroupColumnIO newIO, GroupType groupType, GroupType r

@Override
public void visit(PrimitiveType primitiveType) {
if (!currentRequestedType.isPrimitive() || currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName()) {
if (!currentRequestedType.isPrimitive() ||
(this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) {
incompatibleSchema(primitiveType, currentRequestedType);
}
PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size());
Expand Down Expand Up @@ -127,7 +134,17 @@ public ColumnIOFactory(boolean validating) {
* @return the corresponding serializing/deserializing structure
*/
public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) {
  // Strict by default: requested primitive types must match the file's types.
  return getColumnIO(requestedSchema, fileSchema, true);
}

/**
 * @param requestedSchema the requestedSchema we want to read/write
 * @param fileSchema the file schema (when reading it can be different from the requested schema)
 * @param strict should file type and requested primitive types match
 * @return the corresponding serializing/deserializing structure
 */
public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
  ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
  fileSchema.accept(visitor);
  return visitor.getColumnIO();
}
Expand Down
17 changes: 16 additions & 1 deletion parquet-column/src/main/java/parquet/schema/GroupType.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ protected <T> List<T> convertChildren(List<GroupType> path, TypeConverter<T> con

/**
 * Merges {@code toMerge} into this group type using strict primitive type matching.
 *
 * @param toMerge the type to merge into this one
 * @return the union result of merging toMerge into this
 */
@Override
protected Type union(Type toMerge) {
  // Delegate to the two-argument overload with strict checking enabled.
  return this.union(toMerge, true);
}

@Override
protected Type union(Type toMerge, boolean strict) {
if (toMerge.isPrimitive()) {
throw new IncompatibleSchemaModificationException("can not merge primitive type " + toMerge + " into group type " + this);
}
Expand All @@ -305,6 +310,16 @@ protected Type union(Type toMerge) {
* @return the merged list
*/
// Strict variant: primitive types of matching fields must be identical.
List<Type> mergeFields(GroupType toMerge) {
  final boolean strict = true;
  return mergeFields(toMerge, strict);
}

/**
* produces the list of fields resulting from merging toMerge into the fields of this
* @param toMerge the group containing the fields to merge
* @param strict should schema primitive types match
* @return the merged list
*/
List<Type> mergeFields(GroupType toMerge, boolean strict) {
List<Type> newFields = new ArrayList<Type>();
// merge existing fields
for (Type type : this.getFields()) {
Expand All @@ -314,7 +329,7 @@ List<Type> mergeFields(GroupType toMerge) {
if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
}
merged = type.union(fieldToMerge);
merged = type.union(fieldToMerge, strict);
} else {
merged = type;
}
Expand Down
6 changes: 5 additions & 1 deletion parquet-column/src/main/java/parquet/schema/MessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ public boolean containsPath(String[] path) {
}

/**
 * Merges {@code toMerge} into this schema, requiring primitive types to match exactly.
 *
 * @param toMerge the schema to merge into this one
 * @return a new MessageType containing the merged fields
 */
public MessageType union(MessageType toMerge) {
  return union(toMerge, true);
}

/**
 * Merges {@code toMerge} into this schema.
 *
 * @param toMerge the schema to merge into this one
 * @param strict should schema primitive types match
 * @return a new MessageType containing the merged fields
 */
public MessageType union(MessageType toMerge, boolean strict) {
  return new MessageType(this.getName(), mergeFields(toMerge, strict));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ abstract public void addValueToPrimitiveConverter(
private final PrimitiveTypeName primitive;
private final int length;
private final DecimalMetadata decimalMeta;

/**
* @param repetition OPTIONAL, REPEATED, REQUIRED
* @param primitive STRING, INT64, ...
Expand Down Expand Up @@ -486,7 +486,12 @@ protected boolean containsPath(String[] path, int depth) {

@Override
protected Type union(Type toMerge) {
if (!toMerge.isPrimitive() || !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName())) {
return union(toMerge, true);
}

@Override
protected Type union(Type toMerge, boolean strict) {
if (!toMerge.isPrimitive() || (strict && !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName()))) {
throw new IncompatibleSchemaModificationException("can not merge type " + toMerge + " into " + this);
}
Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
Expand Down
7 changes: 7 additions & 0 deletions parquet-column/src/main/java/parquet/schema/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ public boolean equals(Object other) {
* @return the union result of merging toMerge into this
*/
protected abstract Type union(Type toMerge);

/**
 * Merges {@code toMerge} into this type.
 *
 * @param toMerge the type to merge into this one
 * @param strict should schema primitive types match; when false, implementations
 *        may accept differing primitive types (see PrimitiveType.union)
 * @return the union result of merging toMerge into this
 */
protected abstract Type union(Type toMerge, boolean strict);

/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static java.lang.String.format;
import static parquet.Log.DEBUG;
import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;

class InternalParquetRecordReader<T> {
private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
Expand All @@ -57,6 +58,7 @@ class InternalParquetRecordReader<T> {
private ParquetFileReader reader;
private parquet.io.RecordReader<T> recordReader;
private UnboundRecordFilter recordFilter;
private boolean strictTypeChecking;

private long totalTimeSpentReadingBytes;
private long totalTimeSpentProcessingRecords;
Expand Down Expand Up @@ -106,7 +108,7 @@ private void checkRead() throws IOException {
BenchmarkCounter.incrementTime(timeSpentReading);
LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
totalCountLoadedSoFar += pages.getRowCount();
Expand Down Expand Up @@ -142,7 +144,7 @@ public void initialize(MessageType requestedSchema, MessageType fileSchema,
this.recordConverter = readSupport.prepareForRead(
configuration, extraMetadata, fileSchema,
new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));

this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
List<ColumnDescriptor> columns = requestedSchema.getColumns();
reader = new ParquetFileReader(configuration, file, blocks, columns);
for (BlockMetaData block : blocks) {
Expand Down
32 changes: 28 additions & 4 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class ParquetFileWriter {
private long currentChunkFirstDataPage;
private long currentChunkDictionaryPageOffset;
private long currentChunkValueCount;

private Statistics currentStatistics;

/**
Expand Down Expand Up @@ -439,11 +439,16 @@ public long getPos() throws IOException {
* @param footers the list files footers to merge
* @return the global meta data for all the footers
*/

static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
  // Strict by default: schemas from all footers must agree on primitive types.
  return getGlobalMetaData(footers, true);
}

/**
 * Merges the file metadata of all footers into a single global view.
 *
 * @param footers the list of file footers to merge
 * @param strict should schema primitive types match when merging schemas
 * @return the global meta data for all the footers
 */
static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
  GlobalMetaData fileMetaData = null;
  for (Footer footer : footers) {
    ParquetMetadata currentMetadata = footer.getParquetMetadata();
    fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
  }
  return fileMetaData;
}
Expand All @@ -457,6 +462,13 @@ static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
// Strict merge: schemas from different files must agree on primitive types.
static GlobalMetaData mergeInto(FileMetaData toMerge, GlobalMetaData mergedMetadata) {
  return mergeInto(toMerge, mergedMetadata, true);
}

static GlobalMetaData mergeInto(
FileMetaData toMerge,
GlobalMetaData mergedMetadata,
boolean strict) {
MessageType schema = null;
Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
Set<String> createdBy = new HashSet<String>();
Expand All @@ -467,7 +479,7 @@ static GlobalMetaData mergeInto(
}
if ((schema == null && toMerge.getSchema() != null)
|| (schema != null && !schema.equals(toMerge.getSchema()))) {
schema = mergeInto(toMerge.getSchema(), schema);
schema = mergeInto(toMerge.getSchema(), schema, strict);
}
for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
Set<String> values = newKeyValues.get(entry.getKey());
Expand All @@ -491,10 +503,22 @@ static GlobalMetaData mergeInto(
* @return the resulting schema
*/
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
  return mergeInto(toMerge, mergedSchema, true);
}

/**
 * will return the result of merging toMerge into mergedSchema
 *
 * @param toMerge the schema to merge into mergedSchema
 * @param mergedSchema the schema to append the fields to (may be null)
 * @param strict should schema primitive types match
 * @return the resulting schema
 */
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
  if (mergedSchema == null) {
    // Nothing merged yet; the first schema seeds the result.
    return toMerge;
  }
  return mergedSchema.union(toMerge, strict);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
* key to configure the filter
*/
public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";

/**
* key to configure type checking for conflicting schemas (default: true)
*/
public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";

private Class<?> readSupportClass;
private List<Footer> footers;
Expand Down Expand Up @@ -358,7 +363,7 @@ public List<ParquetInputSplit> getSplits(Configuration configuration, List<Foote
throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
}
List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
ReadContext readContext = getReadSupport(configuration).init(new InitContext(
configuration,
globalMetaData.getKeyValueMetaData(),
Expand Down
Loading

0 comments on commit 9ad5485

Please sign in to comment.