PARQUET-4: Use LRU caching for footers in ParquetInputFormat. #2
LruCache.java (new file, package parquet.hadoop)
@@ -0,0 +1,89 @@
package parquet.hadoop;

import parquet.Log;

import java.util.LinkedHashMap;
import java.util.Map;

final class LruCache<K, V extends LruCache.Entry<V>> {
  private static final Log LOG = Log.getLog(LruCache.class);

  private static final float DEFAULT_LOAD_FACTOR = 0.75f;

  private final LinkedHashMap<K, V> cacheMap;

  public LruCache(int maxSize) {
    this(maxSize, DEFAULT_LOAD_FACTOR, true);
  }

  public LruCache(final int maxSize, float loadFactor, boolean accessOrder) {
    cacheMap =
        new LinkedHashMap<K, V>(Math.round(maxSize / loadFactor), loadFactor, accessOrder) {
          @Override
          public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            boolean result = size() > maxSize;
            if (result) {
              if (Log.DEBUG) LOG.debug("Removing eldest entry in cache: " + eldest.getKey());
            }
            return result;
          }
        };
  }

  public V remove(K key) {
    V oldEntry = cacheMap.remove(key);
    if (oldEntry != null) {
      if (Log.DEBUG) LOG.debug("Removed cache entry for '" + key + "'");
    }
    return oldEntry;
  }

  public void put(K key, V newEntry) {
    if (newEntry == null || !newEntry.isCurrent()) {
      if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
          + (newEntry == null ? "null" : "not current"));
      return;
    }

    V oldEntry = cacheMap.get(key);
    if (oldEntry != null && oldEntry.isNewerThan(newEntry)) {
      if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because existing cache entry is newer");
      return;
    }

    // no existing entry, or the new entry is newer than the old one
    oldEntry = cacheMap.put(key, newEntry);
    if (Log.DEBUG) {
      if (oldEntry == null) {
        LOG.debug("Added new cache entry for '" + key + "'");
      } else {
        LOG.debug("Overwrote existing cache entry for '" + key + "'");
      }
    }
  }

  public void clear() {
    cacheMap.clear();
  }

  public V getCurrentEntry(K key) {
    V value = cacheMap.get(key);
    if (Log.DEBUG) LOG.debug("Entry for '" + key + "' " + (value == null ? "not " : "") + "in cache");
    if (value != null && !value.isCurrent()) {
      // value is not current; remove it and return null
      remove(key);
      return null;
    }

    return value;
  }

  public int size() {
    return cacheMap.size();
  }

  interface Entry<V> {
    boolean isCurrent();
    boolean isNewerThan(V otherEntry);
  }
}
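Since LruCache is new in this PR, a minimal usage sketch may help. This is not part of the PR: LruCacheDemo and SimpleEntry are hypothetical names, the class sits in parquet.hadoop because LruCache is package-private, and its entries are always "current" so only the LRU behavior is exercised.

package parquet.hadoop;

// Hypothetical demo (not from the PR): exercises LruCache's eviction and
// "newer entry wins" semantics with a trivial Entry implementation.
public class LruCacheDemo {
  // A value whose freshness is just a version number; isCurrent() always
  // returns true, so nothing is ever invalidated behind the cache's back.
  static final class SimpleEntry implements LruCache.Entry<SimpleEntry> {
    private final long version;

    SimpleEntry(long version) { this.version = version; }

    public boolean isCurrent() { return true; }

    public boolean isNewerThan(SimpleEntry other) {
      return other == null || version > other.version;
    }
  }

  public static void main(String[] args) {
    LruCache<String, SimpleEntry> cache = new LruCache<String, SimpleEntry>(2);
    cache.put("a", new SimpleEntry(1));
    cache.put("b", new SimpleEntry(1));

    cache.getCurrentEntry("a");          // access "a" so "b" becomes the eldest entry
    cache.put("c", new SimpleEntry(1));  // exceeds maxSize, so "b" is evicted

    System.out.println(cache.getCurrentEntry("b"));  // null: evicted
    System.out.println(cache.size());                // 2

    cache.put("a", new SimpleEntry(0));  // ignored: the cached entry is newer
  }
}

Note that the constructor sizes the backing LinkedHashMap at Math.round(maxSize / loadFactor) so roughly maxSize entries fit before a resize, and passes accessOrder = true so that gets, not just puts, refresh an entry's position in the eviction order.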
ParquetInputFormat.java (package parquet.hadoop)
@@ -15,12 +15,17 @@
 */
package parquet.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -78,8 +83,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   */
  public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";

  private static final int MIN_FOOTER_CACHE_SIZE = 100;

  private LruCache<Path, FootersCacheEntry> footersCache;

  private Class<?> readSupportClass;
  private List<Footer> footers;

  public static void setReadSupportClass(Job job, Class<?> readSupportClass) {
    ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
@@ -399,9 +407,7 @@ protected List<FileStatus> listStatus(JobContext jobContext) throws IOException
  private static List<FileStatus> getAllFileRecursively(
      List<FileStatus> files, Configuration conf) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
-   int len = files.size();
-   for (int i = 0; i < len; ++i) {
-     FileStatus file = files.get(i);
    for (FileStatus file : files) {
      if (file.isDir()) {
        Path p = file.getPath();
        FileSystem fs = p.getFileSystem(conf);
@@ -439,10 +445,52 @@ public boolean accept(Path p){
   * @throws IOException
   */
  public List<Footer> getFooters(JobContext jobContext) throws IOException {
-   if (footers == null) {
-     footers = getFooters(ContextUtil.getConfiguration(jobContext), listStatus(jobContext));
    List<FileStatus> statuses = listStatus(jobContext);
    if (statuses.isEmpty()) {
      return Collections.emptyList();
    }

    Configuration config = ContextUtil.getConfiguration(jobContext);
    List<Footer> footers = new ArrayList<Footer>(statuses.size());
    Set<FileStatus> missingStatuses = new HashSet<FileStatus>();

    if (footersCache != null) {
      for (FileStatus status : statuses) {
        FootersCacheEntry cacheEntry = footersCache.getCurrentEntry(status.getPath());
        if (Log.DEBUG) LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "") + "found for '" + status.getPath() + "'");
        if (cacheEntry != null) {
          footers.add(cacheEntry.getFooter());
        } else {
          missingStatuses.add(status);
        }
      }
    } else {
      // Initialize the cache to hold all of the current statuses, or a default minimum size.
      // This sizing of the LRU cache was chosen to mimic prior behavior (i.e. so that
      // performance is at least as good as it was before).
      footersCache = new LruCache<Path, FootersCacheEntry>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
      missingStatuses.addAll(statuses);
    }
    if (Log.DEBUG) LOG.debug("found " + footers.size() + " footers in cache and adding up to " +
        missingStatuses.size() + " missing footers to the cache");

    if (missingStatuses.isEmpty()) {
      return footers;
    }

    List<Footer> newFooters = getFooters(config, new ArrayList<FileStatus>(missingStatuses));
    Map<Path, FileStatus> missingStatusesMap = new HashMap<Path, FileStatus>(missingStatuses.size());
    for (FileStatus missingStatus : missingStatuses) {
      missingStatusesMap.put(missingStatus.getPath(), missingStatus);
    }
    for (Footer newFooter : newFooters) {
      // Use the original file status objects to make sure we store a conservative (older)
      // modification time (i.e. in case the files and footers were modified and it's not
      // clear which version of the footers we have).
      FileStatus fileStatus = missingStatusesMap.get(newFooter.getFile());
      footersCache.put(fileStatus.getPath(), new FootersCacheEntry(fileStatus, newFooter));
    }

    footers.addAll(newFooters);
    return footers;
  }
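The "conservative (older) modification time" comment above guards against a race between listing the files and reading their footers. The timeline below (timestamps are illustrative, not from the PR) shows why the FileStatus from listStatus() is the safe one to cache:

// Illustrative timeline (made-up timestamps):
//
//   t=100  listStatus() returns a status for part-0 (modification time 100)
//   t=150  an external writer overwrites part-0
//   t=160  getFooters(config, ...) reads the footer; it is unclear whether
//          the old or the new bytes were read
//
// Caching the entry with modification time 100 means isCurrent() fails
// (100 >= 150 is false), so the ambiguous footer is re-read on the next
// access. Caching a freshly fetched status (modification time 150) would
// wrongly report the entry as current.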
@@ -454,7 +502,7 @@ public List<Footer> getFooters(JobContext jobContext) throws IOException {
   * @throws IOException
   */
  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
-   LOG.debug("reading " + statuses.size() + " files");
    if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
  }
@@ -467,4 +515,46 @@ public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOException {
    return ParquetFileWriter.getGlobalMetaData(getFooters(jobContext));
  }

  static final class FootersCacheEntry implements LruCache.Entry<FootersCacheEntry> {
    private final long modificationTime;
    private final Footer footer;

    public FootersCacheEntry(FileStatus status, Footer footer) {
      this.modificationTime = status.getModificationTime();
      this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
    }

    public boolean isCurrent() {
      FileSystem fs;
      FileStatus currentFile;
      Path path = footer.getFile();
      try {
        fs = path.getFileSystem(new Configuration());
        currentFile = fs.getFileStatus(path);
      } catch (FileNotFoundException e) {
        if (Log.DEBUG) LOG.debug("The '" + path + "' path was not found.");
        return false;
      } catch (IOException e) {
        throw new RuntimeException("Exception while checking '" + path + "': " + e, e);
      }
      long currentModTime = currentFile.getModificationTime();
      boolean isCurrent = modificationTime >= currentModTime;
      if (Log.DEBUG && !isCurrent) LOG.debug("The cache entry for '" + currentFile.getPath() + "' is not current: " +
          "cached modification time=" + modificationTime + ", current modification time: " + currentModTime);
      return isCurrent;
    }

    public Footer getFooter() {
      return footer;
    }

    public boolean isNewerThan(FootersCacheEntry entry) {
      return entry == null || modificationTime > entry.modificationTime;
    }

    public Path getPath() {
      return footer.getFile();
    }
  }
}

Review discussion attached to isCurrent():

> actually, I'm not sure this isCurrent() method is a good idea.

> Not entirely true. HAWQ appends to parquet files and writes new footers to

> It could also be an issue, for example, with long-lived Hive sessions. Hive reuses the input format for the duration of a particular CLI session. So if the old, cached files were overwritten in HDFS (i.e. same file names but different data), then it'd be nice to have this logic in place to invalidate the entries. So that was the motivation, but I'm open to changing/removing the code if you think that's really necessary.

> We need a good way to make those two use cases coexist. I would recommend the following:
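To make the overwrite scenario from this discussion concrete, here is a hedged sketch of an entry type modeled on FootersCacheEntry but backed by java.io.File instead of a Hadoop FileSystem; FileBackedEntry is hypothetical and not part of the PR:

package parquet.hadoop;

import java.io.File;

// Hypothetical sketch (not from the PR): like FootersCacheEntry, this records
// the file's modification time when the entry is created and reports itself
// stale once the file on disk has a newer timestamp, e.g. after an overwrite.
// (Deletion handling is elided: File.lastModified() returns 0 for a missing file.)
final class FileBackedEntry implements LruCache.Entry<FileBackedEntry> {
  private final File file;
  private final long cachedModificationTime;

  FileBackedEntry(File file) {
    this.file = file;
    this.cachedModificationTime = file.lastModified();
  }

  public boolean isCurrent() {
    // false as soon as the file was rewritten after this entry was cached;
    // LruCache.getCurrentEntry() will then evict the entry and return null
    return cachedModificationTime >= file.lastModified();
  }

  public boolean isNewerThan(FileBackedEntry other) {
    return other == null || cachedModificationTime > other.cachedModificationTime;
  }
}

With an entry like this, a long-lived cache behaves as the Hive comment describes: once the file is overwritten in place, the next getCurrentEntry() call sees a newer timestamp, evicts the stale entry, and forces a re-read. The HAWQ case, where appends also bump the timestamp, interacts differently with the same check, which is the tension the reviewers are trying to resolve.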
Review comment on the Entry interface:

> please add javadoc to this interface
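One possible shape for that javadoc, inferred from how LruCache uses the two methods (the wording is illustrative, not from the PR):

/**
 * A value that can be stored in an {@link LruCache}. Implementations provide
 * the staleness checks the cache uses to decide whether an entry can be
 * served, replaced, or must be evicted.
 */
interface Entry<V> {
  /**
   * @return whether this entry is still valid, e.g. whether the file it was
   *         read from is unchanged
   */
  boolean isCurrent();

  /**
   * @return whether this entry is strictly newer than {@code otherEntry}
   *         (a null {@code otherEntry} counts as older)
   */
  boolean isNewerThan(V otherEntry);
}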