Skip to content

Commit 37fcda3

Browse files
Andrew Orzsxwing
authored andcommitted
[SPARK-13747][SQL] Fix concurrent query with fork-join pool
## What changes were proposed in this pull request? Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264: ``` (1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() } ``` This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA. ## How was this patch tested? New test in `SQLExecutionSuite`. Author: Andrew Or <[email protected]> Closes #11586 from andrewor14/fix-concurrent-sql.
1 parent dbf2a7c commit 37fcda3

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,12 @@ class DAGScheduler(
613613
properties: Properties): Unit = {
614614
val start = System.nanoTime
615615
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
616-
Await.ready(waiter.completionFuture, atMost = Duration.Inf)
616+
// Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
617+
// which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
618+
// due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
619+
// safe to pass in null here. For more detail, see SPARK-13747.
620+
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
621+
waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
617622
waiter.completionFuture.value.get match {
618623
case scala.util.Success(_) =>
619624
logInfo("Job %d finished: %s, took %f s".format

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,20 @@ class SQLExecutionSuite extends SparkFunSuite {
4949
}
5050
}
5151

52+
test("concurrent query execution with fork-join pool (SPARK-13747)") {
53+
val sc = new SparkContext("local[*]", "test")
54+
val sqlContext = new SQLContext(sc)
55+
import sqlContext.implicits._
56+
try {
57+
// Should not throw IllegalArgumentException
58+
(1 to 100).par.foreach { _ =>
59+
sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
60+
}
61+
} finally {
62+
sc.stop()
63+
}
64+
}
65+
5266
/**
5367
* Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently.
5468
*/

0 commit comments

Comments
 (0)