Skip to content

Commit

Permalink
adress review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
julienledem committed Sep 4, 2014
1 parent 5b6bd1b commit 3d7e35a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public String toString() {
}
}
/**
* [ startOffset, endOffset (
* [ startOffset, endOffset )
* @author Julien Le Dem
*/
static final class RangeMetadataFilter extends MetadataFilter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,16 @@ public static List<Footer> readSummaryFile(Configuration configuration, FileStat
}

static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups) throws IOException {
Path summaryFile = new Path(basePath, PARQUET_METADATA_FILE);
Path summaryFileLight = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
FileSystem fileSystem = basePath.getFileSystem(configuration);
if (skipRowGroups && fileSystem.exists(summaryFileLight)) {
if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
// reading the summary file that does not contain the row groups
if (Log.INFO) LOG.info("reading summary file: " + summaryFileLight);
return readFooter(configuration, summaryFileLight, filter(skipRowGroups));
} else if (fileSystem.exists(summaryFile)) {
if (Log.INFO) LOG.info("reading summary file: " + summaryFile);
return readFooter(configuration, summaryFile, filter(skipRowGroups));
if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
} else if (fileSystem.exists(metadataFile)) {
if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
return readFooter(configuration, metadataFile, filter(skipRowGroups));
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ private void initializeInternalReader(ParquetInputSplit split, Configuration con
configuration, path, range(split.getStart(), split.getEnd()));
long[] rowGroupOffsets = split.getRowGroupOffsets();
List<BlockMetaData> filteredBlocks;
// if task.side.metadata is set, rowGroupOffsets is null
if (rowGroupOffsets == null) {
// then we need to apply the predicate push down filter
Filter filter = ParquetInputFormat.getFilter(configuration);
filteredBlocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), footer.getFileMetaData().getSchema());
} else {
// otherwise we find the row groups that were selected on the client
Set<Long> offsets = new HashSet<Long>();
for (long offset : rowGroupOffsets) {
offsets.add(offset);
Expand All @@ -158,11 +161,14 @@ private void initializeInternalReader(ParquetInputSplit split, Configuration con
filteredBlocks.add(block);
}
}
// verify we found them all
if (filteredBlocks.size() != rowGroupOffsets.length) {
long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
for (int i = 0; i < foundRowGroupOffsets.length; i++) {
foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
}
// this should never happen.
// provide a good error message in case there's a bug
throw new IllegalStateException(
"All the offsets listed in the split should be found in the file."
+ " expected: " + Arrays.toString(rowGroupOffsets)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<shade.prefix>parquet</shade.prefix>
<hadoop.version>1.1.0</hadoop.version>
<cascading.version>2.5.3</cascading.version>
<parquet.format.version>2.1.1-SNAPSHOT</parquet.format.version>
<parquet.format.version>2.2.0-rc1</parquet.format.version>
<log4j.version>1.2.17</log4j.version>
<previous.version>1.6.0rc1</previous.version>
<thrift.executable>thrift</thrift.executable>
Expand Down

0 comments on commit 3d7e35a

Please sign in to comment.