
Commit 6a7e537

viirya authored and liancheng committed
[SPARK-8756] [SQL] Keep cached information and avoid re-calculating footers in ParquetRelation2
JIRA: https://issues.apache.org/jira/browse/SPARK-8756 Currently, in ParquetRelation2, footers are re-read every time refresh() is called. But we can check if it is possibly changed before we do the reading because reading all footers will be expensive when there are too many partitions. This pr fixes this by keeping some cached information to check it. Author: Liang-Chi Hsieh <[email protected]> Closes #7154 from viirya/cached_footer_parquet_relation and squashes the following commits: 92e9347 [Liang-Chi Hsieh] Fix indentation. ae0ec64 [Liang-Chi Hsieh] Fix wrong assignment. c8fdfb7 [Liang-Chi Hsieh] Fix it. a52b6d1 [Liang-Chi Hsieh] For comments. c2a2420 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation fa5458f [Liang-Chi Hsieh] Use Map to cache FileStatus and do merging previously loaded schema and newly loaded one. 6ae0911 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation 21bbdec [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation 12a0ed9 [Liang-Chi Hsieh] Add check of FileStatus's modification time. 186429d [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation 0ef8caf [Liang-Chi Hsieh] Keep cached information and avoid re-calculating footers.
1 parent 8fe32b4 commit 6a7e537

File tree: 1 file changed (+24, -14)


sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 24 additions & 14 deletions
@@ -345,24 +345,34 @@ private[sql] class ParquetRelation2(
     // Schema of the whole table, including partition columns.
     var schema: StructType = _

+    // Cached leaves
+    var cachedLeaves: Set[FileStatus] = null
+
     /**
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
      */
     def refresh(): Unit = {
-      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = cachedLeafStatuses().filter { f =>
-        isSummaryFile(f.getPath) ||
-          !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-      }.toArray
-
-      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
-      commonMetadataStatuses =
-        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-      // If we already get the schema, don't need to re-compute it since the schema merging is
-      // time-consuming.
-      if (dataSchema == null) {
+      val currentLeafStatuses = cachedLeafStatuses()
+
+      // Check if cachedLeafStatuses is changed or not
+      val leafStatusesChanged = (cachedLeaves == null) ||
+        !cachedLeaves.equals(currentLeafStatuses)
+
+      if (leafStatusesChanged) {
+        cachedLeaves = currentLeafStatuses.toIterator.toSet
+
+        // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+        val leaves = currentLeafStatuses.filter { f =>
+          isSummaryFile(f.getPath) ||
+            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        }.toArray
+
+        dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+        metadataStatuses =
+          leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+        commonMetadataStatuses =
+          leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
       dataSchema = {
         val dataSchema0 = maybeDataSchema
           .orElse(readSchema())
0 commit comments

Comments
 (0)