Merging/cherry-picking:
matt-martin committed Jun 24, 2014
1 parent 4d9a03b commit ae77413
Showing 8 changed files with 664 additions and 24 deletions.
parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
@@ -20,12 +20,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetInputFormat;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.ParquetInputFormat;
import parquet.hadoop.Footer;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.hadoop.mapred.Container;
import parquet.hadoop.mapred.DeprecatedParquetInputFormat;
import parquet.schema.MessageType;
@@ -40,6 +38,9 @@
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Tuple;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;

/**
* A Cascading Scheme that converts Parquet groups into Cascading tuples.
@@ -56,6 +57,7 @@
public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{

private static final long serialVersionUID = 0L;
private String parquetSchema;

public ParquetTupleScheme() {
super();
@@ -65,6 +67,20 @@ public ParquetTupleScheme(Fields sourceFields) {
super(sourceFields);
}

/**
* ParquetTupleScheme constructor to use when the scheme must also act as a sink.
*
* @param sourceFields used for the reading step
* @param sinkFields used for the writing step
* @param schema mandatory when sinkFields are provided; must be the
* toString() of a MessageType. This value is parsed when the
* Parquet file is created.
*/
public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) {
super(sourceFields, sinkFields);
parquetSchema = schema;
}

@SuppressWarnings("rawtypes")
@Override
public void sourceConfInit(FlowProcess<JobConf> fp,
@@ -122,19 +138,23 @@ public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader

@SuppressWarnings("rawtypes")
@Override
public void sinkConfInit(FlowProcess<JobConf> arg0,
Tap<JobConf, RecordReader, OutputCollector> arg1, JobConf arg2) {
throw new UnsupportedOperationException("ParquetTupleScheme does not support Sinks");

public void sinkConfInit(FlowProcess<JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
}

@Override
public boolean isSink() { return false; }

public boolean isSink() {
return parquetSchema != null;
}

@Override
public void sink(FlowProcess<JobConf> arg0, SinkCall<Object[], OutputCollector> arg1)
throws IOException {
throw new UnsupportedOperationException("ParquetTupleScheme does not support Sinks");
public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sink)
throws IOException {
TupleEntry tuple = sink.getOutgoingEntry();
OutputCollector outputCollector = sink.getOutput();
outputCollector.collect(null, tuple);
}
}
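
The three-argument constructor, the requirement that the schema be the toString() of a MessageType, and the schema-driven isSink() behavior shown above suggest the following usage. This is a minimal sketch; the field names, schema, and output path are illustrative assumptions, not part of this commit.

import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import parquet.cascading.ParquetTupleScheme;

public class ParquetSinkTapSketch {
  public static Tap createSinkTap() {
    Fields fields = new Fields("name", "age");
    // The schema must be the toString() of a Parquet MessageType;
    // TupleWriteSupport parses it when the Parquet file is created.
    String schema = "message example {\n"
        + "  required binary name;\n"
        + "  required int32 age;\n"
        + "}";
    ParquetTupleScheme scheme = new ParquetTupleScheme(fields, fields, schema);
    // isSink() now returns true because a schema was supplied.
    return new Hfs(scheme, "hdfs:///tmp/example-output"); // illustrative path
  }
}
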
103 changes: 103 additions & 0 deletions parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java
@@ -0,0 +1,103 @@
/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package parquet.cascading;

import cascading.tuple.TupleEntry;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import parquet.hadoop.api.WriteSupport;
import parquet.io.api.Binary;
import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.PrimitiveType;
import parquet.schema.Type;

/**
* WriteSupport implementation that writes Cascading {@link TupleEntry} records
* according to the Parquet schema supplied via the
* {@code parquet.cascading.schema} configuration property.
*
* @author Mickaël Lacour <[email protected]>
*/
public class TupleWriteSupport extends WriteSupport<TupleEntry> {

private RecordConsumer recordConsumer;
private MessageType rootSchema;
public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";

@Override
public WriteContext init(Configuration configuration) {
String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
rootSchema = MessageTypeParser.parseMessageType(schema);
return new WriteContext(rootSchema, new HashMap<String, String>());
}

@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}

@Override
public void write(TupleEntry record) {
recordConsumer.startMessage();
final List<Type> fields = rootSchema.getFields();
int i = 0;
for (Type field : fields) {
recordConsumer.startField(field.getName(), i);
if (field.isPrimitive()) {
writePrimitive(record, field.asPrimitiveType());
} else {
throw new UnsupportedOperationException("Complex type not implemented");
}
recordConsumer.endField(field.getName(), i);
++i;
}
recordConsumer.endMessage();
}

private void writePrimitive(TupleEntry record, PrimitiveType field) {
if (record == null) {
return;
}

switch (field.getPrimitiveTypeName()) {
case BINARY:
recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
break;
case BOOLEAN:
recordConsumer.addBoolean(record.getBoolean(field.getName()));
break;
case INT32:
recordConsumer.addInteger(record.getInteger(field.getName()));
break;
case INT64:
recordConsumer.addLong(record.getLong(field.getName()));
break;
case DOUBLE:
recordConsumer.addDouble(record.getDouble(field.getName()));
break;
case FLOAT:
recordConsumer.addFloat(record.getFloat(field.getName()));
break;
case FIXED_LEN_BYTE_ARRAY:
throw new UnsupportedOperationException("Fixed len byte array type not implemented");
case INT96:
throw new UnsupportedOperationException("Int96 type not implemented");
default:
throw new UnsupportedOperationException(field.getName() + " type not implemented");
}
}
}
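
TupleWriteSupport.write() walks the parsed schema and pulls each value out of the TupleEntry by the Parquet column name (record.getString(field.getName()) and friends), so the Cascading field names must match the schema's column names exactly. Below is a small standalone sketch of that name-driven lookup, using MessageTypeParser the same way init() does; the schema, field names, and values are illustrative assumptions.

import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.Type;

public class SchemaFieldLookupSketch {
  public static void main(String[] args) {
    // Same parsing step TupleWriteSupport.init() performs on the
    // "parquet.cascading.schema" configuration value.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required binary name; required int32 age; }");

    // The Cascading field names line up with the Parquet column names.
    TupleEntry entry =
        new TupleEntry(new Fields("name", "age"), new Tuple("alice", 30));

    for (Type field : schema.getFields()) {
      // write() resolves every value by column name, just like this.
      System.out.println(field.getName() + " -> " + entry.getString(field.getName()));
    }
  }
}
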
6 changes: 6 additions & 0 deletions parquet-hadoop/pom.xml
@@ -61,6 +61,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
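
The mockito-all test dependency added above pairs naturally with the LruCache introduced later in this commit: a cache value can be mocked to simulate staleness without touching a real file system. A hedged sketch of such a test follows, using raw types for brevity; the test class and method names are illustrative, not from this commit.

package parquet.hadoop;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.Test;

public class LruCacheMockSketch {
  @SuppressWarnings({"unchecked", "rawtypes"})
  @Test
  public void staleEntryIsEvictedOnRead() {
    LruCache.Value value = mock(LruCache.Value.class);
    // Current when inserted, stale on the subsequent lookup.
    when(value.isCurrent("key")).thenReturn(true, false);

    LruCache cache = new LruCache(10);
    cache.put("key", value);
    assertEquals(1, cache.size());

    // getCurrentValue() notices the value is stale, evicts it, and returns null.
    assertNull(cache.getCurrentValue("key"));
    assertEquals(0, cache.size());
  }
}
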
181 changes: 181 additions & 0 deletions parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
@@ -0,0 +1,181 @@
package parquet.hadoop;

import parquet.Log;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* A basic implementation of an LRU cache. Besides evicting the least recently
* used entries (either based on insertion or access order), this class also
* checks for "stale" entries as entries are inserted or retrieved (note that
* "staleness" is defined by the entries themselves; see
* {@link parquet.hadoop.LruCache.Value}).
*
* @param <K> The key type. Acts as the key in a {@link java.util.LinkedHashMap}
* @param <V> The value type. Must extend {@link parquet.hadoop.LruCache.Value}
* so that the "staleness" of the value can be easily determined.
*/
final class LruCache<K, V extends LruCache.Value<K, V>> {
private static final Log LOG = Log.getLog(LruCache.class);

private static final float DEFAULT_LOAD_FACTOR = 0.75f;

private final LinkedHashMap<K, V> cacheMap;

/**
* Constructs an access-order based LRU cache with {@code maxSize} entries.
* @param maxSize The maximum number of entries to store in the cache.
*/
public LruCache(final int maxSize) {
this(maxSize, DEFAULT_LOAD_FACTOR, true);
}

/**
* Constructs an LRU cache.
*
* @param maxSize The maximum number of entries to store in the cache.
* @param loadFactor Used to determine the initial capacity.
* @param accessOrder the ordering mode - {@code true} for access-order,
* {@code false} for insertion-order
*/
public LruCache(final int maxSize, final float loadFactor, final boolean accessOrder) {
int initialCapacity = Math.round(maxSize / loadFactor);
cacheMap =
new LinkedHashMap<K, V>(initialCapacity, loadFactor, accessOrder) {
@Override
public boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
boolean result = size() > maxSize;
if (result) {
if (Log.DEBUG) {
LOG.debug("Removing eldest entry in cache: "
+ eldest.getKey());
}
}
return result;
}
};
}

/**
* Removes the mapping for the specified key from this cache if present.
* @param key key whose mapping is to be removed from the cache
* @return the previous value associated with key, or null if there was no
* mapping for key.
*/
public V remove(final K key) {
V oldValue = cacheMap.remove(key);
if (oldValue != null) {
if (Log.DEBUG) {
LOG.debug("Removed cache entry for '" + key + "'");
}
}
return oldValue;
}

/**
* Associates the specified value with the specified key in this cache. The
* value is only inserted if it is not null and it is considered current. If
* the cache previously contained a mapping for the key, the old value is
* replaced only if the new value is "newer" than the old one.
* @param key key with which the specified value is to be associated
* @param newValue value to be associated with the specified key
*/
public void put(final K key, final V newValue) {
if (newValue == null || !newValue.isCurrent(key)) {
if (Log.WARN) {
LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
+ (newValue == null ? "null" : "not current"));
}
return;
}

V oldValue = cacheMap.get(key);
if (oldValue != null && oldValue.isNewerThan(newValue)) {
if (Log.WARN) {
LOG.warn("Ignoring new cache entry for '" + key + "' because "
+ "existing cache entry is newer");
}
return;
}

// no existing value or new value is newer than old value
oldValue = cacheMap.put(key, newValue);
if (Log.DEBUG) {
if (oldValue == null) {
LOG.debug("Added new cache entry for '" + key + "'");
} else {
LOG.debug("Overwrote existing cache entry for '" + key + "'");
}
}
}

/**
* Removes all of the mappings from this cache. The cache will be empty
* after this call returns.
*/
public void clear() {
cacheMap.clear();
}

/**
* Returns the value to which the specified key is mapped, or null if 1) the
* value is not current or 2) this cache contains no mapping for the key.
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or null if 1) the
* value is not current or 2) this cache contains no mapping for the key
*/
public V getCurrentValue(final K key) {
V value = cacheMap.get(key);
if (Log.DEBUG) {
LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "")
+ "in cache");
}
if (value != null && !value.isCurrent(key)) {
// value is not current; remove it and return null
remove(key);
return null;
}

return value;
}

/**
* Returns the number of key-value mappings in this cache.
* @return the number of key-value mappings in this cache.
*/
public int size() {
return cacheMap.size();
}

/**
* {@link parquet.hadoop.LruCache} expects all values to follow this
* interface so the cache can determine 1) whether values are current (e.g.
* the referenced data has not been modified/updated in such a way that the
* value is no longer useful) and 2) whether a value is strictly "newer"
* than another value.
*
* @param <K> The key type.
* @param <V> Provides a bound for the {@link #isNewerThan(V)} method
*/
interface Value<K, V> {
/**
* Is the value still current (e.g. has the referenced data been
* modified/updated in such a way that the value is no longer useful)?
* @param key the key associated with this value
* @return {@code true} if the value is still current, {@code false} if the
* value is no longer useful
*/
boolean isCurrent(K key);

/**
* Compares this value with the specified value to check for relative age.
* @param otherValue the value to be compared.
* @return {@code true} if this value is strictly newer than the other value,
* {@code false} if it is older or just as new as the other value.
*/
boolean isNewerThan(V otherValue);
}

}
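
The Value contract above leaves "staleness" entirely to the cached entries. Here is a hypothetical implementation sketch: a file-backed entry whose currency is defined by the file's modification time. The FileStamp class and lookupModificationTime helper are illustrative assumptions, not part of this commit.

package parquet.hadoop;

// Hypothetical Value implementation: an entry keyed by a file path whose
// "currency" is tied to the file's modification time.
final class FileStamp implements LruCache.Value<String, FileStamp> {
  private final long modificationTime;

  FileStamp(long modificationTime) {
    this.modificationTime = modificationTime;
  }

  @Override
  public boolean isCurrent(String path) {
    // A real implementation would re-read the file's current modification time
    // (e.g. from a FileSystem) and compare it against the cached one.
    return modificationTime == lookupModificationTime(path);
  }

  @Override
  public boolean isNewerThan(FileStamp other) {
    return modificationTime > other.modificationTime;
  }

  private static long lookupModificationTime(String path) {
    // Placeholder for whatever freshness source the cached data depends on.
    return 0L;
  }
}

With such a value type the cache reads naturally: new LruCache<String, FileStamp>(100) constructs it, put(path, stamp) ignores stale or older entries, and getCurrentValue(path) evicts the entry and returns null once the underlying file changes.
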