Skip to content

Commit

Permalink
Reverted code style changes to make this PR cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Aug 27, 2014
1 parent 382baf5 commit fd351b2
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 43 deletions.
85 changes: 48 additions & 37 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {

private static final int MIN_FOOTER_CACHE_SIZE = 100;

private LruCache<FileStatusWrapper, CachedFooter> footersCache;
private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;

private Class<?> readSupportClass;

Expand Down Expand Up @@ -418,15 +418,15 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
return splits;
}

private List<ParquetInputSplit> getSplitsWithCachedFooters(Configuration configuration, List<CachedFooter> cachedFooters) throws IOException {
private List<ParquetInputSplit> getSplitsWithCachedFooters(Configuration configuration, List<FootersCacheValue> cachedFooters) throws IOException {
final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
if (maxSplitSize < 0 || minSplitSize < 0) {
throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
}
List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
List<Footer> footers = new ArrayList<Footer>(cachedFooters.size());
for (CachedFooter cacheFooter : cachedFooters) {
for (FootersCacheValue cacheFooter : cachedFooters) {
footers.add(cacheFooter.footer);
}
GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
Expand All @@ -440,15 +440,18 @@ private List<ParquetInputSplit> getSplitsWithCachedFooters(Configuration configu
long rowGroupsDropped = 0;
long totalRowGroups = 0;

for (CachedFooter cachedFooter : cachedFooters) {
for (FootersCacheValue cachedFooter : cachedFooters) {
final Path file = cachedFooter.footer.getFile();
LOG.debug(file);
FileSystem fs = file.getFileSystem(configuration);
FileStatus fileStatus = cachedFooter.getFileStatus();
ParquetMetadata parquetMetaData = cachedFooter.footer.getParquetMetadata();
List<BlockMetaData> blocks = parquetMetaData.getBlocks();

List<BlockMetaData> filteredBlocks = blocks;

totalRowGroups += blocks.size();
List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
rowGroupsDropped += blocks.size() - filteredBlocks.size();

if (filteredBlocks.isEmpty()) {
Expand All @@ -465,7 +468,8 @@ private List<ParquetInputSplit> getSplitsWithCachedFooters(Configuration configu
readContext.getRequestedSchema().toString(),
readContext.getReadSupportMetadata(),
minSplitSize,
maxSplitSize));
maxSplitSize)
);
}

if (rowGroupsDropped > 0 && totalRowGroups > 0) {
Expand All @@ -485,12 +489,12 @@ private List<ParquetInputSplit> getSplitsWithCachedFooters(Configuration configu
* @throws IOException
*/
public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
List<CachedFooter> cachedFooters = new ArrayList<CachedFooter>(footers.size());
List<FootersCacheValue> cachedFooters = new ArrayList<FootersCacheValue>(footers.size());

for (Footer footer : footers) {
FileSystem fs = footer.getFile().getFileSystem(configuration);
FileStatus status = fs.getFileStatus(footer.getFile());
cachedFooters.add(new CachedFooter(new FileStatusWrapper(status), footer));
cachedFooters.add(new FootersCacheValue(new FileStatusWrapper(status), footer));
}

return getSplitsWithCachedFooters(configuration, cachedFooters);
Expand All @@ -503,7 +507,7 @@ public List<ParquetInputSplit> getSplits(Configuration configuration, List<Foote
@Override
protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
return getAllFileRecursively(super.listStatus(jobContext),
ContextUtil.getConfiguration(jobContext));
ContextUtil.getConfiguration(jobContext));
}

private static List<FileStatus> getAllFileRecursively(
Expand Down Expand Up @@ -541,25 +545,45 @@ public boolean accept(Path p){
}
};

private List<CachedFooter> getCachedFooters(JobContext jobContext) throws IOException {
/**
 * Returns the footers of the input files for the given job.
 * <p>
 * The footers are obtained through {@code getCachedFooters(jobContext)};
 * each returned cache value is unwrapped to its plain {@link Footer},
 * keeping the order of the cache values.
 *
 * @param jobContext the current job context
 * @return the footers for the files
 * @throws IOException if {@code getCachedFooters} fails
 */
public List<Footer> getFooters(JobContext jobContext) throws IOException {
  final List<FootersCacheValue> cacheValues = getCachedFooters(jobContext);
  final int count = cacheValues.size();
  final List<Footer> result = new ArrayList<Footer>(count);

  // Unwrap every cache value into the footer it carries.
  for (int i = 0; i < count; i++) {
    result.add(cacheValues.get(i).footer);
  }

  return result;
}

private List<FootersCacheValue> getCachedFooters(JobContext jobContext) throws IOException {
List<FileStatus> statuses = listStatus(jobContext);
if (statuses.isEmpty()) {
return Collections.emptyList();
}

Configuration config = ContextUtil.getConfiguration(jobContext);
List<CachedFooter> footers = new ArrayList<CachedFooter>(statuses.size());
List<FootersCacheValue> footers = new ArrayList<FootersCacheValue>(statuses.size());
Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
Map<Path, FileStatusWrapper> missingStatusesMap = new HashMap<Path, FileStatusWrapper>(missingStatuses.size());
Map<Path, FileStatusWrapper> missingStatusesMap =
new HashMap<Path, FileStatusWrapper>(missingStatuses.size());

if (footersCache == null) {
footersCache = new LruCache<FileStatusWrapper, CachedFooter>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
footersCache =
new LruCache<FileStatusWrapper, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
}
for (FileStatus status : statuses) {
FileStatusWrapper statusWrapper = new FileStatusWrapper(status);
CachedFooter cacheEntry = footersCache.getCurrentValue(statusWrapper);
FootersCacheValue cacheEntry =
footersCache.getCurrentValue(statusWrapper);
if (Log.DEBUG) {
LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "") + " found for '" + status.getPath() + "'");
LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "")
+ " found for '" + status.getPath() + "'");
}
if (cacheEntry != null) {
footers.add(cacheEntry);
Expand All @@ -578,37 +602,22 @@ private List<CachedFooter> getCachedFooters(JobContext jobContext) throws IOExce
return footers;
}

List<Footer> newFooters = getFooters(config, new ArrayList<FileStatus>(missingStatuses));
List<Footer> newFooters =
getFooters(config, new ArrayList<FileStatus>(missingStatuses));
for (Footer newFooter : newFooters) {
// Use the original file status objects to make sure we store a
// conservative (older) modification time (i.e. in case the files and
// footers were modified and it's not clear which version of the footers
// we have)
FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
CachedFooter cacheEntry = new CachedFooter(fileStatus, newFooter);
FootersCacheValue cacheEntry = new FootersCacheValue(fileStatus, newFooter);
footersCache.put(fileStatus, cacheEntry);
footers.add(cacheEntry);
}

return footers;
}

/**
 * Returns the footers of the input files for the given job.
 * <p>
 * The footers are obtained through {@code getCachedFooters(jobContext)};
 * each returned cache entry is unwrapped to its plain {@link Footer},
 * keeping the order of the cache entries.
 *
 * @param jobContext the current job context
 * @return the footers for the files
 * @throws IOException if {@code getCachedFooters} fails
 */
public List<Footer> getFooters(JobContext jobContext) throws IOException {
  final List<CachedFooter> cacheEntries = getCachedFooters(jobContext);
  final int count = cacheEntries.size();
  final List<Footer> result = new ArrayList<Footer>(count);

  // Unwrap every cache entry into the footer it carries.
  for (int i = 0; i < count; i++) {
    result.add(cacheEntries.get(i).footer);
  }

  return result;
}

/**
* the footers for the files
* @param configuration to connect to the file system
Expand All @@ -635,12 +644,13 @@ public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOExceptio
* modification time associated with that footer. The modification time is
* used to determine whether the footer is still current.
*/
static final class CachedFooter implements LruCache.Value<FileStatusWrapper, CachedFooter> {
static final class FootersCacheValue
implements LruCache.Value<FileStatusWrapper, FootersCacheValue> {
private final long modificationTime;
private final Footer footer;
private final FileStatus status;

public CachedFooter(FileStatusWrapper status, Footer footer) {
public FootersCacheValue(FileStatusWrapper status, Footer footer) {
this.modificationTime = status.getModificationTime();
this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
this.status = status.status;
Expand All @@ -667,8 +677,9 @@ public FileStatus getFileStatus() {
}

@Override
public boolean isNewerThan(CachedFooter otherValue) {
return otherValue == null || modificationTime > otherValue.modificationTime;
public boolean isNewerThan(FootersCacheValue otherValue) {
return otherValue == null ||
modificationTime > otherValue.modificationTime;
}

public Path getPath() {
Expand Down
12 changes: 6 additions & 6 deletions parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public static BlockMetaData makeBlockFromStats(IntStatistics stats, long valueCo
public void testFooterCacheValueIsCurrent() throws IOException, InterruptedException {
File tempFile = getTempFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
ParquetInputFormat.CachedFooter cacheValue = getDummyCacheValue(tempFile, fs);
ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
assertFalse(cacheValue.isCurrent(new ParquetInputFormat.FileStatusWrapper(fs.getFileStatus(new Path(tempFile.getAbsolutePath())))));
Expand All @@ -350,13 +350,13 @@ public void testFooterCacheValueIsCurrent() throws IOException, InterruptedExcep
public void testFooterCacheValueIsNewer() throws IOException {
File tempFile = getTempFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
ParquetInputFormat.CachedFooter cacheValue = getDummyCacheValue(tempFile, fs);
ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(cacheValue.isNewerThan(null));
assertFalse(cacheValue.isNewerThan(cacheValue));

assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
ParquetInputFormat.CachedFooter newerCacheValue = getDummyCacheValue(tempFile, fs);
ParquetInputFormat.FootersCacheValue newerCacheValue = getDummyCacheValue(tempFile, fs);

assertTrue(newerCacheValue.isNewerThan(cacheValue));
assertFalse(cacheValue.isNewerThan(newerCacheValue));
Expand All @@ -368,13 +368,13 @@ private File getTempFile() throws IOException {
return tempFile;
}

private ParquetInputFormat.CachedFooter getDummyCacheValue(File file, FileSystem fs) throws IOException {
private ParquetInputFormat.FootersCacheValue getDummyCacheValue(File file, FileSystem fs) throws IOException {
Path path = new Path(file.getPath());
FileStatus status = fs.getFileStatus(path);
ParquetInputFormat.FileStatusWrapper statusWrapper = new ParquetInputFormat.FileStatusWrapper(status);
ParquetMetadata mockMetadata = mock(ParquetMetadata.class);
ParquetInputFormat.CachedFooter cacheValue =
new ParquetInputFormat.CachedFooter(statusWrapper, new Footer(path, mockMetadata));
ParquetInputFormat.FootersCacheValue cacheValue =
new ParquetInputFormat.FootersCacheValue(statusWrapper, new Footer(path, mockMetadata));
assertTrue(cacheValue.isCurrent(statusWrapper));
return cacheValue;
}
Expand Down

0 comments on commit fd351b2

Please sign in to comment.