From a36362271f837dfa067549ae1cc7fda627951761 Mon Sep 17 00:00:00 2001
From: Matthieu Martin
Date: Thu, 19 Jun 2014 10:34:15 -0700
Subject: [PATCH 1/4] Use LRU caching for footers in ParquetInputFormat.

---
 parquet-hadoop/pom.xml                             |   6 +
 .../main/java/parquet/hadoop/LruCache.java         |  89 +++++++++++
 .../parquet/hadoop/ParquetInputFormat.java         | 104 ++++++++++++-
 .../java/parquet/hadoop/TestInputFormat.java       |  60 +++++++-
 .../java/parquet/hadoop/TestLruCache.java          | 144 ++++++++++++++++++
 5 files changed, 393 insertions(+), 10 deletions(-)
 create mode 100644 parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
 create mode 100644 parquet-hadoop/src/test/java/parquet/hadoop/TestLruCache.java

diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index ec7b94d2de..c1b4f0770a 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -68,6 +68,12 @@
       <type>jar</type>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java b/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
new file mode 100644
index 0000000000..92570ff3b0
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
@@ -0,0 +1,89 @@
+package parquet.hadoop;
+
+import parquet.Log;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+final class LruCache<K, V extends LruCache.Entry<V>> {
+  private static final Log LOG = Log.getLog(LruCache.class);
+
+  private static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+  private final LinkedHashMap<K, V> cacheMap;
+
+  public LruCache(int maxSize) {
+    this(maxSize, DEFAULT_LOAD_FACTOR, true);
+  }
+
+  public LruCache(final int maxSize, float loadFactor, boolean accessOrder) {
+    cacheMap =
+            new LinkedHashMap<K, V>(Math.round(maxSize / loadFactor), loadFactor, accessOrder) {
+              @Override
+              public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                boolean result = size() > maxSize;
+                if (result) {
+                  if (Log.DEBUG) LOG.debug("Removing eldest entry in cache: " + eldest.getKey());
+                }
+                return result;
+              }
+            };
+  }
+
+  public V remove(K key) {
+    V oldEntry = cacheMap.remove(key);
+    if (oldEntry != null) {
+      if (Log.DEBUG) LOG.debug("Removed cache entry for '" + key + "'");
+    }
+    return oldEntry;
+  }
+
+  public void put(K key, V newEntry) {
+    if (newEntry == null || !newEntry.isCurrent()) {
+      if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because it is " + (newEntry == null ? "null" : "not current"));
+      return;
+    }
+
+    V oldEntry = cacheMap.get(key);
+    if (oldEntry != null && oldEntry.isNewerThan(newEntry)) {
+      if (Log.WARN) LOG.warn("Ignoring new cache entry for '" + key + "' because existing cache entry is newer");
+      return;
+    }
+
+    // no existing entry or new entry is newer than old entry
+    oldEntry = cacheMap.put(key, newEntry);
+    if (Log.DEBUG) {
+      if (oldEntry == null) {
+        LOG.debug("Added new cache entry for '" + key + "'");
+      } else {
+        LOG.debug("Overwrote existing cache entry for '" + key + "'");
+      }
+    }
+  }
+
+  public void clear() {
+    cacheMap.clear();
+  }
+
+  public V getCurrentEntry(K key) {
+    V value = cacheMap.get(key);
+    if (Log.DEBUG) LOG.debug("Entry for '" + key + "' " + (value == null ? "not " : "") + "in cache");
+    if (value != null && !value.isCurrent()) {
+      // value is not current; remove it and return null
+      remove(key);
+      return null;
+    }
+
+    return value;
+  }
+
+  public int size() {
+    return cacheMap.size();
+  }
+
+  interface Entry<V> {
+    public boolean isCurrent();
+    public boolean isNewerThan(V otherEntry);
+  }
+
+}
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 3abb38b5d7..3f85d734ce 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -15,12 +15,17 @@
  */
 package parquet.hadoop;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -78,8 +83,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
 
+  private static final int MIN_FOOTER_CACHE_SIZE = 100;
+
+  private LruCache footersCache;
+
   private Class<?> readSupportClass;
 
-  private List