Skip to content

Commit

Permalink
review feedback
Browse files (browse the repository at this point in the history)
  • Loading branch information
julienledem committed Sep 3, 2014
1 parent fb11f02 commit f599259
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ private MetadataFilter() {}
public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
/**
* [ startOffset, endOffset (
* [ startOffset, endOffset )
* @param startOffset
* @param endOffset
* @return the filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> foot
splits.addAll(generateTaskSideMDSplits(
fileBlockLocations,
fileStatus,
footer.getParquetMetadata().getFileMetaData(),
readContext.getRequestedSchema().toString(),
readContext.getReadSupportMetadata(),
minSplitSize,
Expand All @@ -532,7 +531,6 @@ private static int findBlockIndex(BlockLocation[] hdfsBlocksArray, long offset)
static <T> List<ParquetInputSplit> generateTaskSideMDSplits(
BlockLocation[] hdfsBlocksArray,
FileStatus fileStatus,
FileMetaData fileMetaData,
String requestedSchema,
Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
Expand Down Expand Up @@ -560,12 +558,9 @@ private int compare(long x, long y) {
BlockLocation blockLocation;
final int nextBlockMin = findBlockIndex(hdfsBlocksArray, startOffset + minSplitSize);
final int nextBlockMax = findBlockIndex(hdfsBlocksArray, startOffset + maxSplitSize);
if (nextBlockMax == -1 && nextBlockMin == -1) {
// small last split
endOffset = fileStatus.getLen();
blockLocation = lastBlock;
} else if (nextBlockMax == nextBlockMin) {
if (nextBlockMax == nextBlockMin && nextBlockMax != -1) {
// no block boundary between min and max
// => use max for the size of the split
endOffset = startOffset + maxSplitSize;
blockLocation = hdfsBlocksArray[nextBlockMax];
} else if (nextBlockMin > -1) {
Expand All @@ -575,6 +570,7 @@ private int compare(long x, long y) {
endOffset = blockLocation.getOffset() + blockLocation.getLength();
} else {
// min and max after last block
// small last split
endOffset = fileStatus.getLen();
blockLocation = lastBlock;
}
Expand Down Expand Up @@ -674,7 +670,7 @@ int getRowGroupCount() {
return rowGroups.size();
}

public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, FileMetaData fileMetaData, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
long length = 0;

Expand Down Expand Up @@ -744,7 +740,6 @@ List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> foot
filteredBlocks,
fileBlockLocations,
fileStatus,
parquetMetaData.getFileMetaData(),
readContext.getRequestedSchema().toString(),
readContext.getReadSupportMetadata(),
minSplitSize,
Expand All @@ -767,7 +762,6 @@ List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> foot
* @param rowGroupBlocks data blocks (row groups)
* @param hdfsBlocksArray hdfs blocks
* @param fileStatus the containing file
* @param fileMetaData file level meta data
* @param requestedSchema the schema requested by the user
* @param readSupportMetadata the metadata provided by the readSupport implementation in init
* @param minSplitSize the mapred.min.split.size
Expand All @@ -779,7 +773,6 @@ static <T> List<ParquetInputSplit> generateSplits(
List<BlockMetaData> rowGroupBlocks,
BlockLocation[] hdfsBlocksArray,
FileStatus fileStatus,
FileMetaData fileMetaData,
String requestedSchema,
Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {

Expand All @@ -789,7 +782,7 @@ static <T> List<ParquetInputSplit> generateSplits(
//generate splits from rowGroups of each split
List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
for (SplitInfo splitInfo : splitRowGroups) {
ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, fileMetaData, requestedSchema, readSupportMetadata);
ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
resultSplits.add(split);
}
return resultSplits;
Expand Down

0 comments on commit f599259

Please sign in to comment.