Use LRU caching for footers in ParquetInputFormat.
matt-martin committed Jun 19, 2014
1 parent b767ac4 · commit a363622
Showing 5 changed files with 393 additions and 10 deletions.
6 changes: 6 additions & 0 deletions parquet-hadoop/pom.xml
@@ -68,6 +68,12 @@
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
89 changes: 89 additions & 0 deletions parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
@@ -0,0 +1,89 @@
package parquet.hadoop;

import parquet.Log;

import java.util.LinkedHashMap;
import java.util.Map;

final class LruCache<K, V extends LruCache.Entry<V>> {
  private static final Log LOG = Log.getLog(LruCache.class);

  private static final float DEFAULT_LOAD_FACTOR = 0.75f;

  private final LinkedHashMap<K, V> cacheMap;

  public LruCache(int maxSize) {
    this(maxSize, DEFAULT_LOAD_FACTOR, true);
  }

  public LruCache(final int maxSize, float loadFactor, boolean accessOrder) {
    cacheMap =
        new LinkedHashMap<K, V>(Math.round(maxSize / loadFactor), loadFactor, accessOrder) {
          @Override
          public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            boolean result = size() > maxSize;
            if (result) {
              if (Log.DEBUG) LOG.debug("Removing eldest entry in cache: " + eldest.getKey());
            }
            return result;
          }
        };
  }

  public V remove(K key) {
    V oldEntry = cacheMap.remove(key);
    if (oldEntry != null) {
      if (Log.DEBUG) LOG.debug("Removed cache entry for '" + key + "'");
    }
    return oldEntry;
  }

  public void put(K key, V newEntry) {
    if (newEntry == null || !newEntry.isCurrent()) {
      if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
          + (newEntry == null ? "null" : "not current"));
      return;
    }

    V oldEntry = cacheMap.get(key);
    if (oldEntry != null && oldEntry.isNewerThan(newEntry)) {
      if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because existing cache entry is newer");
      return;
    }

    // no existing entry or new entry is newer than old entry
    oldEntry = cacheMap.put(key, newEntry);
    if (Log.DEBUG) {
      if (oldEntry == null) {
        LOG.debug("Added new cache entry for '" + key + "'");
      } else {
        LOG.debug("Overwrote existing cache entry for '" + key + "'");
      }
    }
  }

  public void clear() {
    cacheMap.clear();
  }

  public V getCurrentEntry(K key) {
    V value = cacheMap.get(key);
    if (Log.DEBUG) LOG.debug("Entry for '" + key + "' " + (value == null ? "not " : "") + "in cache");
    if (value != null && !value.isCurrent()) {
      // value is not current; remove it and return null
      remove(key);
      return null;
    }

    return value;
  }

  public int size() {
    return cacheMap.size();
  }

  interface Entry<V> {
    public boolean isCurrent();
    public boolean isNewerThan(V otherEntry);
  }

}
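
To make the Entry contract concrete, here is a minimal usage sketch, not part of this commit: TimestampedEntry and LruCacheUsageSketch are invented names, and the sketch sits in the parquet.hadoop package only because LruCache is package-private. Run with assertions enabled (java -ea).

package parquet.hadoop;

// Hypothetical Entry implementation used only to illustrate the cache's contract.
final class LruCacheUsageSketch {
  static final class TimestampedEntry implements LruCache.Entry<TimestampedEntry> {
    private final long timestamp;

    TimestampedEntry(long timestamp) {
      this.timestamp = timestamp;
    }

    public boolean isCurrent() {
      return true; // a real entry would re-check its backing source, as FootersCacheEntry does
    }

    public boolean isNewerThan(TimestampedEntry other) {
      return other == null || timestamp > other.timestamp;
    }
  }

  public static void main(String[] args) {
    LruCache<String, TimestampedEntry> cache = new LruCache<String, TimestampedEntry>(2);
    cache.put("a", new TimestampedEntry(1));
    cache.put("b", new TimestampedEntry(2));
    cache.put("c", new TimestampedEntry(3)); // exceeds maxSize; evicts "a", the least recently used
    assert cache.getCurrentEntry("a") == null;
    assert cache.getCurrentEntry("c") != null;
    assert cache.size() == 2;

    // A stale put is ignored: "c" already holds a newer entry.
    cache.put("c", new TimestampedEntry(0));
    assert cache.getCurrentEntry("c").isNewerThan(new TimestampedEntry(0));
  }
}
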
104 changes: 97 additions & 7 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -15,12 +15,17 @@
*/
package parquet.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -78,8 +83,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   */
  public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";

  private static final int MIN_FOOTER_CACHE_SIZE = 100;

  private LruCache<Path, FootersCacheEntry> footersCache;

  private Class<?> readSupportClass;
  private List<Footer> footers;

  public static void setReadSupportClass(Job job, Class<?> readSupportClass) {
    ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
@@ -399,9 +407,7 @@ protected List<FileStatus> listStatus(JobContext jobContext) throws IOException
  private static List<FileStatus> getAllFileRecursively(
      List<FileStatus> files, Configuration conf) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    int len = files.size();
    for (int i = 0; i < len; ++i) {
      FileStatus file = files.get(i);
    for (FileStatus file : files) {
      if (file.isDir()) {
        Path p = file.getPath();
        FileSystem fs = p.getFileSystem(conf);
@@ -439,10 +445,52 @@ public boolean accept(Path p){
   * @throws IOException
   */
  public List<Footer> getFooters(JobContext jobContext) throws IOException {
    if (footers == null) {
      footers = getFooters(ContextUtil.getConfiguration(jobContext), listStatus(jobContext));
    List<FileStatus> statuses = listStatus(jobContext);
    if (statuses.isEmpty()) {
      return Collections.emptyList();
    }

    Configuration config = ContextUtil.getConfiguration(jobContext);
    List<Footer> footers = new ArrayList<Footer>(statuses.size());
    Set<FileStatus> missingStatuses = new HashSet<FileStatus>();

    if (footersCache != null) {
      for (FileStatus status : statuses) {
        FootersCacheEntry cacheEntry = footersCache.getCurrentEntry(status.getPath());
        if (Log.DEBUG) LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "") + "found for '" + status.getPath() + "'");
        if (cacheEntry != null) {
          footers.add(cacheEntry.getFooter());
        } else {
          missingStatuses.add(status);
        }
      }
    } else {
      // Initialize the cache to hold all of the current statuses or a default minimum size, whichever is
      // larger. The sizing of this LRU cache was chosen to mimic prior behavior (i.e. so that performance
      // is at least as good as it was before).
      footersCache = new LruCache<Path, FootersCacheEntry>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
      missingStatuses.addAll(statuses);
    }
    if (Log.DEBUG) LOG.debug("found " + footers.size() + " footers in cache and adding up to " +
        missingStatuses.size() + " missing footers to the cache");

    if (missingStatuses.isEmpty()) {
      return footers;
    }

    List<Footer> newFooters = getFooters(config, new ArrayList<FileStatus>(missingStatuses));
    Map<Path, FileStatus> missingStatusesMap = new HashMap<Path, FileStatus>(missingStatuses.size());
    for (FileStatus missingStatus : missingStatuses) {
      missingStatusesMap.put(missingStatus.getPath(), missingStatus);
    }
    for (Footer newFooter : newFooters) {
      // Use the original file status objects so that we store a conservative (older) modification time
      // (i.e. in case the files and footers were modified and it's not clear which version of the
      // footers we have).
      FileStatus fileStatus = missingStatusesMap.get(newFooter.getFile());
      footersCache.put(fileStatus.getPath(), new FootersCacheEntry(fileStatus, newFooter));
    }

    footers.addAll(newFooters);
    return footers;
  }

@@ -454,7 +502,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {
   * @throws IOException
   */
  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
    LOG.debug("reading " + statuses.size() + " files");
    if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
  }

@@ -467,4 +515,46 @@ public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOExceptio
    return ParquetFileWriter.getGlobalMetaData(getFooters(jobContext));
  }

  static final class FootersCacheEntry implements LruCache.Entry<FootersCacheEntry> {
    private final long modificationTime;
    private final Footer footer;

    public FootersCacheEntry(FileStatus status, Footer footer) {
      this.modificationTime = status.getModificationTime();
      this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
    }

    public boolean isCurrent() {
      FileSystem fs;
      FileStatus currentFile;
      Path path = footer.getFile();
      try {
        fs = path.getFileSystem(new Configuration());
        currentFile = fs.getFileStatus(path);
      } catch (FileNotFoundException e) {
        if (Log.DEBUG) LOG.debug("The '" + path + "' path was not found.");
        return false;
      } catch (IOException e) {
        throw new RuntimeException("Exception while checking '" + path + "': " + e, e);
      }
      long currentModTime = currentFile.getModificationTime();
      boolean isCurrent = modificationTime >= currentModTime;
      if (Log.DEBUG && !isCurrent) LOG.debug("The cache entry for '" + currentFile.getPath() + "' is not current: "
          + "cached modification time=" + modificationTime + ", current modification time=" + currentModTime);
      return isCurrent;
    }

    public Footer getFooter() {
      return footer;
    }

    public boolean isNewerThan(FootersCacheEntry entry) {
      return entry == null || modificationTime > entry.modificationTime;
    }

    public Path getPath() {
      return footer.getFile();
    }
  }

}
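
The staleness rule above is simple enough to spell out on its own: a cached footer stays usable only while the file's modification time has not advanced past the time captured when the entry was created. Below is a minimal standalone sketch of the comparison that FootersCacheEntry.isCurrent() performs; it is not part of this commit, and the /tmp/example.parquet path and FooterStalenessSketch class name are invented for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class FooterStalenessSketch {
  public static void main(String[] args) throws IOException {
    Path path = new Path("/tmp/example.parquet"); // hypothetical file
    FileSystem fs = path.getFileSystem(new Configuration());

    // Captured when the footer is cached. FootersCacheEntry stores the *original*
    // FileStatus, so a concurrent rewrite can only make the entry look older.
    long cachedModTime = fs.getFileStatus(path).getModificationTime();

    // ... the file may be rewritten here ...

    long currentModTime = fs.getFileStatus(path).getModificationTime();

    // Mirrors FootersCacheEntry.isCurrent(): any later write invalidates the entry.
    boolean isCurrent = cachedModTime >= currentModTime;
    System.out.println("cached footer still current? " + isCurrent);
  }
}
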
60 changes: 57 additions & 3 deletions parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
@@ -15,33 +15,36 @@
*/
package parquet.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
import parquet.column.Encoding;
import parquet.column.statistics.BinaryStatistics;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.metadata.FileMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ParquetDecodingException;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.PrimitiveType.PrimitiveTypeName;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Arrays;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;

public class TestInputFormat {

@@ -243,6 +246,57 @@ public void testMultipleRowGroupsInABlockToAlignHDFSBlock() throws Exception {
    shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
  }

  @Test
  public void testFooterCacheEntryIsCurrent() throws IOException, InterruptedException {
    File tempFile = getTempFile();
    FileSystem fs = FileSystem.getLocal(new Configuration());
    ParquetInputFormat.FootersCacheEntry cacheEntry = getDummyCacheEntry(tempFile, fs);

    assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
    assertFalse(cacheEntry.isCurrent());
  }

  @Test
  public void testFooterCacheEntryDeleted() throws IOException {
    File tempFile = getTempFile();
    FileSystem fs = FileSystem.getLocal(new Configuration());
    ParquetInputFormat.FootersCacheEntry cacheEntry = getDummyCacheEntry(tempFile, fs);

    assertTrue(tempFile.delete());
    assertFalse(cacheEntry.isCurrent());
  }

  @Test
  public void testFooterCacheEntryIsNewer() throws IOException {
    File tempFile = getTempFile();
    FileSystem fs = FileSystem.getLocal(new Configuration());
    ParquetInputFormat.FootersCacheEntry cacheEntry = getDummyCacheEntry(tempFile, fs);

    assertTrue(cacheEntry.isNewerThan(null));
    assertFalse(cacheEntry.isNewerThan(cacheEntry));

    assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
    ParquetInputFormat.FootersCacheEntry newerCacheEntry = getDummyCacheEntry(tempFile, fs);

    assertTrue(newerCacheEntry.isNewerThan(cacheEntry));
    assertFalse(cacheEntry.isNewerThan(newerCacheEntry));
  }

  private File getTempFile() throws IOException {
    File tempFile = File.createTempFile("footer_", ".txt");
    tempFile.deleteOnExit();
    return tempFile;
  }

  private ParquetInputFormat.FootersCacheEntry getDummyCacheEntry(File file, FileSystem fs) throws IOException {
    Path path = new Path(file.getPath());
    FileStatus status = fs.getFileStatus(path);
    ParquetMetadata mockMetadata = mock(ParquetMetadata.class);
    ParquetInputFormat.FootersCacheEntry cacheEntry = new ParquetInputFormat.FootersCacheEntry(status, new Footer(path, mockMetadata));
    assertTrue(cacheEntry.isCurrent());
    return cacheEntry;
  }

  private List<ParquetInputSplit> generateSplitByMinMaxSize(long min, long max) throws IOException {
    return ParquetInputFormat.generateSplits(
        blocks, hdfsBlocks, fileStatus, fileMetaData, schema.toString(), new HashMap<String, String>() {{
