From ae77413e7a1b140245e7a8803cd81c4323273e66 Mon Sep 17 00:00:00 2001 From: Matthieu Martin Date: Tue, 24 Jun 2014 15:13:53 -0700 Subject: [PATCH] Merging/cherry-picking: 1) Cascading write support (see: https://github.com/apache/incubator-parquet-mr/commit/76bbf4a88645abc657ba6e4c2dc636712f03b944) 2) Caching fix (see: https://github.com/apache/incubator-parquet-mr/pull/2 and https://github.com/matt-martin/incubator-parquet-mr/tree/99bb5a3acdf6c998f5d344bbeaeb446031f47df0) --- .../parquet/cascading/ParquetTupleScheme.java | 44 +++-- .../parquet/cascading/TupleWriteSupport.java | 103 ++++++++++ parquet-hadoop/pom.xml | 6 + .../main/java/parquet/hadoop/LruCache.java | 181 ++++++++++++++++++ .../parquet/hadoop/ParquetInputFormat.java | 143 +++++++++++++- .../parquet/hadoop/ParquetOutputFormat.java | 5 + .../java/parquet/hadoop/TestInputFormat.java | 62 +++++- .../java/parquet/hadoop/TestLruCache.java | 144 ++++++++++++++ 8 files changed, 664 insertions(+), 24 deletions(-) create mode 100644 parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java create mode 100644 parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java create mode 100644 parquet-hadoop/src/test/java/parquet/hadoop/TestLruCache.java diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java index d75b6e76d..1f766f431 100644 --- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java +++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java @@ -20,12 +20,10 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; - import org.apache.hadoop.fs.Path; - import parquet.hadoop.ParquetInputFormat; import parquet.hadoop.ParquetFileReader; + import parquet.hadoop.ParquetInputFormat; import parquet.hadoop.Footer; - import parquet.hadoop.metadata.ParquetMetadata; import parquet.hadoop.mapred.Container; import parquet.hadoop.mapred.DeprecatedParquetInputFormat; import parquet.schema.MessageType; @@ -40,6 +38,9 @@ import cascading.tap.hadoop.Hfs; import cascading.tuple.Tuple; import cascading.tuple.Fields; + import cascading.tuple.TupleEntry; + import parquet.hadoop.ParquetOutputFormat; + import parquet.hadoop.mapred.DeprecatedParquetOutputFormat; /** * A Cascading Scheme that converts Parquet groups into Cascading tuples. @@ -56,6 +57,7 @@ public class ParquetTupleScheme extends Scheme{ private static final long serialVersionUID = 0L; + private String parquetSchema; public ParquetTupleScheme() { super(); @@ -65,6 +67,20 @@ public ParquetTupleScheme(Fields sourceFields) { super(sourceFields); } + /** + * ParquetTupleScheme constructor used a sink need to be implemented + * + * @param sourceFields used for the reading step + * @param sinkFields used for the writing step + * @param schema is mandatory if you add sinkFields and needs to be the + * toString() from a MessageType. This value is going to be parsed when the + * parquet file will be created. 
+ */ + public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) { + super(sourceFields, sinkFields); + parquetSchema = schema; + } + @SuppressWarnings("rawtypes") @Override public void sourceConfInit(FlowProcess fp, @@ -122,19 +138,23 @@ public boolean source(FlowProcess fp, SourceCall arg0, - Tap arg1, JobConf arg2) { - throw new UnsupportedOperationException("ParquetTupleScheme does not support Sinks"); - + public void sinkConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class); + jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema); + ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class); } @Override - public boolean isSink() { return false; } - + public boolean isSink() { + return parquetSchema != null; + } @Override - public void sink(FlowProcess arg0, SinkCall arg1) - throws IOException { - throw new UnsupportedOperationException("ParquetTupleScheme does not support Sinks"); + public void sink(FlowProcess fp, SinkCall sink) + throws IOException { + TupleEntry tuple = sink.getOutgoingEntry(); + OutputCollector outputCollector = sink.getOutput(); + outputCollector.collect(null, tuple); } } \ No newline at end of file diff --git a/parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java b/parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java new file mode 100644 index 000000000..6cc13427f --- /dev/null +++ b/parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java @@ -0,0 +1,103 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+package parquet.cascading;
+
+import cascading.tuple.TupleEntry;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Type;
+
+/**
+ *
+ *
+ * @author Mickaël Lacour
+ */
+public class TupleWriteSupport extends WriteSupport<TupleEntry> {
+
+  private RecordConsumer recordConsumer;
+  private MessageType rootSchema;
+  public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
+
+  @Override
+  public WriteContext init(Configuration configuration) {
+    String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
+    rootSchema = MessageTypeParser.parseMessageType(schema);
+    return new WriteContext(rootSchema, new HashMap<String, String>());
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    this.recordConsumer = recordConsumer;
+  }
+
+  @Override
+  public void write(TupleEntry record) {
+    recordConsumer.startMessage();
+    final List<Type> fields = rootSchema.getFields();
+    int i = 0;
+    for (Type field : fields) {
+      recordConsumer.startField(field.getName(), i);
+      if (field.isPrimitive()) {
+        writePrimitive(record, field.asPrimitiveType());
+      } else {
+        throw new UnsupportedOperationException("Complex type not implemented");
+      }
+      recordConsumer.endField(field.getName(), i);
+      ++i;
+    }
+    recordConsumer.endMessage();
+  }
+
+  private void writePrimitive(TupleEntry record, PrimitiveType field) {
+    if (record == null) {
+      return;
+    }
+
+    switch (field.getPrimitiveTypeName()) {
+      case BINARY:
+        recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean(record.getBoolean(field.getName()));
+        break;
+      case INT32:
+        recordConsumer.addInteger(record.getInteger(field.getName()));
+        break;
+      case INT64:
+        recordConsumer.addLong(record.getLong(field.getName()));
+        break;
+      case DOUBLE:
+        recordConsumer.addDouble(record.getDouble(field.getName()));
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(record.getFloat(field.getName()));
+        break;
+      case FIXED_LEN_BYTE_ARRAY:
+        throw new UnsupportedOperationException("Fixed len byte array type not implemented");
+      case INT96:
+        throw new UnsupportedOperationException("Int96 type not implemented");
+      default:
+        throw new UnsupportedOperationException(field.getName() + " type not implemented");
+    }
+  }
+}
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index b122e459e..ecee1e0d7 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -61,6 +61,12 @@
       <type>jar</type>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
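
Illustration (not part of the patch; the class name, field names, and output path are hypothetical): the new sink support is driven entirely by the schema string passed to the three-argument ParquetTupleScheme constructor, which sinkConfInit() later copies into the job under TupleWriteSupport.PARQUET_CASCADING_SCHEMA. A minimal sketch of wiring up a Parquet sink tap:

    import cascading.tap.Tap;
    import cascading.tap.hadoop.Hfs;
    import cascading.tuple.Fields;
    import parquet.cascading.ParquetTupleScheme;

    public class ParquetSinkExample {
      public static Tap createSink() {
        Fields fields = new Fields("name", "age");
        // Must be the toString() form of a Parquet MessageType; TupleWriteSupport
        // parses it back with MessageTypeParser. Field names are illustrative and
        // must line up with the sink Fields.
        String schema = "message example {\n"
            + "  required binary name;\n"
            + "  required int32 age;\n"
            + "}";
        // Supplying sink fields plus a schema makes isSink() return true.
        return new Hfs(new ParquetTupleScheme(fields, fields, schema), "/tmp/parquet-out");
      }
    }
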
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java b/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
new file mode 100644
index 000000000..e9ecb37d2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
@@ -0,0 +1,181 @@
+package parquet.hadoop;
+
+import parquet.Log;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A basic implementation of an LRU cache. Besides evicting the least recently
+ * used entries (either based on insertion or access order), this class also
+ * checks for "stale" entries as entries are inserted or retrieved (note
+ * "staleness" is defined by the entries themselves; see
+ * {@link parquet.hadoop.LruCache.Value}).
+ *
+ * @param <K> The key type. Acts as the key in a {@link java.util.LinkedHashMap}
+ * @param <V> The value type. Must extend {@link parquet.hadoop.LruCache.Value}
+ *            so that the "staleness" of the value can be easily determined.
+ */
+final class LruCache<K, V extends LruCache.Value<K, V>> {
+  private static final Log LOG = Log.getLog(LruCache.class);
+
+  private static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+  private final LinkedHashMap<K, V> cacheMap;
+
+  /**
+   * Constructs an access-order based LRU cache with {@code maxSize} entries.
+   * @param maxSize The maximum number of entries to store in the cache.
+   */
+  public LruCache(final int maxSize) {
+    this(maxSize, DEFAULT_LOAD_FACTOR, true);
+  }
+
+  /**
+   * Constructs an LRU cache.
+   *
+   * @param maxSize The maximum number of entries to store in the cache.
+   * @param loadFactor Used to determine the initial capacity.
+   * @param accessOrder the ordering mode - {@code true} for access-order,
+   * {@code false} for insertion-order
+   */
+  public LruCache(final int maxSize, final float loadFactor, final boolean accessOrder) {
+    int initialCapacity = Math.round(maxSize / loadFactor);
+    cacheMap =
+            new LinkedHashMap<K, V>(initialCapacity, loadFactor, accessOrder) {
+              @Override
+              public boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
+                boolean result = size() > maxSize;
+                if (result) {
+                  if (Log.DEBUG) {
+                    LOG.debug("Removing eldest entry in cache: "
+                            + eldest.getKey());
+                  }
+                }
+                return result;
+              }
+            };
+  }
+
+  /**
+   * Removes the mapping for the specified key from this cache if present.
+   * @param key key whose mapping is to be removed from the cache
+   * @return the previous value associated with key, or null if there was no
+   *         mapping for key.
+   */
+  public V remove(final K key) {
+    V oldValue = cacheMap.remove(key);
+    if (oldValue != null) {
+      if (Log.DEBUG) {
+        LOG.debug("Removed cache entry for '" + key + "'");
+      }
+    }
+    return oldValue;
+  }
+
+  /**
+   * Associates the specified value with the specified key in this cache. The
+   * value is only inserted if it is not null and it is considered current. If
+   * the cache previously contained a mapping for the key, the old value is
+   * replaced only if the new value is "newer" than the old one.
+   * @param key key with which the specified value is to be associated
+   * @param newValue value to be associated with the specified key
+   */
+  public void put(final K key, final V newValue) {
+    if (newValue == null || !newValue.isCurrent(key)) {
+      if (Log.WARN) {
+        LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
+                + (newValue == null ? "null" : "not current"));
+      }
+      return;
+    }
+
+    V oldValue = cacheMap.get(key);
+    if (oldValue != null && oldValue.isNewerThan(newValue)) {
+      if (Log.WARN) {
+        LOG.warn("Ignoring new cache entry for '" + key + "' because "
+                + "existing cache entry is newer");
+      }
+      return;
+    }
+
+    // no existing value or new value is newer than old value
+    oldValue = cacheMap.put(key, newValue);
+    if (Log.DEBUG) {
+      if (oldValue == null) {
+        LOG.debug("Added new cache entry for '" + key + "'");
+      } else {
+        LOG.debug("Overwrote existing cache entry for '" + key + "'");
+      }
+    }
+  }
+
+  /**
+   * Removes all of the mappings from this cache. The cache will be empty
+   * after this call returns.
+   */
+  public void clear() {
+    cacheMap.clear();
+  }
+
+  /**
+   * Returns the value to which the specified key is mapped, or null if 1) the
+   * value is not current or 2) this cache contains no mapping for the key.
+   * @param key the key whose associated value is to be returned
+   * @return the value to which the specified key is mapped, or null if 1) the
+   *         value is not current or 2) this cache contains no mapping for the key
+   */
+  public V getCurrentValue(final K key) {
+    V value = cacheMap.get(key);
+    if (Log.DEBUG) {
+      LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "")
+              + "in cache");
+    }
+    if (value != null && !value.isCurrent(key)) {
+      // value is not current; remove it and return null
+      remove(key);
+      return null;
+    }
+
+    return value;
+  }
+
+  /**
+   * Returns the number of key-value mappings in this cache.
+   * @return the number of key-value mappings in this cache.
+   */
+  public int size() {
+    return cacheMap.size();
+  }
+
+  /**
+   * {@link parquet.hadoop.LruCache} expects all values to follow this
+   * interface so the cache can determine 1) whether values are current (e.g.
+   * the referenced data has not been modified/updated in such a way that the
+   * value is no longer useful) and 2) whether a value is strictly "newer"
+   * than another value.
+   *
+   * @param <K> The key type.
+   * @param <V> Provides a bound for the {@link #isNewerThan(V)} method
+   */
+  interface Value<K, V> {
+    /**
+     * Is the value still current (e.g. has the referenced data been
+     * modified/updated in such a way that the value is no longer useful)?
+     * @param key the key associated with this value
+     * @return {@code true} if the value is still current, {@code false} if
+     *         the value is no longer useful
+     */
+    boolean isCurrent(K key);
+
+    /**
+     * Compares this value with the specified value to check for relative age.
+     * @param otherValue the value to be compared.
+     * @return {@code true} if the value is strictly newer than the other value,
+     *         {@code false} if the value is older or just
+     *         as new as the other value.
+     */
+    boolean isNewerThan(V otherValue);
+  }
+
+}
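
For context (illustration only, not part of the patch): the LruCache constructor above delegates the actual LRU bookkeeping to java.util.LinkedHashMap's access-order mode and its removeEldestEntry() hook, layering the staleness checks on top. The underlying JDK idiom, as a self-contained sketch:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AccessOrderDemo {
      public static void main(String[] args) {
        final int maxSize = 2;
        // 'true' selects access order: get() moves an entry to the youngest end.
        Map<String, String> cache =
            new LinkedHashMap<String, String>(4, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxSize;  // evict once the cap is exceeded
              }
            };
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");        // touch "a"; "b" is now the eldest entry
        cache.put("c", "3");   // adding "c" evicts "b"
        System.out.println(cache.keySet());  // prints [a, c]
      }
    }
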
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 16fa13eab..2eb27c249 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -18,9 +18,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -77,8 +81,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
 
+  private static final int MIN_FOOTER_CACHE_SIZE = 100;
+
+  private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+
   private Class<?> readSupportClass;
-  private List
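
To make the staleness contract concrete (hypothetical sketch, not the value class the full patch adds for footersCache): anything stored in the new cache field above must implement the LruCache.Value interface, typically by remembering when the underlying file was last modified and declaring itself stale once that changes. Names and the java.io.File shortcut below are illustrative; a production version would track Hadoop file status information instead.

    package parquet.hadoop;  // LruCache is package-private, so the sketch lives alongside it

    import java.io.File;

    // Hypothetical Value implementation: current while the file's mtime is unchanged.
    class FileMtimeValue implements LruCache.Value<String, FileMtimeValue> {
      private final long modificationTime;  // mtime observed when the value was built
      private final Object footer;          // placeholder for the cached footer

      FileMtimeValue(String path, Object footer) {
        this.modificationTime = new File(path).lastModified();
        this.footer = footer;
      }

      @Override
      public boolean isCurrent(String path) {
        // stale as soon as the file has been modified since we cached it
        return new File(path).lastModified() == modificationTime;
      }

      @Override
      public boolean isNewerThan(FileMtimeValue other) {
        return other == null || modificationTime > other.modificationTime;
      }
    }

    // Usage: LruCache<String, FileMtimeValue> cache = new LruCache<String, FileMtimeValue>(100);
    // cache.getCurrentValue(path) returns null once the underlying file changes.
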