Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-4: Use LRU caching for footers in ParquetInputFormat. #2

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions parquet-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
181 changes: 181 additions & 0 deletions parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package parquet.hadoop;

import parquet.Log;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* A basic implementation of an LRU cache. Besides evicting the least recently
* used entries (either based on insertion or access order), this class also
* checks for "stale" entries as entries are inserted or retrieved (note
* "staleness" is defined by the entries themselves (see
* {@link parquet.hadoop.LruCache.Value}).
*
* @param <K> The key type. Acts as the key in a {@link java.util.LinkedHashMap}
* @param <V> The value type. Must extend {@link parquet.hadoop.LruCache.Value}
* so that the "staleness" of the value can be easily determined.
*/
final class LruCache<K, V extends LruCache.Value<K, V>> {
private static final Log LOG = Log.getLog(LruCache.class);

private static final float DEFAULT_LOAD_FACTOR = 0.75f;

private final LinkedHashMap<K, V> cacheMap;

/**
* Constructs an access-order based LRU cache with {@code maxSize} entries.
* @param maxSize The maximum number of entries to store in the cache.
*/
public LruCache(final int maxSize) {
this(maxSize, DEFAULT_LOAD_FACTOR, true);
}

/**
* Constructs an LRU cache.
*
* @param maxSize The maximum number of entries to store in the cache.
* @param loadFactor Used to determine the initial capacity.
* @param accessOrder the ordering mode - {@code true} for access-order,
* {@code false} for insertion-order
*/
public LruCache(final int maxSize, final float loadFactor, final boolean accessOrder) {
int initialCapacity = Math.round(maxSize / loadFactor);
cacheMap =
new LinkedHashMap<K, V>(initialCapacity, loadFactor, accessOrder) {
@Override
public boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
boolean result = size() > maxSize;
if (result) {
if (Log.DEBUG) {
LOG.debug("Removing eldest entry in cache: "
+ eldest.getKey());
}
}
return result;
}
};
}

/**
* Removes the mapping for the specified key from this cache if present.
* @param key key whose mapping is to be removed from the cache
* @return the previous value associated with key, or null if there was no
* mapping for key.
*/
public V remove(final K key) {
V oldValue = cacheMap.remove(key);
if (oldValue != null) {
if (Log.DEBUG) {
LOG.debug("Removed cache entry for '" + key + "'");
}
}
return oldValue;
}

/**
* Associates the specified value with the specified key in this cache. The
* value is only inserted if it is not null and it is considered current. If
* the cache previously contained a mapping for the key, the old value is
* replaced only if the new value is "newer" than the old one.
* @param key key with which the specified value is to be associated
* @param newValue value to be associated with the specified key
*/
public void put(final K key, final V newValue) {
if (newValue == null || !newValue.isCurrent(key)) {
if (Log.WARN) {
LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
+ (newValue == null ? "null" : "not current"));
}
return;
}

V oldValue = cacheMap.get(key);
if (oldValue != null && oldValue.isNewerThan(newValue)) {
if (Log.WARN) {
LOG.warn("Ignoring new cache entry for '" + key + "' because "
+ "existing cache entry is newer");
}
return;
}

// no existing value or new value is newer than old value
oldValue = cacheMap.put(key, newValue);
if (Log.DEBUG) {
if (oldValue == null) {
LOG.debug("Added new cache entry for '" + key + "'");
} else {
LOG.debug("Overwrote existing cache entry for '" + key + "'");
}
}
}

/**
* Removes all of the mappings from this cache. The cache will be empty
* after this call returns.
*/
public void clear() {
cacheMap.clear();
}

/**
* Returns the value to which the specified key is mapped, or null if 1) the
* value is not current or 2) this cache contains no mapping for the key.
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or null if 1) the
* value is not current or 2) this cache contains no mapping for the key
*/
public V getCurrentValue(final K key) {
V value = cacheMap.get(key);
if (Log.DEBUG) {
LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "")
+ "in cache");
}
if (value != null && !value.isCurrent(key)) {
// value is not current; remove it and return null
remove(key);
return null;
}

return value;
}

/**
* Returns the number of key-value mappings in this cache.
* @return the number of key-value mappings in this cache.
*/
public int size() {
return cacheMap.size();
}

/**
* {@link parquet.hadoop.LruCache} expects all values to follow this
* interface so the cache can determine 1) whether values are current (e.g.
* the referenced data has not been modified/updated in such a way that the
* value is no longer useful) and 2) whether a value is strictly "newer"
* than another value.
*
* @param <K> The key type.
* @param <V> Provides a bound for the {@link #isNewerThan(V)} method
*/
interface Value<K, V> {
/**
* Is the value still current (e.g. has the referenced data been
* modified/updated in such a way that the value is no longer useful)
* @param key the key associated with this value
* @return {@code true} the value is still current, {@code false} the value
* is no longer useful
*/
boolean isCurrent(K key);

/**
* Compares this value with the specified value to check for relative age.
* @param otherValue the value to be compared.
* @return {@code true} the value is strictly newer than the other value,
* {@code false} the value is older or just
* as new as the other value.
*/
boolean isNewerThan(V otherValue);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add javadoc to this interface


}
143 changes: 136 additions & 7 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
Expand Down Expand Up @@ -78,8 +82,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
*/
public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";

private static final int MIN_FOOTER_CACHE_SIZE = 100;

private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;

private Class<?> readSupportClass;
private List<Footer> footers;

