diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala index b116edb7df7c..7c3b92bf9094 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql -import java.util.concurrent.ForkJoinPool +import java.util.concurrent.Executors import scala.collection.mutable import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} @@ -137,15 +137,14 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { assert(interrupted.length == 2, s"Interrupted operations: $interrupted.") } - // TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag` - ignore("interrupt tag") { + test("interrupt tag") { val session = spark import session.implicits._ // global ExecutionContext has only 2 threads in Apache Spark CI // create own thread pool for four Futures used in this test val numThreads = 4 - val fpool = new ForkJoinPool(numThreads) + val fpool = Executors.newFixedThreadPool(numThreads) val executionContext = ExecutionContext.fromExecutorService(fpool) val q1 = Future { diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index ca51e61f5ed4..857f6b0cd8f7 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark -import java.util.concurrent.{Semaphore, TimeUnit} +import java.util.concurrent.{Executors, Semaphore, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -302,7 +302,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft // global ExecutionContext has only 2 threads in Apache Spark CI // create own thread pool for four Futures used in this test val numThreads = 4 - val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads) + val fpool = Executors.newFixedThreadPool(numThreads) val executionContext = ExecutionContext.fromExecutorService(fpool) try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala index e9fd07ecf18b..088abd063d4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.util.concurrent.{ConcurrentHashMap, Semaphore, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, Executors, Semaphore, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.{ExecutionContext, Future} @@ -121,7 +121,7 @@ class SparkSessionJobTaggingAndCancellationSuite // global ExecutionContext has only 2 threads in Apache Spark CI // create own thread pool for four Futures used in this test val numThreads = 3 - val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads) + val fpool = Executors.newFixedThreadPool(numThreads) val executionContext = ExecutionContext.fromExecutorService(fpool) try {