Skip to content

Commit

Permalink
Add javadocs to parquet.hadoop.LruCache. Rename cache "entries" as ca…
Browse files Browse the repository at this point in the history
…che "values" to avoid confusion with java.util.Map.Entry (which contains key value pairs whereas our old "entries" really only refer to the values).
  • Loading branch information
matt-martin committed Jun 22, 2014
1 parent a363622 commit d946445
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 98 deletions.
98 changes: 82 additions & 16 deletions parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,37 @@
import java.util.LinkedHashMap;
import java.util.Map;

final class LruCache<K, V extends LruCache.Entry<V>> {
/**
* A basic implementation of an LRU cache. Besides evicting the least recently used entries (either based on insertion
* or access order), this class also checks for "stale" values as values are inserted or retrieved (note that "staleness"
* is defined by the values themselves; see {@link parquet.hadoop.LruCache.Value}).
*
* @param <K> The key type. Acts as the key in a {@link java.util.LinkedHashMap}
* @param <V> The value type. Must extend {@link parquet.hadoop.LruCache.Value} so that the "staleness" of the value
* can be easily determined.
*/
final class LruCache<K, V extends LruCache.Value<V>> {
private static final Log LOG = Log.getLog(LruCache.class);

private static final float DEFAULT_LOAD_FACTOR = 0.75f;

private final LinkedHashMap<K, V> cacheMap;

/**
* Constructs an access-order based LRU cache with {@code maxSize} entries.
* @param maxSize The maximum number of entries to store in the cache.
*/
public LruCache(int maxSize) {
this(maxSize, DEFAULT_LOAD_FACTOR, true);
}

/**
* Constructs an LRU cache.
*
* @param maxSize The maximum number of entries to store in the cache.
* @param loadFactor The load factor of the backing map; also used to determine the initial capacity.
* @param accessOrder the ordering mode - {@code true} for access-order, {@code false} for insertion-order
*/
public LruCache(final int maxSize, float loadFactor, boolean accessOrder) {
cacheMap =
new LinkedHashMap<K, V>(Math.round(maxSize / loadFactor), loadFactor, accessOrder) {
Expand All @@ -30,44 +50,67 @@ public boolean removeEldestEntry(Map.Entry<K,V> eldest) {
};
}

/**
* Removes the mapping for the specified key from this cache if present.
* @param key key whose mapping is to be removed from the cache
* @return the previous value associated with key, or null if there was no mapping for key.
*/
public V remove(K key) {
V oldEntry = cacheMap.remove(key);
if (oldEntry != null) {
V oldValue = cacheMap.remove(key);
if (oldValue != null) {
if (Log.DEBUG) LOG.debug("Removed cache entry for '" + key + "'");
}
return oldEntry;
return oldValue;
}

public void put(K key, V newEntry) {
if (newEntry == null || !newEntry.isCurrent()) {
if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because it is " + (newEntry == null ? "null" : "not current"));
/**
* Associates the specified value with the specified key in this cache. The value is only inserted if it is not null
* and it is considered current. If the cache previously contained a mapping for the key, the old value is replaced
* only if the new value is "newer" than the old one.
* @param key key with which the specified value is to be associated
* @param newValue value to be associated with the specified key
*/
public void put(K key, V newValue) {
if (newValue == null || !newValue.isCurrent()) {
if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because it is " +
(newValue == null ? "null" : "not current"));
return;
}

V oldEntry = cacheMap.get(key);
if (oldEntry != null && oldEntry.isNewerThan(newEntry)) {
V oldValue = cacheMap.get(key);
if (oldValue != null && oldValue.isNewerThan(newValue)) {
if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because existing cache entry is newer");
return;
}

// no existing entry or new entry is newer than old entry
oldEntry = cacheMap.put(key, newEntry);
// no existing value or new value is newer than old value
oldValue = cacheMap.put(key, newValue);
if (Log.DEBUG) {
if (oldEntry == null) {
if (oldValue == null) {
LOG.debug("Added new cache entry for '" + key + "'");
} else {
LOG.debug("Overwrote existing cache entry for '" + key + "'");
}
}
}

/**
* Removes all of the mappings from this cache. The cache will be empty after this call returns.
*/
public void clear() {
cacheMap.clear();
}

public V getCurrentEntry(K key) {
/**
* Returns the value to which the specified key is mapped, or null if 1) the value is not current or 2) this cache
* contains no mapping for the key.
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or null if 1) the value is not current or 2) this cache
* contains no mapping for the key
*/
public V getCurrentValue(K key) {
V value = cacheMap.get(key);
if (Log.DEBUG) LOG.debug("Entry for '" + key + "' " + (value == null ? "not " : "") + "in cache");
if (Log.DEBUG) LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "") + "in cache");
if (value != null && !value.isCurrent()) {
// value is not current; remove it and return null
remove(key);
Expand All @@ -77,13 +120,36 @@ public V getCurrentEntry(K key) {
return value;
}

/**
* Returns the number of key-value mappings in this cache.
* @return the number of key-value mappings in this cache.
*/
public int size() {
return cacheMap.size();
}

interface Entry<V> {
/**
* {@link parquet.hadoop.LruCache} expects all values to follow this interface so the cache can determine
* 1) whether values are current (e.g. the referenced data has not been modified/updated in such a way that the value
* is no longer useful) and 2) whether a value is strictly "newer" than another value.
*
* @param <V> The type of value this value can be compared against in {@link #isNewerThan(Object)}.
*/
interface Value<V> {
/**
* Checks whether the value is still current (e.g. whether the referenced data has been modified/updated in such a
* way that the value is no longer useful).
* @return {@code true} if the value is still current, {@code false} if the value is no longer useful
*/
public boolean isCurrent();
public boolean isNewerThan(V otherEntry);

/**
* Compares this value with the specified value to check for relative age.
* @param otherValue the value to be compared.
* @return {@code true} if this value is strictly newer than the other value, {@code false} if this value is older
* than or just as new as the other value.
*/
public boolean isNewerThan(V otherValue);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {

private static final int MIN_FOOTER_CACHE_SIZE = 100;

private LruCache<Path, FootersCacheEntry> footersCache;
private LruCache<Path, FootersCacheValue> footersCache;

private Class<?> readSupportClass;

Expand Down Expand Up @@ -456,7 +456,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {

if (footersCache != null) {
for (FileStatus status : statuses) {
FootersCacheEntry cacheEntry = footersCache.getCurrentEntry(status.getPath());
FootersCacheValue cacheEntry = footersCache.getCurrentValue(status.getPath());
if (Log.DEBUG) LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "") + " found for '" + status.getPath() + "'");
if (cacheEntry != null) {
footers.add(cacheEntry.getFooter());
Expand All @@ -467,7 +467,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {
} else {
// initialize the cache to store all of the current statuses or a default minimum size. The sizing of this LRU
// cache was chosen to mimic prior behavior (i.e. so that performance would be at least as good as it was before)
footersCache = new LruCache<Path, FootersCacheEntry>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
footersCache = new LruCache<Path, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
missingStatuses.addAll(statuses);
}
if (Log.DEBUG) LOG.debug("found " + footers.size() + " footers in cache and adding up to " +
Expand All @@ -487,7 +487,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {
// Use the original file status objects to make sure we store a conservative (older) modification time (i.e. in
// case the files and footers were modified and it's not clear which version of the footers we have)
FileStatus fileStatus = missingStatusesMap.get(newFooter.getFile());
footersCache.put(fileStatus.getPath(), new FootersCacheEntry(fileStatus, newFooter));
footersCache.put(fileStatus.getPath(), new FootersCacheValue(fileStatus, newFooter));
}

footers.addAll(newFooters);
Expand Down Expand Up @@ -515,11 +515,11 @@ public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOExceptio
return ParquetFileWriter.getGlobalMetaData(getFooters(jobContext));
}

static final class FootersCacheEntry implements LruCache.Entry<FootersCacheEntry> {
static final class FootersCacheValue implements LruCache.Value<FootersCacheValue> {
private final long modificationTime;
private final Footer footer;

public FootersCacheEntry(FileStatus status, Footer footer) {
public FootersCacheValue(FileStatus status, Footer footer) {
this.modificationTime = status.getModificationTime();
this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
}
Expand All @@ -539,7 +539,7 @@ public boolean isCurrent() {
}
long currentModTime = currentFile.getModificationTime();
boolean isCurrent = modificationTime >= currentModTime;
if (Log.DEBUG && !isCurrent) LOG.debug("The cache entry for '" + currentFile.getPath() + "' is not current: " +
if (Log.DEBUG && !isCurrent) LOG.debug("The cache value for '" + currentFile.getPath() + "' is not current: " +
"cached modification time=" + modificationTime + ", current modification time: " + currentModTime);
return isCurrent;
}
Expand All @@ -548,8 +548,8 @@ public Footer getFooter() {
return footer;
}

public boolean isNewerThan(FootersCacheEntry entry) {
return entry == null || modificationTime > entry.modificationTime;
public boolean isNewerThan(FootersCacheValue otherValue) {
return otherValue == null || modificationTime > otherValue.modificationTime;
}

public Path getPath() {
Expand Down
34 changes: 17 additions & 17 deletions parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,39 +247,39 @@ public void testMultipleRowGroupsInABlockToAlignHDFSBlock() throws Exception {
}

@Test
public void testFooterCacheEntryIsCurrent() throws IOException, InterruptedException {
public void testFooterCacheValueIsCurrent() throws IOException, InterruptedException {
File tempFile = getTempFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
ParquetInputFormat.FootersCacheEntry cacheEntry = getDummyCacheEntry(tempFile, fs);
ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
assertFalse(cacheEntry.isCurrent());
assertFalse(cacheValue.isCurrent());
}

@Test
public void testFooterCacheEntryDeleted() throws IOException {
public void testFooterCacheValueDeleted() throws IOException {
File tempFile = getTempFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
ParquetInputFormat.FootersCacheEntry cacheEntry = getDummyCacheEntry(tempFile, fs);
ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(tempFile.delete());
assertFalse(cacheEntry.isCurrent());
assertFalse(cacheValue.isCurrent());
}

@Test
public void testFooterCacheEntryIsNewer() throws IOException {
public void testFooterCacheValueIsNewer() throws IOException {
File tempFile = getTempFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
ParquetInputFormat.FootersCacheEntry cacheEntry = getDummyCacheEntry(tempFile, fs);
ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(cacheEntry.isNewerThan(null));
assertFalse(cacheEntry.isNewerThan(cacheEntry));
assertTrue(cacheValue.isNewerThan(null));
assertFalse(cacheValue.isNewerThan(cacheValue));

assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
ParquetInputFormat.FootersCacheEntry newerCacheEntry = getDummyCacheEntry(tempFile, fs);
ParquetInputFormat.FootersCacheValue newerCacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(newerCacheEntry.isNewerThan(cacheEntry));
assertFalse(cacheEntry.isNewerThan(newerCacheEntry));
assertTrue(newerCacheValue.isNewerThan(cacheValue));
assertFalse(cacheValue.isNewerThan(newerCacheValue));
}

private File getTempFile() throws IOException {
Expand All @@ -288,13 +288,13 @@ private File getTempFile() throws IOException {
return tempFile;
}

private ParquetInputFormat.FootersCacheEntry getDummyCacheEntry(File file, FileSystem fs) throws IOException {
private ParquetInputFormat.FootersCacheValue getDummyCacheValue(File file, FileSystem fs) throws IOException {
Path path = new Path(file.getPath());
FileStatus status = fs.getFileStatus(path);
ParquetMetadata mockMetadata = mock(ParquetMetadata.class);
ParquetInputFormat.FootersCacheEntry cacheEntry = new ParquetInputFormat.FootersCacheEntry(status, new Footer(path, mockMetadata));
assertTrue(cacheEntry.isCurrent());
return cacheEntry;
ParquetInputFormat.FootersCacheValue cacheValue = new ParquetInputFormat.FootersCacheValue(status, new Footer(path, mockMetadata));
assertTrue(cacheValue.isCurrent());
return cacheValue;
}

private List<ParquetInputSplit> generateSplitByMinMaxSize(long min, long max) throws IOException {
Expand Down
Loading

0 comments on commit d946445

Please sign in to comment.