
Commit 24836be
[SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions
## What changes were proposed in this pull request?

Don't leave a thread pool running from the AlterTableRecoverPartitionsCommand DDL command.

## How was this patch tested?

Existing tests.

Author: Sean Owen <[email protected]>

Closes #18216 from srowen/SPARK-20920.

(cherry picked from commit 7b7c85e)
Signed-off-by: Sean Owen <[email protected]>
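The leak came from the line this patch deletes: a per-instance pool that was never shut down. A minimal restatement of the removed code, with an explanatory comment added:

    // Removed by this patch: every AlterTableRecoverPartitionsCommand instance
    // lazily built its own 8-thread ForkJoinPool and never shut it down, so each
    // invocation of the command leaked eight worker threads.
    @transient private lazy val evalTaskSupport =
      new ForkJoinTaskSupport(new ForkJoinPool(8))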
1 parent: dae1a98

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala (13 additions, 8 deletions)

--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -21,7 +21,6 @@ import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -582,8 +581,15 @@ case class AlterTableRecoverPartitionsCommand(
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
-      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
+
+    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+    val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
+      try {
+        scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+      } finally {
+        evalPool.shutdown()
+      }
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
@@ -604,8 +610,6 @@ case class AlterTableRecoverPartitionsCommand(
     Seq.empty[Row]
   }
 
-  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   private def scanPartitions(
       spark: SparkSession,
       fs: FileSystem,
@@ -614,7 +618,8 @@ case class AlterTableRecoverPartitionsCommand(
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -638,7 +643,7 @@ case class AlterTableRecoverPartitionsCommand(
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
