Commit f9fc3a6

pwoody authored and robert3005 committed

Fix for many blocks per split (apache#82)

1 parent 3ceffc6 commit f9fc3a6

File tree: 2 files changed (+29 −17 lines)
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java (15 additions, 17 deletions)
```diff
@@ -107,18 +107,19 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       // then we need to apply the predicate push down filter
       footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
       FilterCompat.Filter filter = getFilter(configuration);
-      this.reader = ParquetFileReader.open(configuration, file, footer);
-      List<RowGroupFilter.FilterLevel> filterLevels =
-          ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS);
-      if (configuration.getBoolean(DICTIONARY_FILTERING_ENABLED, false)) {
-        filterLevels = ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS,
-            RowGroupFilter.FilterLevel.DICTIONARY);
+      try (ParquetFileReader reader = ParquetFileReader.open(configuration, file, footer)) {
+        List<RowGroupFilter.FilterLevel> filterLevels =
+            ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS);
+        if (configuration.getBoolean(DICTIONARY_FILTERING_ENABLED, false)) {
+          filterLevels = ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS,
+              RowGroupFilter.FilterLevel.DICTIONARY);
+        }
+        blocks = filterRowGroups(
+            filterLevels,
+            filter,
+            footer.getBlocks(),
+            reader);
       }
-      blocks = filterRowGroups(
-          filterLevels,
-          filter,
-          footer.getBlocks(),
-          reader);
     } else {
       // otherwise we find the row groups that were selected on the client
       footer = readFooter(configuration, file, NO_FILTER);
@@ -147,22 +148,19 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           + " out of: " + Arrays.toString(foundRowGroupOffsets)
           + " in range " + split.getStart() + ", " + split.getEnd());
       }
-      this.reader = new ParquetFileReader(configuration, file, footer);
     }
     this.fileSchema = footer.getFileMetaData().getSchema();
     Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
       taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    reader.setRequestedSchema(requestedSchema);
+    this.reader = ParquetFileReader.open(configuration, file, new ParquetMetadata(footer.getFileMetaData(), blocks));
+    this.reader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
       configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
-    for (BlockMetaData block : blocks) {
-      this.totalRowCount += block.getRowCount();
-    }
-
+    this.totalRowCount = this.reader.getRecordCount();
     // For test purpose.
     // If the predefined accumulator exists, the row group number to read will be updated
     // to the accumulator. So we can check if the row groups are filtered or not in test case.
```
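For context, the sketch below (not part of the commit) condenses the pattern the Java change applies: evaluate the row-group filters with a short-lived `ParquetFileReader` that is closed immediately, then reopen the reader used for the actual scan over a footer listing only the surviving blocks, so its record count and reads match the pruned set. The class and method names are hypothetical, the dictionary filter level is enabled unconditionally here for brevity (the patch gates it behind `DICTIONARY_FILTERING_ENABLED`), and only parquet-mr calls already appearing in the diff (`readFooter`/`range`, `getFilter`, `ParquetFileReader.open`, `RowGroupFilter.filterRowGroups`, `new ParquetMetadata(...)`) are assumed.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

/** Hypothetical helper illustrating the filter-then-reopen pattern used by this patch. */
final class FilteredReaderSketch {

  static ParquetFileReader openFiltered(
      Configuration conf, Path file, long start, long end) throws IOException {
    // Read only the footer metadata for this split's byte range.
    ParquetMetadata footer =
        ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.range(start, end));
    FilterCompat.Filter filter = ParquetInputFormat.getFilter(conf);

    // A short-lived reader evaluates the row-group filters (statistics and, in this
    // simplified sketch, dictionary pages unconditionally) and is closed right away.
    List<BlockMetaData> blocks;
    try (ParquetFileReader filteringReader = ParquetFileReader.open(conf, file, footer)) {
      blocks = RowGroupFilter.filterRowGroups(
          Arrays.asList(
              RowGroupFilter.FilterLevel.STATISTICS,
              RowGroupFilter.FilterLevel.DICTIONARY),
          filter,
          footer.getBlocks(),
          filteringReader);
    }

    // Reopen the reader over a footer that lists only the surviving blocks, so reads
    // and getRecordCount() see exactly the pruned set of row groups.
    return ParquetFileReader.open(
        conf, file, new ParquetMetadata(footer.getFileMetaData(), blocks));
  }
}
```

Reopening over the filtered `ParquetMetadata` keeps the reader's internal row-group list consistent with `blocks`, which is why the second hunk can replace the manual `BlockMetaData.getRowCount()` sum with `reader.getRecordCount()`, and it is the behavior the new ParquetIOSuite test below exercises with many tiny row groups in a single split.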

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala (14 additions, 0 deletions)
```diff
@@ -743,6 +743,20 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(option.compressionCodecClassName == "UNCOMPRESSED")
     }
   }
+
+  test("Pruned blocks within a file split do not get used") {
+    withSQLConf(ParquetOutputFormat.BLOCK_SIZE -> "1",
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { f =>
+        // Create many blocks that will fit within the maxSplitSize.
+        // Ensure that non-contiguous blocks properly get removed within the vectorized reader.
+        val data = sparkContext.parallelize((1 to 100) ++ (100 to 200) ++ (1 to 100), 1).toDF()
+        data.write.parquet(f.getCanonicalPath)
+        val df = spark.read.parquet(f.getCanonicalPath)
+        assert(df.filter("value <=> 1").count() == 2)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
```