public static void setReadSupportClass(Job job, Class<?> readSupportClass) {
ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
Expand Down Expand Up @@ -399,9 +406,7 @@ protected List<FileStatus> listStatus(JobContext jobContext) throws IOException
private static List<FileStatus> getAllFileRecursively(
List<FileStatus> files, Configuration conf) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
int len = files.size();
for (int i = 0; i < len; ++i) {
FileStatus file = files.get(i);
for (FileStatus file : files) {
if (file.isDir()) {
Path p = file.getPath();
FileSystem fs = p.getFileSystem(conf);
Expand Down Expand Up @@ -439,10 +444,58 @@ public boolean accept(Path p){
* @throws IOException
*/
public List<Footer> getFooters(JobContext jobContext) throws IOException {
if (footers == null) {
footers = getFooters(ContextUtil.getConfiguration(jobContext), listStatus(jobContext));
List<FileStatus> statuses = listStatus(jobContext);
if (statuses.isEmpty()) {
return Collections.emptyList();
}

Configuration config = ContextUtil.getConfiguration(jobContext);
List<Footer> footers = new ArrayList<Footer>(statuses.size());
Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
Map<Path, FileStatusWrapper> missingStatusesMap =
new HashMap<Path, FileStatusWrapper>(missingStatuses.size());

if (footersCache == null) {
footersCache =
new LruCache<FileStatusWrapper, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
}
for (FileStatus status : statuses) {
FileStatusWrapper statusWrapper = new FileStatusWrapper(status);
FootersCacheValue cacheEntry =
footersCache.getCurrentValue(statusWrapper);
if (Log.DEBUG) {
LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "")
+ " found for '" + status.getPath() + "'");
}
if (cacheEntry != null) {
footers.add(cacheEntry.getFooter());
} else {
missingStatuses.add(status);
missingStatusesMap.put(status.getPath(), statusWrapper);
}
}
if (Log.DEBUG) {
LOG.debug("found " + footers.size() + " footers in cache and adding up "
+ "to " + missingStatuses.size() + " missing footers to the cache");
}


if (missingStatuses.isEmpty()) {
return footers;
}

List<Footer> newFooters =
getFooters(config, new ArrayList<FileStatus>(missingStatuses));
for (Footer newFooter : newFooters) {
// Use the original file status objects to make sure we store a
// conservative (older) modification time (i.e. in case the files and
// footers were modified and it's not clear which version of the footers
// we have)
FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
}

footers.addAll(newFooters);
return footers;
}

Expand All @@ -454,7 +507,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {
* @throws IOException
*/
public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
LOG.debug("reading " + statuses.size() + " files");
if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
}

Expand All @@ -467,4 +520,80 @@ public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOExceptio
return ParquetFileWriter.getGlobalMetaData(getFooters(jobContext));
}

/**
* A simple wrapper around {@link parquet.hadoop.Footer} that also includes a
* modification time associated with that footer. The modification time is
* used to determine whether the footer is still current.
*/
static final class FootersCacheValue
implements LruCache.Value<FileStatusWrapper, FootersCacheValue> {
private final long modificationTime;
private final Footer footer;

public FootersCacheValue(FileStatusWrapper status, Footer footer) {
this.modificationTime = status.getModificationTime();
this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
}

@Override
public boolean isCurrent(FileStatusWrapper key) {
long currentModTime = key.getModificationTime();
boolean isCurrent = modificationTime >= currentModTime;
if (Log.DEBUG && !isCurrent) {
LOG.debug("The cache value for '" + key + "' is not current: "
+ "cached modification time=" + modificationTime + ", "
+ "current modification time: " + currentModTime);
}
return isCurrent;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, I'm not sure this isCurrent() method is a good idea.
This is going to produce a lot of requests to the namenode to get the current timestamp of files.
a lot more than is actually required to read the _metadata files.
Parquet files are immutable so I would just assume that they don't change for the lifetime of the job.
That would have unpredictable consequences anyway if they did.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely true. HAWQ appends to parquet files and writes new footers to
cover the appended data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could also be an issue, for example, with long-lived Hive sessions. Hive reuses the input format for the duration of a particular CLI session. So if the old, cached files were overwritten in HDFS (i.e. same file names but different data), then it'd be nice to have this logic in place to invalidate the entries. So that was the motivation, but I'm open to changing/removing the code if you think that's really necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a good way to make those two use cases coexist.
In the existing code we take the state of the files when the job starts and keep it as a reference:

  • if files are appended to, it is better to have a SNAPSHOT view of the files.
  • if files are rewritten, in Hive I would expect partitions to be replaced rather than files directly. Otherwise running jobs are likely to fail. rewriting files without metadata coordination is a bad idea IMO.
    We are caching these Footers to reduce the number of files read and requests to the name node. With this patch we are going to increase a lot the number of requests to the namenode.

I would recommend the following:

  • use the result of listStatus() as a reference to invalidate your cache (List<FileStatus> statuses = listStatus(jobContext)). That way you have a snapshot view of the files at the time you're looking up the footers.
  • do not call the name node for every individual part file (no fs.getFileStatus(path);). There could be thousands of those (if not more).

}

public Footer getFooter() {
return footer;
}

@Override
public boolean isNewerThan(FootersCacheValue otherValue) {
return otherValue == null ||
modificationTime > otherValue.modificationTime;
}

public Path getPath() {
return footer.getFile();
}
}

/**
* A simple wrapper around {@link org.apache.hadoop.fs.FileStatus} with a
* meaningful "toString()" method
*/
static final class FileStatusWrapper {
private final FileStatus status;
public FileStatusWrapper(FileStatus fileStatus) {
if (fileStatus == null) {
throw new IllegalArgumentException("FileStatus object cannot be null");
}
status = fileStatus;
}

public long getModificationTime() {
return status.getModificationTime();
}

@Override
public int hashCode() {
return status.hashCode();
}

@Override
public boolean equals(Object other) {
return other instanceof FileStatusWrapper &&
status.equals(((FileStatusWrapper) other).status);
}

@Override
public String toString() {
return status.getPath().toString();
}
}

}
Loading