Skip to content

Commit

Permalink
Be less aggressive about checking whether the underlying file has bee…
Browse files Browse the repository at this point in the history
…n appended to/overwritten/deleted in order to minimize the number of namenode interactions.
  • Loading branch information
matt-martin committed Jun 23, 2014
1 parent d946445 commit 250a398
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 39 deletions.
10 changes: 5 additions & 5 deletions parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* @param <V> The value type. Must extend {@link parquet.hadoop.LruCache.Value} so that the "staleness" of the value
* can be easily determined.
*/
final class LruCache<K, V extends LruCache.Value<V>> {
final class LruCache<K, V extends LruCache.Value<K, V>> {
private static final Log LOG = Log.getLog(LruCache.class);

private static final float DEFAULT_LOAD_FACTOR = 0.75f;
Expand Down Expand Up @@ -71,7 +71,7 @@ public V remove(K key) {
* @param newValue value to be associated with the specified key
*/
public void put(K key, V newValue) {
if (newValue == null || !newValue.isCurrent()) {
if (newValue == null || !newValue.isCurrent(key)) {
if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because it is " +
(newValue == null ? "null" : "not current"));
return;
Expand Down Expand Up @@ -111,7 +111,7 @@ public void clear() {
public V getCurrentValue(K key) {
V value = cacheMap.get(key);
if (Log.DEBUG) LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "") + "in cache");
if (value != null && !value.isCurrent()) {
if (value != null && !value.isCurrent(key)) {
// value is not current; remove it and return null
remove(key);
return null;
Expand All @@ -135,13 +135,13 @@ public int size() {
*
* @param <V>
*/
interface Value<V> {
interface Value<K, V> {
/**
* Is the value still current (e.g. has the referenced data been modified/updated in such a way that the value
* is no longer useful)
* @return {@code true} the value is still current, {@code false} the value is no longer useful
*/
public boolean isCurrent();
public boolean isCurrent(K key);

/**
* Compares this value with the specified value to check for relative age.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {

private static final int MIN_FOOTER_CACHE_SIZE = 100;

private LruCache<Path, FootersCacheValue> footersCache;
private LruCache<FileStatus, FootersCacheValue> footersCache;

private Class<?> readSupportClass;

Expand Down Expand Up @@ -456,7 +456,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {

if (footersCache != null) {
for (FileStatus status : statuses) {
FootersCacheValue cacheEntry = footersCache.getCurrentValue(status.getPath());
FootersCacheValue cacheEntry = footersCache.getCurrentValue(status);
if (Log.DEBUG) LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "") + " found for '" + status.getPath() + "'");
if (cacheEntry != null) {
footers.add(cacheEntry.getFooter());
Expand All @@ -467,7 +467,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {
} else {
// initialize the cache to store all of the current statuses or a default minimum size. The sizing of this LRU
// cache was chosen to mimic prior behavior (i.e. so that performance would be at least as good as it was before)
footersCache = new LruCache<Path, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
footersCache = new LruCache<FileStatus, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
missingStatuses.addAll(statuses);
}
if (Log.DEBUG) LOG.debug("found " + footers.size() + " footers in cache and adding up to " +
Expand All @@ -487,7 +487,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {
// Use the original file status objects to make sure we store a conservative (older) modification time (i.e. in
// case the files and footers were modified and it's not clear which version of the footers we have)
FileStatus fileStatus = missingStatusesMap.get(newFooter.getFile());
footersCache.put(fileStatus.getPath(), new FootersCacheValue(fileStatus, newFooter));
footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
}

footers.addAll(newFooters);
Expand Down Expand Up @@ -515,7 +515,7 @@ public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOExceptio
return ParquetFileWriter.getGlobalMetaData(getFooters(jobContext));
}

static final class FootersCacheValue implements LruCache.Value<FootersCacheValue> {
static final class FootersCacheValue implements LruCache.Value<FileStatus, FootersCacheValue> {
private final long modificationTime;
private final Footer footer;

Expand All @@ -524,22 +524,10 @@ public FootersCacheValue(FileStatus status, Footer footer) {
this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
}

public boolean isCurrent() {
FileSystem fs;
FileStatus currentFile;
Path path = footer.getFile();
try {
fs = path.getFileSystem(new Configuration());
currentFile = fs.getFileStatus(path);
} catch (FileNotFoundException e) {
if (Log.DEBUG) LOG.debug("The '" + path + "' path was not found.");
return false;
} catch (IOException e) {
throw new RuntimeException("Exception while checking '" + path + "': " + e, e);
}
long currentModTime = currentFile.getModificationTime();
public boolean isCurrent(FileStatus key) {
long currentModTime = key.getModificationTime();
boolean isCurrent = modificationTime >= currentModTime;
if (Log.DEBUG && !isCurrent) LOG.debug("The cache value for '" + currentFile.getPath() + "' is not current: " +
if (Log.DEBUG && !isCurrent) LOG.debug("The cache value for '" + key.getPath() + "' is not current: " +
"cached modification time=" + modificationTime + ", current modification time: " + currentModTime);
return isCurrent;
}
Expand Down
14 changes: 2 additions & 12 deletions parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,7 @@ public void testFooterCacheValueIsCurrent() throws IOException, InterruptedExcep
ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
assertFalse(cacheValue.isCurrent());
}

@Test
public void testFooterCacheValueDeleted() throws IOException {
File tempFile = getTempFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(tempFile.delete());
assertFalse(cacheValue.isCurrent());
assertFalse(cacheValue.isCurrent(fs.getFileStatus(new Path(tempFile.getAbsolutePath()))));
}

@Test
Expand Down Expand Up @@ -293,7 +283,7 @@ private ParquetInputFormat.FootersCacheValue getDummyCacheValue(File file, FileS
FileStatus status = fs.getFileStatus(path);
ParquetMetadata mockMetadata = mock(ParquetMetadata.class);
ParquetInputFormat.FootersCacheValue cacheValue = new ParquetInputFormat.FootersCacheValue(status, new Footer(path, mockMetadata));
assertTrue(cacheValue.isCurrent());
assertTrue(cacheValue.isCurrent(status));
return cacheValue;
}

Expand Down
4 changes: 2 additions & 2 deletions parquet-hadoop/src/test/java/parquet/hadoop/TestLruCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
public class TestLruCache {
private static final String DEFAULT_KEY = "test";

private static final class SimpleValue implements LruCache.Value<SimpleValue> {
private static final class SimpleValue implements LruCache.Value<String, SimpleValue> {
private boolean current;
private boolean newerThan;

Expand All @@ -17,7 +17,7 @@ public SimpleValue(boolean current, boolean newerThan) {
}

@Override
public boolean isCurrent() {
public boolean isCurrent(String key) {
return current;
}

Expand Down

0 comments on commit 250a398

Please sign in to comment.