Skip to content

Commit 2f68631

Browse files
viiryacloud-fan
authored andcommitted
[SPARK-20848][SQL] Shutdown the pool after reading parquet files
## What changes were proposed in this pull request? From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state, and never stopped, which leads to unbounded growth in number of threads. We should shutdown the pool after reading parquet files. ## How was this patch tested? Added a test to ParquetFileFormatSuite. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <[email protected]> Closes #18073 from viirya/SPARK-20848. (cherry picked from commit f72ad30) Signed-off-by: Wenchen Fan <[email protected]>
1 parent 13adc0f commit 2f68631

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,8 @@ object ParquetFileFormat extends Logging {
496496
partFiles: Seq[FileStatus],
497497
ignoreCorruptFiles: Boolean): Seq[Footer] = {
498498
val parFiles = partFiles.par
499-
parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
499+
val pool = new ForkJoinPool(8)
500+
parFiles.tasksupport = new ForkJoinTaskSupport(pool)
500501
parFiles.flatMap { currentFile =>
501502
try {
502503
// Skips row group information since we only need the schema.
@@ -512,6 +513,8 @@ object ParquetFileFormat extends Logging {
512513
} else {
513514
throw new IOException(s"Could not read footer for file: $currentFile", e)
514515
}
516+
} finally {
517+
pool.shutdown()
515518
}
516519
}.seq
517520
}

0 commit comments

Comments
 (0)